diff --git a/src/api/cluster/carrier.rs b/src/api/cluster/carrier.rs
index d74a3cd..3bff999 100644
--- a/src/api/cluster/carrier.rs
+++ b/src/api/cluster/carrier.rs
@@ -8,6 +8,7 @@ use crate::api::cluster::node::wait_upid;
 use crate::api::entity::locks::lock_vmid;
 use crate::api::entity::{locks, nodes};
 use crate::api::entity::nodes::P5xError;
+use crate::api::entity::oplogs::Op;
 use crate::api::services::{ssh_run_trimmed, Services};
 use crate::api::services::SshError;
 
@@ -18,13 +19,55 @@ pub async fn provision_carrier(
     from_node: &nodes::Model,
 ) -> Result<nodes::Model, P5xError> {
     // Make sure the empty filesystem template exists
-    ensure_carrier_template(svc, from_node).await?;
+    ensure_carrier_template(svc, &from_node.pve_host).await?;
 
     // Get the next VMID to be used by the carrier container
     let _lock = lock_vmid(svc.db, Some("Provisioning carrier container"))
         .await.map_err(P5xError::DbErr)?;
-    let pve_client = svc.pve()
-        .map_err(P5xError::ServiceError)?;
+
+    let (hostname, vm_id) = svc.op_wrap_f(
+        svc.op_start(Op::ProvisionCarrier, &format!("Host {}", &from_node.pve_host)).await?,
+        "Provisioned carrier",
+        async || {
+            provision_carrier_unmanaged(svc, &from_node.pve_host).await
+        },
+    ).await?;
+
+    // Create a new node instance
+    let node = nodes::ActiveModel {
+        hostname: Set(hostname),
+        pve_id: Set(i32::try_from(vm_id.get()).unwrap()),
+        pve_host: Set(from_node.pve_host.to_string()),
+        assigned_ip: Set("0.0.0.0".to_string()),
+        assigned_subnet: Set(0),
+        is_permanent: Set(false),
+        ..Default::default()
+    }
+    .save(svc.db)
+    .await
+    .and_then(|v| v.try_into_model())
+    .map_err(P5xError::DbErr)?;
+
+    // Create a lock instance for the new node
+    locks::ActiveModel {
+        lock_type: Set("nodes".to_string()),
+        lock_resource: Set(node.id.to_string()),
+        ..Default::default()
+    }
+    .save(svc.db)
+    .await
+    .map_err(P5xError::DbErr)?;
+
+    Ok(node)
+}
+
+
+/** Like provision_carrier, but without any of the SQL bookkeeping, locking, or safety checks. */
+pub async fn provision_carrier_unmanaged(
+    svc: &Services<'_>,
+    pve_host: &str,
+) -> Result<(String, VmId), P5xError> {
+    let pve_client = svc.pve().map_err(P5xError::ServiceError)?;
     let pve = proxmox_api::cluster::ClusterClient::new(pve_client);
 
     let vm_id = pve.nextid()
@@ -46,51 +89,24 @@ pub async fn provision_carrier(
     params.tags = Some("p5x".to_string());
 
     // Ask the PVE API to start creating the carrier node based on our empty template
-    let upid = svc.pve_node(&from_node.pve_host)
+    let upid = svc.pve_node(pve_host)
         .map_err(P5xError::ServiceError)?
         .lxc()
         .post(params)
         .map_err(P5xError::PveError)?;
 
     // Wait for the container creation task to finish
-    wait_upid(svc, &from_node.pve_host, &upid).await?;
+    wait_upid(svc, pve_host, &upid).await?;
 
-    // Create a new node instance
-    let node = nodes::ActiveModel {
-        hostname: Set(hostname),
-        pve_id: Set(i32::try_from(vm_id.get()).unwrap()),
-        pve_host: Set(from_node.pve_host.to_string()),
-        assigned_ip: Set("0.0.0.0".to_string()),
-        assigned_subnet: Set(0),
-        is_permanent: Set(false),
-        ..Default::default()
-    }
-    .save(svc.db)
-    .await
-    .map_err(P5xError::DbErr)?;
-
-    let node = node.try_into_model().map_err(P5xError::DbErr)?;
-
-    // Create a lock instance for the new node
-    locks::ActiveModel {
-        lock_type: Set("nodes".to_string()),
-        lock_resource: Set(node.id.to_string()),
-        ..Default::default()
-    }
-    .save(svc.db)
-    .await
-    .map_err(P5xError::DbErr)?;
-
-    Ok(node)
+    Ok((hostname, vm_id))
 }
 
-
 /** Make sure the tarball LXC template for the carrier container exists on the given node.
  */
-pub async fn ensure_carrier_template(
+async fn ensure_carrier_template(
     svc: &Services<'_>,
-    node: &nodes::Model,
+    pve_host: &str,
 ) -> Result<(), P5xError> {
-    let pve_ssh = svc.pve_ssh(&node.pve_host)
+    let pve_ssh = svc.pve_ssh(pve_host)
         .map_err(P5xError::ServiceError)?;
 
     // Use SFTP to check whether the file exists
@@ -120,31 +136,58 @@ pub async fn ensure_carrier_template(
 pub async fn terminate_carrier(
     svc: &Services<'_>,
     carrier: nodes::Model,
+) -> Result<(), P5xError> {
+    // pre-condition: the carrier is offline
+    if !carrier.is_offline(svc)? {
+        return Err(P5xError::BadPrecondition("Cannot terminate carrier instance: carrier is still running"));
+    }
+
+    // pre-condition: the carrier has a clean disk config
+    if carrier.config(svc)?.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot terminate carrier instance: carrier has unclean PVE config"))
+    }
+
+    svc.op_wrap_f(
+        svc.op_start(Op::TerminateCarrier, &format!("Carrier {} ({})", carrier.vm_id(), &carrier.pve_host)).await?,
+        "Terminated carrier",
+        async || {
+            terminate_carrier_unmanaged(svc, &carrier.pve_host, carrier.vm_id()).await?;
+
+            nodes::Entity::delete_by_id(carrier.id)
+                .exec(svc.db)
+                .await
+                .map_err(P5xError::DbErr)?;
+
+            locks::Entity::delete_many()
+                .filter(locks::Column::LockType.eq("nodes"))
+                .filter(locks::Column::LockResource.eq(carrier.id.to_string()))
+                .exec(svc.db)
+                .await
+                .map_err(P5xError::DbErr)?;
+
+            Ok(())
+        },
+    ).await
+}
+
+
+/** Like terminate_carrier, but without any of the SQL bookkeeping, locking, or safety checks. */
+pub async fn terminate_carrier_unmanaged(
+    svc: &Services<'_>,
+    pve_host: &str,
+    vm_id: VmId,
+) -> Result<(), P5xError> {
     let mut params = vmid::DeleteParams::default();
     params.purge = Some(true);
     params.destroy_unreferenced_disks = Some(true);
 
-    let upid = svc.pve_node(&carrier.pve_host)
+    let upid = svc.pve_node(pve_host)
         .map_err(P5xError::ServiceError)?
         .lxc()
-        .vmid(carrier.vm_id())
+        .vmid(vm_id)
         .delete(params)
         .map_err(P5xError::PveError)?;
 
-    wait_upid(svc, &carrier.pve_host, &upid).await?;
-
-    nodes::Entity::delete_by_id(carrier.id)
-        .exec(svc.db)
-        .await
-        .map_err(P5xError::DbErr)?;
-
-    locks::Entity::delete_many()
-        .filter(locks::Column::LockType.eq("nodes"))
-        .filter(locks::Column::LockResource.eq(carrier.id.to_string()))
-        .exec(svc.db)
-        .await
-        .map_err(P5xError::DbErr)?;
-
-    Ok(())
+    // post-condition: the carrier was successfully deleted
+    wait_upid(svc, pve_host, &upid).await
 }
diff --git a/src/api/cluster/node.rs b/src/api/cluster/node.rs
index 793497d..6c4c3b4 100644
--- a/src/api/cluster/node.rs
+++ b/src/api/cluster/node.rs
@@ -4,8 +4,10 @@ use proxmox_api::nodes::node::tasks::upid;
 use sea_orm::*;
 use tokio::time::sleep;
 use log::{info};
+use proxmox_api::types::VmId;
 use crate::api::entity::{locks, nodes};
 use crate::api::entity::nodes::{NodeParams, P5xError};
+use crate::api::entity::oplogs::Op;
 use crate::api::services::Services;
 
 
@@ -100,18 +102,23 @@ pub async fn migrate_node(
     node: nodes::Model,
     to_host: &str,
 ) -> Result<nodes::Model, P5xError> {
-    // Ask the PVE API to start migrating the node
-    let params = migrate::PostParams::new(to_host.to_string());
-    let upid = svc.pve_node(&node.pve_host)
-        .map_err(P5xError::ServiceError)?
-        .lxc()
-        .vmid(node.vm_id())
-        .migrate()
-        .post(params)
-        .map_err(P5xError::PveError)?;
+    // pre-condition: the node is offline
+    if !node.is_offline(svc)? {
+        return Err(P5xError::BadPrecondition("Cannot migrate node: node is not offline"))
+    }
 
-    // Wait for the UPID to finish
-    wait_upid(svc, &node.pve_host, &upid).await?;
+    // pre-condition: the LXC container has a clean disk config
+    if node.config(svc)?.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot migrate node: node has unclean PVE config"))
+    }
+
+    svc.op_wrap_f(
+        svc.op_start(Op::MigrateNode, &format!("Migrate node {} ({} to {})", node.vm_id(), &node.pve_host, to_host)).await?,
+        "Migrated node",
+        async || {
+            migrate_node_unmanaged(svc, &node.pve_host, node.pve_id, to_host).await
+        }
+    ).await?;
 
     // Persist the node
     let mut node = node.into_active_model();
@@ -126,6 +133,28 @@ pub async fn migrate_node(
 }
 
 
+/** Like migrate_node, but without any of the SQL bookkeeping, locking, or safety checks. */
+pub async fn migrate_node_unmanaged(
+    svc: &Services<'_>,
+    from_pve_host: &str,
+    from_pve_id: i32,
+    to_host: &str,
+) -> Result<(), P5xError> {
+    // Ask the PVE API to start migrating the node
+    let params = migrate::PostParams::new(to_host.to_string());
+    let upid = svc.pve_node(from_pve_host)
+        .map_err(P5xError::ServiceError)?
+        .lxc()
+        .vmid(VmId::new(i64::from(from_pve_id)).unwrap())
+        .migrate()
+        .post(params)
+        .map_err(P5xError::PveError)?;
+
+    // post-condition: the LXC container migrated successfully
+    wait_upid(svc, from_pve_host, &upid).await
+}
+
+
 /** Wait for a PVE task to complete using its UPID */
 pub async fn wait_upid(svc: &Services<'_>, node: &str, upid: &str) -> Result<(), P5xError> {
     info!("Waiting for UPID {upid} on node {node}");
diff --git a/src/api/cluster/system.rs b/src/api/cluster/system.rs
index f1c3b88..c84911b 100644
--- a/src/api/cluster/system.rs
+++ b/src/api/cluster/system.rs
@@ -3,15 +3,22 @@ use std::fs::{File, Permissions};
 use std::io::Write;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
+use std::time::Duration;
 use kube::{Api, Client};
 use log::{info, debug};
 use rand::rngs::OsRng;
 use ssh_key::{PrivateKey, Algorithm, LineEnding};
 use k8s_openapi::api::core::v1::{ConfigMap, Node, Pod};
 use kube::api::{Patch, PatchParams};
+use proxmox_api::nodes::node::lxc::vmid::move_volume;
+use proxmox_api::types::VmId;
+use proxmox_api::UreqError;
 use serde_json::json;
+use tokio::time::sleep;
+use crate::api::cluster::carrier::{provision_carrier_unmanaged, terminate_carrier_unmanaged};
+use crate::api::cluster::node::migrate_node_unmanaged;
 use crate::api::cluster::volume::create_volume_unmanaged;
-use crate::api::entity::nodes::P5xError;
+use crate::api::entity::nodes::{P5xError, PveConfig};
 use crate::api::services::Services;
 
 pub async fn ensure_system_disk(svc: &Services<'_>) -> Result<(), P5xError> {
@@ -69,6 +76,7 @@ pub async fn ensure_system_disk(svc: &Services<'_>) -> Result<(), P5xError> {
 
     // Add it to the dynamic-kv config and save
     data.insert("api-pve-host".to_string(), pve_host.to_string());
+    data.insert("api-pve-id".to_string(), pve_id.to_string());
     data.insert("api-pve-disk".to_string(), disk_name);
     let patch = json!({
         "data": data
@@ -83,6 +91,53 @@ pub async fn ensure_system_disk(svc: &Services<'_>) -> Result<(), P5xError> {
 }
 
 
+pub async fn migrate_system_disk_if_necessary(svc: &Services<'_>) -> Result<(), P5xError> {
+    // Load the dynamic-kv and get the current host/mount
+    let client = Client::try_default().await.map_err(P5xError::KubeError)?;
+    let namespace = fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
+        .unwrap_or_else(|_| "p5x-system".to_string());
+
+    let maps: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
+    let map = maps.get("dynamic-kv").await.map_err(P5xError::KubeError)?;
+
+    let data = map.data.unwrap_or_default();
+    let current_host = data.get("api-pve-host").expect("Could not find api-pve-host in dynamic-kv config");
+    let current_mount = data.get("api-pve-disk").expect("Could not find api-pve-disk in dynamic-kv config");
+    let current_pve_id: i32 = data.get("api-pve-id").expect("Could not find api-pve-id in dynamic-kv config").parse().unwrap();
+
+    // Load the labels for this pod's node
+    let pod_name = env::var("POD_NAME").expect("Could not determine POD_NAME from environment!");
+    let pods: Api<Pod> = Api::namespaced(client.clone(), &namespace);
+    let pod = pods.get(&pod_name).await.map_err(P5xError::KubeError)?;
+
+    let node_name = pod.spec.and_then(|spec| spec.node_name).expect("Could not determine the Node name for pod!");
+
+    let nodes: Api<Node> = Api::all(client);
+    let node = nodes.get(&node_name).await.map_err(P5xError::KubeError)?;
+    let labels = node.metadata.labels.expect("Could not load labels for node");
+
+    let pve_host = labels.get("p5x.garrettmills.dev/pve-host")
+        .expect("Node is missing required label: p5x.garrettmills.dev/pve-host");
+
+    let pve_id: i32 = labels.get("p5x.garrettmills.dev/pve-id")
+        .expect("Node is missing required label: p5x.garrettmills.dev/pve-id")
+        .parse()
+        .unwrap();
+
+    // If the disk is already on our LXC container (K8s node), we're done
+    if pve_id == current_pve_id {
+        return Ok(())
+    }
+
+    // Otherwise, we need to migrate the disk to this node
+    // FIXME: Consideration: What if the disk is still mounted somewhere?
+    transfer_unmanaged(
+        svc, current_mount, current_host, current_pve_id, pve_host, pve_id).await?;
+
+    Ok(())
+}
+
+
 /** Check if the SSH pubkey/privkey exist at the configured paths. If not, generate them. */
 pub fn ensure_ssh_keypair() -> Result<(), ssh_key::Error> {
     let pubkey_path = env::var("P5X_SSH_PUBKEY_PATH").expect("Missing env: P5X_SSH_PUBKEY_PATH");
@@ -115,3 +170,111 @@ pub fn ensure_ssh_keypair() -> Result<(), ssh_key::Error> {
 
     Ok(())
 }
+
+
+// todo: these are very similar to transfer() and transfer_directly(), but there was too much
+// sql bookkeeping in those to generalize effectively.
+
+/** Migrate a volume from its current LXC container to the specified LXC container. */
+async fn transfer_unmanaged(
+    svc: &Services<'_>,
+    old_mountpoint_identifier: &str,
+    from_pve_host: &str,
+    from_pve_id: i32,
+    to_pve_host: &str,
+    to_pve_id: i32,
+) -> Result<String, P5xError> {
+    // If from_node and to_node are on the same physical host, transfer directly
+    if from_pve_host == to_pve_host {
+        return transfer_directly_unmanaged(
+            svc, old_mountpoint_identifier, from_pve_host, from_pve_id, to_pve_host, to_pve_id).await;
+    }
+
+    // If the nodes are on different physical hosts, we need to create a temporary
+    // container on shared storage to attach the volume to. We'll then migrate that
+    // container to the target physical host.
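+    // The hops below: attach the volume to the carrier, migrate the carrier,
+    // then re-attach the volume to the target container and destroy the carrier.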
+    let (hostname, vm_id) = provision_carrier_unmanaged(svc, from_pve_host).await?;
+    let carrier_mountpoint = transfer_directly_unmanaged(
+        svc, old_mountpoint_identifier, from_pve_host, from_pve_id, &hostname, vm_id.get() as i32).await?;
+
+    // Migrate the carrier to the target host
+    migrate_node_unmanaged(svc, from_pve_host, vm_id.get() as i32, to_pve_host).await?;
+
+    // Transfer the volume from the carrier to the target LXC container
+    let new_mountpoint = transfer_directly_unmanaged(
+        svc, &carrier_mountpoint, &hostname, vm_id.get() as i32, to_pve_host, to_pve_id).await?;
+
+    // Delete the carrier container
+    terminate_carrier_unmanaged(svc, to_pve_host, vm_id).await?;
+
+    Ok(new_mountpoint)
+}
+
+
+async fn transfer_directly_unmanaged(
+    svc: &Services<'_>,
+    old_mountpoint_identifier: &str,
+    from_pve_host: &str,
+    from_pve_id: i32,
+    to_pve_host: &str,
+    to_pve_id: i32,
+) -> Result<String, P5xError> {
+    // pre-condition: to_node's config is clean
+    let old_to_config = PveConfig::load(svc, to_pve_host, to_pve_id)?;
+    if old_to_config.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot transfer volume: to-node has unclean LXC config"))
+    }
+
+    // pre-condition: from_node's config is clean
+    let old_from_config = PveConfig::load(svc, from_pve_host, from_pve_id)?;
+    if old_from_config.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot transfer volume: from-node has unclean LXC config"))
+    }
+
+    // Figure out the mountpoint identifier on the new node
+    let new_mountpoint_identifier = old_to_config.next_nth("unused");
+    let new_mountpoint_identifier = format!("unused{new_mountpoint_identifier}");
+
+    // Ask the PVE API to move the volume to the new node
+    let pve_from_vol = move_volume::Volume::try_from(old_mountpoint_identifier).unwrap();
+    let pve_to_vol = move_volume::TargetVolume::try_from(new_mountpoint_identifier.as_str()).unwrap();
+    let mut post_params = move_volume::PostParams::new(pve_from_vol);
+    post_params.target_volume = Some(pve_to_vol);
+    post_params.target_vmid = Some(VmId::new(i64::from(to_pve_id)).unwrap());
+
+    let res = svc.pve_node(from_pve_host)
+        .map_err(P5xError::ServiceError)?
+        .lxc()
+        .vmid(VmId::new(i64::from(from_pve_id)).unwrap())
+        .move_volume()
+        .post(post_params);
+
+    // This is necessary because POST returns {data: null} on success,
+    // which the UreqClient flags as an unknown error.
+    if let Err(UreqError::EncounteredErrors(e)) = res {
+        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
+    }
+
+    // Wait for the volume to transfer, since we don't get back a UPID
+    sleep(Duration::from_secs(10)).await;
+
+    // post-condition: from_node's config is clean
+    let new_from_config = PveConfig::load(svc, from_pve_host, from_pve_id)?;
+    if new_from_config.has_pending() {
+        // todo: don't have a great way to revert this currently w/o a ton of work
+        return Err(P5xError::BadPostcondition("Could not transfer volume: from-node had unclean LXC config after transfer"))
+    }
+
+    // post-condition: to_node's config is clean
+    let new_to_config = PveConfig::load(svc, to_pve_host, to_pve_id)?;
+    if new_to_config.has_pending() {
+        // todo: don't have a great way to revert this currently w/o a ton of work
+        return Err(P5xError::BadPostcondition("Could not transfer volume: to-node had unclean LXC config after transfer"))
+    }
+
+    // Verify that the volume appears in the new node's config
+    new_to_config.get(&new_mountpoint_identifier)
+        .ok_or(P5xError::BadPostcondition("Could not find mountpoint config after transferring volume"))?;
+
+    Ok(new_mountpoint_identifier)
+}
diff --git a/src/api/cluster/volume.rs b/src/api/cluster/volume.rs
index e283b9e..b3ca453 100644
--- a/src/api/cluster/volume.rs
+++ b/src/api/cluster/volume.rs
@@ -15,6 +15,7 @@ use crate::api::cluster::carrier::{provision_carrier, terminate_carrier};
 use crate::api::cluster::node::migrate_node;
 use crate::api::entity::nodes::{lock_first_available, P5xError, PveConfig};
 use crate::api::entity::{nodes, volumes};
+use crate::api::entity::oplogs::Op;
 use crate::api::services::{ssh_run_trimmed, Services};
 
 
@@ -54,9 +55,14 @@ pub async fn create_volume_unmanaged(
     pve_id: i32,
     name: &str,
 ) -> Result<(String, u32), P5xError> {
+    // pre-condition: the PVE host has a clean config state
+    let old_conf = PveConfig::load(svc, pve_host, pve_id)?;
+    if old_conf.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot create volume: LXC container has unclean PVE config"))
+    }
+
     // Get the next available mountpoint ID
-    let conf = PveConfig::load(svc, pve_host, pve_id)?;
-    let mp_id = conf.next_nth("mp");
+    let mp_id = old_conf.next_nth("mp");
     info!(target: "p5x", "Volume {name} will become mp{mp_id} on PVE {}", pve_id);
 
     // Generate a new mountpoint entry for the node's config
@@ -66,33 +72,57 @@ pub async fn create_volume_unmanaged(
     debug!(target: "p5x", "Volume {name}: {line}");
 
     let mut params = PutParams::default();
-    params.mps.insert(mp_id, line);
+    params.mps.insert(mp_id, line.clone());
 
     // Update the node config to create the volume
     debug!(target: "p5x", "Patching PVE config for volume {name}");
     let vm_id = VmId::new(i64::from(pve_id)).unwrap();
-    let res = svc.pve_node(pve_host)
-        .map_err(P5xError::ServiceError)?
-        .lxc()
-        .vmid(vm_id)
-        .config()
-        .put(params);
 
-    // This is necessary because PUT returns {data: null} on success,
-    // which the UreqClient flags as an unknown error.
-    if let Err(UreqError::EncounteredErrors(e)) = res {
-        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
-    }
+    let mount = svc.op_wrap_f(
+        svc.op_start(Op::CreateVolumeUnmanaged, &format!("Vol {name} on pve {pve_id} ({pve_host}) as {mp_id} -- {line}")).await?,
+        "Created volume",
+        async || {
+            let res = svc.pve_node(pve_host)
+                .map_err(P5xError::ServiceError)?
+                .lxc()
+                .vmid(vm_id)
+                .config()
+                .put(params);
 
-    // Stupid hack is stupid, but we don't get back a UPID to wait on the vol to be created
-    info!(target: "p5x", "Successfully patched PVE config. Waiting for volume {name} to appear");
-    sleep(Duration::from_secs(5)).await;
+            // post-condition: the config was applied successfully
+            // This is necessary because PUT returns {data: null} on success,
+            // which the UreqClient flags as an unknown error.
+            if let Err(UreqError::EncounteredErrors(e)) = res {
+                // Revert the config on error
+                old_conf.write(svc, pve_host, pve_id)?;
+                return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
+            }
 
-    // Load the updated config
-    debug!(target: "p5x", "Loading updated node config for volume {name}");
-    let conf = PveConfig::load(svc, pve_host, pve_id)?;
-    let mount = conf.get(&format!("mp{mp_id}"))
-        .ok_or(P5xError::BadPrecondition("Could not find mountpoint in config after creating volume"))?;
+            // Stupid hack is stupid, but we don't get back a UPID to wait on the vol to be created
+            info!(target: "p5x", "Successfully patched PVE config. Waiting for volume {name} to appear");
+            sleep(Duration::from_secs(5)).await;
+
+            // post-condition: The new volume appears in the PVE config cleanly
+            debug!(target: "p5x", "Loading updated node config for volume {name}");
+            let new_conf = PveConfig::load(svc, pve_host, pve_id)?;
+            if new_conf.has_pending() {
+                // Revert the config on error
+                old_conf.write(svc, pve_host, pve_id)?;
+                return Err(P5xError::BadPostcondition("Could not create volume: applying config change caused an unclean config state"))
+            }
+
+            let mount = new_conf.get(&format!("mp{mp_id}"));
+            if mount.is_none() {
+                // Revert the config on error
+                old_conf.write(svc, pve_host, pve_id)?;
+                return Err(P5xError::BadPostcondition("Could not find mountpoint in config after creating volume"))
+            }
+
+            Ok(mount)
+        }
+    ).await?;
+
+    let mount = mount.unwrap();
     debug!(target: "p5x", "Found mountpoint details for volume {name}: {mount}");
 
     // Parse the disk name from the config
@@ -185,33 +215,54 @@ pub async fn mount(
     let reason = format!("Mounting volume {} at {}", params.name, mountpoint);
     let _lock = node.lock(svc, Some(&reason));
 
-    // Find the next available mountpoint identifier
-    let qualified_name = vol.qualified_name(svc).await?;
-    let mountpoint_identifier = node.config(svc)?.next_nth("mp");
-    let mount_line = format!("{qualified_name},mp={mountpoint},backup=1");
+    let mountpoint_identifier = svc.op_wrap_f(
+        svc.op_start(Op::MountVolume, &format!("Node {}, vol {}, mp {mountpoint}", node.hostname, params.name)).await?,
+        "Mounted volume",
+        async || {
+            // pre-condition: the node's config is clean
+            let old_conf = node.config(svc)?;
+            if old_conf.has_pending() {
+                return Err(P5xError::BadPrecondition("Cannot mount volume: LXC container has unclean PVE config"))
+            }
 
-    // Patch the node's config to mount the volume
-    let pve_node = svc.pve_node(&node.pve_host)
-        .map_err(P5xError::ServiceError)?;
+            // Find the next available mountpoint identifier
+            let qualified_name = vol.qualified_name(svc).await?;
+            let mountpoint_identifier = old_conf.next_nth("mp");
+            let mount_line = format!("{qualified_name},mp={mountpoint},backup=1");
 
-    let mut put_params = PutParams::default();
-    put_params.mps.insert(mountpoint_identifier, mount_line);
+            // Patch the node's config to mount the volume
+            let pve_node = svc.pve_node(&node.pve_host)
+                .map_err(P5xError::ServiceError)?;
 
-    debug!("Patching node config to mount volume {} ({put_params:?})", params.name);
-    let res = pve_node.lxc()
-        .vmid(node.vm_id())
-        .config()
-        .put(put_params);
+            let mut put_params = PutParams::default();
+            put_params.mps.insert(mountpoint_identifier, mount_line);
 
-    if let Err(Ureq(Status(_, ires))) = res {
-        debug!("PVE response: {}", ires.into_string().unwrap());
-    }
+            debug!("Patching node config to mount volume {} ({put_params:?})", params.name);
+            let res = pve_node.lxc()
+                .vmid(node.vm_id())
+                .config()
+                .put(put_params);
 
-    // This is necessary because PUT returns {data: null} on success,
-    // which the UreqClient flags as an unknown error.
-    /*if let Err(UreqError::EncounteredErrors(e)) = res {
-        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
-    }*/
+            // This is necessary because PUT returns {data: null} on success,
+            // which the UreqClient flags as an unknown error.
+            match res {
+                Err(UreqError::EncounteredErrors(e)) => {
+                    return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
+                },
+                Err(Ureq(Status(_, ires))) => {
+                    // FIXME: WTF here
+                    debug!("PVE response: {}", ires.into_string().unwrap());
+                },
+                _ => {},
+            }
+
+            // post-condition: the config was applied cleanly
+            let new_conf = node.config(svc)?;
+            if new_conf.has_pending() {
+                // Revert the config on error
+                node.write_config(svc, &old_conf)?;
+                return Err(P5xError::BadPostcondition("Cannot mount volume: LXC container config was unclean after mount"))
+            }
+
+            Ok(mountpoint_identifier)
+        }
+    ).await?;
 
     // Persist the volume
     debug!("Persisting mount changes to volume {} in database", params.name);
@@ -248,64 +299,101 @@ pub async fn unmount(
         return Err(P5xError::BadPrecondition("Tried to unmount volume without a disk_name set"));
     }
 
-    // Unmount the disk's filesystem from within the K8s node
-    debug!("Unmounting volume {} ({}) from K8s host", vol.name, vol.mountpoint.as_ref().unwrap());
-    let cmd = format!("umount '{}'", vol.mountpoint.as_ref().unwrap());
+    // Lock the node's config
     let node = vol.node(svc).await?;
-    node.ssh_run_trimmed(svc, &cmd)?;
+    let _lock = node.lock(svc, Some(&format!("Unmounting volume {}", vol.volume_id)))
+        .await
+        .map_err(P5xError::LockErr)?;
 
-    // Unmount the disk's filesystem from the PVE node's shadow tree (see UNMOUNT_NOTES below)
-    // -- Find the PID of the LXC container itself
-    debug!("Attempting to identify host PID for node {}", &node.pve_host);
-    let pve_ssh = svc.pve_ssh(&node.pve_host)
-        .map_err(P5xError::ServiceError)?;
-    let cmd = format!("lxc-info -n {} -p", node.pve_id);
-    let ct_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
-    let ct_pid = ct_pid
-        .split("PID:")
-        .nth(1)
-        .ok_or(P5xError::BadPrecondition("Failed to parse PID of node's LXC container"))?
-        .trim();
+    // pre-condition: the node's disk config is clean
+    let old_conf = node.config(svc)?;
+    if old_conf.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot unmount volume: LXC container has unclean config"))
+    }
 
-    // -- Find the parent PID (where the shadow tree exists)
-    debug!("Attempting to identify parent PID for node {} (pid: {})", &node.pve_host, ct_pid);
-    let cmd = format!("ps -o ppid= -p {ct_pid}");
-    let parent_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
-
-    // -- Unmount the disk's filesystem from the shadow tree's mount namespace:
     let mp_id = vol.mountpoint_identifier.clone().unwrap();
-    debug!("Unmounting volume {} from shadow namespace (pid: {}) {}", &vol.name, parent_pid, &mp_id);
-    let cmd = format!("nsenter --target {parent_pid} --mount /bin/bash -c 'umount /var/lib/lxc/.pve-staged-mounts/{}'", mp_id);
-    ssh_run_trimmed(&pve_ssh, &cmd)?;
 
-    // For LVM-type storage pools, we also need to deactivate the logical volume
-    let pool_name = &svc.config.pve_storage_pool;
-    let pool_driver = &svc.config.pve_storage_driver;
-    if pool_driver == "lvm" {
-        let cmd = format!("lvchange -aln '/dev/{pool_name}/{}'", vol.disk_name.as_ref().unwrap());
-        ssh_run_trimmed(&pve_ssh, &cmd)?;
+    // Hot-unmounting logic is only necessary if the LXC container is running
+    if !node.is_offline(svc)? {
+        let mpath = vol.mountpoint.as_ref().unwrap();
+
+        svc.op_wrap_f(
+            svc.op_start(Op::UnmountVolume, &format!("Vol {} ({})", vol.name, mpath)).await?,
+            "Unmounted volume",
+            async || {
+                // Unmount the disk's filesystem from within the K8s node
+                // (1) The path must be a directory
+                // (2) The path must be a mountpoint (the `stat` command)
+                debug!("Unmounting volume {} ({}) from K8s host", vol.name, mpath);
+                let cmd = format!("([ -d '{}' ] && [ \"$(stat --printf '%m' '{}')\" != '{}' ]) || umount '{}'", mpath, mpath, mpath, mpath);
+                debug!("Unmount cmd: {cmd}"); // fixme remove this
+                node.ssh_run_trimmed(svc, &cmd)?;
+
+                // Unmount the disk's filesystem from the PVE node's shadow tree (see UNMOUNT_NOTES below)
+                // -- Find the PID of the LXC container itself
+                debug!("Attempting to identify host PID for node {}", &node.pve_host);
+                let pve_ssh = svc.pve_ssh(&node.pve_host)
+                    .map_err(P5xError::ServiceError)?;
+                let cmd = format!("lxc-info -n {} -p", node.pve_id);
+                let ct_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
+                let ct_pid = ct_pid
+                    .split("PID:")
+                    .nth(1)
+                    .ok_or(P5xError::BadPrecondition("Failed to parse PID of node's LXC container"))?
+                    .trim();
+
+                // -- Find the parent PID (where the shadow tree exists)
+                debug!("Attempting to identify parent PID for node {} (pid: {})", &node.pve_host, ct_pid);
+                let cmd = format!("ps -o ppid= -p {ct_pid}");
+                let parent_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
+
+                // -- Unmount the disk's filesystem from the shadow tree's mount namespace:
+                debug!("Unmounting volume {} from shadow namespace (pid: {}) {}", &vol.name, parent_pid, &mp_id);
+                let cmd = format!("nsenter --target {parent_pid} --mount /bin/bash -c 'umount /var/lib/lxc/.pve-staged-mounts/{}'", mp_id);
+                ssh_run_trimmed(&pve_ssh, &cmd)?;
+
+                // For LVM-type storage pools, we also need to deactivate the logical volume
+                let pool_name = &svc.config.pve_storage_pool;
+                let pool_driver = &svc.config.pve_storage_driver;
+                if pool_driver == "lvm" {
+                    let cmd = format!("lvchange -aln '/dev/{pool_name}/{}'", vol.disk_name.as_ref().unwrap());
+                    ssh_run_trimmed(&pve_ssh, &cmd)?;
+                }
+
+                Ok(())
+            },
+        ).await?;
     }
 
     // Patch the PVE node's config to mark the volume as an unused disk
     let qualified_name = vol.qualified_name(svc).await?;
-    let lock_reason = format!("Unmounting volume {}", vol.volume_id);
     let mut vol = vol.into_active_model();
-    node.mutate_config(
-        svc,
-        Some(&lock_reason),
-        |mut conf| {
-            let next_unused = conf.next_nth("unused");
-            conf.replace(
-                |s| s.starts_with(&mp_id),
-                |_| Some(format!("unused{next_unused}: {qualified_name}")),
-            );
+    let mut conf = node.config(svc)?;
+    let next_unused = conf.next_nth("unused");
+    conf.replace(
+        |s| s.starts_with(&mp_id),
+        |_| Some(format!("unused{next_unused}: {qualified_name}")),
+    );
+
+    svc.op_wrap_f(
+        svc.op_start(Op::RewriteConfigUnmount, &format!("Vol {} Node {}", qualified_name, node.hostname)).await?,
+        "Updated config",
+        async || {
             vol.mountpoint_identifier = Set(Some(format!("unused{next_unused}")));
+            node.write_config(svc, &conf)?;
 
-            conf
-        }
-    )
-    .await?;
+            // post-condition: the node's config was applied cleanly
+            let new_conf = node.config(svc)?;
+            if new_conf.has_pending() {
+                // Revert the config change (since we're direct-patching it, this shouldn't really happen)
+                node.write_config(svc, &old_conf)?;
+                return Err(P5xError::BadPostcondition("Could not unmount volume: patching resulted in unclean LXC config"))
+            }
+
+            Ok(())
+        },
+    ).await?;
 
     // Persist the volume changes
     vol.mountpoint = Set(None);
@@ -350,27 +438,56 @@ pub async fn delete(
         return Err(P5xError::BadPrecondition("Could not delete volume: volume has no mountpoint identifier"));
     }
 
-    // Patch the PVE config to delete the volume from the config
+    // Lock the node config
     let node = vol.node(svc).await?;
+    let _lock = node.lock(svc, Some(&format!("Deleting volume {}", params.name)))
+        .await
+        .map_err(P5xError::LockErr)?;
+
+    // pre-condition: the node's config is clean
+    let old_conf = node.config(svc)?;
+    if old_conf.has_pending() {
+        return Err(P5xError::BadPrecondition("Could not delete volume: LXC container has unclean config"))
+    }
+
+    // Patch the PVE config to delete the volume from the config
     debug!("Patching node {} config to delete volume {} ({})", node.hostname, params.name, vol.mountpoint_identifier.as_ref().unwrap());
     let pve_node = svc.pve_node(&node.pve_host)
         .map_err(P5xError::ServiceError)?;
 
-    let mut pve_params = PutParams::default();
-    pve_params.delete = vol.mountpoint_identifier;
-    let vm_id = VmId::new(i64::from(node.pve_id)).unwrap();
-    let r = pve_node
-        .lxc()
-        .vmid(vm_id)
-        .config()
-        .put(pve_params)
-        .map_err(P5xError::PveError);
+    let mut pve_params = PutParams::default();
+    pve_params.delete = vol.mountpoint_identifier.clone();
+    let vm_id = VmId::new(i64::from(node.pve_id)).unwrap();
+
+    svc.op_wrap_f(
+        svc.op_start(Op::DeleteVolume, &format!("Vol {} ({}) on {}", vol.name, vol.mountpoint_identifier.as_ref().unwrap(), vm_id)).await?,
+        "Deleted volume",
+        async || {
+            let r = pve_node
+                .lxc()
+                .vmid(vm_id)
+                .config()
+                .put(pve_params)
+                .map_err(P5xError::PveError);
+
+            // post-condition: the API request applied successfully
+            if let Err(P5xError::PveError(proxmox_api::UreqError::Ureq(ureq::Error::Status(status, response)))) = r {
+                let json_str = response.into_string().unwrap();
+                error!("Error response from PVE API (status: {status}): {json_str}");
+            }
+
+            // post-condition: the node's config was patched cleanly
+            let new_conf = node.config(svc)?;
+            if new_conf.has_pending() {
+                // revert the config change so we don't lose track of the volume
+                node.write_config(svc, &old_conf)?;
+                return Err(P5xError::BadPostcondition("Could not delete volume: applying patch resulted in an unclean LXC config"))
+            }
+
+            Ok(())
+        },
+    ).await?;
 
-    if let Err(P5xError::PveError(proxmox_api::UreqError::Ureq(ureq::Error::Status(status, response)))) = r {
-        let json_str = response.into_string().unwrap();
-        error!("Error response from PVE API (status: {status}): {json_str}");
-    }
     debug!("Successfully patched node {} config to delete volume {}", node.hostname, params.name);
 
     // Persist the volume
@@ -439,12 +556,27 @@ async fn transfer_directly(
         return Err(P5xError::BadPrecondition("Cannot transfer volume: nodes reside on different physical hosts"));
     }
 
-    // Figure out the mountpoint identifier on the new node
-    let _lock = to_node.lock(svc, Some("Receiving volume transfer"))
+    // Lock both nodes' configs
+    let _from_lock = from_node.lock(svc, Some("Sending volume transfer"))
         .await.map_err(P5xError::LockErr)?;
-    let config = to_node.config(svc)?;
-    let mountpoint_identifier = config.next_nth("unused");
+    let _to_lock = to_node.lock(svc, Some("Receiving volume transfer"))
+        .await.map_err(P5xError::LockErr)?;
+
+    // pre-condition: to_node's config is clean
+    let old_to_config = to_node.config(svc)?;
+    if old_to_config.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot transfer volume: to-node has unclean LXC config"))
+    }
+
+    // pre-condition: from_node's config is clean
+    let old_from_config = from_node.config(svc)?;
+    if old_from_config.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot transfer volume: from-node has unclean LXC config"))
+    }
+
+    // Figure out the mountpoint identifier on the new node
+    let mountpoint_identifier = old_to_config.next_nth("unused");
     let mountpoint_identifier = format!("unused{mountpoint_identifier}");
 
     // Ask the PVE API to move the volume to the new node
@@ -455,26 +587,45 @@ async fn transfer_directly(
     post_params.target_volume = Some(pve_to_vol);
     post_params.target_vmid = Some(to_node.vm_id());
 
-    let res = svc.pve_node(&from_node.pve_host)
-        .map_err(P5xError::ServiceError)?
-        .lxc()
-        .vmid(from_node.vm_id())
-        .move_volume()
-        .post(post_params);
+    let mount = svc.op_wrap_f(
+        svc.op_start(Op::TransferVolume, &format!("Vol {} from {} to {}", vol.name, from_node.pve_id, to_node.pve_id)).await?,
+        "Transferred volume",
+        async || {
+            let res = svc.pve_node(&from_node.pve_host)
+                .map_err(P5xError::ServiceError)?
+                .lxc()
+                .vmid(from_node.vm_id())
+                .move_volume()
+                .post(post_params);
 
-    // This is necessary because POST returns {data: null} on success,
-    // which the UreqClient flags as an unknown error.
-    if let Err(UreqError::EncounteredErrors(e)) = res {
-        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
-    }
+            // This is necessary because POST returns {data: null} on success,
+            // which the UreqClient flags as an unknown error.
+            if let Err(UreqError::EncounteredErrors(e)) = res {
+                return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
+            }
 
-    // Wait for the volume to transfer, since we don't get back a UPID
-    sleep(Duration::from_secs(5)).await;
+            // Wait for the volume to transfer, since we don't get back a UPID
+            sleep(Duration::from_secs(10)).await;
 
-    // Verify that the volume appears in the new node's config
-    let config = to_node.config(svc)?;
-    let mount = config.get(&mountpoint_identifier)
-        .ok_or(P5xError::BadPostcondition("Could not find mountpoint config after transferring volume"))?;
+            // post-condition: from_node's config is clean
+            let new_from_config = from_node.config(svc)?;
+            if new_from_config.has_pending() {
+                // todo: don't have a great way to revert this currently w/o a ton of work
+                return Err(P5xError::BadPostcondition("Could not transfer volume: from-node had unclean LXC config after transfer"))
+            }
+
+            // post-condition: to_node's config is clean
+            let new_to_config = to_node.config(svc)?;
+            if new_to_config.has_pending() {
+                // todo: don't have a great way to revert this currently w/o a ton of work
+                return Err(P5xError::BadPostcondition("Could not transfer volume: to-node had unclean LXC config after transfer"))
+            }
+
+            // Verify that the volume appears in the new node's config
+            new_to_config.get(&mountpoint_identifier)
+                .ok_or(P5xError::BadPostcondition("Could not find mountpoint config after transferring volume"))
+        },
+    ).await?;
 
     // Parse the disk name from the config
     // synology-scsi-lun:vm-103-disk-1,mp=/mnt/p5x-test0,backup=1,size=1G
diff --git a/src/api/db/migrations/m20250415_000001_create_oplog_table.rs b/src/api/db/migrations/m20250415_000001_create_oplog_table.rs
new file mode 100644
index 0000000..1b4f25c
--- /dev/null
+++ b/src/api/db/migrations/m20250415_000001_create_oplog_table.rs
@@ -0,0 +1,46 @@
+use async_trait::async_trait;
+use sea_orm_migration::{prelude::*, schema::*};
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .create_table(
+                Table::create()
+                    .table(Oplogs::Table)
+                    .if_not_exists()
+                    .col(pk_auto(Oplogs::Id))
+                    .col(string(Oplogs::Op))
+                    .col(text(Oplogs::OpDesc))
+                    .col(timestamp(Oplogs::OpAt))
+                    .col(boolean_null(Oplogs::Result))
+                    .col(text_null(Oplogs::ResultDesc))
+                    .col(timestamp_null(Oplogs::ResultAt))
+                    .to_owned())
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .drop_table(
+                Table::drop()
+                    .table(Oplogs::Table)
+                    .to_owned())
+            .await
+    }
+}
+
+#[derive(DeriveIden)]
+enum Oplogs {
+    Table,
+    Id,
+    Op,
+    OpDesc,
+    OpAt,
+    Result,
+    ResultDesc,
+    ResultAt,
+}
diff --git a/src/api/db/migrations/mod.rs b/src/api/db/migrations/mod.rs
index 4866926..34bbc2a 100644
--- a/src/api/db/migrations/mod.rs
+++ b/src/api/db/migrations/mod.rs
@@ -8,6 +8,7 @@ use crate::api::Db;
 mod m20241102_000001_create_nodes_table;
 mod m20241102_000002_create_locks_table;
 mod m20241103_000001_create_volumes_table;
+mod m20250415_000001_create_oplog_table;
 
 pub struct Migrator;
 
@@ -18,6 +19,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20241102_000001_create_nodes_table::Migration),
             Box::new(m20241102_000002_create_locks_table::Migration),
             Box::new(m20241103_000001_create_volumes_table::Migration),
+            Box::new(m20250415_000001_create_oplog_table::Migration),
         ]
     }
 }
diff --git a/src/api/entity/mod.rs b/src/api/entity/mod.rs
index 0c2ce0d..2143140 100644
--- a/src/api/entity/mod.rs
+++ b/src/api/entity/mod.rs
@@ -1,3 +1,4 @@
 pub mod locks;
 pub mod nodes;
 pub mod volumes;
+pub mod oplogs;
diff --git a/src/api/entity/nodes.rs b/src/api/entity/nodes.rs
index 66632f8..f0b82e5 100644
--- a/src/api/entity/nodes.rs
+++ b/src/api/entity/nodes.rs
@@ -9,6 +9,7 @@ use sea_orm::QueryOrder;
 use serde::{Deserialize, Serialize};
 use tokio::time::sleep;
 use proxmox_api;
+use proxmox_api::nodes::node::lxc::vmid::status::current::Status;
 use log::{warn, debug};
 use proxmox_api::types::VmId;
 use crate::api::entity::locks;
@@ -136,6 +137,33 @@ impl PveConfig {
         Ok(PveConfig::new(&contents))
     }
 
+    pub fn write(&self, svc: &Services<'_>, pve_host: &str, pve_id: i32) -> Result<(), P5xError> {
+        let conf = self.to_string();
+        let conf = conf.as_bytes();
+        let pve_ssh = svc.pve_ssh(pve_host)
+            .map_err(P5xError::ServiceError)?;
+
+        // Write the file
+        let sftp = pve_ssh.sftp()
+            .map_err(SshError::ClientError)
+            .map_err(P5xError::SshError)?;
+
+        let path = format!("/etc/pve/lxc/{}.conf", pve_id);
+        let path = Path::new(&path);
+
+        debug!("Attempting to open file to mutate config {} on {}", path.display(), pve_host);
+        let mut f = sftp.open_mode(path, OpenFlags::WRITE, 0o640, OpenType::File)
+            .map_err(SshError::ClientError)
+            .map_err(P5xError::SshError)?;
+
+        debug!("Attempting to write config to {} on {}", path.display(), pve_host);
+        f.write_all(conf)
+            .map_err(SshError::IoError)
+            .map_err(P5xError::SshError)?;
+
+        Ok(())
+    }
+
     /** Replace a line in the config file. */
     pub fn replace(
         &mut self,
@@ -214,6 +242,18 @@ impl PveConfig {
 
         0
     }
+
+    /**
+     * Returns true if the loaded config file has any pending operations.
+     * This is generally regarded as "unclean" in P5x parlance, and is considered
+     * a failed pre-condition for applying any operations and a failed post-condition
+     * _after_ applying an operation.
+     */
+    pub fn has_pending(&self) -> bool {
+        self.lines
+            .iter()
+            .any(|line| line.starts_with("[pve:pending]"))
+    }
 }
 
 impl Display for PveConfig {
@@ -303,30 +343,7 @@ impl Model {
 
     /** Replace the LXC container config for this node. */
     pub fn write_config(&self, svc: &Services, conf: &PveConfig) -> Result<(), P5xError> {
-        let conf = conf.to_string();
-        let conf = conf.as_bytes();
-        let pve_ssh = svc.pve_ssh(&self.pve_host)
-            .map_err(P5xError::ServiceError)?;
-
-        // Write the file
-        let sftp = pve_ssh.sftp()
-            .map_err(SshError::ClientError)
-            .map_err(P5xError::SshError)?;
-
-        let path = format!("/etc/pve/lxc/{}.conf", self.pve_id);
-        let path = Path::new(&path);
-
-        debug!("Attempting to open file to mutate config {} on {}", path.display(), self.pve_host);
-        let mut f = sftp.open_mode(path, OpenFlags::WRITE, 0o640, OpenType::File)
-            .map_err(SshError::ClientError)
-            .map_err(P5xError::SshError)?;
-
-        debug!("Attempting to write config to {} on {}", path.display(), self.pve_host);
-        f.write_all(conf)
-            .map_err(SshError::IoError)
-            .map_err(P5xError::SshError)?;
-
-        Ok(())
+        conf.write(svc, &self.pve_host, self.pve_id)
     }
 
     /** Try to acquire the node lock for this LXC container. Used for any operations that impact the config. */
@@ -364,6 +381,21 @@ impl Model {
         self.write_config(svc, &config)
     }
 
+    /** Returns true if the LXC container is STOPPED in PVE.
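+     * Used as a pre-condition check before migrating nodes, terminating carriers,
+     * and hot-unmounting volumes.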
+     */
+    pub fn is_offline(&self, svc: &Services<'_>) -> Result<bool, P5xError> {
+        let stat = svc.pve_node(&self.pve_host)
+            .map_err(P5xError::ServiceError)?
+            .lxc()
+            .vmid(self.vm_id())
+            .status()
+            .current()
+            .get()
+            .map_err(P5xError::PveError)?
+            .status;
+
+        Ok(stat == Status::Stopped)
+    }
+
     /** Run an SSH command on the node and return the output of the command, with whitespace trimmed. */
     pub fn ssh_run_trimmed(&self, svc: &Services, cmd: &str) -> Result<String, P5xError> {
         let node_ssh = self.ssh(svc)?;
diff --git a/src/api/entity/oplogs.rs b/src/api/entity/oplogs.rs
new file mode 100644
index 0000000..5d1c967
--- /dev/null
+++ b/src/api/entity/oplogs.rs
@@ -0,0 +1,55 @@
+use sea_orm::entity::prelude::*;
+use sea_orm::sqlx::types::time::OffsetDateTime;
+use serde::{Deserialize, Serialize};
+
+pub enum Op {
+    CreateLXC,
+    ProvisionCarrier,
+    TerminateCarrier,
+    MigrateNode,
+    CreateVolumeUnmanaged,
+    MountVolume,
+    UnmountVolume,
+    RewriteConfigUnmount,
+    DeleteVolume,
+    TransferVolume,
+}
+
+impl Op {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Op::CreateLXC => "create-lxc",
+            Op::ProvisionCarrier => "provision-carrier",
+            Op::TerminateCarrier => "terminate-carrier",
+            Op::MigrateNode => "migrate-node",
+            Op::CreateVolumeUnmanaged => "create-volume-unmanaged",
+            Op::MountVolume => "mount-volume",
+            Op::UnmountVolume => "unmount-volume",
+            Op::RewriteConfigUnmount => "rewrite-config-unmount",
+            Op::DeleteVolume => "delete-volume",
+            Op::TransferVolume => "transfer-volume",
+        }
+    }
+
+    pub fn to_string(&self) -> String {
+        self.as_str().to_string()
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)]
+#[sea_orm(table_name = "oplogs")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: i64,
+    pub op: String,
+    pub op_desc: String,
+    pub op_at: OffsetDateTime,
+    pub result: Option<bool>,
+    pub result_desc: Option<String>,
+    pub result_at: Option<OffsetDateTime>,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/src/api/services.rs b/src/api/services.rs
index 8a33bd1..39963c0 100644
--- a/src/api/services.rs
+++ b/src/api/services.rs
@@ -1,11 +1,16 @@
+use std::future::Future;
 use std::io::Read;
 use std::net::TcpStream;
 use proxmox_api::nodes::node::NodeClient;
 use proxmox_api::UreqClient;
-use sea_orm::{DatabaseConnection, DbErr};
+use sea_orm::*;
+use sea_orm::ActiveValue::Set;
+use sea_orm::sqlx::types::time::OffsetDateTime;
 use ssh2::Session;
 use crate::api::entity::nodes::P5xError;
 use crate::api::util::{read_p5x_config, P5xConfig};
+use crate::api::entity::oplogs;
+use crate::api::entity::oplogs::Op;
 
 #[derive(Debug)]
 pub enum SshError {
@@ -108,6 +113,90 @@ impl<'a> Services<'a> {
 
         Ok(sess)
     }
+
+    pub async fn op_unwrap<Val, Err>(&self, op: i64, val: Result<Val, Err>) -> Result<Val, P5xError>
+    where
+        Err: std::fmt::Debug,
+        P5xError: From<Err>,
+    {
+        match val {
+            Ok(v) => Ok(v),
+            Err(e) => {
+                self.op_fail(op, &format!("{:?}", e)).await?;
+                Err(P5xError::from(e))
+            }
+        }
+    }
+
+    pub async fn op_unwrap_f<Val, Err, FutureVal>(&self, op: i64, val: FutureVal) -> Result<Val, P5xError>
+    where
+        Err: std::fmt::Debug,
+        P5xError: From<Err>,
+        FutureVal: Future<Output = Result<Val, Err>>,
+    {
+        self.op_unwrap(op, val.await).await
+    }
+
+    pub async fn op_wrap_f<Val, FutureClosure, Closure>(&self, op: i64, success: &str, cb: Closure) -> Result<Val, P5xError>
+    where
+        FutureClosure: Future<Output = Result<Val, P5xError>>,
+        Closure: FnOnce() -> FutureClosure,
+    {
+        match cb().await {
+            Ok(v) => {
+                self.op_finish(op, success).await?;
+                Ok(v)
+            },
+            Err(e) => {
+                self.op_fail(op, &format!("{:?}", e)).await?;
+                Err(e)
+            }
+        }
+    }
+
+    pub async fn op_start(&self, op: Op, desc: &str) -> Result<i64, P5xError> {
+        let res = oplogs::ActiveModel {
+            op: Set(op.to_string()),
+            op_desc: Set(desc.to_string()),
+            op_at: Set(OffsetDateTime::now_utc()),
+            ..Default::default()
+        }
+        .save(self.db)
+        .await
+        .map_err(P5xError::DbErr)?
+        .try_into_model()
+        .map_err(P5xError::DbErr)?;
+
+        Ok(res.id)
+    }
+
+    pub async fn op_finish(&self, id: i64, desc: &str) -> Result<(), P5xError> {
+        self.op_result(true, id, desc).await
+    }
+
+    pub async fn op_fail(&self, id: i64, desc: &str) -> Result<(), P5xError> {
+        self.op_result(false, id, desc).await
+    }
+
+    async fn op_result(&self, result: bool, id: i64, desc: &str) -> Result<(), P5xError> {
+        let mut op = self.resolve_op(id).await?.into_active_model();
+        op.result = Set(Some(result));
+        op.result_desc = Set(Some(desc.to_string()));
+        op.result_at = Set(Some(OffsetDateTime::now_utc()));
+        op.save(self.db)
+            .await
+            .map_err(P5xError::DbErr)?;
+
+        Ok(())
+    }
+
+    async fn resolve_op(&self, id: i64) -> Result<oplogs::Model, P5xError> {
+        oplogs::Entity::find_by_id(id)
+            .one(self.db)
+            .await
+            .map_err(P5xError::DbErr)?
+            .ok_or(P5xError::BadPrecondition("Could not resolve Oplog model instance"))
+    }
 }
diff --git a/src/main.rs b/src/main.rs
index 930fdd2..0e45c77 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,7 +5,7 @@ use rocket::{Build, Rocket};
 use log::{error, info};
 use std::{env, process};
 use sea_orm::Database;
-use crate::api::cluster::system::{ensure_ssh_keypair, ensure_system_disk};
+use crate::api::cluster::system::{ensure_ssh_keypair, ensure_system_disk, migrate_system_disk_if_necessary};
 use crate::api::services::Services;
 use crate::api::util::read_p5x_config;
 
@@ -22,7 +22,7 @@ async fn main() {
     let args: Vec<String> = env::args().collect();
     if args.len() < 2 {
-        error!(target: "p5x", "Missing required argument. Valid modes: api-server,ensure-system-disk");
+        error!(target: "p5x", "Missing required argument. Valid modes: api-server,ensure-system-disk,migrate-system-disk");
         process::exit(1);
     }
 
@@ -34,6 +34,13 @@ async fn main() {
         return;
     }
 
+    if mode == "migrate-system-disk" {
+        let anon_db = Database::connect("sqlite::memory:").await.unwrap();
+        let svc = Services::build(&anon_db).await.unwrap(); // fixme: this is going to fail because of the SSH keys
+        migrate_system_disk_if_necessary(&svc).await.unwrap();
+        return;
+    }
+
     ensure_ssh_keypair().expect("Could not ensure SSH keypair exists.");
     let config = read_p5x_config(); // Do this so we early-fail if there are missing env vars
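
Usage note: the oplog helpers added in services.rs are meant to bracket any fallible
PVE operation with start/success/failure rows in the new `oplogs` table. A minimal
sketch of the intended call shape, assuming a built `Services` reference `svc` and
reusing `Op::MigrateNode` and `migrate_node_unmanaged` from this diff (the host names
and VMID below are placeholders, not values from the patch):

    // Start an oplog row, run the closure, then record "Migrated node" on
    // success or the Debug form of the error on failure.
    svc.op_wrap_f(
        svc.op_start(Op::MigrateNode, "Migrate node 101 (pve1 to pve2)").await?,
        "Migrated node",
        async || {
            migrate_node_unmanaged(svc, "pve1", 101, "pve2").await
        },
    ).await?;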