api-server/src/api/cluster/node.rs

158 lines
4.6 KiB
Rust

use std::time::Duration;

use log::{error, info};
use proxmox_api::nodes::node::lxc::vmid::migrate;
use proxmox_api::nodes::node::tasks::upid;
use sea_orm::*;
use tokio::time::sleep;

use crate::api::entity::{locks, nodes};
use crate::api::entity::nodes::{NodeParams, P5xError};
use crate::api::services::Services;
/**
 * Register an existing LXC container with P5x.
 *
 * If a node with the same hostname is already stored, that record is returned
 * as-is and nothing is written. Otherwise a new `nodes` row is saved from
 * `params`, followed by a `locks` row of type `"nodes"` whose resource is the
 * new node's ID rendered as a string.
 *
 * # Errors
 * Any database failure (lookup, either save, or the final model conversion)
 * is returned as `P5xError::DbErr`.
 */
pub async fn register_node(
    svc: &Services<'_>,
    params: &NodeParams,
) -> Result<nodes::Model, P5xError> {
    info!(target: "p5x", "Registering node {} ({})", params.hostname, params.pve_id);

    // Registration is keyed on hostname: hand back the stored record if there is one.
    let by_hostname = nodes::Entity::find()
        .filter(nodes::Column::Hostname.eq(&params.hostname));
    if let Some(already_registered) = by_hostname
        .one(svc.db)
        .await
        .map_err(P5xError::DbErr)?
    {
        return Ok(already_registered);
    }

    // Build and persist the new node record.
    let new_node = nodes::ActiveModel {
        hostname: Set(params.hostname.to_string()),
        pve_id: Set(params.pve_id),
        pve_host: Set(params.pve_host.to_string()),
        assigned_ip: Set(params.assigned_ip.to_string()),
        assigned_subnet: Set(params.assigned_subnet),
        is_permanent: Set(params.is_permanent),
        ..Default::default()
    };
    let saved_node = new_node
        .save(svc.db)
        .await
        .map_err(P5xError::DbErr)?;

    // Persist the lock record that belongs to this node.
    // The `unwrap` relies on the save above having populated the ID.
    // NOTE(review): the node and lock writes are separate statements here; whether
    // they share a transaction depends on what `svc.db` is — confirm.
    let lock_resource = saved_node.id.clone().unwrap().to_string();
    let node_lock = locks::ActiveModel {
        lock_type: Set("nodes".to_string()),
        lock_resource: Set(lock_resource),
        ..Default::default()
    };
    node_lock
        .save(svc.db)
        .await
        .map_err(P5xError::DbErr)?;

    saved_node.try_into_model().map_err(P5xError::DbErr)
}
/**
 * Unregister an existing LXC container from P5x.
 *
 * Looks the node up by `hostname`, takes its lock, deletes the `nodes` row,
 * then releases the lock and deletes the matching `locks` row(s)
 * (type `"nodes"`, resource equal to the node's ID as a string).
 *
 * # Errors
 * - `P5xError::BadPrecondition` if no node has that hostname.
 * - `P5xError::LockErr` if the node lock cannot be acquired.
 * - `P5xError::DbErr` for any database failure.
 */
pub async fn unregister_node(
    svc: &Services<'_>,
    hostname: &str,
) -> Result<(), P5xError> {
    // Resolve the hostname to a stored node, or bail out.
    let lookup = nodes::Entity::find()
        .filter(nodes::Column::Hostname.eq(hostname))
        .one(svc.db)
        .await
        .map_err(P5xError::DbErr)?;
    let node = match lookup {
        Some(found) => found,
        None => {
            return Err(P5xError::BadPrecondition(
                "Could not unregister node: unable to find node with that hostname",
            ));
        }
    };

    // Hold the node lock while the record is removed.
    let guard = node
        .lock(svc, Some("Unregistering node from p5x"))
        .await
        .map_err(P5xError::LockErr)?;

    // The node row goes first, while the lock is still held, to avoid timing
    // issues with locking.
    nodes::Entity::delete_by_id(node.id)
        .exec(svc.db)
        .await
        .map_err(P5xError::DbErr)?;

    // Only now let go of the lock and remove its backing row(s).
    drop(guard);
    let lock_rows = locks::Entity::delete_many()
        .filter(locks::Column::LockType.eq("nodes"))
        .filter(locks::Column::LockResource.eq(node.id.to_string()));
    lock_rows
        .exec(svc.db)
        .await
        .map_err(P5xError::DbErr)
        .map(|_| ())
}
/**
 * Migrate an LXC container from its current PVE node to the given PVE node.
 *
 * Submits a migrate request for the container via the PVE API on the node's
 * current host, waits for the returned task (UPID) to finish, stores `to_host`
 * as the node's new `pve_host`, and returns the node as re-read from the
 * database.
 *
 * # Errors
 * - `P5xError::ServiceError` if the PVE client for the current host is unavailable.
 * - `P5xError::PveError` if the migrate request fails.
 * - Whatever `wait_upid` returns if the task fails.
 * - `P5xError::DbErr` for database failures.
 * - `P5xError::BadPostcondition` if the node cannot be found after saving.
 */
pub async fn migrate_node(
    svc: &Services<'_>,
    node: nodes::Model,
    to_host: &str,
) -> Result<nodes::Model, P5xError> {
    // Kick off the migration on the host that currently owns the container.
    let migration = migrate::PostParams::new(to_host.to_string());
    let task_upid = svc
        .pve_node(&node.pve_host)
        .map_err(P5xError::ServiceError)?
        .lxc()
        .vmid(node.vm_id())
        .migrate()
        .post(migration)
        .map_err(P5xError::PveError)?;

    // The task is polled on the source host, which is where it was submitted.
    wait_upid(svc, &node.pve_host, &task_upid).await?;

    // Record the new host against the node.
    let mut relocated = node.into_active_model();
    relocated.pve_host = Set(to_host.to_string());
    let persisted = relocated
        .save(svc.db)
        .await
        .map_err(P5xError::DbErr)?;

    // Re-read the row so the caller gets a plain model back.
    let reloaded = nodes::Entity::find_by_id(persisted.id.unwrap())
        .one(svc.db)
        .await
        .map_err(P5xError::DbErr)?;
    match reloaded {
        Some(fresh) => Ok(fresh),
        None => Err(P5xError::BadPostcondition("Could not look up node after persisting")),
    }
}
/**
 * Wait for a PVE task to complete using its UPID.
 *
 * Polls the task's status on `node` until the task is no longer running and
 * has reported an exit status. One second elapses between polls, including
 * the case where the task has stopped running but no exit status is available
 * yet, so the loop never hammers the PVE API back-to-back.
 *
 * There is no overall timeout: the function keeps polling for as long as the
 * task has not produced an exit status.
 *
 * # Errors
 * - `P5xError::ServiceError` if the PVE client for `node` is unavailable.
 * - `P5xError::PveError` if a status request fails.
 * - `P5xError::UpidFailed` if the task ends with an exit status other than `"OK"`.
 */
pub async fn wait_upid(svc: &Services<'_>, node: &str, upid: &str) -> Result<(), P5xError> {
    // Delay between two consecutive status polls.
    const POLL_INTERVAL: Duration = Duration::from_secs(1);

    info!("Waiting for UPID {upid} on node {node}");

    let pve = svc.pve_node(node)
        .map_err(P5xError::ServiceError)?;

    loop {
        let status = pve.tasks()
            .upid(upid)
            .status()
            .get()
            .map_err(P5xError::PveError)?;

        // An exit status is only acted on once the task has stopped running.
        let running = status.status == upid::status::Status::Running;
        if !running {
            if let Some(exit_status) = status.exitstatus {
                if exit_status == "OK" {
                    info!("UPID {upid} on node {node} finished");
                    return Ok(());
                }

                error!("UPID {upid} on node {node} failed");
                return Err(P5xError::UpidFailed(node.to_string(), upid.to_string()));
            }
        }

        // Still running, or stopped without an exit status yet: back off before
        // asking again instead of spinning on the API.
        sleep(POLL_INTERVAL).await;
    }
}