Initial WIP

This commit is contained in:
Garrett Mills 2024-11-06 23:41:49 -05:00
commit cbddc5db18
27 changed files with 6203 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/target
.idea
k
*.sqlite

4264
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

24
Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "p5x"
version = "0.1.0"
edition = "2021"
[dependencies]
env_logger = "0.11.5"
log = "0.4.22"
rocket = { version = "0.5.1", features = ["json"] }
sea-orm = { version = "1.1.0", features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros"] }
sea-orm-rocket = "0.5.4"
serde = { version = "1.0.214", features = ["derive"] }
tokio = { version = "1.0.0", features = ["rt", "rt-multi-thread", "macros"] }
async-trait = "0.1.83"
rocket_dyn_templates = { version = "0.2.0", features = ["handlebars"] }
sea-orm-migration = "1.1.0"
uuid = { version = "1.11.0", features = ["v4", "fast-rng"] }
ssh2 = "0.9.4"
reqwest = "0.12.9"
futures = { version = "0.3.31", features = ["executor"] }
serde_json = "1.0.132"
serde_urlencoded = "0.7.1"
proxmox-api = { git = "https://github.com/glmdev/p5x-proxmox-api", version = "0.1.2-pre", features = ["ureq-client"] }
ureq = "2.10.1"

5
Rocket.toml Normal file
View File

@ -0,0 +1,5 @@
[default]
template_dir = "resources/views"
[default.databases.p5x_api]
url = "sqlite://p5x_api.sqlite?mode=rwc"

View File

@ -0,0 +1,27 @@
<html lang="en">
<head>
<title>Settings | P5x</title>
</head>
<body>
<h1>Configure P5x</h1>
<small>Current config version: {{#if settings.id}}v{{settings.id}}{{^}}v0{{/if}}</small>
<form action="/configure" method="post">
<br><b>Proxmox VE</b><br>
<label for="pve_master_node">PVE Master Node Hostname:</label>
<input type="text" name="pve_master_node" id="pve_master_node" placeholder="node-name" value="{{{ settings.pve_master_node }}}"><br>
<label for="pve_api_host">PVE API Host:</label>
<input type="text" name="pve_api_host" id="pve_api_host" placeholder="192.168.1.X" value="{{{ settings.pve_api_host }}}"><br>
<label for="pve_root_password">PVE Root Password:</label>
<input type="password" name="pve_root_password" id="pve_root_password" value="{{{ settings.pve_root_password }}}"><br>
<label for="pve_storage_pool">PVE Storage Pool:</label>
<input type="text" name="pve_storage_pool" id="pve_storage_pool" value="{{{ settings.pve_storage_pool }}}"><br>
<small>Storage pool must be network-attached (Ceph/iSCSI). NFS is discouraged because it lacks proper support for locking.</small><br>
<br><button type="submit">Save</button>
</form>
</body>
</html>

126
src/api/cluster/carrier.rs Normal file
View File

@ -0,0 +1,126 @@
use std::path::Path;
use uuid::Uuid;
use proxmox_api::nodes::node::lxc;
use proxmox_api::nodes::node::lxc::vmid;
use proxmox_api::types::VmId;
use sea_orm::{ActiveModelTrait, EntityTrait, Set, TryIntoModel};
use crate::api::cluster::node::wait_upid;
use crate::api::entity::locks::lock_vmid;
use crate::api::entity::nodes;
use crate::api::entity::nodes::P5xError;
use crate::api::services::{ssh_run_trimmed, Services};
use crate::api::services::SshError;
/// Provision a temporary "carrier" LXC container on the same physical PVE
/// host as `from_node`.
///
/// Carriers are minimal (1 core, 16 MiB) containers built from the empty
/// p5x template; they exist only to ferry volumes between physical hosts.
/// The next free VMID is allocated under the global VMID lock, the creation
/// task is awaited, and the container is recorded in the `nodes` table with
/// placeholder network identity and `is_permanent = false`.
pub async fn provision_carrier(
    svc: &Services<'_>,
    from_node: &nodes::Model,
) -> Result<nodes::Model, P5xError> {
    // Make sure the empty filesystem template exists
    ensure_carrier_template(svc, from_node).await?;

    // Hold the global VMID lock so concurrent provisioners can't both get
    // the same VMID from cluster nextid().
    let _lock = lock_vmid(svc.db, Some("Provisioning carrier container"))
        .await.map_err(P5xError::DbErr)?;

    let pve_client = svc.pve()
        .map_err(P5xError::ServiceError)?;

    let pve = proxmox_api::cluster::ClusterClient::new(pve_client);
    let vm_id = pve.nextid()
        .get(Default::default())
        .map_err(P5xError::PveError)?;
    let vm_id = VmId::new(vm_id).unwrap();

    // Build the new container params. (Uuid implements Display, so the
    // previous `.to_string()` inside format! was redundant.)
    let hostname = format!("carrier-{}", Uuid::new_v4());
    let storage = svc.setting_req(|s| &s.pve_storage_pool)
        .map_err(P5xError::ServiceError)?;

    let mut params = lxc::PostParams::new("local:vztmpl/p5x-empty.tar.xz".to_string(), vm_id);
    params.cores = Some(1);
    params.description = Some("Temporary container managed by P5x".to_string());
    params.hostname = Some(hostname.clone());
    params.memory = Some(16); // in MB, min 16
    params.start = Some(false);
    params.storage = Some(storage);
    params.tags = Some("p5x".to_string());

    // Ask the PVE API to start creating the carrier node based on our empty template
    let upid = svc.pve_node(&from_node.pve_host)
        .map_err(P5xError::ServiceError)?
        .lxc()
        .post(params)
        .map_err(P5xError::PveError)?;

    // Wait for the container creation task to finish
    wait_upid(svc, &from_node.pve_host, &upid).await?;

    // Create a new node record. Placeholder IP/subnet; flagged non-permanent.
    let node = nodes::ActiveModel {
        hostname: Set(hostname),
        pve_id: Set(i32::try_from(vm_id.get()).unwrap()),
        pve_host: Set(from_node.pve_host.to_string()),
        assigned_ip: Set("0.0.0.0".to_string()),
        assigned_subnet: Set(0),
        is_permanent: Set(false),
        ..Default::default()
    }
    .save(svc.db)
    .await
    .map_err(P5xError::DbErr)?;

    node.try_into_model().map_err(P5xError::DbErr)
}
/// Ensure the empty LXC template used for carrier containers exists on the
/// physical PVE host backing `node`, creating it over SSH if necessary.
///
/// The template is a tiny tar.xz containing a single placeholder file, built
/// under /tmp on the host and placed in the standard template cache dir.
pub async fn ensure_carrier_template(
    svc: &Services<'_>,
    node: &nodes::Model,
) -> Result<(), P5xError> {
    let pve_ssh = svc.pve_ssh(&node.pve_host)
        .map_err(P5xError::ServiceError)?;

    // Use SFTP to check whether the file exists.
    // FIX: sftp stat() returns an Err when the path does not exist, so the
    // old code (`sftp().and_then(|f| f.stat(path))?`) bubbled that up as an
    // SshError and never reached the creation path on a fresh host. Only a
    // failure to open the SFTP channel itself is an error now; a failed
    // stat() is treated as "template missing".
    let path = Path::new("/var/lib/vz/template/cache/p5x-empty.tar.xz");
    let sftp = pve_ssh.sftp()
        .map_err(SshError::ClientError)
        .map_err(P5xError::SshError)?;

    if let Ok(stat) = sftp.stat(path) {
        // The empty template already exists, so we're done
        if stat.is_file() {
            return Ok(());
        }
    }

    // Initialize the template directly
    // todo: we should really upload this to our p5x.image-cache during standup
    ssh_run_trimmed(&pve_ssh, "mkdir -p /tmp/p5x-empty-tmp")?;
    ssh_run_trimmed(&pve_ssh, "touch /tmp/p5x-empty-tmp/placeholder.txt")?;
    ssh_run_trimmed(&pve_ssh, "sh -c 'cd /tmp/p5x-empty-tmp && tar cfJ /var/lib/vz/template/cache/p5x-empty.tar.xz placeholder.txt'")?;
    ssh_run_trimmed(&pve_ssh, "rm -rf /tmp/p5x-empty-tmp")?;
    Ok(())
}
/// Destroy a temporary carrier container and remove its row from the DB.
///
/// The PVE delete purges the container config and any unreferenced disks;
/// the DB row is only removed after the deletion task completes.
pub async fn terminate_carrier(
    svc: &Services<'_>,
    carrier: nodes::Model,
) -> Result<(), P5xError> {
    // Purge config and any disks left behind along with the container.
    let delete_params = {
        let mut p = vmid::DeleteParams::default();
        p.purge = Some(true);
        p.destroy_unreferenced_disks = Some(true);
        p
    };

    let task = svc.pve_node(&carrier.pve_host)
        .map_err(P5xError::ServiceError)?
        .lxc()
        .vmid(carrier.vm_id())
        .delete(delete_params)
        .map_err(P5xError::PveError)?;

    // Block until PVE reports the deletion task as done.
    wait_upid(svc, &carrier.pve_host, &task).await?;

    // The container is gone; drop our record of it.
    nodes::Entity::delete_by_id(carrier.id)
        .exec(svc.db)
        .await
        .map_err(P5xError::DbErr)?;

    Ok(())
}

3
src/api/cluster/mod.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod node;
pub mod carrier;
pub mod volume;

67
src/api/cluster/node.rs Normal file
View File

@ -0,0 +1,67 @@
use std::time::Duration;
use proxmox_api::nodes::node::lxc::vmid::migrate;
use proxmox_api::nodes::node::tasks::upid;
use sea_orm::{ActiveModelTrait, EntityTrait, IntoActiveModel, Set};
use tokio::time::sleep;
use crate::api::entity::nodes;
use crate::api::entity::nodes::P5xError;
use crate::api::services::Services;
/// Migrate an LXC node to another physical PVE host, wait for the migration
/// task to complete, then persist and return the updated node record.
pub async fn migrate_node(
    svc: &Services<'_>,
    node: nodes::Model,
    to_host: &str,
) -> Result<nodes::Model, P5xError> {
    // Kick off the migration through the PVE API.
    let migration_task = svc.pve_node(&node.pve_host)
        .map_err(P5xError::ServiceError)?
        .lxc()
        .vmid(node.vm_id())
        .migrate()
        .post(migrate::PostParams::new(to_host.to_string()))
        .map_err(P5xError::PveError)?;

    // Block until the migration task finishes.
    wait_upid(svc, &node.pve_host, &migration_task).await?;

    // Record the node's new physical host.
    let mut active = node.into_active_model();
    active.pve_host = Set(to_host.to_string());
    let saved = active.save(svc.db).await.map_err(P5xError::DbErr)?;

    // Re-fetch so the caller gets a plain model with all fields populated.
    nodes::Entity::find_by_id(saved.id.unwrap())
        .one(svc.db)
        .await
        .map_err(P5xError::DbErr)?
        .ok_or(P5xError::BadPostcondition("Could not look up node after persisting"))
}
/// Poll the PVE task `upid` on physical host `node` until it finishes.
///
/// Returns Ok(()) when the task exits with status "OK", and UpidFailed for
/// any other exit status.
///
/// NOTE(review): the underlying `.get()` call appears to use a blocking
/// (ureq) HTTP client inside an async fn — confirm whether this should run
/// via spawn_blocking.
pub async fn wait_upid(svc: &Services<'_>, node: &str, upid: &str) -> Result<(), P5xError> {
    info!("Waiting for UPID {upid} on node {node}");
    let pve = svc.pve_node(node)
        .map_err(P5xError::ServiceError)?;

    loop {
        let status = pve.tasks()
            .upid(upid)
            .status()
            .get()
            .map_err(P5xError::PveError)?;

        if status.status != upid::status::Status::Running {
            if let Some(s) = status.exitstatus {
                if s == "OK" {
                    info!("UPID {upid} on node {node} finished");
                    return Ok(());
                }
                error!("UPID {upid} on node {node} failed");
                return Err(P5xError::UpidFailed(node.to_string(), upid.to_string()));
            }
            // Stopped but no exit status recorded yet — fall through and wait.
        }

        // FIX: previously only the Running branch slept; a stopped task with
        // no exitstatus yet caused a tight busy-loop hammering the PVE API.
        // Now every not-yet-finished iteration waits before re-polling.
        sleep(Duration::from_secs(1)).await;
    }
}

453
src/api/cluster/volume.rs Normal file
View File

@ -0,0 +1,453 @@
use std::cmp::max;
use std::time::Duration;
use log::{info, debug, warn, error};
use proxmox_api::nodes::node::lxc::vmid::config::PutParams;
use proxmox_api::nodes::node::lxc::vmid::move_volume;
use proxmox_api::types::VmId;
use proxmox_api::UreqError;
use sea_orm::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::api::cluster::carrier::{provision_carrier, terminate_carrier};
use crate::api::cluster::node::migrate_node;
use crate::api::entity::nodes::{lock_first_available, P5xError};
use crate::api::entity::{nodes, volumes};
use crate::api::services::{ssh_run_trimmed, Services};
/// Request payload used by the volume API endpoints.
///
/// `id`/`name` identify an existing volume; `size_in_bytes` is used on
/// creation (defaults to 0 when absent); `mountpoint` is the in-container
/// path for mount requests.
#[derive(Serialize, Deserialize, FromForm)]
pub struct VolumeParams {
    pub id: Option<i32>,
    pub name: String,
    // serde(default): omitted in the request body means 0.
    #[serde(default)]
    pub size_in_bytes: i64,
    pub mountpoint: Option<String>,
}

impl VolumeParams {
    /// Look up a volume by name and convert the model into params.
    /// Returns Ok(None) when no volume with that name exists.
    pub async fn resolve(svc: &Services<'_>, name: &str) -> Result<Option<VolumeParams>, DbErr> {
        volumes::Entity::find()
            .filter(volumes::Column::Name.eq(name))
            .one(svc.db)
            .await
            .map(|v| v.map(|v| v.into()))
    }
}
/// Create a new named volume by attaching a fresh `mpN` mountpoint to the
/// first lockable K8s node, then recording it in the `volumes` table.
///
/// Fails with BadPrecondition if a volume with `name` already exists. The
/// requested size is rounded up to whole GiB (minimum 1 GiB).
pub async fn create(
    svc: &Services<'_>,
    name: &str,
    size_in_bytes: i64,
) -> Result<volumes::Model, P5xError> {
    info!("Creating volume {name} with size {}KiB", size_in_bytes / 1024);

    // Make sure the volume name is unique
    let existing = volumes::Entity::find()
        .filter(volumes::Column::Name.eq(name))
        .one(svc.db)
        .await
        .map_err(P5xError::DbErr)?;

    if let Some(_) = existing {
        return Err(P5xError::BadPrecondition("Cannot create volume: a volume with the specified name already exists"));
    }

    // Lock a node in the k8s cluster (held for the rest of this fn)
    let reason = format!("Creating volume: {name}");
    let (node, _lock) = lock_first_available(svc.db, Some(&reason))
        .await.map_err(P5xError::LockErr)?;

    // Get the next available mountpoint ID (mp0, mp1, ...)
    let conf = node.config(svc)?;
    let mp_id = conf.next_nth("mp");
    info!("Volume {name} will become mp{mp_id} on node {} ({})", node.hostname, node.pve_id);

    // Generate a new mountpoint entry for the node's config.
    // Size is expressed in whole GiB, rounded up, never less than 1.
    let storage = svc.setting_req(|s| &s.pve_storage_pool)
        .map_err(P5xError::ServiceError)?;

    let size_in_gib = max((size_in_bytes as u64).div_ceil(1024 * 1024 * 1024) as i64, 1);
    let line = format!("{storage}:{size_in_gib},mp=/mnt/p5x-{name},backup=1");
    debug!("Volume {name}: {line}");

    let mut params = PutParams::default();
    params.mps.insert(mp_id, line);

    // Update the node config to create the volume
    debug!("Patching PVE config for volume {name}");
    let vm_id = VmId::new(i64::from(node.pve_id)).unwrap();
    let res = svc.pve_node(&node.pve_host)
        .map_err(P5xError::ServiceError)?
        .lxc()
        .vmid(vm_id)
        .config()
        .put(params);

    // This is necessary because PUT returns {data: null} on success,
    // which the UreqClient flags as an unknown error.
    if let Err(UreqError::EncounteredErrors(e)) = res {
        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
    }

    // Stupid hack is stupid, but we don't get back a UPID to wait on the vol to be created
    info!("Successfully patched PVE config. Waiting for volume {name} to appear");
    sleep(Duration::from_secs(5)).await;

    // Load the updated config
    debug!("Loading updated node config for volume {name}");
    let conf = node.config(svc)?;
    let mount = conf.get(&format!("mp{mp_id}"))
        .ok_or(P5xError::BadPrecondition("Could not find mountpoint in config after creating volume"))?;

    debug!("Found mountpoint details for volume {name}: {mount}");

    // Parse the disk name from the config.
    // NOTE(review): assumes the mount value starts with "{storage}:{pve_id}/"
    // plus one extra character before the disk name — confirm against a real
    // PVE config line; the `+ 1` offset looks fragile for non-directory pools.
    let name_offset = format!("{storage}:{}/", node.pve_id).len() + 1;
    let disk_name = mount[name_offset..].split(",").next().unwrap();

    // Persist the volume
    debug!("Inserting record into volumes table for volume {name}");
    let res = volumes::ActiveModel {
        name: Set(name.to_string()),
        size_in_bytes: Set(size_in_bytes),
        pve_node_id: Set(node.pve_id),
        mountpoint: Set(Some(format!("/mnt/p5x-{name}"))),
        mountpoint_identifier: Set(Some(format!("mp{mp_id}"))),
        disk_name: Set(Some(disk_name.to_string())),
        ..Default::default()
    }
    .save(svc.db)
    .await
    .map_err(P5xError::DbErr)?;

    info!("Wrote new volume {name} to volumes table");
    let vol = res.try_into_model()
        .map_err(P5xError::DbErr)?;

    debug!("Volume {name} creation completed successfully");
    Ok(vol)
}
/// Mount an existing (unmounted) volume into its node at the requested path.
///
/// Patches the owning node's PVE config with a new `mpN` entry that points
/// at the volume's qualified disk name, then records the mountpoint and
/// identifier on the volume row.
///
/// Errors if `params.mountpoint` is missing or the volume is already mounted.
pub async fn mount(
    svc: &Services<'_>,
    params: &VolumeParams,
) -> Result<(), P5xError> {
    info!("Mounting volume {}", params.name);
    if params.mountpoint.is_none() {
        return Err(P5xError::BadPrecondition("Missing required mountpoint parameter"));
    }

    let mountpoint = params.mountpoint.as_ref().unwrap();

    // Look up the volume instance
    let vol = volumes::resolve(svc, params).await?;

    // Make sure the volume isn't already mounted somewhere
    if vol.mountpoint.is_some() {
        return Err(P5xError::BadPrecondition("Tried to mount volume that is already mounted"));
    }

    // Lock the volume's node.
    // FIX: this future was previously never awaited, so the node lock was
    // never actually acquired (and `_lock` held a lazy future, not a lock
    // handle). Await it and propagate lock errors, matching the other
    // call sites.
    let node = vol.node(svc).await?;
    debug!("Locking node {} to mount volume {} at {}", node.hostname, params.name, mountpoint);
    let reason = format!("Mounting volume {} at {}", params.name, mountpoint);
    let _lock = node.lock(svc, Some(&reason))
        .await.map_err(P5xError::LockErr)?;

    // Find the next available mountpoint identifier
    let qualified_name = vol.qualified_name(svc).await?;
    let mountpoint_identifier = node.config(svc)?.next_nth("mp");
    let mount_line = format!("{qualified_name},mp={mountpoint},backup=1");

    // Patch the node's config to mount the volume
    let pve_node = svc.pve_node(&node.pve_host)
        .map_err(P5xError::ServiceError)?;

    let mut put_params = PutParams::default();
    put_params.mps.insert(mountpoint_identifier, mount_line);

    debug!("Patching node config to mount volume {}", params.name);
    let res = pve_node.lxc()
        .vmid(node.vm_id())
        .config()
        .put(put_params);

    // This is necessary because PUT returns {data: null} on success,
    // which the UreqClient flags as an unknown error.
    if let Err(UreqError::EncounteredErrors(e)) = res {
        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
    }

    // Persist the volume
    debug!("Persisting mount changes to volume {} in database", params.name);
    let mut vol = vol.into_active_model();
    vol.mountpoint_identifier = Set(Some(format!("mp{mountpoint_identifier}")));
    vol.mountpoint = Set(Some(mountpoint.to_string()));
    vol.save(svc.db)
        .await
        .map_err(P5xError::DbErr)?;

    info!("Successfully mounted volume {} at {} on {}", params.name, mountpoint, node.hostname);
    Ok(())
}
/// Unmount a volume from its node and re-register the disk as `unusedN` in
/// the node's PVE config so it can be transferred or re-mounted later.
///
/// No-op (with a warning) if the volume is not currently mounted. The exact
/// sequence — container umount, shadow-namespace umount, optional LVM
/// deactivation, then config patch — is order-sensitive; see UNMOUNT_NOTES
/// below for why the shadow/monitor-process umount is required.
pub async fn unmount(
    svc: &Services<'_>,
    params: &VolumeParams,
) -> Result<(), P5xError> {
    info!("Unmounting volume {}", params.name);

    // Look up the volume instance
    let vol = volumes::resolve(svc, params).await?;

    // Check that the volume is actually mounted
    if vol.mountpoint.is_none() || vol.mountpoint_identifier.is_none() {
        warn!("Tried to unmount volume {}#{}, but volume is not mounted", params.name, vol.volume_id);
        return Ok(());
    }

    // Make sure we have the registered disk name
    if vol.disk_name.is_none() {
        return Err(P5xError::BadPrecondition("Tried to unmount volume without a disk_name set"));
    }

    // Unmount the disk's filesystem from within the K8s node
    debug!("Unmounting volume {} ({}) from K8s host", vol.name, vol.mountpoint.as_ref().unwrap());
    let cmd = format!("umount '{}'", vol.mountpoint.as_ref().unwrap());
    let node = vol.node(svc).await?;
    node.ssh_run_trimmed(svc, &cmd)?;

    // Unmount the disk's filesystem from the PVE node's shadow tree (see UNMOUNT_NOTES below)
    // -- Find the PID of the LXC container itself
    debug!("Attempting to identify host PID for node {}", &node.pve_host);
    let pve_ssh = svc.pve_ssh(&node.pve_host)
        .map_err(P5xError::ServiceError)?;

    // lxc-info prints e.g. "PID: 1234"; take everything after "PID:" and trim.
    let cmd = format!("lxc-info -n {} -p", node.pve_id);
    let ct_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
    let ct_pid = ct_pid
        .split("PID:")
        .nth(1)
        .ok_or(P5xError::BadPrecondition("Failed to parse PID of node's LXC container"))?
        .trim();

    // -- Find the parent PID (where the shadow tree exists)
    debug!("Attempting to identify parent PID for node {} (pid: {})", &node.pve_host, ct_pid);
    let cmd = format!("ps -o ppid= -p {ct_pid}");
    let parent_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;

    // -- Unmount the disk's filesystem from the shadow tree's mount namespace:
    let mp_id = vol.mountpoint_identifier.clone().unwrap();
    debug!("Unmounting volume {} from shadow namespace (pid: {}) {}", &vol.name, parent_pid, &mp_id);
    let cmd = format!("nsenter --target {parent_pid} --mount /bin/bash -c 'umount /var/lib/lxc/.pve-staged-mounts/{}'", mp_id);
    ssh_run_trimmed(&pve_ssh, &cmd)?;

    // For LVM-type storage pools, we also need to deactivate the logical volume
    let pool_name = svc.setting_req(|s| &s.pve_storage_pool).map_err(P5xError::ServiceError)?;
    let pool_driver = svc.setting_req(|s| &s.pve_storage_driver).map_err(P5xError::ServiceError)?;
    if pool_driver == "lvm" {
        let cmd = format!("lvchange -aln '/dev/{pool_name}/{}'", vol.disk_name.as_ref().unwrap());
        ssh_run_trimmed(&pve_ssh, &cmd)?;
    }

    // Patch the PVE node's config to mark the volume as an unused disk.
    // The closure also records the new `unusedN` identifier on the active
    // model so the DB row below matches what was written to the config.
    let qualified_name = vol.qualified_name(svc).await?;
    let lock_reason = format!("Unmounting volume {}", vol.volume_id);
    let mut vol = vol.into_active_model();
    node.mutate_config(
        svc,
        Some(&lock_reason),
        |mut conf| {
            let next_unused = conf.next_nth("unused");
            conf.replace(
                |s| s.starts_with(&mp_id),
                |_| Some(format!("unused{next_unused}: {qualified_name}")),
            );
            vol.mountpoint_identifier = Set(Some(format!("unused{next_unused}")));
            conf
        }
    )
    .await?;

    // Persist the volume changes
    vol.mountpoint = Set(None);
    vol.save(svc.db).await
        .map_err(P5xError::DbErr)?;

    info!("Successfully unmounted volume {} from node {}", &params.name, &node.hostname);
    Ok(())
}
// UNMOUNT_NOTES
// There is an important subtlety here:
// To avoid security vulnerabilities where hosts can umount their disks and break the
// firewall between CT and host, when a disk is mounted to a CT, Proxmox opens a clone
// of the mount in a special directory in the mount namespace of the container process'
// parent (i.e. the "monitor" process).
// If we umount the disk from the container, but not the monitor process, we won't be
// able to reattach any disks to the same mpX path (mp0, mp1, mp2, &c.)
// So to get around this, we (1) look up the container process, (2) figure out the
// monitor process, then (3) umount the clone.
// This was genuinely difficult to figure out, and it's a testament to open-source
// that it was possible to work out at all.
/// Delete a volume: remove its disk entry from the owning node's PVE config,
/// then delete the row from the `volumes` table.
pub async fn delete(
    svc: &Services<'_>,
    params: &VolumeParams,
) -> Result<(), P5xError> {
    info!("Deleting volume {}", params.name);

    // Look up the existing volume
    let vol = volumes::Entity::find()
        .filter(volumes::Column::Name.eq(&params.name))
        .one(svc.db)
        .await
        .map_err(P5xError::DbErr)?
        .ok_or(P5xError::BadPrecondition("Could not delete volume: unable to find volume with that name"))?;

    // Make sure we have a mountpoint identifier
    if vol.mountpoint_identifier.is_none() {
        return Err(P5xError::BadPrecondition("Could not delete volume: volume has no mountpoint identifier"));
    }

    // Patch the PVE config to delete the volume from the config
    let node = vol.node(svc).await?;
    debug!("Patching node {} config to delete volume {} ({})", node.hostname, params.name, vol.mountpoint_identifier.as_ref().unwrap());
    let pve_node = svc.pve_node(&node.pve_host)
        .map_err(P5xError::ServiceError)?;

    let mut pve_params = PutParams::default();
    pve_params.delete = vol.mountpoint_identifier;

    let vm_id = VmId::new(i64::from(node.pve_id)).unwrap();
    let r = pve_node
        .lxc()
        .vmid(vm_id)
        .config()
        .put(pve_params)
        .map_err(P5xError::PveError);

    // NOTE(review): PVE API failures here are only logged (and only for
    // HTTP-status errors), never propagated — the DB row below is deleted
    // even if the PVE-side delete failed. Confirm this best-effort behavior
    // is intentional; otherwise orphaned disks can accumulate on the host.
    if let Err(P5xError::PveError(proxmox_api::UreqError::Ureq(ureq::Error::Status(status, response)))) = r {
        let json_str = response.into_string().unwrap();
        error!("Error response from PVE API (status: {status}): {json_str}");
    }

    debug!("Successfully patched node {} config to delete volume {}", node.hostname, params.name);

    // Persist the volume
    volumes::Entity::delete_by_id(vol.volume_id)
        .exec(svc.db)
        .await
        .map_err(P5xError::DbErr)?;

    info!("Successfully deleted volume {}#{}", vol.name, vol.volume_id);
    Ok(())
}
/// Move an (unmounted) volume to `to_node`.
///
/// Three cases, cheapest first:
/// 1. the volume already lives on `to_node` — nothing to do;
/// 2. both nodes share a physical PVE host — a single direct move;
/// 3. different physical hosts — route through a temporary "carrier"
///    container that is migrated between hosts with the volume attached.
pub async fn transfer(
    svc: &Services<'_>,
    params: &VolumeParams,
    to_node: &nodes::Model,
) -> Result<(), P5xError> {
    let vol = volumes::resolve(svc, params).await?;
    let from_node = vol.node(svc).await?;

    // Case 1: already where it needs to be.
    if from_node.pve_id == to_node.pve_id {
        return Ok(());
    }

    // Case 2: same physical host — PVE can move the disk directly.
    if from_node.pve_host == to_node.pve_host {
        transfer_directly(svc, vol, &from_node, to_node).await?;
        return Ok(());
    }

    // Case 3: attach the volume to a throwaway carrier container, migrate
    // the carrier to the target host, detach onto the target node, then
    // tear the carrier down.
    let carrier = provision_carrier(svc, &from_node).await?;
    let vol = transfer_directly(svc, vol, &from_node, &carrier).await?;
    let carrier = migrate_node(svc, carrier, &to_node.pve_host).await?;
    let _vol = transfer_directly(svc, vol, &carrier, &to_node).await?;
    terminate_carrier(svc, carrier).await?;

    Ok(())
}
/// Move a volume between two nodes that share the same physical PVE host,
/// using the PVE move_volume API. Returns the refreshed volume model.
///
/// Preconditions: the volume must be unmounted, must have a mountpoint
/// identifier, and both nodes must be on the same physical host.
async fn transfer_directly(
    svc: &Services<'_>,
    vol: volumes::Model,
    from_node: &nodes::Model,
    to_node: &nodes::Model,
) -> Result<volumes::Model, P5xError> {
    if vol.mountpoint.is_some() {
        return Err(P5xError::BadPrecondition("Cannot transfer volume: volume is still mounted"));
    }

    if vol.mountpoint_identifier.is_none() {
        return Err(P5xError::BadPrecondition("Cannot transfer volume: volume is missing mountpoint identifier"));
    }

    if from_node.pve_host != to_node.pve_host {
        return Err(P5xError::BadPrecondition("Cannot transfer volume: nodes reside on different physical hosts"));
    }

    // Figure out the mountpoint identifier on the new node.
    // The lock is held for the remainder of the function.
    let _lock = to_node.lock(svc, Some("Receiving volume transfer"))
        .await.map_err(P5xError::LockErr)?;

    let config = to_node.config(svc)?;
    let mountpoint_identifier = config.next_nth("unused");
    let mountpoint_identifier = format!("unused{mountpoint_identifier}");

    // Ask the PVE API to move the volume to the new node
    let old_mountpoint_identifier = vol.mountpoint_identifier.as_ref().unwrap().as_str();
    let pve_from_vol = move_volume::Volume::try_from(old_mountpoint_identifier).unwrap();
    let pve_to_vol = move_volume::TargetVolume::try_from(mountpoint_identifier.as_str()).unwrap();

    let mut post_params = move_volume::PostParams::new(pve_from_vol);
    post_params.target_volume = Some(pve_to_vol);
    post_params.target_vmid = Some(to_node.vm_id());

    let res = svc.pve_node(&from_node.pve_host)
        .map_err(P5xError::ServiceError)?
        .lxc()
        .vmid(from_node.vm_id())
        .move_volume()
        .post(post_params);

    // This is necessary because POST returns {data: null} on success,
    // which the UreqClient flags as an unknown error.
    if let Err(UreqError::EncounteredErrors(e)) = res {
        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
    }

    // Wait for the volume to transfer, since we don't get back a UPID
    sleep(Duration::from_secs(5)).await;

    // Verify that the volume appears in the new node's config
    let config = to_node.config(svc)?;
    let mount = config.get(&mountpoint_identifier)
        .ok_or(P5xError::BadPostcondition("Could not find mountpoint config after transferring volume"))?;

    // Parse the disk name from the config.
    // NOTE(review): same fragile "{storage}:{pve_id}/" + 1 offset math as in
    // create() — verify against a real config line.
    let storage = svc.setting_req(|s| &s.pve_storage_pool)
        .map_err(P5xError::ServiceError)?;

    let name_offset = format!("{storage}:{}/", to_node.pve_id).len() + 1;
    let disk_name = mount[name_offset..].split(",").next().unwrap();

    // Persist the volume
    let mut vol = vol.into_active_model();
    vol.pve_node_id = Set(to_node.pve_id);
    vol.mountpoint_identifier = Set(Some(mountpoint_identifier));
    vol.disk_name = Set(Some(disk_name.to_string()));
    let vol = vol.save(svc.db).await
        .map_err(P5xError::DbErr)?;

    // Re-fetch so the caller gets a plain model with all fields populated.
    volumes::Entity::find_by_id(vol.volume_id.unwrap())
        .one(svc.db)
        .await
        .map_err(P5xError::DbErr)?
        .ok_or(P5xError::BadPostcondition("Could not look up volume after persisting"))
}

View File

@ -0,0 +1,58 @@
use async_trait::async_trait;
use sea_orm_migration::{prelude::*, schema::*};
#[derive(DeriveMigrationName)]
pub struct Migration;

// Creates the `settings` table: cluster configuration (PVE connection
// details, storage pool, node sizing, SSH keys). All value columns are
// nullable so a partially-filled configuration can be saved.
#[async_trait]
impl MigrationTrait for Migration {
    /// Apply: create the table (idempotent via IF NOT EXISTS).
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Settings::Table)
                    .if_not_exists()
                    .col(pk_auto(Settings::Id))
                    .col(string_null(Settings::PveMasterNode))
                    .col(string_null(Settings::PveApiHost))
                    .col(string_null(Settings::PveRootPassword))
                    .col(string_null(Settings::PveStoragePool))
                    .col(string_null(Settings::PveStorageDriver))
                    .col(string_null(Settings::DnsDomain))
                    .col(string_null(Settings::NodeNetworkBridge))
                    .col(big_unsigned_null(Settings::NodeCpus))
                    .col(big_unsigned_null(Settings::NodeRamInMib))
                    .col(string_null(Settings::RootPassword))
                    .col(text_null(Settings::SshPublicKey))
                    .col(text_null(Settings::SshPrivateKey))
                    .to_owned())
            .await
    }

    /// Revert: drop the table.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(
                Table::drop()
                    .table(Settings::Table)
                    .to_owned())
            .await
    }
}

/// Identifiers for the `settings` table and its columns.
#[derive(DeriveIden)]
enum Settings {
    Table,
    Id,
    PveMasterNode,
    PveApiHost,
    PveRootPassword,
    PveStoragePool,
    PveStorageDriver,
    DnsDomain,
    NodeNetworkBridge,
    NodeCpus,
    NodeRamInMib,
    RootPassword,
    SshPublicKey,
    SshPrivateKey,
}

View File

@ -0,0 +1,46 @@
use async_trait::async_trait;
use sea_orm_migration::{prelude::*, schema::*};
#[derive(DeriveMigrationName)]
pub struct Migration;

// Creates the `nodes` table: one row per managed LXC container, tracking
// its PVE VMID, physical host, assigned network identity, and whether it
// is permanent (temporary carrier containers are created non-permanent).
#[async_trait]
impl MigrationTrait for Migration {
    /// Apply: create the table (idempotent via IF NOT EXISTS).
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Nodes::Table)
                    .if_not_exists()
                    .col(pk_auto(Nodes::Id))
                    .col(string(Nodes::Hostname))
                    // VMID of the container within Proxmox
                    .col(integer(Nodes::PveId))
                    // Name of the physical PVE host the container lives on
                    .col(string(Nodes::PveHost))
                    .col(string(Nodes::AssignedIp))
                    .col(integer(Nodes::AssignedSubnet))
                    .col(boolean(Nodes::IsPermanent).default(false))
                    .to_owned())
            .await
    }

    /// Revert: drop the table.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(
                Table::drop()
                    .table(Nodes::Table)
                    .to_owned())
            .await
    }
}

/// Identifiers for the `nodes` table and its columns.
#[derive(DeriveIden)]
enum Nodes {
    Table,
    Id,
    Hostname,
    PveId,
    PveHost,
    AssignedIp,
    AssignedSubnet,
    IsPermanent,
}

View File

@ -0,0 +1,42 @@
use async_trait::async_trait;
use sea_orm_migration::{prelude::*, schema::*};
#[derive(DeriveMigrationName)]
pub struct Migration;

// Creates the `locks` table: DB-backed advisory locks. A lock row is
// claimed by writing a holder UUID into LockOwner (NULL = free); see
// src/api/entity/locks.rs for the claim/release protocol.
#[async_trait]
impl MigrationTrait for Migration {
    /// Apply: create the table (idempotent via IF NOT EXISTS).
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Locks::Table)
                    .if_not_exists()
                    .col(pk_auto(Locks::Id))
                    .col(string(Locks::LockType))
                    .col(string(Locks::LockResource))
                    // UUID of the current holder; NULL means unlocked
                    .col(string_null(Locks::LockOwner))
                    // Free-text reason, for debugging stuck locks
                    .col(text_null(Locks::LockReason))
                    .to_owned())
            .await
    }

    /// Revert: drop the table.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(
                Table::drop()
                    .table(Locks::Table)
                    .to_owned())
            .await
    }
}

/// Identifiers for the `locks` table and its columns.
#[derive(DeriveIden)]
enum Locks {
    Table,
    Id,
    LockType,
    LockResource,
    LockOwner,
    LockReason
}

View File

@ -0,0 +1,48 @@
use async_trait::async_trait;
use sea_orm_migration::{prelude::*, schema::*};
#[derive(DeriveMigrationName)]
pub struct Migration;

// Creates the `volumes` table: one row per managed PVE disk, tracking the
// node it is attached to, its config identifier (mpN / unusedN), mount
// path, and PVE disk name.
#[async_trait]
impl MigrationTrait for Migration {
    /// Apply: create the table (idempotent via IF NOT EXISTS).
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Volumes::Table)
                    .if_not_exists()
                    .col(pk_auto(Volumes::VolumeId))
                    .col(string(Volumes::Name))
                    // NOTE(review): declared big_unsigned here but the entity
                    // models size_in_bytes as i64 — confirm this is intended.
                    .col(big_unsigned(Volumes::SizeInBytes))
                    .col(integer(Volumes::PveNodeId))
                    // "mpN" while mounted, "unusedN" after unmounting
                    .col(string_null(Volumes::MountpointIdentifier))
                    .col(string_null(Volumes::MountpointHost))
                    .col(string_null(Volumes::Mountpoint))
                    .col(string_null(Volumes::DiskName))
                    .to_owned())
            .await
    }

    /// Revert: drop the table.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(
                Table::drop()
                    .table(Volumes::Table)
                    .to_owned())
            .await
    }
}

/// Identifiers for the `volumes` table and its columns.
#[derive(DeriveIden)]
enum Volumes {
    Table,
    VolumeId,
    Name,
    SizeInBytes,
    PveNodeId,
    MountpointIdentifier,
    MountpointHost,
    Mountpoint,
    DiskName,
}

View File

@ -0,0 +1,35 @@
use async_trait::async_trait;
use rocket::{fairing, Build, Rocket};
use rocket::fairing::AdHoc;
pub use sea_orm_migration::prelude::*;
use sea_orm_rocket::Database;
use crate::api::Db;
mod m20241101_000001_create_settings_table;
mod m20241102_000001_create_nodes_table;
mod m20241102_000002_create_locks_table;
mod m20241103_000001_create_volumes_table;
/// Registry of all schema migrations, applied in declaration order by
/// sea-orm-migration.
pub struct Migrator;

#[async_trait]
impl MigratorTrait for Migrator {
    // New migrations must be appended here to be picked up at startup.
    fn migrations() -> Vec<Box<dyn MigrationTrait>> {
        vec![
            Box::new(m20241101_000001_create_settings_table::Migration),
            Box::new(m20241102_000001_create_nodes_table::Migration),
            Box::new(m20241102_000002_create_locks_table::Migration),
            Box::new(m20241103_000001_create_volumes_table::Migration),
        ]
    }
}
async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result {
let conn = &Db::fetch(&rocket).unwrap().conn;
let _ = Migrator::up(conn, None).await;
Ok(rocket)
}
/// Build the fairing that applies pending migrations during Rocket ignition.
pub(super) fn init() -> AdHoc {
    AdHoc::try_on_ignite("Applying migrations", run_migrations)
}

58
src/api/db/mod.rs Normal file
View File

@ -0,0 +1,58 @@
mod migrations;
use std::time::Duration;
use rocket::figment::Figment;
use sea_orm;
use sea_orm_rocket::{Config, Database, Pool};
use async_trait::async_trait;
use rocket::fairing::AdHoc;
use sea_orm::ConnectOptions;
/// Rocket-managed database, configured under `databases.p5x_api`
/// (see Rocket.toml).
#[derive(Database, Debug)]
#[database("p5x_api")]
pub struct Db(DbPool);

/// Wrapper exposing a sea-orm `DatabaseConnection` through the
/// sea-orm-rocket `Pool` trait.
#[derive(Debug, Clone)]
pub struct DbPool {
    pub conn: sea_orm::DatabaseConnection,
}
#[async_trait]
impl Pool for DbPool {
    type Connection = sea_orm::DatabaseConnection;
    type Error = sea_orm::DbErr;

    /// Build the connection from Rocket's figment config
    /// (`databases.p5x_api`: url, pool sizes, timeouts, sqlx logging).
    async fn init(figment: &Figment) -> Result<Self, Self::Error> {
        // FIX: surface a bad or missing database config as a DbErr instead
        // of panicking (`.unwrap()`) during ignition.
        let config = figment.extract::<Config>()
            .map_err(|e| sea_orm::DbErr::Custom(format!("invalid database config: {e}")))?;

        let mut options: ConnectOptions = config.url.into();
        options
            .max_connections(config.max_connections as u32)
            .min_connections(config.min_connections.unwrap_or_default())
            .connect_timeout(Duration::from_secs(config.connect_timeout))
            .sqlx_logging(config.sqlx_logging);

        if let Some(idle_timeout) = config.idle_timeout {
            options.idle_timeout(Duration::from_secs(idle_timeout));
        }

        let conn = sea_orm::Database::connect(options).await?;
        Ok(DbPool { conn })
    }

    fn borrow(&self) -> &Self::Connection {
        &self.conn
    }
}
// async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result {
// let conn = &Db::fetch(&rocket).unwrap().conn;
//
// }
/// Fairing that attaches the database pool, then the migration runner
/// (ordering matters: migrations fetch the pool during ignition).
pub fn init() -> AdHoc {
    AdHoc::on_ignite("Setting up database pool", |rocket| async move {
        let rocket = rocket.attach(Db::init());
        rocket.attach(migrations::init())
    })
}

101
src/api/entity/locks.rs Normal file
View File

@ -0,0 +1,101 @@
use std::time::Duration;
use rocket::futures;
use sea_orm::ActiveValue::Set;
use sea_orm::entity::prelude::*;
use sea_orm::{IntoActiveModel};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use uuid::Uuid;
/// Attempt to claim the lock row identified by (lock_type, lock_resource).
///
/// Claims by atomically writing a fresh UUID into lock_owner on rows whose
/// owner is currently NULL, then reading back the row filtered by that UUID —
/// if the row carries our UUID, our update won any race. Returns Ok(None)
/// when the lock is held by someone else or no matching lock row exists.
pub async fn try_lock<'a>(
    db: &'a DatabaseConnection,
    lock_type: &str,
    lock_resource: &str,
    lock_reason: Option<&str>,
) -> Result<Option<LockHandle<'a>>, DbErr> {
    // Random owner token: lets us detect whether *our* update took effect.
    let owner = Uuid::new_v4();

    let mut l = ActiveModel::new();
    l.lock_owner = Set(Some(owner.to_string()));
    l.lock_reason = Set(lock_reason.map(|s| s.to_string()));

    // Conditional update: only un-owned rows are claimed, so two concurrent
    // claimants cannot both succeed for the same row.
    Entity::update_many()
        .filter(Column::LockOwner.is_null())
        .filter(Column::LockType.eq(lock_type))
        .filter(Column::LockResource.eq(lock_resource))
        .set(l)
        .exec(db)
        .await?;

    // Read back: Some(row) iff the row now carries our UUID.
    Entity::find()
        .filter(Column::LockOwner.eq(owner.to_string()))
        .filter(Column::LockType.eq(lock_type))
        .filter(Column::LockResource.eq(lock_resource))
        .one(db)
        .await
        .map(|lock|
            lock.map(|lock | LockHandle::new(lock, db)))
}
/// Acquire the named lock, retrying every 10 seconds until it is free.
/// Blocks (asynchronously) indefinitely; only DB errors abort the wait.
pub async fn lock<'a>(
    db: &'a DatabaseConnection,
    lock_type: &str,
    lock_resource: &str,
    lock_reason: Option<&str>,
) -> Result<LockHandle<'a>, DbErr> {
    loop {
        match try_lock(db, lock_type, lock_resource, lock_reason).await? {
            Some(handle) => return Ok(handle),
            None => sleep(Duration::from_secs(10)).await,
        }
    }
}
/// Acquire the single cluster-wide lock guarding VM-id allocation.
/// Blocks (polling) until the "global_vmid" lock row can be claimed.
pub async fn lock_vmid<'a>(db: &'a DatabaseConnection, lock_reason: Option<&str>) -> Result<LockHandle<'a>, DbErr> {
    lock(db, "global_vmid", "0", lock_reason).await
}
/// RAII guard for a claimed lock row; the lock is released when the handle
/// is dropped.
///
/// NOTE(review): deriving `Clone` means dropping any clone releases the lock
/// while other clones still exist — confirm this is intentional.
#[derive(Clone)]
pub struct LockHandle<'a> {
    pub lock: Model,
    db: &'a DatabaseConnection,
}
impl<'a> LockHandle<'a> {
async fn release(&mut self) {
debug!("Releasing {} {} lock", self.lock.lock_type, self.lock.lock_resource);
let mut l = self.lock.clone().into_active_model();
l.lock_owner = Set(None);
l.lock_reason = Set(None);
l.save(self.db).await.unwrap();
}
fn new(lock: Model, db: &'a DatabaseConnection) -> LockHandle<'a> {
LockHandle { lock, db }
}
}
impl<'a> Drop for LockHandle<'a> {
    fn drop(&mut self) {
        // NOTE(review): `block_on` inside an async (tokio) worker can panic or
        // deadlock the runtime; confirm handles are only dropped in contexts
        // where blocking is safe.
        futures::executor::block_on(self.release());
    }
}
/// A claimable lock row: `(lock_type, lock_resource)` names the resource,
/// and `lock_owner` holds the claiming UUID while the lock is taken.
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize, FromForm)]
#[sea_orm(table_name = "locks")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    // Category of the locked resource (e.g. "nodes", "global_vmid").
    pub lock_type: String,
    // Identifier of the specific resource within the type.
    pub lock_resource: String,
    // UUID of the current holder; NULL when the lock is free.
    pub lock_owner: Option<String>,
    // Human-readable reason recorded by the holder, for debugging.
    pub lock_reason: Option<String>,
}

// This entity has no relations.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

4
src/api/entity/mod.rs Normal file
View File

@ -0,0 +1,4 @@
pub mod settings;
pub mod locks;
pub mod nodes;
pub mod volumes;

326
src/api/entity/nodes.rs Normal file
View File

@ -0,0 +1,326 @@
use std::fmt::Display;
use std::net::TcpStream;
use ssh2::{OpenFlags, OpenType, Session};
use std::io::{Read, Write};
use std::path::Path;
use std::time::Duration;
use sea_orm::entity::prelude::*;
use sea_orm::QueryOrder;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use proxmox_api;
use log::{warn, debug};
use proxmox_api::types::VmId;
use crate::api::entity::{locks, settings};
use crate::api::entity::locks::{lock, try_lock, LockHandle};
use crate::api::services::{Services, ServiceError, SshError, ssh_run_trimmed};
/// Errors from attempting to lock a node row.
#[derive(Debug)]
pub enum NodeLockErr {
    /// Underlying database failure.
    DbErr(DbErr),
    /// No node could be locked within the bounded retry window.
    WaitTimeLimitExceeded,
}
/// Top-level error type for cluster operations, wrapping the lower layers.
#[derive(Debug)]
pub enum P5xError {
    DbErr(DbErr),
    LockErr(NodeLockErr),
    ServiceError(ServiceError),
    PveError(proxmox_api::UreqError),
    SshError(SshError),
    /// A PVE network interface lacked a usable string address.
    InvalidNetworkInterface,
    /// A required precondition did not hold (static description).
    BadPrecondition(&'static str),
    /// An operation completed but left an unexpected state.
    BadPostcondition(&'static str),
    /// A PVE task failed; carries the UPID and its reported status.
    UpidFailed(String, String),
}
/// Block until some permanent node can be locked, polling every 10 seconds.
///
/// Gives up with `WaitTimeLimitExceeded` after 600 attempts (~100 minutes).
pub async fn lock_first_available<'a>(
    db: &'a DatabaseConnection,
    lock_reason: Option<&str>,
) -> Result<(Model, locks::LockHandle<'a>), NodeLockErr> {
    const MAX_ATTEMPTS: u32 = 600;

    for _ in 0..MAX_ATTEMPTS {
        let attempt = try_lock_first_available(db, lock_reason)
            .await
            .map_err(NodeLockErr::DbErr)?;

        match attempt {
            Some(locked) => return Ok(locked),
            None => sleep(Duration::from_secs(10)).await,
        }
    }

    warn!("Failed to acquire a lock on ANY nodes before the timeout was exceeded!");
    Err(NodeLockErr::WaitTimeLimitExceeded)
}
/// Make one pass over the permanent nodes (in id order) and return the first
/// one whose lock row could be claimed, or `Ok(None)` when all are busy.
pub async fn try_lock_first_available<'a>(
    db: &'a DatabaseConnection,
    lock_reason: Option<&str>,
) -> Result<Option<(Model, locks::LockHandle<'a>)>, DbErr> {
    let reason = lock_reason.unwrap_or("(none)");
    debug!("Trying to lock first available node. Reason: {}", reason);

    // Candidates are the permanent nodes, in stable id order.
    let candidates = Entity::find()
        .filter(Column::IsPermanent.eq(true))
        .order_by_asc(Column::Id)
        .all(db)
        .await?;

    // The first node whose row lock we can claim wins.
    for node in candidates {
        if let Some(handle) = try_lock(db, "nodes", &node.id.to_string(), lock_reason).await? {
            info!("Locked node {} ({}): {}", node.hostname, node.pve_id, reason);
            return Ok(Some((node, handle)));
        }
    }

    // No available nodes to lock.
    Ok(None)
}
/// An in-memory, line-oriented view of a PVE LXC config file
/// (`/etc/pve/lxc/<vmid>.conf`).
pub struct PveConfig {
    lines: Vec<String>,
}

impl PveConfig {
    /// Parse a config file body into its individual lines.
    pub fn new(conf: &str) -> PveConfig {
        let lines = conf.split('\n')
            .map(|s| s.to_string())
            .collect();
        PveConfig { lines }
    }

    /// Rewrite every line accepted by `matcher` using `replacer`.
    /// A `replacer` returning `None` deletes the line; all other lines are kept.
    pub fn replace<F1, F2>(
        &mut self,
        matcher: F1,
        replacer: F2,
    )
    where
        F1: Fn(&String) -> bool,
        F2: Fn(&String) -> Option<String>
    {
        self.map(|s: &String| {
            if matcher(s) {
                return replacer(s);
            }
            Some(s.to_string())
        });
    }

    /// Apply `replacer` to every line, dropping lines for which it returns `None`.
    pub fn map<F>(
        &mut self,
        replacer: F
    )
    where
        F: FnMut(&String) -> Option<String>
    {
        self.lines = self.lines
            .iter()
            .filter_map(replacer)
            .collect();
    }

    /// Return the full line for a `key: value` entry, if present.
    pub fn get(&self, key: &str) -> Option<String> {
        let key = format!("{key}:");
        self.lines
            .iter()
            .find(|line| line.starts_with(&key))
            .map(|s| s.to_string())
    }

    /// Return the next free index for entries like `mp0:`, `mp1:`, ...
    ///
    /// Scans every `<prefix><n>:` line and returns `max(n) + 1`, or 0 when no
    /// line with the prefix parses. (The previous implementation used the
    /// *first* match instead of the maximum, which handed out an index that
    /// was already in use whenever entries were listed in ascending order.)
    pub fn next_nth(&self, prefix: &str) -> u32 {
        self.lines
            .iter()
            .filter(|line| line.starts_with(prefix))
            .filter_map(|line| {
                line[prefix.len()..]
                    .split(':')
                    .next()?
                    .parse::<u32>()
                    .ok()
            })
            .max()
            .map_or(0, |idx| idx + 1)
    }
}
impl Display for PveConfig {
    /// Serialize the config back to its newline-joined file form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.lines.join("\n"))
    }
}
/// A managed LXC container ("node") in the cluster.
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize, FromForm)]
#[sea_orm(table_name = "nodes")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub hostname: String,
    // The LXC container id within PVE.
    pub pve_id: i32,
    // Hostname of the PVE host this container runs on.
    pub pve_host: String,
    // IP address and subnet prefix length assigned to the container.
    pub assigned_ip: String,
    pub assigned_subnet: u32,
    // Permanent nodes are eligible for first-available lock scheduling.
    pub is_permanent: bool,
}
impl Model {
    /// Fetch a required setting via `svc`, converting a missing value into `P5xError`.
    fn setting<F>(&self, svc: &Services, f: F) -> Result<String, P5xError>
    where
        F: FnOnce(&settings::Model) -> &Option<String> {
        svc.setting_req(f).map_err(P5xError::ServiceError)
    }

    /// This node's PVE container id as a typed `VmId`.
    pub fn vm_id(&self) -> VmId {
        VmId::new(i64::from(self.pve_id)).unwrap()
    }

    /// Open an SSH session to this node as root, authenticating with the
    /// cluster keypair stored in settings.
    pub fn ssh(&self, svc: &Services) -> Result<Session, P5xError> {
        let addr = format!("{}:22", self.assigned_ip);
        let tcp = TcpStream::connect(addr)
            .map_err(SshError::IoError)
            .map_err(P5xError::SshError)?;

        let mut sess = Session::new()
            .map_err(SshError::ClientError)
            .map_err(P5xError::SshError)?;

        sess.set_tcp_stream(tcp);
        sess.handshake()
            .map_err(SshError::ClientError)
            .map_err(P5xError::SshError)?;

        let pubkey = self.setting(svc, |s| &s.ssh_public_key)?;
        let privkey = self.setting(svc, |s| &s.ssh_private_key)?;

        // Security: never log the private key material here, even at debug
        // level — debug logs routinely end up in shared log aggregation.
        sess.userauth_pubkey_memory("root", Some(&pubkey), &privkey, None)
            .map_err(SshError::ClientError)
            .map_err(P5xError::SshError)?;

        if !sess.authenticated() {
            return Err(P5xError::SshError(SshError::AuthFailed));
        }

        Ok(sess)
    }

    /// Read this container's PVE config (`/etc/pve/lxc/<id>.conf`) from its
    /// host over SCP and parse it.
    pub fn config(&self, svc: &Services) -> Result<PveConfig, P5xError> {
        let pve_ssh = svc.pve_ssh(&self.pve_host)
            .map_err(P5xError::ServiceError)?;

        let path = format!("/etc/pve/lxc/{}.conf", self.pve_id);
        let path = Path::new(&path);

        let (mut remote_file, _) = pve_ssh.scp_recv(path)
            .map_err(SshError::ClientError)
            .map_err(P5xError::SshError)?;

        let mut contents = String::new();
        remote_file.read_to_string(&mut contents)
            .map_err(SshError::IoError)
            .map_err(P5xError::SshError)?;

        // Close the channel and wait for the whole content to be transferred.
        remote_file.send_eof().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
        remote_file.wait_eof().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
        remote_file.close().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
        remote_file.wait_close().map_err(SshError::ClientError).map_err(P5xError::SshError)?;

        Ok(PveConfig::new(&contents))
    }

    /// Overwrite this container's PVE config on its host via SFTP.
    ///
    /// Callers are expected to hold this node's lock (see `mutate_config`).
    pub fn write_config(&self, svc: &Services, conf: &PveConfig) -> Result<(), P5xError> {
        let conf = conf.to_string();
        let conf = conf.as_bytes();

        let pve_ssh = svc.pve_ssh(&self.pve_host)
            .map_err(P5xError::ServiceError)?;

        // SFTP (rather than SCP) is used so we can truncate-write in place.
        let sftp = pve_ssh.sftp()
            .map_err(SshError::ClientError)
            .map_err(P5xError::SshError)?;

        let path = format!("/etc/pve/lxc/{}.conf", self.pve_id);
        let path = Path::new(&path);

        debug!("Attempting to open file to mutate config {} on {}", path.display(), self.pve_host);
        let mut f = sftp.open_mode(path, OpenFlags::WRITE, 0o640, OpenType::File)
            .map_err(SshError::ClientError)
            .map_err(P5xError::SshError)?;

        debug!("Attempting to write config to {} on {}", path.display(), self.pve_host);
        f.write_all(conf)
            .map_err(SshError::IoError)
            .map_err(P5xError::SshError)?;

        Ok(())
    }

    /// Try once to claim this node's lock row.
    pub async fn try_lock<'a>(&self, svc: &Services<'a>, lock_reason: Option<&str>) -> Result<Option<LockHandle<'a>>, NodeLockErr> {
        try_lock(svc.db, "nodes", &self.id.to_string(), lock_reason)
            .await
            .map_err(NodeLockErr::DbErr)
    }

    /// Block (polling) until this node's lock row can be claimed.
    pub async fn lock<'a>(&self, svc: &Services<'a>, lock_reason: Option<&str>) -> Result<LockHandle<'a>, NodeLockErr> {
        lock(svc.db, "nodes", &self.id.to_string(), lock_reason)
            .await
            .map_err(NodeLockErr::DbErr)
    }

    /// Read-modify-write this node's PVE config while holding the node lock
    /// for the whole operation.
    pub async fn mutate_config<F>(
        &self,
        svc: &Services<'_>,
        lock_reason: Option<&str>,
        mutator: F,
    ) -> Result<(), P5xError>
    where
        F: FnOnce(PveConfig) -> PveConfig
    {
        debug!("Mutating config on node {}", self.hostname);
        // Held until the end of this function; releases on drop.
        let _lock = self.lock(svc, lock_reason)
            .await.map_err(P5xError::LockErr)?;

        let config = self.config(svc)?;
        debug!("Successfully loaded config for mutate on node {}", self.hostname);

        let config = mutator(config);
        self.write_config(svc, &config)
    }

    /// Run a command on this node over SSH, returning trimmed stdout.
    pub fn ssh_run_trimmed(&self, svc: &Services, cmd: &str) -> Result<String, P5xError> {
        let node_ssh = self.ssh(svc)?;
        ssh_run_trimmed(&node_ssh, cmd)
    }
}
// This entity has no relations.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

View File

@ -0,0 +1,29 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
/// Cluster-wide configuration. Rows are append-only: the highest id is used
/// as the active config (see the /configure routes, which insert a new row
/// on every save).
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize, FromForm)]
#[sea_orm(table_name = "settings")]
pub struct Model {
    #[sea_orm(primary_key)]
    #[field(default = 0)]
    pub id: i32,
    // Hostname of the PVE node that coordinates the cluster.
    pub pve_master_node: Option<String>,
    // Host/IP of the PVE API endpoint (https and port 8006 are added elsewhere).
    pub pve_api_host: Option<String>,
    pub pve_root_password: Option<String>,
    // Storage pool and driver used when building qualified volume names.
    pub pve_storage_pool: Option<String>,
    pub pve_storage_driver: Option<String>,

    // pub dns_domain: Option<String>,
    // pub node_network_bridge: Option<String>,
    // pub node_cpus: Option<u64>,
    // pub node_ram_in_mib: Option<u64>,
    // pub root_password: Option<String>,

    // Cluster SSH keypair used to log into managed nodes as root.
    #[sea_orm(column_type = "Text")]
    pub ssh_public_key: Option<String>,
    #[sea_orm(column_type = "Text")]
    pub ssh_private_key: Option<String>,
}

// This entity has no relations.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

83
src/api/entity/volumes.rs Normal file
View File

@ -0,0 +1,83 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
use crate::api::cluster::volume::VolumeParams;
use crate::api::entity::nodes;
use crate::api::entity::nodes::P5xError;
use crate::api::services::Services;
/// Resolve a volume row from API params: by id when provided, else by name.
pub async fn resolve(
    svc: &Services<'_>,
    params: &VolumeParams,
) -> Result<Model, P5xError> {
    let query = if let Some(id) = params.id {
        Entity::find().filter(Column::VolumeId.eq(id))
    } else {
        Entity::find().filter(Column::Name.eq(&params.name))
    };

    query
        .one(svc.db)
        .await
        .map_err(P5xError::DbErr)?
        .ok_or(P5xError::BadPrecondition("Could not resolve volume from params: query matched no results"))
}
/// A persistent volume tracked by the cluster.
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize, FromForm)]
#[sea_orm(table_name = "volumes")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub volume_id: i32,
    pub name: String,
    pub size_in_bytes: i64,
    // PVE container id of the node currently holding the volume.
    pub pve_node_id: i32,
    // Mount bookkeeping; populated while the volume is attached somewhere.
    pub mountpoint_identifier: Option<String>,
    pub mountpoint_host: Option<String>,
    pub mountpoint: Option<String>,
    // Backing disk file name; defaults to "pve-{name}.raw" when unset.
    pub disk_name: Option<String>,
}
impl Model {
    /// Look up the cluster node this volume currently lives on.
    pub async fn node(&self, svc: &Services<'_>) -> Result<nodes::Model, P5xError> {
        let found = nodes::Entity::find()
            .filter(nodes::Column::PveId.eq(self.pve_node_id))
            .one(svc.db)
            .await
            .map_err(P5xError::DbErr)?;

        found.ok_or(P5xError::BadPrecondition("Could not find node for volume: pve_node_id does not match any nodes"))
    }

    /// Build the storage-qualified PVE volume identifier: `pool:disk` for
    /// LVM-backed storage, `pool:<pve_id>/disk` otherwise.
    pub async fn qualified_name(&self, svc: &Services<'_>) -> Result<String, P5xError> {
        let node = self.node(svc).await?;

        let storage = svc.setting_req(|s| &s.pve_storage_pool)
            .map_err(P5xError::ServiceError)?;
        let storage_type = svc.setting_req(|s| &s.pve_storage_driver)
            .map_err(P5xError::ServiceError)?;

        // Fall back to the conventional raw-file name when none is recorded.
        let fallback = format!("pve-{}.raw", self.name);
        let disk_name = self.disk_name.as_ref().unwrap_or(&fallback);

        if storage_type.as_str() == "lvm" {
            Ok(format!("{storage}:{disk_name}"))
        } else {
            Ok(format!("{storage}:{}/{disk_name}", node.pve_id))
        }
    }
}
/// Convert a persisted volume row into the API-facing parameter struct.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl gives
/// callers `Into` (and `.into()`) for free, so existing call sites keep working.
impl From<Model> for VolumeParams {
    fn from(m: Model) -> VolumeParams {
        VolumeParams {
            id: Some(m.volume_id),
            name: m.name,
            size_in_bytes: m.size_in_bytes,
            mountpoint: m.mountpoint,
        }
    }
}
// This entity has no relations.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

16
src/api/mod.rs Normal file
View File

@ -0,0 +1,16 @@
use rocket::fairing::AdHoc;
mod db;
mod route;
mod util;
mod cluster;
pub mod entity;
pub use db::Db;
pub mod services;
/// Fairing that wires up the database pool and HTTP routes for the API module.
pub fn init() -> AdHoc {
    AdHoc::on_ignite("mod(db)", |rocket| async {
        rocket.attach(db::init())
            .attach(route::init())
    })
}

View File

@ -0,0 +1,51 @@
use rocket::fairing::AdHoc;
use rocket::form::Form;
use rocket::response::{status};
use rocket_dyn_templates::{context, Template};
use sea_orm::*;
use sea_orm_rocket::Connection;
use crate::api;
use crate::api::entity::settings;
use crate::api::util::raise_500;
/// Render the settings form, pre-filled with the most recent settings row (if any).
async fn render(db: &DatabaseConnection) -> Result<Template, status::Custom<String>> {
    let settings = settings::Entity::find()
        .order_by_desc(settings::Column::Id)
        .one(db)
        .await
        .map_err(raise_500)?;

    let ctx = context! { settings: settings };
    Ok(Template::render("settings", ctx))
}
/// GET /configure — show the settings form.
#[get("/")]
async fn get(
    conn: Connection<'_, api::Db>,
) -> Result<Template, status::Custom<String>> {
    render(conn.into_inner()).await
}
/// POST /configure — persist the submitted settings as a new row.
///
/// Each save INSERTs a fresh row rather than updating in place; the id acts
/// as a config version (the template shows it as "v{id}").
// NOTE(review): pve_storage_driver and the SSH key fields are not copied from
// the form here — confirm whether those are populated elsewhere.
#[post("/", data = "<input>")]
async fn save(
    conn: Connection<'_, api::Db>,
    input: Form<settings::Model>,
) -> Result<Template, status::Custom<String>> {
    let db = conn.into_inner();

    settings::ActiveModel {
        pve_master_node: Set(input.pve_master_node.to_owned()),
        pve_api_host: Set(input.pve_api_host.to_owned()),
        pve_root_password: Set(input.pve_root_password.to_owned()),
        pve_storage_pool: Set(input.pve_storage_pool.to_owned()),
        ..Default::default()
    }
        .save(db)
        .await
        .map_err(raise_500)?;

    // Re-render the form showing the newly saved values.
    render(db).await
}
/// Mount the settings UI under /configure.
pub(super) fn init() -> AdHoc {
    AdHoc::on_ignite("Routes: /configure", |rocket| async {
        rocket.mount("/configure", routes![get, save])
    })
}

11
src/api/route/mod.rs Normal file
View File

@ -0,0 +1,11 @@
use rocket::fairing::AdHoc;
mod configure;
mod volume;
/// Attach all route sub-modules to the Rocket instance.
pub(super) fn init() -> AdHoc {
    AdHoc::on_ignite("Registering routes", |rocket| async {
        rocket.attach(configure::init())
            .attach(volume::init())
    })
}

130
src/api/route/volume.rs Normal file
View File

@ -0,0 +1,130 @@
use rocket::fairing::AdHoc;
use rocket::response::status;
use rocket::serde::json::Json;
use sea_orm_rocket::Connection;
use sea_orm::*;
use crate::api;
use crate::api::cluster;
use crate::api::cluster::volume::VolumeParams;
use crate::api::entity::nodes;
use crate::api::entity::nodes::P5xError;
use crate::api::services::Services;
use crate::api::util::raise_500;
#[get("/<name>")]
async fn get_vol(
conn: Connection<'_, api::Db>,
name: &str,
) -> Result<Json<VolumeParams>, status::Custom<String>> {
let db = conn.into_inner();
let svc = Services::build(db).await.map_err(raise_500)?;
let vol = VolumeParams::resolve(&svc, name)
.await.map_err(raise_500)?
.ok_or(P5xError::BadPrecondition("Could not find a volume with that name"))
.map_err(raise_500)?;
Ok(Json(vol))
}
/// POST /volume — create a new volume and leave it unmounted.
#[post("/", data = "<input>")]
async fn create_vol(
    conn: Connection<'_, api::Db>,
    input: Json<VolumeParams>,
) -> Result<Json<VolumeParams>, status::Custom<String>> {
    let input = input.into_inner();
    let db = conn.into_inner();
    let svc = Services::build(db).await.map_err(raise_500)?;

    let vol: VolumeParams = cluster::volume::create(&svc, &input.name, input.size_in_bytes)
        .await
        .map_err(raise_500)?
        .into();

    // Volumes are created detached; the caller mounts explicitly via /mount.
    cluster::volume::unmount(&svc, &vol).await.map_err(raise_500)?;

    // `vol` is already a VolumeParams; the previous `vol.into()` here was a
    // redundant identity conversion.
    Ok(Json(vol))
}
#[delete("/<name>")]
async fn delete_vol(
conn: Connection<'_, api::Db>,
name: &str,
) -> Result<(), status::Custom<String>> {
let db = conn.into_inner();
let svc = Services::build(db).await.map_err(raise_500)?;
let vol = VolumeParams::resolve(&svc, name)
.await.map_err(raise_500)?
.ok_or(P5xError::BadPrecondition("Could not find a volume with that name"))
.map_err(raise_500)?;
cluster::volume::unmount(&svc, &vol).await.map_err(raise_500)?;
cluster::volume::delete(&svc, &vol).await.map_err(raise_500)?;
Ok(())
}
/// POST /volume/mount — mount the described volume on its node.
#[post("/mount", data = "<params>")]
async fn mount_vol(
    conn: Connection<'_, api::Db>,
    params: Json<VolumeParams>,
) -> Result<(), status::Custom<String>> {
    let svc = Services::build(conn.into_inner()).await.map_err(raise_500)?;
    let params = params.into_inner();

    cluster::volume::mount(&svc, &params).await.map_err(raise_500)?;
    Ok(())
}
#[post("/unmount/<name>")]
async fn unmount_vol(
conn: Connection<'_, api::Db>,
name: &str,
) -> Result<(), status::Custom<String>> {
let db = conn.into_inner();
let svc = Services::build(db).await.map_err(raise_500)?;
let vol = VolumeParams::resolve(&svc, name)
.await.map_err(raise_500)?
.ok_or(P5xError::BadPrecondition("Could not find a volume with that name"))
.map_err(raise_500)?;
cluster::volume::unmount(&svc, &vol).await.map_err(raise_500)?;
Ok(())
}
/// POST /volume/transfer/<name>/to/<node> — move a volume to another node.
#[post("/transfer/<name>/to/<node>")]
async fn transfer_vol(
    conn: Connection<'_, api::Db>,
    name: &str,
    node: &str,
) -> Result<(), status::Custom<String>> {
    let db = conn.into_inner();
    let svc = Services::build(db).await.map_err(raise_500)?;

    // Resolve the volume by name; 500 with a precondition message if missing.
    let vol = VolumeParams::resolve(&svc, name)
        .await.map_err(raise_500)?
        .ok_or(P5xError::BadPrecondition("Could not find a volume with that name"))
        .map_err(raise_500)?;

    // Resolve the destination node by its hostname.
    let node = nodes::Entity::find()
        .filter(nodes::Column::Hostname.eq(node))
        .one(db)
        .await.map_err(raise_500)?
        .ok_or(P5xError::BadPrecondition("Could not find a node with that name"))
        .map_err(raise_500)?;

    cluster::volume::transfer(&svc, &vol, &node).await.map_err(raise_500)?;
    Ok(())
}
/// Mount the volume management API under /volume.
pub(super) fn init() -> AdHoc {
    AdHoc::on_ignite("Routes: /volume", |rocket| async {
        rocket.mount("/volume", routes![create_vol, delete_vol, get_vol, mount_vol, unmount_vol, transfer_vol])
    })
}

152
src/api/services.rs Normal file
View File

@ -0,0 +1,152 @@
use std::io::Read;
use std::net::TcpStream;
use proxmox_api::nodes::node::NodeClient;
use proxmox_api::UreqClient;
use sea_orm::{DatabaseConnection, DbErr, EntityTrait, QueryOrder};
use ssh2::Session;
use crate::api::entity::nodes::P5xError;
use crate::api::entity::settings;
/// Errors raised while establishing or driving an SSH session.
#[derive(Debug)]
pub enum SshError {
    /// Underlying TCP/IO failure.
    IoError(std::io::Error),
    /// Failure reported by the ssh2 client library.
    ClientError(ssh2::Error),
    /// A remote command exited non-zero; carries the exit code and a message.
    CommandFailed(i32, String),
    /// The handshake completed but authentication was rejected.
    AuthFailed,
}

impl SshError {
    /// Build a `CommandFailed` describing the command and its captured output.
    pub fn command_failed(exit_code: i32, command: &str, output: &str) -> SshError {
        let message = format!("Command `{command}` failed: {output}");
        SshError::CommandFailed(exit_code, message)
    }
}
/// Errors from the shared service layer (settings, PVE API, SSH).
#[derive(Debug)]
pub enum ServiceError {
    /// A required settings field is unset, or no settings row exists yet.
    MissingSetting,
    PveError(proxmox_api::UreqError),
    /// A PVE network interface lacked a usable string address.
    InvalidNetworkInterface,
    SshError(SshError),
}
/// Request-scoped bundle of shared services: the DB connection plus the most
/// recent settings row, loaded once at construction.
pub struct Services<'a> {
    pub settings: Option<settings::Model>,
    pub db: &'a DatabaseConnection,
}
impl<'a> Services<'a> {
    /// Load the most recent settings row (if any) and bundle it with the connection.
    pub async fn build(db: &'a DatabaseConnection) -> Result<Services<'a>, DbErr> {
        let settings = settings::Entity::find().order_by_desc(settings::Column::Id).one(db).await?;
        Ok(Services {
            db,
            settings,
        })
    }

    /// Read an optional setting field, cloning it out of the cached settings row.
    /// Returns `None` when no settings row exists or the field is unset.
    pub fn setting<F>(&self, f: F) -> Option<String>
    where
        F: FnOnce(&settings::Model) -> &Option<String>,
    {
        let model = self.settings.as_ref()?;
        let res = f(model);
        if let Some(res) = res {
            return Some(res.clone());
        }
        None
    }

    /// Like `setting`, but a missing value becomes `ServiceError::MissingSetting`.
    pub fn setting_req<F>(&self, f: F) -> Result<String, ServiceError>
    where
        F: FnOnce(&settings::Model) -> &Option<String>
    {
        self.setting(f).ok_or(ServiceError::MissingSetting)
    }

    /// Build an authenticated PVE API client from the configured host/password.
    pub fn pve(&self) -> Result<proxmox_api::UreqClient, ServiceError> {
        let host = self.setting_req(|s| &s.pve_api_host)?;
        let pw = self.setting_req(|s| &s.pve_root_password)?;
        let host = format!("https://{host}:8006");
        let api = proxmox_api::UreqClient::new(&host, "root", "pam", &pw)
            .map_err(ServiceError::PveError)?;
        Ok(api)
    }

    /// PVE API client scoped to a single node.
    pub fn pve_node(&self, host: &str) -> Result<NodeClient<UreqClient>, ServiceError> {
        let pve = self.pve()?;
        Ok(proxmox_api::nodes::NodesClient::new(pve).node(host))
    }

    /// Look up an IP address for the given PVE node from its network interfaces.
    // NOTE(review): this takes the *first* interface exposing an "address"
    // property — confirm that is always the reachable/management address.
    pub fn pve_addr(&self, host: &str) -> Result<String, ServiceError> {
        let ifaces = self.pve_node(host)?
            .network()
            .get(Default::default())
            .map_err(ServiceError::PveError)?;
        let addr = ifaces.iter()
            .filter_map(|i| i.additional_properties.get("address"))
            .next()
            .ok_or(ServiceError::InvalidNetworkInterface)?;
        if !addr.is_string() {
            return Err(ServiceError::InvalidNetworkInterface);
        }
        Ok(addr.as_str().unwrap().to_string())
    }

    /// Open a root SSH session to a PVE host using the configured root password.
    pub fn pve_ssh(&self, host: &str) -> Result<Session, ServiceError> {
        let addr = self.pve_addr(host)?;
        let addr = format!("{}:22", addr);
        let tcp = TcpStream::connect(addr)
            .map_err(SshError::IoError)
            .map_err(ServiceError::SshError)?;
        let mut sess = Session::new()
            .map_err(SshError::ClientError)
            .map_err(ServiceError::SshError)?;
        // Order matters: attach the stream, handshake, then authenticate.
        sess.set_tcp_stream(tcp);
        sess.handshake()
            .map_err(SshError::ClientError)
            .map_err(ServiceError::SshError)?;
        let pw = self.setting_req(|s| &s.pve_root_password)?;
        sess.userauth_password("root", &pw)
            .map_err(SshError::ClientError)
            .map_err(ServiceError::SshError)?;
        if !sess.authenticated() {
            return Err(ServiceError::SshError(SshError::AuthFailed));
        }
        Ok(sess)
    }
}
/// Run `cmd` over a fresh channel on `session` and return its trimmed stdout.
///
/// Fails with `SshError::CommandFailed` when the remote exit code is non-zero.
pub fn ssh_run_trimmed(session: &Session, cmd: &str) -> Result<String, P5xError> {
    let mut channel = session.channel_session()
        .map_err(SshError::ClientError)
        .map_err(P5xError::SshError)?;

    channel.exec(cmd)
        .map_err(SshError::ClientError)
        .map_err(P5xError::SshError)?;

    let mut contents = String::new();
    channel.read_to_string(&mut contents)
        .map_err(SshError::IoError)
        .map_err(P5xError::SshError)?;

    // ssh2 only guarantees a meaningful exit status once the channel has been
    // closed, so wait for the remote side before querying it.
    channel.wait_close()
        .map_err(SshError::ClientError)
        .map_err(P5xError::SshError)?;

    let exit_code = channel.exit_status()
        .map_err(SshError::ClientError)
        .map_err(P5xError::SshError)?;

    let contents = contents.trim();
    if exit_code != 0 {
        return Err(P5xError::SshError(SshError::command_failed(exit_code, cmd, contents)));
    }

    Ok(contents.to_string())
}

7
src/api/util.rs Normal file
View File

@ -0,0 +1,7 @@
use std::fmt::Debug;
use rocket::http;
use rocket::response::status;
/// Map any debuggable error into an HTTP 500 response whose body carries the
/// error's Debug representation.
pub fn raise_500(e: impl Debug) -> status::Custom<String> {
    status::Custom(http::Status::InternalServerError, format!("An unexpected error has occurred: {:?}", e))
}

33
src/main.rs Normal file
View File

@ -0,0 +1,33 @@
pub mod api;
#[macro_use] extern crate rocket;
use rocket::{Build, Rocket};
use log::{error, info};
use std::{env, process};
use rocket_dyn_templates::{ Template};
/// Build the Rocket instance with templating and the API module attached.
fn configure_rocket() -> Rocket<Build> {
    rocket::build()
        .attach(Template::fairing())
        .attach(api::init())
}
#[tokio::main]
async fn main() {
env_logger::init();
info!(target: "p5x", "Starting p5x...");
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
error!(target: "p5x", "Missing required <mode> argument. Valid modes: api-server");
process::exit(1);
}
let mode = &args[1];
if mode == "api-server" {
let rocket = configure_rocket();
if let Err(e) = rocket.launch().await {
error!(target: "p5x", "Rocket failed to launch: {:?}", e)
}
return;
}
}