| 'use strict' |
| |
| const { finished, PassThrough } = require('stream') |
| const { |
| InvalidArgumentError, |
| InvalidReturnValueError, |
| RequestAbortedError |
| } = require('../core/errors') |
| const util = require('../core/util') |
| const { getResolveErrorBodyCallback } = require('./util') |
| const { AsyncResource } = require('async_hooks') |
| const { addSignal, removeSignal } = require('./abort-signal') |
| |
| class StreamHandler extends AsyncResource { |
| constructor (opts, factory, callback) { |
| if (!opts || typeof opts !== 'object') { |
| throw new InvalidArgumentError('invalid opts') |
| } |
| |
| const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts |
| |
| try { |
| if (typeof callback !== 'function') { |
| throw new InvalidArgumentError('invalid callback') |
| } |
| |
| if (typeof factory !== 'function') { |
| throw new InvalidArgumentError('invalid factory') |
| } |
| |
| if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { |
| throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') |
| } |
| |
| if (method === 'CONNECT') { |
| throw new InvalidArgumentError('invalid method') |
| } |
| |
| if (onInfo && typeof onInfo !== 'function') { |
| throw new InvalidArgumentError('invalid onInfo callback') |
| } |
| |
| super('UNDICI_STREAM') |
| } catch (err) { |
| if (util.isStream(body)) { |
| util.destroy(body.on('error', util.nop), err) |
| } |
| throw err |
| } |
| |
| this.responseHeaders = responseHeaders || null |
| this.opaque = opaque || null |
| this.factory = factory |
| this.callback = callback |
| this.res = null |
| this.abort = null |
| this.context = null |
| this.trailers = null |
| this.body = body |
| this.onInfo = onInfo || null |
| this.throwOnError = throwOnError || false |
| |
| if (util.isStream(body)) { |
| body.on('error', (err) => { |
| this.onError(err) |
| }) |
| } |
| |
| addSignal(this, signal) |
| } |
| |
| onConnect (abort, context) { |
| if (!this.callback) { |
| throw new RequestAbortedError() |
| } |
| |
| this.abort = abort |
| this.context = context |
| } |
| |
| onHeaders (statusCode, rawHeaders, resume, statusMessage) { |
| const { factory, opaque, context, callback, responseHeaders } = this |
| |
| const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) |
| |
| if (statusCode < 200) { |
| if (this.onInfo) { |
| this.onInfo({ statusCode, headers }) |
| } |
| return |
| } |
| |
| this.factory = null |
| |
| let res |
| |
| if (this.throwOnError && statusCode >= 400) { |
| const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers |
| const contentType = parsedHeaders['content-type'] |
| res = new PassThrough() |
| |
| this.callback = null |
| this.runInAsyncScope(getResolveErrorBodyCallback, null, |
| { callback, body: res, contentType, statusCode, statusMessage, headers } |
| ) |
| } else { |
| if (factory === null) { |
| return |
| } |
| |
| res = this.runInAsyncScope(factory, null, { |
| statusCode, |
| headers, |
| opaque, |
| context |
| }) |
| |
| if ( |
| !res || |
| typeof res.write !== 'function' || |
| typeof res.end !== 'function' || |
| typeof res.on !== 'function' |
| ) { |
| throw new InvalidReturnValueError('expected Writable') |
| } |
| |
| // TODO: Avoid finished. It registers an unnecessary amount of listeners. |
| finished(res, { readable: false }, (err) => { |
| const { callback, res, opaque, trailers, abort } = this |
| |
| this.res = null |
| if (err || !res.readable) { |
| util.destroy(res, err) |
| } |
| |
| this.callback = null |
| this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) |
| |
| if (err) { |
| abort() |
| } |
| }) |
| } |
| |
| res.on('drain', resume) |
| |
| this.res = res |
| |
| const needDrain = res.writableNeedDrain !== undefined |
| ? res.writableNeedDrain |
| : res._writableState && res._writableState.needDrain |
| |
| return needDrain !== true |
| } |
| |
| onData (chunk) { |
| const { res } = this |
| |
| return res ? res.write(chunk) : true |
| } |
| |
| onComplete (trailers) { |
| const { res } = this |
| |
| removeSignal(this) |
| |
| if (!res) { |
| return |
| } |
| |
| this.trailers = util.parseHeaders(trailers) |
| |
| res.end() |
| } |
| |
| onError (err) { |
| const { res, callback, opaque, body } = this |
| |
| removeSignal(this) |
| |
| this.factory = null |
| |
| if (res) { |
| this.res = null |
| util.destroy(res, err) |
| } else if (callback) { |
| this.callback = null |
| queueMicrotask(() => { |
| this.runInAsyncScope(callback, null, err, { opaque }) |
| }) |
| } |
| |
| if (body) { |
| this.body = null |
| util.destroy(body, err) |
| } |
| } |
| } |
| |
| function stream (opts, factory, callback) { |
| if (callback === undefined) { |
| return new Promise((resolve, reject) => { |
| stream.call(this, opts, factory, (err, data) => { |
| return err ? reject(err) : resolve(data) |
| }) |
| }) |
| } |
| |
| try { |
| this.dispatch(opts, new StreamHandler(opts, factory, callback)) |
| } catch (err) { |
| if (typeof callback !== 'function') { |
| throw err |
| } |
| const opaque = opts && opts.opaque |
| queueMicrotask(() => callback(err, { opaque })) |
| } |
| } |
| |
| module.exports = stream |