[WIP] Start implementing init-time process to create /mnt/p5x-system-data volume

This commit is contained in:
Garrett Mills 2025-03-29 20:28:30 -04:00
parent ba05c5ba98
commit cc7700346e
8 changed files with 978 additions and 132 deletions

836
Cargo.lock generated

File diff suppressed because it is too large. Load Diff

View File

@ -22,3 +22,5 @@ ureq = "2.10.1"
dotenv = "0.15.0"
ssh-key = { version = "0.6.7", features = ["ed25519"] }
rand = "0.8.5"
kube = { version = "0.99.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.24", features = ["v1_32"] }

View File

@ -48,6 +48,7 @@ In your Kubernetes cluster, in the `p5x-system` namespace, you should now see a
## License
P5x: Proxmox on Kubernetes - API Server
Copyright (C) 2025 Garrett Mills <shout@garrettmills.dev>
This program is free software: you can redistribute it and/or modify

10
deploy/10-dynamic-kv.yaml Normal file
View File

@ -0,0 +1,10 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
namespace: p5x-system
name: dynamic-kv
labels:
p5x.garrettmills.dev/managed-by: 'p5x'
data:
p5x: "true"

View File

@ -1,11 +1,86 @@
use std::env;
use std::{env, fs};
use std::fs::{File, Permissions};
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use log::info;
use kube::{Api, Client};
use log::{info, debug};
use rand::rngs::OsRng;
use ssh_key::{PrivateKey, Algorithm, LineEnding};
use k8s_openapi::api::core::v1::{ConfigMap, Node, Pod};
use kube::api::{Patch, PatchParams};
use serde_json::json;
use crate::api::cluster::volume::create_volume_unmanaged;
use crate::api::entity::nodes::P5xError;
use crate::api::services::Services;
/** Ensure the one-time P5x API system disk exists, creating and recording it if necessary.
 *
 * Looks up the `dynamic-kv` ConfigMap in this pod's namespace. If it already
 * records an `api-pve-host`/`api-pve-disk` pair, the disk exists and nothing is done.
 * Otherwise, a new 5 GiB volume is provisioned on the PVE host backing the node this
 * pod is scheduled on, and its location is patched back into the ConfigMap so
 * subsequent runs become no-ops.
 *
 * Panics (by design, this runs as a one-shot init process) when required environment
 * variables or node labels are missing or malformed.
 */
pub async fn ensure_system_disk(svc: &Services<'_>) -> Result<(), P5xError> {
    info!(target: "p5x", "Ensuring that the P5x API system disk exists...");

    // Load the dynamic-kv ConfigMap. The namespace comes from the in-cluster
    // service-account mount; fall back to `p5x-system` when not running in-cluster.
    let client = Client::try_default().await.map_err(P5xError::KubeError)?;
    let namespace = fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
        .unwrap_or_else(|_| "p5x-system".to_string());

    let maps: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
    let map = maps.get("dynamic-kv").await.map_err(P5xError::KubeError)?;

    // If there's already a PVE host and disk mount in the config, we're done
    let mut data = map.data.unwrap_or_default();
    if let Some(host) = data.get("api-pve-host") {
        if let Some(mount) = data.get("api-pve-disk") {
            info!(target: "p5x", "Found existing P5x API system disk on {host}:{mount}");
            return Ok(());
        }
    }

    // Otherwise, create the disk on this PVE host
    info!(target: "p5x", "Provisioning new P5x API system disk (this is a one-time fixup)...");

    // Resolve the node this pod is scheduled on so we can read its P5x labels.
    let pod_name = env::var("POD_NAME").expect("Could not determine POD_NAME from environment!");
    let pods: Api<Pod> = Api::namespaced(client.clone(), &namespace);
    let pod = pods.get(&pod_name).await.map_err(P5xError::KubeError)?;
    let node_name = pod.spec.and_then(|spec| spec.node_name).expect("Could not determine the Node name for pod!");

    let nodes: Api<Node> = Api::all(client);
    let node = nodes.get(&node_name).await.map_err(P5xError::KubeError)?;
    let labels = node.metadata.labels.expect("Could not load labels for node");

    let pve_host = labels.get("p5x.garrettmills.dev/pve-host")
        .expect("Node is missing required label: p5x.garrettmills.dev/pve-host");

    // Fix: a malformed (non-integer) label previously died in a bare unwrap()
    // with no context; surface a descriptive message instead.
    let pve_id: i32 = labels.get("p5x.garrettmills.dev/pve-id")
        .expect("Node is missing required label: p5x.garrettmills.dev/pve-id")
        .parse()
        .expect("Node label p5x.garrettmills.dev/pve-id is not a valid integer VMID");

    debug!(target: "p5x", "pod {pod_name} | node {node_name} | host {pve_host} | vmid {pve_id}");

    // Create a new 5 GiB disk and mount it to the K8s node (LXC container)
    let (disk_name, _) = create_volume_unmanaged(
        svc,
        5 * 1024 * 1024 * 1024,
        pve_host,
        pve_id,
        "p5x-api-system-disk"
    ).await?;

    // Record the new disk in the dynamic-kv ConfigMap so subsequent runs find it.
    data.insert("api-pve-host".to_string(), pve_host.to_string());
    data.insert("api-pve-disk".to_string(), disk_name);

    let patch = json!({
        "data": data
    });

    let params = PatchParams::apply("p5x-api");
    maps.patch("dynamic-kv", &params, &Patch::Merge(&patch))
        .await.map_err(P5xError::KubeError)?;

    info!(target: "p5x", "Successfully provisioned P5x API system disk.");
    Ok(())
}
/** Check if the SSH pubkey/privkey exist at the configured paths. If not, generate them. */

View File

@ -13,7 +13,7 @@ use tokio::time::sleep;
use ureq::Error::Status;
use crate::api::cluster::carrier::{provision_carrier, terminate_carrier};
use crate::api::cluster::node::migrate_node;
use crate::api::entity::nodes::{lock_first_available, P5xError};
use crate::api::entity::nodes::{lock_first_available, P5xError, PveConfig};
use crate::api::entity::{nodes, volumes};
use crate::api::services::{ssh_run_trimmed, Services};
@ -40,6 +40,69 @@ impl VolumeParams {
}
/**
 * At runtime, you *probably* want to use actual create().
 * This just creates/mounts the actual PVE disk, but doesn't do any of
 * our locking/database work.
 *
 * Returns (Proxmox disk name, mountpoint ID)
 * Example (vm-100-disk-1, 0)
 */
pub async fn create_volume_unmanaged(
    svc: &Services<'_>,
    size_in_bytes: u64,
    pve_host: &str,
    pve_id: i32,
    name: &str,
) -> Result<(String, u32), P5xError> {
    // Get the next available mountpoint ID
    let conf = PveConfig::load(svc, pve_host, pve_id)?;
    let mp_id = conf.next_nth("mp");
    info!(target: "p5x", "Volume {name} will become mp{mp_id} on PVE {}", pve_id);

    // Generate a new mountpoint entry for the node's config.
    // PVE sizes are in GiB: round up, and never request less than 1 GiB.
    let storage = &svc.config.pve_storage_pool;
    let size_in_gib = max(size_in_bytes.div_ceil(1024 * 1024 * 1024) as i64, 1);
    let line = format!("{storage}:{size_in_gib},mp=/mnt/p5x-{name},backup=1");
    debug!(target: "p5x", "Volume {name}: {line}");

    let mut params = PutParams::default();
    params.mps.insert(mp_id, line);

    // Update the node config to create the volume
    debug!(target: "p5x", "Patching PVE config for volume {name}");
    let vm_id = VmId::new(i64::from(pve_id)).unwrap();
    let res = svc.pve_node(pve_host)
        .map_err(P5xError::ServiceError)?
        .lxc()
        .vmid(vm_id)
        .config()
        .put(params);

    // This is necessary because PUT returns {data: null} on success,
    // which the UreqClient flags as an unknown error.
    if let Err(UreqError::EncounteredErrors(e)) = res {
        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
    }

    // Stupid hack is stupid, but we don't get back a UPID to wait on the vol to be created
    info!(target: "p5x", "Successfully patched PVE config. Waiting for volume {name} to appear");
    sleep(Duration::from_secs(5)).await;

    // Load the updated config
    debug!(target: "p5x", "Loading updated node config for volume {name}");
    let conf = PveConfig::load(svc, pve_host, pve_id)?;
    let mount = conf.get(&format!("mp{mp_id}"))
        .ok_or(P5xError::BadPrecondition("Could not find mountpoint in config after creating volume"))?;
    debug!(target: "p5x", "Found mountpoint details for volume {name}: {mount}");

    // Parse the disk name from the config entry, e.g.:
    // synology-scsi-lun:vm-103-disk-1,mp=/mnt/p5x-test0,backup=1,size=1G // fixme: how does this behave for NFS?
    // Fix: use checked slicing so a malformed/short entry yields an error
    // instead of panicking on an out-of-bounds byte index.
    let name_offset = storage.len() + 1; // 1 for the colon (:)
    let disk_name = mount.get(name_offset..)
        .and_then(|rest| rest.split(',').next())
        .ok_or(P5xError::BadPostcondition("Mountpoint entry did not contain a parseable disk name"))?
        .to_string();

    Ok((disk_name, mp_id))
}
/** Create a new PVE volume of the given size. */
pub async fn create(
svc: &Services<'_>,
@ -64,51 +127,14 @@ pub async fn create(
let (node, _lock) = lock_first_available(svc.db, Some(&reason))
.await.map_err(P5xError::LockErr)?;
// Get the next available mountpoint ID
let conf = node.config(svc)?;
let mp_id = conf.next_nth("mp");
info!("Volume {name} will become mp{mp_id} on node {} ({})", node.hostname, node.pve_id);
// Generate a new mountpoint entry for the node's config
let storage = &svc.config.pve_storage_pool;
let size_in_gib = max((size_in_bytes as u64).div_ceil(1024 * 1024 * 1024) as i64, 1);
let line = format!("{storage}:{size_in_gib},mp=/mnt/p5x-{name},backup=1");
debug!("Volume {name}: {line}");
let mut params = PutParams::default();
params.mps.insert(mp_id, line);
// Update the node config to create the volume
debug!("Patching PVE config for volume {name}");
let vm_id = VmId::new(i64::from(node.pve_id)).unwrap();
let res = svc.pve_node(&node.pve_host)
.map_err(P5xError::ServiceError)?
.lxc()
.vmid(vm_id)
.config()
.put(params);
// This is necessary because PUT returns {data: null} on success,
// which the UreqClient flags as an unknown error.
if let Err(UreqError::EncounteredErrors(e)) = res {
return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
}
// Stupid hack is stupid, but we don't get back a UPID to wait on the vol to be created
info!("Successfully patched PVE config. Waiting for volume {name} to appear");
sleep(Duration::from_secs(5)).await;
// Load the updated config
debug!("Loading updated node config for volume {name}");
let conf = node.config(svc)?;
let mount = conf.get(&format!("mp{mp_id}"))
.ok_or(P5xError::BadPrecondition("Could not find mountpoint in config after creating volume"))?;
debug!("Found mountpoint details for volume {name}: {mount}");
// Parse the disk name from the config
// synology-scsi-lun:vm-103-disk-1,mp=/mnt/p5x-test0,backup=1,size=1G // fixme: how does this behave for NFS?
let name_offset = storage.len() + 1; // 1 for the colon (:)
let disk_name = mount[name_offset..].split(",").next().unwrap();
// Create and attach the actual PVE disk
let (disk_name, mp_id) = create_volume_unmanaged(
svc,
size_in_bytes as u64,
&node.pve_host,
node.pve_id,
name,
).await?;
// Persist the volume
debug!("Inserting record into volumes table for volume {name} ({disk_name})");

View File

@ -28,6 +28,7 @@ pub enum P5xError {
ServiceError(ServiceError),
PveError(proxmox_api::UreqError),
SshError(SshError),
KubeError(kube::Error),
InvalidNetworkInterface,
BadPrecondition(&'static str),
BadPostcondition(&'static str),
@ -110,6 +111,31 @@ impl PveConfig {
PveConfig { lines }
}
/** Fetch and parse the LXC container config for `pve_id` from the given PVE host.
 *
 * Reads `/etc/pve/lxc/<pve_id>.conf` over SSH (SCP) and parses it via
 * `PveConfig::new`. SSH-session setup failures surface as
 * `P5xError::ServiceError`; transfer/read failures as `P5xError::SshError`.
 */
pub fn load(svc: &Services<'_>, pve_host: &str, pve_id: i32) -> Result<PveConfig, P5xError> {
let pve_ssh = svc.pve_ssh(pve_host)
.map_err(P5xError::ServiceError)?;
// PVE stores LXC container configs under /etc/pve/lxc, keyed by VMID.
let path = format!("/etc/pve/lxc/{}.conf", pve_id);
let path = Path::new(&path);
let (mut remote_file, _) = pve_ssh.scp_recv(path)
.map_err(SshError::ClientError)
.map_err(P5xError::SshError)?;
let mut contents = String::new();
remote_file.read_to_string(&mut contents)
.map_err(SshError::IoError)
.map_err(P5xError::SshError)?;
// Close the channel and wait for the whole content to be transferred
// NOTE: the send_eof -> wait_eof -> close -> wait_close order is the SCP
// channel shutdown handshake; do not reorder these calls.
remote_file.send_eof().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.wait_eof().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.close().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.wait_close().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
Ok(PveConfig::new(&contents))
}
/** Replace a line in the config file. */
pub fn replace<F1, F2>(
&mut self,
@ -272,28 +298,7 @@ impl Model {
/** Load the LXC container config for this node. */
pub fn config(&self, svc: &Services) -> Result<PveConfig, P5xError> {
let pve_ssh = svc.pve_ssh(&self.pve_host)
.map_err(P5xError::ServiceError)?;
let path = format!("/etc/pve/lxc/{}.conf", self.pve_id);
let path = Path::new(&path);
let (mut remote_file, _) = pve_ssh.scp_recv(path)
.map_err(SshError::ClientError)
.map_err(P5xError::SshError)?;
let mut contents = String::new();
remote_file.read_to_string(&mut contents)
.map_err(SshError::IoError)
.map_err(P5xError::SshError)?;
// Close the channel and wait for the whole content to be transferred
remote_file.send_eof().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.wait_eof().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.close().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.wait_close().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
Ok(PveConfig::new(&contents))
PveConfig::load(svc, &self.pve_host, self.pve_id)
}
/** Replace the LXC container config for this node. */

View File

@ -4,7 +4,9 @@ use dotenv::dotenv;
use rocket::{Build, Rocket};
use log::{error, info};
use std::{env, process};
use crate::api::cluster::system::ensure_ssh_keypair;
use sea_orm::Database;
use crate::api::cluster::system::{ensure_ssh_keypair, ensure_system_disk};
use crate::api::services::Services;
use crate::api::util::read_p5x_config;
fn configure_rocket() -> Rocket<Build> {
@ -20,10 +22,18 @@ async fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
error!(target: "p5x", "Missing required <mode> argument. Valid modes: api-server");
error!(target: "p5x", "Missing required <mode> argument. Valid modes: api-server,ensure-system-disk");
process::exit(1);
}
let mode = &args[1];
if mode == "ensure-system-disk" {
let anon_db = Database::connect("sqlite::memory:").await.unwrap();
let svc = Services::build(&anon_db).await.unwrap(); // fixme: this is going to fail because of the SSH keys
ensure_system_disk(&svc).await.unwrap();
return;
}
ensure_ssh_keypair().expect("Could not ensure SSH keypair exists.");
let config = read_p5x_config(); // Do this so we early-fail if there are missing env vars
@ -31,7 +41,6 @@ async fn main() {
info!(target: "p5x", "Cluster host: {} ({})", config.pve_host_name, config.pve_api_host);
info!(target: "p5x", "Storage pool: {} ({})", config.pve_storage_pool, config.pve_storage_driver);
let mode = &args[1];
if mode == "api-server" {
let rocket = configure_rocket();
if let Err(e) = rocket.launch().await {