From 69c441ba56b3e69a1f77beb62106c173652cadd1 Mon Sep 17 00:00:00 2001 From: garrettmills Date: Sat, 10 Apr 2021 09:00:45 -0500 Subject: [PATCH] Make realtime database concurrency-safe using mutex-style locking --- src/app/FirebaseResource.ts | 32 +++++++++++------ src/app/configs/app.config.ts | 8 +++-- src/app/units/FirebaseUnit.ts | 59 ++++++++++++++++++++++++++++++- src/app/units/rtdb/Transaction.ts | 2 ++ 4 files changed, 86 insertions(+), 15 deletions(-) diff --git a/src/app/FirebaseResource.ts b/src/app/FirebaseResource.ts index bc25ac3..dc57a40 100644 --- a/src/app/FirebaseResource.ts +++ b/src/app/FirebaseResource.ts @@ -65,24 +65,34 @@ export class FirebaseResource extends Iterable(collection).max('seqID') + 1 + } + /** * Push a new item into the collection. * @param item */ async push(item: T): Promise { - item.seqID = await this.getNextID() - // @ts-ignore - delete item.firebaseID - await this.ref().push(item) + await this.ref().transaction((collection) => { + if ( !collection ) collection = [] + item.seqID = this.findNextId(collection) - // Look up the firebaseID - await new Promise((res, rej) => { - this.ref().orderByChild('seqID') - .limitToLast(1) + // @ts-ignore + delete item.firebaseID + collection.push(item) + + return collection + }) + + await new Promise(res => { + this.ref() + .orderByChild('seqID') + .startAt(item.seqID) + .limitToFirst(1) .on('value', snapshot => { - if ( snapshot.val() ) { - item.firebaseID = Object.keys(snapshot.val())[0] - } + console.log('got push ID back', snapshot.val(), snapshot.key) res() }) }) diff --git a/src/app/configs/app.config.ts b/src/app/configs/app.config.ts index 1254bf4..0f61a08 100644 --- a/src/app/configs/app.config.ts +++ b/src/app/configs/app.config.ts @@ -31,9 +31,11 @@ export default { // Mapping of ref-shortname to actual database reference // If you add a value here, also add it to the RTDBRef type alias refs: { - peers: 'chain/server/peers', - transaction: 'chain/pending/transactions', - block: 'chain/local/block', + locks: 'server/locks', // Mutex-style locks for database refs + peers: 'server/peers', // Collection of federated peers + transaction: 'chain/pending/transactions', // List of pending encounter transactions + exposure: 'chain/pending/exposures', // List of pending exposure notifications + block: 'chain/block', // The blockchain itself }, }, } diff --git a/src/app/units/FirebaseUnit.ts b/src/app/units/FirebaseUnit.ts index 55dd825..3da2cae 100644 --- a/src/app/units/FirebaseUnit.ts +++ b/src/app/units/FirebaseUnit.ts @@ -2,7 +2,7 @@ import { Singleton, Inject } from "@extollo/di" import { Unit, Logging, Config } from "@extollo/lib" import * as firebase from "firebase-admin" -export type RTDBRef = 'peers' | 'transaction' | 'block' +export type RTDBRef = 'peers' | 'transaction' | 'block' | 'exposure' | 'locks' /** * FirebaseUnit Unit @@ -19,16 +19,66 @@ export class FirebaseUnit extends Unit { @Inject() protected readonly config!: Config + /** Get the underlying Firebase library. */ get() { return this._firebase } + /** Get a realtime-database Reference using our internal aliases. */ ref(name: RTDBRef): firebase.database.Reference { return this._firebase.database().ref( String(this.config.get(`app.firebase.rtdb.refs.${name}`)) ) } + /** Get the realtime database object directly. */ + db(): firebase.database.Database { + return this._firebase.database() + } + + /** + * Try to lock the given database ref alias. + * Promise will sleep if lock is held, and will resolve once lock is acquired. + * @param name + */ + async trylock(name: RTDBRef): Promise { + return this._firebase.database() + .ref(`${this.config.get('app.firebase.rtdb.refs.locks')}/${name}`) + .transaction(current => { + if ( !current || current.time < 1 ) { + return { + time: (new Date).getTime(), + } + } + }, undefined, false).then(async result => { + if ( result.committed ) { + this.logging.debug(`Lock acquired: ${name}`) + return Promise.resolve() + } + + this.logging.debug(`Unable to acquire lock: ${name}. Trying again soon...`) + await this.sleep(500) + return this.trylock(name) + }) + .catch(async reason => { + this.logging.debug(`Unable to acquire lock: ${name}. Trying again soon...`) + await this.sleep(500) + return this.trylock(name) + }) + } + + /** + * Release the lock on the given database ref. + * @param name + */ + async unlock(name: RTDBRef) { + await this._firebase.database() + .ref(`${this.config.get('app.firebase.rtdb.refs.locks')}/${name}`) + .set({time: 0}, err => { + if ( err ) this.logging.error(err) + }) + } + /** Called on app start. */ public async up() { this.logging.info('Initializing Firebase application credentials...') @@ -42,4 +92,11 @@ export class FirebaseUnit extends Unit { public async down() { } + + /** Sleep for (roughly) the given number of milliseconds. */ + async sleep(ms: number) { + await new Promise(res => { + setTimeout(res, ms) + }) + } } diff --git a/src/app/units/rtdb/Transaction.ts b/src/app/units/rtdb/Transaction.ts index a3ae171..1407ac2 100644 --- a/src/app/units/rtdb/Transaction.ts +++ b/src/app/units/rtdb/Transaction.ts @@ -77,6 +77,7 @@ export class Transaction extends Unit { this.firebase.ref("transaction").on("child_added", async () => { this.logging.debug('Received child_added event for transactions reference.') if ( !this.claim() ) return + await this.firebase.trylock('block') // array of pairs of transaction resource items let groupedTransactions: [TransactionResourceItem, TransactionResourceItem][] = [] @@ -121,6 +122,7 @@ export class Transaction extends Unit { } this.release() + await this.firebase.unlock('block') }) }