| // @ts-check |
| |
| 'use strict' |
| |
| /* global WebAssembly */ |
| |
| const assert = require('assert') |
| const net = require('net') |
| const http = require('http') |
| const { pipeline } = require('stream') |
| const util = require('./core/util') |
| const timers = require('./timers') |
| const Request = require('./core/request') |
| const DispatcherBase = require('./dispatcher-base') |
| const { |
| RequestContentLengthMismatchError, |
| ResponseContentLengthMismatchError, |
| InvalidArgumentError, |
| RequestAbortedError, |
| HeadersTimeoutError, |
| HeadersOverflowError, |
| SocketError, |
| InformationalError, |
| BodyTimeoutError, |
| HTTPParserError, |
| ResponseExceededMaxSizeError, |
| ClientDestroyedError |
| } = require('./core/errors') |
| const buildConnector = require('./core/connect') |
| const { |
| kUrl, |
| kReset, |
| kServerName, |
| kClient, |
| kBusy, |
| kParser, |
| kConnect, |
| kBlocking, |
| kResuming, |
| kRunning, |
| kPending, |
| kSize, |
| kWriting, |
| kQueue, |
| kConnected, |
| kConnecting, |
| kNeedDrain, |
| kNoRef, |
| kKeepAliveDefaultTimeout, |
| kHostHeader, |
| kPendingIdx, |
| kRunningIdx, |
| kError, |
| kPipelining, |
| kSocket, |
| kKeepAliveTimeoutValue, |
| kMaxHeadersSize, |
| kKeepAliveMaxTimeout, |
| kKeepAliveTimeoutThreshold, |
| kHeadersTimeout, |
| kBodyTimeout, |
| kStrictContentLength, |
| kConnector, |
| kMaxRedirections, |
| kMaxRequests, |
| kCounter, |
| kClose, |
| kDestroy, |
| kDispatch, |
| kInterceptors, |
| kLocalAddress, |
| kMaxResponseSize, |
| kHTTPConnVersion, |
| // HTTP2 |
| kHost, |
| kHTTP2Session, |
| kHTTP2SessionState, |
| kHTTP2BuildRequest, |
| kHTTP2CopyHeaders, |
| kHTTP1BuildRequest |
| } = require('./core/symbols') |
| |
| /** @type {import('http2')} */ |
| let http2 |
| try { |
| http2 = require('http2') |
| } catch { |
| // @ts-ignore |
| http2 = { constants: {} } |
| } |
| |
| const { |
| constants: { |
| HTTP2_HEADER_AUTHORITY, |
| HTTP2_HEADER_METHOD, |
| HTTP2_HEADER_PATH, |
| HTTP2_HEADER_SCHEME, |
| HTTP2_HEADER_CONTENT_LENGTH, |
| HTTP2_HEADER_EXPECT, |
| HTTP2_HEADER_STATUS |
| } |
| } = http2 |
| |
| // Experimental |
| let h2ExperimentalWarned = false |
| |
| const FastBuffer = Buffer[Symbol.species] |
| |
| const kClosedResolve = Symbol('kClosedResolve') |
| |
| const channels = {} |
| |
| try { |
| const diagnosticsChannel = require('diagnostics_channel') |
| channels.sendHeaders = diagnosticsChannel.channel('undici:client:sendHeaders') |
| channels.beforeConnect = diagnosticsChannel.channel('undici:client:beforeConnect') |
| channels.connectError = diagnosticsChannel.channel('undici:client:connectError') |
| channels.connected = diagnosticsChannel.channel('undici:client:connected') |
| } catch { |
| channels.sendHeaders = { hasSubscribers: false } |
| channels.beforeConnect = { hasSubscribers: false } |
| channels.connectError = { hasSubscribers: false } |
| channels.connected = { hasSubscribers: false } |
| } |
| |
| /** |
| * @type {import('../types/client').default} |
| */ |
| class Client extends DispatcherBase { |
| /** |
| * |
| * @param {string|URL} url |
| * @param {import('../types/client').Client.Options} options |
| */ |
| constructor (url, { |
| interceptors, |
| maxHeaderSize, |
| headersTimeout, |
| socketTimeout, |
| requestTimeout, |
| connectTimeout, |
| bodyTimeout, |
| idleTimeout, |
| keepAlive, |
| keepAliveTimeout, |
| maxKeepAliveTimeout, |
| keepAliveMaxTimeout, |
| keepAliveTimeoutThreshold, |
| socketPath, |
| pipelining, |
| tls, |
| strictContentLength, |
| maxCachedSessions, |
| maxRedirections, |
| connect, |
| maxRequestsPerClient, |
| localAddress, |
| maxResponseSize, |
| autoSelectFamily, |
| autoSelectFamilyAttemptTimeout, |
| // h2 |
| allowH2, |
| maxConcurrentStreams |
| } = {}) { |
| super() |
| |
| if (keepAlive !== undefined) { |
| throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead') |
| } |
| |
| if (socketTimeout !== undefined) { |
| throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead') |
| } |
| |
| if (requestTimeout !== undefined) { |
| throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead') |
| } |
| |
| if (idleTimeout !== undefined) { |
| throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead') |
| } |
| |
| if (maxKeepAliveTimeout !== undefined) { |
| throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead') |
| } |
| |
| if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) { |
| throw new InvalidArgumentError('invalid maxHeaderSize') |
| } |
| |
| if (socketPath != null && typeof socketPath !== 'string') { |
| throw new InvalidArgumentError('invalid socketPath') |
| } |
| |
| if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) { |
| throw new InvalidArgumentError('invalid connectTimeout') |
| } |
| |
| if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) { |
| throw new InvalidArgumentError('invalid keepAliveTimeout') |
| } |
| |
| if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) { |
| throw new InvalidArgumentError('invalid keepAliveMaxTimeout') |
| } |
| |
| if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) { |
| throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold') |
| } |
| |
| if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) { |
| throw new InvalidArgumentError('headersTimeout must be a positive integer or zero') |
| } |
| |
| if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) { |
| throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero') |
| } |
| |
| if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') { |
| throw new InvalidArgumentError('connect must be a function or an object') |
| } |
| |
| if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) { |
| throw new InvalidArgumentError('maxRedirections must be a positive number') |
| } |
| |
| if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) { |
| throw new InvalidArgumentError('maxRequestsPerClient must be a positive number') |
| } |
| |
| if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) { |
| throw new InvalidArgumentError('localAddress must be valid string IP address') |
| } |
| |
| if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) { |
| throw new InvalidArgumentError('maxResponseSize must be a positive number') |
| } |
| |
| if ( |
| autoSelectFamilyAttemptTimeout != null && |
| (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1) |
| ) { |
| throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number') |
| } |
| |
| // h2 |
| if (allowH2 != null && typeof allowH2 !== 'boolean') { |
| throw new InvalidArgumentError('allowH2 must be a valid boolean value') |
| } |
| |
| if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) { |
| throw new InvalidArgumentError('maxConcurrentStreams must be a possitive integer, greater than 0') |
| } |
| |
| if (typeof connect !== 'function') { |
| connect = buildConnector({ |
| ...tls, |
| maxCachedSessions, |
| allowH2, |
| socketPath, |
| timeout: connectTimeout, |
| ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined), |
| ...connect |
| }) |
| } |
| |
| this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client) |
| ? interceptors.Client |
| : [createRedirectInterceptor({ maxRedirections })] |
| this[kUrl] = util.parseOrigin(url) |
| this[kConnector] = connect |
| this[kSocket] = null |
| this[kPipelining] = pipelining != null ? pipelining : 1 |
| this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize |
| this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout |
| this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout |
| this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold |
| this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout] |
| this[kServerName] = null |
| this[kLocalAddress] = localAddress != null ? localAddress : null |
| this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming |
| this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming |
| this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n` |
| this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3 |
| this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3 |
| this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength |
| this[kMaxRedirections] = maxRedirections |
| this[kMaxRequests] = maxRequestsPerClient |
| this[kClosedResolve] = null |
| this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1 |
| this[kHTTPConnVersion] = 'h1' |
| |
| // HTTP/2 |
| this[kHTTP2Session] = null |
| this[kHTTP2SessionState] = !allowH2 |
| ? null |
| : { |
| // streams: null, // Fixed queue of streams - For future support of `push` |
| openStreams: 0, // Keep track of them to decide wether or not unref the session |
| maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server |
| } |
| this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}` |
| |
| // kQueue is built up of 3 sections separated by |
| // the kRunningIdx and kPendingIdx indices. |
| // | complete | running | pending | |
| // ^ kRunningIdx ^ kPendingIdx ^ kQueue.length |
| // kRunningIdx points to the first running element. |
| // kPendingIdx points to the first pending element. |
| // This implements a fast queue with an amortized |
| // time of O(1). |
| |
| this[kQueue] = [] |
| this[kRunningIdx] = 0 |
| this[kPendingIdx] = 0 |
| } |
| |
| get pipelining () { |
| return this[kPipelining] |
| } |
| |
| set pipelining (value) { |
| this[kPipelining] = value |
| resume(this, true) |
| } |
| |
| get [kPending] () { |
| return this[kQueue].length - this[kPendingIdx] |
| } |
| |
| get [kRunning] () { |
| return this[kPendingIdx] - this[kRunningIdx] |
| } |
| |
| get [kSize] () { |
| return this[kQueue].length - this[kRunningIdx] |
| } |
| |
| get [kConnected] () { |
| return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed |
| } |
| |
| get [kBusy] () { |
| const socket = this[kSocket] |
| return ( |
| (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) || |
| (this[kSize] >= (this[kPipelining] || 1)) || |
| this[kPending] > 0 |
| ) |
| } |
| |
| /* istanbul ignore: only used for test */ |
| [kConnect] (cb) { |
| connect(this) |
| this.once('connect', cb) |
| } |
| |
| [kDispatch] (opts, handler) { |
| const origin = opts.origin || this[kUrl].origin |
| |
| const request = this[kHTTPConnVersion] === 'h2' |
| ? Request[kHTTP2BuildRequest](origin, opts, handler) |
| : Request[kHTTP1BuildRequest](origin, opts, handler) |
| |
| this[kQueue].push(request) |
| if (this[kResuming]) { |
| // Do nothing. |
| } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) { |
| // Wait a tick in case stream/iterator is ended in the same tick. |
| this[kResuming] = 1 |
| process.nextTick(resume, this) |
| } else { |
| resume(this, true) |
| } |
| |
| if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) { |
| this[kNeedDrain] = 2 |
| } |
| |
| return this[kNeedDrain] < 2 |
| } |
| |
| async [kClose] () { |
| // TODO: for H2 we need to gracefully flush the remaining enqueued |
| // request and close each stream. |
| return new Promise((resolve) => { |
| if (!this[kSize]) { |
| resolve(null) |
| } else { |
| this[kClosedResolve] = resolve |
| } |
| }) |
| } |
| |
| async [kDestroy] (err) { |
| return new Promise((resolve) => { |
| const requests = this[kQueue].splice(this[kPendingIdx]) |
| for (let i = 0; i < requests.length; i++) { |
| const request = requests[i] |
| errorRequest(this, request, err) |
| } |
| |
| const callback = () => { |
| if (this[kClosedResolve]) { |
| // TODO (fix): Should we error here with ClientDestroyedError? |
| this[kClosedResolve]() |
| this[kClosedResolve] = null |
| } |
| resolve() |
| } |
| |
| if (this[kHTTP2Session] != null) { |
| util.destroy(this[kHTTP2Session], err) |
| this[kHTTP2Session] = null |
| this[kHTTP2SessionState] = null |
| } |
| |
| if (!this[kSocket]) { |
| queueMicrotask(callback) |
| } else { |
| util.destroy(this[kSocket].on('close', callback), err) |
| } |
| |
| resume(this) |
| }) |
| } |
| } |
| |
| function onHttp2SessionError (err) { |
| assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') |
| |
| this[kSocket][kError] = err |
| |
| onError(this[kClient], err) |
| } |
| |
| function onHttp2FrameError (type, code, id) { |
| const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) |
| |
| if (id === 0) { |
| this[kSocket][kError] = err |
| onError(this[kClient], err) |
| } |
| } |
| |
| function onHttp2SessionEnd () { |
| util.destroy(this, new SocketError('other side closed')) |
| util.destroy(this[kSocket], new SocketError('other side closed')) |
| } |
| |
| function onHTTP2GoAway (code) { |
| const client = this[kClient] |
| const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`) |
| client[kSocket] = null |
| client[kHTTP2Session] = null |
| |
| if (client.destroyed) { |
| assert(this[kPending] === 0) |
| |
| // Fail entire queue. |
| const requests = client[kQueue].splice(client[kRunningIdx]) |
| for (let i = 0; i < requests.length; i++) { |
| const request = requests[i] |
| errorRequest(this, request, err) |
| } |
| } else if (client[kRunning] > 0) { |
| // Fail head of pipeline. |
| const request = client[kQueue][client[kRunningIdx]] |
| client[kQueue][client[kRunningIdx]++] = null |
| |
| errorRequest(client, request, err) |
| } |
| |
| client[kPendingIdx] = client[kRunningIdx] |
| |
| assert(client[kRunning] === 0) |
| |
| client.emit('disconnect', |
| client[kUrl], |
| [client], |
| err |
| ) |
| |
| resume(client) |
| } |
| |
| const constants = require('./llhttp/constants') |
| const createRedirectInterceptor = require('./interceptor/redirectInterceptor') |
| const EMPTY_BUF = Buffer.alloc(0) |
| |
| async function lazyllhttp () { |
| const llhttpWasmData = process.env.JEST_WORKER_ID ? require('./llhttp/llhttp-wasm.js') : undefined |
| |
| let mod |
| try { |
| mod = await WebAssembly.compile(Buffer.from(require('./llhttp/llhttp_simd-wasm.js'), 'base64')) |
| } catch (e) { |
| /* istanbul ignore next */ |
| |
| // We could check if the error was caused by the simd option not |
| // being enabled, but the occurring of this other error |
| // * https://github.com/emscripten-core/emscripten/issues/11495 |
| // got me to remove that check to avoid breaking Node 12. |
| mod = await WebAssembly.compile(Buffer.from(llhttpWasmData || require('./llhttp/llhttp-wasm.js'), 'base64')) |
| } |
| |
| return await WebAssembly.instantiate(mod, { |
| env: { |
| /* eslint-disable camelcase */ |
| |
| wasm_on_url: (p, at, len) => { |
| /* istanbul ignore next */ |
| return 0 |
| }, |
| wasm_on_status: (p, at, len) => { |
| assert.strictEqual(currentParser.ptr, p) |
| const start = at - currentBufferPtr + currentBufferRef.byteOffset |
| return currentParser.onStatus(new FastBuffer(currentBufferRef.buffer, start, len)) || 0 |
| }, |
| wasm_on_message_begin: (p) => { |
| assert.strictEqual(currentParser.ptr, p) |
| return currentParser.onMessageBegin() || 0 |
| }, |
| wasm_on_header_field: (p, at, len) => { |
| assert.strictEqual(currentParser.ptr, p) |
| const start = at - currentBufferPtr + currentBufferRef.byteOffset |
| return currentParser.onHeaderField(new FastBuffer(currentBufferRef.buffer, start, len)) || 0 |
| }, |
| wasm_on_header_value: (p, at, len) => { |
| assert.strictEqual(currentParser.ptr, p) |
| const start = at - currentBufferPtr + currentBufferRef.byteOffset |
| return currentParser.onHeaderValue(new FastBuffer(currentBufferRef.buffer, start, len)) || 0 |
| }, |
| wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => { |
| assert.strictEqual(currentParser.ptr, p) |
| return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0 |
| }, |
| wasm_on_body: (p, at, len) => { |
| assert.strictEqual(currentParser.ptr, p) |
| const start = at - currentBufferPtr + currentBufferRef.byteOffset |
| return currentParser.onBody(new FastBuffer(currentBufferRef.buffer, start, len)) || 0 |
| }, |
| wasm_on_message_complete: (p) => { |
| assert.strictEqual(currentParser.ptr, p) |
| return currentParser.onMessageComplete() || 0 |
| } |
| |
| /* eslint-enable camelcase */ |
| } |
| }) |
| } |
| |
| let llhttpInstance = null |
| let llhttpPromise = lazyllhttp() |
| llhttpPromise.catch() |
| |
| let currentParser = null |
| let currentBufferRef = null |
| let currentBufferSize = 0 |
| let currentBufferPtr = null |
| |
| const TIMEOUT_HEADERS = 1 |
| const TIMEOUT_BODY = 2 |
| const TIMEOUT_IDLE = 3 |
| |
| class Parser { |
| constructor (client, socket, { exports }) { |
| assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0) |
| |
| this.llhttp = exports |
| this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE) |
| this.client = client |
| this.socket = socket |
| this.timeout = null |
| this.timeoutValue = null |
| this.timeoutType = null |
| this.statusCode = null |
| this.statusText = '' |
| this.upgrade = false |
| this.headers = [] |
| this.headersSize = 0 |
| this.headersMaxSize = client[kMaxHeadersSize] |
| this.shouldKeepAlive = false |
| this.paused = false |
| this.resume = this.resume.bind(this) |
| |
| this.bytesRead = 0 |
| |
| this.keepAlive = '' |
| this.contentLength = '' |
| this.connection = '' |
| this.maxResponseSize = client[kMaxResponseSize] |
| } |
| |
| setTimeout (value, type) { |
| this.timeoutType = type |
| if (value !== this.timeoutValue) { |
| timers.clearTimeout(this.timeout) |
| if (value) { |
| this.timeout = timers.setTimeout(onParserTimeout, value, this) |
| // istanbul ignore else: only for jest |
| if (this.timeout.unref) { |
| this.timeout.unref() |
| } |
| } else { |
| this.timeout = null |
| } |
| this.timeoutValue = value |
| } else if (this.timeout) { |
| // istanbul ignore else: only for jest |
| if (this.timeout.refresh) { |
| this.timeout.refresh() |
| } |
| } |
| } |
| |
| resume () { |
| if (this.socket.destroyed || !this.paused) { |
| return |
| } |
| |
| assert(this.ptr != null) |
| assert(currentParser == null) |
| |
| this.llhttp.llhttp_resume(this.ptr) |
| |
| assert(this.timeoutType === TIMEOUT_BODY) |
| if (this.timeout) { |
| // istanbul ignore else: only for jest |
| if (this.timeout.refresh) { |
| this.timeout.refresh() |
| } |
| } |
| |
| this.paused = false |
| this.execute(this.socket.read() || EMPTY_BUF) // Flush parser. |
| this.readMore() |
| } |
| |
| readMore () { |
| while (!this.paused && this.ptr) { |
| const chunk = this.socket.read() |
| if (chunk === null) { |
| break |
| } |
| this.execute(chunk) |
| } |
| } |
| |
| execute (data) { |
| assert(this.ptr != null) |
| assert(currentParser == null) |
| assert(!this.paused) |
| |
| const { socket, llhttp } = this |
| |
| if (data.length > currentBufferSize) { |
| if (currentBufferPtr) { |
| llhttp.free(currentBufferPtr) |
| } |
| currentBufferSize = Math.ceil(data.length / 4096) * 4096 |
| currentBufferPtr = llhttp.malloc(currentBufferSize) |
| } |
| |
| new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data) |
| |
| // Call `execute` on the wasm parser. |
| // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data, |
| // and finally the length of bytes to parse. |
| // The return value is an error code or `constants.ERROR.OK`. |
| try { |
| let ret |
| |
| try { |
| currentBufferRef = data |
| currentParser = this |
| ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length) |
| /* eslint-disable-next-line no-useless-catch */ |
| } catch (err) { |
| /* istanbul ignore next: difficult to make a test case for */ |
| throw err |
| } finally { |
| currentParser = null |
| currentBufferRef = null |
| } |
| |
| const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr |
| |
| if (ret === constants.ERROR.PAUSED_UPGRADE) { |
| this.onUpgrade(data.slice(offset)) |
| } else if (ret === constants.ERROR.PAUSED) { |
| this.paused = true |
| socket.unshift(data.slice(offset)) |
| } else if (ret !== constants.ERROR.OK) { |
| const ptr = llhttp.llhttp_get_error_reason(this.ptr) |
| let message = '' |
| /* istanbul ignore else: difficult to make a test case for */ |
| if (ptr) { |
| const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0) |
| message = |
| 'Response does not match the HTTP/1.1 protocol (' + |
| Buffer.from(llhttp.memory.buffer, ptr, len).toString() + |
| ')' |
| } |
| throw new HTTPParserError(message, constants.ERROR[ret], data.slice(offset)) |
| } |
| } catch (err) { |
| util.destroy(socket, err) |
| } |
| } |
| |
| destroy () { |
| assert(this.ptr != null) |
| assert(currentParser == null) |
| |
| this.llhttp.llhttp_free(this.ptr) |
| this.ptr = null |
| |
| timers.clearTimeout(this.timeout) |
| this.timeout = null |
| this.timeoutValue = null |
| this.timeoutType = null |
| |
| this.paused = false |
| } |
| |
| onStatus (buf) { |
| this.statusText = buf.toString() |
| } |
| |
| onMessageBegin () { |
| const { socket, client } = this |
| |
| /* istanbul ignore next: difficult to make a test case for */ |
| if (socket.destroyed) { |
| return -1 |
| } |
| |
| const request = client[kQueue][client[kRunningIdx]] |
| if (!request) { |
| return -1 |
| } |
| } |
| |
| onHeaderField (buf) { |
| const len = this.headers.length |
| |
| if ((len & 1) === 0) { |
| this.headers.push(buf) |
| } else { |
| this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf]) |
| } |
| |
| this.trackHeader(buf.length) |
| } |
| |
| onHeaderValue (buf) { |
| let len = this.headers.length |
| |
| if ((len & 1) === 1) { |
| this.headers.push(buf) |
| len += 1 |
| } else { |
| this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf]) |
| } |
| |
| const key = this.headers[len - 2] |
| if (key.length === 10 && key.toString().toLowerCase() === 'keep-alive') { |
| this.keepAlive += buf.toString() |
| } else if (key.length === 10 && key.toString().toLowerCase() === 'connection') { |
| this.connection += buf.toString() |
| } else if (key.length === 14 && key.toString().toLowerCase() === 'content-length') { |
| this.contentLength += buf.toString() |
| } |
| |
| this.trackHeader(buf.length) |
| } |
| |
| trackHeader (len) { |
| this.headersSize += len |
| if (this.headersSize >= this.headersMaxSize) { |
| util.destroy(this.socket, new HeadersOverflowError()) |
| } |
| } |
| |
| onUpgrade (head) { |
| const { upgrade, client, socket, headers, statusCode } = this |
| |
| assert(upgrade) |
| |
| const request = client[kQueue][client[kRunningIdx]] |
| assert(request) |
| |
| assert(!socket.destroyed) |
| assert(socket === client[kSocket]) |
| assert(!this.paused) |
| assert(request.upgrade || request.method === 'CONNECT') |
| |
| this.statusCode = null |
| this.statusText = '' |
| this.shouldKeepAlive = null |
| |
| assert(this.headers.length % 2 === 0) |
| this.headers = [] |
| this.headersSize = 0 |
| |
| socket.unshift(head) |
| |
| socket[kParser].destroy() |
| socket[kParser] = null |
| |
| socket[kClient] = null |
| socket[kError] = null |
| socket |
| .removeListener('error', onSocketError) |
| .removeListener('readable', onSocketReadable) |
| .removeListener('end', onSocketEnd) |
| .removeListener('close', onSocketClose) |
| |
| client[kSocket] = null |
| client[kQueue][client[kRunningIdx]++] = null |
| client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade')) |
| |
| try { |
| request.onUpgrade(statusCode, headers, socket) |
| } catch (err) { |
| util.destroy(socket, err) |
| } |
| |
| resume(client) |
| } |
| |
| onHeadersComplete (statusCode, upgrade, shouldKeepAlive) { |
| const { client, socket, headers, statusText } = this |
| |
| /* istanbul ignore next: difficult to make a test case for */ |
| if (socket.destroyed) { |
| return -1 |
| } |
| |
| const request = client[kQueue][client[kRunningIdx]] |
| |
| /* istanbul ignore next: difficult to make a test case for */ |
| if (!request) { |
| return -1 |
| } |
| |
| assert(!this.upgrade) |
| assert(this.statusCode < 200) |
| |
| if (statusCode === 100) { |
| util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket))) |
| return -1 |
| } |
| |
| /* this can only happen if server is misbehaving */ |
| if (upgrade && !request.upgrade) { |
| util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket))) |
| return -1 |
| } |
| |
| assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS) |
| |
| this.statusCode = statusCode |
| this.shouldKeepAlive = ( |
| shouldKeepAlive || |
| // Override llhttp value which does not allow keepAlive for HEAD. |
| (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive') |
| ) |
| |
| if (this.statusCode >= 200) { |
| const bodyTimeout = request.bodyTimeout != null |
| ? request.bodyTimeout |
| : client[kBodyTimeout] |
| this.setTimeout(bodyTimeout, TIMEOUT_BODY) |
| } else if (this.timeout) { |
| // istanbul ignore else: only for jest |
| if (this.timeout.refresh) { |
| this.timeout.refresh() |
| } |
| } |
| |
| if (request.method === 'CONNECT') { |
| assert(client[kRunning] === 1) |
| this.upgrade = true |
| return 2 |
| } |
| |
| if (upgrade) { |
| assert(client[kRunning] === 1) |
| this.upgrade = true |
| return 2 |
| } |
| |
| assert(this.headers.length % 2 === 0) |
| this.headers = [] |
| this.headersSize = 0 |
| |
| if (this.shouldKeepAlive && client[kPipelining]) { |
| const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null |
| |
| if (keepAliveTimeout != null) { |
| const timeout = Math.min( |
| keepAliveTimeout - client[kKeepAliveTimeoutThreshold], |
| client[kKeepAliveMaxTimeout] |
| ) |
| if (timeout <= 0) { |
| socket[kReset] = true |
| } else { |
| client[kKeepAliveTimeoutValue] = timeout |
| } |
| } else { |
| client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout] |
| } |
| } else { |
| // Stop more requests from being dispatched. |
| socket[kReset] = true |
| } |
| |
| const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false |
| |
| if (request.aborted) { |
| return -1 |
| } |
| |
| if (request.method === 'HEAD') { |
| return 1 |
| } |
| |
| if (statusCode < 200) { |
| return 1 |
| } |
| |
| if (socket[kBlocking]) { |
| socket[kBlocking] = false |
| resume(client) |
| } |
| |
| return pause ? constants.ERROR.PAUSED : 0 |
| } |
| |
| onBody (buf) { |
| const { client, socket, statusCode, maxResponseSize } = this |
| |
| if (socket.destroyed) { |
| return -1 |
| } |
| |
| const request = client[kQueue][client[kRunningIdx]] |
| assert(request) |
| |
| assert.strictEqual(this.timeoutType, TIMEOUT_BODY) |
| if (this.timeout) { |
| // istanbul ignore else: only for jest |
| if (this.timeout.refresh) { |
| this.timeout.refresh() |
| } |
| } |
| |
| assert(statusCode >= 200) |
| |
| if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) { |
| util.destroy(socket, new ResponseExceededMaxSizeError()) |
| return -1 |
| } |
| |
| this.bytesRead += buf.length |
| |
| if (request.onData(buf) === false) { |
| return constants.ERROR.PAUSED |
| } |
| } |
| |
| onMessageComplete () { |
| const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this |
| |
| if (socket.destroyed && (!statusCode || shouldKeepAlive)) { |
| return -1 |
| } |
| |
| if (upgrade) { |
| return |
| } |
| |
| const request = client[kQueue][client[kRunningIdx]] |
| assert(request) |
| |
| assert(statusCode >= 100) |
| |
| this.statusCode = null |
| this.statusText = '' |
| this.bytesRead = 0 |
| this.contentLength = '' |
| this.keepAlive = '' |
| this.connection = '' |
| |
| assert(this.headers.length % 2 === 0) |
| this.headers = [] |
| this.headersSize = 0 |
| |
| if (statusCode < 200) { |
| return |
| } |
| |
| /* istanbul ignore next: should be handled by llhttp? */ |
| if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) { |
| util.destroy(socket, new ResponseContentLengthMismatchError()) |
| return -1 |
| } |
| |
| request.onComplete(headers) |
| |
| client[kQueue][client[kRunningIdx]++] = null |
| |
| if (socket[kWriting]) { |
| assert.strictEqual(client[kRunning], 0) |
| // Response completed before request. |
| util.destroy(socket, new InformationalError('reset')) |
| return constants.ERROR.PAUSED |
| } else if (!shouldKeepAlive) { |
| util.destroy(socket, new InformationalError('reset')) |
| return constants.ERROR.PAUSED |
| } else if (socket[kReset] && client[kRunning] === 0) { |
| // Destroy socket once all requests have completed. |
| // The request at the tail of the pipeline is the one |
| // that requested reset and no further requests should |
| // have been queued since then. |
| util.destroy(socket, new InformationalError('reset')) |
| return constants.ERROR.PAUSED |
| } else if (client[kPipelining] === 1) { |
| // We must wait a full event loop cycle to reuse this socket to make sure |
| // that non-spec compliant servers are not closing the connection even if they |
| // said they won't. |
| setImmediate(resume, client) |
| } else { |
| resume(client) |
| } |
| } |
| } |
| |
| function onParserTimeout (parser) { |
| const { socket, timeoutType, client } = parser |
| |
| /* istanbul ignore else */ |
| if (timeoutType === TIMEOUT_HEADERS) { |
| if (!socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1) { |
| assert(!parser.paused, 'cannot be paused while waiting for headers') |
| util.destroy(socket, new HeadersTimeoutError()) |
| } |
| } else if (timeoutType === TIMEOUT_BODY) { |
| if (!parser.paused) { |
| util.destroy(socket, new BodyTimeoutError()) |
| } |
| } else if (timeoutType === TIMEOUT_IDLE) { |
| assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue]) |
| util.destroy(socket, new InformationalError('socket idle timeout')) |
| } |
| } |
| |
| function onSocketReadable () { |
| const { [kParser]: parser } = this |
| if (parser) { |
| parser.readMore() |
| } |
| } |
| |
| function onSocketError (err) { |
| const { [kClient]: client, [kParser]: parser } = this |
| |
| assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') |
| |
| if (client[kHTTPConnVersion] !== 'h2') { |
| // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded |
| // to the user. |
| if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { |
| // We treat all incoming data so for as a valid response. |
| parser.onMessageComplete() |
| return |
| } |
| } |
| |
| this[kError] = err |
| |
| onError(this[kClient], err) |
| } |
| |
| function onError (client, err) { |
| if ( |
| client[kRunning] === 0 && |
| err.code !== 'UND_ERR_INFO' && |
| err.code !== 'UND_ERR_SOCKET' |
| ) { |
| // Error is not caused by running request and not a recoverable |
| // socket error. |
| |
| assert(client[kPendingIdx] === client[kRunningIdx]) |
| |
| const requests = client[kQueue].splice(client[kRunningIdx]) |
| for (let i = 0; i < requests.length; i++) { |
| const request = requests[i] |
| errorRequest(client, request, err) |
| } |
| assert(client[kSize] === 0) |
| } |
| } |
| |
| function onSocketEnd () { |
| const { [kParser]: parser, [kClient]: client } = this |
| |
| if (client[kHTTPConnVersion] !== 'h2') { |
| if (parser.statusCode && !parser.shouldKeepAlive) { |
| // We treat all incoming data so far as a valid response. |
| parser.onMessageComplete() |
| return |
| } |
| } |
| |
| util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) |
| } |
| |
| function onSocketClose () { |
| const { [kClient]: client, [kParser]: parser } = this |
| |
| if (client[kHTTPConnVersion] === 'h1' && parser) { |
| if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { |
| // We treat all incoming data so far as a valid response. |
| parser.onMessageComplete() |
| } |
| |
| this[kParser].destroy() |
| this[kParser] = null |
| } |
| |
| const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) |
| |
| client[kSocket] = null |
| |
| if (client.destroyed) { |
| assert(client[kPending] === 0) |
| |
| // Fail entire queue. |
| const requests = client[kQueue].splice(client[kRunningIdx]) |
| for (let i = 0; i < requests.length; i++) { |
| const request = requests[i] |
| errorRequest(client, request, err) |
| } |
| } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') { |
| // Fail head of pipeline. |
| const request = client[kQueue][client[kRunningIdx]] |
| client[kQueue][client[kRunningIdx]++] = null |
| |
| errorRequest(client, request, err) |
| } |
| |
| client[kPendingIdx] = client[kRunningIdx] |
| |
| assert(client[kRunning] === 0) |
| |
| client.emit('disconnect', client[kUrl], [client], err) |
| |
| resume(client) |
| } |
| |
| async function connect (client) { |
| assert(!client[kConnecting]) |
| assert(!client[kSocket]) |
| |
| let { host, hostname, protocol, port } = client[kUrl] |
| |
| // Resolve ipv6 |
| if (hostname[0] === '[') { |
| const idx = hostname.indexOf(']') |
| |
| assert(idx !== -1) |
| const ip = hostname.substring(1, idx) |
| |
| assert(net.isIP(ip)) |
| hostname = ip |
| } |
| |
| client[kConnecting] = true |
| |
| if (channels.beforeConnect.hasSubscribers) { |
| channels.beforeConnect.publish({ |
| connectParams: { |
| host, |
| hostname, |
| protocol, |
| port, |
| servername: client[kServerName], |
| localAddress: client[kLocalAddress] |
| }, |
| connector: client[kConnector] |
| }) |
| } |
| |
| try { |
| const socket = await new Promise((resolve, reject) => { |
| client[kConnector]({ |
| host, |
| hostname, |
| protocol, |
| port, |
| servername: client[kServerName], |
| localAddress: client[kLocalAddress] |
| }, (err, socket) => { |
| if (err) { |
| reject(err) |
| } else { |
| resolve(socket) |
| } |
| }) |
| }) |
| |
| if (client.destroyed) { |
| util.destroy(socket.on('error', () => {}), new ClientDestroyedError()) |
| return |
| } |
| |
| client[kConnecting] = false |
| |
| assert(socket) |
| |
| const isH2 = socket.alpnProtocol === 'h2' |
| if (isH2) { |
| if (!h2ExperimentalWarned) { |
| h2ExperimentalWarned = true |
| process.emitWarning('H2 support is experimental, expect them to change at any time.', { |
| code: 'UNDICI-H2' |
| }) |
| } |
| |
| const session = http2.connect(client[kUrl], { |
| createConnection: () => socket, |
| peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams |
| }) |
| |
| client[kHTTPConnVersion] = 'h2' |
| session[kClient] = client |
| session[kSocket] = socket |
| session.on('error', onHttp2SessionError) |
| session.on('frameError', onHttp2FrameError) |
| session.on('end', onHttp2SessionEnd) |
| session.on('goaway', onHTTP2GoAway) |
| session.on('close', onSocketClose) |
| session.unref() |
| |
| client[kHTTP2Session] = session |
| socket[kHTTP2Session] = session |
| } else { |
| if (!llhttpInstance) { |
| llhttpInstance = await llhttpPromise |
| llhttpPromise = null |
| } |
| |
| socket[kNoRef] = false |
| socket[kWriting] = false |
| socket[kReset] = false |
| socket[kBlocking] = false |
| socket[kParser] = new Parser(client, socket, llhttpInstance) |
| } |
| |
| socket[kCounter] = 0 |
| socket[kMaxRequests] = client[kMaxRequests] |
| socket[kClient] = client |
| socket[kError] = null |
| |
| socket |
| .on('error', onSocketError) |
| .on('readable', onSocketReadable) |
| .on('end', onSocketEnd) |
| .on('close', onSocketClose) |
| |
| client[kSocket] = socket |
| |
| if (channels.connected.hasSubscribers) { |
| channels.connected.publish({ |
| connectParams: { |
| host, |
| hostname, |
| protocol, |
| port, |
| servername: client[kServerName], |
| localAddress: client[kLocalAddress] |
| }, |
| connector: client[kConnector], |
| socket |
| }) |
| } |
| client.emit('connect', client[kUrl], [client]) |
| } catch (err) { |
| if (client.destroyed) { |
| return |
| } |
| |
| client[kConnecting] = false |
| |
| if (channels.connectError.hasSubscribers) { |
| channels.connectError.publish({ |
| connectParams: { |
| host, |
| hostname, |
| protocol, |
| port, |
| servername: client[kServerName], |
| localAddress: client[kLocalAddress] |
| }, |
| connector: client[kConnector], |
| error: err |
| }) |
| } |
| |
| if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') { |
| assert(client[kRunning] === 0) |
| while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) { |
| const request = client[kQueue][client[kPendingIdx]++] |
| errorRequest(client, request, err) |
| } |
| } else { |
| onError(client, err) |
| } |
| |
| client.emit('connectionError', client[kUrl], [client], err) |
| } |
| |
| resume(client) |
| } |
| |
| function emitDrain (client) { |
| client[kNeedDrain] = 0 |
| client.emit('drain', client[kUrl], [client]) |
| } |
| |
| function resume (client, sync) { |
| if (client[kResuming] === 2) { |
| return |
| } |
| |
| client[kResuming] = 2 |
| |
| _resume(client, sync) |
| client[kResuming] = 0 |
| |
| if (client[kRunningIdx] > 256) { |
| client[kQueue].splice(0, client[kRunningIdx]) |
| client[kPendingIdx] -= client[kRunningIdx] |
| client[kRunningIdx] = 0 |
| } |
| } |
| |
| function _resume (client, sync) { |
| while (true) { |
| if (client.destroyed) { |
| assert(client[kPending] === 0) |
| return |
| } |
| |
| if (client[kClosedResolve] && !client[kSize]) { |
| client[kClosedResolve]() |
| client[kClosedResolve] = null |
| return |
| } |
| |
| const socket = client[kSocket] |
| |
| if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') { |
| if (client[kSize] === 0) { |
| if (!socket[kNoRef] && socket.unref) { |
| socket.unref() |
| socket[kNoRef] = true |
| } |
| } else if (socket[kNoRef] && socket.ref) { |
| socket.ref() |
| socket[kNoRef] = false |
| } |
| |
| if (client[kSize] === 0) { |
| if (socket[kParser].timeoutType !== TIMEOUT_IDLE) { |
| socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE) |
| } |
| } else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) { |
| if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) { |
| const request = client[kQueue][client[kRunningIdx]] |
| const headersTimeout = request.headersTimeout != null |
| ? request.headersTimeout |
| : client[kHeadersTimeout] |
| socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS) |
| } |
| } |
| } |
| |
| if (client[kBusy]) { |
| client[kNeedDrain] = 2 |
| } else if (client[kNeedDrain] === 2) { |
| if (sync) { |
| client[kNeedDrain] = 1 |
| process.nextTick(emitDrain, client) |
| } else { |
| emitDrain(client) |
| } |
| continue |
| } |
| |
| if (client[kPending] === 0) { |
| return |
| } |
| |
| if (client[kRunning] >= (client[kPipelining] || 1)) { |
| return |
| } |
| |
| const request = client[kQueue][client[kPendingIdx]] |
| |
| if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) { |
| if (client[kRunning] > 0) { |
| return |
| } |
| |
| client[kServerName] = request.servername |
| |
| if (socket && socket.servername !== request.servername) { |
| util.destroy(socket, new InformationalError('servername changed')) |
| return |
| } |
| } |
| |
| if (client[kConnecting]) { |
| return |
| } |
| |
| if (!socket && !client[kHTTP2Session]) { |
| connect(client) |
| return |
| } |
| |
| if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) { |
| return |
| } |
| |
| if (client[kRunning] > 0 && !request.idempotent) { |
| // Non-idempotent request cannot be retried. |
| // Ensure that no other requests are inflight and |
| // could cause failure. |
| return |
| } |
| |
| if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) { |
| // Don't dispatch an upgrade until all preceding requests have completed. |
| // A misbehaving server might upgrade the connection before all pipelined |
| // request has completed. |
| return |
| } |
| |
| if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 && |
| (util.isStream(request.body) || util.isAsyncIterable(request.body))) { |
| // Request with stream or iterator body can error while other requests |
| // are inflight and indirectly error those as well. |
| // Ensure this doesn't happen by waiting for inflight |
| // to complete before dispatching. |
| |
| // Request with stream or iterator body cannot be retried. |
| // Ensure that no other requests are inflight and |
| // could cause failure. |
| return |
| } |
| |
| if (!request.aborted && write(client, request)) { |
| client[kPendingIdx]++ |
| } else { |
| client[kQueue].splice(client[kPendingIdx], 1) |
| } |
| } |
| } |
| |
| // https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2 |
| function shouldSendContentLength (method) { |
| return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' |
| } |
| |
| function write (client, request) { |
| if (client[kHTTPConnVersion] === 'h2') { |
| writeH2(client, client[kHTTP2Session], request) |
| return |
| } |
| |
| const { body, method, path, host, upgrade, headers, blocking, reset } = request |
| |
| // https://tools.ietf.org/html/rfc7231#section-4.3.1 |
| // https://tools.ietf.org/html/rfc7231#section-4.3.2 |
| // https://tools.ietf.org/html/rfc7231#section-4.3.5 |
| |
| // Sending a payload body on a request that does not |
| // expect it can cause undefined behavior on some |
| // servers and corrupt connection state. Do not |
| // re-use the connection for further requests. |
| |
| const expectsPayload = ( |
| method === 'PUT' || |
| method === 'POST' || |
| method === 'PATCH' |
| ) |
| |
| if (body && typeof body.read === 'function') { |
| // Try to read EOF in order to get length. |
| body.read(0) |
| } |
| |
| const bodyLength = util.bodyLength(body) |
| |
| let contentLength = bodyLength |
| |
| if (contentLength === null) { |
| contentLength = request.contentLength |
| } |
| |
| if (contentLength === 0 && !expectsPayload) { |
| // https://tools.ietf.org/html/rfc7230#section-3.3.2 |
| // A user agent SHOULD NOT send a Content-Length header field when |
| // the request message does not contain a payload body and the method |
| // semantics do not anticipate such a body. |
| |
| contentLength = null |
| } |
| |
| // https://github.com/nodejs/undici/issues/2046 |
| // A user agent may send a Content-Length header with 0 value, this should be allowed. |
| if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) { |
| if (client[kStrictContentLength]) { |
| errorRequest(client, request, new RequestContentLengthMismatchError()) |
| return false |
| } |
| |
| process.emitWarning(new RequestContentLengthMismatchError()) |
| } |
| |
| const socket = client[kSocket] |
| |
| try { |
| request.onConnect((err) => { |
| if (request.aborted || request.completed) { |
| return |
| } |
| |
| errorRequest(client, request, err || new RequestAbortedError()) |
| |
| util.destroy(socket, new InformationalError('aborted')) |
| }) |
| } catch (err) { |
| errorRequest(client, request, err) |
| } |
| |
| if (request.aborted) { |
| return false |
| } |
| |
| if (method === 'HEAD') { |
| // https://github.com/mcollina/undici/issues/258 |
| // Close after a HEAD request to interop with misbehaving servers |
| // that may send a body in the response. |
| |
| socket[kReset] = true |
| } |
| |
| if (upgrade || method === 'CONNECT') { |
| // On CONNECT or upgrade, block pipeline from dispatching further |
| // requests on this connection. |
| |
| socket[kReset] = true |
| } |
| |
| if (reset != null) { |
| socket[kReset] = reset |
| } |
| |
| if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) { |
| socket[kReset] = true |
| } |
| |
| if (blocking) { |
| socket[kBlocking] = true |
| } |
| |
| let header = `${method} ${path} HTTP/1.1\r\n` |
| |
| if (typeof host === 'string') { |
| header += `host: ${host}\r\n` |
| } else { |
| header += client[kHostHeader] |
| } |
| |
| if (upgrade) { |
| header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n` |
| } else if (client[kPipelining] && !socket[kReset]) { |
| header += 'connection: keep-alive\r\n' |
| } else { |
| header += 'connection: close\r\n' |
| } |
| |
| if (headers) { |
| header += headers |
| } |
| |
| if (channels.sendHeaders.hasSubscribers) { |
| channels.sendHeaders.publish({ request, headers: header, socket }) |
| } |
| |
| /* istanbul ignore else: assertion */ |
| if (!body || bodyLength === 0) { |
| if (contentLength === 0) { |
| socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1') |
| } else { |
| assert(contentLength === null, 'no body must not have content length') |
| socket.write(`${header}\r\n`, 'latin1') |
| } |
| request.onRequestSent() |
| } else if (util.isBuffer(body)) { |
| assert(contentLength === body.byteLength, 'buffer body must have content length') |
| |
| socket.cork() |
| socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') |
| socket.write(body) |
| socket.uncork() |
| request.onBodySent(body) |
| request.onRequestSent() |
| if (!expectsPayload) { |
| socket[kReset] = true |
| } |
| } else if (util.isBlobLike(body)) { |
| if (typeof body.stream === 'function') { |
| writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload }) |
| } else { |
| writeBlob({ body, client, request, socket, contentLength, header, expectsPayload }) |
| } |
| } else if (util.isStream(body)) { |
| writeStream({ body, client, request, socket, contentLength, header, expectsPayload }) |
| } else if (util.isIterable(body)) { |
| writeIterable({ body, client, request, socket, contentLength, header, expectsPayload }) |
| } else { |
| assert(false) |
| } |
| |
| return true |
| } |
| |
| function writeH2 (client, session, request) { |
| const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request |
| |
| let headers |
| if (typeof reqHeaders === 'string') headers = Request[kHTTP2CopyHeaders](reqHeaders.trim()) |
| else headers = reqHeaders |
| |
| if (upgrade) { |
| errorRequest(client, request, new Error('Upgrade not supported for H2')) |
| return false |
| } |
| |
| try { |
| // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event? |
| request.onConnect((err) => { |
| if (request.aborted || request.completed) { |
| return |
| } |
| |
| errorRequest(client, request, err || new RequestAbortedError()) |
| }) |
| } catch (err) { |
| errorRequest(client, request, err) |
| } |
| |
| if (request.aborted) { |
| return false |
| } |
| |
| /** @type {import('node:http2').ClientHttp2Stream} */ |
| let stream |
| const h2State = client[kHTTP2SessionState] |
| |
| headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost] |
| headers[HTTP2_HEADER_METHOD] = method |
| |
| if (method === 'CONNECT') { |
| session.ref() |
| // we are already connected, streams are pending, first request |
| // will create a new stream. We trigger a request to create the stream and wait until |
| // `ready` event is triggered |
| // We disabled endStream to allow the user to write to the stream |
| stream = session.request(headers, { endStream: false, signal }) |
| |
| if (stream.id && !stream.pending) { |
| request.onUpgrade(null, null, stream) |
| ++h2State.openStreams |
| } else { |
| stream.once('ready', () => { |
| request.onUpgrade(null, null, stream) |
| ++h2State.openStreams |
| }) |
| } |
| |
| stream.once('close', () => { |
| h2State.openStreams -= 1 |
| // TODO(HTTP/2): unref only if current streams count is 0 |
| if (h2State.openStreams === 0) session.unref() |
| }) |
| |
| return true |
| } |
| |
| // https://tools.ietf.org/html/rfc7540#section-8.3 |
| // :path and :scheme headers must be omited when sending CONNECT |
| |
| headers[HTTP2_HEADER_PATH] = path |
| headers[HTTP2_HEADER_SCHEME] = 'https' |
| |
| // https://tools.ietf.org/html/rfc7231#section-4.3.1 |
| // https://tools.ietf.org/html/rfc7231#section-4.3.2 |
| // https://tools.ietf.org/html/rfc7231#section-4.3.5 |
| |
| // Sending a payload body on a request that does not |
| // expect it can cause undefined behavior on some |
| // servers and corrupt connection state. Do not |
| // re-use the connection for further requests. |
| |
| const expectsPayload = ( |
| method === 'PUT' || |
| method === 'POST' || |
| method === 'PATCH' |
| ) |
| |
| if (body && typeof body.read === 'function') { |
| // Try to read EOF in order to get length. |
| body.read(0) |
| } |
| |
| let contentLength = util.bodyLength(body) |
| |
| if (contentLength == null) { |
| contentLength = request.contentLength |
| } |
| |
| if (contentLength === 0 || !expectsPayload) { |
| // https://tools.ietf.org/html/rfc7230#section-3.3.2 |
| // A user agent SHOULD NOT send a Content-Length header field when |
| // the request message does not contain a payload body and the method |
| // semantics do not anticipate such a body. |
| |
| contentLength = null |
| } |
| |
| // https://github.com/nodejs/undici/issues/2046 |
| // A user agent may send a Content-Length header with 0 value, this should be allowed. |
| if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) { |
| if (client[kStrictContentLength]) { |
| errorRequest(client, request, new RequestContentLengthMismatchError()) |
| return false |
| } |
| |
| process.emitWarning(new RequestContentLengthMismatchError()) |
| } |
| |
| if (contentLength != null) { |
| assert(body, 'no body must not have content length') |
| headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}` |
| } |
| |
| session.ref() |
| |
| const shouldEndStream = method === 'GET' || method === 'HEAD' |
| if (expectContinue) { |
| headers[HTTP2_HEADER_EXPECT] = '100-continue' |
| stream = session.request(headers, { endStream: shouldEndStream, signal }) |
| |
| stream.once('continue', writeBodyH2) |
| } else { |
| stream = session.request(headers, { |
| endStream: shouldEndStream, |
| signal |
| }) |
| writeBodyH2() |
| } |
| |
| // Increment counter as we have new several streams open |
| ++h2State.openStreams |
| |
| stream.once('response', headers => { |
| const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers |
| |
| if (request.onHeaders(Number(statusCode), realHeaders, stream.resume.bind(stream), '') === false) { |
| stream.pause() |
| } |
| }) |
| |
| stream.once('end', () => { |
| request.onComplete([]) |
| }) |
| |
| stream.on('data', (chunk) => { |
| if (request.onData(chunk) === false) { |
| stream.pause() |
| } |
| }) |
| |
| stream.once('close', () => { |
| h2State.openStreams -= 1 |
| // TODO(HTTP/2): unref only if current streams count is 0 |
| if (h2State.openStreams === 0) { |
| session.unref() |
| } |
| }) |
| |
| stream.once('error', function (err) { |
| if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { |
| h2State.streams -= 1 |
| util.destroy(stream, err) |
| } |
| }) |
| |
| stream.once('frameError', (type, code) => { |
| const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) |
| errorRequest(client, request, err) |
| |
| if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { |
| h2State.streams -= 1 |
| util.destroy(stream, err) |
| } |
| }) |
| |
| // stream.on('aborted', () => { |
| // // TODO(HTTP/2): Support aborted |
| // }) |
| |
| // stream.on('timeout', () => { |
| // // TODO(HTTP/2): Support timeout |
| // }) |
| |
| // stream.on('push', headers => { |
| // // TODO(HTTP/2): Suppor push |
| // }) |
| |
| // stream.on('trailers', headers => { |
| // // TODO(HTTP/2): Support trailers |
| // }) |
| |
| return true |
| |
| function writeBodyH2 () { |
| /* istanbul ignore else: assertion */ |
| if (!body) { |
| request.onRequestSent() |
| } else if (util.isBuffer(body)) { |
| assert(contentLength === body.byteLength, 'buffer body must have content length') |
| stream.cork() |
| stream.write(body) |
| stream.uncork() |
| stream.end() |
| request.onBodySent(body) |
| request.onRequestSent() |
| } else if (util.isBlobLike(body)) { |
| if (typeof body.stream === 'function') { |
| writeIterable({ |
| client, |
| request, |
| contentLength, |
| h2stream: stream, |
| expectsPayload, |
| body: body.stream(), |
| socket: client[kSocket], |
| header: '' |
| }) |
| } else { |
| writeBlob({ |
| body, |
| client, |
| request, |
| contentLength, |
| expectsPayload, |
| h2stream: stream, |
| header: '', |
| socket: client[kSocket] |
| }) |
| } |
| } else if (util.isStream(body)) { |
| writeStream({ |
| body, |
| client, |
| request, |
| contentLength, |
| expectsPayload, |
| socket: client[kSocket], |
| h2stream: stream, |
| header: '' |
| }) |
| } else if (util.isIterable(body)) { |
| writeIterable({ |
| body, |
| client, |
| request, |
| contentLength, |
| expectsPayload, |
| header: '', |
| h2stream: stream, |
| socket: client[kSocket] |
| }) |
| } else { |
| assert(false) |
| } |
| } |
| } |
| |
| function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { |
| assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') |
| |
| if (client[kHTTPConnVersion] === 'h2') { |
| // For HTTP/2, is enough to pipe the stream |
| const pipe = pipeline( |
| body, |
| h2stream, |
| (err) => { |
| if (err) { |
| util.destroy(body, err) |
| util.destroy(h2stream, err) |
| } else { |
| request.onRequestSent() |
| } |
| } |
| ) |
| |
| pipe.on('data', onPipeData) |
| pipe.once('end', () => { |
| pipe.removeListener('data', onPipeData) |
| util.destroy(pipe) |
| }) |
| |
| function onPipeData (chunk) { |
| request.onBodySent(chunk) |
| } |
| |
| return |
| } |
| |
| let finished = false |
| |
| const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) |
| |
| const onData = function (chunk) { |
| if (finished) { |
| return |
| } |
| |
| try { |
| if (!writer.write(chunk) && this.pause) { |
| this.pause() |
| } |
| } catch (err) { |
| util.destroy(this, err) |
| } |
| } |
| const onDrain = function () { |
| if (finished) { |
| return |
| } |
| |
| if (body.resume) { |
| body.resume() |
| } |
| } |
| const onAbort = function () { |
| if (finished) { |
| return |
| } |
| const err = new RequestAbortedError() |
| queueMicrotask(() => onFinished(err)) |
| } |
| const onFinished = function (err) { |
| if (finished) { |
| return |
| } |
| |
| finished = true |
| |
| assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1)) |
| |
| socket |
| .off('drain', onDrain) |
| .off('error', onFinished) |
| |
| body |
| .removeListener('data', onData) |
| .removeListener('end', onFinished) |
| .removeListener('error', onFinished) |
| .removeListener('close', onAbort) |
| |
| if (!err) { |
| try { |
| writer.end() |
| } catch (er) { |
| err = er |
| } |
| } |
| |
| writer.destroy(err) |
| |
| if (err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')) { |
| util.destroy(body, err) |
| } else { |
| util.destroy(body) |
| } |
| } |
| |
| body |
| .on('data', onData) |
| .on('end', onFinished) |
| .on('error', onFinished) |
| .on('close', onAbort) |
| |
| if (body.resume) { |
| body.resume() |
| } |
| |
| socket |
| .on('drain', onDrain) |
| .on('error', onFinished) |
| } |
| |
| async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { |
| assert(contentLength === body.size, 'blob body must have content length') |
| |
| const isH2 = client[kHTTPConnVersion] === 'h2' |
| try { |
| if (contentLength != null && contentLength !== body.size) { |
| throw new RequestContentLengthMismatchError() |
| } |
| |
| const buffer = Buffer.from(await body.arrayBuffer()) |
| |
| if (isH2) { |
| h2stream.cork() |
| h2stream.write(buffer) |
| h2stream.uncork() |
| } else { |
| socket.cork() |
| socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') |
| socket.write(buffer) |
| socket.uncork() |
| } |
| |
| request.onBodySent(buffer) |
| request.onRequestSent() |
| |
| if (!expectsPayload) { |
| socket[kReset] = true |
| } |
| |
| resume(client) |
| } catch (err) { |
| util.destroy(isH2 ? h2stream : socket, err) |
| } |
| } |
| |
| async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { |
| assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined') |
| |
| let callback = null |
| function onDrain () { |
| if (callback) { |
| const cb = callback |
| callback = null |
| cb() |
| } |
| } |
| |
| const waitForDrain = () => new Promise((resolve, reject) => { |
| assert(callback === null) |
| |
| if (socket[kError]) { |
| reject(socket[kError]) |
| } else { |
| callback = resolve |
| } |
| }) |
| |
| if (client[kHTTPConnVersion] === 'h2') { |
| h2stream |
| .on('close', onDrain) |
| .on('drain', onDrain) |
| |
| try { |
| // It's up to the user to somehow abort the async iterable. |
| for await (const chunk of body) { |
| if (socket[kError]) { |
| throw socket[kError] |
| } |
| |
| const res = h2stream.write(chunk) |
| request.onBodySent(chunk) |
| if (!res) { |
| await waitForDrain() |
| } |
| } |
| } catch (err) { |
| h2stream.destroy(err) |
| } finally { |
| request.onRequestSent() |
| h2stream.end() |
| h2stream |
| .off('close', onDrain) |
| .off('drain', onDrain) |
| } |
| |
| return |
| } |
| |
| socket |
| .on('close', onDrain) |
| .on('drain', onDrain) |
| |
| const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) |
| try { |
| // It's up to the user to somehow abort the async iterable. |
| for await (const chunk of body) { |
| if (socket[kError]) { |
| throw socket[kError] |
| } |
| |
| if (!writer.write(chunk)) { |
| await waitForDrain() |
| } |
| } |
| |
| writer.end() |
| } catch (err) { |
| writer.destroy(err) |
| } finally { |
| socket |
| .off('close', onDrain) |
| .off('drain', onDrain) |
| } |
| } |
| |
| class AsyncWriter { |
| constructor ({ socket, request, contentLength, client, expectsPayload, header }) { |
| this.socket = socket |
| this.request = request |
| this.contentLength = contentLength |
| this.client = client |
| this.bytesWritten = 0 |
| this.expectsPayload = expectsPayload |
| this.header = header |
| |
| socket[kWriting] = true |
| } |
| |
| write (chunk) { |
| const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this |
| |
| if (socket[kError]) { |
| throw socket[kError] |
| } |
| |
| if (socket.destroyed) { |
| return false |
| } |
| |
| const len = Buffer.byteLength(chunk) |
| if (!len) { |
| return true |
| } |
| |
| // We should defer writing chunks. |
| if (contentLength !== null && bytesWritten + len > contentLength) { |
| if (client[kStrictContentLength]) { |
| throw new RequestContentLengthMismatchError() |
| } |
| |
| process.emitWarning(new RequestContentLengthMismatchError()) |
| } |
| |
| socket.cork() |
| |
| if (bytesWritten === 0) { |
| if (!expectsPayload) { |
| socket[kReset] = true |
| } |
| |
| if (contentLength === null) { |
| socket.write(`${header}transfer-encoding: chunked\r\n`, 'latin1') |
| } else { |
| socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') |
| } |
| } |
| |
| if (contentLength === null) { |
| socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1') |
| } |
| |
| this.bytesWritten += len |
| |
| const ret = socket.write(chunk) |
| |
| socket.uncork() |
| |
| request.onBodySent(chunk) |
| |
| if (!ret) { |
| if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) { |
| // istanbul ignore else: only for jest |
| if (socket[kParser].timeout.refresh) { |
| socket[kParser].timeout.refresh() |
| } |
| } |
| } |
| |
| return ret |
| } |
| |
| end () { |
| const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this |
| request.onRequestSent() |
| |
| socket[kWriting] = false |
| |
| if (socket[kError]) { |
| throw socket[kError] |
| } |
| |
| if (socket.destroyed) { |
| return |
| } |
| |
| if (bytesWritten === 0) { |
| if (expectsPayload) { |
| // https://tools.ietf.org/html/rfc7230#section-3.3.2 |
| // A user agent SHOULD send a Content-Length in a request message when |
| // no Transfer-Encoding is sent and the request method defines a meaning |
| // for an enclosed payload body. |
| |
| socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1') |
| } else { |
| socket.write(`${header}\r\n`, 'latin1') |
| } |
| } else if (contentLength === null) { |
| socket.write('\r\n0\r\n\r\n', 'latin1') |
| } |
| |
| if (contentLength !== null && bytesWritten !== contentLength) { |
| if (client[kStrictContentLength]) { |
| throw new RequestContentLengthMismatchError() |
| } else { |
| process.emitWarning(new RequestContentLengthMismatchError()) |
| } |
| } |
| |
| if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) { |
| // istanbul ignore else: only for jest |
| if (socket[kParser].timeout.refresh) { |
| socket[kParser].timeout.refresh() |
| } |
| } |
| |
| resume(client) |
| } |
| |
| destroy (err) { |
| const { socket, client } = this |
| |
| socket[kWriting] = false |
| |
| if (err) { |
| assert(client[kRunning] <= 1, 'pipeline should only contain this request') |
| util.destroy(socket, err) |
| } |
| } |
| } |
| |
| function errorRequest (client, request, err) { |
| try { |
| request.onError(err) |
| assert(request.aborted) |
| } catch (err) { |
| client.emit('error', err) |
| } |
| } |
| |
| module.exports = Client |