Padrões práticos

TL;DR

Recipes do dia a dia: line parser, CSV → JSONL, multipart upload, fetch streaming e stream tee. Cada um em sua sub-seção; foco em “este é o pattern, copie e adapte”. Quando a lógica for simples, implemente na mão. Quando o formato for complexo (multipart, CSV com quoting, logs estruturados), use uma lib madura como csv-parser ou busboy.


O que é

Esta nota é um catálogo de padrões recorrentes de streams em produção. Não é referência de API — é receita. Cada padrão tem um code sample completo e uma nota de armadilha.

Padrões cobertos:

#PadrãoCaso de uso típico
1Line parserProcessar arquivos de log ou NDJSON linha a linha
2CSV → JSONLConverter dump de banco em formato consumível por outras ferramentas
3Multipart upload streamingReceber upload de arquivo grande sem explodir a RAM
4Fetch streamingConsumir LLM SSE, downloads grandes, ou APIs de streaming
5Stream teeEnviar os mesmos bytes para dois destinos simultâneos
6Multiplexing N streamsConcatenar várias fontes em um único stream de saída

Padrão 1: Line parser

Um Transform que acumula chunks num buffer interno e emite uma linha completa a cada \n. O detalhe crítico é o método _flush: ele garante que a última linha — que pode chegar sem \n final — não seja descartada.

// line-parser.js
import { Transform } from 'node:stream';
 
class LineParser extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this._buffer = '';
  }
 
  _transform(chunk, _encoding, callback) {
    this._buffer += chunk.toString();
    const lines = this._buffer.split('\n');
    // A última parte pode ser incompleta — guarda pro próximo chunk
    this._buffer = lines.pop();
    for (const line of lines) {
      if (line.trim()) this.push(line);
    }
    callback();
  }
 
  _flush(callback) {
    // Emite o que sobrou no buffer (última linha sem \n)
    if (this._buffer.trim()) this.push(this._buffer);
    this._buffer = '';
    callback();
  }
}
 
export { LineParser };

Uso:

import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { LineParser } from './line-parser.js';
import { Writable } from 'node:stream';
 
await pipeline(
  createReadStream('access.log'),
  new LineParser(),
  new Writable({
    objectMode: true,
    write(line, _enc, cb) {
      console.log('linha:', line);
      cb();
    },
  })
);

Armadilha

Sem _flush, a última linha do arquivo — se não terminar com \n — fica presa no _buffer e nunca é emitida. Sempre implemente _flush.


Padrão 2: CSV → JSONL

Compose de Transforms em pipeline: LineParser → separação por vírgula → JSON.stringify → arquivo JSONL. A ideia é que cada Transform faça uma única coisa.

// csv-to-jsonl.js
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { LineParser } from './line-parser.js';
 
// Transform: string de linha CSV → objeto JS
class CsvRowToObject extends Transform {
  constructor() {
    super({ objectMode: true, readableObjectMode: true, writableObjectMode: true });
    this._headers = null;
  }
 
  _transform(line, _enc, callback) {
    const cols = line.split(',').map((c) => c.trim());
    if (!this._headers) {
      this._headers = cols; // primeira linha = cabeçalho
    } else {
      const obj = Object.fromEntries(this._headers.map((h, i) => [h, cols[i]]));
      this.push(obj);
    }
    callback();
  }
}
 
// Transform: objeto JS → string JSON + newline
const toJsonl = new Transform({
  writableObjectMode: true,
  transform(obj, _enc, callback) {
    callback(null, JSON.stringify(obj) + '\n');
  },
});
 
await pipeline(
  createReadStream('dados.csv'),
  new LineParser(),
  new CsvRowToObject(),
  toJsonl,
  createWriteStream('saida.jsonl')
);
 
console.log('Conversão concluída.');

Quando usar lib

CsvRowToObject acima não trata aspas, escapes ou valores multilinhas. Para CSV real (Excel exports, dumps de banco), use csv-parser — um Transform stream que faz isso a ~90 000 linhas/s e passa no csv-spectrum test suite:

import csv from 'csv-parser';
import { createReadStream } from 'node:fs';
createReadStream('dados.csv').pipe(csv()).on('data', (row) => console.log(row));

Padrão 3: Multipart upload streaming

Imagine um endpoint que recebe upload de vídeos grandes. Bufferizar o req inteiro antes de processar explode a RAM e não escala. A solução é usar busboy: um Writable que parseia multipart/form-data chunk a chunk, emitindo cada arquivo como um Readable stream.

// upload-route.js  (Express)
import busboy from 'busboy';
import { createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
 
app.post('/upload', (req, res) => {
  const bb = busboy({ headers: req.headers });
 
  bb.on('file', async (fieldname, fileStream, info) => {
    const { filename } = info;
    console.log(`Recebendo: ${filename}`);
 
    try {
      // O fileStream é um Readable — pipe direto pro disco (ou S3, ou GCS)
      await pipeline(fileStream, createWriteStream(`/tmp/${filename}`));
      console.log(`Salvo: ${filename}`);
    } catch (err) {
      console.error('Erro ao salvar arquivo:', err);
      fileStream.resume(); // drena o stream mesmo em erro
    }
  });
 
  bb.on('field', (name, value) => {
    console.log(`Campo: ${name} = ${value}`);
  });
 
  bb.on('close', () => res.json({ status: 'ok' }));
  bb.on('error', (err) => res.status(500).json({ error: err.message }));
 
  req.pipe(bb);
});

Armadilha

Se o fileStream não for consumido (nem por pipeline, nem por .resume()), o busboy trava e nunca emite o evento close. A requisição fica pendurada para sempre.


Padrão 4: Fetch streaming

fetch() retorna response.body como um ReadableStream da Web Streams API. Bom para consumir respostas grandes (downloads, LLM streaming, SSE) sem bufferizar tudo em memória.

// fetch-streaming.js
const url = 'https://example.com/large-file.ndjson';
const response = await fetch(url);
 
if (!response.ok) throw new Error(`HTTP ${response.status}`);
 
// response.body é um ReadableStream (Web Streams) — iterável com for-await
let lineBuffer = '';
const decoder = new TextDecoder();
 
for await (const chunk of response.body) {
  lineBuffer += decoder.decode(chunk, { stream: true });
  const lines = lineBuffer.split('\n');
  lineBuffer = lines.pop(); // última parte pode estar incompleta
 
  for (const line of lines) {
    if (line.trim()) {
      const obj = JSON.parse(line);
      console.log(obj);
    }
  }
}
 
// Último fragmento (se houver)
if (lineBuffer.trim()) console.log(JSON.parse(lineBuffer));

Para integrar com Node Streams (ex: passar por um pipeline), converta com Readable.fromWeb():

import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
 
const response = await fetch(url);
const nodeReadable = Readable.fromWeb(response.body);
 
await pipeline(
  nodeReadable,
  new LineParser(),
  // ... demais estágios
);

Caso de uso: LLM streaming

APIs como a da Anthropic ou OpenAI retornam SSE via response.body. O loop for await processa cada token à medida que chega, sem esperar a resposta completa — essencial para UI responsiva.


Padrão 5: Stream tee

tee = bifurcar um stream para dois consumidores. Útil para “salvar no disco E enviar para S3 ao mesmo tempo”, ou “processar E logar simultaneamente”.

A forma mais pragmática em Node Streams é um PassThrough:

// stream-tee.js
import { createReadStream, createWriteStream } from 'node:fs';
import { PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';
 
async function teeToTwoSinks(sourcePath, sink1Path, sink2Path) {
  const source = createReadStream(sourcePath);
  const passthrough = new PassThrough();
 
  // Inicia os dois pipelines a partir do PassThrough
  const p1 = pipeline(passthrough, createWriteStream(sink1Path));
  const p2 = pipeline(passthrough, createWriteStream(sink2Path));
 
  // Alimenta o PassThrough com a fonte
  source.pipe(passthrough);
 
  // Aguarda ambos os destinos terminarem
  await Promise.all([p1, p2]);
}
 
await teeToTwoSinks('video-original.mp4', '/tmp/copia-local.mp4', '/tmp/copia-backup.mp4');

Para Web Streams, a API tem .tee() nativo:

const [branch1, branch2] = response.body.tee();
// branch1 e branch2 são ReadableStreams independentes

Armadilha

Se os dois consumidores têm velocidades muito diferentes, o mais lento aplica backpressure sobre o PassThrough, que por sua vez freia a fonte. O stream mais rápido fica bloqueado esperando o mais lento. Se isso for um problema, use um buffer explícito no consumidor lento, ou aceite que o rápido vai esperar.


Padrão 6 (bônus): Multiplexing N streams em 1

Concatenar múltiplas fontes em um único stream de saída — útil para “servir vários arquivos como um único body de resposta”, ou “concatenar logs de múltiplos serviços”.

// merge-streams.js
import { Readable, PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
 
/**
 * Concatena N Readable streams em sequência num único stream de saída.
 * Cada fonte é drenada por completo antes de iniciar a próxima.
 */
async function mergeSequential(sources, destination) {
  for (const source of sources) {
    await pipeline(source, destination, { end: false }); // não fecha o destino entre fontes
  }
  destination.end(); // fecha só no final
}
 
const files = ['parte1.log', 'parte2.log', 'parte3.log'].map(createReadStream);
const output = createWriteStream('merged.log');
 
await mergeSequential(files, output);
console.log('Arquivos concatenados.');

Para merge concorrente (intercalar chunks de N fontes sem ordem garantida), use stream.addListener('data') em cada fonte e empurre tudo para um PassThrough compartilhado — mas atenção ao gerenciamento de end: só feche o destino quando todas as fontes encerrarem.


Na prática

Quando implementar na mão:

  • A lógica é simples (line parser, JSON stringify, contador de bytes).
  • O formato é trivial (NDJSON, texto, binary blob sem framing).
  • Zero dependências é um requisito.

Quando usar uma lib:

NecessidadeLib
CSV com quoting, escapes, BOMcsv-parser
Multipart / form-databusboy
Logging estruturado de alta performancepino
Gzip/brotlinode:zlib (built-in)
Criptografianode:crypto (built-in)

A regra prática: se o formato tem uma spec (RFC, MIME type, W3C), existe uma lib madura para ele. Não reimplemente multipart na mão.


Armadilhas

Armadilha 1: Line parser sem _flush → última linha perdida

O _buffer interno guarda o fragmento incompleto entre chunks. Se _flush não for implementado, esse fragmento nunca é emitido. Arquivos sem \n final — comum em logs — perdem a última entrada silenciosamente.

Armadilha 2: Multipart sem stream → buffer everything no body

Usar express.json() ou body-parser em rotas de upload bufferiza o corpo inteiro antes de passar para o handler. Um upload de 2 GB usa 2 GB de RAM por requisição. Use busboy (ou multer, que usa busboy internamente) para processar chunk a chunk.

Armadilha 3: Tee com consumidores de velocidades muito diferentes

O PassThrough aplica backpressure de ambos os consumers. O consumer lento segura o rápido. Se um dos destinos for uma rede lenta (S3 via conexão ruim) e o outro for disco local rápido, o disco vai esperar a rede. Avalie se processamento sequencial (primeiro disco, depois S3) seria mais simples e aceitável.

Armadilha 4: fileStream não consumido no busboy

Se o handler do evento file não consumir o fileStream (nem pipe, nem .resume()), o busboy para de parsear o body e o evento close nunca dispara. A requisição trava.

Armadilha 5: TextDecoder sem { stream: true } em fetch streaming

Sem a opção stream: true, o decoder trata cada chunk como um texto completo. Caracteres multibyte (UTF-8 de 2–4 bytes) que chegam partidos entre dois chunks são decodificados errado. Sempre passe { stream: true } no loop e { stream: false } (ou nenhum flag) na chamada final.


Em entrevista

Frase pronta:

“Common stream patterns in production: a line parser is a Transform with an internal buffer that splits chunks on newlines, with _flush to emit any partial last line. CSV-to-JSONL is just a pipeline of Transforms — line parser, CSV split, JSON.stringify, write to file with newlines. For multipart uploads, libraries like busboy give you Transforms that parse the body chunk by chunk without buffering. For fetch() streaming, the response body is a Web Stream that you can iterate with for await of. For sending the same data to multiple sinks, tee() or PassThrough clones the stream.”

Vocabulário:

PT-BREN
analisador de linhasline parser
buffer internointernal buffer
upload multipartmultipart upload
streaming de fetchfetch streaming
bifurcação de streamstream tee
multiplexaçãomultiplexing
modo objetoobject mode
descarte / drenagemdrain / resume

Perguntas que podem vir:

  • “Como você processaria um CSV de 10 GB sem estourar a memória?” → Pipeline: createReadStreamcsv-parser (Transform) → Transform de processamento → createWriteStream. Nunca fs.readFileSync.

  • “Como você implementaria upload de arquivo grande no Express?”busboy pipeado do req, com o fileStream de cada arquivo pipeado para o destino final (S3 via SDK, disco via fs).

  • “Como você consumiria streaming de um LLM?”fetch()for await (const chunk of response.body) → decodificar com TextDecoder({ stream: true }) → exibir token a token.


Veja também

  • [[03 - Readable streams]]
  • [[04 - Writable streams]]
  • [[05 - Duplex e Transform]]
  • [[07 - pipeline vs pipe - error handling]]
  • [[08 - Async iteration de streams]]
  • [[09 - Web Streams - interop com padrão universal]]
  • [[Node.js]] (tronco)