[WIP] Start implenting mount/unmount/transfer

This commit is contained in:
Garrett Mills 2024-09-29 10:39:20 -04:00
parent 7a7c90801a
commit e164263106
9 changed files with 426 additions and 38 deletions

View File

@ -1,7 +1,8 @@
import {Controller, view, Inject, Injectable} from '@extollo/lib'
import {Controller, Inject, Injectable} from '@extollo/lib'
import {CreateVolume} from '../../../middlewares/api/v1/ValidCreateVolume.middleware'
import {Volume} from '../../../../models/Volume.model'
import {Provisioner} from '../../../../services/Provisioner.service'
import {Node} from '../../../../models/Node.model'
/**
* Volumes Controller
@ -14,22 +15,46 @@ export class Volumes extends Controller {
private readonly provisioner!: Provisioner
public async create(req: CreateVolume) {
const node = await this.provisioner.createVolume(req.name, req.sizeInBytes)
let vol = await this.provisioner.createVolume(req.name, req.sizeInBytes)
await new Promise<void>(res => setTimeout(res, 5000))
const vol = this.request.makeNew<Volume>(Volume)
vol.name = req.name
vol.sizeInBytes = req.sizeInBytes
vol.nodeId = node.id!
await vol.save()
vol = await this.provisioner.unmountVolume(vol)
await new Promise<void>(res => setTimeout(res, 5000))
return vol.toAPI()
}
public async get(vol: Volume) {
return vol.toAPI()
}
public async delete(vol: Volume) {
// fixme: handle unmounting and when the vol is on a non-master node
const node = await vol.getNode()
await this.provisioner.deleteVolume(vol.name, node.pveId)
await this.provisioner.unmountVolume(vol)
await new Promise(res => setTimeout(res, 5000))
await this.provisioner.deleteVolume(vol)
await vol.delete()
return vol.toAPI()
}
public async mount(vol: Volume) {
let mountpoint = vol.getDefaultMountpoint()
const rawMountpoint = this.request.input('mountpoint')
if ( rawMountpoint && typeof rawMountpoint === 'string' ) {
mountpoint = rawMountpoint
}
vol = await this.provisioner.mountVolume(vol, mountpoint)
return vol.toAPI()
}
public async unmount(vol: Volume) {
vol = await this.provisioner.unmountVolume(vol)
return vol.toAPI()
}
public async transfer(vol: Volume, toNode: Node) {
vol = await this.provisioner.transferVolume(vol, toNode)
return vol.toAPI()
}
}

View File

@ -0,0 +1,18 @@
import {ParameterMiddleware, Injectable, Either, ResponseObject, right, left, error} from '@extollo/lib'
import {Node} from '../../../../models/Node.model'
@Injectable()
export class NodeByName extends ParameterMiddleware<Node> {
async handle(): Promise<Either<ResponseObject, Node>> {
const name = this.request.safe('node').present().string()
const volume = await Node.query<Node>()
.where('hostname', '=', name)
.first()
if ( !volume ) {
return left(error(`Could not find node with the hostname "${name}"`))
}
return right(volume)
}
}

View File

@ -4,7 +4,7 @@ import {Volume} from '../../../../models/Volume.model'
@Injectable()
export class VolumeByName extends ParameterMiddleware<Volume> {
async handle(): Promise<Either<ResponseObject, Volume>> {
const name = this.request.safe('name').present().string()
const name = this.request.safe('volume').present().string()
const volume = await Volume.query<Volume>()
.where('name', '=', name)
.first()

View File

@ -3,6 +3,7 @@ import {Misc} from '../controllers/api/v1/Misc.controller'
import {Volumes} from '../controllers/api/v1/Volumes.controller'
import {ValidCreateVolume} from '../middlewares/api/v1/ValidCreateVolume.middleware'
import {VolumeByName} from '../middlewares/api/v1/VolumeByName.middleware'
import {NodeByName} from '../middlewares/api/v1/NodeByName.middleware'
Route.group('/api/v1', () => {
Route.get('/ready')
@ -12,7 +13,24 @@ Route.group('/api/v1', () => {
.parameterMiddleware(ValidCreateVolume)
.calls<Volumes>(Volumes, v => v.create)
Route.delete('/volumes/:name')
Route.get('/volumes/:volume')
.parameterMiddleware(VolumeByName)
.calls<Volumes>(Volumes, v => v.get)
Route.delete('/volumes/:volume')
.parameterMiddleware(VolumeByName)
.calls<Volumes>(Volumes, v => v.delete)
Route.post('/volumes/:volume/transfer-to/:node')
.parameterMiddleware(VolumeByName)
.parameterMiddleware(NodeByName)
.calls<Volumes>(Volumes, v => v.transfer)
Route.post('/volumes/:volume/mount')
.parameterMiddleware(VolumeByName)
.calls<Volumes>(Volumes, v => v.mount)
Route.post('/volumes/:volume/unmount')
.parameterMiddleware(VolumeByName)
.calls<Volumes>(Volumes, v => v.unmount)
})

View File

@ -21,6 +21,10 @@ export default class CreateVolumesTableMigration extends Migration {
table.column('name').type(FieldType.varchar)
table.column('size_in_bytes').type(FieldType.bigint)
table.column('pve_node_id').type(FieldType.int)
table.column('disk_name').type(FieldType.varchar).nullable()
table.column('mountpoint').type(FieldType.varchar).nullable()
table.column('mountpoint_identifier').type(FieldType.varchar).nullable()
table.column('mountpoint_host').type(FieldType.varchar).nullable()
await schema.commit(table)
}

View File

@ -1,4 +1,4 @@
import {Injectable, Model, Field, FieldType} from '@extollo/lib'
import {Injectable, Model, Field, FieldType, Collection, UniversalPath, Awaitable, Maybe} from '@extollo/lib'
import {IpAddress, Subnet} from '../types'
import {Host, SSHHost} from '../support/hosts'
import * as ssh2 from 'ssh2'
@ -7,6 +7,46 @@ import {Setting} from './Setting.model'
import {PCTHost} from '../support/hosts/PCTHost'
import {Provisioner} from '../services/Provisioner.service'
export class ConfigLines extends Collection<string> {
public nextNthValue(prefix: string): number {
const maxValue = this
.filter(line => line.startsWith(prefix))
.map(line => parseInt(line.substring(prefix.length).split(':')[0], 10))
.sortDesc()
.first() || -1
return maxValue + 1
}
public getForKey(key: string): Maybe<string> {
return this.filter(line => line.startsWith(`${key}:`))
.first()
?.substring(`${key}:`.length)
?.trim()
}
public removePending(line: string): ConfigLines {
let parts = this.join('\n')
.split('[pve:pending]')
if ( parts.length !== 2 ) {
return this
}
parts[1] = parts[1]
.split('\n')
.filter(l => !l.includes(line))
.join('\n')
.trim()
if ( !parts[1] ) {
return new ConfigLines(parts[0].split('\n'))
}
return new ConfigLines(parts.join('[pve:pending]\n').split('\n'))
}
}
/**
* Node Model
* -----------------------------------
@ -54,6 +94,35 @@ export class Node extends Model<Node> {
@Field(FieldType.varchar, 'pve_host')
public pveHost!: string
public unqualifiedPVEHost(): string {
return this.pveHost.split('/')[1]
}
public async getConfig(): Promise<UniversalPath> {
const pr = this.container().make<Provisioner>(Provisioner)
const pveHost = await pr.getPVEHost(this.pveHost.split('/')[1])
const pveFilesystem = await pveHost.getFilesystem()
return pveFilesystem.getPath(`/etc/pve/lxc/${this.pveId}.conf`)
}
public async getConfigLines(): Promise<ConfigLines> {
const config = await this.getConfig()
const content = await config.read()
return new ConfigLines(content.split('\n'))
}
public async putConfigLines(ctConfig: ConfigLines): Promise<void> {
const config = await this.getConfig()
await config.write(ctConfig.join('\n').trim())
}
public async updateConfig(operator: (c: ConfigLines) => Awaitable<ConfigLines>): Promise<void> {
const config = await this.getConfig()
const content = await config.read()
const result = await operator(new ConfigLines(content.split('\n')))
await config.write(result.join('\n').trim())
}
public async getHost(): Promise<Host> {
// Try to make a direct SSH connection to the container
const privKey = await Setting.loadOneRequired('sshPrivateKey')

View File

@ -1,5 +1,6 @@
import {Injectable, Model, Field, FieldType, JSONState, Maybe} from '@extollo/lib'
import {Node} from './Node.model'
import {Setting} from './Setting.model'
/**
* Volume Model
@ -23,6 +24,30 @@ export class Volume extends Model<Volume> {
@Field(FieldType.int, 'pve_node_id')
nodeId!: number
@Field(FieldType.varchar)
mountpoint?: string
@Field(FieldType.varchar, 'mountpoint_identifier')
mountpointIdentifier?: string
@Field(FieldType.varchar, 'mountpoint_host')
mountpointHost?: string
@Field(FieldType.varchar, 'disk_name')
diskName?: string
getDefaultMountpoint(): string {
return `/mnt/p5x-${this.name}`
}
async getQualifiedName(): Promise<string> {
const [storage, node] = await Promise.all([
Setting.loadOneRequired('pveStoragePool'),
this.getNode(),
])
return `${storage}:${node.pveId}/${this.diskName || 'pve-' + this.name + '.raw'}`
}
async getNode(): Promise<Node> {
const node = await Node.query<Node>()
.where('id', '=', this.nodeId)

View File

@ -6,7 +6,7 @@ import {
fetch,
Container,
Logging,
universalPath, uuid4, collect, ArrayElement, Collection, Maybe,
universalPath, uuid4, collect, ArrayElement, Collection, Maybe, SSHFilesystem,
} from '@extollo/lib'
import { Node } from "../models/Node.model";
import {Setting} from '../models/Setting.model'
@ -15,10 +15,11 @@ import { Proxmox } from 'proxmox-api'
import {Images, LXCImage} from './Images.service'
import {unsafeESMImport} from '@extollo/lib/lib/util/unsafe'
import * as sshpk from 'sshpk'
import {shellCommand, SSHHost} from '../support/hosts'
import {Host, shellCommand, SSHHost} from '../support/hosts'
import {User} from '../models/User.model'
import {HostGroupHost} from '../models/HostGroupHost.model'
import {HostGroup} from '../models/HostGroup.model'
import {Volume} from '../models/Volume.model'
export interface HostUsage {
host: HostGroupHost,
@ -117,6 +118,132 @@ export class Provisioner {
return `p5x-node-${(Math.random() + 1).toString(36).substring(9)}${nextNodeNum}`
}
public async mountVolume(volume: Volume, mountpoint?: string, idOffset: number = 0): Promise<Volume> {
mountpoint = mountpoint || volume.getDefaultMountpoint()
// TODO Lock the container's config
const node = await volume.getNode()
const ctConfig = await node.getConfigLines()
const nextMountpoint = ctConfig.nextNthValue('mp') + idOffset
// FIXME: unlock container config
const api = await this.getApi()
const line = `${await volume.getQualifiedName()},mp=${mountpoint},backup=1`
try {
await api.nodes.$(node.unqualifiedPVEHost())
.lxc.$(node.pveId)
.config.$put({ [`mp${nextMountpoint}`]: line })
} catch (e) {
if ( idOffset > 4 ) {
throw e
}
this.logging.error('Error mounting volume! Will retry with a higher mountpointIdentifier. Original error:')
this.logging.error(e)
await node.updateConfig(lines => lines.removePending(line))
return this.mountVolume(volume, mountpoint, idOffset + 1)
}
volume.mountpointIdentifier = `mp${nextMountpoint}`
volume.mountpoint = mountpoint
volume.save()
return volume
}
public async unmountVolume(volume: Volume): Promise<Volume> {
if ( !volume.mountpoint || !volume.mountpointIdentifier ) {
this.logging.info(`Cannot unmount volume ${volume.volumeId}: not mounted`)
return volume
}
// FIXME If the container is offline, skip the umount
// Unmount the disk's filesystem from the node
const node = await volume.getNode()
const nodeHost = await node.getHost()
await nodeHost.run(shellCommand(`umount "${volume.mountpoint}"`))
volume.mountpoint = undefined
// TODO Lock the container's config
// Replace the disk's mountpoint with an unused disk
const pveHost = await this.getPVEHost(node.pveHost.split('/')[1])
const pveFilesystem = await pveHost.getFilesystem()
const ctConfig = pveFilesystem.getPath(`/etc/pve/lxc/${node.pveId}.conf`)
const ctConfigLines = await ctConfig.read()
.then(x => x.split('\n'))
.then(x => collect(x))
const maxUnused = ctConfigLines
.filter(line => line.startsWith('unused'))
.map(line => parseInt(line.substring('unused'.length).split(':')[0], 10))
.sortDesc()
.first() || -1
const newConfigLines = await ctConfigLines
.promiseMap(async line => {
if ( !line.startsWith(volume.mountpointIdentifier!) ) {
return line
}
return `unused${maxUnused+1}: ${await volume.getQualifiedName()}`
})
volume.mountpointIdentifier = `unused${maxUnused+1}`
// Update the container's config and FIXME: unlock it
await ctConfig.write(newConfigLines.join('\n'))
volume.save()
return volume
}
public async provisionCarrierContainer(node: Node): Promise<string> {
// Get the Proxmox API!
const proxmox = await this.getApi()
const nodeName = node.pveHost.split('/')[1]
const pveHost = proxmox.nodes.$(nodeName)
// Ensure the empty filesystem template exists
const host = await node.getHost() // fixme this is wrong -- need pve host
const fs = await host.getFilesystem()
const stat = await fs.stat({ storePath: '/var/lib/vz/template/cache/p5x-empty.tar.xz' })
if ( !stat.exists ) {
await host.run(shellCommand('mkdir -p /tmp/p5x-empty-tmp'))
await host.run(shellCommand('touch /tmp/p5x-empty-tmp/placeholder.txt'))
await host.run(shellCommand('sh -c "cd /tmp/p5x-empty-tmp && tar cfJ /var/lib/vz/template/cache/p5x-empty.tar.xz placeholder.txt"'))
await host.run(shellCommand('rm -rf /tmp/p5x-empty-tmp'))
// FIXME: we should really upload this to our p5x.image-cache during standup
}
// Get the next vmid to be used by the LXC container
const carrierVMID = await proxmox.cluster.nextid.$get()
const ipRange = await IpRange.getDefault()
const carrierIP = await ipRange.getNextAvailableOrFail() // this doesn't matter; we won't be booting it
// Create the carrier LXC container
const name = `p5x-tmp-${uuid4()}`
const createUPID = await pveHost.lxc.$post({
ostemplate: 'local:vztmpl/p5x-empty.tar.xz',
vmid: carrierVMID,
cores: 1,
description: 'Temporary container managed by P5x',
hostname: name,
memory: 10, // in MB
start: false,
storage: await Setting.loadOneRequired('pveStoragePool'),
tags: 'p5x',
})
this.logging.info('Waiting for PVE carrier container to be created')
await this.waitForNodeTask(nodeName, createUPID)
this.logging.info(`Created carrier container: ${name}`)
return name
}
public async provisionNode(group: HostGroup): Promise<Node> {
// Look up a target node for provisioning
const host = await this.selectCandidate(group)
@ -382,38 +509,129 @@ export class Provisioner {
return node
}
public async createVolume(name: string, sizeInBytes: number): Promise<Node> {
public async createVolume(name: string, sizeInBytes: number, idOffset: number = 0): Promise<Volume> {
this.logging.info(`Creating volume ${name} with size ${sizeInBytes / 1024}KiB...`)
const masterNode = await Node.getMaster()
const api = await this.getApi()
const nodeApi = api.nodes.$(await Setting.loadOneRequired('pveMasterNode'))
const storage = nodeApi.storage.$(await Setting.loadOneRequired('pveStoragePool'))
const result = await storage.content.$post({
filename: `p5x-${name}.raw`,
size: `${sizeInBytes / 1024}`, // expected in KiB
vmid: masterNode.pveId,
})
this.logging.info(`Created volume ${name} with size ${sizeInBytes / 1024}KiB. Result: '${result}'`)
return masterNode
let ctConfig = await masterNode.getConfigLines()
const nextMountpoint = ctConfig.nextNthValue('mp') + idOffset
// FIXME: unlock container config
const vol = this.container.makeNew<Volume>(Volume)
vol.name = name
vol.sizeInBytes = sizeInBytes
vol.nodeId = masterNode.id!
vol.mountpoint = vol.getDefaultMountpoint()
vol.mountpointIdentifier = `mp${nextMountpoint}`
const provisionSizeInGiB = Math.max(Math.ceil(sizeInBytes/(1024*1024*1024)), 1)
const storage = await Setting.loadOneRequired('pveStoragePool')
const line = `${storage}:${provisionSizeInGiB},mp=${vol.getDefaultMountpoint()},backup=1`
try {
await api.nodes.$(masterNode.unqualifiedPVEHost())
.lxc.$(masterNode.pveId)
.config.$put({ [`mp${nextMountpoint}`]: line })
} catch (e) {
if ( idOffset > 4 ) {
throw e
}
public async deleteVolume(name: string, pveNodeId: number): Promise<void> {
this.logging.error('Encountered error while creating volume. Will retry. Original error:')
this.logging.error(e)
await masterNode.updateConfig(lines => lines.removePending(line))
return this.createVolume(name, sizeInBytes, idOffset + 1)
}
ctConfig = await masterNode.getConfigLines()
const mount = ctConfig.getForKey(`mp${nextMountpoint}`)
if ( !mount ) {
throw new Error('Could not find mountpoint config after creating volume!')
}
vol.diskName = mount.substring(`${storage}:${masterNode.pveId}/`.length).split(',')[0]
await vol.save()
return vol
}
public async deleteVolume(vol: Volume): Promise<void> {
// FIXME: this will need to account for the case when the volume is mounted on a different node than the master!
this.logging.info(`Deleting volume ${name}...`)
// FIXME: remove unusedX entry from config
this.logging.info(`Deleting volume ${vol.name}...`)
if ( !vol.mountpointIdentifier ) {
throw new Error(`Cannot unmount volume: ${vol.volumeId}: mountpointIdentifier is not set`)
}
const [api, masterNode, storage] = await Promise.all([
this.getApi(),
Setting.loadOneRequired('pveMasterNode'),
Setting.loadOneRequired('pveStoragePool'),
])
const api = await this.getApi()
const node = await vol.getNode()
const nodeApi = api.nodes.$(node.unqualifiedPVEHost())
await nodeApi.lxc.$(node.pveId)
.config.$put({
delete: vol.mountpointIdentifier,
})
const nodeApi = api.nodes.$(masterNode)
const storageApi = nodeApi.storage.$(storage)
const result = await storageApi.content.$(`${storage}:${pveNodeId}/p5x-${name}.raw`).$delete()
this.logging.info(`Deleted volume ${vol.name}`)
}
this.logging.info(`Waiting for task: ${result}`)
await this.waitForNodeTask(masterNode, result)
this.logging.info(`Deleted volume ${name}. ${result}`)
public async transferVolume(vol: Volume, toNode: Node): Promise<Volume> {
if ( vol.mountpoint ) {
throw new Error('Cannot transfer volume: volume is still mounted')
}
if ( !vol.mountpointIdentifier ) {
throw new Error('Cannot transfer volume: missing mountpointIdentifier')
}
// ASSUMPTION: both hosts reside on the same physical node
// First, figure out the new mountpointIdentifier
let toNodeCtConfig = await toNode.getConfigLines()
const newMountpointIdentifier = `unused${toNodeCtConfig.nextNthValue('unused')}`
const fromNode = await vol.getNode()
const api = await this.getApi()
const upid = await api
.nodes.$(fromNode.unqualifiedPVEHost())
.lxc.$(fromNode.pveId)
.move_volume.$post({
volume: vol.mountpointIdentifier as any,
'target-vmid': toNode.pveId,
'target-volume': newMountpointIdentifier as any,
})
await new Promise<void>(res => setTimeout(res, 5000))
// await this.waitForNodeTask(fromNode.pveHost, upid)
toNodeCtConfig = await toNode.getConfigLines()
const mount = toNodeCtConfig.getForKey(newMountpointIdentifier)
if ( !mount ) {
throw new Error('Could not find mountpoint config after transferring volume!')
}
const storage = await Setting.loadOneRequired('pveStoragePool')
vol.diskName = mount.substring(`${storage}:${toNode.pveId}/`.length).split(',')[0]
vol.mountpointIdentifier = newMountpointIdentifier
vol.nodeId = toNode.id!
await vol.save()
return vol
}
public async getPVEHost(pveHost: string): Promise<Host> {
const api = await this.getApi()
const ifaces = await api.nodes.$(pveHost).network.$get()
const hostAddr = ifaces.filter(x => x.address)[0]?.address
if ( !hostAddr ) {
throw new Error('Unable to determine real address for host: ' + pveHost)
}
return this.container.makeNew<Host>(SSHHost, {
host: hostAddr,
username: 'root',
password: await Setting.loadOneRequired('pveRootPassword'),
baseDir: '/',
})
}
protected async waitForNodeTask(nodeName: string, taskUPID: string): Promise<void> {

View File

@ -3,6 +3,17 @@ import {Application, Awaitable, Filesystem, SSHFilesystem} from '@extollo/lib'
import {ExecutionResult} from './ExecutionResult'
import {ShellCommand} from './types'
import * as ssh2 from 'ssh2'
import * as path from 'node:path'
export class SSHRootFilesystem extends SSHFilesystem {
protected storePath(storePath: string): string {
return path.join(storePath)
}
protected metadataPath(storePath: string): string {
return path.join('.meta-' + storePath + '.json')
}
}
export class SSHHost extends Host {
private sshClient?: ssh2.Client
@ -49,7 +60,7 @@ export class SSHHost extends Host {
getFilesystem(): Awaitable<Filesystem> {
if ( !this.filesystem ) {
this.filesystem = Application.getContainer().makeNew<SSHFilesystem>(SSHFilesystem, {
this.filesystem = Application.getContainer().makeNew<SSHRootFilesystem>(SSHRootFilesystem, {
ssh: this.config,
baseDir: '/',
})