/**
 * Wraps a filter callback in an object-mode Transform stream.
 *
 * For every frame written to the returned stream, `filter` is invoked as
 * `filter(frame, push, done)`, where `push` is the stream's own `push`
 * method (pre-bound to the stream) and `done` is the transform callback.
 * The filter decides what, if anything, is forwarded downstream.
 *
 * @param {function} filter - Called as `filter(frame, push, done)`.
 * @returns {Transform} The object-mode transform stream driving `filter`.
 */
function createTransformStream(filter) {
  var stream = new Transform({ objectMode: true });

  // Bind once so the filter can call `push` as a plain function.
  var forward = stream.push.bind(stream);

  // The encoding argument is accepted but never handed to the filter.
  stream._transform = function(frame, encoding, done) {
    filter(frame, forward, done);
  };

  return stream;
}
/**
 * Pipes `stream1` into `stream2`, optionally routing the data through a
 * filter stage first.
 *
 * @param {object} stream1 - Source stream (must provide `pipe`).
 * @param {object} stream2 - Destination stream.
 * @param {function} [filter] - When truthy, it is wrapped with
 *     `createTransformStream` and inserted between the two streams.
 */
function pipeAndFilter(stream1, stream2, filter) {
  // With a filter, the upstream end is the transform stage fed by stream1;
  // without one, stream1 is connected to stream2 directly.
  var upstream = filter ? stream1.pipe(createTransformStream(filter)) : stream1;
  upstream.pipe(stream2);
}
/**
 * Creates the processing stages of this endpoint and wires them together.
 *
 * Stages are stored on `this` as `_serializer`, `_deserializer`,
 * `_compressor`, `_decompressor` and `_connection`. They are piped as:
 *
 *   connection -> compressor -> serializer
 *   deserializer -> decompressor -> connection
 *
 * Each of the four links may have a filter inserted, taken from `filters`.
 *
 * @param {string} role - 'CLIENT' selects first stream id 1 and a 'REQUEST'
 *     compressor / 'RESPONSE' decompressor; any other value selects first
 *     stream id 2 with the compressor/decompressor roles swapped.
 * @param {*} settings - Passed through unchanged to the Connection constructor.
 * @param {object} filters - Must be an object; its optional properties
 *     `beforeCompression`, `beforeSerialization`, `afterDeserialization` and
 *     `afterDecompression` are handed to `pipeAndFilter` for the matching link.
 */
Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
  var isClient = (role === 'CLIENT');
  var firstStreamId = isClient ? 1 : 2;
  var compressorRole = isClient ? 'REQUEST' : 'RESPONSE';
  var decompressorRole = isClient ? 'RESPONSE' : 'REQUEST';
  var log = this._log;

  // Build the stages (kept in the same order as they are stored on `this`).
  var serializer = this._serializer = new Serializer(log);
  var deserializer = this._deserializer = new Deserializer(log);
  var compressor = this._compressor = new Compressor(log, compressorRole);
  var decompressor = this._decompressor = new Decompressor(log, decompressorRole);
  var connection = this._connection = new Connection(log, firstStreamId, settings);

  // Chain starting at the connection and ending at the serializer.
  pipeAndFilter(connection, compressor, filters.beforeCompression);
  pipeAndFilter(compressor, serializer, filters.beforeSerialization);

  // Chain starting at the deserializer and ending at the connection.
  pipeAndFilter(deserializer, decompressor, filters.afterDeserialization);
  pipeAndFilter(decompressor, connection, filters.afterDecompression);

  // Forward header table size events from the connection to the matching
  // (de)compressor's setTableSizeLimit.
  connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
                decompressor.setTableSizeLimit.bind(decompressor));
  connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
                compressor.setTableSizeLimit.bind(compressor));
};
// Sentinel meaning "push() has not been called yet during this _read()".
var noread = {};

/**
 * Moves chunks from the serializer to this stream's consumer.
 *
 * Chunks are read from `this._serializer` and handed to `this.push` until
 * either the serializer returns a falsy value or `push` returns a falsy
 * value. If not a single chunk was pushed, the method re-arms itself on the
 * serializer's next 'readable' event.
 */
Endpoint.prototype._read = function _read() {
  var serializer = this._serializer;
  var state = this._readableState;
  var moreNeeded = noread;
  var chunk;

  // NOTE(review): the internal `sync` flag is forced on while pushing and
  // reset afterwards; presumably this affects how push() schedules its
  // notifications - confirm against the Node stream internals in use.
  state.sync = true;

  for (chunk = serializer.read(); chunk; chunk = serializer.read()) {
    moreNeeded = this.push(chunk);
    if (!moreNeeded) {
      // The consumer wants no more for now; leave the rest in the serializer.
      break;
    }
  }

  // Nothing was available at all: try again once the serializer has data.
  if (moreNeeded === noread) {
    serializer.once('readable', this._read.bind(this));
  }

  state.sync = false;
};
/**
 * Hands an incoming chunk to the deserializer.
 *
 * All three arguments are forwarded unchanged to `this._deserializer.write`,
 * so `done` is invoked by the deserializer's write rather than here.
 *
 * @param {*} chunk - Data written to this endpoint.
 * @param {string} encoding - Encoding accompanying the chunk.
 * @param {function} done - Completion callback passed on to the deserializer.
 */
Endpoint.prototype._write = function _write(chunk, encoding, done) {
  var deserializer = this._deserializer;

  // The return value of write() is intentionally not propagated.
  deserializer.write(chunk, encoding, done);
};