Start refactor to improve concurrent performance

main
Garrett Mills 3 years ago
parent d8e7a85122
commit 6f24822a5a
Signed by: garrettmills
GPG Key ID: D2BF5FBA8298F246

@ -8,8 +8,7 @@ import {Application} from "@extollo/lib"
* Base interface for an item in a Firebase RTDB collection. * Base interface for an item in a Firebase RTDB collection.
*/ */
export interface FirebaseResourceItem { export interface FirebaseResourceItem {
firebaseID: string; firebaseID?: string;
seqID: number;
} }
/** /**
@ -29,20 +28,10 @@ export class FirebaseResource<T extends FirebaseResourceItem> extends Iterable<T
return Application.getApplication().make<FirebaseUnit>(FirebaseUnit).ref(this.refName) return Application.getApplication().make<FirebaseUnit>(FirebaseUnit).ref(this.refName)
} }
/** Get the next sequential ID. */
async getNextID(): Promise<number> {
return new Promise<number>((res, rej) => {
this.ref().orderByChild('seqID')
.once('value', snapshot => {
res((this.resolveObject(snapshot.val()).reverse()?.[0]?.seqID ?? -1) + 1)
}, rej)
})
}
/** Get the record at the ith index. */ /** Get the record at the ith index. */
async at(i: number): Promise<T | undefined> { async at(i: number): Promise<T | undefined> {
return new Promise<T | undefined>((res, rej) => { return new Promise<T | undefined>((res, rej) => {
this.ref().orderByChild('seqID') this.ref().orderByKey()
.startAt(i).endAt(i) .startAt(i).endAt(i)
.once('value', snapshot => res(this.resolveObject(snapshot.val())[0]), rej) .once('value', snapshot => res(this.resolveObject(snapshot.val())[0]), rej)
}) })
@ -51,7 +40,7 @@ export class FirebaseResource<T extends FirebaseResourceItem> extends Iterable<T
/** Fetch an array of records in a range. */ /** Fetch an array of records in a range. */
async range(start: number, end: number): Promise<Collection<T>> { async range(start: number, end: number): Promise<Collection<T>> {
return new Promise<Collection<T>>((res, rej) => { return new Promise<Collection<T>>((res, rej) => {
this.ref().orderByChild('seqID') this.ref().orderByKey()
.startAt(start).endAt(end) .startAt(start).endAt(end)
.once('value', snapshot => { .once('value', snapshot => {
res(new Collection<T>(this.resolveObject(snapshot.val()))) res(new Collection<T>(this.resolveObject(snapshot.val())))
@ -61,34 +50,23 @@ export class FirebaseResource<T extends FirebaseResourceItem> extends Iterable<T
/** Count the items in the collection. */ /** Count the items in the collection. */
async count(): Promise<number> { async count(): Promise<number> {
console.log('[COUNT CALLED ON FIREBASE RESOURCE]')
return new Promise<number>((res, rej) => { return new Promise<number>((res, rej) => {
this.ref().orderByChild('seqID') this.ref().orderByKey()
.once('value', snapshot => { .once('value', snapshot => {
res(this.resolveObject(snapshot.val()).length) res(this.resolveObject(snapshot.val()).length)
}, rej) }, rej)
}) })
} }
findNextId(collection: FirebaseResourceItem[]) {
if ( !collection.length ) return 0
let maxSeq = -1
for ( const item of collection ) {
if ( !item ) continue
if ( !isNaN(item.seqID) && item.seqID > maxSeq ) {
maxSeq = item.seqID
}
}
return maxSeq + 1
}
/** /**
* Push a new item into the collection. * Push a new item into the collection.
* @param item * @param item
*/ */
async push(item: T): Promise<T> { async push(item: T): Promise<T> {
await this.ref().transaction((collection) => { await this.ref().push(item)
return item
/*await this.ref().transaction((collection) => {
if ( !collection ) collection = [] if ( !collection ) collection = []
if ( typeof collection === 'object' ) collection = Object.values(collection) if ( typeof collection === 'object' ) collection = Object.values(collection)
item.seqID = this.findNextId(collection) item.seqID = this.findNextId(collection)
@ -97,6 +75,7 @@ export class FirebaseResource<T extends FirebaseResourceItem> extends Iterable<T
delete item.firebaseID delete item.firebaseID
collection.push(this.filter(item)) collection.push(this.filter(item))
console.log('item seqID', collection)
return collection return collection
}) })
@ -111,7 +90,7 @@ export class FirebaseResource<T extends FirebaseResourceItem> extends Iterable<T
}) })
}) })
return item return item*/
} }
/** /**

@ -38,8 +38,7 @@ export class Blockchain extends Controller {
* most recent submission, that has NOT been accepted yet. * most recent submission, that has NOT been accepted yet.
*/ */
public async readBlockchainSubmission() { public async readBlockchainSubmission() {
const resultOfPeerRefresh = !!this.request.query.resultOfPeerRefresh return many((await this.blockchain.getSubmitChain()).map(x => {
return many((await this.blockchain.getSubmitChain(resultOfPeerRefresh)).map(x => {
// @ts-ignore // @ts-ignore
delete x.firebaseID delete x.firebaseID
return x return x
@ -62,7 +61,6 @@ export class Blockchain extends Controller {
public async postTransaction() { public async postTransaction() {
const item: TransactionResourceItem = { const item: TransactionResourceItem = {
firebaseID: '', firebaseID: '',
seqID: -1,
combinedHash: String(this.request.input('combinedHash')), combinedHash: String(this.request.input('combinedHash')),
timestamp: parseInt(String(this.request.input('timestamp'))), timestamp: parseInt(String(this.request.input('timestamp'))),
encodedGPSLocation: String(this.request.input('encodedGPSLocation')), encodedGPSLocation: String(this.request.input('encodedGPSLocation')),
@ -80,7 +78,6 @@ export class Blockchain extends Controller {
public async postExposure() { public async postExposure() {
const item: ExposureResourceItem = { const item: ExposureResourceItem = {
firebaseID: '', firebaseID: '',
seqID: -1,
clientID: String(this.request.input('clientID')), clientID: String(this.request.input('clientID')),
timestamp: parseInt(String(this.request.input('timestamp'))), timestamp: parseInt(String(this.request.input('timestamp'))),
} }
@ -94,12 +91,13 @@ export class Blockchain extends Controller {
if (!minTime) { if (!minTime) {
minTime = (new Date).getTime() - this.config.get('app.defaultTime') minTime = (new Date).getTime() - this.config.get('app.defaultTime')
} }
const snapshot = await (<BlockResource> this.make(BlockResource)).ref() const snapshot = await (<BlockResource> this.make(BlockResource)).ref()
.orderByChild('timestamp') .orderByChild('timestamp')
.startAt(minTime) .startAt(minTime)
.once('value') .once('value')
let blocks = (Object.values(snapshot.val()) as BlockResourceItem[]).filter((item: BlockResourceItem) => item.seqID !== 0) let blocks = (Object.values(snapshot.val()) as BlockResourceItem[])
return many(blocks) return many(blocks)
} }

@ -1,27 +1,32 @@
import { Singleton, Inject } from "@extollo/di" import {Inject, Singleton} from "@extollo/di"
import { Unit, Logging, Config, Application } from "@extollo/lib" import {Application, Config, Logging, Unit} from "@extollo/lib"
import { FirebaseUnit } from "./FirebaseUnit" import {FirebaseUnit} from "./FirebaseUnit"
import { import {
BlockEncounterTransaction, BlockEncounterTransaction,
BlockResource,
BlockResourceItem, BlockResourceItem,
BlockTransaction, BlockTransaction,
isBlockResourceItem isBlockResourceItem
} from "../rtdb/BlockResource" } from "../rtdb/BlockResource"
import { TransactionResourceItem } from "../rtdb/TransactionResource" import {TransactionResourceItem} from "../rtdb/TransactionResource"
import * as openpgp from "openpgp" import * as openpgp from "openpgp"
import * as crypto from "crypto" import * as crypto from "crypto"
import axios from "axios" import axios from "axios"
import { collect, uuid_v4 } from "@extollo/util" import {collect, uuid_v4} from "@extollo/util"
import {ExposureResourceItem} from "../rtdb/ExposureResource" import {ExposureResourceItem} from "../rtdb/ExposureResource"
import {PeerResource} from "../rtdb/PeerResource"
async function pgpVerify(armoredKey: string, armoredMessage: string) {
const [publicKeys, message] = await Promise.all([
openpgp.readKey({ armoredKey }),
openpgp.readMessage({ armoredMessage })
])
return !!(await (await openpgp.verify({ publicKeys, message })).signatures?.[0]?.verified)
}
/** /**
* Utility wrapper class for a block in the chain. * Utility wrapper class for a block in the chain.
*/ */
export class Block implements BlockResourceItem { export class Block implements BlockResourceItem {
firebaseID: string
seqID: number
uuid: string uuid: string
transactions: BlockTransaction[] transactions: BlockTransaction[]
timestamp: number timestamp: number
@ -36,14 +41,12 @@ export class Block implements BlockResourceItem {
} }
constructor(rec: BlockResourceItem) { constructor(rec: BlockResourceItem) {
this.firebaseID = rec.firebaseID this.uuid = rec.uuid || uuid_v4()
this.seqID = rec.seqID this.transactions = rec.transactions || []
this.uuid = rec.uuid this.lastBlockHash = rec.lastBlockHash || ''
this.transactions = rec.transactions this.lastBlockUUID = rec.lastBlockUUID || ''
this.lastBlockHash = rec.lastBlockHash
this.lastBlockUUID = rec.lastBlockUUID
this.proof = rec.proof this.proof = rec.proof
this.timestamp = rec.timestamp this.timestamp = rec.timestamp || (new Date).getTime()
this.waitTime = rec.waitTime this.waitTime = rec.waitTime
this.peer = rec.peer this.peer = rec.peer
} }
@ -57,16 +60,7 @@ export class Block implements BlockResourceItem {
const proof = this.proof const proof = this.proof
const publicKey = this.config.get("app.gpg.key.public") const publicKey = this.config.get("app.gpg.key.public")
const result = await openpgp.verify({ return pgpVerify(publicKey, proof)
publicKeys: await openpgp.readKey({
armoredKey: publicKey,
}),
message: await openpgp.readMessage({
armoredMessage: proof,
}),
})
return !!(await result.signatures?.[0]?.verified)
} }
/** Generate the hash for this block. */ /** Generate the hash for this block. */
@ -79,8 +73,6 @@ export class Block implements BlockResourceItem {
/** Cast the Block's data to a plain object. */ /** Cast the Block's data to a plain object. */
toItem(): BlockResourceItem { toItem(): BlockResourceItem {
return { return {
seqID: this.seqID,
firebaseID: this.firebaseID,
uuid: this.uuid, uuid: this.uuid,
transactions: this.transactions, transactions: this.transactions,
lastBlockHash: this.lastBlockHash, lastBlockHash: this.lastBlockHash,
@ -119,9 +111,9 @@ export interface Peer {
*/ */
@Singleton() @Singleton()
export class Blockchain extends Unit { export class Blockchain extends Unit {
private readonly MIN_WAIT_TIME = 10000 private readonly MIN_WAIT_TIME = 1000
private readonly MAX_WAIT_TIME = 20000 private readonly MAX_WAIT_TIME = 3000
private readonly PENALTY_INTERVAL = 1000 private readonly PENALTY_INTERVAL = 500
private readonly MAX_PEERS_PENALTY = 10 private readonly MAX_PEERS_PENALTY = 10
@Inject() @Inject()
@ -133,6 +125,12 @@ export class Blockchain extends Unit {
@Inject() @Inject()
protected readonly config!: Config protected readonly config!: Config
protected approvedChain: Block[] = []
protected peers: Peer[] = []
protected breakForExit = false
/** /**
* Block transactions that will be attempted as part of this host's * Block transactions that will be attempted as part of this host's
* next block submission. * next block submission.
@ -140,47 +138,87 @@ export class Blockchain extends Unit {
*/ */
protected pendingTransactions: BlockTransaction[] = [] protected pendingTransactions: BlockTransaction[] = []
protected pendingSubmit?: Block protected publicKey!: openpgp.Key
protected privateKey!: openpgp.Key
protected genesisProof!: string
protected isSubmitting: boolean = false protected nextWaitTime!: number
protected lastBlock!: Block
protected nextProof!: string
async up() { async up() {
this.logging.info('Generating OpenPGP assets...')
this.publicKey = await openpgp.readKey({
armoredKey: this.config.get("app.gpg.key.public")
})
this.privateKey = await openpgp.readKey({
armoredKey: this.config.get("app.gpg.key.private")
})
this.genesisProof = await openpgp.sign({
message: openpgp.Message.fromText('0000'),
privateKeys: this.privateKey,
})
this.logging.info('Performing initial load...')
await this.initialLoad()
this.logging.info('Contacting configured peers...')
const peers = this.config.get('server.peers') const peers = this.config.get('server.peers')
for ( const peer of peers ) { await Promise.all(peers.map((host: string) => this.registerPeer({ host })))
await this.registerPeer({
host: peer, this.logging.info('Performing initial writeback...')
}) await this.writeback()
} }
async down() {
this.breakForExit = true
}
async initialLoad() {
const [peers, chain] = await Promise.all([
this.firebase.ref('peers')
.once('value')
.then(val => val.val()),
this.firebase.ref('block')
.orderByKey()
.once('value')
.then(val => val.val())
])
this.logging.debug({peers, chain})
this.approvedChain = chain.map((item: BlockResourceItem) => new Block(item))
this.peers = peers || []
} }
/** /**
* Returns true if the given host is registered as a peer. * Returns true if the given host is registered as a peer.
* @param host * @param host
*/ */
public async hasPeer(host: string): Promise<boolean> { public hasPeer(host: string): boolean {
const peers = await this.getPeers() return this.peers.some(x => x.host === host)
return peers.some(peer => peer.host.toLowerCase() === host.toLowerCase())
} }
/** /**
* Get a list of all registered peers. * Get a list of all registered peers.
*/ */
public async getPeers(): Promise<Peer[]> { public getPeers(): Peer[] {
return PeerResource.collect().all() return this.peers
} }
/** /**
* From a peer, fetch the submission blockchain, if it is valid. * From a peer, fetch the submission blockchain, if it is valid.
* @param peer * @param peer
* @param resultOfPeerRefresh
*/ */
public async getPeerSubmit(peer: Peer, resultOfPeerRefresh: boolean): Promise<Block[] | undefined> { public async getPeerSubmit(peer: Peer): Promise<Block[] | undefined> {
try { try {
const result = await axios.get(`${peer.host}api/v1/chain/submit${resultOfPeerRefresh ? '?resultOfPeerRefresh=true' : ''}`) const result = await axios.get(`${peer.host}api/v1/chain/submit`)
const blocks: unknown = result.data?.data?.records const blocks: unknown = result.data?.data?.records
if ( Array.isArray(blocks) && blocks.every(block => { if ( Array.isArray(blocks) && blocks.every(block => {
const match = isBlockResourceItem(block) return isBlockResourceItem(block)
return match
}) ) { }) ) {
return blocks.map(x => new Block(x)) return blocks.map(x => new Block(x))
} }
@ -195,35 +233,19 @@ export class Blockchain extends Unit {
* @param peer * @param peer
*/ */
public async registerPeer(peer: Peer) { public async registerPeer(peer: Peer) {
if (!(await this.hasPeer(peer.host))) { if ( !this.hasPeer(peer.host) ) {
this.logging.info(`Registering peer: ${peer.host}`) this.logging.info(`Registering peer: ${peer.host}`)
await (<PeerResource> this.make(PeerResource)).push({
firebaseID: '',
seqID: -1,
name: peer.name,
host: peer.host,
})
const header = this.config.get('app.api_server_header') const header = this.config.get('app.api_server_header')
try { try {
console.log('return peering', [`${peer.host}api/v1/peer`, {
host: this.getBaseURL(),
}, {
headers: {
[header]: await this.getPeerToken(),
}
}])
await axios.post(`${peer.host}api/v1/peer`, { await axios.post(`${peer.host}api/v1/peer`, {
host: this.getBaseURL(), host: this.getBaseURL(),
}, { }, {
headers: { headers: {
[header]: await this.getPeerToken(), [header]: this.getPeerToken(),
'content-type': 'application/json', 'content-type': 'application/json',
} }
}) })
this.refresh(false)
} catch (e) { } catch (e) {
this.logging.error(e) this.logging.error(e)
} }
@ -239,21 +261,12 @@ export class Blockchain extends Unit {
const blocks = collect<Block>(chain) const blocks = collect<Block>(chain)
return ( return (
await blocks.promiseMap(async (block, idx) => { await blocks.promiseMap(async (block, idx) => {
if ( await block.isGenesis() ) {
return true
}
const previous: Block | undefined = blocks.at(idx - 1) const previous: Block | undefined = blocks.at(idx - 1)
if ( !previous ) { if ( !previous ) {
this.logging.debug(`Chain is invalid: block ${idx} is missing previous ${idx - 1}.`) this.logging.debug(`Chain is invalid: block ${idx} is missing previous ${idx - 1}.`)
return false; return false;
} }
if ( !(await this.validateProofOfWork(block, previous)) ) {
this.logging.debug(`Chain is invalid: block ${idx} failed proof of work validation`)
return false;
}
const pass = ( const pass = (
block.lastBlockUUID === previous.uuid block.lastBlockUUID === previous.uuid
&& block.lastBlockHash === previous.hash() && block.lastBlockHash === previous.hash()
@ -267,39 +280,43 @@ export class Blockchain extends Unit {
lastBlockHash: block.lastBlockHash, lastBlockHash: block.lastBlockHash,
computedLastHash: previous.hash(), computedLastHash: previous.hash(),
}) })
return false
} }
return pass if ( await block.isGenesis() ) {
return true
}
if ( !(await this.validateProofOfWork(block, previous)) ) {
this.logging.debug(`Chain is invalid: block ${idx} failed proof of work validation`)
return false;
}
return false
}) })
).every(Boolean) ).every(Boolean)
} }
public async refresh(resultOfPeerRefresh: boolean) { public async refresh() {
if ( this.isSubmitting ) { if ( this.breakForExit ) return;
return this.logging.debug('Called refresh().')
} else {
this.isSubmitting = true
}
await this.firebase.trylock('block', 'Blockchain_refresh')
const validSeqID = (await this.read()).reverse()[0]?.seqID
const peers = await this.getPeers() const peers = await this.getPeers()
const time_x_block: {[key: string]: Block} = {} const time_x_block: {[key: string]: Block} = {}
const time_x_blocks: {[key: string]: Block[]} = {} const time_x_blocks: {[key: string]: Block[]} = {}
const time_x_peer: {[key: string]: Peer | true} = {} const time_x_peer: {[key: string]: Peer | true} = {}
for ( const peer of peers ) { await Promise.all(peers.map(async peer => {
console.log('[PEERS]', peers) const blocks: Block[] | undefined = await this.getPeerSubmit(peer)
const blocks: Block[] | undefined = await this.getPeerSubmit(peer, resultOfPeerRefresh)
console.log('[PEER BLOCKS]', blocks)
if ( blocks && await this.validate(blocks) ) { if ( blocks && await this.validate(blocks) ) {
const block = blocks.reverse()[0] const block = blocks.slice(-1)[0]
if ( !block || block.seqID === validSeqID || !block.seqID ) continue if ( !block ) return // TODO fixme
const penalty = blocks.slice(0, 10) const penalty = blocks.slice(0, 10)
.map(block => block.peer === peer.host) .map(block => block.peer === peer.host)
.filter(Boolean).length * this.PENALTY_INTERVAL .filter(Boolean).length * this.PENALTY_INTERVAL
* (Math.min(peers.length, this.MAX_PEERS_PENALTY)) * (Math.min(peers.length, this.MAX_PEERS_PENALTY))
block.waitTime += penalty block.waitTime += penalty
@ -308,206 +325,120 @@ export class Blockchain extends Unit {
time_x_blocks[block.waitTime] = blocks.reverse() time_x_blocks[block.waitTime] = blocks.reverse()
time_x_peer[block.waitTime] = peer time_x_peer[block.waitTime] = peer
} else { } else {
console.log('VALIDATION FAIL') console.log('validation fail!')
} }
} }))
if ( this.pendingTransactions.length && !this.pendingSubmit ) { console.log(time_x_blocks, time_x_peer, time_x_block)
await this.attemptSubmit()
}
if ( this.pendingSubmit ) { const submitBlock = this.getSubmitBlock()
time_x_block[this.pendingSubmit.waitTime] = this.pendingSubmit if ( submitBlock ) {
time_x_peer[this.pendingSubmit.waitTime] = true time_x_block[submitBlock.waitTime] = submitBlock
time_x_peer[submitBlock.waitTime] = true
} }
console.log('submit block', submitBlock)
const min = Math.min(...Object.keys(time_x_block).map(parseFloat)) const min = Math.min(...Object.keys(time_x_block).map(parseFloat))
const block = time_x_block[min]
const peer = time_x_peer[min] const peer = time_x_peer[min]
console.log('peer?', peer)
if ( peer === true ) { if ( peer === true ) {
this.pendingSubmit = undefined // Our version of the chain was accepted
this.approvedChain.push(submitBlock!)
this.pendingTransactions = [] this.pendingTransactions = []
await (<BlockResource>this.app().make(BlockResource)).push(block) } else if ( peer ) {
} else { // A different server's chain was accepted
await this.firebase.ref('block').set((time_x_blocks[min] || []).map(x => { this.approvedChain = (time_x_blocks[min] || []).map(block => {
const item = x.toItem() if (!block.transactions) {
// @ts-ignore block.transactions = []
delete item.firebaseID
if ( !item.transactions ) {
item.transactions = []
} }
return item return block
})) })
/*await this.firebase.ref('block').transaction((_) => {
return (time_x_blocks[min] || []).map(x => {
const item = x.toItem()
// @ts-ignore
delete item.firebaseID
if ( !item.transactions ) {
item.transactions = []
}
return item
})
})*/
this.pendingSubmit = undefined
await this.attemptSubmit()
} }
await this.firebase.unlock('block') console.log('approved chain', this.approvedChain)
this.isSubmitting = false await this.writeback()
} }
public async getSubmitChain(resultOfPeerRefresh: boolean): Promise<BlockResourceItem[]> { public getSubmitChain(): BlockResourceItem[] {
await this.firebase.trylock('block', 'Blockchain_getSubmitChain') const submit = this.getSubmitBlock()
const blocks = await this.read() if ( !submit ) return this.approvedChain
const submit = await this.attemptSubmit() else return [...this.approvedChain, submit]
if ( submit ) {
submit.seqID = blocks.length > 0 ? collect<BlockResourceItem>(blocks).max('seqID') + 1 : 0
blocks.push(submit.toItem())
}
await this.firebase.unlock('block')
if ( !resultOfPeerRefresh ) {
this.refresh(true)
}
return blocks
} }
public async attemptSubmit() { public async writeback() {
if ( !this.pendingSubmit && this.pendingTransactions.length ) { if ( this.breakForExit ) return;
const lastBlock = await this.getLastBlock() this.logging.info('Generating initial proof-of-elapsed-time. This will take a second...')
const waitTime = this.random(this.MIN_WAIT_TIME, this.MAX_WAIT_TIME) this.nextWaitTime = this.random(this.MIN_WAIT_TIME, this.MAX_WAIT_TIME)
const proof = await this.generateProofOfWork(lastBlock, waitTime) this.lastBlock = this.getLastBlock()
this.nextProof = await this.generateProofOfWork(this.lastBlock, this.nextWaitTime)
const block: BlockResourceItem = {
timestamp: (new Date).getTime(),
uuid: uuid_v4(),
transactions: this.pendingTransactions,
lastBlockHash: lastBlock!.hash(),
lastBlockUUID: lastBlock!.uuid,
proof,
waitTime,
peer: this.getBaseURL(),
firebaseID: '',
seqID: -1,
}
this.pendingSubmit = new Block(block) console.log('writeback approved chain', this.approvedChain)
}
return this.pendingSubmit await Promise.all([
} this.firebase.ref('block').set(this.approvedChain.map(x => x.toItem())),
this.firebase.ref('peers').set(this.peers)
])
/** this.refresh()
* Submit a group of encounter transactions to be added to the chain. }
* @param group
*/
public async submitTransactions(group: [TransactionResourceItem, TransactionResourceItem]) {
const txes = group.map(item => this.getEncounterTransaction(item))
if ( this.pendingSubmit ) { public getSubmitBlock(): Block | undefined {
this.pendingSubmit.transactions.push(...txes) if ( !this.pendingTransactions?.length ) {
return
} }
this.pendingTransactions.push(...txes) return new Block({
this.refresh(false)
/*const lastBlock = await this.getLastBlock()
this.logging.verbose('Last block:')
this.logging.verbose(lastBlock)
const block: BlockResourceItem = {
timestamp: (new Date).getTime(), timestamp: (new Date).getTime(),
uuid: uuid_v4(), uuid: uuid_v4(),
transactions: group.map(item => this.getEncounterTransaction(item)), transactions: this.pendingTransactions,
lastBlockHash: lastBlock!.hash(), lastBlockHash: this.lastBlock.hash(),
lastBlockUUID: lastBlock!.uuid, lastBlockUUID: this.lastBlock.uuid,
proof: await this.generateProofOfWork(lastBlock!), proof: this.nextProof,
waitTime: this.nextWaitTime,
firebaseID: '', peer: this.getBaseURL(),
seqID: -1, })
} }
await (<BlockResource>this.app().make(BlockResource)).push(block) /**
return new Block(block)*/ * Submit a group of encounter transactions to be added to the chain.
* @param groups
*/
public submitTransactions(...groups: [TransactionResourceItem, TransactionResourceItem][]) {
groups.forEach(group => {
const txes = group.map(item => this.getEncounterTransaction(item))
this.pendingTransactions.push(...txes)
})
} }
/** /**
* Submit the given exposure notifications onto the blockchain. * Submit the given exposure notifications onto the blockchain.
* @param exposures * @param exposures
*/ */
public async submitExposures(...exposures: ExposureResourceItem[]) { public submitExposures(...exposures: ExposureResourceItem[]) {
if ( this.pendingSubmit ) {
this.pendingSubmit.transactions.push(...exposures)
}
this.pendingTransactions.push(...exposures) this.pendingTransactions.push(...exposures)
this.refresh(false)
/*const lastBlock = await this.getLastBlock()
this.logging.verbose('Last block:')
this.logging.verbose(lastBlock)
const block: BlockResourceItem = {
timestamp: (new Date).getTime(),
uuid: uuid_v4(),
transactions: exposures,
lastBlockHash: lastBlock!.hash(),
lastBlockUUID: lastBlock!.uuid,
proof: await this.generateProofOfWork(lastBlock),
firebaseID: '',
seqID: -1,
}
await (<BlockResource>this.app().make(BlockResource)).push(block)
return new Block(block)*/
} }
public async getPeerToken() { public getPeerToken() {
const message = openpgp.Message.fromText("0000") return Buffer.from(this.genesisProof, 'utf-8')
const privateKey = this.config.get("app.gpg.key.private") .toString('base64')
return Buffer.from((await openpgp.sign({
message,
// date: new Date(Date.now() - 30000),
privateKeys: await openpgp.readKey({
armoredKey: privateKey
}),
})), 'utf-8').toString('base64')
} }
/** /**
* Instantiate the genesis block of the entire chain. * Instantiate the genesis block of the entire chain.
*/ */
public async getGenesisBlock(): Promise<Block> { public getGenesisBlock(): Block {
const message = openpgp.Message.fromText("0000")
const privateKey = this.config.get("app.gpg.key.private")
return new Block({ return new Block({
timestamp: (new Date).getTime(), timestamp: (new Date).getTime(),
uuid: '0000', uuid: '0000',
transactions: [], transactions: [],
lastBlockHash: '', lastBlockHash: '',
lastBlockUUID: '', lastBlockUUID: '',
proof: (await openpgp.sign({ proof: this.genesisProof,
message,
// date: new Date(3000, 12),
privateKeys: await openpgp.readKey({
armoredKey: privateKey
}),
})),
firebaseID: '', firebaseID: '',
seqID: -1,
waitTime: 0, waitTime: 0,
peer: this.getBaseURL(), peer: this.getBaseURL(),
}) })
@ -516,20 +447,26 @@ export class Blockchain extends Unit {
/** /**
* Get the last block in the blockchain, or push the genesis if one doesn't already exist. * Get the last block in the blockchain, or push the genesis if one doesn't already exist.
*/ */
public async getLastBlock(): Promise<Block> { public getLastBlock(): Block {
const rec: BlockResourceItem | undefined = await BlockResource.collect().last() if ( !this.approvedChain ) {
if (rec) return new Block(rec) this.approvedChain = []
}
const genesis = (await this.getGenesisBlock()).toItem() const rec = this.approvedChain.slice(-1)[0]
await (<BlockResource>this.app().make(BlockResource)).push(genesis) if (rec) return rec
return new Block(genesis)
const genesis = this.getGenesisBlock()
this.approvedChain.push(genesis)
return genesis
} }
/** /**
* Get a list of all blocks in the chain, in order. * Get a list of all blocks in the chain, in order.
*/ */
public async read(): Promise<BlockResourceItem[]> { public read(): Promise<BlockResourceItem[]> {
return BlockResource.collect().all() return this.firebase.ref('block')
.once('value')
.then(snap => snap.val())
} }
/** /**
@ -553,19 +490,15 @@ export class Blockchain extends Unit {
*/ */
protected async generateProofOfWork(lastBlock: Block, waitTime: number): Promise<string> { protected async generateProofOfWork(lastBlock: Block, waitTime: number): Promise<string> {
const hashString = lastBlock.hash() const hashString = lastBlock.hash()
const privateKey = this.config.get("app.gpg.key.private")
const message = openpgp.Message.fromText(hashString) const message = openpgp.Message.fromText(hashString)
await this.sleep(waitTime) await this.sleep(waitTime)
// Sign the hash using the server's private key // Sign the hash using the server's private key
return (await openpgp.sign({ return openpgp.sign({
message, message,
// date: new Date(3000, 12), privateKeys: this.privateKey,
privateKeys: await openpgp.readKey({ })
armoredKey: privateKey,
})
}))
} }
/** /**
@ -574,20 +507,10 @@ export class Blockchain extends Unit {
* @param lastBlock * @param lastBlock
* @protected * @protected
*/ */
protected async validateProofOfWork(currentBlock: Block, lastBlock: Block): Promise<boolean> { protected validateProofOfWork(currentBlock: Block, lastBlock: Block): Promise<boolean> {
const proof = lastBlock.proof const proof = lastBlock.proof
const publicKey = this.config.get("app.gpg.key.public") const publicKey = this.config.get("app.gpg.key.public")
return pgpVerify(publicKey, proof)
const result = await openpgp.verify({
publicKeys: await openpgp.readKey({
armoredKey: publicKey,
}),
message: await openpgp.readMessage({
armoredMessage: proof,
}),
})
return !!(await result.signatures?.[0]?.verified)
} }
/** /**

@ -44,8 +44,8 @@ export class Exposure extends Unit {
* Subscribe to the transactions reference and wait for new transactions to be added. * Subscribe to the transactions reference and wait for new transactions to be added.
*/ */
public async up() { public async up() {
this.firebase.ref('exposure').on('child_added', async (snapshot) => { this.firebase.ref('exposure').on('child_added', (snapshot) => {
this.logging.debug('Received child_added event for exposures reference.') /*this.logging.debug('Received child_added event for exposures reference.')
if ( !this.claim() ) return if ( !this.claim() ) return
// await this.firebase.trylock('block', 'Exposure_child_added') // await this.firebase.trylock('block', 'Exposure_child_added')
@ -58,7 +58,9 @@ export class Exposure extends Unit {
await (<ExposureResource> this.make(ExposureResource)).ref().child(snapshot.key).remove() await (<ExposureResource> this.make(ExposureResource)).ref().child(snapshot.key).remove()
this.release() this.release()
// await this.firebase.unlock('block') // await this.firebase.unlock('block')*/
this.blockchain.submitExposures(snapshot.val())
}) })
} }
@ -67,6 +69,6 @@ export class Exposure extends Unit {
*/ */
public async down() { public async down() {
// Release all subscriptions before shutdown // Release all subscriptions before shutdown
this.firebase.ref("transaction").off() this.firebase.ref('exposure').off()
} }
} }

@ -14,9 +14,6 @@ import { Blockchain } from "../Blockchain"
*/ */
@Singleton() @Singleton()
export class Transaction extends Unit { export class Transaction extends Unit {
/** True if currently processing transactions. */
private processing: boolean = false
@Inject() @Inject()
protected readonly firebase!: FirebaseUnit protected readonly firebase!: FirebaseUnit
@ -26,64 +23,69 @@ export class Transaction extends Unit {
@Inject() @Inject()
protected readonly logging!: Logging protected readonly logging!: Logging
/** Claim the right to process transactions. Returns true if the right was granted. */ async compare(t1: TransactionResourceItem, t2: TransactionResourceItem) {
claim() { const [t2key, t1sig, t1key, t2sig] = await Promise.all([
if ( !this.processing ) { openpgp.readKey({
this.processing = true armoredKey: t2.partnerPublicKey
return true
}
return false
}
/** Release the right to claim transactions. */
release() {
this.processing = false
}
/**
* Given two transactions, determine whether the came from a valid interaction.
* That is, do the two transactions vouch for each-other cryptographically.
* @param transaction1
* @param transaction2
*/
public async compareTransactions(transaction1: TransactionResourceItem, transaction2: TransactionResourceItem) {
// verify signature
const result1 = await openpgp.verify({
publicKeys: await openpgp.readKey({
armoredKey: transaction2.partnerPublicKey
}), }),
message: await openpgp.readMessage({ openpgp.readMessage({
armoredMessage: transaction1.validationSignature, armoredMessage: t1.validationSignature,
}), }),
}) openpgp.readKey({
armoredKey: t1.partnerPublicKey
}),
openpgp.readMessage({
armoredMessage: t2.validationSignature,
}),
])
const result2 = await openpgp.verify({ const [r1, r2] = await Promise.all([
publicKeys: await openpgp.readKey({ openpgp.verify({
armoredKey: transaction1.partnerPublicKey publicKeys: t2key,
message: t1sig,
}), }),
message: await openpgp.readMessage({ openpgp.verify({
armoredMessage: transaction2.validationSignature, publicKeys: t1key,
message: t2sig,
}), }),
}) ])
const [v1, v2] = await Promise.all([
r1.signatures[0]?.verified,
r2.signatures[0]?.verified
])
return (await result1.signatures[0].verified) && (await result2.signatures[0].verified) return v1 && v2
} }
/** /**
* Subscribe to the transactions reference and wait for new transactions to be added. * Subscribe to the transactions reference and wait for new transactions to be added.
*/ */
public async up() { public async up() {
this.firebase.ref("transaction").on("child_added", async () => { this.firebase.ref('transaction').on('value', snapshot => {
if ( !Array.isArray(snapshot.val()) || snapshot.val().length < 2 ) return;
for ( const left of snapshot.val() ) {
for ( const right of snapshot.val() ) {
this.compare(left, right).then(match => {
if ( match ) {
this.blockchain.submitTransactions([left, right])
}
})
}
}
})
/*this.firebase.ref("transaction").on("child_added", async () => {
this.logging.debug('Received child_added event for transactions reference.') this.logging.debug('Received child_added event for transactions reference.')
if ( !this.claim() ) return // if ( !this.claim() ) return
await this.firebase.trylock('block', 'Transaction_child_added') // await this.firebase.trylock('block', 'Transaction_child_added')
// array of pairs of transaction resource items // array of pairs of transaction resource items
let groupedTransactions: [TransactionResourceItem, TransactionResourceItem][] = [] let groupedTransactions: [TransactionResourceItem, TransactionResourceItem][] = []
// collection of transaction resource items // collection of transaction resource items
let transactions = await TransactionResource.collect().collect() let transactions = await TransactionResource.collect().collect()
await this.firebase.unlock('block') // await this.firebase.unlock('block')
// compare each item // compare each item
await transactions.promiseMap(async transaction1 => { await transactions.promiseMap(async transaction1 => {
@ -112,7 +114,7 @@ export class Transaction extends Unit {
return false return false
}) })
await this.firebase.trylock('block', 'Transaction_submitTransactions') // await this.firebase.trylock('block', 'Transaction_submitTransactions')
for (const group of groupedTransactions) { for (const group of groupedTransactions) {
const block = await this.blockchain.submitTransactions(group) const block = await this.blockchain.submitTransactions(group)
@ -123,9 +125,9 @@ export class Transaction extends Unit {
await this.firebase.ref("transaction").child(group[1].firebaseID).remove() await this.firebase.ref("transaction").child(group[1].firebaseID).remove()
} }
this.release() // this.release()
await this.firebase.unlock('block') // await this.firebase.unlock('block')
}) })*/
} }
/** /**

Loading…
Cancel
Save