var assert = require('assert');
var assert = require('assert');
The Connection class manages HTTP/2 connections. Each instance corresponds to one transport stream (TCP stream). It operates by sending and receiving frames and is implemented as a Flow subclass.
var Flow = require('./flow').Flow;
exports.Connection = Connection;
new Connection(log, firstStreamId, settings): create a new Connection
Event: ‘error’ (type): signals a connection level error made by the other end
Event: ‘peerError’ (type): signals the receipt of a GOAWAY frame that contains an error code other than NO_ERROR
Event: ‘stream’ (stream): signals that there’s an incoming stream
createStream(): stream: initiate a new stream
set(settings, callback): change the value of one or more settings according to the
key-value pairs of settings
. The callback is called after the peer acknowledged the changes.
ping([callback]): send a ping and call callback when the answer arrives
close([error]): close the stream with an error code
The main aspects of managing the connection are:
function Connection(log, firstStreamId, settings) {
Flow.call(this, 0);
this._log = log.child({ component: 'connection' });
this._initializeStreamManagement(firstStreamId);
this._initializeLifecycleManagement();
this._initializeFlowControl();
this._initializeSettingsManagement(settings);
this._initializeMultiplexing();
}
Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } });
| ^ | ^
v | v |
+--------------+ +--------------+
+---| stream1 |---| stream2 |---- .... ---+
| | +----------+ | | +----------+ | |
| | | stream1. | | | | stream2. | | |
| +-| upstream |-+ +-| upstream |-+ |
| +----------+ +----------+ |
| | ^ | ^ |
| v | v | |
| +-----+-------------+-----+-------- .... |
| ^ | | | |
| | v | | |
| +--------------+ | | |
| | stream0 | | | |
| | connection | | | |
| | management | multiplexing |
| +--------------+ flow control |
| | ^ |
| _read() | | _write() |
| v | |
| +------------+ +-----------+ |
| |output queue| |input queue| |
+----------------+------------+-+-----------+-----------------+
| ^
read() | | write()
v |
var Stream = require('./stream').Stream;
Initialization:
Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
_streamIds
is an id -> stream map of the streams that are allowed to receive frames._streamPriorities
is a priority -> [stream] map of stream that allowed to send frames. this._streamIds = [];
this._streamPriorities = [];
this._nextStreamId = firstStreamId;
this._lastIncomingStream = 0;
_writeControlFrame
when there’s an incoming stream with 0 as stream ID this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } };
_streamLimit
can
be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting. this._streamSlotsFree = Infinity;
this._streamLimit = Infinity;
this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
};
_writeControlFrame
is called when there’s an incoming frame in the _control
stream. It
broadcasts the message by creating an event on it.
Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
if ((frame.type === 'SETTINGS') || (frame.type === 'PING') ||
(frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE') ||
(frame.type === 'ALTSVC')) {
this._log.debug({ frame: frame }, 'Receiving connection level frame');
this.emit(frame.type, frame);
} else {
this._log.error({ frame: frame }, 'Invalid connection level frame');
this.emit('error', 'PROTOCOL_ERROR');
}
};
Methods to manage the stream slot pool:
Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit);
this._streamSlotsFree += newStreamLimit - this._streamLimit;
this._streamLimit = newStreamLimit;
if (wakeup) {
this.emit('wakeup');
}
};
Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
if (change) {
this._log.trace({ free: this._streamSlotsFree, change: change },
'Changing active stream count.');
var wakeup = (this._streamSlotsFree === 0) && (change < 0);
this._streamSlotsFree -= change;
if (wakeup) {
this.emit('wakeup');
}
}
};
Creating a new inbound or outbound stream with the given id
(which is undefined in case of
an outbound stream) consists of three steps:
Allocating an ID to a stream
Connection.prototype._allocateId = function _allocateId(stream, id) {
if (id === undefined) {
id = this._nextStreamId;
this._nextStreamId += 2;
}
else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) {
this._lastIncomingStream = id;
}
else {
this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
'Invalid incoming stream ID.');
this.emit('error', 'PROTOCOL_ERROR');
return undefined;
}
assert(!(id in this._streamIds));
this._streamIds
this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
this._streamIds[id] = stream;
stream.id = id;
this.emit('new_stream', stream, id);
stream.on('connectionError', this.emit.bind(this, 'error'));
return id;
};
Allocating a priority to a stream, and managing priority changes
Connection.prototype._allocatePriority = function _allocatePriority(stream) {
this._log.trace({ s: stream }, 'Allocating priority for stream.');
this._insert(stream, stream._priority);
stream.on('priority', this._reprioritize.bind(this, stream));
stream.upstream.on('readable', this.emit.bind(this, 'wakeup'));
this.emit('wakeup');
};
Connection.prototype._insert = function _insert(stream, priority) {
if (priority in this._streamPriorities) {
this._streamPriorities[priority].push(stream);
} else {
this._streamPriorities[priority] = [stream];
}
};
Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
var bucket = this._streamPriorities[stream._priority];
var index = bucket.indexOf(stream);
assert(index !== -1);
bucket.splice(index, 1);
if (bucket.length === 0) {
delete this._streamPriorities[stream._priority];
}
this._insert(stream, priority);
};
Creating an inbound stream with the given ID. It is called when there’s an incoming frame to a previously nonexistent stream.
Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
this._log.debug({ stream_id: id }, 'New incoming stream.');
var stream = new Stream(this._log, this);
this._allocateId(stream, id);
this._allocatePriority(stream);
this.emit('stream', stream, id);
return stream;
};
Creating an outbound stream
Connection.prototype.createStream = function createStream() {
this._log.trace('Creating new outbound stream.');
var stream = new Stream(this._log, this);
this._allocatePriority(stream);
return stream;
};
Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
this.on('window_update', this.emit.bind(this, 'wakeup'));
this._sendScheduled = false;
this._firstFrameReceived = false;
};
The _send
method is a virtual method of the Flow class that has to be implemented
by child classes. It reads frames from streams and pushes them to the output buffer.
Connection.prototype._send = function _send(immediate) {
if (this._closed) {
return;
}
if (immediate) {
this._sendScheduled = false;
} else {
if (!this._sendScheduled) {
this._sendScheduled = true;
setImmediate(this._send.bind(this, true));
}
return;
}
this._log.trace('Starting forwarding frames from streams.');
bucket
s in priority order.priority_loop:
for (var priority in this._streamPriorities) {
var bucket = this._streamPriorities[priority];
var nextBucket = [];
streamCount
greater than streamLimit
, skip
this streamstreamCount
as appropriate while (bucket.length > 0) {
for (var index = 0; index < bucket.length; index++) {
var stream = bucket[index];
var frame = stream.upstream.read((this._window > 0) ? this._window : -1);
if (!frame) {
continue;
} else if (frame.count_change > this._streamSlotsFree) {
stream.upstream.unshift(frame);
continue;
}
nextBucket.push(stream);
if (frame.stream === undefined) {
frame.stream = stream.id || this._allocateId(stream);
}
if (frame.type === 'PUSH_PROMISE') {
this._allocatePriority(frame.promised_stream);
frame.promised_stream = this._allocateId(frame.promised_stream);
}
this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
var moreNeeded = this.push(frame);
this._changeStreamCount(frame.count_change);
assert(moreNeeded !== null); // The frame shouldn't be unforwarded
if (moreNeeded === false) {
break priority_loop;
}
}
bucket = nextBucket;
nextBucket = [];
}
}
if (moreNeeded === undefined) {
this.once('wakeup', this._send.bind(this));
}
this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
};
The _receive
method is another virtual method of the Flow class that has to be
implemented by child classes. It forwards the given frame to the appropriate stream:
Connection.prototype._receive = function _receive(frame, done) {
this._log.trace({ frame: frame }, 'Forwarding incoming frame');
_onFirstFrameReceived
method if (!this._firstFrameReceived) {
this._firstFrameReceived = true;
this._onFirstFrameReceived(frame);
}
Do some sanity checking here before we create a stream
if ((frame.type == 'SETTINGS' ||
frame.type == 'PING' ||
frame.type == 'GOAWAY') &&
frame.stream != 0) {
Got connection-level frame on a stream - EEP!
this.close('PROTOCOL_ERROR');
return;
} else if ((frame.type == 'DATA' ||
frame.type == 'HEADERS' ||
frame.type == 'PRIORITY' ||
frame.type == 'RST_STREAM' ||
frame.type == 'PUSH_PROMISE' ||
frame.type == 'CONTINUATION') &&
frame.stream == 0) {
Got stream-level frame on connection - EEP!
this.close('PROTOCOL_ERROR');
return;
}
WINDOW_UPDATE can be on either stream or connection
var stream = this._streamIds[frame.stream];
this.streams
if (!stream) {
stream = this._createIncomingStream(frame.stream);
}
if (frame.type === 'PUSH_PROMISE') {
frame.promised_stream = this._createIncomingStream(frame.promised_stream);
}
frame.count_change = this._changeStreamCount.bind(this);
stream
‘s upstream
stream.upstream.write(frame);
done();
};
var defaultSettings = {
};
Settings management initialization:
Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
this._settingsAckCallbacks = [];
this._log.debug({ settings: settings },
'Sending the first SETTINGS frame as part of the connection header.');
this.set(settings || defaultSettings);
_receiveSettings
method this.on('SETTINGS', this._receiveSettings);
this.on('RECEIVING_SETTINGS_MAX_FRAME_SIZE', this._sanityCheckMaxFrameSize);
};
Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
} else {
this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
this.emit('error', 'PROTOCOL_ERROR');
}
};
Handling of incoming SETTINGS frames.
Connection.prototype._receiveSettings = function _receiveSettings(frame) {
if (frame.flags.ACK) {
var callback = this._settingsAckCallbacks.shift();
if (callback) {
callback();
}
}
else {
if (!this._closed) {
this.push({
type: 'SETTINGS',
flags: { ACK: true },
stream: 0,
settings: {}
});
}
for (var name in frame.settings) {
this.emit('RECEIVING_' + name, frame.settings[name]);
}
}
};
Connection.prototype._sanityCheckMaxFrameSize = function _sanityCheckMaxFrameSize(value) {
if ((value < 0x4000) || (value >= 0x01000000)) {
this._log.fatal('Received invalid value for max frame size: ' + value);
this.emit('error');
}
};
Changing one or more settings value and sending out a SETTINGS frame
Connection.prototype.set = function set(settings, callback) {
var self = this;
this._settingsAckCallbacks.push(function() {
for (var name in settings) {
self.emit('ACKNOWLEDGED_' + name, settings[name]);
}
if (callback) {
callback();
}
});
this.push({
type: 'SETTINGS',
flags: { ACK: false },
stream: 0,
settings: settings
});
for (var name in settings) {
this.emit('SENDING_' + name, settings[name]);
}
};
The main responsibilities of lifecycle management code:
Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
this._pings = {};
this.on('PING', this._receivePing);
this.on('GOAWAY', this._receiveGoaway);
this._closed = false;
};
Generating a string of length 16 with random hexadecimal digits
Connection.prototype._generatePingId = function _generatePingId() {
do {
var id = '';
for (var i = 0; i < 16; i++) {
id += Math.floor(Math.random()*16).toString(16);
}
} while(id in this._pings);
return id;
};
Sending a ping and calling callback
when the answer arrives
Connection.prototype.ping = function ping(callback) {
var id = this._generatePingId();
var data = new Buffer(id, 'hex');
this._pings[id] = callback;
this._log.debug({ data: data }, 'Sending PING.');
this.push({
type: 'PING',
flags: {
ACK: false
},
stream: 0,
data: data
});
};
Answering pings
Connection.prototype._receivePing = function _receivePing(frame) {
if (frame.flags.ACK) {
var id = frame.data.toString('hex');
if (id in this._pings) {
this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
var callback = this._pings[id];
if (callback) {
callback();
}
delete this._pings[id];
} else {
this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
}
} else {
this._log.debug({ data: frame.data }, 'Answering PING.');
this.push({
type: 'PING',
flags: {
ACK: true
},
stream: 0,
data: frame.data
});
}
};
Terminating the connection
Connection.prototype.close = function close(error) {
if (this._closed) {
this._log.warn('Trying to close an already closed connection');
return;
}
this._log.debug({ error: error }, 'Closing the connection');
this.push({
type: 'GOAWAY',
flags: {},
stream: 0,
last_stream: this._lastIncomingStream,
error: error || 'NO_ERROR'
});
this.push(null);
this._closed = true;
};
Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
this._log.debug({ error: frame.error }, 'Other end closed the connection');
this.push(null);
this._closed = true;
if (frame.error !== 'NO_ERROR') {
this.emit('peerError', frame.error);
}
};
Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
Handling of initial window size of individual streams.
this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
this.on('new_stream', function(stream) {
stream.upstream.setInitialWindow(this._initialStreamWindowSize);
});
this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);
this._streamIds[0].upstream.setInitialWindow = function noop() {};
};
The initial connection flow control window is 65535 bytes.
var INITIAL_STREAM_WINDOW_SIZE = 65535;
A SETTINGS frame can alter the initial flow control window size for all current streams. When the
value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all
stream by calling the setInitialStreamWindowSize
method. The window size has to be modified by
the difference between the new value and the old value.
Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) {
this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
this.emit('error', 'FLOW_CONTROL_ERROR');
} else {
this._log.debug({ size: size }, 'Changing stream initial window size.');
this._initialStreamWindowSize = size;
this._streamIds.forEach(function(stream) {
stream.upstream.setInitialWindow(size);
});
}
};