import { PassThrough, Readable, TransformCallback, Writable } from "stream";
import { pipeline } from "stream/promises";
import { deflateRaw as deflateRawCb, createDeflateRaw } from "zlib";
import { promisify } from "util";
import { crc32 } from "./crc32";
import tee from "./tee";
const deflateRaw = promisify(deflateRawCb);
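
// The local "./tee" helper is assumed to split one Readable into two independent
// readables (similar in spirit to the WHATWG ReadableStream.tee()). A minimal
// sketch of the assumed shape, shown only for context; the real module may differ:
//
//   export default function tee(input: Readable): [PassThrough, PassThrough] {
//     const a = new PassThrough();
//     const b = new PassThrough();
//     input.pipe(a);
//     input.pipe(b);
//     return [a, b];
//   }
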
/**
 * A stream transformer that records the number of bytes
 * passed through it in its `size` property.
 */
class ByteCounter extends PassThrough {
  size: number = 0;
  _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
    // Buffer chunks expose their byte length; skip anything without a numeric length.
    if (typeof chunk?.length === "number") this.size += chunk.length;
    super._transform(chunk, encoding, callback);
  }
}
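
// Illustrative only: ByteCounter is meant to sit inside a pipeline and report
// how many bytes flowed through it, e.g.
//
//   const counter = new ByteCounter();
//   await pipeline(someReadable, counter, someWritable);
//   console.log(counter.size); // number of bytes that passed through
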
/**
 * @param data buffer containing the data to be compressed
 * @returns an object containing the compressed/deflated data and the crc32
 * checksum of the source data
 */
export async function deflateBuffer(data: Buffer) {
  const [deflated, checksum] = await Promise.all([deflateRaw(data), crc32(data)]);
  return { deflated, crc32: checksum };
}
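
// Illustrative usage (not part of this module):
//
//   const { deflated, crc32 } = await deflateBuffer(Buffer.from("hello world"));
//   // `deflated` holds the raw-deflate bytes; `crc32` is the checksum of the
//   // uncompressed input, e.g. for a zip local file header.
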
/**
 * @param input a byte stream containing the data to be compressed
 * @param sink a function that will accept chunks of compressed data; we don't pass
 * a Writable here, since we don't want the writable stream to be closed after
 * a single file
 * @returns a promise that resolves with the crc32 checksum and the
 * compressed size
 */
export async function deflateStream(input: Readable, sink: (chunk: Buffer) => void) {
  // Wrap the sink in a throwaway Writable so pipeline() can drive it without
  // closing the caller's underlying output stream.
  const deflateWriter = new Writable({
    write(chunk, _, callback) {
      sink(chunk);
      callback();
    }
  });
  // Tee the input stream so we can compress and calculate the crc32 in parallel.
  const [rs1, rs2] = tee(input);
  const byteCounter = new ByteCounter();
  const [_, crc] = await Promise.all([
    // pipe the input into the deflate compressor, count the compressed bytes
    // and pass the compressed data to the sink
    pipeline(rs1, createDeflateRaw(), byteCounter, deflateWriter),
    // calculate the checksum of the uncompressed data
    crc32(rs2)
  ]);
  return { crc32: crc, compressedSize: byteCounter.size };
}
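
// Illustrative usage (names assumed, not part of this module): stream a file
// through the compressor while collecting the compressed output for a zip entry.
//
//   import { createReadStream } from "fs";
//
//   const chunks: Buffer[] = [];
//   const { crc32, compressedSize } = await deflateStream(
//     createReadStream("some-file.txt"),
//     (chunk) => chunks.push(chunk)
//   );
//   // crc32 and compressedSize can now go into the entry's header or data
//   // descriptor, followed by Buffer.concat(chunks).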