• kibitzer.js

  • ¶

    TODO I suppose my attitude is to use an event emitter very sparingly, because I want to use one to emit Kibitzer events, the paxos log, but I don’t want to create an interface that looks like one of the EventEmitter heavy interfaces.

    One of the challenges here is that events are going to flow immediately, unless I do something compliacated like only emit events when there is a listener. Having looked at that, it is not actually that complicated.

    Now we can fuss about some terminate logic.

    Put it out again, I prefer to program with error-first callbacks, but events happen. They are generally a way in which information enters the system. They are synchronous. Here is more information. Generally, they shouldn’t block.

    That’s how I use EventEmitters, but I’m not in for a penny, in for a pound. If there is a source of events, I treat that as a stream of events. Not in the Node.js streams sense, but I create an event emitter that emits a homogenous series of events terminated by a specific termination event.

    Then I stop. I don’t go on to implement multi-interfaces that could take an error-first callback, or maybe register an event handler, or maybe use some other form of asynchronous notification.

    TODO Which is why I’ve made the event emitter in this class a separate object. Which is such a good idea, I belive I’ll go and do it Happenstance.

    TODO Here I am, back to remove the EventEmitter. Not sure how anyone is able to program that way, treating everything in your application as a stream. Pushing log entires as events, hard to reason about this firehose events, and it makes any asynchronous calls in response to an event require subsequent queuing of events, so why not use this queue that already exists?

  • ¶

    Quality control.

    var assert = require('assert')
    
    
  • ¶

    EventEmitter API.

    var events = require('events')
    var util = require('util')
    var departure = require('departure')
    
    
  • ¶

    Common utiltieis.

    var util = require('util')
    var nop = require('nop')
    
    
  • ¶

    Control-flow libraries.

    var abend = require('abend')
    var cadence = require('cadence')
    var Timer = require('happenstance').Timer
    var Procession = require('procession')
    
    
  • ¶

    Paxos libraries.

    var Paxos = require('paxos')
    var Islander = require('islander')
    var Monotonic = require('monotonic').asString
    
    
  • ¶

    Construction notification and destruction.

    var Destructible = require('destructible')
    var Signal = require('signal')
    
    
  • ¶

    Logging.

    var logger = require('prolific.logger').createLogger('kibitz')
    
    
  • ¶

    Message queue.

    var Caller = require('conduit/caller')
    
    
  • ¶

    Catch exceptions based on a regex match of an error message or property.

    var rescue = require('rescue')
    
    
  • ¶

    The Kibitzer object contains an islander, which will submit messages to Paxos, track the log generated by Paxos and resubit any messages that might have been dropped.

  • ¶
    function Kibitzer (options) {
    
  • ¶

    Log used to drain Islander.

        this.log = new Procession
    
    
  • ¶

    These defaults are a bit harsh if you’re going to log everything.

        options.ping || (options.ping = 250)
        options.timeout || (options.timeout = 1000)
    
    
  • ¶

    Time obtained from optional Date for unit testing.

        this._Date = options.Date || Date
    
        this.paxos = new Paxos(this._Date.now(), null, options.id, {
            ping: options.ping,
            timeout: options.timeout
        })
    
    
  • ¶

    Submission queue with resubmission logic.

        this.islander = new Islander(options.id)
    
    
  • ¶

    Copy messages from the Paxos log to our log.

        this.paxos.log.shifter().pump(this.log, 'enqueue')
    
    
  • ¶

    Paxos also sends messages to Islander for accounting.

        this.paxos.log.shifter().pump(this.islander, 'enqueue')
    
        this._shifters = null
    
    
  • ¶

    Caller to make network requests.

        this._caller = new Caller
    
        this.read = this._caller.read
        this.write = this._caller.write
    
        this.played = new Procession
    
    
        this._destructible = new Destructible(1000, 'kibitzer')
        this._destructible.markDestroyed(this, 'destroyed')
    
        this._shifters = {
            paxos: this.paxos.outbox.shifter(),
            islander: this.islander.outbox.shifter()
        }
    
        this._destructible.addDestructor('publish', this._shifters.islander, 'destroy')
        this._destructible.addDestructor('send', this._shifters.paxos, 'destroy')
    
        this._destructible.addDestructor('scheduler', this.paxos.scheduler, 'clear')
        this._destructible.addDestructor('eos', this.write, 'push')
    
        this.destruction = this._destructible.events
    
        this.ready = new Signal
    }
    
    Kibitzer.prototype.listen = cadence(function (async) {
    
  • ¶

    TODO Pass an “operation” to Procession.pump.

        var timer = new Timer(this.paxos.scheduler)
        timer.events.shifter().pump(function (envelope) {
            logger.info('timer', envelope)
            this.play('event', envelope)
        }.bind(this))
        this.paxos.scheduler.events.shifter().pump(timer, 'enqueue')
        this._publish(this._destructible.monitor('publish'))
        this._send(this._destructible.monitor('send'))
        this.ready.unlatch()
        this._destructible.completed.wait(async())
    })
    
    
  • ¶

    You can just as easily use POSIX time for the republic.

    Kibitzer.prototype.bootstrap = function (republic, properties) {
        assert(republic != null)
        this.play('bootstrap', { republic: republic, properties: properties })
    }
    
    
  • ¶

    Enqueue a user message into the Islander. The Islander will submit the message, monitor the atomic log, and then resubmit the message if it detects that the message was lost.

    Kibitzer.prototype.publish = function (entry) {
        this.play('publish', entry)
    }
    
    
  • ¶

    Called by your network implementation with messages enqueued from another Kibitz.

    Kibitzer.prototype.request = cadence(function (async, envelope) {
        switch (envelope.method) {
        case 'immigrate':
            this._immigrate(envelope.body, async())
            break
        case 'receive':
            return [ this.play('receive', envelope.body) ]
        case 'enqueue':
            return [ this.play('enqueue', envelope.body) ]
        }
    })
    
    Kibitzer.prototype.play = function (method, body) {
        var envelope = {
            module: 'kibitz',
            method: method,
            when: this._Date.now(),
            body: body
        }
        return this.replay(envelope)
    }
    
    
    Kibitzer.prototype.replay = function (envelope) {
        this.played.push(envelope)
        switch (envelope.method) {
        case 'bootstrap':
            this.paxos.republic = envelope.body.republic
            this.paxos.bootstrap(envelope.when, envelope.body.properties)
            break
        case 'join':
            this.paxos.cookie = envelope.when
            this.paxos.republic = envelope.body.republic
            break
        case 'naturalize':
            this.paxos.naturalize()
            break
        case 'event':
            this.paxos.event(envelope.body)
            break
        case 'immigrate':
            var body = envelope.body
            return this.paxos.immigrate(envelope.when, body.republic, body.id, body.cookie, body.properties)
        case 'receive':
    
  • ¶

    TODO Split pulse from messages somehow, make them siblings, not nested.

            return this.paxos.request(envelope.when, envelope.body)
        case 'enqueue':
            return this._enqueue(envelope.when, envelope.body)
        case 'publish':
            this.islander.publish(envelope.body)
            break
        case 'published':
            this.islander.sent(envelope.body.cookie, envelope.body.promises)
            break
        case 'sent':
            this.paxos.response(envelope.when, envelope.body.cookie, envelope.body.responses)
            break
        }
    }
    
    
  • ¶

    Stop timers, and stop timers only. We’re not in a position to notify clients that there will be no more messages.

    Kibitzer.prototype.destroy = function () {
        this._destructible.destroy()
    }
    
    
  • ¶

    TODO You are assuming that an address is not an address but a set of properties, so you need to provide those properties for leader as an argument, not just a url or identifier.

  • ¶
    Kibitzer.prototype.join = cadence(function (async, republic, leader, properties) {
    
  • ¶

    TODO Should this be or should this not be? It should be. You’re sending your enqueue messages until you immigrate. You don’t know when that will be. You’re only going to know if you’ve succeeded if your legislator has immigrated. That’s the only way.

  • ¶

    TODO Was a test, but it is now an assertion and it really ought be an exception because it is not impossible.

        if (this.paxos.government.promise != '0/0') {
            return
        }
    
        assert(republic != null)
    
    
  • ¶

    throw new Error

        async(function () {
            this.play('join', { republic: republic })
    
  • ¶

    Note that we’re passing properties so that they’re logged for inspection during debugging replay, but they’re not going to be used as an argument to Paxos on this side. We give them to our leader when we request immigration.

            this._caller.invoke({
                module: 'kibitz',
                method: 'immigrate',
                to: leader,
                body: {
                    republic: this.paxos.republic,
                    id: this.paxos.id,
                    cookie: this.paxos.cookie,
                    properties: properties,
                    hops: 0
                }
            }, async())
        }, function (response) {
            return response != null && response.enqueued
        })
    })
    
    Kibitzer.prototype.naturalize = function () {
        this.play('naturalize', {})
    }
    
    
  • ¶

    Publish to consensus algorithm from islander retryable client.

    Kibitzer.prototype._publish = cadence(function (async) {
        var loop = async(function () {
            this._shifters.islander.dequeue(async())
        }, function (envelope) {
            if (envelope == null) {
                return [ loop.break ]
            }
            async([function () {
                var properties = this.paxos.government.properties[this.paxos.government.majority[0]]
                this._caller.invoke({
                    module: 'kibitz',
                    method: 'enqueue',
                    to: properties,
                    body: {
                        republic: this.paxos.republic,
                        entries: envelope.messages
                    }
                }, async())
            }, rescue(/^conduit#endOfStream$/m, null)], function (promises) {
                this.play('published', { cookie: envelope.cookie, promises: promises })
            })
        })()
    })
    
    
  • ¶

    TODO Annoying how difficult it is to stop this crazy thing. There are going to be race conditions where we have a termination, come in, we shut things down, but then we continue with processing a pulse which triggers a timer. Sending messages to paxos can restart it’s scheduler.

    TODO We could kill the timer in the scheduler, set the boolean we added to tell it to no longer schedule.

    Kibitzer.prototype._send = cadence(function (async) {
        var loop = async(function () {
            this._shifters.paxos.dequeue(async())
        }, function (communique) {
            if (communique == null) {
                return [ loop.break ]
            }
            var responses = {}
            async(function () {
                communique.envelopes.forEach(function (envelope) {
                    async([function () {
                        this._caller.invoke({
                            module: 'kibitz',
                            method: 'receive',
                            to: envelope.properties,
                            body: envelope.request
                        }, async())
                    }, rescue(/^conduit#endOfStream$/m, null)], function (response) {
                        communique.responses[envelope.to] = response
                    })
                }, this)
            }, function () {
                this.play('sent', { cookie: communique.cookie, responses: communique.responses })
            })
        })()
    })
    
    
  • ¶

    TODO Hopping is a second way of doing a thing and we don’t need a second way of doing a thing.

    Kibitzer.prototype._immigrate = cadence(function (async, post) {
        async(function () {
            assert(post.hops != null)
            var outcome = this.play('immigrate', post)
            return outcome
        }, function (outcome) {
            if (!outcome.enqueued && outcome.leader != null && post.hops == 0) {
                var properties = this.paxos.government.properties[outcome.leader]
                post.hops++
                this._caller.invoke({
                    module: 'kibtiz',
                    method: 'immigrate',
                    to: properties,
                    body: post
                }, async())
            } else {
                return [ outcome ]
            }
        })
    })
    
    Kibitzer.prototype._enqueue = function (when, post) {
        var promises = {}
        for (var i = 0, I = post.entries.length; i < I; i++) {
            var entry = post.entries[i]
            var outcome = this.paxos.enqueue(when, post.republic, entry)
            if (!outcome.enqueued) {
                promises = null
                break
            }
            promises[entry.cookie] = outcome.promise
        }
        return promises
    }
    
    module.exports = Kibitzer