Make realtime database concurrency-safe using mutex-style locking

working-state
Garrett Mills 3 years ago
parent 5f3a1940e5
commit 69c441ba56
Signed by: garrettmills
GPG Key ID: D2BF5FBA8298F246

@ -65,24 +65,34 @@ export class FirebaseResource<T extends FirebaseResourceItem> extends Iterable<T
})
}
findNextId(collection: FirebaseResourceItem[]) {
if ( !collection.length ) return 0
return collect<FirebaseResourceItem>(collection).max<number>('seqID') + 1
}
/**
* Push a new item into the collection.
* @param item
*/
async push(item: T): Promise<T> {
item.seqID = await this.getNextID()
// @ts-ignore
delete item.firebaseID
await this.ref().push(item)
await this.ref().transaction((collection) => {
if ( !collection ) collection = []
item.seqID = this.findNextId(collection)
// Look up the firebaseID
await new Promise<void>((res, rej) => {
this.ref().orderByChild('seqID')
.limitToLast(1)
// @ts-ignore
delete item.firebaseID
collection.push(item)
return collection
})
await new Promise<void>(res => {
this.ref()
.orderByChild('seqID')
.startAt(item.seqID)
.limitToFirst(1)
.on('value', snapshot => {
if ( snapshot.val() ) {
item.firebaseID = Object.keys(snapshot.val())[0]
}
console.log('got push ID back', snapshot.val(), snapshot.key)
res()
})
})

@ -31,9 +31,11 @@ export default {
// Mapping of ref-shortname to actual database reference
// If you add a value here, also add it to the RTDBRef type alias
refs: {
peers: 'chain/server/peers',
transaction: 'chain/pending/transactions',
block: 'chain/local/block',
locks: 'server/locks', // Mutex-style locks for database refs
peers: 'server/peers', // Collection of federated peers
transaction: 'chain/pending/transactions', // List of pending encounter transactions
exposure: 'chain/pending/exposures', // List of pending exposure notifications
block: 'chain/block', // The blockchain itself
},
},
}

@ -2,7 +2,7 @@ import { Singleton, Inject } from "@extollo/di"
import { Unit, Logging, Config } from "@extollo/lib"
import * as firebase from "firebase-admin"
export type RTDBRef = 'peers' | 'transaction' | 'block'
export type RTDBRef = 'peers' | 'transaction' | 'block' | 'exposure' | 'locks'
/**
* FirebaseUnit Unit
@ -19,16 +19,66 @@ export class FirebaseUnit extends Unit {
@Inject()
protected readonly config!: Config
/** Get the underlying Firebase library. */
get() {
return this._firebase
}
/** Get a realtime-database Reference using our internal aliases. */
ref(name: RTDBRef): firebase.database.Reference {
return this._firebase.database().ref(
String(this.config.get(`app.firebase.rtdb.refs.${name}`))
)
}
/** Get the realtime database object directly. */
db(): firebase.database.Database {
return this._firebase.database()
}
/**
* Try to lock the given database ref alias.
* Promise will sleep if lock is held, and will resolve once lock is acquired.
* @param name
*/
async trylock(name: RTDBRef): Promise<any> {
return this._firebase.database()
.ref(`${this.config.get('app.firebase.rtdb.refs.locks')}/${name}`)
.transaction(current => {
if ( !current || current.time < 1 ) {
return {
time: (new Date).getTime(),
}
}
}, undefined, false).then(async result => {
if ( result.committed ) {
this.logging.debug(`Lock acquired: ${name}`)
return Promise.resolve()
}
this.logging.debug(`Unable to acquire lock: ${name}. Trying again soon...`)
await this.sleep(500)
return this.trylock(name)
})
.catch(async reason => {
this.logging.debug(`Unable to acquire lock: ${name}. Trying again soon...`)
await this.sleep(500)
return this.trylock(name)
})
}
/**
* Release the lock on the given database ref.
* @param name
*/
async unlock(name: RTDBRef) {
await this._firebase.database()
.ref(`${this.config.get('app.firebase.rtdb.refs.locks')}/${name}`)
.set({time: 0}, err => {
if ( err ) this.logging.error(err)
})
}
/** Called on app start. */
public async up() {
this.logging.info('Initializing Firebase application credentials...')
@ -42,4 +92,11 @@ export class FirebaseUnit extends Unit {
public async down() {
}
/** Sleep for (roughly) the given number of milliseconds. */
async sleep(ms: number) {
await new Promise<void>(res => {
setTimeout(res, ms)
})
}
}

@ -77,6 +77,7 @@ export class Transaction extends Unit {
this.firebase.ref("transaction").on("child_added", async () => {
this.logging.debug('Received child_added event for transactions reference.')
if ( !this.claim() ) return
await this.firebase.trylock('block')
// array of pairs of transaction resource items
let groupedTransactions: [TransactionResourceItem, TransactionResourceItem][] = []
@ -121,6 +122,7 @@ export class Transaction extends Unit {
}
this.release()
await this.firebase.unlock('block')
})
}

Loading…
Cancel
Save