npm i @msiviero/stream-utils
Typedocs with full documentation at https://msiviero.github.io/stream-utils
The file is processed in chunks, so the memory usage is linear. Plus the backpressure mechanism is built-in
import { createReadStream } from "fs";
import { Collect, Count, Distinct, Filter, Map, Splitter } from "@msiviero/stream-utils";
createReadStream("./data/bigfile.txt")
.pipe(new Splitter({ separator: "\n" }))
.pipe(new Map((line: Buffer) => line.toString("utf8")))
.pipe(new Distinct((line: string) => line))
.pipe(new Map((line: string) => line.split(";")))
.pipe(new Filter((columns: string[]) => columns[0] === "to_keep"))
.pipe(new Count())
.pipe(new Collect())
.on("close", (items: number[]) => {
console.log(`Count of remaining records is: ${items[0]}`);
});
A node.js server's request is a readable stream, so you can receive a large request body and processing can be done just in time chunk per chunk, without having to fit it in memory
import { Collect, Count, Distinct, Map, Splitter } from "@msiviero/stream-utils";
import { createServer } from "http";
createServer((request, response) => {
request
.pipe(new Splitter({ separator: "\n" }))
.pipe(new Map((line: Buffer) => line.toString("utf8")))
.pipe(new Distinct((line: string) => line))
.pipe(new Count())
.pipe(new Collect())
.on("close", (items: number[]) => {
response.end(`Request body contains ${items[0]} lines`);
});
})
.listen(9000);
That can be used as a source or sink for transformations provided by this package
An example of http server that receives a text body, parse it, deduplicates lines and then creates a gzip file with the content
import { Distinct, Map, Splitter } from "@msiviero/stream-utils";
import { createWriteStream } from "fs";
import { createServer } from "http";
import { createGzip } from "zlib";
createServer((request, response) => {
request
.pipe(new Splitter({ separator: "\n" }))
.pipe(new Map((line: Buffer) => line.toString("utf8")))
.pipe(new Distinct((line: string) => line))
.pipe(new Map((line: string) => `${line}\n`))
.pipe(createGzip())
.pipe(createWriteStream("request.gz"))
.on("close", () => response.end());
})
.listen(9000);
An example of http server that receives a gzipped file as body, parse it, the distinct lines and then pipes to the response stream
import { Count, Distinct, Map, Splitter } from "@msiviero/stream-utils";
import { createServer } from "http";
import { createGunzip } from "zlib";
createServer((request, response) => {
request
.pipe(createGunzip())
.pipe(new Splitter({ separator: "\n" }))
.pipe(new Map((line: Buffer) => line.toString("utf8")))
.pipe(new Distinct((line: string) => line))
.pipe(new Count())
.pipe(new Map((items: [number]) => Buffer.from(`${items}`))) // serialize item
.pipe(response);
})
.listen(9000);
Generated using TypeDoc