[WIP] (1) Improve pre- and post-condition checks for PVE operations; (2) implement operation logging to SQL along with result; (3) implement "migrate-system-disk" early mode

This commit is contained in:
parent cc7700346e
commit 8a526de004
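The heart of change (2) is a small wrap pattern: `op_start` opens a row in the new `oplogs` table before a PVE operation runs, and `op_wrap_f` closes that row with the outcome. A minimal sketch of how the helpers compose, using the `Services`, `Op`, and `P5xError` types introduced in this diff (the closure body here is a stand-in, not code from the commit):

    // Sketch: the oplog wrap pattern used throughout this commit.
    // `op_start` inserts an oplogs row; `op_wrap_f` records success via
    // op_finish, or the Debug-formatted error via op_fail, then returns it.
    async fn example(svc: &Services<'_>) -> Result<(), P5xError> {
        let op_id = svc.op_start(Op::MountVolume, "Node n1, vol v1").await?;
        svc.op_wrap_f(op_id, "Mounted volume", async || {
            // ... perform the guarded PVE call here ...
            Ok(())
        }).await
    }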
@@ -8,6 +8,7 @@ use crate::api::cluster::node::wait_upid;
 use crate::api::entity::locks::lock_vmid;
 use crate::api::entity::{locks, nodes};
 use crate::api::entity::nodes::P5xError;
+use crate::api::entity::oplogs::Op;
 use crate::api::services::{ssh_run_trimmed, Services};
 use crate::api::services::SshError;

@@ -18,13 +19,55 @@ pub async fn provision_carrier(
     from_node: &nodes::Model,
 ) -> Result<nodes::Model, P5xError> {
     // Make sure the empty filesystem template exists
-    ensure_carrier_template(svc, from_node).await?;
+    ensure_carrier_template(svc, &from_node.pve_host).await?;

     // Get the next VMID to be used by the carrier container
     let _lock = lock_vmid(svc.db, Some("Provisioning carrier container"))
         .await.map_err(P5xError::DbErr)?;
-    let pve_client = svc.pve()
-        .map_err(P5xError::ServiceError)?;

+    let (hostname, vm_id) = svc.op_wrap_f(
+        svc.op_start(Op::ProvisionCarrier, &format!("Host {}", &from_node.pve_host)).await?,
+        "Provisioned carrier",
+        async || {
+            provision_carrier_unmanaged(svc, &from_node.pve_host).await
+        },
+    ).await?;
+
+    // Create a new node instance
+    let node = nodes::ActiveModel {
+        hostname: Set(hostname),
+        pve_id: Set(i32::try_from(vm_id.get()).unwrap()),
+        pve_host: Set(from_node.pve_host.to_string()),
+        assigned_ip: Set("0.0.0.0".to_string()),
+        assigned_subnet: Set(0),
+        is_permanent: Set(false),
+        ..Default::default()
+    }
+    .save(svc.db)
+    .await
+    .and_then(|v| v.try_into_model())
+    .map_err(P5xError::DbErr)?;
+
+    // Create a lock instance for the new node
+    locks::ActiveModel {
+        lock_type: Set("nodes".to_string()),
+        lock_resource: Set(node.id.to_string()),
+        ..Default::default()
+    }
+    .save(svc.db)
+    .await
+    .map_err(P5xError::DbErr)?;
+
+    Ok(node)
+}
+
+
+/** Like provision_carrier, but without any of the SQL bookkeeping, locking, or safety checks. */
+pub async fn provision_carrier_unmanaged(
+    svc: &Services<'_>,
+    pve_host: &str,
+) -> Result<(String, VmId), P5xError> {
+    let pve_client = svc.pve().map_err(P5xError::ServiceError)?;
     let pve = proxmox_api::cluster::ClusterClient::new(pve_client);

     let vm_id = pve.nextid()
@@ -46,51 +89,24 @@ pub async fn provision_carrier(
     params.tags = Some("p5x".to_string());

     // Ask the PVE API to start creating the carrier node based on our empty template
-    let upid = svc.pve_node(&from_node.pve_host)
+    let upid = svc.pve_node(&pve_host)
         .map_err(P5xError::ServiceError)?
         .lxc()
         .post(params)
         .map_err(P5xError::PveError)?;

     // Wait for the container creation task to finish
-    wait_upid(svc, &from_node.pve_host, &upid).await?;
+    wait_upid(svc, &pve_host, &upid).await?;

-    // Create a new node instance
-    let node = nodes::ActiveModel {
-        hostname: Set(hostname),
-        pve_id: Set(i32::try_from(vm_id.get()).unwrap()),
-        pve_host: Set(from_node.pve_host.to_string()),
-        assigned_ip: Set("0.0.0.0".to_string()),
-        assigned_subnet: Set(0),
-        is_permanent: Set(false),
-        ..Default::default()
-    }
-    .save(svc.db)
-    .await
-    .map_err(P5xError::DbErr)?;
-
-    let node = node.try_into_model().map_err(P5xError::DbErr)?;
-
-    // Create a lock instance for the new node
-    locks::ActiveModel {
-        lock_type: Set("nodes".to_string()),
-        lock_resource: Set(node.id.to_string()),
-        ..Default::default()
-    }
-    .save(svc.db)
-    .await
-    .map_err(P5xError::DbErr)?;
-
-    Ok(node)
+    Ok((hostname, vm_id))
 }


 /** Make sure the tarball LXC template for the carrier container exists on the given node. */
-pub async fn ensure_carrier_template(
+async fn ensure_carrier_template(
     svc: &Services<'_>,
-    node: &nodes::Model,
+    pve_host: &str,
 ) -> Result<(), P5xError> {
-    let pve_ssh = svc.pve_ssh(&node.pve_host)
+    let pve_ssh = svc.pve_ssh(pve_host)
         .map_err(P5xError::ServiceError)?;

     // Use SFTP to check whether the file exists
@@ -120,31 +136,58 @@ pub async fn ensure_carrier_template(
 pub async fn terminate_carrier(
     svc: &Services<'_>,
     carrier: nodes::Model,
 ) -> Result<(), P5xError> {
+    // pre-condition: the carrier is offline
+    if !carrier.is_offline(svc)? {
+        return Err(P5xError::BadPrecondition("Cannot terminate carrier instance: carrier is still running"));
+    }
+
+    // pre-condition: the carrier has a clean disk config
+    if carrier.config(svc)?.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot terminate carrier instance: carrier has unclean PVE config"))
+    }
+
+    svc.op_wrap_f(
+        svc.op_start(Op::TerminateCarrier, &format!("Carrier {} ({})", carrier.vm_id(), &carrier.pve_host)).await?,
+        "Terminated carrier",
+        async || {
+            terminate_carrier_unmanaged(svc, &carrier.pve_host, carrier.vm_id()).await?;
+
+            nodes::Entity::delete_by_id(carrier.id)
+                .exec(svc.db)
+                .await
+                .map_err(P5xError::DbErr)?;
+
+            locks::Entity::delete_many()
+                .filter(locks::Column::LockType.eq("nodes"))
+                .filter(locks::Column::LockResource.eq(carrier.id.to_string()))
+                .exec(svc.db)
+                .await
+                .map_err(P5xError::DbErr)?;
+
+            Ok(())
+        }
+    ).await
+}
+
+
+/** Like terminate_carrier, but without any of the SQL bookkeeping, locking, or safety checks. */
+pub async fn terminate_carrier_unmanaged(
+    svc: &Services<'_>,
+    pve_host: &str,
+    vm_id: VmId,
+) -> Result<(), P5xError> {
     let mut params = vmid::DeleteParams::default();
     params.purge = Some(true);
     params.destroy_unreferenced_disks = Some(true);

-    let upid = svc.pve_node(&carrier.pve_host)
+    let upid = svc.pve_node(pve_host)
         .map_err(P5xError::ServiceError)?
         .lxc()
-        .vmid(carrier.vm_id())
+        .vmid(vm_id)
         .delete(params)
         .map_err(P5xError::PveError)?;

-    wait_upid(svc, &carrier.pve_host, &upid).await?;
-
-    nodes::Entity::delete_by_id(carrier.id)
-        .exec(svc.db)
-        .await
-        .map_err(P5xError::DbErr)?;
-
-    locks::Entity::delete_many()
-        .filter(locks::Column::LockType.eq("nodes"))
-        .filter(locks::Column::LockResource.eq(carrier.id.to_string()))
-        .exec(svc.db)
-        .await
-        .map_err(P5xError::DbErr)?;
-
-    Ok(())
+    // post-condition: the carrier was successfully deleted
+    wait_upid(svc, pve_host, &upid).await
 }
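Change (1) gives each destructive operation the same guard shape seen in terminate_carrier above: refuse to start while the container is running or its config has staged changes, then re-check cleanliness after the PVE call. A condensed sketch of that shape (not code from the commit), assuming the `is_offline`, `config`, and `has_pending` helpers defined later in this diff:

    // Sketch: the pre-/post-condition discipline applied in this commit.
    async fn guarded_op(svc: &Services<'_>, node: &nodes::Model) -> Result<(), P5xError> {
        // pre-conditions: container stopped, config clean
        if !node.is_offline(svc)? {
            return Err(P5xError::BadPrecondition("node is still running"));
        }
        if node.config(svc)?.has_pending() {
            return Err(P5xError::BadPrecondition("node has unclean PVE config"));
        }
        // ... perform the PVE API call ...
        // post-condition: config still clean after the change
        if node.config(svc)?.has_pending() {
            return Err(P5xError::BadPostcondition("config unclean after operation"));
        }
        Ok(())
    }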
@@ -4,8 +4,10 @@ use proxmox_api::nodes::node::tasks::upid;
 use sea_orm::*;
 use tokio::time::sleep;
 use log::{info};
+use proxmox_api::types::VmId;
 use crate::api::entity::{locks, nodes};
 use crate::api::entity::nodes::{NodeParams, P5xError};
+use crate::api::entity::oplogs::Op;
 use crate::api::services::Services;

@@ -100,18 +102,23 @@ pub async fn migrate_node(
     node: nodes::Model,
     to_host: &str,
 ) -> Result<nodes::Model, P5xError> {
-    // Ask the PVE API to start migrating the node
-    let params = migrate::PostParams::new(to_host.to_string());
-    let upid = svc.pve_node(&node.pve_host)
-        .map_err(P5xError::ServiceError)?
-        .lxc()
-        .vmid(node.vm_id())
-        .migrate()
-        .post(params)
-        .map_err(P5xError::PveError)?;
+    // pre-condition: the node is offline
+    if !node.is_offline(svc)? {
+        return Err(P5xError::BadPrecondition("Cannot migrate node: node is not offline"))
+    }

-    // Wait for the UPID to finish
-    wait_upid(svc, &node.pve_host, &upid).await?;
+    // pre-condition: the LXC container has a clean disk config
+    if node.config(svc)?.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot migrate node: node has unclean PVE config"))
+    }
+
+    svc.op_wrap_f(
+        svc.op_start(Op::MigrateNode, &format!("Migrate node {} ({} to {})", node.vm_id(), &node.pve_host, to_host)).await?,
+        "Migrated node",
+        async || {
+            migrate_node_unmanaged(svc, &node.pve_host, node.pve_id, to_host).await
+        }
+    ).await?;

     // Persist the node
     let mut node = node.into_active_model();
@@ -126,6 +133,28 @@ pub async fn migrate_node(
 }


+/** Like migrate_node, but without any of the SQL bookkeeping, locking, or safety checks. */
+pub async fn migrate_node_unmanaged(
+    svc: &Services<'_>,
+    from_pve_host: &str,
+    from_pve_id: i32,
+    to_host: &str,
+) -> Result<(), P5xError> {
+    // Ask the PVE API to start migrating the node
+    let params = migrate::PostParams::new(to_host.to_string());
+    let upid = svc.pve_node(from_pve_host)
+        .map_err(P5xError::ServiceError)?
+        .lxc()
+        .vmid(VmId::new(i64::from(from_pve_id)).unwrap())
+        .migrate()
+        .post(params)
+        .map_err(P5xError::PveError)?;
+
+    // post-condition: the LXC container migrated successfully
+    wait_upid(svc, from_pve_host, &upid).await
+}
+
+
 /** Wait for a PVE task to complete using its UPID */
 pub async fn wait_upid(svc: &Services<'_>, node: &str, upid: &str) -> Result<(), P5xError> {
     info!("Waiting for UPID {upid} on node {node}");
@@ -3,15 +3,22 @@ use std::fs::{File, Permissions};
 use std::io::Write;
 use std::os::unix::fs::PermissionsExt;
 use std::path::Path;
+use std::time::Duration;
 use kube::{Api, Client};
 use log::{info, debug};
 use rand::rngs::OsRng;
 use ssh_key::{PrivateKey, Algorithm, LineEnding};
 use k8s_openapi::api::core::v1::{ConfigMap, Node, Pod};
 use kube::api::{Patch, PatchParams};
+use proxmox_api::nodes::node::lxc::vmid::move_volume;
 use proxmox_api::types::VmId;
+use proxmox_api::UreqError;
 use serde_json::json;
+use tokio::time::sleep;
+use crate::api::cluster::carrier::{provision_carrier_unmanaged, terminate_carrier_unmanaged};
+use crate::api::cluster::node::migrate_node_unmanaged;
+use crate::api::cluster::volume::create_volume_unmanaged;
-use crate::api::entity::nodes::P5xError;
+use crate::api::entity::nodes::{P5xError, PveConfig};
 use crate::api::services::Services;

@@ -69,6 +76,7 @@ pub async fn ensure_system_disk(svc: &Services<'_>) -> Result<(), P5xError> {

     // Add it to the dynamic-kv config and save
     data.insert("api-pve-host".to_string(), pve_host.to_string());
+    data.insert("api-pve-id".to_string(), pve_id.to_string());
     data.insert("api-pve-disk".to_string(), disk_name);
     let patch = json!({
         "data": data
@@ -83,6 +91,53 @@
 }


+pub async fn migrate_system_disk_if_necessary(svc: &Services<'_>) -> Result<(), P5xError> {
+    // Load the dynamic-kv and get the current host/mount
+    let client = Client::try_default().await.map_err(P5xError::KubeError)?;
+    let namespace = fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
+        .unwrap_or_else(|_| "p5x-system".to_string());
+
+    let maps: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
+    let map = maps.get("dynamic-kv").await.map_err(P5xError::KubeError)?;
+
+    let data = map.data.unwrap_or_default();
+    let current_host = data.get("api-pve-host").expect("Could not find api-pve-host in dynamic-kv config");
+    let current_mount = data.get("api-pve-disk").expect("Could not find api-pve-disk in dynamic-kv config");
+    let current_pve_id: i32 = data.get("api-pve-id").expect("Could not find api-pve-id in dynamic-kv config").parse().unwrap();
+
+    // Load the labels for this pod's node
+    let pod_name = env::var("POD_NAME").expect("Could not determine POD_NAME from environment!");
+    let pods: Api<Pod> = Api::namespaced(client.clone(), &namespace);
+    let pod = pods.get(&pod_name).await.map_err(P5xError::KubeError)?;
+
+    let node_name = pod.spec.and_then(|spec| spec.node_name).expect("Could not determine the Node name for pod!");
+
+    let nodes: Api<Node> = Api::all(client);
+    let node = nodes.get(&node_name).await.map_err(P5xError::KubeError)?;
+    let labels = node.metadata.labels.expect("Could not load labels for node");
+
+    let pve_host = labels.get("p5x.garrettmills.dev/pve-host")
+        .expect("Node is missing required label: p5x.garrettmills.dev/pve-host");
+
+    let pve_id: i32 = labels.get("p5x.garrettmills.dev/pve-id")
+        .expect("Node is missing required label: p5x.garrettmills.dev/pve-id")
+        .parse()
+        .unwrap();
+
+    // If the disk is already on our LXC container (K8s node), we're done
+    if pve_id == current_pve_id {
+        return Ok(())
+    }
+
+    // Otherwise, we need to migrate the disk to this node
+    // FIXME: Consideration: What if the disk is still mounted somewhere?
+    transfer_unmanaged(
+        svc, current_mount, current_host, current_pve_id, pve_host, pve_id).await?;
+
+    Ok(())
+}
+
+
 /** Check if the SSH pubkey/privkey exist at the configured paths. If not, generate them. */
 pub fn ensure_ssh_keypair() -> Result<(), ssh_key::Error> {
     let pubkey_path = env::var("P5X_SSH_PUBKEY_PATH").expect("Missing env: P5X_SSH_PUBKEY_PATH");
@@ -115,3 +170,111 @@ pub fn ensure_ssh_keypair() -> Result<(), ssh_key::Error> {

     Ok(())
 }
+
+
+// todo: these are very similar to transfer() and transfer_directly(), but there was too much
+// sql bookkeeping in those to generalize effectively.
+
+/** Migrate a volume from its current LXC container to the specified LXC container. */
+async fn transfer_unmanaged(
+    svc: &Services<'_>,
+    old_mountpoint_identifier: &str,
+    from_pve_host: &str,
+    from_pve_id: i32,
+    to_pve_host: &str,
+    to_pve_id: i32,
+) -> Result<String, P5xError> {
+    // If from_node and to_node are on the same physical host, transfer directly
+    if from_pve_host == to_pve_host {
+        return transfer_directly_unmanaged(
+            svc, old_mountpoint_identifier, from_pve_host, from_pve_id, to_pve_host, to_pve_id).await;
+    }
+
+    // If the nodes are on different physical hosts, we need to create a temporary
+    // container on shared storage to attach the volume to. We'll then migrate that
+    // container to the target physical host.
+    let (hostname, vm_id) = provision_carrier_unmanaged(svc, from_pve_host).await?;
+    let carrier_mountpoint = transfer_directly_unmanaged(
+        svc, old_mountpoint_identifier, from_pve_host, from_pve_id, &hostname, vm_id.get() as i32).await?;
+
+    // Migrate the carrier to the target host
+    migrate_node_unmanaged(svc, from_pve_host, vm_id.get() as i32, to_pve_host).await?;
+
+    // Transfer the volume from the carrier to the target LXC container
+    let new_mountpoint = transfer_directly_unmanaged(
+        svc, &carrier_mountpoint, &hostname, vm_id.get() as i32, to_pve_host, to_pve_id).await?;
+
+    // Delete the carrier container
+    terminate_carrier_unmanaged(svc, to_pve_host, vm_id).await?;
+
+    Ok(new_mountpoint)
+}
+
+
+async fn transfer_directly_unmanaged(
+    svc: &Services<'_>,
+    old_mountpoint_identifier: &str,
+    from_pve_host: &str,
+    from_pve_id: i32,
+    to_pve_host: &str,
+    to_pve_id: i32,
+) -> Result<String, P5xError> {
+    // pre-condition: to_node's config is clean
+    let old_to_config = PveConfig::load(svc, to_pve_host, to_pve_id)?;
+    if old_to_config.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot transfer volume: to-node has unclean LXC config"))
+    }
+
+    // pre-condition: from_node's config is clean
+    let old_from_config = PveConfig::load(svc, from_pve_host, from_pve_id)?;
+    if old_from_config.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot transfer volume: from-node has unclean LXC config"))
+    }
+
+    // Figure out the mountpoint identifier on the new node
+    let new_mountpoint_identifier = old_to_config.next_nth("unused");
+    let new_mountpoint_identifier = format!("unused{new_mountpoint_identifier}");
+
+    // Ask the PVE API to move the volume to the new node
+    let pve_from_vol = move_volume::Volume::try_from(old_mountpoint_identifier).unwrap();
+    let pve_to_vol = move_volume::TargetVolume::try_from(new_mountpoint_identifier.as_str()).unwrap();
+    let mut post_params = move_volume::PostParams::new(pve_from_vol);
+    post_params.target_volume = Some(pve_to_vol);
+    post_params.target_vmid = Some(VmId::new(i64::from(to_pve_id)).unwrap());
+
+    let res = svc.pve_node(from_pve_host)
+        .map_err(P5xError::ServiceError)?
+        .lxc()
+        .vmid(VmId::new(i64::from(from_pve_id)).unwrap())
+        .move_volume()
+        .post(post_params);
+
+    // This is necessary because POST returns {data: null} on success,
+    // which the UreqClient flags as an unknown error.
+    if let Err(UreqError::EncounteredErrors(e)) = res {
+        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
+    }
+
+    // Wait for the volume to transfer, since we don't get back a UPID
+    sleep(Duration::from_secs(10)).await;
+
+    // post-condition: from_node's config is clean
+    let new_from_config = PveConfig::load(svc, from_pve_host, from_pve_id)?;
+    if new_from_config.has_pending() {
+        // todo: don't have a great way to revert this currently w/o a ton of work
+        return Err(P5xError::BadPostcondition("Could not transfer volume: from-node had unclean LXC config after transfer"))
+    }
+
+    // post-condition: to_node's config is clean
+    let new_to_config = PveConfig::load(svc, to_pve_host, to_pve_id)?;
+    if new_to_config.has_pending() {
+        // todo: don't have a great way to revert this currently w/o a ton of work
+        return Err(P5xError::BadPostcondition("Could not transfer volume: to-node had unclean LXC config after transfer"))
+    }
+
+    // Verify that the volume appears in the new node's config
+    new_to_config.get(&new_mountpoint_identifier)
+        .ok_or(P5xError::BadPostcondition("Could not find mountpoint config after transferring volume"))?;
+
+    Ok(new_mountpoint_identifier)
+}
@@ -15,6 +15,7 @@ use crate::api::cluster::carrier::{provision_carrier, terminate_carrier};
 use crate::api::cluster::node::migrate_node;
 use crate::api::entity::nodes::{lock_first_available, P5xError, PveConfig};
 use crate::api::entity::{nodes, volumes};
+use crate::api::entity::oplogs::Op;
 use crate::api::services::{ssh_run_trimmed, Services};

@@ -54,9 +55,14 @@ pub async fn create_volume_unmanaged(
     pve_id: i32,
     name: &str,
 ) -> Result<(String, u32), P5xError> {
+    // pre-condition: the PVE host has a clean config state
+    let old_conf = PveConfig::load(svc, pve_host, pve_id)?;
+    if old_conf.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot create volume: LXC container has unclean PVE config"))
+    }
+
     // Get the next available mountpoint ID
-    let conf = PveConfig::load(svc, pve_host, pve_id)?;
-    let mp_id = conf.next_nth("mp");
+    let mp_id = old_conf.next_nth("mp");
     info!(target: "p5x", "Volume {name} will become mp{mp_id} on PVE {}", pve_id);

     // Generate a new mountpoint entry for the node's config
@@ -66,33 +72,57 @@ pub async fn create_volume_unmanaged(
     debug!(target: "p5x", "Volume {name}: {line}");

     let mut params = PutParams::default();
-    params.mps.insert(mp_id, line);
+    params.mps.insert(mp_id, line.clone());

     // Update the node config to create the volume
     debug!(target: "p5x", "Patching PVE config for volume {name}");
     let vm_id = VmId::new(i64::from(pve_id)).unwrap();
-    let res = svc.pve_node(pve_host)
-        .map_err(P5xError::ServiceError)?
-        .lxc()
-        .vmid(vm_id)
-        .config()
-        .put(params);
-
-    // This is necessary because PUT returns {data: null} on success,
-    // which the UreqClient flags as an unknown error.
-    if let Err(UreqError::EncounteredErrors(e)) = res {
-        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
-    }
-
-    // Stupid hack is stupid, but we don't get back a UPID to wait on the vol to be created
-    info!(target: "p5x", "Successfully patched PVE config. Waiting for volume {name} to appear");
-    sleep(Duration::from_secs(5)).await;
-
-    // Load the updated config
-    debug!(target: "p5x", "Loading updated node config for volume {name}");
-    let conf = PveConfig::load(svc, pve_host, pve_id)?;
-    let mount = conf.get(&format!("mp{mp_id}"))
-        .ok_or(P5xError::BadPrecondition("Could not find mountpoint in config after creating volume"))?;
+    let mount = svc.op_wrap_f(
+        svc.op_start(Op::CreateVolumeUnmanaged, &format!("Vol {name} on pve {pve_id} ({pve_host}) as {mp_id} -- {line}")).await?,
+        "Created volume",
+        async || {
+            let res = svc.pve_node(pve_host)
+                .map_err(P5xError::ServiceError)?
+                .lxc()
+                .vmid(vm_id)
+                .config()
+                .put(params);
+
+            // post-condition: the config was applied successfully
+            // This is necessary because PUT returns {data: null} on success,
+            // which the UreqClient flags as an unknown error.
+            if let Err(UreqError::EncounteredErrors(e)) = res {
+                // Revert the config on error
+                old_conf.write(svc, pve_host, pve_id)?;
+                return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
+            }
+
+            // Stupid hack is stupid, but we don't get back a UPID to wait on the vol to be created
+            info!(target: "p5x", "Successfully patched PVE config. Waiting for volume {name} to appear");
+            sleep(Duration::from_secs(5)).await;
+
+            // post-condition: The new volume appears in the PVE config cleanly
+            debug!(target: "p5x", "Loading updated node config for volume {name}");
+            let new_conf = PveConfig::load(svc, pve_host, pve_id)?;
+            if new_conf.has_pending() {
+                // Revert the config on error
+                old_conf.write(svc, pve_host, pve_id)?;
+                return Err(P5xError::BadPostcondition("Could not create volume: applying config change caused an unclean config state"))
+            }
+
+            let mount = new_conf.get(&format!("mp{mp_id}"));
+            if let None = mount {
+                // Revert the config on error
+                old_conf.write(svc, pve_host, pve_id)?;
+                return Err(P5xError::BadPrecondition("Could not find mountpoint in config after creating volume"))
+            }
+
+            Ok(mount)
+        }
+    ).await?;
+
+    let mount = mount.unwrap();
     debug!(target: "p5x", "Found mountpoint details for volume {name}: {mount}");

     // Parse the disk name from the config
@@ -185,33 +215,54 @@ pub async fn mount(
     let reason = format!("Mounting volume {} at {}", params.name, mountpoint);
     let _lock = node.lock(svc, Some(&reason));

-    // Find the next available mountpoint identifier
-    let qualified_name = vol.qualified_name(svc).await?;
-    let mountpoint_identifier = node.config(svc)?.next_nth("mp");
-    let mount_line = format!("{qualified_name},mp={mountpoint},backup=1");
+    let mountpoint_identifier = svc.op_wrap_f(
+        svc.op_start(Op::MountVolume, &format!("Node {}, vol {}, mp {mountpoint}", node.hostname, params.name)).await?,
+        "Mounted volume",
+        async || {
+            // pre-condition: the node's config is clean
+            let old_conf = node.config(svc)?;
+            if old_conf.has_pending() {
+                return Err(P5xError::BadPrecondition("Cannot mount volume: LXC container has unclean PVE config"))
+            }

-    // Patch the node's config to mount the volume
-    let pve_node = svc.pve_node(&node.pve_host)
-        .map_err(P5xError::ServiceError)?;
+            // Find the next available mountpoint identifier
+            let qualified_name = vol.qualified_name(svc).await?;
+            let mountpoint_identifier = old_conf.next_nth("mp");
+            let mount_line = format!("{qualified_name},mp={mountpoint},backup=1");

-    let mut put_params = PutParams::default();
-    put_params.mps.insert(mountpoint_identifier, mount_line);
+            // Patch the node's config to mount the volume
+            let pve_node = svc.pve_node(&node.pve_host)
+                .map_err(P5xError::ServiceError)?;

-    debug!("Patching node config to mount volume {} ({put_params:?})", params.name);
-    let res = pve_node.lxc()
-        .vmid(node.vm_id())
-        .config()
-        .put(put_params);
+            let mut put_params = PutParams::default();
+            put_params.mps.insert(mountpoint_identifier, mount_line);

-    if let Err(Ureq(Status(_, ires))) = res {
-        debug!("PVE response: {}", ires.into_string().unwrap());
-    }
+            debug!("Patching node config to mount volume {} ({put_params:?})", params.name);
+            let res = pve_node.lxc()
+                .vmid(node.vm_id())
+                .config()
+                .put(put_params);

-    // This is necessary because PUT returns {data: null} on success,
-    // which the UreqClient flags as an unknown error.
-    /*if let Err(UreqError::EncounteredErrors(e)) = res {
-        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
-    }*/
+            // This is necessary because PUT returns {data: null} on success,
+            // which the UreqClient flags as an unknown error.
+            if let Err(UreqError::EncounteredErrors(e)) = res {
+                return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
+            } else if let Err(Ureq(Status(_, ires))) = res {
+                // FIXME: WTF here
+                debug!("PVE response: {}", ires.into_string().unwrap());
+            }
+
+            // post-condition: the config was applied cleanly
+            let new_conf = node.config(svc)?;
+            if new_conf.has_pending() {
+                // Revert the config on error
+                node.write_config(svc, &old_conf)?;
+                return Err(P5xError::BadPostcondition("Cannot mount volume: LXC container config was unclean after mount"))
+            }
+
+            Ok(mountpoint_identifier)
+        }
+    ).await?;

     // Persist the volume
     debug!("Persisting mount changes to volume {} in database", params.name);
@@ -248,64 +299,101 @@ pub async fn unmount(
         return Err(P5xError::BadPrecondition("Tried to unmount volume without a disk_name set"));
     }

-    // Unmount the disk's filesystem from within the K8s node
-    debug!("Unmounting volume {} ({}) from K8s host", vol.name, vol.mountpoint.as_ref().unwrap());
-    let cmd = format!("umount '{}'", vol.mountpoint.as_ref().unwrap());
+    // Lock the node's config
     let node = vol.node(svc).await?;
-    node.ssh_run_trimmed(svc, &cmd)?;
+    let _lock = node.lock(svc, Some(&format!("Unmounting volume {}", vol.volume_id)))
+        .await
+        .map_err(P5xError::LockErr)?;

-    // Unmount the disk's filesystem from the PVE node's shadow tree (see UNMOUNT_NOTES below)
-    // -- Find the PID of the LXC container itself
-    debug!("Attempting to identify host PID for node {}", &node.pve_host);
-    let pve_ssh = svc.pve_ssh(&node.pve_host)
-        .map_err(P5xError::ServiceError)?;
-    let cmd = format!("lxc-info -n {} -p", node.pve_id);
-    let ct_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
-    let ct_pid = ct_pid
-        .split("PID:")
-        .nth(1)
-        .ok_or(P5xError::BadPrecondition("Failed to parse PID of node's LXC container"))?
-        .trim();
+    // pre-condition: the node's disk config is clean
+    let old_conf = node.config(svc)?;
+    if old_conf.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot unmount volume: LXC container has unclean config"))
+    }

-    // -- Find the parent PID (where the shadow tree exists)
-    debug!("Attempting to identify parent PID for node {} (pid: {})", &node.pve_host, ct_pid);
-    let cmd = format!("ps -o ppid= -p {ct_pid}");
-    let parent_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
-
-    // -- Unmount the disk's filesystem from the shadow tree's mount namespace:
     let mp_id = vol.mountpoint_identifier.clone().unwrap();
-    debug!("Unmounting volume {} from shadow namespace (pid: {}) {}", &vol.name, parent_pid, &mp_id);
-    let cmd = format!("nsenter --target {parent_pid} --mount /bin/bash -c 'umount /var/lib/lxc/.pve-staged-mounts/{}'", mp_id);
-    ssh_run_trimmed(&pve_ssh, &cmd)?;
-
-    // For LVM-type storage pools, we also need to deactivate the logical volume
-    let pool_name = &svc.config.pve_storage_pool;
-    let pool_driver = &svc.config.pve_storage_driver;
-    if pool_driver == "lvm" {
-        let cmd = format!("lvchange -aln '/dev/{pool_name}/{}'", vol.disk_name.as_ref().unwrap());
-        ssh_run_trimmed(&pve_ssh, &cmd)?;
+    // Hot-unmounting logic is only necessary if the LXC container is running
+    if !node.is_offline(svc)? {
+        let mpath = vol.mountpoint.as_ref().unwrap();
+
+        svc.op_wrap_f(
+            svc.op_start(Op::UnmountVolume, &format!("Vol {} ({})", vol.name, mpath)).await?,
+            "Unmounted volume",
+            async || {
+                // Unmount the disk's filesystem from within the K8s node
+                // (1) The path must be a directory
+                // (2) The path must be a mountpoint (the `stat` command)
+                debug!("Unmounting volume {} ({}) from K8s host", vol.name, mpath);
+                let cmd = format!("([ -d '{}' ] && [ \"$(stat --printf '%m' '{}')\" != '{}' ]) || umount '{}'", mpath, mpath, mpath, mpath);
+                debug!("Unmount cmd: {cmd}"); // fixme remove this
+                node.ssh_run_trimmed(svc, &cmd)?;
+
+                // Unmount the disk's filesystem from the PVE node's shadow tree (see UNMOUNT_NOTES below)
+                // -- Find the PID of the LXC container itself
+                debug!("Attempting to identify host PID for node {}", &node.pve_host);
+                let pve_ssh = svc.pve_ssh(&node.pve_host)
+                    .map_err(P5xError::ServiceError)?;
+                let cmd = format!("lxc-info -n {} -p", node.pve_id);
+                let ct_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
+                let ct_pid = ct_pid
+                    .split("PID:")
+                    .nth(1)
+                    .ok_or(P5xError::BadPrecondition("Failed to parse PID of node's LXC container"))?
+                    .trim();
+
+                // -- Find the parent PID (where the shadow tree exists)
+                debug!("Attempting to identify parent PID for node {} (pid: {})", &node.pve_host, ct_pid);
+                let cmd = format!("ps -o ppid= -p {ct_pid}");
+                let parent_pid = ssh_run_trimmed(&pve_ssh, &cmd)?;
+
+                // -- Unmount the disk's filesystem from the shadow tree's mount namespace:
+                debug!("Unmounting volume {} from shadow namespace (pid: {}) {}", &vol.name, parent_pid, &mp_id);
+                let cmd = format!("nsenter --target {parent_pid} --mount /bin/bash -c 'umount /var/lib/lxc/.pve-staged-mounts/{}'", mp_id);
+                ssh_run_trimmed(&pve_ssh, &cmd)?;
+
+                // For LVM-type storage pools, we also need to deactivate the logical volume
+                let pool_name = &svc.config.pve_storage_pool;
+                let pool_driver = &svc.config.pve_storage_driver;
+                if pool_driver == "lvm" {
+                    let cmd = format!("lvchange -aln '/dev/{pool_name}/{}'", vol.disk_name.as_ref().unwrap());
+                    ssh_run_trimmed(&pve_ssh, &cmd)?;
+                }
+
+                Ok(())
+            },
+        ).await?;
     }

     // Patch the PVE node's config to mark the volume as an unused disk
     let qualified_name = vol.qualified_name(svc).await?;
-    let lock_reason = format!("Unmounting volume {}", vol.volume_id);
     let mut vol = vol.into_active_model();
-    node.mutate_config(
-        svc,
-        Some(&lock_reason),
-        |mut conf| {
-            let next_unused = conf.next_nth("unused");
-            conf.replace(
-                |s| s.starts_with(&mp_id),
-                |_| Some(format!("unused{next_unused}: {qualified_name}")),
-            );
+    let mut conf = node.config(svc)?;
+    let next_unused = conf.next_nth("unused");
+
+    conf.replace(
+        |s| s.starts_with(&mp_id),
+        |_| Some(format!("unused{next_unused}: {qualified_name}")),
+    );
+
+    svc.op_wrap_f(
+        svc.op_start(Op::RewriteConfigUnmount, &format!("Vol {} Node {}", qualified_name, node.hostname)).await?,
+        "Updated config",
+        async || {
+            vol.mountpoint_identifier = Set(Some(format!("unused{next_unused}")));
+            node.write_config(svc, &conf)?;

-            conf
-        }
-    )
-    .await?;
+            // post-condition: the node's config was applied cleanly
+            let new_conf = node.config(svc)?;
+            if new_conf.has_pending() {
+                // Revert the config change (since we're direct-patching it, this shouldn't really happen)
+                node.write_config(svc, &old_conf)?;
+                return Err(P5xError::BadPostcondition("Could not unmount volume: patching resulted in unclean LXC config"))
+            }
+
+            Ok(())
+        },
+    ).await?;

     // Persist the volume changes
     vol.mountpoint = Set(None);
@@ -350,27 +438,56 @@ pub async fn delete(
         return Err(P5xError::BadPrecondition("Could not delete volume: volume has no mountpoint identifier"));
     }

-    // Patch the PVE config to delete the volume from the config
+    // Lock the node config
     let node = vol.node(svc).await?;
+    let _lock = node.lock(svc, Some(&format!("Deleting volume {}", params.name)))
+        .await
+        .map_err(P5xError::LockErr)?;
+
+    // pre-condition: the node's config is clean
+    let old_conf = node.config(svc)?;
+    if old_conf.has_pending() {
+        return Err(P5xError::BadPrecondition("Could not delete volume: LXC container has unclean config"))
+    }
+
+    // Patch the PVE config to delete the volume from the config
     debug!("Patching node {} config to delete volume {} ({})", node.hostname, params.name, vol.mountpoint_identifier.as_ref().unwrap());
     let pve_node = svc.pve_node(&node.pve_host)
        .map_err(P5xError::ServiceError)?;

-    let mut pve_params = PutParams::default();
-    pve_params.delete = vol.mountpoint_identifier;
-
     let vm_id = VmId::new(i64::from(node.pve_id)).unwrap();
-    let r = pve_node
-        .lxc()
-        .vmid(vm_id)
-        .config()
-        .put(pve_params)
-        .map_err(P5xError::PveError);
+    let mut pve_params = PutParams::default();
+    pve_params.delete = vol.mountpoint_identifier.clone();
+
+    svc.op_wrap_f(
+        svc.op_start(Op::DeleteVolume, &format!("Vol {} ({}) on {}", vol.name, vol.mountpoint_identifier.as_ref().unwrap(), vm_id)).await?,
+        "Deleted volume",
+        async || {
+            let r = pve_node
+                .lxc()
+                .vmid(vm_id)
+                .config()
+                .put(pve_params)
+                .map_err(P5xError::PveError);
+
+            // post-condition: the API request applied successfully
+            if let Err(P5xError::PveError(proxmox_api::UreqError::Ureq(ureq::Error::Status(status, response)))) = r {
+                let json_str = response.into_string().unwrap();
+                error!("Error response from PVE API (status: {status}): {json_str}");
+            }
+
+            // post-condition: the node's config was patched cleanly
+            let new_conf = node.config(svc)?;
+            if new_conf.has_pending() {
+                // revert the config change so we don't lose track of the volume
+                node.write_config(svc, &old_conf)?;
+                return Err(P5xError::BadPostcondition("Could not delete volume: applying patch resulted in an unclean LXC config"))
+            }
+
+            Ok(())
+        },
+    ).await?;

-    if let Err(P5xError::PveError(proxmox_api::UreqError::Ureq(ureq::Error::Status(status, response)))) = r {
-        let json_str = response.into_string().unwrap();
-        error!("Error response from PVE API (status: {status}): {json_str}");
-    }
     debug!("Successfully patched node {} config to delete volume {}", node.hostname, params.name);

     // Persist the volume
@@ -439,12 +556,27 @@ async fn transfer_directly(
         return Err(P5xError::BadPrecondition("Cannot transfer volume: nodes reside on different physical hosts"));
     }

-    // Figure out the mountpoint identifier on the new node
-    let _lock = to_node.lock(svc, Some("Receiving volume transfer"))
+    // Lock both nodes' configs
+    let _from_lock = from_node.lock(svc, Some("Sending volume transfer"))
         .await.map_err(P5xError::LockErr)?;
-
-    let config = to_node.config(svc)?;
-    let mountpoint_identifier = config.next_nth("unused");
+    let _to_lock = to_node.lock(svc, Some("Receiving volume transfer"))
+        .await.map_err(P5xError::LockErr)?;
+
+    // pre-condition: to_node's config is clean
+    let old_to_config = to_node.config(svc)?;
+    if old_to_config.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot transfer volume: to-node has unclean LXC config"))
+    }
+
+    // pre-condition: from_node's config is clean
+    let old_from_config = from_node.config(svc)?;
+    if old_from_config.has_pending() {
+        return Err(P5xError::BadPrecondition("Cannot transfer volume: from-node has unclean LXC config"))
+    }
+
+    // Figure out the mountpoint identifier on the new node
+    let mountpoint_identifier = old_to_config.next_nth("unused");
     let mountpoint_identifier = format!("unused{mountpoint_identifier}");

     // Ask the PVE API to move the volume to the new node
@@ -455,26 +587,45 @@ async fn transfer_directly(
     post_params.target_volume = Some(pve_to_vol);
     post_params.target_vmid = Some(to_node.vm_id());

-    let res = svc.pve_node(&from_node.pve_host)
-        .map_err(P5xError::ServiceError)?
-        .lxc()
-        .vmid(from_node.vm_id())
-        .move_volume()
-        .post(post_params);
+    let mount = svc.op_wrap_f(
+        svc.op_start(Op::TransferVolume, &format!("Vol {} from {} to {}", vol.name, from_node.pve_id, to_node.pve_id)).await?,
+        "Transferred volume",
+        async || {
+            let res = svc.pve_node(&from_node.pve_host)
+                .map_err(P5xError::ServiceError)?
+                .lxc()
+                .vmid(from_node.vm_id())
+                .move_volume()
+                .post(post_params);

-    // This is necessary because POST returns {data: null} on success,
-    // which the UreqClient flags as an unknown error.
-    if let Err(UreqError::EncounteredErrors(e)) = res {
-        return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
-    }
+            // This is necessary because POST returns {data: null} on success,
+            // which the UreqClient flags as an unknown error.
+            if let Err(UreqError::EncounteredErrors(e)) = res {
+                return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
+            }

-    // Wait for the volume to transfer, since we don't get back a UPID
-    sleep(Duration::from_secs(5)).await;
+            // Wait for the volume to transfer, since we don't get back a UPID
+            sleep(Duration::from_secs(10)).await;

-    // Verify that the volume appears in the new node's config
-    let config = to_node.config(svc)?;
-    let mount = config.get(&mountpoint_identifier)
-        .ok_or(P5xError::BadPostcondition("Could not find mountpoint config after transferring volume"))?;
+            // post-condition: from_node's config is clean
+            let new_from_config = from_node.config(svc)?;
+            if new_from_config.has_pending() {
+                // todo: don't have a great way to revert this currently w/o a ton of work
+                return Err(P5xError::BadPostcondition("Could not transfer volume: from-node had unclean LXC config after transfer"))
+            }
+
+            // post-condition: to_node's config is clean
+            let new_to_config = to_node.config(svc)?;
+            if new_to_config.has_pending() {
+                // todo: don't have a great way to revert this currently w/o a ton of work
+                return Err(P5xError::BadPostcondition("Could not transfer volume: to-node had unclean LXC config after transfer"))
+            }
+
+            // Verify that the volume appears in the new node's config
+            new_to_config.get(&mountpoint_identifier)
+                .ok_or(P5xError::BadPostcondition("Could not find mountpoint config after transferring volume"))
+        },
+    ).await?;

     // Parse the disk name from the config
     // synology-scsi-lun:vm-103-disk-1,mp=/mnt/p5x-test0,backup=1,size=1G
src/api/db/migrations/m20250415_000001_create_oplog_table.rs (new file, 46 lines)
@@ -0,0 +1,46 @@
+use async_trait::async_trait;
+use sea_orm_migration::{prelude::*, schema::*};
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .create_table(
+                Table::create()
+                    .table(Oplogs::Table)
+                    .if_not_exists()
+                    .col(pk_auto(Oplogs::Id))
+                    .col(string(Oplogs::Op))
+                    .col(text(Oplogs::OpDesc))
+                    .col(timestamp(Oplogs::OpAt))
+                    .col(boolean_null(Oplogs::Result))
+                    .col(text_null(Oplogs::ResultDesc))
+                    .col(timestamp_null(Oplogs::ResultAt))
+                    .to_owned())
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .drop_table(
+                Table::drop()
+                    .table(Oplogs::Table)
+                    .to_owned())
+            .await
+    }
+}
+
+#[derive(DeriveIden)]
+enum Oplogs {
+    Table,
+    Id,
+    Op,
+    OpDesc,
+    OpAt,
+    Result,
+    ResultDesc,
+    ResultAt,
+}
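For inspection, the table this migration creates can be read back through the generated SeaORM entity. A small sketch (not part of this commit) that lists operations newest-first, assuming the `oplogs` entity defined further down in this diff:

    // Sketch: reading recent oplog rows, newest first.
    use sea_orm::{DatabaseConnection, DbErr, EntityTrait, QueryOrder};
    use crate::api::entity::oplogs;

    async fn recent_ops(db: &DatabaseConnection) -> Result<Vec<oplogs::Model>, DbErr> {
        oplogs::Entity::find()
            .order_by_desc(oplogs::Column::OpAt)
            .all(db)
            .await
    }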
@@ -8,6 +8,7 @@ use crate::api::Db;
 mod m20241102_000001_create_nodes_table;
 mod m20241102_000002_create_locks_table;
 mod m20241103_000001_create_volumes_table;
+mod m20250415_000001_create_oplog_table;

 pub struct Migrator;

@@ -18,6 +19,7 @@ impl MigratorTrait for Migrator {
         Box::new(m20241102_000001_create_nodes_table::Migration),
         Box::new(m20241102_000002_create_locks_table::Migration),
         Box::new(m20241103_000001_create_volumes_table::Migration),
+        Box::new(m20250415_000001_create_oplog_table::Migration),
         ]
     }
 }
@@ -1,3 +1,4 @@
 pub mod locks;
 pub mod nodes;
 pub mod volumes;
+pub mod oplogs;
@@ -9,6 +9,7 @@ use sea_orm::QueryOrder;
 use serde::{Deserialize, Serialize};
 use tokio::time::sleep;
 use proxmox_api;
+use proxmox_api::nodes::node::lxc::vmid::status::current::Status;
 use log::{warn, debug};
 use proxmox_api::types::VmId;
 use crate::api::entity::locks;
@@ -136,6 +137,33 @@ impl PveConfig {
         Ok(PveConfig::new(&contents))
     }

+    pub fn write(&self, svc: &Services<'_>, pve_host: &str, pve_id: i32) -> Result<(), P5xError> {
+        let conf = self.to_string();
+        let conf = conf.as_bytes();
+        let pve_ssh = svc.pve_ssh(pve_host)
+            .map_err(P5xError::ServiceError)?;
+
+        // Write the file
+        let sftp = pve_ssh.sftp()
+            .map_err(SshError::ClientError)
+            .map_err(P5xError::SshError)?;
+
+        let path = format!("/etc/pve/lxc/{}.conf", pve_id);
+        let path = Path::new(&path);
+
+        debug!("Attempting to open file to mutate config {} on {}", path.display(), pve_host);
+        let mut f = sftp.open_mode(path, OpenFlags::WRITE, 0o640, OpenType::File)
+            .map_err(SshError::ClientError)
+            .map_err(P5xError::SshError)?;
+
+        debug!("Attempting to write config to {} on {}", path.display(), pve_host);
+        f.write_all(conf)
+            .map_err(SshError::IoError)
+            .map_err(P5xError::SshError)?;
+
+        Ok(())
+    }
+
     /** Replace a line in the config file. */
     pub fn replace<F1, F2>(
         &mut self,
@@ -214,6 +242,18 @@ impl PveConfig {

         0
     }
+
+    /**
+     * Returns true if the loaded config file has any pending operations.
+     * This is generally regarded as "unclean" in P5x parlance, and is considered
+     * a failed pre-condition for applying any operations and a failed post-condition
+     * _after_ applying an operation.
+     */
+    pub fn has_pending(&self) -> bool {
+        self.lines
+            .iter()
+            .any(|line| line.starts_with("[pve:pending]"))
+    }
 }
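`has_pending` is the linchpin of the cleanliness checks above: PVE writes staged changes into an LXC container's config file under a `[pve:pending]` section header, so the presence of that header marks the config as unclean. A quick illustrative sketch (not in this commit) using `PveConfig::new` from this file:

    // Sketch: a config with a staged (pending) change is "unclean".
    let clean = PveConfig::new("arch: amd64\nhostname: p5x-node0\n");
    assert!(!clean.has_pending());

    // Hypothetical pending mountpoint entry staged by PVE:
    let staged = PveConfig::new("arch: amd64\n[pve:pending]\nmp3: local-lvm:vm-103-disk-2\n");
    assert!(staged.has_pending());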
 impl Display for PveConfig {
@@ -303,30 +343,7 @@ impl Model {

     /** Replace the LXC container config for this node. */
     pub fn write_config(&self, svc: &Services, conf: &PveConfig) -> Result<(), P5xError> {
-        let conf = conf.to_string();
-        let conf = conf.as_bytes();
-        let pve_ssh = svc.pve_ssh(&self.pve_host)
-            .map_err(P5xError::ServiceError)?;
-
-        // Write the file
-        let sftp = pve_ssh.sftp()
-            .map_err(SshError::ClientError)
-            .map_err(P5xError::SshError)?;
-
-        let path = format!("/etc/pve/lxc/{}.conf", self.pve_id);
-        let path = Path::new(&path);
-
-        debug!("Attempting to open file to mutate config {} on {}", path.display(), self.pve_host);
-        let mut f = sftp.open_mode(path, OpenFlags::WRITE, 0o640, OpenType::File)
-            .map_err(SshError::ClientError)
-            .map_err(P5xError::SshError)?;
-
-        debug!("Attempting to write config to {} on {}", path.display(), self.pve_host);
-        f.write_all(conf)
-            .map_err(SshError::IoError)
-            .map_err(P5xError::SshError)?;
-
-        Ok(())
+        conf.write(svc, &self.pve_host, self.pve_id)
     }

     /** Try to acquire the node lock for this LXC container. Used for any operations that impact the config. */
@@ -364,6 +381,21 @@ impl Model {
         self.write_config(svc, &config)
     }

+    /** Returns true if the LXC container is STOPPED in PVE. */
+    pub fn is_offline(&self, svc: &Services<'_>) -> Result<bool, P5xError> {
+        let stat = svc.pve_node(&self.pve_host)
+            .map_err(P5xError::ServiceError)?
+            .lxc()
+            .vmid(self.vm_id())
+            .status()
+            .current()
+            .get()
+            .map_err(P5xError::PveError)?
+            .status;
+
+        Ok(stat == Status::Stopped)
+    }
+
     /** Run an SSH command on the node and return the output of the command, with whitespace trimmed. */
     pub fn ssh_run_trimmed(&self, svc: &Services, cmd: &str) -> Result<String, P5xError> {
         let node_ssh = self.ssh(svc)?;
src/api/entity/oplogs.rs (new file, 55 lines)
@@ -0,0 +1,55 @@
+use sea_orm::entity::prelude::*;
+use sea_orm::sqlx::types::time::OffsetDateTime;
+use serde::{Deserialize, Serialize};
+
+pub enum Op {
+    CreateLXC,
+    ProvisionCarrier,
+    TerminateCarrier,
+    MigrateNode,
+    CreateVolumeUnmanaged,
+    MountVolume,
+    UnmountVolume,
+    RewriteConfigUnmount,
+    DeleteVolume,
+    TransferVolume,
+}
+
+impl Op {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Op::CreateLXC => "create-lxc",
+            Op::ProvisionCarrier => "provision-carrier",
+            Op::TerminateCarrier => "terminate-carrier",
+            Op::MigrateNode => "migrate-node",
+            Op::CreateVolumeUnmanaged => "create-volume-unmanaged",
+            Op::MountVolume => "mount-volume",
+            Op::UnmountVolume => "unmount-volume",
+            Op::RewriteConfigUnmount => "rewrite-config-unmount",
+            Op::DeleteVolume => "delete-volume",
+            Op::TransferVolume => "transfer-volume",
+        }
+    }
+
+    pub fn to_string(&self) -> String {
+        self.as_str().to_string()
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)]
+#[sea_orm(table_name = "oplogs")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: i64,
+    pub op: String,
+    pub op_desc: String,
+    pub op_at: OffsetDateTime,
+    pub result: Option<bool>,
+    pub result_desc: Option<String>,
+    pub result_at: Option<OffsetDateTime>,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
@@ -1,11 +1,16 @@
+use std::future::Future;
 use std::io::Read;
 use std::net::TcpStream;
 use proxmox_api::nodes::node::NodeClient;
 use proxmox_api::UreqClient;
-use sea_orm::{DatabaseConnection, DbErr};
+use sea_orm::*;
+use sea_orm::ActiveValue::Set;
+use sea_orm::sqlx::types::time::OffsetDateTime;
 use ssh2::Session;
 use crate::api::entity::nodes::P5xError;
 use crate::api::util::{read_p5x_config, P5xConfig};
+use crate::api::entity::oplogs;
+use crate::api::entity::oplogs::Op;

 #[derive(Debug)]
 pub enum SshError {
@@ -108,6 +113,90 @@ impl<'a> Services<'a> {

         Ok(sess)
     }

+    pub async fn op_unwrap<Ret, Err>(&self, op: i64, val: Result<Ret, Err>) -> Result<Ret, P5xError>
+    where
+        Err: std::fmt::Debug,
+        P5xError: From<Err>,
+    {
+        match val {
+            Ok(v) => Ok(v),
+            Err(e) => {
+                self.op_fail(op, &format!("{:?}", e)).await?;
+                Err(P5xError::from(e))
+            }
+        }
+    }
+
+    pub async fn op_unwrap_f<Ret, Err, FutureVal>(&self, op: i64, val: FutureVal) -> Result<Ret, P5xError>
+    where
+        Err: std::fmt::Debug,
+        P5xError: From<Err>,
+        FutureVal: Future<Output = Result<Ret, Err>>,
+    {
+        self.op_unwrap(op, val.await).await
+    }
+
+    pub async fn op_wrap_f<Ret, Closure, FutureClosure>(&self, op: i64, success: &str, cb: Closure) -> Result<Ret, P5xError>
+    where
+        FutureClosure: Future<Output = Result<Ret, P5xError>>,
+        Closure: FnOnce() -> FutureClosure,
+    {
+        match cb().await {
+            Ok(v) => {
+                self.op_finish(op, success).await?;
+                Ok(v)
+            },
+            Err(e) => {
+                self.op_fail(op, &format!("{:?}", e)).await?;
+                Err(e)
+            }
+        }
+    }
+
+    pub async fn op_start(&self, op: Op, desc: &str) -> Result<i64, P5xError> {
+        let res = oplogs::ActiveModel {
+            op: Set(op.to_string()),
+            op_desc: Set(desc.to_string()),
+            op_at: Set(OffsetDateTime::now_utc()),
+            ..Default::default()
+        }
+        .save(self.db)
+        .await
+        .map_err(P5xError::DbErr)?
+        .try_into_model()
+        .map_err(P5xError::DbErr)?;
+
+        Ok(res.id)
+    }
+
+    pub async fn op_finish(&self, id: i64, desc: &str) -> Result<(), P5xError> {
+        self.op_result(true, id, desc).await
+    }
+
+    pub async fn op_fail(&self, id: i64, desc: &str) -> Result<(), P5xError> {
+        self.op_result(false, id, desc).await
+    }
+
+    async fn op_result(&self, result: bool, id: i64, desc: &str) -> Result<(), P5xError> {
+        let mut op = self.resolve_op(id).await?.into_active_model();
+        op.result = Set(Some(result));
+        op.result_desc = Set(Some(desc.to_string()));
+        op.result_at = Set(Some(OffsetDateTime::now_utc()));
+        op.save(self.db)
+            .await
+            .map_err(P5xError::DbErr)?;
+
+        Ok(())
+    }
+
+    async fn resolve_op(&self, id: i64) -> Result<oplogs::Model, P5xError> {
+        oplogs::Entity::find_by_id(id)
+            .one(self.db)
+            .await
+            .map_err(P5xError::DbErr)?
+            .ok_or(P5xError::BadPrecondition("Could not resolve Oplog model instance"))
+    }
 }
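`op_unwrap` covers call sites that already hold a `Result` rather than a closure: on `Err`, it records the Debug-formatted error against the open oplog row via `op_fail`, then converts and returns the error. A hypothetical usage sketch (not a call site from this commit; it assumes a `From<DbErr> for P5xError` impl exists, which this diff does not show):

    // Sketch: recording an already-computed Result against an open op.
    async fn example(svc: &Services<'_>) -> Result<String, P5xError> {
        let op = svc.op_start(Op::CreateLXC, "example op").await?;
        let out: Result<String, sea_orm::DbErr> = Ok("ok".to_string()); // stand-in result
        // On Err, op_unwrap calls op_fail(op, "<Debug of error>") and
        // returns the error converted through P5xError::from.
        svc.op_unwrap(op, out).await
    }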
src/main.rs (11 changes)
@@ -5,7 +5,7 @@ use rocket::{Build, Rocket};
 use log::{error, info};
 use std::{env, process};
 use sea_orm::Database;
-use crate::api::cluster::system::{ensure_ssh_keypair, ensure_system_disk};
+use crate::api::cluster::system::{ensure_ssh_keypair, ensure_system_disk, migrate_system_disk_if_necessary};
 use crate::api::services::Services;
 use crate::api::util::read_p5x_config;

@@ -22,7 +22,7 @@ async fn main() {

     let args: Vec<String> = env::args().collect();
     if args.len() < 2 {
-        error!(target: "p5x", "Missing required <mode> argument. Valid modes: api-server,ensure-system-disk");
+        error!(target: "p5x", "Missing required <mode> argument. Valid modes: api-server,ensure-system-disk,migrate-system-disk");
         process::exit(1);
     }

@@ -34,6 +34,13 @@ async fn main() {
         return;
     }

+    if mode == "migrate-system-disk" {
+        let anon_db = Database::connect("sqlite::memory:").await.unwrap();
+        let svc = Services::build(&anon_db).await.unwrap(); // fixme: this is going to fail because of the SSH keys
+        migrate_system_disk_if_necessary(&svc).await.unwrap();
+        return;
+    }
+
     ensure_ssh_keypair().expect("Could not ensure SSH keypair exists.");

     let config = read_p5x_config(); // Do this so we early-fail if there are missing env vars