    var assert = require('assert');
    
    var Serializer   = require('./framer').Serializer;
    var Deserializer = require('./framer').Deserializer;
    var Compressor   = require('./compressor').Compressor;
    var Decompressor = require('./compressor').Decompressor;
    var Connection   = require('./connection').Connection;
    var Duplex       = require('stream').Duplex;
    var Transform    = require('stream').Transform;
    
    exports.Endpoint = Endpoint;

    The Endpoint class


    Public API

    • new Endpoint(log, role, settings, filters): create a new Endpoint.

      • log: bunyan logger of the parent
      • role: ‘CLIENT’ or ‘SERVER’
      • settings: initial HTTP/2 settings
      • filters: a map of functions that filter the traffic between components (for debugging or intentional failure injection); a short usage sketch follows this list.

        Filter functions get three arguments:

        1. frame: the current frame
        2. forward(frame): function that can be used to forward a frame to the next component
        3. done(): callback to signal the end of the filter process

        Valid filter names and their position in the stack:

        • beforeSerialization: after compression, before serialization
        • beforeCompression: after multiplexing, before compression
        • afterDeserialization: after deserialization, before decompression
        • afterDecompression: after decompression, before multiplexing
    • Event: ‘stream’ (Stream): ‘stream’ event forwarded from the underlying Connection

    • Event: ‘error’ (type): signals an error

    • createStream(): Stream: initiate a new stream (forwarded to the underlying Connection)

    • close([error]): close the connection with an error code
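
    For illustration, a minimal sketch of constructing an Endpoint with one filter in place (not part of this file): the afterDeserialization filter below logs every incoming frame and forwards it unchanged. The logger name, the empty settings object, and the frame.type property used for logging are assumptions made for the example.

    var bunyan   = require('bunyan');
    var Endpoint = require('./endpoint').Endpoint;

    var log = bunyan.createLogger({
      name: 'example',
      serializers: require('./endpoint').serializers
    });

    // Pass-through filter: log the (assumed) frame type, forward the frame,
    // then signal that this filter invocation is finished.
    function logIncoming(frame, forward, done) {
      log.debug({ type: frame && frame.type }, 'Incoming frame');
      forward(frame);
      done();
    }

    var endpoint = new Endpoint(log, 'CLIENT', {}, {
      afterDeserialization: logIncoming
    });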


    Constructor


    The process of initialization:

    function Endpoint(log, role, settings, filters) {
      Duplex.call(this);
    • Initializing logging infrastructure
      this._log = log.child({ component: 'endpoint', e: this });
    • First part of the handshake process: sending (in the CLIENT role) or receiving (in the SERVER role) the client connection header prelude.
      assert((role === 'CLIENT') || role === 'SERVER');
      if (role === 'CLIENT') {
        this._writePrelude();
      } else {
        this._readPrelude();
      }
    • Initialization of the components. This includes the second part of the handshake process: sending the first SETTINGS frame, which is done by the Connection class right after initialization.
      this._initializeDataFlow(role, settings, filters || {});
    • Initialization of management code.
      this._initializeManagement();
    • Initializing error handling.
      this._initializeErrorHandling();
    }
    Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });

    Handshake

    var CLIENT_PRELUDE = new Buffer('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n');

    Writing the client header is simple and synchronous.

    Endpoint.prototype._writePrelude = function _writePrelude() {
      this._log.debug('Sending the client connection header prelude.');
      this.push(CLIENT_PRELUDE);
    };

    The asynchronous process of reading the client header:

    Endpoint.prototype._readPrelude = function _readPrelude() {
    • progress in the header is tracked using a cursor
      var cursor = 0;
    • _write is temporarily replaced by the comparator function
      this._write = function _temporalWrite(chunk, encoding, done) {
    • which compares the stored header with the current chunk byte by byte and emits the ‘error’ event if there’s a byte that doesn’t match
        var offset = cursor;
        while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
          if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
            this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
                            'Client connection header prelude does not match.');
            this._error('handshake', 'PROTOCOL_ERROR');
            return;
          }
          cursor += 1;
        }
    • if the whole header has been received and there was no error, restore the original _write and call it with the remaining part of the current chunk
        if (cursor === CLIENT_PRELUDE.length) {
          this._log.debug('Successfully received the client connection header prelude.');
          delete this._write;
          chunk = chunk.slice(cursor - offset);
          this._write(chunk, encoding, done);
        }
      };
    };
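
    The same cursor-based technique in a standalone sketch (illustrative names only): because the cursor lives in a closure, the expected prefix can arrive split across arbitrary chunk boundaries and the comparison simply resumes where it left off.

    function makePreludeMatcher(expected, onMatch, onMismatch) {
      var cursor = 0;
      return function compare(chunk) {
        var offset = cursor;
        while ((cursor < expected.length) && ((cursor - offset) < chunk.length)) {
          if (expected[cursor] !== chunk[cursor - offset]) {
            return onMismatch();
          }
          cursor += 1;
        }
        if (cursor === expected.length) {
          // Hand back whatever follows the prelude in the current chunk.
          onMatch(chunk.slice(cursor - offset));
        }
      };
    }

    // Feeding the prelude in two pieces: the match completes on the second one.
    var compare = makePreludeMatcher(CLIENT_PRELUDE, function(rest) {
      console.log('prelude matched,', rest.length, 'trailing bytes');
    }, function() {
      console.log('prelude mismatch');
    });
    compare(CLIENT_PRELUDE.slice(0, 10));
    compare(CLIENT_PRELUDE.slice(10));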

    Data flow

    +---------------------------------------------+
    |                                             |
    |   +-------------------------------------+   |
    |   | +---------+ +---------+ +---------+ |   |
    |   | | stream1 | | stream2 | |   ...   | |   |
    |   | +---------+ +---------+ +---------+ |   |
    |   |             connection              |   |
    |   +-------------------------------------+   |
    |             |                 ^             |
    |        pipe |                 | pipe        |
    |             v                 |             |
    |   +------------------+------------------+   |
    |   |    compressor    |   decompressor   |   |
    |   +------------------+------------------+   |
    |             |                 ^             |
    |        pipe |                 | pipe        |
    |             v                 |             |
    |   +------------------+------------------+   |
    |   |    serializer    |   deserializer   |   |
    |   +------------------+------------------+   |
    |             |                 ^             |
    |     _read() |                 | _write()    |
    |             v                 |             |
    |      +------------+     +-----------+       |
    |      |output queue|     |input queue|       |
    +------+------------+-----+-----------+-------+
                  |                 ^
           read() |                 | write()
                  v                 |
    
    function createTransformStream(filter) {
      var transform = new Transform({ objectMode: true });
      var push = transform.push.bind(transform);
      transform._transform = function(frame, encoding, done) {
        filter(frame, push, done);
      };
      return transform;
    }
    
    function pipeAndFilter(stream1, stream2, filter) {
      if (filter) {
        stream1.pipe(createTransformStream(filter)).pipe(stream2);
      } else {
        stream1.pipe(stream2);
      }
    }
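
    A quick, self-contained sketch of what these two helpers do, using object-mode PassThrough streams in place of the real components (the frame objects below are stand-ins, not real HTTP/2 frames):

    var PassThrough = require('stream').PassThrough;

    var source = new PassThrough({ objectMode: true });
    var sink   = new PassThrough({ objectMode: true });

    // Filter that drops stand-in 'PING' frames and forwards everything else.
    pipeAndFilter(source, sink, function(frame, forward, done) {
      if (frame.type !== 'PING') {
        forward(frame);
      }
      done();
    });

    sink.on('data', function(frame) {
      console.log('passed through:', frame.type);  // HEADERS, DATA
    });
    source.write({ type: 'HEADERS' });
    source.write({ type: 'PING' });
    source.write({ type: 'DATA' });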
    
    Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
      var firstStreamId, compressorRole, decompressorRole;
      if (role === 'CLIENT') {
        firstStreamId = 1;
        compressorRole = 'REQUEST';
        decompressorRole = 'RESPONSE';
      } else {
        firstStreamId = 2;
        compressorRole = 'RESPONSE';
        decompressorRole = 'REQUEST';
      }
    
      this._serializer   = new Serializer(this._log);
      this._deserializer = new Deserializer(this._log);
      this._compressor   = new Compressor(this._log, compressorRole);
      this._decompressor = new Decompressor(this._log, decompressorRole);
      this._connection   = new Connection(this._log, firstStreamId, settings);
    
      pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
      pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);
      pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
      pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);
    
      this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
                          this._decompressor.setTableSizeLimit.bind(this._decompressor));
      this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
                          this._compressor.setTableSizeLimit.bind(this._compressor));
    };
    
    var noread = {};
    Endpoint.prototype._read = function _read() {
      this._readableState.sync = true;
      var moreNeeded = noread, chunk;
      while (moreNeeded && (chunk = this._serializer.read())) {
        moreNeeded = this.push(chunk);
      }
      if (moreNeeded === noread) {
        this._serializer.once('readable', this._read.bind(this));
      }
      this._readableState.sync = false;
    };
    
    Endpoint.prototype._write = function _write(chunk, encoding, done) {
      this._deserializer.write(chunk, encoding, done);
    };
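
    Since the Endpoint is itself a Duplex stream at the byte level, attaching it to a transport is plain piping in both directions. A sketch over a raw TCP socket (the host, port, and cleartext prior-knowledge setup are placeholder assumptions; real deployments would typically negotiate HTTP/2 over TLS):

    var net = require('net');

    // `endpoint` is the CLIENT Endpoint from the earlier sketch.
    var socket = net.connect({ host: 'example.com', port: 8080 }, function() {
      // Bytes from the wire feed _write() (the deserializer side); bytes
      // produced by _read() (the serializer side) flow back out to the socket.
      socket.pipe(endpoint).pipe(socket);
    });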

    Management

    Endpoint.prototype._initializeManagement = function _initializeManagement() {
      this._connection.on('stream', this.emit.bind(this, 'stream'));
    };
    
    Endpoint.prototype.createStream = function createStream() {
      return this._connection.createStream();
    };
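
    How the two management hooks look from the outside, continuing the earlier sketch (the Stream interface used below, headers() and the 'headers' event, is defined in stream.js and is only assumed here):

    // Server side: handle streams opened by the peer.
    endpoint.on('stream', function(stream) {
      stream.on('headers', function(headers) {
        // inspect the request headers, then respond via the Stream API
      });
    });

    // Client side: open a new stream and send the request headers on it.
    var stream = endpoint.createStream();
    stream.headers({
      ':method':    'GET',
      ':scheme':    'http',
      ':authority': 'example.com',
      ':path':      '/'
    });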

    Error handling

    Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
      this._serializer.on('error', this._error.bind(this, 'serializer'));
      this._deserializer.on('error', this._error.bind(this, 'deserializer'));
      this._compressor.on('error', this._error.bind(this, 'compressor'));
      this._decompressor.on('error', this._error.bind(this, 'decompressor'));
      this._connection.on('error', this._error.bind(this, 'connection'));
    
      this._connection.on('peerError', this.emit.bind(this, 'peerError'));
    };
    
    Endpoint.prototype._error = function _error(component, error) {
      this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');
      this.close(error);
      setImmediate(this.emit.bind(this, 'error', error));
    };
    
    Endpoint.prototype.close = function close(error) {
      this._connection.close(error);
    };
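
    Seen from the outside, the two error channels behave differently: 'error' reports a local fatal problem after the connection has already been closed by _error(), while 'peerError' relays an error code reported by the remote peer. A sketch, continuing the earlier example:

    endpoint.on('error', function(error) {
      // Local fatal error; _error() has already called close(error).
      console.error('connection failed:', error);
    });

    endpoint.on('peerError', function(error) {
      // Error code reported by the remote peer, forwarded from the Connection.
      console.error('peer reported:', error);
    });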

    Bunyan serializers

    exports.serializers = {};
    
    var nextId = 0;
    exports.serializers.e = function(endpoint) {
      if (!('id' in endpoint)) {
        endpoint.id = nextId;
        nextId += 1;
      }
      return endpoint.id;
    };
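
    A usage note in sketch form: when exports.serializers is registered with the bunyan logger handed to the Endpoint constructor (as in the earlier examples), log records that carry an Endpoint under the e field are reduced to a small, stable numeric id instead of the full object.

    var bunyan      = require('bunyan');
    var serializers = require('./endpoint').serializers;

    var log = bunyan.createLogger({ name: 'http2', serializers: serializers });
    // Each distinct Endpoint logged as { e: endpoint } now shows up as e: 0, e: 1, ...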