diff --git a/src/transports/net.js b/src/transports/net.js index dcd7d786..50c33b95 100644 --- a/src/transports/net.js +++ b/src/transports/net.js @@ -24,24 +24,51 @@ module.exports = class Connection extends EventEmitter { this.state = SOCK_DISCONNECTED; this.last_socket_error = null; this.socket_events = []; + this.requested_disconnect = false; this.encoding = 'utf8'; + this._encodingIsUtf8 = true; + + this.write_queue = []; + this.write_draining = false; } isConnected() { return this.state === SOCK_CONNECTED; } + _encodeData(line) { + return this._encodingIsUtf8 ? + line + '\r\n' : + iconv.encode(line + '\r\n', this.encoding); + } + + _flushWriteQueue() { + if (!this.socket) return; + this.write_draining = false; + while (this.write_queue.length > 0) { + const { data, cb } = this.write_queue.shift(); + const ok = this.socket.write(data, cb); + if (!ok) { + this.write_draining = true; + return; + } + } + } + writeLine(line, cb) { if (this.socket && this.isConnected()) { - if (this.encoding !== 'utf8') { - this.socket.write(iconv.encode(line + '\r\n', this.encoding), cb); + const data = this._encodeData(line); + if (this.write_draining) { + this.write_queue.push({ data, cb }); } else { - this.socket.write(line + '\r\n', cb); + const ok = this.socket.write(data, cb); + if (!ok) { + this.write_draining = true; + } } } else { this.debugOut('writeLine() called when not connected'); - if (cb) { process.nextTick(cb); } @@ -63,6 +90,7 @@ module.exports = class Connection extends EventEmitter { _unbindEvents() { this.socket_events.forEach(fn => fn()); + this.socket_events = []; } connect() { @@ -76,6 +104,8 @@ module.exports = class Connection extends EventEmitter { this.disposeSocket(); this.requested_disconnect = false; this.incoming_buffer = Buffer.from(''); + this.write_queue = []; + this.write_draining = false; // Include server name (SNI) if provided host is not an IP address if (!this.getAddressFamily(ircd_host)) { @@ -125,16 +155,15 @@ module.exports = class Connection extends EventEmitter { this._onSocketCreate(options, connection); }).catch(this.onSocketError.bind(this)); } else { - let socket = null; if ((options.tls || options.ssl) && options.path) { - socket = this.socket = tls.connect({ + this.socket = tls.connect({ path: options.path, rejectUnauthorized: options.rejectUnauthorized, key: options.client_certificate && options.client_certificate.private_key, cert: options.client_certificate && options.client_certificate.certificate, }); } else if (options.tls || options.ssl) { - socket = this.socket = tls.connect({ + this.socket = tls.connect({ servername: sni, host: ircd_host, port: ircd_port, @@ -145,18 +174,18 @@ module.exports = class Connection extends EventEmitter { family: this.getAddressFamily(options.outgoing_addr) }); } else if (options.path) { - socket = this.socket = net.connect({ + this.socket = net.connect({ path: options.path }); } else { - socket = this.socket = net.connect({ + this.socket = net.connect({ host: ircd_host, port: ircd_port, localAddress: options.outgoing_addr, family: this.getAddressFamily(options.outgoing_addr) }); } - this._onSocketCreate(options, socket); + this._onSocketCreate(options, this.socket); } } @@ -181,6 +210,7 @@ module.exports = class Connection extends EventEmitter { socket instanceof tls.TLSSocket ? 'secureConnect' : 'connect', this.onSocketFullyConnected.bind(this) ); + this._bindEvent(socket, 'drain', this._flushWriteQueue.bind(this)); this._bindEvent(socket, 'close', this.onSocketClose.bind(this)); this._bindEvent(socket, 'error', this.onSocketError.bind(this)); this._bindEvent(socket, 'data', this.onSocketData.bind(this)); @@ -205,13 +235,12 @@ module.exports = class Connection extends EventEmitter { onSocketClose() { this.debugOut('socketClose()'); this.state = SOCK_DISCONNECTED; - this.emit('close', this.last_socket_error ? this.last_socket_error : false); + this.emit('close', this.last_socket_error || false); } onSocketError(err) { this.debugOut('socketError() ' + err.message); this.last_socket_error = err; - // this.emit('error', err); } onSocketTimeout() { @@ -221,29 +250,28 @@ module.exports = class Connection extends EventEmitter { onSocketData(data) { // Buffer incoming data because multiple messages can arrive at once - // without necessarily ending in a new line - this.incoming_buffer = Buffer.concat( - [this.incoming_buffer, data], - this.incoming_buffer.length + data.length - ); + // without necessarily ending in a new line. + // Only concat if we have a leftover partial line from a previous chunk. + const buffer = this.incoming_buffer.length > 0 ? + Buffer.concat([this.incoming_buffer, data], this.incoming_buffer.length + data.length) : + data; let startIndex = 0; while (true) { // Search for the next new line in the buffered data - const endIndex = this.incoming_buffer.indexOf(0x0A, startIndex) + 1; + const endIndex = buffer.indexOf(0x0A, startIndex) + 1; // If this message is partial, keep it in the buffer until more data arrives. - // If startIndex is equal to incoming_buffer.length, that means we reached the end + // If startIndex is equal to buffer.length, that means we reached the end // of the buffer and it ended on a new line, slice will return an empty buffer. if (endIndex === 0) { - this.incoming_buffer = this.incoming_buffer.slice(startIndex); + this.incoming_buffer = buffer.slice(startIndex); break; } // Slice a single message delimited by a new line, decode it and emit it out - let line = this.incoming_buffer.slice(startIndex, endIndex); - line = iconv.decode(line, this.encoding); + const line = iconv.decode(buffer.slice(startIndex, endIndex), this.encoding); this.emit('line', line); startIndex = endIndex; @@ -280,19 +308,17 @@ module.exports = class Connection extends EventEmitter { } setEncoding(encoding) { - let encoded_test; - this.debugOut('Connection.setEncoding() encoding=' + encoding); try { const testString = 'TEST\r\ntest'; - - encoded_test = iconv.encode(testString, encoding); + const encoded_test = iconv.encode(testString, encoding); // This test is done to check if this encoding also supports // the ASCII charset required by the IRC protocols // (Avoid the use of base64 or incompatible encodings) if (encoded_test.toString('ascii') === testString) { this.encoding = encoding; + this._encodingIsUtf8 = (encoding === 'utf8'); return true; } return false; @@ -308,5 +334,8 @@ module.exports = class Connection extends EventEmitter { if (net.isIPv6(addr)) { return 6; } + // Returns undefined for hostnames or unset addresses. + // Callers rely on this being falsy (e.g. for SNI detection). + return undefined; } };