| 'use strict' |
| |
| const { |
| Readable, |
| Duplex, |
| PassThrough |
| } = require('stream') |
| const { |
| InvalidArgumentError, |
| InvalidReturnValueError, |
| RequestAbortedError |
| } = require('../core/errors') |
| const util = require('../core/util') |
| const { AsyncResource } = require('async_hooks') |
| const { addSignal, removeSignal } = require('./abort-signal') |
| const assert = require('assert') |
| |
| const kResume = Symbol('resume') |
| |
| class PipelineRequest extends Readable { |
| constructor () { |
| super({ autoDestroy: true }) |
| |
| this[kResume] = null |
| } |
| |
| _read () { |
| const { [kResume]: resume } = this |
| |
| if (resume) { |
| this[kResume] = null |
| resume() |
| } |
| } |
| |
| _destroy (err, callback) { |
| this._read() |
| |
| callback(err) |
| } |
| } |
| |
| class PipelineResponse extends Readable { |
| constructor (resume) { |
| super({ autoDestroy: true }) |
| this[kResume] = resume |
| } |
| |
| _read () { |
| this[kResume]() |
| } |
| |
| _destroy (err, callback) { |
| if (!err && !this._readableState.endEmitted) { |
| err = new RequestAbortedError() |
| } |
| |
| callback(err) |
| } |
| } |
| |
| class PipelineHandler extends AsyncResource { |
| constructor (opts, handler) { |
| if (!opts || typeof opts !== 'object') { |
| throw new InvalidArgumentError('invalid opts') |
| } |
| |
| if (typeof handler !== 'function') { |
| throw new InvalidArgumentError('invalid handler') |
| } |
| |
| const { signal, method, opaque, onInfo, responseHeaders } = opts |
| |
| if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { |
| throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') |
| } |
| |
| if (method === 'CONNECT') { |
| throw new InvalidArgumentError('invalid method') |
| } |
| |
| if (onInfo && typeof onInfo !== 'function') { |
| throw new InvalidArgumentError('invalid onInfo callback') |
| } |
| |
| super('UNDICI_PIPELINE') |
| |
| this.opaque = opaque || null |
| this.responseHeaders = responseHeaders || null |
| this.handler = handler |
| this.abort = null |
| this.context = null |
| this.onInfo = onInfo || null |
| |
| this.req = new PipelineRequest().on('error', util.nop) |
| |
| this.ret = new Duplex({ |
| readableObjectMode: opts.objectMode, |
| autoDestroy: true, |
| read: () => { |
| const { body } = this |
| |
| if (body && body.resume) { |
| body.resume() |
| } |
| }, |
| write: (chunk, encoding, callback) => { |
| const { req } = this |
| |
| if (req.push(chunk, encoding) || req._readableState.destroyed) { |
| callback() |
| } else { |
| req[kResume] = callback |
| } |
| }, |
| destroy: (err, callback) => { |
| const { body, req, res, ret, abort } = this |
| |
| if (!err && !ret._readableState.endEmitted) { |
| err = new RequestAbortedError() |
| } |
| |
| if (abort && err) { |
| abort() |
| } |
| |
| util.destroy(body, err) |
| util.destroy(req, err) |
| util.destroy(res, err) |
| |
| removeSignal(this) |
| |
| callback(err) |
| } |
| }).on('prefinish', () => { |
| const { req } = this |
| |
| // Node < 15 does not call _final in same tick. |
| req.push(null) |
| }) |
| |
| this.res = null |
| |
| addSignal(this, signal) |
| } |
| |
| onConnect (abort, context) { |
| const { ret, res } = this |
| |
| assert(!res, 'pipeline cannot be retried') |
| |
| if (ret.destroyed) { |
| throw new RequestAbortedError() |
| } |
| |
| this.abort = abort |
| this.context = context |
| } |
| |
| onHeaders (statusCode, rawHeaders, resume) { |
| const { opaque, handler, context } = this |
| |
| if (statusCode < 200) { |
| if (this.onInfo) { |
| const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) |
| this.onInfo({ statusCode, headers }) |
| } |
| return |
| } |
| |
| this.res = new PipelineResponse(resume) |
| |
| let body |
| try { |
| this.handler = null |
| const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) |
| body = this.runInAsyncScope(handler, null, { |
| statusCode, |
| headers, |
| opaque, |
| body: this.res, |
| context |
| }) |
| } catch (err) { |
| this.res.on('error', util.nop) |
| throw err |
| } |
| |
| if (!body || typeof body.on !== 'function') { |
| throw new InvalidReturnValueError('expected Readable') |
| } |
| |
| body |
| .on('data', (chunk) => { |
| const { ret, body } = this |
| |
| if (!ret.push(chunk) && body.pause) { |
| body.pause() |
| } |
| }) |
| .on('error', (err) => { |
| const { ret } = this |
| |
| util.destroy(ret, err) |
| }) |
| .on('end', () => { |
| const { ret } = this |
| |
| ret.push(null) |
| }) |
| .on('close', () => { |
| const { ret } = this |
| |
| if (!ret._readableState.ended) { |
| util.destroy(ret, new RequestAbortedError()) |
| } |
| }) |
| |
| this.body = body |
| } |
| |
| onData (chunk) { |
| const { res } = this |
| return res.push(chunk) |
| } |
| |
| onComplete (trailers) { |
| const { res } = this |
| res.push(null) |
| } |
| |
| onError (err) { |
| const { ret } = this |
| this.handler = null |
| util.destroy(ret, err) |
| } |
| } |
| |
| function pipeline (opts, handler) { |
| try { |
| const pipelineHandler = new PipelineHandler(opts, handler) |
| this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler) |
| return pipelineHandler.ret |
| } catch (err) { |
| return new PassThrough().destroy(err) |
| } |
| } |
| |
| module.exports = pipeline |