var assert = require('assert');
Stream is a Duplex stream subclass that implements the HTTP/2 Stream concept. It has two ‘sides’: one that is used by the user to send/receive data (the stream object itself) and one that is used by a Connection to read/write frames to/from the other peer (stream.upstream).
var Duplex = require('stream').Duplex;
exports.Stream = Stream;
new Stream(log, connection): create a new Stream
Event: ‘headers’ (headers): signals incoming headers
Event: ‘promise’ (stream, headers): signals an incoming push promise
Event: ‘priority’ (priority): signals a priority change. priority
is a number between 0
(highest priority) and 2^31-1 (lowest priority). Default value is 2^30.
Event: ‘error’ (type): signals an error
headers(headers): send headers
promise(headers): Stream: promise a stream
priority(priority): set the priority of the stream. Priority can be changed by the peer too, but once it is set locally, it cannot be changed remotely.
reset(error): reset the stream with an error code
upstream: a Flow that is used by the parent connection to write/read frames that are to be sent to, or have arrived from, the peer and are related to this stream.
Headers are always in the regular node.js header format.
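To make the API above concrete, here is a minimal, hypothetical usage sketch. The log object (which needs a bunyan-style child() method), the connection object, the module path and the header values are assumptions, and in practice the parent Connection assigns stream.id and pumps the frames; only the constructor, events and methods listed above come from this module.
// Usage sketch only; not part of the module. The './stream' path assumes this file is stream.js.
var ExampleStream = require('./stream').Stream;
var stream = new ExampleStream(log, connection); // 'log' and 'connection' are assumed to exist
stream.on('headers', function(headers) {
  // answer an incoming request: send response headers, then the body
  stream.headers({ ':status': '200' });
  stream.write('Hello HTTP/2');
  stream.end(); // END_STREAM travels on the last frame
});
stream.on('error', function(error) {
  // 'error' is usually an HTTP/2 error code string such as 'PROTOCOL_ERROR'
});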
The main aspects of managing the stream, each initialized by the constructor, are logging, receiving and sending stream management commands, sending and receiving frames to/from the upstream connection, and maintaining the stream state.
function Stream(log, connection) {
Duplex.call(this);
this._log = log.child({ component: 'stream', s: this });
this._initializeManagement();
this._initializeDataFlow();
this._initializeState();
this.connection = connection;
}
Stream.prototype = Object.create(Duplex.prototype, { constructor: { value: Stream } });
The default stream priority is 2^30.
var DEFAULT_PRIORITY = Math.pow(2, 30);
var MAX_PRIORITY = Math.pow(2, 31) - 1;
PUSH_PROMISE and HEADERS are forwarded to the user through events.
Stream.prototype._initializeManagement = function _initializeManagement() {
this._resetSent = false;
this._priority = DEFAULT_PRIORITY;
this._letPeerPrioritize = true;
};
Stream.prototype.promise = function promise(headers) {
var stream = new Stream(this._log, this.connection);
stream._priority = Math.min(this._priority + 1, MAX_PRIORITY);
this._pushUpstream({
type: 'PUSH_PROMISE',
flags: {},
stream: this.id,
promised_stream: stream,
headers: headers
});
return stream;
};
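A hedged sketch of pushing a resource on an already-open stream: promise() creates the reserved stream, which then gets its own headers and body. The header values below are made up.
// Sketch: server push on an existing, open 'stream' (hypothetical values).
var pushed = stream.promise({ ':method': 'GET', ':path': '/style.css' });
pushed.headers({ ':status': '200' }); // RESERVED_LOCAL -> HALF_CLOSED_REMOTE
pushed.write('body { color: black; }');
pushed.end();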
Stream.prototype._onPromise = function _onPromise(frame) {
this.emit('promise', frame.promised_stream, frame.headers);
};
Stream.prototype.headers = function headers(headers) {
this._pushUpstream({
type: 'HEADERS',
flags: {},
stream: this.id,
headers: headers
});
};
Stream.prototype._onHeaders = function _onHeaders(frame) {
if (frame.priority !== undefined) {
this.priority(frame.priority, true);
}
this.emit('headers', frame.headers);
};
Stream.prototype.priority = function priority(priority, peer) {
if ((peer && this._letPeerPrioritize) || !peer) {
if (!peer) {
this._letPeerPrioritize = false;
var lastFrame = this.upstream.getLastQueuedFrame();
if (lastFrame && ((lastFrame.type === 'HEADERS') || (lastFrame.type === 'PRIORITY'))) {
lastFrame.priority = priority;
} else {
this._pushUpstream({
type: 'PRIORITY',
flags: {},
stream: this.id,
priority: priority
});
}
}
this._log.debug({ priority: priority }, 'Changing priority');
this.emit('priority', priority);
this._priority = priority;
}
};
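The interplay described in the API list (‘once it is set locally, it cannot be changed remotely’) falls out of the _letPeerPrioritize flag. A small sketch, calling the internal handler directly to simulate an incoming PRIORITY frame:
// Sketch: local priority changes lock out later peer-initiated ones.
stream.on('priority', function(priority) {
  console.log('effective priority:', priority);
});
stream.priority(0); // local change; queues a PRIORITY frame and clears _letPeerPrioritize
stream._onPriority({ priority: 1000 }); // simulated peer frame: ignored from now on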
Stream.prototype._onPriority = function _onPriority(frame) {
this.priority(frame.priority, true);
};
Resetting the stream. Normally, an endpoint SHOULD NOT send more than one RST_STREAM frame for any stream.
Stream.prototype.reset = function reset(error) {
if (!this._resetSent) {
this._resetSent = true;
this._pushUpstream({
type: 'RST_STREAM',
flags: {},
stream: this.id,
error: error
});
}
};
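A short sketch of cancelling a stream; 'CANCEL' is one of the HTTP/2 error code names, and the _resetSent guard makes a repeated call a no-op.
// Sketch (hypothetical): cancel the stream once; the second call is ignored.
stream.reset('CANCEL');
stream.reset('CANCEL'); // no second RST_STREAM is queued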
Specify an alternate service for the origin of this stream
Stream.prototype.altsvc = function altsvc(host, port, protocolID, maxAge, origin) {
var stream;
if (origin) {
stream = 0;
} else {
stream = this.id;
}
this._pushUpstream({
type: 'ALTSVC',
flags: {},
stream: stream,
host: host,
port: port,
protocolID: protocolID,
origin: origin,
maxAge: maxAge
});
};
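A hypothetical invocation; every argument value below is made up for illustration. Passing a falsy origin ties the advertisement to this stream's id, while a non-empty origin sends it on stream 0.
// Sketch: advertise that the content is also reachable over 'h2' on another host.
stream.altsvc('alt.example.com', 443, 'h2', 3600, '');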
The incoming and the generated outgoing frames are received/transmitted on the this.upstream
Flow. The Connection object instantiating the stream will read
and write frames to/from it. The stream itself is a regular Duplex stream, and is used by
the user to write or read the body of the request.
     upstream side              stream                 user side

                  +---------------------------------+
                  |                                 |
                  +------------------+              |
                  |     upstream     |              |
                  |                  |              |
                  +--+               |           +--|
    read()        |  |   _send()     |  _write() |  |   write(buf)
   <--------------|B |<--------------|-----------| B|<------------
                  |  |               |           |  |
   frames         +--+               |           +--|    buffers
                  |  |               |           |  |
   -------------->|B |---------------|---------->| B|------------>
   write(frame)   |  |   _receive()  |  _read()  |  |    read()
                  +--+               |           +--|
                  |                  |              |
                  |                  |              |
                  +------------------+              |
                  |                                 |
                  +---------------------------------+

    B: input or output buffer
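A sketch of the connection side of this contract: outgoing frames are read from stream.upstream and incoming frames are written to it. The attachToConnection helper and the sendToPeer callback are hypothetical names; the real Connection object does this wiring itself.
// Sketch only: how a parent Connection could drive the upstream side.
function attachToConnection(stream, sendToPeer) {
  // pull outgoing frames and hand them to the framer/socket
  stream.upstream.on('readable', function() {
    var frame;
    while ((frame = stream.upstream.read()) !== null) {
      sendToPeer(frame);
    }
  });
}
// ...and when a frame addressed to this stream arrives from the peer:
// stream.upstream.write(frame);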
var Flow = require('./flow').Flow;
Stream.prototype._initializeDataFlow = function _initializeDataFlow() {
this.id = undefined;
this._ended = false;
this.upstream = new Flow();
this.upstream._log = this._log;
this.upstream._send = this._send.bind(this);
this.upstream._receive = this._receive.bind(this);
this.upstream.write = this._writeUpstream.bind(this);
this.upstream.on('error', this.emit.bind(this, 'error'));
this.on('finish', this._finishing);
};
Stream.prototype._pushUpstream = function _pushUpstream(frame) {
this.upstream.push(frame);
this._transition(true, frame);
};
Overriding the upstream’s write
allows us to act immediately instead of waiting for the input
queue to empty. This is important in case of control frames.
Stream.prototype._writeUpstream = function _writeUpstream(frame) {
this._log.debug({ frame: frame }, 'Receiving frame');
var moreNeeded = Flow.prototype.write.call(this.upstream, frame);
this._transition(false, frame);
if (frame.type === 'HEADERS') {
if (this._processedHeaders && !frame.flags['END_STREAM']) {
this.emit('error', 'PROTOCOL_ERROR');
}
this._processedHeaders = true;
this._onHeaders(frame);
} else if (frame.type === 'PUSH_PROMISE') {
this._onPromise(frame);
} else if (frame.type === 'PRIORITY') {
this._onPriority(frame);
} else if (frame.type === 'ALTSVC') {
/* TODO: process ALTSVC frames */
} else if (frame.type === 'BLOCKED') {
/* TODO: process BLOCKED frames */
}
else if ((frame.type !== 'DATA') &&
(frame.type !== 'WINDOW_UPDATE') &&
(frame.type !== 'RST_STREAM')) {
this._log.error({ frame: frame }, 'Invalid stream level frame');
this.emit('error', 'PROTOCOL_ERROR');
}
return moreNeeded;
};
The _receive
method (= upstream._receive
) gets called when there’s an incoming frame.
Stream.prototype._receive = function _receive(frame, ready) {
if (!this._ended && (frame.type === 'DATA')) {
var moreNeeded = this.push(frame.data);
if (!moreNeeded) {
this._receiveMore = ready;
}
}
if (!this._ended && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) {
this.push(null);
this._ended = true;
}
Postpone calling ready if push() returned a falsy value; in that case it was stored in _receiveMore above and will be called from _read.
if (this._receiveMore !== ready) {
ready();
}
};
The _read
method is called when the user side is ready to receive more data. If there’s a
pending write on the upstream, then call its pending ready callback to receive more frames.
Stream.prototype._read = function _read() {
if (this._receiveMore) {
var receiveMore = this._receiveMore;
delete this._receiveMore;
receiveMore();
}
};
The _write method gets called when there's a write request from the user.
Stream.prototype._write = function _write(buffer, encoding, ready) {
var moreNeeded = this._pushUpstream({
type: 'DATA',
flags: {},
stream: this.id,
data: buffer
});
if (moreNeeded) {
ready();
} else {
this._sendMore = ready;
}
};
The _send
(= upstream._send
) method is called when upstream is ready to receive more frames.
If there’s a pending write on the user side, then call its pending ready callback to receive more
writes.
Stream.prototype._send = function _send() {
if (this._sendMore) {
var sendMore = this._sendMore;
delete this._sendMore;
sendMore();
}
};
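Taken together, _write/_send on the user side and _receive/_read on the upstream side implement standard stream backpressure. A minimal sketch of a well-behaved producer, using a hypothetical writeChunks helper:
// Sketch: honor backpressure when writing a body. write() returns false when the
// internal buffer is full (because _write had to store its ready callback), and
// 'drain' fires once those callbacks have been called via _send().
function writeChunks(stream, chunks) {
  (function writeNext() {
    while (chunks.length > 0) {
      if (!stream.write(chunks.shift())) {
        stream.once('drain', writeNext); // resume when upstream accepts more frames
        return;
      }
    }
    stream.end();
  })();
}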
When the stream is finishing (the user calls end()
on it), then we have to set the END_STREAM
flag on the last frame. If there’s no frame in the queue, or if it doesn’t support this flag,
then we create a 0 length DATA frame. We could do this all the time, but putting the flag on an
existing frame is a nice optimization.
var emptyBuffer = new Buffer(0);
Stream.prototype._finishing = function _finishing() {
var endFrame = {
type: 'DATA',
flags: { END_STREAM: true },
stream: this.id,
data: emptyBuffer
};
var lastFrame = this.upstream.getLastQueuedFrame();
if (lastFrame && ((lastFrame.type === 'DATA') || (lastFrame.type === 'HEADERS'))) {
this._log.debug({ frame: lastFrame }, 'Marking last frame with END_STREAM flag.');
lastFrame.flags.END_STREAM = true;
this._transition(true, endFrame);
} else {
this._pushUpstream(endFrame);
}
};
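In practice this means END_STREAM usually travels on the last DATA (or HEADERS) frame instead of a separate empty frame, e.g.:
// Sketch: if the 'last chunk' DATA frame is still queued when 'finish' fires, it is
// flagged with END_STREAM; otherwise an empty flagged DATA frame is appended.
stream.write('last chunk');
stream.end();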
                          +--------+
                 PP       |        |       PP
              ,-----------|  idle  |-----------.
             /            |        |            \
            v             +--------+             v
       +----------+           |            +----------+
       |          |           | H          |          |
   ,---| reserved |           |            | reserved |---.
   |   | (local)  |           v            | (remote) |   |
   |   +----------+       +--------+       +----------+   |
   |       |           ES |        | ES           |       |
   |       | H   ,--------|  open  |--------.     |  H    |
   |       |    /         |        |         \    |       |
   |       v   v          +--------+          v   v       |
   |   +----------+           |            +----------+   |
   |   |   half   |           |            |   half   |   |
   |   |  closed  |           | R          |  closed  |   |
   |   | (remote) |           |            | (local)  |   |
   |   +----------+           |            +----------+   |
   |       |                  v                   |       |
   |       |      ES / R  +--------+ ES / R       |       |
   |       `------------->|        |<-------------'       |
   |  R                   | closed |                R     |
   `--------------------->|        |<---------------------'
                          +--------+
Streams begin in the IDLE state, and transitions happen when there's an incoming or outgoing frame.
Stream.prototype._initializeState = function _initializeState() {
this.state = 'IDLE';
this._initiated = undefined;
this._closedByUs = undefined;
this._closedWithRst = undefined;
this._processedHeaders = false;
};
Only _setState
should change this.state
directly. It also logs the state change and notifies
interested parties using the ‘state’ event.
Stream.prototype._setState = function transition(state) {
assert(this.state !== state);
this._log.debug({ from: this.state, to: state }, 'State transition');
this.state = state;
this.emit('state', state);
};
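A tiny sketch of observing these transitions from outside through the ‘state’ event:
// Sketch: log every state transition of a (hypothetical) stream instance.
stream.on('state', function(state) {
  console.log('stream state is now', state); // e.g. OPEN, HALF_CLOSED_LOCAL, CLOSED
});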
A state is ‘active’ if the stream in that state counts towards the concurrency limit. Streams that are in the “open” state, or either of the “half closed” states count toward this limit.
function activeState(state) {
return ((state === 'HALF_CLOSED_LOCAL') || (state === 'HALF_CLOSED_REMOTE') || (state === 'OPEN'));
}
_transition
is called every time there’s an incoming or outgoing frame. It manages state
transitions, and detects stream errors. A stream error is always caused by a frame that is not
allowed in the current state.
Stream.prototype._transition = function transition(sending, frame) {
var receiving = !sending;
var connectionError;
var streamError;
var DATA = false, HEADERS = false, PRIORITY = false, ALTSVC = false, BLOCKED = false;
var RST_STREAM = false, PUSH_PROMISE = false, WINDOW_UPDATE = false;
switch(frame.type) {
case 'DATA' : DATA = true; break;
case 'HEADERS' : HEADERS = true; break;
case 'PRIORITY' : PRIORITY = true; break;
case 'RST_STREAM' : RST_STREAM = true; break;
case 'PUSH_PROMISE' : PUSH_PROMISE = true; break;
case 'WINDOW_UPDATE': WINDOW_UPDATE = true; break;
case 'ALTSVC' : ALTSVC = true; break;
case 'BLOCKED' : BLOCKED = true; break;
}
var previousState = this.state;
switch (this.state) {
All streams start in the idle state. In this state, no frames have been exchanged.
When the HEADERS frame contains the END_STREAM flag, then two state transitions happen.
case 'IDLE':
if (HEADERS) {
this._setState('OPEN');
if (frame.flags.END_STREAM) {
this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
}
this._initiated = sending;
} else if (sending && RST_STREAM) {
this._setState('CLOSED');
} else if (PRIORITY) {
/* No state change */
} else {
connectionError = 'PROTOCOL_ERROR';
}
break;
A stream in the reserved (local) state is one that has been promised by sending a PUSH_PROMISE frame.
case 'RESERVED_LOCAL':
if (sending && HEADERS) {
this._setState('HALF_CLOSED_REMOTE');
} else if (RST_STREAM) {
this._setState('CLOSED');
} else if (PRIORITY) {
/* No state change */
} else {
connectionError = 'PROTOCOL_ERROR';
}
break;
A stream in the reserved (remote) state has been reserved by a remote peer.
case 'RESERVED_REMOTE':
if (RST_STREAM) {
this._setState('CLOSED');
} else if (receiving && HEADERS) {
this._setState('HALF_CLOSED_LOCAL');
} else if (BLOCKED || PRIORITY) {
/* No state change */
} else {
connectionError = 'PROTOCOL_ERROR';
}
break;
The open state is where both peers can send frames. In this state, sending peers observe advertised stream level flow control limits.
case 'OPEN':
if (frame.flags.END_STREAM) {
this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
} else if (RST_STREAM) {
this._setState('CLOSED');
} else {
/* No state change */
}
break;
A stream that is half closed (local) cannot be used for sending frames.
case 'HALF_CLOSED_LOCAL':
if (RST_STREAM || (receiving && frame.flags.END_STREAM)) {
this._setState('CLOSED');
} else if (BLOCKED || ALTSVC || receiving || PRIORITY || (sending && WINDOW_UPDATE)) {
/* No state change */
} else {
connectionError = 'PROTOCOL_ERROR';
}
break;
A stream that is half closed (remote) is no longer being used by the peer to send frames. In this state, an endpoint is no longer obligated to maintain a receiver flow control window if it performs flow control.
case 'HALF_CLOSED_REMOTE':
if (RST_STREAM || (sending && frame.flags.END_STREAM)) {
this._setState('CLOSED');
} else if (BLOCKED || ALTSVC || sending || PRIORITY || (receiving && WINDOW_UPDATE)) {
/* No state change */
} else {
connectionError = 'PROTOCOL_ERROR';
}
break;
The closed state is the terminal state.
case 'CLOSED':
if (PRIORITY || (sending && RST_STREAM) ||
(receiving && WINDOW_UPDATE) ||
(receiving && this._closedByUs &&
(this._closedWithRst || RST_STREAM || ALTSVC))) {
/* No state change */
} else {
streamError = 'STREAM_CLOSED';
}
break;
}
Noting whether the stream was closed by us or by the peer, and whether it was closed with an RST_STREAM. This may be important in edge cases. For example, when the peer tries to cancel a promised stream after we have already sent all the data on it, the stream is in the CLOSED state, yet we want to ignore the incoming RST_STREAM.
if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) {
this._closedByUs = sending;
this._closedWithRst = RST_STREAM;
}
Sending/receiving a PUSH_PROMISE also changes the state of the promised stream.
if (PUSH_PROMISE && !connectionError && !streamError) {
/* This assertion must hold, because _transition is called immediately when a frame is written
to the stream. If it were called only when a frame got out of the input queue, the state
of the promised stream could have changed by then. */
assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state);
frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE');
frame.promised_stream._initiated = sending;
}
Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1)
if (this._initiated) {
var change = (activeState(this.state) - activeState(previousState));
if (sending) {
frame.count_change = change;
} else {
frame.count_change(change);
}
} else if (sending) {
frame.count_change = 0;
}
Common error handling.
if (connectionError || streamError) {
var info = {
error: connectionError,
frame: frame,
state: this.state,
closedByUs: this._closedByUs,
closedWithRst: this._closedWithRst
};
if (sending) {
this._log.error(info, 'Sending illegal frame.');
return this.emit('error', new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.'));
}
When receiving something invalid, send an RST_STREAM using the reset method.
This will automatically cause a transition to the CLOSED state.
else {
this._log.error(info, 'Received illegal frame.');
if (connectionError) {
this.emit('connectionError', connectionError);
} else {
this.reset(streamError);
this.emit('error', streamError);
}
}
}
};
exports.serializers = {};
var nextId = 0;
exports.serializers.s = function(stream) {
if (!('_id' in stream)) {
stream._id = nextId;
nextId += 1;
}
return stream._id;
};
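These serializers are meant to be plugged into a bunyan-style logger; a sketch, assuming the bunyan package is available and the module is saved as stream.js:
// Sketch: wire the serializers into a bunyan logger (module path is assumed).
var bunyan = require('bunyan');
var logger = bunyan.createLogger({
  name: 'http2-stream-demo',
  serializers: require('./stream').serializers
});
// child loggers created as log.child({ component: 'stream', s: stream }) will then
// render the stream as a short numeric id instead of dumping the whole object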