open.ephemeral('shutdown', async () => {
await memento._destructible.mutators.drain()
await memento._locker.drain()
await memento._locker.rotate()
for (const store in memento._stores) {
memento._stores[store].destructible.decrement()
for (const index in memento._stores[store].indices) {
memento._stores[store].indices[index].destructible.decrement()
}
}
})
})
const list = async () => {
try {
return await fs.readdir(memento.directory)
} catch (error) {
rescue(error, [{ code: 'ENOENT' }])
await fs.mdkir(memento.directory, { recursive: true })
return await list()
}
}
memento._commits = new Strata(memento._destructible.amalgamators.durable('commits'), {
directory: path.resolve(memento.directory, 'commits'),
cache: memento._cache,
comparator: (left, right) => left - right,
serializer: 'json'
})
const subdirs = [ 'versions', 'stores', 'indices', 'commits' ].sort()
const dirs = await list()
if (dirs.length == 0) {
for (const dir of subdirs) {
await fs.mkdir(path.resolve(memento.directory, dir))
}
await fs.mkdir(path.resolve(memento.directory, './versions/0'))
await memento._commits.create()
} else {
await memento._commits.open()
const versions = new Set
const iterator = riffle.forward(memento._commits, Strata.MIN)
const trampoline = new Trampoline
while (! iterator.done) {
iterator.next(trampoline, items => {
for (const item of items) {
versions.add(item.parts[0])
}
})
while (trampoline.seek()) {
await trampoline.shift()
}
}
for (const store of (await fs.readdir(path.join(directory, 'stores')))) {
await memento._store(versions, store, path.join(directory, 'stores', store))
}
for (const store of (await fs.readdir(path.join(directory, 'indices')))) {
for (const index of (await fs.readdir(path.join(directory, 'indices', store)))) {
await memento._index(versions, [ store, index ], path.join(directory, 'indices', store, index))
}
}
}
const versions = await fs.readdir(path.resolve(memento.directory, 'versions'))
const latest = versions.sort((left, right) => +left - +right).pop()
if (latest < version) {
}
if (latest < version && upgrade != null) {
const journalist = await Journalist.create(memento.directory)
const schema = new Schema(journalist, memento, version)
try {
await upgrade(schema)
await schema.commit()
await journalist.mkdir(path.join('versions', String(version)))
await journalist.write()
await Journalist.prepare(journalist)
await Journalist.commit(journalist)
await journalist.dispose()
await memento._destructible.open.destroy().rejected
return await Memento.open({
destructible,
directory,
version,
comparators
}, upgrade)
} catch (error) {
await schema._rollback()
if (error === ROLLBACK) {
throw new Memento.Error('rollback')
}
throw error
}
}
return memento
}
get _version () {
throw new Error
}
async _open (upgrade, version) {
}
async _store (versions, name, directory, create = false) {
const comparisons = JSON.parse(await fs.readFile(path.join(directory, 'key.json'), 'utf8'))
const extractors = comparisons.map(part => {
return function (object) {
const parts = part.parts.slice()
while (object != null && parts.length != 0) {
object = object[parts.shift()]
}
return object
}
})
const extractor = function (parts) {
return extractors.map(extractor => extractor(parts[0]))
}
const comparator = ascension(comparisons.map(part => {
return [
typeof part.type == 'string'
? this._comparators[part.type]
: ASCENSION_TYPE[part.type],
part.direction
]
}), function (object) {
return object
})
const destructible = this._destructible.amalgamators.ephemeral([ 'store', name ])
destructible.increment()
const amalgamator = new Amalgamator(destructible, {
locker: this._locker,
directory: path.join(directory, 'store'),
cache: this._cache,
key: {
extract: extractor,
compare: comparator,
serialize: function (key) {
return [ Buffer.from(JSON.stringify(key)) ]
},
deserialize: function (parts) {
return JSON.parse(parts[0].toString())
}
},
parts: {
serialize: function (parts) {
return [ Buffer.from(JSON.stringify(parts)) ]
},
deserialize: function (parts) {
return JSON.parse(parts[0].toString())
}
},
transformer: function (operation) {
if (operation.parts[0].method == 'insert') {
return {
method: 'insert',
key: operation.key[0],
parts: [ operation.parts[1] ]
}
}
return {
method: 'remove',
key: operation.key[0]
}
},
createIfMissing: create,
errorIfExists: create
})
await amalgamator.ready
await amalgamator.recover(versions)
this._stores[name] = { destructible, amalgamator, indices: {}, comparisons }
}
async _index (versions, [ storeName, name ], directory, create = false) {
const key = JSON.parse(await fs.readFile(path.join(directory, 'key.json'), 'utf8'))
const store = this._stores[storeName]
const comparisons = key.comparisons.concat(store.comparisons)
const extractors = comparisons.map(part => {
return function (object) {
const parts = part.parts.slice()
while (object != null && parts.length != 0) {
object = object[parts.shift()]
}
return object
}
})
const extractor = function (parts) {
return extractors.map(extractor => extractor(parts[0]))
}
const comparator = ascension(comparisons.map(part => {
return [
typeof part.type == 'string'
? this._comparators[part.type]
: ASCENSION_TYPE[part.type],
part.direction
]
}), function (object) {
return object
})
const destructible = this._destructible.amalgamators.ephemeral([ 'store', name ])
destructible.increment()
const amalgamator = new Amalgamator(destructible, {
locker: this._locker,
directory: path.join(directory, 'store'),
cache: this._cache,
key: {
compare: comparator,
serialize: function (key) {
return [ Buffer.from(JSON.stringify(key)) ]
},
deserialize: function (parts) {
return JSON.parse(parts[0].toString())
}
},
parts: {
serialize: function (parts) {
return [ Buffer.from(JSON.stringify(parts)) ]
},
deserialize: function (parts) {
return JSON.parse(parts[0].toString())
}
},
transformer: function (operation) {
if (operation.parts[0].method == 'insert') {
return {
method: 'insert',
key: operation.key[0],
parts: [ operation.parts[1] ]
}
}
return {
method: 'remove',
key: operation.key[0]
}
},
createIfMissing: create,
errorIfExists: create
})
await amalgamator.ready
await amalgamator.recover(versions)
store.indices[name] = {
destructible, amalgamator, extractor, keyLength: key.comparisons.length
}
}
async mutator (block) {
const mutator = new Mutator(this)
do {
try {
await block(mutator)
} catch (error) {
await mutator._rollback()
if (error === ROLLBACK) {
return
}
throw error
}
} while (! await mutator.commit())
}
async close () {
await this.destructible.destroy().rejected
}
}
module.exports = Memento