[WIP] Start trying to tackle concurrency issues (this is very, very broken)

This commit is contained in:
Garrett Mills 2024-10-02 00:09:11 -04:00
parent 0545c8337b
commit d53760b668
5 changed files with 184 additions and 69 deletions

View File

@ -15,13 +15,20 @@ export class Volumes extends Controller {
private readonly provisioner!: Provisioner private readonly provisioner!: Provisioner
public async create(req: CreateVolume) { public async create(req: CreateVolume) {
let vol = await this.provisioner.createVolume(req.name, req.sizeInBytes) const masterNode = await Node.getMaster()
const handle = await masterNode.lock(`Creating volume ${req.name}`)
try {
let vol = await this.provisioner.createVolume(req.name, req.sizeInBytes, false)
await new Promise<void>(res => setTimeout(res, 5000)) await new Promise<void>(res => setTimeout(res, 5000))
vol = await this.provisioner.unmountVolume(vol) vol = await this.provisioner.unmountVolume(vol, false)
await new Promise<void>(res => setTimeout(res, 5000)) await new Promise<void>(res => setTimeout(res, 5000))
return vol.toAPI() return vol.toAPI()
} finally {
await masterNode.unlock(handle)
}
} }
public async get(vol: Volume) { public async get(vol: Volume) {

View File

@ -49,6 +49,13 @@ export default class CreateP5xNodesTableMigration extends Migration {
.type(FieldType.varchar) .type(FieldType.varchar)
.required() .required()
table.column('lock_owner')
.type(FieldType.varchar)
.nullable()
table.column('lock_reason')
.type(FieldType.text)
await schema.commit(table) await schema.commit(table)
} }

View File

@ -1,4 +1,15 @@
import {Injectable, Model, Field, FieldType, Collection, UniversalPath, Awaitable, Maybe} from '@extollo/lib' import {
Injectable,
Model,
Field,
FieldType,
Collection,
UniversalPath,
Awaitable,
Maybe,
TypeTag,
uuid4, Inject, Logging,
} from '@extollo/lib'
import {IpAddress, Subnet} from '../types' import {IpAddress, Subnet} from '../types'
import {Host, SSHHost} from '../support/hosts' import {Host, SSHHost} from '../support/hosts'
import * as ssh2 from 'ssh2' import * as ssh2 from 'ssh2'
@ -7,6 +18,8 @@ import {Setting} from './Setting.model'
import {PCTHost} from '../support/hosts/PCTHost' import {PCTHost} from '../support/hosts/PCTHost'
import {Provisioner} from '../services/Provisioner.service' import {Provisioner} from '../services/Provisioner.service'
export type NodeLockHandle = TypeTag<'p5x.node-lock-handle'> & string
export class ConfigLines extends Collection<string> { export class ConfigLines extends Collection<string> {
public nextNthValue(prefix: string): number { public nextNthValue(prefix: string): number {
const maxValue = this const maxValue = this
@ -70,6 +83,9 @@ export class Node extends Model<Node> {
return master return master
} }
// @Inject()
// protected readonly logging!: Logging
@Field(FieldType.serial) @Field(FieldType.serial)
public id?: number public id?: number
@ -94,6 +110,12 @@ export class Node extends Model<Node> {
@Field(FieldType.varchar, 'pve_host') @Field(FieldType.varchar, 'pve_host')
public pveHost!: string public pveHost!: string
@Field(FieldType.varchar, 'lock_owner')
public lockOwner?: string
@Field(FieldType.varchar, 'lock_reason')
public lockReason?: string
public unqualifiedPVEHost(): string { public unqualifiedPVEHost(): string {
return this.pveHost.split('/')[1] return this.pveHost.split('/')[1]
} }
@ -151,4 +173,52 @@ export class Node extends Model<Node> {
password: await Setting.loadOneRequired('pveRootPassword'), password: await Setting.loadOneRequired('pveRootPassword'),
}, this.pveId) }, this.pveId)
} }
/**
 * Make a single, non-blocking attempt to take the lock on this node.
 *
 * A fresh `uuid4()` handle is written to `lock_owner` (with `lock_reason`) by an
 * update that only matches this node's row while `lock_owner` is NULL. The row is
 * then read back, and the attempt succeeded only if the stored owner is our handle.
 *
 * @param reason human-readable description stored in `lock_reason` and logged
 * @returns the handle to pass to `unlock()` on success, or `undefined` if the
 *          lock is currently held by someone else
 */
public async tryLock(reason: string): Promise<Maybe<NodeLockHandle>> {
    const handle = uuid4() as NodeLockHandle

    // The update MUST be awaited: otherwise the read-back below can run before the
    // write has been applied (a spurious "failed to lock"), and a rejected query
    // would surface as an unhandled promise rejection instead of failing here.
    // NOTE(review): mutual exclusion relies on the database applying this
    // conditional update atomically - confirm for the configured backend.
    await this.query()
        .where('id', '=', this.id!)
        .whereNull('lock_owner')
        .update({
            lock_owner: handle,
            lock_reason: reason,
        })

    // Re-read the row to find out who actually owns the lock now
    const inst = await this.query()
        .where('id', '=', this.id!)
        .first()

    // NOTE(review): `this.logging` is referenced here, but the injected `logging`
    // field on this class appears to be commented out - confirm it is provided.
    if ( inst?.lockOwner === handle ) {
        this.logging.info(`Locked node ${this.id} (reason: ${reason})`)
        return handle
    }

    // Report the owner/reason from the row we just read, not this (possibly stale)
    // in-memory instance, so the message names the lock's current holder.
    this.logging.debug(`Failed to lock node ${this.id} for reason: ${reason} (owner: ${inst?.lockOwner} | reason: ${inst?.lockReason})`)
    return undefined
}
/**
 * Take the lock on this node, retrying until it is obtained or the
 * attempts run out.
 *
 * Calls `tryLock()` up to `maxTries` times, pausing `retryDelayMs` between
 * consecutive attempts.
 *
 * @param reason human-readable description passed through to `tryLock()`
 * @param maxTries maximum number of attempts (default 30)
 * @param retryDelayMs pause between attempts in milliseconds (default 5000)
 * @returns the handle to pass to `unlock()`
 * @throws Error if the lock could not be obtained within `maxTries` attempts
 */
public async lock(reason: string, maxTries = 30, retryDelayMs = 5000): Promise<NodeLockHandle> {
    for ( let tryNum = 0; tryNum < maxTries; tryNum += 1 ) {
        const handle = await this.tryLock(reason)
        if ( handle ) {
            return handle
        }

        // Only wait when another attempt will actually follow; sleeping after the
        // final failure would just delay the error below for no benefit.
        if ( tryNum < maxTries - 1 ) {
            await new Promise<void>(res => setTimeout(res, retryDelayMs))
        }
    }

    throw new Error(`Could not obtain lock on node ${this.id} in time - max retries exceeded`)
}
/**
 * Release a lock on this node previously obtained via `lock()`/`tryLock()`.
 *
 * Clears `lock_owner` and `lock_reason` on this node's row, but only where
 * `lock_owner` still equals `handle`, so a handle that no longer owns the lock
 * does not clear someone else's.
 *
 * @param handle the handle returned when the lock was obtained
 */
public async unlock(handle: NodeLockHandle): Promise<void> {
    // Await the update so that (a) callers doing `await node.unlock(handle)` in a
    // `finally` really do resume only after the row is cleared, (b) the log line
    // below is truthful, and (c) a failed query rejects here instead of becoming
    // an unhandled promise rejection.
    await this.query()
        .where('id', '=', this.id!)
        .where('lock_owner', '=', handle)
        .update({
            lock_owner: null,
            lock_reason: null,
        })

    // NOTE(review): `this.logging` is referenced here, but the injected `logging`
    // field on this class appears to be commented out - confirm it is provided.
    this.logging.info(`Released lock ${handle} for node ${this.id}`)
}
} }

View File

@ -120,20 +120,25 @@ export class Provisioner {
public async mountVolume(volume: Volume, mountpoint?: string): Promise<Volume> { public async mountVolume(volume: Volume, mountpoint?: string): Promise<Volume> {
mountpoint = mountpoint || volume.getDefaultMountpoint() mountpoint = mountpoint || volume.getDefaultMountpoint()
// TODO Lock the container's config
const node = await volume.getNode() const node = await volume.getNode()
const handle = await node.lock(`Mounting volume ${volume.volumeId}`)
let nextMountpoint: number
try {
const ctConfig = await node.getConfigLines() const ctConfig = await node.getConfigLines()
const nextMountpoint = ctConfig.nextNthValue('mp') nextMountpoint = ctConfig.nextNthValue('mp')
// FIXME: unlock container config
const api = await this.getApi() const api = await this.getApi()
const line = `${await volume.getQualifiedName()},mp=${mountpoint},backup=1` const line = `${await volume.getQualifiedName()},mp=${mountpoint},backup=1`
await api.nodes.$(node.unqualifiedPVEHost()) await api.nodes.$(node.unqualifiedPVEHost())
.lxc.$(node.pveId) .lxc
.config.$put({ [`mp${nextMountpoint}`]: line }) .$(node.pveId)
.config
.$put({[`mp${nextMountpoint}`]: line})
} finally {
await node.unlock(handle)
}
volume.mountpointIdentifier = `mp${nextMountpoint}` volume.mountpointIdentifier = `mp${nextMountpoint}`
volume.mountpoint = mountpoint volume.mountpoint = mountpoint
@ -141,7 +146,7 @@ export class Provisioner {
return volume return volume
} }
public async unmountVolume(volume: Volume): Promise<Volume> { public async unmountVolume(volume: Volume, shouldLock: boolean = false): Promise<Volume> {
if ( !volume.mountpoint || !volume.mountpointIdentifier ) { if ( !volume.mountpoint || !volume.mountpointIdentifier ) {
this.logging.info(`Cannot unmount volume ${volume.volumeId}: not mounted`) this.logging.info(`Cannot unmount volume ${volume.volumeId}: not mounted`)
return volume return volume
@ -179,6 +184,9 @@ export class Provisioner {
// Replace the disk's mountpoint with an unused disk // Replace the disk's mountpoint with an unused disk
const pveFilesystem = await pveHost.getFilesystem() const pveFilesystem = await pveHost.getFilesystem()
const handle = shouldLock ? await node.lock(`Unmounting volume ${volume.volumeId}`) : undefined
try {
const ctConfig = pveFilesystem.getPath(`/etc/pve/lxc/${node.pveId}.conf`) const ctConfig = pveFilesystem.getPath(`/etc/pve/lxc/${node.pveId}.conf`)
const ctConfigLines = await ctConfig.read() const ctConfigLines = await ctConfig.read()
.then(x => x.split('\n')) .then(x => x.split('\n'))
@ -186,23 +194,27 @@ export class Provisioner {
const maxUnused = ctConfigLines const maxUnused = ctConfigLines
.filter(line => line.startsWith('unused')) .filter(line => line.startsWith('unused'))
.map(line => parseInt(line.substring('unused'.length).split(':')[0], 10)) .map(line => parseInt(line.substring('unused'.length)
.split(':')[0], 10))
.sortDesc() .sortDesc()
.first() || -1 .first() || -1
const newConfigLines = await ctConfigLines const newConfigLines = await ctConfigLines
.promiseMap(async line => { .promiseMap(async line => {
if ( !line.startsWith(volume.mountpointIdentifier!) ) { if (!line.startsWith(volume.mountpointIdentifier!)) {
return line return line
} }
return `unused${maxUnused+1}: ${await volume.getQualifiedName()}` return `unused${maxUnused + 1}: ${await volume.getQualifiedName()}`
}) })
volume.mountpointIdentifier = `unused${maxUnused+1}` volume.mountpointIdentifier = `unused${maxUnused + 1}`
// Update the container's config // Update the container's config
await ctConfig.write(newConfigLines.join('\n')) await ctConfig.write(newConfigLines.join('\n'))
} finally {
if ( handle ) await node.unlock(handle)
}
volume.save() volume.save()
return volume return volume
@ -540,32 +552,40 @@ export class Provisioner {
return node return node
} }
public async createVolume(name: string, sizeInBytes: number): Promise<Volume> { public async createVolume(name: string, sizeInBytes: number, shouldLock: boolean = true): Promise<Volume> {
this.logging.info(`Creating volume ${name} with size ${sizeInBytes / 1024}KiB...`) this.logging.info(`Creating volume ${name} with size ${sizeInBytes / 1024}KiB...`)
const masterNode = await Node.getMaster() const masterNode = await Node.getMaster()
const api = await this.getApi() const api = await this.getApi()
let ctConfig = await masterNode.getConfigLines() const handle = shouldLock ? await masterNode.lock(`Creating volume ${name}`) : undefined
const nextMountpoint = ctConfig.nextNthValue('mp')
// FIXME: unlock container config
const vol = this.container.makeNew<Volume>(Volume) const vol = this.container.makeNew<Volume>(Volume)
const storage = await Setting.loadOneRequired('pveStoragePool')
let nextMountpoint: number
try {
const ctConfig = await masterNode.getConfigLines()
nextMountpoint = ctConfig.nextNthValue('mp')
vol.name = name vol.name = name
vol.sizeInBytes = sizeInBytes vol.sizeInBytes = sizeInBytes
vol.nodeId = masterNode.id! vol.nodeId = masterNode.id!
vol.mountpoint = vol.getDefaultMountpoint() vol.mountpoint = vol.getDefaultMountpoint()
vol.mountpointIdentifier = `mp${nextMountpoint}` vol.mountpointIdentifier = `mp${nextMountpoint}`
const provisionSizeInGiB = Math.max(Math.ceil(sizeInBytes/(1024*1024*1024)), 1) const provisionSizeInGiB = Math.max(Math.ceil(sizeInBytes / (1024 * 1024 * 1024)), 1)
const storage = await Setting.loadOneRequired('pveStoragePool')
const line = `${storage}:${provisionSizeInGiB},mp=${vol.getDefaultMountpoint()},backup=1` const line = `${storage}:${provisionSizeInGiB},mp=${vol.getDefaultMountpoint()},backup=1`
await api.nodes.$(masterNode.unqualifiedPVEHost()) await api.nodes.$(masterNode.unqualifiedPVEHost())
.lxc.$(masterNode.pveId) .lxc
.config.$put({ [`mp${nextMountpoint}`]: line }) .$(masterNode.pveId)
.config
.$put({[`mp${nextMountpoint}`]: line})
ctConfig = await masterNode.getConfigLines() await new Promise<void>(res => setTimeout(res, 5000))
} finally {
if ( handle ) await masterNode.unlock(handle)
}
const ctConfig = await masterNode.getConfigLines()
const mount = ctConfig.getForKey(`mp${nextMountpoint}`) const mount = ctConfig.getForKey(`mp${nextMountpoint}`)
if ( !mount ) { if ( !mount ) {
throw new Error('Could not find mountpoint config after creating volume!') throw new Error('Could not find mountpoint config after creating volume!')
@ -573,6 +593,8 @@ export class Provisioner {
vol.diskName = mount.substring(`${storage}:${masterNode.pveId}/`.length).split(',')[0] vol.diskName = mount.substring(`${storage}:${masterNode.pveId}/`.length).split(',')[0]
await vol.save() await vol.save()
this.logging.info(`Created volume ${vol.volumeId} (mpid: ${vol.mountpointIdentifier})`)
return vol return vol
} }
@ -607,24 +629,33 @@ export class Provisioner {
// ASSUMPTION: both hosts reside on the same physical node // ASSUMPTION: both hosts reside on the same physical node
// First, figure out the new mountpointIdentifier // First, figure out the new mountpointIdentifier
const handle = await toNode.lock(`Transferring volume to node: ${vol.volumeId}`)
let newMountpointIdentifier: string
try {
let toNodeCtConfig = await toNode.getConfigLines() let toNodeCtConfig = await toNode.getConfigLines()
const newMountpointIdentifier = `unused${toNodeCtConfig.nextNthValue('unused')}` newMountpointIdentifier = `unused${toNodeCtConfig.nextNthValue('unused')}`
const fromNode = await vol.getNode() const fromNode = await vol.getNode()
const api = await this.getApi() const api = await this.getApi()
const upid = await api await api
.nodes.$(fromNode.unqualifiedPVEHost()) .nodes.$(fromNode.unqualifiedPVEHost())
.lxc.$(fromNode.pveId) .lxc
.move_volume.$post({ .$(fromNode.pveId)
.move_volume
.$post({
volume: vol.mountpointIdentifier as any, volume: vol.mountpointIdentifier as any,
'target-vmid': toNode.pveId, 'target-vmid': toNode.pveId,
'target-volume': newMountpointIdentifier as any, 'target-volume': newMountpointIdentifier as any,
}) })
} finally {
await toNode.unlock(handle)
}
// The API request technically returns a UPID, but waiting for move_volume UPIDs
// from the API is not yet supported. So do this nonsense instead.
await new Promise<void>(res => setTimeout(res, 5000)) await new Promise<void>(res => setTimeout(res, 5000))
// await this.waitForNodeTask(fromNode.pveHost, upid)
toNodeCtConfig = await toNode.getConfigLines() const toNodeCtConfig = await toNode.getConfigLines()
const mount = toNodeCtConfig.getForKey(newMountpointIdentifier) const mount = toNodeCtConfig.getForKey(newMountpointIdentifier)
if ( !mount ) { if ( !mount ) {
throw new Error('Could not find mountpoint config after transferring volume!') throw new Error('Could not find mountpoint config after transferring volume!')

View File

@ -40,7 +40,7 @@ export class CommandError extends ErrorWithContext {
} }
constructor(command: ShellCommand, result: ExecutionResult) { constructor(command: ShellCommand, result: ExecutionResult) {
super(`Unable to execute command: ${command}`, { command, result }) super(`Unable to execute command: ${command}: ${result.combinedOutput.join('\n')}`, { command, result })
} }
} }