From 9de72da49569cbed85eba7d00149487076256be2 Mon Sep 17 00:00:00 2001 From: garrettmills Date: Wed, 2 Oct 2024 22:58:21 -0400 Subject: [PATCH] [WIP] Continue resolving concurrency issues. There are still some gremlins, but it is significantly more stable now when standing up multi-disk operations. --- src/app/cli/directive/StandupDirective.ts | 4 + .../controllers/api/v1/Volumes.controller.ts | 21 +-- ...18Z_CreateLocksTableMigration.migration.ts | 47 ++++++ src/app/models/Lock.model.ts | 88 ++++++++++ src/app/models/Node.model.ts | 91 +++++----- src/app/services/Provisioner.service.ts | 158 +++++++++--------- 6 files changed, 267 insertions(+), 142 deletions(-) create mode 100644 src/app/migrations/2024-10-03T02:07:23.618Z_CreateLocksTableMigration.migration.ts create mode 100644 src/app/models/Lock.model.ts diff --git a/src/app/cli/directive/StandupDirective.ts b/src/app/cli/directive/StandupDirective.ts index 4c64ffd..a75c973 100644 --- a/src/app/cli/directive/StandupDirective.ts +++ b/src/app/cli/directive/StandupDirective.ts @@ -4,6 +4,7 @@ import { Setting } from "../../models/Setting.model"; import { Provisioner } from "../../services/Provisioner.service"; import { IpAddress, isIpAddress, isSubnet, Subnet } from "../../types"; import * as crypto from 'crypto' +import {Lock, Locks} from '../../models/Lock.model' @Injectable() export class StandupDirective extends Directive { @@ -55,6 +56,9 @@ export class StandupDirective extends Directive { await Setting.set('sshPublicKey', sshPublicKey) await Setting.set('sshPrivateKey', sshPrivateKey) + // Setup global locks + await Lock.ensure(Locks.VMID) + // Store the IP address pool: const ipRange = this.container().makeNew(IpRange) ipRange.name = 'Default address pool' diff --git a/src/app/http/controllers/api/v1/Volumes.controller.ts b/src/app/http/controllers/api/v1/Volumes.controller.ts index 24a3c0c..a253658 100644 --- a/src/app/http/controllers/api/v1/Volumes.controller.ts +++ 
b/src/app/http/controllers/api/v1/Volumes.controller.ts @@ -15,20 +15,10 @@ export class Volumes extends Controller { private readonly provisioner!: Provisioner public async create(req: CreateVolume) { - const masterNode = await Node.getMaster() - - const handle = await masterNode.lock(`Creating volume ${req.name}`) - try { - let vol = await this.provisioner.createVolume(req.name, req.sizeInBytes, false) - await new Promise(res => setTimeout(res, 5000)) - - vol = await this.provisioner.unmountVolume(vol, false) - await new Promise(res => setTimeout(res, 5000)) - - return vol.toAPI() - } finally { - await masterNode.unlock(handle) - } + let vol = await this.provisioner.createVolume(req.name, req.sizeInBytes) + vol = await this.provisioner.unmountVolume(vol) + await new Promise(res => setTimeout(res, 2000)) + return vol.toAPI() } public async get(vol: Volume) { @@ -36,9 +26,8 @@ export class Volumes extends Controller { } public async delete(vol: Volume) { - // fixme: handle unmounting and when the vol is on a non-master node await this.provisioner.unmountVolume(vol) - await new Promise(res => setTimeout(res, 5000)) + await new Promise(res => setTimeout(res, 2000)) await this.provisioner.deleteVolume(vol) await vol.delete() return vol.toAPI() diff --git a/src/app/migrations/2024-10-03T02:07:23.618Z_CreateLocksTableMigration.migration.ts b/src/app/migrations/2024-10-03T02:07:23.618Z_CreateLocksTableMigration.migration.ts new file mode 100644 index 0000000..1baccc6 --- /dev/null +++ b/src/app/migrations/2024-10-03T02:07:23.618Z_CreateLocksTableMigration.migration.ts @@ -0,0 +1,47 @@ +import {Injectable, Migration, Inject, DatabaseService, FieldType} from '@extollo/lib' + +/** + * CreateLocksTableMigration + * ---------------------------------- + * Creates the p5x_locks table that backs the Lock model's advisory resource locks. + */ +@Injectable() +export default class CreateLocksTableMigration extends Migration { + @Inject() + protected readonly db!: DatabaseService + + /** + * Apply the migration.
+ */ + async up(): Promise<void> { + const schema = this.db.get().schema() + const table = await schema.table('p5x_locks') + + table.primaryKey('id') + + table.column('resource') + .type(FieldType.varchar) + + table.column('lock_reason') + .type(FieldType.text) + .nullable() + + table.column('lock_owner') + .type(FieldType.varchar) + .nullable() + + await schema.commit(table) + } + + /** + * Undo the migration. + */ + async down(): Promise<void> { + const schema = this.db.get().schema() + const table = await schema.table('p5x_locks') + + table.dropIfExists() + + await schema.commit(table) + } +} diff --git a/src/app/models/Lock.model.ts b/src/app/models/Lock.model.ts new file mode 100644 index 0000000..c8baccc --- /dev/null +++ b/src/app/models/Lock.model.ts @@ -0,0 +1,88 @@ +import {Injectable, Model, Field, FieldType, Maybe, TypeTag, uuid4, Logging, make} from '@extollo/lib' + +export type LockHandle = TypeTag<'p5x.LockHandle'> & string + +export enum Locks { + VMID = 'pve-next-vmid' +} + +/** + * Lock Model + * ----------------------------------- + * Database-backed advisory lock on a named shared resource, acquired and released via opaque handles.
+ */ +@Injectable() +export class Lock extends Model<Lock> { + protected static table = 'p5x_locks' + protected static key = 'id' + + @Field(FieldType.serial) + public id?: number + + @Field(FieldType.varchar) + public resource!: string + + @Field(FieldType.text, 'lock_reason') + public lockReason?: string + + @Field(FieldType.varchar, 'lock_owner') + public lockOwner?: string + + public static async ensure(resource: string): Promise<void> { + const inst = await this.query().where('resource', '=', resource).first() + if ( !inst ) { + const newInst = make(Lock) + newInst.resource = resource + await newInst.save() + } + } + + public static async tryLock(resource: string, reason: string): Promise<Maybe<LockHandle>> { + const log = make(Logging) + const handle = uuid4() as LockHandle + + this.query() + .where('resource', '=', resource) + .whereNull('lock_owner') + .update({ + lock_owner: handle, + lock_reason: reason, + }) + + const inst = await this.query() + .where('resource', '=', resource) + .first() + + if ( inst?.lockOwner === handle ) { + log.info(`Locked resource ${resource} (reason: ${reason})`) + return handle + } + + log.debug(`Failed to lock resource ${resource} for reason: ${reason} (owner: ${inst?.lockOwner} | reason: ${inst?.lockReason})`) + } + + public static async lock(resource: string, reason: string, maxTries = 30): Promise<LockHandle> { + for ( let tryNum = 0; tryNum < maxTries; tryNum += 1 ) { + const handle = await this.tryLock(resource, reason) + if ( handle ) { + return handle + } + + await new Promise(res => setTimeout(res, 5000)) + } + + throw new Error(`Could not obtain lock on resource ${resource} in time - max retries exceeded`) + } + + public static async unlock(resource: string, handle: LockHandle): Promise<void> { + this.query() + .where('resource', '=', resource) + .where('lock_owner', '=', handle) + .update({ + lock_owner: null, + lock_reason: null, + }) + + make(Logging).info(`Released lock ${handle} for resource ${resource}`) + } +} diff --git a/src/app/models/Node.model.ts
b/src/app/models/Node.model.ts index 818f7e4..6f87f4b 100644 --- a/src/app/models/Node.model.ts +++ b/src/app/models/Node.model.ts @@ -7,8 +7,6 @@ import { UniversalPath, Awaitable, Maybe, - TypeTag, - uuid4, Inject, Logging, } from '@extollo/lib' import {IpAddress, Subnet} from '../types' import {Host, SSHHost} from '../support/hosts' @@ -17,8 +15,7 @@ import * as sshpk from 'sshpk' import {Setting} from './Setting.model' import {PCTHost} from '../support/hosts/PCTHost' import {Provisioner} from '../services/Provisioner.service' - -export type NodeLockHandle = TypeTag<'p5x.node-lock-handle'> & string +import {Lock, LockHandle} from './Lock.model' export class ConfigLines extends Collection { public nextNthValue(prefix: string): number { @@ -26,9 +23,9 @@ export class ConfigLines extends Collection { .filter(line => line.startsWith(prefix)) .map(line => parseInt(line.substring(prefix.length).split(':')[0], 10)) .sortDesc() - .first() || -1 + .first() - return maxValue + 1 + return typeof maxValue === 'undefined' ? 
0 : maxValue + 1 } public getForKey(key: string): Maybe { @@ -83,6 +80,32 @@ export class Node extends Model { return master } + public static async tryLockFirstAvailable(reason: string): Promise<[Maybe, Maybe]> { + const nodes = await Node.query() + .where('is_permanent', '=', true) + .get() + .all() + + for ( const node of nodes ) { + const handle = await node.tryLock(reason) + if ( handle ) { + return [node, handle] + } + } + + return [undefined, undefined] + } + + public static async lockFirstAvailable(reason: string, tries: number = 30): Promise<[Node, LockHandle]> { + for ( let tryNum = 0; tryNum < tries; tryNum += 1 ) { + const [node, handle] = await this.tryLockFirstAvailable(reason) + if ( node && handle ) return [node, handle] + await new Promise(res => setTimeout(res, 5000)) + } + + throw new Error(`Could not lock first available node for reason "${reason}" - max tries exceeded`) + } + // @Inject() // protected readonly logging!: Logging @@ -116,6 +139,10 @@ export class Node extends Model { @Field(FieldType.varchar, 'lock_reason') public lockReason?: string + public get lockName(): string { + return `node-${this.id}` + } + public unqualifiedPVEHost(): string { return this.pveHost.split('/')[1] } @@ -133,12 +160,12 @@ export class Node extends Model { return new ConfigLines(content.split('\n')) } - public async putConfigLines(ctConfig: ConfigLines): Promise { + public async putConfigLines(ctConfig: Collection): Promise { const config = await this.getConfig() await config.write(ctConfig.join('\n').trim()) } - public async updateConfig(operator: (c: ConfigLines) => Awaitable): Promise { + public async updateConfig(operator: (c: ConfigLines) => Awaitable>): Promise { const config = await this.getConfig() const content = await config.read() const result = await operator(new ConfigLines(content.split('\n'))) @@ -174,51 +201,15 @@ export class Node extends Model { }, this.pveId) } - public async tryLock(reason: string): Promise> { - const handle = uuid4() as 
NodeLockHandle - - this.query() - .where('id', '=', this.id!) - .whereNull('lock_owner') - .update({ - lock_owner: handle, - lock_reason: reason, - }) - - const inst = await this.query() - .where('id', '=', this.id!) - .first() - - if ( inst?.lockOwner === handle ) { - this.logging.info(`Locked node ${this.id} (reason: ${reason})`) - return handle - } - - this.logging.debug(`Failed to lock node ${this.id} for reason: ${reason} (owner: ${this.lockOwner} | reason: ${this.lockReason})`) + public tryLock(reason: string): Promise> { + return Lock.tryLock(this.lockName, reason) } - public async lock(reason: string, maxTries = 30): Promise { - for ( let tryNum = 0; tryNum < maxTries; tryNum += 1 ) { - const handle = await this.tryLock(reason) - if ( handle ) { - return handle - } - - await new Promise(res => setTimeout(res, 5000)) - } - - throw new Error(`Could not obtain lock on node ${this.id} in time - max retries exceeded`) + public lock(reason: string, maxTries = 30): Promise { + return Lock.lock(this.lockName, reason, maxTries) } - public async unlock(handle: NodeLockHandle): Promise { - this.query() - .where('id', '=', this.id!) 
- .where('lock_owner', '=', handle) - .update({ - lock_owner: null, - lock_reason: null, - }) - - this.logging.info(`Released lock ${handle} for node ${this.id}`) + public unlock(handle: LockHandle): Promise { + return Lock.unlock(this.lockName, handle) } } diff --git a/src/app/services/Provisioner.service.ts b/src/app/services/Provisioner.service.ts index a06887c..d0d916c 100644 --- a/src/app/services/Provisioner.service.ts +++ b/src/app/services/Provisioner.service.ts @@ -8,7 +8,7 @@ import { Logging, universalPath, uuid4, collect, ArrayElement, Collection, Maybe, SSHFilesystem, } from '@extollo/lib' -import { Node } from "../models/Node.model"; +import {ConfigLines, Node} from '../models/Node.model' import {Setting} from '../models/Setting.model' import {IpRange} from '../models/IpRange.model' import { Proxmox } from 'proxmox-api' @@ -20,6 +20,7 @@ import {User} from '../models/User.model' import {HostGroupHost} from '../models/HostGroupHost.model' import {HostGroup} from '../models/HostGroup.model' import {Volume} from '../models/Volume.model' +import {Lock, Locks} from '../models/Lock.model' export interface HostUsage { host: HostGroupHost, @@ -146,7 +147,7 @@ export class Provisioner { return volume } - public async unmountVolume(volume: Volume, shouldLock: boolean = false): Promise { + public async unmountVolume(volume: Volume): Promise { if ( !volume.mountpoint || !volume.mountpointIdentifier ) { this.logging.info(`Cannot unmount volume ${volume.volumeId}: not mounted`) return volume @@ -183,35 +184,22 @@ export class Provisioner { await pveHost.run(shellCommand(`nsenter --target ${parentPID} --mount /bin/bash -c 'umount /var/lib/lxc/.pve-staged-mounts/${volume.mountpointIdentifier}'`)) // Replace the disk's mountpoint with an unused disk - const pveFilesystem = await pveHost.getFilesystem() - - const handle = shouldLock ? 
await node.lock(`Unmounting volume ${volume.volumeId}`) : undefined + const qualifiedName = await volume.getQualifiedName() + const handle = await node.lock(`Unmounting volume ${volume.volumeId}`) try { - const ctConfig = pveFilesystem.getPath(`/etc/pve/lxc/${node.pveId}.conf`) - const ctConfigLines = await ctConfig.read() - .then(x => x.split('\n')) - .then(x => collect(x)) - - const maxUnused = ctConfigLines - .filter(line => line.startsWith('unused')) - .map(line => parseInt(line.substring('unused'.length) - .split(':')[0], 10)) - .sortDesc() - .first() || -1 - - const newConfigLines = await ctConfigLines - .promiseMap(async line => { + let nextUnused: number + await node.updateConfig((c: ConfigLines) => { + nextUnused = c.nextNthValue('unused') + return c.map(line => { if (!line.startsWith(volume.mountpointIdentifier!)) { return line } - return `unused${maxUnused + 1}: ${await volume.getQualifiedName()}` + return `unused${nextUnused}: ${qualifiedName}` }) + }) - volume.mountpointIdentifier = `unused${maxUnused + 1}` - - // Update the container's config - await ctConfig.write(newConfigLines.join('\n')) + volume.mountpointIdentifier = `unused${nextUnused!}` } finally { if ( handle ) await node.unlock(handle) } @@ -239,23 +227,30 @@ export class Provisioner { } // Get the next vmid to be used by the LXC container - const carrierVMID = await proxmox.cluster.nextid.$get() + const name = `p5x-tmp-${uuid4()}` const ipRange = await IpRange.getDefault() const carrierIP = await ipRange.getNextAvailableOrFail() // this doesn't matter; we won't be booting it + const handle = await Lock.lock(Locks.VMID, `Provisioning carrier container for node ${node.hostname}`) + let createUPID: string + let carrierVMID: number + try { + carrierVMID = await proxmox.cluster.nextid.$get() - // Create the carrier LXC container - const name = `p5x-tmp-${uuid4()}` - const createUPID = await pveHost.lxc.$post({ - ostemplate: 'local:vztmpl/p5x-empty.tar.xz', - vmid: carrierVMID, - cores: 1, - 
description: 'Temporary container managed by P5x', - hostname: name, - memory: 16, // in MB - start: false, - storage: await Setting.loadOneRequired('pveStoragePool'), - tags: 'p5x', - }) + // Create the carrier LXC container + createUPID = await pveHost.lxc.$post({ + ostemplate: 'local:vztmpl/p5x-empty.tar.xz', + vmid: carrierVMID, + cores: 1, + description: 'Temporary container managed by P5x', + hostname: name, + memory: 16, // in MB + start: false, + storage: await Setting.loadOneRequired('pveStoragePool'), + tags: 'p5x', + }) + } finally { + await Lock.unlock(Locks.VMID, handle) + } this.logging.info('Waiting for PVE carrier container to be created') await this.waitForNodeTask(nodeName, createUPID) @@ -268,6 +263,7 @@ export class Provisioner { carrierNode.assignedSubnet = ipRange.subnet carrierNode.pveHost = node.pveHost await carrierNode.save() + await Lock.ensure(carrierNode.lockName) return carrierNode } @@ -318,38 +314,46 @@ export class Provisioner { // Get the next vmid to be used by the LXC container const nodeHostname = await this.getNextHostname() - const nodeVMID = await proxmox.cluster.nextid.$get() const nodeIP = await ipRange.getNextAvailableOrFail() const sshPubKey = sshpk.parseKey(await Setting.loadOneRequired('sshPublicKey')) .toString('ssh') - // Create a new LXC container on the node for the new instance - const nodeTaskUPID = await pveHost.lxc.$post({ - ostemplate: 'p5x.image-cache:vztmpl/p5x-base.tar.xz', - vmid: nodeVMID, - cores: await Setting.loadOneRequired('nodeCpus'), - description: 'P5x Worker Node', - hostname: nodeHostname, - memory: await Setting.loadOneRequired('nodeRamMib'), - net0: - objectToKeyValue({ - name: 'eth0', - bridge: await Setting.loadOneRequired('nodeNetworkBridge'), - firewall: 1, - gw: ipRange.gatewayIp, - ip: `${nodeIP}/${ipRange.subnet}`, - }) - .map(x => `${x.key}=${x.value}`) - .join(','), - onboot: true, - password: 'strongpassword', //rootPassword, // fixme - rootfs: `${pveStoragePool}:8`, // 8 GiB // fixme 
- searchdomain: await Setting.loadOneRequired('dnsDomain'), - 'ssh-public-keys': sshPubKey, - start: true, - storage: pveStoragePool, - tags: 'p5x', - }) + const handle = await Lock.lock(Locks.VMID, `Provisioning node ${nodeHostname}`) + let nodeTaskUPID: string + let nodeVMID: number + try { + nodeVMID = await proxmox.cluster.nextid.$get() + + // Create a new LXC container on the node for the new instance + nodeTaskUPID = await pveHost.lxc.$post({ + ostemplate: 'p5x.image-cache:vztmpl/p5x-base.tar.xz', + vmid: nodeVMID, + cores: await Setting.loadOneRequired('nodeCpus'), + description: 'P5x Worker Node', + hostname: nodeHostname, + memory: await Setting.loadOneRequired('nodeRamMib'), + net0: + objectToKeyValue({ + name: 'eth0', + bridge: await Setting.loadOneRequired('nodeNetworkBridge'), + firewall: 1, + gw: ipRange.gatewayIp, + ip: `${nodeIP}/${ipRange.subnet}`, + }) + .map(x => `${x.key}=${x.value}`) + .join(','), + onboot: true, + password: 'strongpassword', //rootPassword, // fixme + rootfs: `${pveStoragePool}:8`, // 8 GiB // fixme + searchdomain: await Setting.loadOneRequired('dnsDomain'), + 'ssh-public-keys': sshPubKey, + start: true, + storage: pveStoragePool, + tags: 'p5x', + }) + } finally { + await Lock.unlock(Locks.VMID, handle) + } this.logging.info('Waiting for PVE node to be created...') await this.waitForNodeTask(host!.pveHost.split('/')[1], nodeTaskUPID) @@ -364,6 +368,7 @@ export class Provisioner { node.isPermanent = false node.isMaster = false await node.save() + await Lock.ensure(node.lockName) // Wait for the host to come online (this will be a PCTHost passed through PVE) this.logging.info('Waiting for node to come online...') @@ -502,6 +507,7 @@ export class Provisioner { node.isPermanent = true node.isMaster = true await node.save() + await Lock.ensure(node.lockName) // Wait for the host to come online (this will be a PCTHost passed through PVE) this.logging.info('Waiting for master node to come online...') @@ -552,46 +558,46 @@ export 
class Provisioner { return node } - public async createVolume(name: string, sizeInBytes: number, shouldLock: boolean = true): Promise { + public async createVolume(name: string, sizeInBytes: number): Promise { this.logging.info(`Creating volume ${name} with size ${sizeInBytes / 1024}KiB...`) - const masterNode = await Node.getMaster() const api = await this.getApi() - const handle = shouldLock ? await masterNode.lock(`Creating volume ${name}`) : undefined + const [node, handle] = await Node.lockFirstAvailable(`Creating volume ${name}`) + this.logging.info(`Locked node ${node.hostname}`) const vol = this.container.makeNew(Volume) const storage = await Setting.loadOneRequired('pveStoragePool') let nextMountpoint: number try { - const ctConfig = await masterNode.getConfigLines() + const ctConfig = await node.getConfigLines() nextMountpoint = ctConfig.nextNthValue('mp') vol.name = name vol.sizeInBytes = sizeInBytes - vol.nodeId = masterNode.id! + vol.nodeId = node.id! vol.mountpoint = vol.getDefaultMountpoint() vol.mountpointIdentifier = `mp${nextMountpoint}` const provisionSizeInGiB = Math.max(Math.ceil(sizeInBytes / (1024 * 1024 * 1024)), 1) const line = `${storage}:${provisionSizeInGiB},mp=${vol.getDefaultMountpoint()},backup=1` - await api.nodes.$(masterNode.unqualifiedPVEHost()) + await api.nodes.$(node.unqualifiedPVEHost()) .lxc - .$(masterNode.pveId) + .$(node.pveId) .config .$put({[`mp${nextMountpoint}`]: line}) await new Promise(res => setTimeout(res, 5000)) } finally { - if ( handle ) await masterNode.unlock(handle) + if ( handle ) await node.unlock(handle) } - const ctConfig = await masterNode.getConfigLines() + const ctConfig = await node.getConfigLines() const mount = ctConfig.getForKey(`mp${nextMountpoint}`) if ( !mount ) { throw new Error('Could not find mountpoint config after creating volume!') } - vol.diskName = mount.substring(`${storage}:${masterNode.pveId}/`.length).split(',')[0] + vol.diskName = 
mount.substring(`${storage}:${node.pveId}/`.length).split(',')[0] await vol.save() this.logging.info(`Created volume ${vol.volumeId} (mpid: ${vol.mountpointIdentifier})`)