var http = require('http')
// Node.js HTTP API.
var http = require('http')
// Control-flow utilities.
var cadence = require('cadence')
var delta = require('delta')
// Route Sencha Connect middleware based on request method and URL patterns.
var dispatch = require('dispatch')
// Exceptions that you can catch by type.
var interrupt = require('interrupt').createInterrupter('reactor')
// Contextualized callbacks and event handlers.
var Operation = require('operation/variadic')
// Evented work queue.
var Turnstile = require('turnstile')
Turnstile.Queue = require('turnstile/queue')
// Catch exceptions based on a regex match of an error message or property.
var rescue = require('rescue')
// Return the first value that is not null-like.
var coalesce = require('extant')
// Do nothing.
var noop = require('nop')
// MIME type parser.
var typer = require('media-typer')
var arrayed = require('./arrayed')
// Configuration object handed to the user's configurator. It carries the
// target object, the shared pattern-to-operation map, the list of middleware
// registered so far, and a default logger.
//
// object - Object that dispatched operations are bound to.
// dispatch - Map from route pattern to operation; filled in by `dispatch()`.
function Constructor (object, dispatch) {
    this._object = object
    this._dispatch = dispatch
    // Middleware collected by `use()`, in registration order.
    this._use = []
    // Remains true until `use()` is called, which tells `Reactor` to install
    // the default middleware stack.
    this._defaultUse = true
    // Default logger: print the stack trace of an entry's error, if it has
    // one, and stay silent otherwise.
    this.logger = function (entry) {
        if (!entry.error) {
            return
        }
        console.log(entry.error.stack)
    }
}
Constructor.prototype.useDefault = function () {
this.use(require('express-auth-parser'))
this.use(require('body-parser').urlencoded({ extended: false, limit: '64mb' }))
this.use(require('body-parser').json({ limit: '64mb' }))
}
// Register middleware. Accepts any number of arguments; an argument that is
// an array contributes each of its elements (one level only), anything else is
// appended as is. Calling this at all, even with no arguments, turns off the
// default middleware stack.
Constructor.prototype.use = function () {
    this._defaultUse = false
    for (var i = 0; i < arguments.length; i++) {
        var middleware = arguments[i]
        var additions = Array.isArray(middleware) ? middleware : [ middleware ]
        this._use.push.apply(this._use, additions)
    }
}
// Map a route pattern to an operation. The first argument is the pattern; the
// remaining arguments are handed to `Operation` along with the configured
// object as the default binding.
Constructor.prototype.dispatch = function () {
    var pattern = arguments[0]
    var binding = Array.prototype.slice.call(arguments, 1)
    this._dispatch[pattern] = Operation(binding, { object: this._object })
}
// Build the middleware function for a single route.
//
// queue - Work queue; its `turnstile.health` is snapshotted and `push` receives
// the work object.
// before - Middleware chain to run ahead of queueing.
// operation - Operation to invoke once the work is dequeued.
//
// The returned function stamps a fresh log `entry` onto the request, runs the
// `before` chain, then queues the work regardless of whether the chain reported
// an error; the error, if any, travels with the work.
function handler (queue, before, operation) {
    return function (request, response, next) {
        // Anything after `(request, response, next)` is route arguments.
        var vargs = Array.prototype.slice.call(arguments, 3)
        var pushedAt = Date.now()
        // JSON round trip gives a detached copy of the health gauge.
        var healthAtPush = JSON.parse(JSON.stringify(queue.turnstile.health))
        var entry = {}
        entry.when = { push: pushedAt, start: null, headers: null, finish: null }
        entry.health = { push: healthAtPush, start: null, finish: null }
        entry.duration = { start: null, headers: null, finish: null }
        entry.request = {
            method: request.method,
            header: request.headers,
            url: request.url,
            vargs: vargs
        }
        request.entry = entry
        before(request, response, function (error) {
            var work = {
                error: coalesce(error),
                operation: operation,
                request: request,
                response: response,
                vargs: vargs
            }
            queue.push(work)
        })
    }
}
function Reactor (object, configurator) {
var constructor = new Constructor(object, this._dispatch = {})
configurator(constructor)
this.turnstile = new Turnstile({
Date: coalesce(constructor.Date, Date),
turnstiles: coalesce(constructor.turnstiles, 24),
timeout: coalesce(constructor.timeout)
})
this._queue = new Turnstile.Queue(this, '_respond', this.turnstile)
this._logger = constructor.logger
this._completed = coalesce(constructor.completed, noop)
this._object = object
if (constructor._defaultUse) {
constructor.useDefault()
}
var before = require('connect')()
constructor._use.forEach(function (middleware) {
before.use(middleware)
})
var dispatcher = {}
for (var pattern in this._dispatch) {
dispatcher[pattern] = handler(this._queue, before, this._dispatch[pattern])
}
this.middleware = require('connect')().use(dispatch(dispatcher))
}
// Operation substituted for work that timed out in the queue. It throws the
// bare status code 503, which `_respond` converts into an HTTP response.
Reactor.prototype._timeout = cadence(function () {
    var serviceUnavailable = 503
    throw serviceUnavailable
})
// Extract the response properties carried by a thrown object: the status code
// as given, the headers defaulting to an empty object, and the description
// passed through `coalesce`.
function createProperties (properties) {
    var created = {}
    created.statusCode = properties.statusCode
    created.headers = coalesce(properties.headers, {})
    created.description = coalesce(properties.description)
    return created
}
// Queue worker for one request: invoke the route's operation, normalize
// whatever it returned or threw into a response description, write that
// response, and finally log the completed entry.
//
// async - Cadence step builder.
// envelope - Queue envelope. `envelope.body` is the work object pushed by
// `handler` (`error`, `operation`, `request`, `response`, `vargs`). A truthy
// `envelope.timedout` replaces the operation with `_timeout`, which throws 503.
//
// Values an operation may throw, as normalized below:
//
//  * an integer 300-599 becomes that status code,
//  * a string becomes a 307 redirect to that location,
//  * an array is converted with `arrayed` into an `http` interrupt,
//  * a non-Error object with an integer `statusCode` 200-599 becomes an `http`
//    interrupt built from `createProperties`,
//  * anything else becomes a 500 that keeps the original as `cause`.
Reactor.prototype._respond = cadence(function (async, envelope) {
    var work = envelope.body
    var entry = work.request.entry
    entry.when.start = Date.now()
    entry.health.start = JSON.parse(JSON.stringify(this.turnstile.health))
    work.request.entry = entry
    if (envelope.timedout) {
        work.operation = Operation([ this, '_timeout' ])
    }
    async(function () {
        async(function () {
            // The `[ body, catcher ]` array form is presumably Cadence's
            // try/catch construct: errors from the first function are handed
            // to the second -- confirm against the Cadence documentation.
            async([function () {
                async(function () {
                    // An error raised by the `before` middleware chain is
                    // rethrown here so it takes the same path as an operation
                    // failure.
                    if (work.error) {
                        throw { cause: work.error, statusCode: coalesce(work.error.statusCode) }
                    }
                    work.operation.apply(null, [ work.request ].concat(work.vargs, async()))
                }, function () {
                    return arrayed(Array.prototype.slice.call(arguments))
                })
            }, function (caught) {
                // Loop until `caught` is an `http` interrupt that `rescue`
                // accepts. Each failed attempt lands in the `catch` below,
                // which rewrites the value one step closer to that form.
                for (;;) {
                    try {
                        return rescue(/^reactor#http$/m, function (error) {
                            var statusCode = error.statusCode
                            var description = coalesce(error.description, http.STATUS_CODES[statusCode])
                            var headers = coalesce(error.headers, {})
                            var body = coalesce(error.body, description)
                            entry.error = coalesce(error.cause)
                            interrupt.assert(description != null, 'unknown.http.status', { statusCode: statusCode })
                            return {
                                statusCode: statusCode,
                                description: description,
                                headers: headers,
                                body: body
                            }
                        })(caught)
                    } catch (error) {
                        if (
                            typeof error == 'number' &&
                            !isNaN(error) &&
                            (error | 0) === error &&
                            Math.floor(error / 100) <= 5 &&
                            Math.floor(error / 100) >= 3
                        ) {
                            error = { statusCode: error }
                        } else if (typeof error == 'string') {
                            error = { statusCode: 307, location: error }
                        }
                        if (Array.isArray(error)) {
                            error = interrupt('http', arrayed(error.slice()))
                        } else if (
                            ! (error instanceof Error) &&
                            // `typeof null` is 'object'; without this guard a
                            // thrown `null` would raise a TypeError on the
                            // `statusCode` lookup instead of becoming a 500.
                            error != null &&
                            typeof error == 'object' &&
                            typeof error.statusCode == 'number' &&
                            !isNaN(error.statusCode) &&
                            (error.statusCode | 0) === error.statusCode &&
                            Math.floor(error.statusCode / 100) <= 5 &&
                            Math.floor(error.statusCode / 100) >= 2
                        ) {
                            var properties = createProperties(error)
                            if (error.location) {
                                properties.headers.location = error.location
                            }
                            error = interrupt('http', properties, { cause: coalesce(error.cause) })
                        } else {
                            // Unrecognized: wrap as a 500. The next pass turns
                            // this wrapper into an `http` interrupt.
                            error = { statusCode: 500, cause: error }
                        }
                        caught = error
                    }
                }
            }])
        }, function (responder) {
            var body, f
            if (typeof responder.body == 'function') {
                // The body function writes the response itself.
                f = responder.body
            } else {
                if (!('content-type' in responder.headers)) {
                    responder.headers['content-type'] = 'application/json'
                }
                var type = typer.parse(responder.headers['content-type'])
                if (
                    !Buffer.isBuffer(responder.body) &&
                    type.type + '/' + type.subtype == 'application/json'
                ) {
                    // `Buffer.from` replaces the deprecated `new Buffer(string)`
                    // constructor; both encode the string as UTF-8.
                    responder.body = Buffer.from(JSON.stringify(responder.body) + '\n')
                }
                f = function (response) {
                    response.end(responder.body)
                }
            }
            work.response.writeHead(responder.statusCode, responder.description, responder.headers)
            entry.when.headers = Date.now()
            entry.statusCode = responder.statusCode
            entry.description = responder.description
            entry.headers = responder.headers
            // Presumably completes this step when the response emits `finish`.
            var finish = delta(async()).ee(work.response).on('finish')
            async([function () {
                async(function () {
                    // A two-argument body function takes `(response, callback)`
                    // and is waited on; otherwise its return value is used.
                    if (f.length == 2) {
                        f.call(work.operation.object, work.response, async())
                    } else {
                        return [ f.call(work.operation.object, work.response) ]
                    }
                }, function () {
                    return []
                })
            }, function (error) {
                // Writing the body failed: record the error, close the
                // response, and stop waiting for `finish`.
                entry.error = error
                work.response.end()
                finish.cancel()
            }])
        })
    }, function () {
        // NOTE(review): `handler` initializes `entry.health.finish`, but the
        // final snapshot is stored under `done`, so `finish` stays null.
        // Confirm which key log consumers expect before renaming.
        entry.health.done = JSON.parse(JSON.stringify(this.turnstile.health))
        entry.when.finish = Date.now()
        // Every duration is measured from the moment the request was queued.
        entry.duration = {
            start: entry.when.start - entry.when.push,
            headers: entry.when.headers - entry.when.push,
            finish: entry.when.finish - entry.when.push
        }
        this._logger.call(null, entry)
        this._completed.call(null, entry)
    })
})
// Convenience wrapper around `Reactor.send` for callers that hold the status
// code and headers separately; no status message is supplied.
Reactor.resend = function (statusCode, headers, body) {
    var properties = { statusCode: statusCode, headers: headers }
    return Reactor.send(properties, body)
}
// Build a response array `[ statusCode, statusMessage, headers, buffer ]` from
// the properties of another response, typically one being proxied.
//
// properties - Object with `statusCode`, optional `statusMessage`, and optional
// `headers`.
// buffer - Body to send.
//
// The headers are deep-copied so the caller's object is never mutated, and the
// `content-length` and `transfer-encoding` headers are dropped from the copy.
// Missing or null headers are treated as an empty set; previously they caused
// a SyntaxError or TypeError from the JSON round trip.
Reactor.send = function (properties, buffer) {
    var headers = JSON.parse(JSON.stringify(coalesce(properties.headers, {})))
    delete headers['content-length']
    delete headers['transfer-encoding']
    return [ properties.statusCode, properties.statusMessage, headers, buffer ]
}
// Build a response array whose body pipes a readable stream to the client.
//
// The last argument is the stream; the arguments before it (normally just
// `properties`) are passed to `arrayed` together with the piping body function
// to produce the response description.
//
// Because the body length is not known up front, `content-length` is removed
// and `transfer-encoding` is forced to `chunked`. The original statement was
// missing the `delete` keyword, so a stale `content-length` was sent alongside
// the chunked encoding.
Reactor.stream = function (properties, stream) {
    var vargs = Array.prototype.slice.call(arguments)
    var source = vargs.pop()
    vargs.push(function (response) { source.pipe(response) })
    var response = arrayed(vargs)
    delete response.headers['content-length']
    response.headers['transfer-encoding'] = 'chunked'
    return [ response.statusCode, response.description, response.headers, response.body ]
}
// Publish `Reactor`, along with its static `resend`, `send` and `stream`
// helpers, as the module's export.
module.exports = Reactor