diff --git a/src/app/FirebaseResource.ts b/src/app/FirebaseResource.ts index e3547a1..66b62dc 100644 --- a/src/app/FirebaseResource.ts +++ b/src/app/FirebaseResource.ts @@ -8,8 +8,7 @@ import {Application} from "@extollo/lib" * Base interface for an item in a Firebase RTDB collection. */ export interface FirebaseResourceItem { - firebaseID: string; - seqID: number; + firebaseID?: string; } /** @@ -29,20 +28,10 @@ export class FirebaseResource extends Iterable(FirebaseUnit).ref(this.refName) } - /** Get the next sequential ID. */ - async getNextID(): Promise { - return new Promise((res, rej) => { - this.ref().orderByChild('seqID') - .once('value', snapshot => { - res((this.resolveObject(snapshot.val()).reverse()?.[0]?.seqID ?? -1) + 1) - }, rej) - }) - } - /** Get the record at the ith index. */ async at(i: number): Promise { return new Promise((res, rej) => { - this.ref().orderByChild('seqID') + this.ref().orderByKey() .startAt(i).endAt(i) .once('value', snapshot => res(this.resolveObject(snapshot.val())[0]), rej) }) @@ -51,7 +40,7 @@ export class FirebaseResource extends Iterable> { return new Promise>((res, rej) => { - this.ref().orderByChild('seqID') + this.ref().orderByKey() .startAt(start).endAt(end) .once('value', snapshot => { res(new Collection(this.resolveObject(snapshot.val()))) @@ -61,34 +50,23 @@ export class FirebaseResource extends Iterable { + console.log('[COUNT CALLED ON FIREBASE RESOURCE]') return new Promise((res, rej) => { - this.ref().orderByChild('seqID') + this.ref().orderByKey() .once('value', snapshot => { res(this.resolveObject(snapshot.val()).length) }, rej) }) } - findNextId(collection: FirebaseResourceItem[]) { - if ( !collection.length ) return 0 - - let maxSeq = -1 - for ( const item of collection ) { - if ( !item ) continue - if ( !isNaN(item.seqID) && item.seqID > maxSeq ) { - maxSeq = item.seqID - } - } - - return maxSeq + 1 - } - /** * Push a new item into the collection. * @param item */ async push(item: T): Promise { - await this.ref().transaction((collection) => { + await this.ref().push(item) + return item + /*await this.ref().transaction((collection) => { if ( !collection ) collection = [] if ( typeof collection === 'object' ) collection = Object.values(collection) item.seqID = this.findNextId(collection) @@ -97,6 +75,7 @@ export class FirebaseResource extends Iterable extends Iterable { + return many((await this.blockchain.getSubmitChain()).map(x => { // @ts-ignore delete x.firebaseID return x @@ -62,7 +61,6 @@ export class Blockchain extends Controller { public async postTransaction() { const item: TransactionResourceItem = { firebaseID: '', - seqID: -1, combinedHash: String(this.request.input('combinedHash')), timestamp: parseInt(String(this.request.input('timestamp'))), encodedGPSLocation: String(this.request.input('encodedGPSLocation')), @@ -80,7 +78,6 @@ export class Blockchain extends Controller { public async postExposure() { const item: ExposureResourceItem = { firebaseID: '', - seqID: -1, clientID: String(this.request.input('clientID')), timestamp: parseInt(String(this.request.input('timestamp'))), } @@ -94,12 +91,13 @@ export class Blockchain extends Controller { if (!minTime) { minTime = (new Date).getTime() - this.config.get('app.defaultTime') } + const snapshot = await ( this.make(BlockResource)).ref() .orderByChild('timestamp') .startAt(minTime) .once('value') - - let blocks = (Object.values(snapshot.val()) as BlockResourceItem[]).filter((item: BlockResourceItem) => item.seqID !== 0) + + let blocks = (Object.values(snapshot.val()) as BlockResourceItem[]) return many(blocks) } diff --git a/src/app/units/Blockchain.ts b/src/app/units/Blockchain.ts index f84cf0e..c15e1a7 100644 --- a/src/app/units/Blockchain.ts +++ b/src/app/units/Blockchain.ts @@ -1,27 +1,32 @@ -import { Singleton, Inject } from "@extollo/di" -import { Unit, Logging, Config, Application } from "@extollo/lib" -import { FirebaseUnit } from "./FirebaseUnit" +import {Inject, Singleton} from "@extollo/di" +import {Application, Config, Logging, Unit} from "@extollo/lib" +import {FirebaseUnit} from "./FirebaseUnit" import { BlockEncounterTransaction, - BlockResource, BlockResourceItem, BlockTransaction, isBlockResourceItem } from "../rtdb/BlockResource" -import { TransactionResourceItem } from "../rtdb/TransactionResource" +import {TransactionResourceItem} from "../rtdb/TransactionResource" import * as openpgp from "openpgp" import * as crypto from "crypto" import axios from "axios" -import { collect, uuid_v4 } from "@extollo/util" +import {collect, uuid_v4} from "@extollo/util" import {ExposureResourceItem} from "../rtdb/ExposureResource" -import {PeerResource} from "../rtdb/PeerResource" + +async function pgpVerify(armoredKey: string, armoredMessage: string) { + const [publicKeys, message] = await Promise.all([ + openpgp.readKey({ armoredKey }), + openpgp.readMessage({ armoredMessage }) + ]) + + return !!(await (await openpgp.verify({ publicKeys, message })).signatures?.[0]?.verified) +} /** * Utility wrapper class for a block in the chain. */ export class Block implements BlockResourceItem { - firebaseID: string - seqID: number uuid: string transactions: BlockTransaction[] timestamp: number @@ -36,14 +41,12 @@ export class Block implements BlockResourceItem { } constructor(rec: BlockResourceItem) { - this.firebaseID = rec.firebaseID - this.seqID = rec.seqID - this.uuid = rec.uuid - this.transactions = rec.transactions - this.lastBlockHash = rec.lastBlockHash - this.lastBlockUUID = rec.lastBlockUUID + this.uuid = rec.uuid || uuid_v4() + this.transactions = rec.transactions || [] + this.lastBlockHash = rec.lastBlockHash || '' + this.lastBlockUUID = rec.lastBlockUUID || '' this.proof = rec.proof - this.timestamp = rec.timestamp + this.timestamp = rec.timestamp || (new Date).getTime() this.waitTime = rec.waitTime this.peer = rec.peer } @@ -57,16 +60,7 @@ export class Block implements BlockResourceItem { const proof = this.proof const publicKey = this.config.get("app.gpg.key.public") - const result = await openpgp.verify({ - publicKeys: await openpgp.readKey({ - armoredKey: publicKey, - }), - message: await openpgp.readMessage({ - armoredMessage: proof, - }), - }) - - return !!(await result.signatures?.[0]?.verified) + return pgpVerify(publicKey, proof) } /** Generate the hash for this block. */ @@ -79,8 +73,6 @@ export class Block implements BlockResourceItem { /** Cast the Block's data to a plain object. */ toItem(): BlockResourceItem { return { - seqID: this.seqID, - firebaseID: this.firebaseID, uuid: this.uuid, transactions: this.transactions, lastBlockHash: this.lastBlockHash, @@ -119,9 +111,9 @@ export interface Peer { */ @Singleton() export class Blockchain extends Unit { - private readonly MIN_WAIT_TIME = 10000 - private readonly MAX_WAIT_TIME = 20000 - private readonly PENALTY_INTERVAL = 1000 + private readonly MIN_WAIT_TIME = 1000 + private readonly MAX_WAIT_TIME = 3000 + private readonly PENALTY_INTERVAL = 500 private readonly MAX_PEERS_PENALTY = 10 @Inject() @@ -133,6 +125,12 @@ export class Blockchain extends Unit { @Inject() protected readonly config!: Config + protected approvedChain: Block[] = [] + + protected peers: Peer[] = [] + + protected breakForExit = false + /** * Block transactions that will be attempted as part of this host's * next block submission. @@ -140,47 +138,87 @@ export class Blockchain extends Unit { */ protected pendingTransactions: BlockTransaction[] = [] - protected pendingSubmit?: Block + protected publicKey!: openpgp.Key + protected privateKey!: openpgp.Key + protected genesisProof!: string - protected isSubmitting: boolean = false + protected nextWaitTime!: number + protected lastBlock!: Block + protected nextProof!: string async up() { + this.logging.info('Generating OpenPGP assets...') + this.publicKey = await openpgp.readKey({ + armoredKey: this.config.get("app.gpg.key.public") + }) + + this.privateKey = await openpgp.readKey({ + armoredKey: this.config.get("app.gpg.key.private") + }) + + this.genesisProof = await openpgp.sign({ + message: openpgp.Message.fromText('0000'), + privateKeys: this.privateKey, + }) + + this.logging.info('Performing initial load...') + await this.initialLoad() + + this.logging.info('Contacting configured peers...') const peers = this.config.get('server.peers') - for ( const peer of peers ) { - await this.registerPeer({ - host: peer, - }) - } + await Promise.all(peers.map((host: string) => this.registerPeer({ host }))) + + this.logging.info('Performing initial writeback...') + await this.writeback() + } + + async down() { + this.breakForExit = true + } + + async initialLoad() { + const [peers, chain] = await Promise.all([ + this.firebase.ref('peers') + .once('value') + .then(val => val.val()), + + this.firebase.ref('block') + .orderByKey() + .once('value') + .then(val => val.val()) + ]) + + this.logging.debug({peers, chain}) + + this.approvedChain = chain.map((item: BlockResourceItem) => new Block(item)) + this.peers = peers || [] } /** * Returns true if the given host is registered as a peer. * @param host */ - public async hasPeer(host: string): Promise { - const peers = await this.getPeers() - return peers.some(peer => peer.host.toLowerCase() === host.toLowerCase()) + public hasPeer(host: string): boolean { + return this.peers.some(x => x.host === host) } /** * Get a list of all registered peers. */ - public async getPeers(): Promise { - return PeerResource.collect().all() + public getPeers(): Peer[] { + return this.peers } /** * From a peer, fetch the submission blockchain, if it is valid. * @param peer - * @param resultOfPeerRefresh */ - public async getPeerSubmit(peer: Peer, resultOfPeerRefresh: boolean): Promise { + public async getPeerSubmit(peer: Peer): Promise { try { - const result = await axios.get(`${peer.host}api/v1/chain/submit${resultOfPeerRefresh ? '?resultOfPeerRefresh=true' : ''}`) + const result = await axios.get(`${peer.host}api/v1/chain/submit`) const blocks: unknown = result.data?.data?.records if ( Array.isArray(blocks) && blocks.every(block => { - const match = isBlockResourceItem(block) - return match + return isBlockResourceItem(block) }) ) { return blocks.map(x => new Block(x)) } @@ -195,35 +233,19 @@ export class Blockchain extends Unit { * @param peer */ public async registerPeer(peer: Peer) { - if (!(await this.hasPeer(peer.host))) { + if ( !this.hasPeer(peer.host) ) { this.logging.info(`Registering peer: ${peer.host}`) - await ( this.make(PeerResource)).push({ - firebaseID: '', - seqID: -1, - name: peer.name, - host: peer.host, - }) - const header = this.config.get('app.api_server_header') try { - console.log('return peering', [`${peer.host}api/v1/peer`, { - host: this.getBaseURL(), - }, { - headers: { - [header]: await this.getPeerToken(), - } - }]) await axios.post(`${peer.host}api/v1/peer`, { host: this.getBaseURL(), }, { headers: { - [header]: await this.getPeerToken(), + [header]: this.getPeerToken(), 'content-type': 'application/json', } }) - - this.refresh(false) } catch (e) { this.logging.error(e) } @@ -239,21 +261,12 @@ export class Blockchain extends Unit { const blocks = collect(chain) return ( await blocks.promiseMap(async (block, idx) => { - if ( await block.isGenesis() ) { - return true - } - const previous: Block | undefined = blocks.at(idx - 1) if ( !previous ) { this.logging.debug(`Chain is invalid: block ${idx} is missing previous ${idx - 1}.`) return false; } - if ( !(await this.validateProofOfWork(block, previous)) ) { - this.logging.debug(`Chain is invalid: block ${idx} failed proof of work validation`) - return false; - } - const pass = ( block.lastBlockUUID === previous.uuid && block.lastBlockHash === previous.hash() @@ -267,39 +280,43 @@ export class Blockchain extends Unit { lastBlockHash: block.lastBlockHash, computedLastHash: previous.hash(), }) + + return false } - return pass + if ( await block.isGenesis() ) { + return true + } + + if ( !(await this.validateProofOfWork(block, previous)) ) { + this.logging.debug(`Chain is invalid: block ${idx} failed proof of work validation`) + return false; + } + + return false }) ).every(Boolean) } - public async refresh(resultOfPeerRefresh: boolean) { - if ( this.isSubmitting ) { - return - } else { - this.isSubmitting = true - } - - await this.firebase.trylock('block', 'Blockchain_refresh') - const validSeqID = (await this.read()).reverse()[0]?.seqID + public async refresh() { + if ( this.breakForExit ) return; + this.logging.debug('Called refresh().') const peers = await this.getPeers() const time_x_block: {[key: string]: Block} = {} const time_x_blocks: {[key: string]: Block[]} = {} const time_x_peer: {[key: string]: Peer | true} = {} - for ( const peer of peers ) { - console.log('[PEERS]', peers) - const blocks: Block[] | undefined = await this.getPeerSubmit(peer, resultOfPeerRefresh) - console.log('[PEER BLOCKS]', blocks) + await Promise.all(peers.map(async peer => { + const blocks: Block[] | undefined = await this.getPeerSubmit(peer) + if ( blocks && await this.validate(blocks) ) { - const block = blocks.reverse()[0] - if ( !block || block.seqID === validSeqID || !block.seqID ) continue + const block = blocks.slice(-1)[0] + if ( !block ) return // TODO fixme const penalty = blocks.slice(0, 10) - .map(block => block.peer === peer.host) - .filter(Boolean).length * this.PENALTY_INTERVAL + .map(block => block.peer === peer.host) + .filter(Boolean).length * this.PENALTY_INTERVAL * (Math.min(peers.length, this.MAX_PEERS_PENALTY)) block.waitTime += penalty @@ -308,206 +325,120 @@ export class Blockchain extends Unit { time_x_blocks[block.waitTime] = blocks.reverse() time_x_peer[block.waitTime] = peer } else { - console.log('VALIDATION FAIL') + console.log('validation fail!') } - } + })) - if ( this.pendingTransactions.length && !this.pendingSubmit ) { - await this.attemptSubmit() - } + console.log(time_x_blocks, time_x_peer, time_x_block) - if ( this.pendingSubmit ) { - time_x_block[this.pendingSubmit.waitTime] = this.pendingSubmit - time_x_peer[this.pendingSubmit.waitTime] = true + const submitBlock = this.getSubmitBlock() + if ( submitBlock ) { + time_x_block[submitBlock.waitTime] = submitBlock + time_x_peer[submitBlock.waitTime] = true } + console.log('submit block', submitBlock) + const min = Math.min(...Object.keys(time_x_block).map(parseFloat)) - const block = time_x_block[min] const peer = time_x_peer[min] + console.log('peer?', peer) + if ( peer === true ) { - this.pendingSubmit = undefined + // Our version of the chain was accepted + this.approvedChain.push(submitBlock!) this.pendingTransactions = [] - await (this.app().make(BlockResource)).push(block) - } else { - await this.firebase.ref('block').set((time_x_blocks[min] || []).map(x => { - const item = x.toItem() - // @ts-ignore - delete item.firebaseID - - if ( !item.transactions ) { - item.transactions = [] + } else if ( peer ) { + // A different server's chain was accepted + this.approvedChain = (time_x_blocks[min] || []).map(block => { + if (!block.transactions) { + block.transactions = [] } - return item - })) - /*await this.firebase.ref('block').transaction((_) => { - return (time_x_blocks[min] || []).map(x => { - const item = x.toItem() - // @ts-ignore - delete item.firebaseID - - if ( !item.transactions ) { - item.transactions = [] - } - - return item - }) - })*/ - - this.pendingSubmit = undefined - await this.attemptSubmit() + return block + }) } - await this.firebase.unlock('block') - this.isSubmitting = false + console.log('approved chain', this.approvedChain) + await this.writeback() } - public async getSubmitChain(resultOfPeerRefresh: boolean): Promise { - await this.firebase.trylock('block', 'Blockchain_getSubmitChain') - const blocks = await this.read() - const submit = await this.attemptSubmit() - if ( submit ) { - submit.seqID = blocks.length > 0 ? collect(blocks).max('seqID') + 1 : 0 - blocks.push(submit.toItem()) - } - - await this.firebase.unlock('block') - if ( !resultOfPeerRefresh ) { - this.refresh(true) - } - return blocks + public getSubmitChain(): BlockResourceItem[] { + const submit = this.getSubmitBlock() + if ( !submit ) return this.approvedChain + else return [...this.approvedChain, submit] } - public async attemptSubmit() { - if ( !this.pendingSubmit && this.pendingTransactions.length ) { - const lastBlock = await this.getLastBlock() - const waitTime = this.random(this.MIN_WAIT_TIME, this.MAX_WAIT_TIME) - const proof = await this.generateProofOfWork(lastBlock, waitTime) - - const block: BlockResourceItem = { - timestamp: (new Date).getTime(), - uuid: uuid_v4(), - transactions: this.pendingTransactions, - lastBlockHash: lastBlock!.hash(), - lastBlockUUID: lastBlock!.uuid, - proof, - waitTime, - peer: this.getBaseURL(), - - firebaseID: '', - seqID: -1, - } + public async writeback() { + if ( this.breakForExit ) return; + this.logging.info('Generating initial proof-of-elapsed-time. This will take a second...') + this.nextWaitTime = this.random(this.MIN_WAIT_TIME, this.MAX_WAIT_TIME) + this.lastBlock = this.getLastBlock() + this.nextProof = await this.generateProofOfWork(this.lastBlock, this.nextWaitTime) - this.pendingSubmit = new Block(block) - } + console.log('writeback approved chain', this.approvedChain) - return this.pendingSubmit - } + await Promise.all([ + this.firebase.ref('block').set(this.approvedChain.map(x => x.toItem())), + this.firebase.ref('peers').set(this.peers) + ]) - /** - * Submit a group of encounter transactions to be added to the chain. - * @param group - */ - public async submitTransactions(group: [TransactionResourceItem, TransactionResourceItem]) { - const txes = group.map(item => this.getEncounterTransaction(item)) + this.refresh() + } - if ( this.pendingSubmit ) { - this.pendingSubmit.transactions.push(...txes) + public getSubmitBlock(): Block | undefined { + if ( !this.pendingTransactions?.length ) { + return } - this.pendingTransactions.push(...txes) - this.refresh(false) - - /*const lastBlock = await this.getLastBlock() - - this.logging.verbose('Last block:') - this.logging.verbose(lastBlock) - - const block: BlockResourceItem = { + return new Block({ timestamp: (new Date).getTime(), uuid: uuid_v4(), - transactions: group.map(item => this.getEncounterTransaction(item)), - lastBlockHash: lastBlock!.hash(), - lastBlockUUID: lastBlock!.uuid, - proof: await this.generateProofOfWork(lastBlock!), - - firebaseID: '', - seqID: -1, - } + transactions: this.pendingTransactions, + lastBlockHash: this.lastBlock.hash(), + lastBlockUUID: this.lastBlock.uuid, + proof: this.nextProof, + waitTime: this.nextWaitTime, + peer: this.getBaseURL(), + }) + } - await (this.app().make(BlockResource)).push(block) - return new Block(block)*/ + /** + * Submit a group of encounter transactions to be added to the chain. + * @param groups + */ + public submitTransactions(...groups: [TransactionResourceItem, TransactionResourceItem][]) { + groups.forEach(group => { + const txes = group.map(item => this.getEncounterTransaction(item)) + this.pendingTransactions.push(...txes) + }) } /** * Submit the given exposure notifications onto the blockchain. * @param exposures */ - public async submitExposures(...exposures: ExposureResourceItem[]) { - if ( this.pendingSubmit ) { - this.pendingSubmit.transactions.push(...exposures) - } - + public submitExposures(...exposures: ExposureResourceItem[]) { this.pendingTransactions.push(...exposures) - this.refresh(false) - - /*const lastBlock = await this.getLastBlock() - - this.logging.verbose('Last block:') - this.logging.verbose(lastBlock) - - const block: BlockResourceItem = { - timestamp: (new Date).getTime(), - uuid: uuid_v4(), - transactions: exposures, - lastBlockHash: lastBlock!.hash(), - lastBlockUUID: lastBlock!.uuid, - proof: await this.generateProofOfWork(lastBlock), - - firebaseID: '', - seqID: -1, - } - - await (this.app().make(BlockResource)).push(block) - return new Block(block)*/ } - public async getPeerToken() { - const message = openpgp.Message.fromText("0000") - const privateKey = this.config.get("app.gpg.key.private") - - return Buffer.from((await openpgp.sign({ - message, - // date: new Date(Date.now() - 30000), - privateKeys: await openpgp.readKey({ - armoredKey: privateKey - }), - })), 'utf-8').toString('base64') + public getPeerToken() { + return Buffer.from(this.genesisProof, 'utf-8') + .toString('base64') } /** * Instantiate the genesis block of the entire chain. */ - public async getGenesisBlock(): Promise { - const message = openpgp.Message.fromText("0000") - const privateKey = this.config.get("app.gpg.key.private") - + public getGenesisBlock(): Block { return new Block({ timestamp: (new Date).getTime(), uuid: '0000', transactions: [], lastBlockHash: '', lastBlockUUID: '', - proof: (await openpgp.sign({ - message, - // date: new Date(3000, 12), - privateKeys: await openpgp.readKey({ - armoredKey: privateKey - }), - })), + proof: this.genesisProof, firebaseID: '', - seqID: -1, waitTime: 0, peer: this.getBaseURL(), }) @@ -516,20 +447,26 @@ export class Blockchain extends Unit { /** * Get the last block in the blockchain, or push the genesis if one doesn't already exist. */ - public async getLastBlock(): Promise { - const rec: BlockResourceItem | undefined = await BlockResource.collect().last() - if (rec) return new Block(rec) + public getLastBlock(): Block { + if ( !this.approvedChain ) { + this.approvedChain = [] + } - const genesis = (await this.getGenesisBlock()).toItem() - await (this.app().make(BlockResource)).push(genesis) - return new Block(genesis) + const rec = this.approvedChain.slice(-1)[0] + if (rec) return rec + + const genesis = this.getGenesisBlock() + this.approvedChain.push(genesis) + return genesis } /** * Get a list of all blocks in the chain, in order. */ - public async read(): Promise { - return BlockResource.collect().all() + public read(): Promise { + return this.firebase.ref('block') + .once('value') + .then(snap => snap.val()) } /** @@ -553,19 +490,15 @@ export class Blockchain extends Unit { */ protected async generateProofOfWork(lastBlock: Block, waitTime: number): Promise { const hashString = lastBlock.hash() - const privateKey = this.config.get("app.gpg.key.private") const message = openpgp.Message.fromText(hashString) await this.sleep(waitTime) // Sign the hash using the server's private key - return (await openpgp.sign({ + return openpgp.sign({ message, - // date: new Date(3000, 12), - privateKeys: await openpgp.readKey({ - armoredKey: privateKey, - }) - })) + privateKeys: this.privateKey, + }) } /** @@ -574,20 +507,10 @@ export class Blockchain extends Unit { * @param lastBlock * @protected */ - protected async validateProofOfWork(currentBlock: Block, lastBlock: Block): Promise { + protected validateProofOfWork(currentBlock: Block, lastBlock: Block): Promise { const proof = lastBlock.proof const publicKey = this.config.get("app.gpg.key.public") - - const result = await openpgp.verify({ - publicKeys: await openpgp.readKey({ - armoredKey: publicKey, - }), - message: await openpgp.readMessage({ - armoredMessage: proof, - }), - }) - - return !!(await result.signatures?.[0]?.verified) + return pgpVerify(publicKey, proof) } /** diff --git a/src/app/units/rtdb/Exposure.ts b/src/app/units/rtdb/Exposure.ts index 657ff94..50ddd5a 100644 --- a/src/app/units/rtdb/Exposure.ts +++ b/src/app/units/rtdb/Exposure.ts @@ -44,8 +44,8 @@ export class Exposure extends Unit { * Subscribe to the transactions reference and wait for new transactions to be added. */ public async up() { - this.firebase.ref('exposure').on('child_added', async (snapshot) => { - this.logging.debug('Received child_added event for exposures reference.') + this.firebase.ref('exposure').on('child_added', (snapshot) => { + /*this.logging.debug('Received child_added event for exposures reference.') if ( !this.claim() ) return // await this.firebase.trylock('block', 'Exposure_child_added') @@ -58,7 +58,9 @@ export class Exposure extends Unit { await ( this.make(ExposureResource)).ref().child(snapshot.key).remove() this.release() - // await this.firebase.unlock('block') + // await this.firebase.unlock('block')*/ + + this.blockchain.submitExposures(snapshot.val()) }) } @@ -67,6 +69,6 @@ export class Exposure extends Unit { */ public async down() { // Release all subscriptions before shutdown - this.firebase.ref("transaction").off() + this.firebase.ref('exposure').off() } } diff --git a/src/app/units/rtdb/Transaction.ts b/src/app/units/rtdb/Transaction.ts index 47ae6d1..beec348 100644 --- a/src/app/units/rtdb/Transaction.ts +++ b/src/app/units/rtdb/Transaction.ts @@ -14,9 +14,6 @@ import { Blockchain } from "../Blockchain" */ @Singleton() export class Transaction extends Unit { - /** True if currently processing transactions. */ - private processing: boolean = false - @Inject() protected readonly firebase!: FirebaseUnit @@ -26,64 +23,69 @@ export class Transaction extends Unit { @Inject() protected readonly logging!: Logging - /** Claim the right to process transactions. Returns true if the right was granted. */ - claim() { - if ( !this.processing ) { - this.processing = true - return true - } - - return false - } - - /** Release the right to claim transactions. */ - release() { - this.processing = false - } - - /** - * Given two transactions, determine whether the came from a valid interaction. - * That is, do the two transactions vouch for each-other cryptographically. - * @param transaction1 - * @param transaction2 - */ - public async compareTransactions(transaction1: TransactionResourceItem, transaction2: TransactionResourceItem) { - // verify signature - const result1 = await openpgp.verify({ - publicKeys: await openpgp.readKey({ - armoredKey: transaction2.partnerPublicKey + async compare(t1: TransactionResourceItem, t2: TransactionResourceItem) { + const [t2key, t1sig, t1key, t2sig] = await Promise.all([ + openpgp.readKey({ + armoredKey: t2.partnerPublicKey }), - message: await openpgp.readMessage({ - armoredMessage: transaction1.validationSignature, + openpgp.readMessage({ + armoredMessage: t1.validationSignature, }), - }) + openpgp.readKey({ + armoredKey: t1.partnerPublicKey + }), + openpgp.readMessage({ + armoredMessage: t2.validationSignature, + }), + ]) - const result2 = await openpgp.verify({ - publicKeys: await openpgp.readKey({ - armoredKey: transaction1.partnerPublicKey + const [r1, r2] = await Promise.all([ + openpgp.verify({ + publicKeys: t2key, + message: t1sig, }), - message: await openpgp.readMessage({ - armoredMessage: transaction2.validationSignature, + openpgp.verify({ + publicKeys: t1key, + message: t2sig, }), - }) + ]) + + const [v1, v2] = await Promise.all([ + r1.signatures[0]?.verified, + r2.signatures[0]?.verified + ]) - return (await result1.signatures[0].verified) && (await result2.signatures[0].verified) + return v1 && v2 } /** * Subscribe to the transactions reference and wait for new transactions to be added. */ public async up() { - this.firebase.ref("transaction").on("child_added", async () => { + this.firebase.ref('transaction').on('value', snapshot => { + if ( !Array.isArray(snapshot.val()) || snapshot.val().length < 2 ) return; + + for ( const left of snapshot.val() ) { + for ( const right of snapshot.val() ) { + this.compare(left, right).then(match => { + if ( match ) { + this.blockchain.submitTransactions([left, right]) + } + }) + } + } + }) + + /*this.firebase.ref("transaction").on("child_added", async () => { this.logging.debug('Received child_added event for transactions reference.') - if ( !this.claim() ) return - await this.firebase.trylock('block', 'Transaction_child_added') + // if ( !this.claim() ) return + // await this.firebase.trylock('block', 'Transaction_child_added') // array of pairs of transaction resource items let groupedTransactions: [TransactionResourceItem, TransactionResourceItem][] = [] // collection of transaction resource items let transactions = await TransactionResource.collect().collect() - await this.firebase.unlock('block') + // await this.firebase.unlock('block') // compare each item await transactions.promiseMap(async transaction1 => { @@ -112,7 +114,7 @@ export class Transaction extends Unit { return false }) - await this.firebase.trylock('block', 'Transaction_submitTransactions') + // await this.firebase.trylock('block', 'Transaction_submitTransactions') for (const group of groupedTransactions) { const block = await this.blockchain.submitTransactions(group) @@ -123,9 +125,9 @@ export class Transaction extends Unit { await this.firebase.ref("transaction").child(group[1].firebaseID).remove() } - this.release() - await this.firebase.unlock('block') - }) + // this.release() + // await this.firebase.unlock('block') + })*/ } /**