diff --git a/requests/nodes.http b/requests/nodes.http new file mode 100644 index 0000000..140ea3f --- /dev/null +++ b/requests/nodes.http @@ -0,0 +1,32 @@ +POST http://localhost:3450/node +Content-Type: application/json + +{ + "hostname": "worker-7.k8s", + "pve_id": 110, + "pve_host": "p-blade01", + "assigned_ip": "172.20.80.8", + "assigned_subnet": 16, + "is_permanent": true +} + +### +POST http://localhost:3450/node +Content-Type: application/json + +{ + "hostname": "worker-4.k8s", + "pve_id": 102, + "pve_host": "p-blade02", + "assigned_ip": "172.20.80.5", + "assigned_subnet": 16, + "is_permanent": true +} + +### +DELETE http://localhost:3450/node/worker-4.k8s +Content-Type: application/json + +{} + +### diff --git a/requests/volumes.http b/requests/volumes.http new file mode 100644 index 0000000..cb048f5 --- /dev/null +++ b/requests/volumes.http @@ -0,0 +1,40 @@ +POST http://localhost:3450/volume +Content-Type: application/json + +{ + "name": "testvol", + "size_in_bytes": 102400 +} + +### +GET http://localhost:3450/volume/testvol +Accept: application/json + +### +POST http://localhost:3450/volume/mount +Content-Type: application/json + +{ + "name": "testvol", + "mountpoint": "/mounted-testvol" +} + +### +POST http://localhost:3450/volume/unmount/testvol +Content-Type: application/json + +{} + +### +POST http://localhost:3450/volume/transfer/testvol/to/worker-4.k8s +Content-Type: application/json + +{} + +### +DELETE http://localhost:3450/volume/testvol +Content-Type: application/json + +{} + +### diff --git a/src/api/cluster/node.rs b/src/api/cluster/node.rs index 6a40640..793497d 100644 --- a/src/api/cluster/node.rs +++ b/src/api/cluster/node.rs @@ -1,13 +1,99 @@ use std::time::Duration; use proxmox_api::nodes::node::lxc::vmid::migrate; use proxmox_api::nodes::node::tasks::upid; -use sea_orm::{ActiveModelTrait, EntityTrait, IntoActiveModel, Set}; +use sea_orm::*; use tokio::time::sleep; -use crate::api::entity::nodes; -use crate::api::entity::nodes::P5xError; +use log::{info}; +use crate::api::entity::{locks, nodes}; +use crate::api::entity::nodes::{NodeParams, P5xError}; use crate::api::services::Services; +/** Register an existing LXC container with P5x. */ +pub async fn register_node( + svc: &Services<'_>, + params: &NodeParams, +) -> Result { + info!(target: "p5x", "Registering node {} ({})", params.hostname, params.pve_id); + + // Check if the node is already registered + let existing = nodes::Entity::find() + .filter(nodes::Column::Hostname.eq(¶ms.hostname)) + .one(svc.db) + .await + .map_err(P5xError::DbErr)?; + + if let Some(model) = existing { + return Ok(model); + } + + // Insert the new node record + let model = nodes::ActiveModel { + hostname: Set(params.hostname.to_string()), + pve_id: Set(params.pve_id), + pve_host: Set(params.pve_host.to_string()), + assigned_ip: Set(params.assigned_ip.to_string()), + assigned_subnet: Set(params.assigned_subnet), + is_permanent: Set(params.is_permanent), + ..Default::default() + } + .save(svc.db) + .await + .map_err(P5xError::DbErr)?; + + // Create the corresponding lock record + locks::ActiveModel { + lock_type: Set("nodes".to_string()), + lock_resource: Set(model.id.clone().unwrap().to_string()), + ..Default::default() + } + .save(svc.db) + .await + .map_err(P5xError::DbErr)?; + + let node = model.try_into_model() + .map_err(P5xError::DbErr)?; + + Ok(node) +} + + +/** Unregister an existing LXC container from P5x. */ +pub async fn unregister_node( + svc: &Services<'_>, + hostname: &str, +) -> Result<(), P5xError> { + // Look up the existing node + let node = nodes::Entity::find() + .filter(nodes::Column::Hostname.eq(hostname)) + .one(svc.db) + .await + .map_err(P5xError::DbErr)? + .ok_or(P5xError::BadPrecondition("Could not unregister node: unable to find node with that hostname"))?; + + // Acquire the node lock + let lock = node.lock(svc, Some("Unregistering node from p5x")) + .await.map_err(P5xError::LockErr)?; + + // Delete the node record first to avoid timing issues w/ locking + nodes::Entity::delete_by_id(node.id) + .exec(svc.db) + .await + .map_err(P5xError::DbErr)?; + + // Release the lock and delete it + drop(lock); + locks::Entity::delete_many() + .filter(locks::Column::LockType.eq("nodes")) + .filter(locks::Column::LockResource.eq(node.id.to_string())) + .exec(svc.db) + .await + .map_err(P5xError::DbErr)?; + + Ok(()) +} + + /** Migrate an LXC container from its current PVE node to the given PVE node. */ pub async fn migrate_node( svc: &Services<'_>, diff --git a/src/api/db/mod.rs b/src/api/db/mod.rs index 59effb2..cc2bc01 100644 --- a/src/api/db/mod.rs +++ b/src/api/db/mod.rs @@ -7,6 +7,7 @@ use sea_orm_rocket::{Config, Database, Pool}; use async_trait::async_trait; use rocket::fairing::AdHoc; use sea_orm::ConnectOptions; +use crate::api::entity::locks::ensure_vmid_lock; #[derive(Database, Debug)] #[database("p5x_api")] @@ -41,6 +42,9 @@ impl Pool for DbPool { } let conn = sea_orm::Database::connect(options).await?; + + ensure_vmid_lock(&conn).await?; // todo: probably a better place to put this + Ok(DbPool { conn }) } diff --git a/src/api/entity/locks.rs b/src/api/entity/locks.rs index cae6904..5575826 100644 --- a/src/api/entity/locks.rs +++ b/src/api/entity/locks.rs @@ -6,6 +6,7 @@ use sea_orm::{IntoActiveModel}; use serde::{Deserialize, Serialize}; use tokio::time::sleep; use uuid::Uuid; +use log::info; /** Try to acquire the given lock, if it is not held. */ pub async fn try_lock<'a>( @@ -57,12 +58,39 @@ pub async fn lock<'a>( } -/** Acquire the global lock for an LXC container, lazy-waiting until it is available. */ +/** Acquire the global lock the overall cluster, lazy-waiting until it is available. (Used to create new LXC containers.) */ pub async fn lock_vmid<'a>(db: &'a DatabaseConnection, lock_reason: Option<&str>) -> Result, DbErr> { lock(db, "global_vmid", "0", lock_reason).await } +/** Ensure the global VMID lock (used by lock_vmid) exists. */ +pub async fn ensure_vmid_lock(db: &DatabaseConnection) -> Result<(), DbErr> { + // Try to find the lock record + let existing = Entity::find() + .filter(Column::LockType.eq("global_vmid")) + .filter(Column::LockResource.eq("0")) + .one(db) + .await?; + + if let Some(_) = existing { + return Ok(()); + } + + // Create the lock record + info!(target: "p5x", "Creating global VMID lock... (one-time fix up)"); + ActiveModel { + lock_type: Set("global_vmid".to_string()), + lock_resource: Set("0".to_string()), + ..Default::default() + } + .save(db) + .await?; + + Ok(()) +} + + /** A held lock. */ #[derive(Clone)] pub struct LockHandle<'a> { diff --git a/src/api/entity/nodes.rs b/src/api/entity/nodes.rs index 118a64d..72538a2 100644 --- a/src/api/entity/nodes.rs +++ b/src/api/entity/nodes.rs @@ -193,6 +193,31 @@ impl Display for PveConfig { } } + +/** API-parameter version of a Node, used for form data I/O */ +#[derive(Serialize, Deserialize, FromForm)] +pub struct NodeParams { + pub id: Option, + pub hostname: String, + pub pve_id: i32, + pub pve_host: String, + pub assigned_ip: String, + pub assigned_subnet: u32, + pub is_permanent: bool, +} + +impl NodeParams { + /** Look up a node by hostname and get its params. */ + pub async fn resolve(svc: &Services<'_>, hostname: &str) -> Result, DbErr> { + Entity::find() + .filter(Column::Hostname.eq(hostname)) + .one(svc.db) + .await + .map(|v| v.map(|v| v.into())) + } +} + + #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize, FromForm)] #[sea_orm(table_name = "nodes")] pub struct Model { @@ -342,3 +367,17 @@ impl Model { pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} + +impl Into for Model { + fn into(self) -> NodeParams { + NodeParams { + id: Some(self.id), + hostname: self.hostname, + pve_id: self.pve_id, + pve_host: self.pve_host, + assigned_ip: self.assigned_ip, + assigned_subnet: self.assigned_subnet, + is_permanent: self.is_permanent, + } + } +} diff --git a/src/api/route/mod.rs b/src/api/route/mod.rs index 29b27f5..bbf1ef3 100644 --- a/src/api/route/mod.rs +++ b/src/api/route/mod.rs @@ -1,9 +1,11 @@ use rocket::fairing::AdHoc; mod volume; +mod node; pub(super) fn init() -> AdHoc { AdHoc::on_ignite("Registering routes", |rocket| async { rocket.attach(volume::init()) + .attach(node::init()) }) } diff --git a/src/api/route/node.rs b/src/api/route/node.rs new file mode 100644 index 0000000..09e652f --- /dev/null +++ b/src/api/route/node.rs @@ -0,0 +1,45 @@ +use rocket::fairing::AdHoc; +use rocket::response::status; +use rocket::serde::json::Json; +use sea_orm_rocket::Connection; +use crate::api; +use crate::api::cluster; +use crate::api::entity::nodes::NodeParams; +use crate::api::services::Services; +use crate::api::util::raise_500; + +#[post("/", data = "")] +async fn register_node( + conn: Connection<'_, api::Db>, + input: Json, +) -> Result, status::Custom> { + let input = input.into_inner(); + let db = conn.into_inner(); + let svc = Services::build(db).await.map_err(raise_500)?; + + let node: NodeParams = cluster::node::register_node(&svc, &input) + .await + .map_err(raise_500)? + .into(); + + Ok(Json(node.into())) +} + +#[delete("/")] +async fn unregister_node( + conn: Connection<'_, api::Db>, + hostname: &str, +) -> Result, status::Custom> { + let db = conn.into_inner(); + let svc = Services::build(db).await.map_err(raise_500)?; + + cluster::node::unregister_node(&svc, hostname).await.map_err(raise_500)?; + + Ok(Json(serde_json::json!({}))) +} + +pub(super) fn init() -> AdHoc { + AdHoc::on_ignite("Routes: /node", |rocket| async { + rocket.mount("/node", routes![register_node, unregister_node]) + }) +} diff --git a/src/main.rs b/src/main.rs index 7319321..90893c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,10 @@ async fn main() { process::exit(1); } - read_p5x_config(); // Do this so we early-fail if there are missing env vars + let config = read_p5x_config(); // Do this so we early-fail if there are missing env vars + info!(target: "p5x", "Successfully read config from environment."); + info!(target: "p5x", "Cluster host: {} ({})", config.pve_host_name, config.pve_api_host); + info!(target: "p5x", "Storage pool: {} ({})", config.pve_storage_pool, config.pve_storage_driver); let mode = &args[1]; if mode == "api-server" {