Port over initial implementation of API server and simplify configuration

This commit is contained in:
2025-02-23 12:42:48 -05:00
parent cbddc5db18
commit c89d94dd66
25 changed files with 260 additions and 561 deletions

View File

@@ -3,14 +3,16 @@ use uuid::Uuid;
use proxmox_api::nodes::node::lxc;
use proxmox_api::nodes::node::lxc::vmid;
use proxmox_api::types::VmId;
use sea_orm::{ActiveModelTrait, EntityTrait, Set, TryIntoModel};
use sea_orm::*;
use crate::api::cluster::node::wait_upid;
use crate::api::entity::locks::lock_vmid;
use crate::api::entity::nodes;
use crate::api::entity::{locks, nodes};
use crate::api::entity::nodes::P5xError;
use crate::api::services::{ssh_run_trimmed, Services};
use crate::api::services::SshError;
/** Create an empty Proxmox LXC container that can be used to shuttle a volume between PVE nodes. */
pub async fn provision_carrier(
svc: &Services<'_>,
from_node: &nodes::Model,
@@ -32,8 +34,7 @@ pub async fn provision_carrier(
// Build the new container params
let hostname = format!("carrier-{}", Uuid::new_v4().to_string());
let storage = svc.setting_req(|s| &s.pve_storage_pool)
.map_err(P5xError::ServiceError)?;
let storage = &svc.config.pve_storage_pool;
let mut params = lxc::PostParams::new("local:vztmpl/p5x-empty.tar.xz".to_string(), vm_id);
params.cores = Some(1);
@@ -41,7 +42,7 @@ pub async fn provision_carrier(
params.hostname = Some(hostname.to_string());
params.memory = Some(16); // in MB, min 16
params.start = Some(false);
params.storage = Some(storage);
params.storage = Some(storage.to_string());
params.tags = Some("p5x".to_string());
// Ask the PVE API to start creating the carrier node based on our empty template
@@ -68,9 +69,23 @@ pub async fn provision_carrier(
.await
.map_err(P5xError::DbErr)?;
node.try_into_model().map_err(P5xError::DbErr)
let node = node.try_into_model().map_err(P5xError::DbErr)?;
// Create a lock instance for the new node
locks::ActiveModel {
lock_type: Set("nodes".to_string()),
lock_resource: Set(node.id.to_string()),
..Default::default()
}
.save(svc.db)
.await
.map_err(P5xError::DbErr)?;
Ok(node)
}
/** Make sure the tarball LXC template for the carrier container exists on the given node. */
pub async fn ensure_carrier_template(
svc: &Services<'_>,
node: &nodes::Model,
@@ -100,6 +115,8 @@ pub async fn ensure_carrier_template(
Ok(())
}
/** Destroy the given carrier LXC container. */
pub async fn terminate_carrier(
svc: &Services<'_>,
carrier: nodes::Model,
@@ -122,5 +139,12 @@ pub async fn terminate_carrier(
.await
.map_err(P5xError::DbErr)?;
locks::Entity::delete_many()
.filter(locks::Column::LockType.eq("nodes"))
.filter(locks::Column::LockResource.eq(carrier.id.to_string()))
.exec(svc.db)
.await
.map_err(P5xError::DbErr)?;
Ok(())
}

View File

@@ -7,6 +7,8 @@ use crate::api::entity::nodes;
use crate::api::entity::nodes::P5xError;
use crate::api::services::Services;
/** Migrate an LXC container from its current PVE node to the given PVE node. */
pub async fn migrate_node(
svc: &Services<'_>,
node: nodes::Model,
@@ -37,6 +39,8 @@ pub async fn migrate_node(
.ok_or(P5xError::BadPostcondition("Could not look up node after persisting"))
}
/** Wait for a PVE task to complete using its UPID */
pub async fn wait_upid(svc: &Services<'_>, node: &str, upid: &str) -> Result<(), P5xError> {
info!("Waiting for UPID {upid} on node {node}");
let pve = svc.pve_node(node)

View File

@@ -5,16 +5,20 @@ use proxmox_api::nodes::node::lxc::vmid::config::PutParams;
use proxmox_api::nodes::node::lxc::vmid::move_volume;
use proxmox_api::types::VmId;
use proxmox_api::UreqError;
use proxmox_api::UreqError::Ureq;
use sea_orm::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use ureq::Error::Status;
use crate::api::cluster::carrier::{provision_carrier, terminate_carrier};
use crate::api::cluster::node::migrate_node;
use crate::api::entity::nodes::{lock_first_available, P5xError};
use crate::api::entity::{nodes, volumes};
use crate::api::services::{ssh_run_trimmed, Services};
/** Parameters required from an API call to manage a volume. */
#[derive(Serialize, Deserialize, FromForm)]
pub struct VolumeParams {
pub id: Option<i32>,
@@ -25,6 +29,7 @@ pub struct VolumeParams {
}
impl VolumeParams {
/** Look up a volume by name and get its params. */
pub async fn resolve(svc: &Services<'_>, name: &str) -> Result<Option<VolumeParams>, DbErr> {
volumes::Entity::find()
.filter(volumes::Column::Name.eq(name))
@@ -34,6 +39,8 @@ impl VolumeParams {
}
}
/** Create a new PVE volume of the given size. */
pub async fn create(
svc: &Services<'_>,
name: &str,
@@ -63,8 +70,7 @@ pub async fn create(
info!("Volume {name} will become mp{mp_id} on node {} ({})", node.hostname, node.pve_id);
// Generate a new mountpoint entry for the node's config
let storage = svc.setting_req(|s| &s.pve_storage_pool)
.map_err(P5xError::ServiceError)?;
let storage = &svc.config.pve_storage_pool;
let size_in_gib = max((size_in_bytes as u64).div_ceil(1024 * 1024 * 1024) as i64, 1);
let line = format!("{storage}:{size_in_gib},mp=/mnt/p5x-{name},backup=1");
debug!("Volume {name}: {line}");
@@ -100,11 +106,12 @@ pub async fn create(
debug!("Found mountpoint details for volume {name}: {mount}");
// Parse the disk name from the config
let name_offset = format!("{storage}:{}/", node.pve_id).len() + 1;
// synology-scsi-lun:vm-103-disk-1,mp=/mnt/p5x-test0,backup=1,size=1G // fixme: how does this behave for NFS?
let name_offset = storage.len() + 1; // 1 for the colon (:)
let disk_name = mount[name_offset..].split(",").next().unwrap();
// Persist the volume
debug!("Inserting record into volumes table for volume {name}");
debug!("Inserting record into volumes table for volume {name} ({disk_name})");
let res = volumes::ActiveModel {
name: Set(name.to_string()),
size_in_bytes: Set(size_in_bytes),
@@ -126,6 +133,8 @@ pub async fn create(
Ok(vol)
}
/** Mount a volume to the specified mountpoint on the LXC container it is attached to. */
pub async fn mount(
svc: &Services<'_>,
params: &VolumeParams,
@@ -162,17 +171,21 @@ pub async fn mount(
let mut put_params = PutParams::default();
put_params.mps.insert(mountpoint_identifier, mount_line);
debug!("Patching node config to mount volume {}", params.name);
debug!("Patching node config to mount volume {} ({put_params:?})", params.name);
let res = pve_node.lxc()
.vmid(node.vm_id())
.config()
.put(put_params);
if let Err(Ureq(Status(_, ires))) = res {
debug!("PVE response: {}", ires.into_string().unwrap());
}
// This is necessary because PUT returns {data: null} on success,
// which the UreqClient flags as an unknown error.
if let Err(UreqError::EncounteredErrors(e)) = res {
/*if let Err(UreqError::EncounteredErrors(e)) = res {
return Err(P5xError::PveError(UreqError::EncounteredErrors(e)));
}
}*/
// Persist the volume
debug!("Persisting mount changes to volume {} in database", params.name);
@@ -187,6 +200,8 @@ pub async fn mount(
Ok(())
}
/** Unmount a volume from the LXC container it is attached to. */
pub async fn unmount(
svc: &Services<'_>,
params: &VolumeParams,
@@ -238,8 +253,8 @@ pub async fn unmount(
ssh_run_trimmed(&pve_ssh, &cmd)?;
// For LVM-type storage pools, we also need to deactivate the logical volume
let pool_name = svc.setting_req(|s| &s.pve_storage_pool).map_err(P5xError::ServiceError)?;
let pool_driver = svc.setting_req(|s| &s.pve_storage_driver).map_err(P5xError::ServiceError)?;
let pool_name = &svc.config.pve_storage_pool;
let pool_driver = &svc.config.pve_storage_driver;
if pool_driver == "lvm" {
let cmd = format!("lvchange -aln '/dev/{pool_name}/{}'", vol.disk_name.as_ref().unwrap());
ssh_run_trimmed(&pve_ssh, &cmd)?;
@@ -288,6 +303,8 @@ pub async fn unmount(
// This was *such* a pain in the ass to figure out, but it's a testament to open-
// source that I was able to do it at all.
/** Delete the given volume from the PVE cluster. */
pub async fn delete(
svc: &Services<'_>,
params: &VolumeParams,
@@ -340,11 +357,13 @@ pub async fn delete(
Ok(())
}
/** Migrate a volume from its current LXC container to the specified LXC container. */
pub async fn transfer(
svc: &Services<'_>,
params: &VolumeParams,
to_node: &nodes::Model,
) -> Result<(), P5xError> {
) -> Result<volumes::Model, P5xError> {
// Look up the volume from the params
let vol = volumes::resolve(svc, params).await?;
@@ -352,13 +371,12 @@ pub async fn transfer(
// If the volume already resides on to_node, we're done
if from_node.pve_id == to_node.pve_id {
return Ok(());
return Ok(vol);
}
// If from_node and to_node are on the same physical host, transfer directly
if from_node.pve_host == to_node.pve_host {
transfer_directly(svc, vol, &from_node, to_node).await?;
return Ok(());
return transfer_directly(svc, vol, &from_node, to_node).await;
}
// If the nodes are on different physical hosts, we need to create a temporary
@@ -368,13 +386,15 @@ pub async fn transfer(
let vol = transfer_directly(svc, vol, &from_node, &carrier).await?;
let carrier = migrate_node(svc, carrier, &to_node.pve_host).await?;
let _vol = transfer_directly(svc, vol, &carrier, &to_node).await?;
let vol = transfer_directly(svc, vol, &carrier, &to_node).await?;
terminate_carrier(svc, carrier).await?;
Ok(())
Ok(vol)
}
/** Migrate a volume from one LXC container to another LXC container, when both reside on the same PVE host. */
async fn transfer_directly(
svc: &Services<'_>,
vol: volumes::Model,
@@ -431,11 +451,11 @@ async fn transfer_directly(
.ok_or(P5xError::BadPostcondition("Could not find mountpoint config after transferring volume"))?;
// Parse the disk name from the config
let storage = svc.setting_req(|s| &s.pve_storage_pool)
.map_err(P5xError::ServiceError)?;
let name_offset = format!("{storage}:{}/", to_node.pve_id).len() + 1;
// synology-scsi-lun:vm-103-disk-1,mp=/mnt/p5x-test0,backup=1,size=1G
let storage = &svc.config.pve_storage_pool;
let name_offset = storage.len() + 1; // 1 for the colon (:)
let disk_name = mount[name_offset..].split(",").next().unwrap();
debug!("transfer_directly: mount {mount} | name_offset {name_offset} | disk_name {disk_name}");
// Persist the volume
let mut vol = vol.into_active_model();
vol.pve_node_id = Set(to_node.pve_id);

View File

@@ -1,58 +0,0 @@
use async_trait::async_trait;
use sea_orm_migration::{prelude::*, schema::*};
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Settings::Table)
.if_not_exists()
.col(pk_auto(Settings::Id))
.col(string_null(Settings::PveMasterNode))
.col(string_null(Settings::PveApiHost))
.col(string_null(Settings::PveRootPassword))
.col(string_null(Settings::PveStoragePool))
.col(string_null(Settings::PveStorageDriver))
.col(string_null(Settings::DnsDomain))
.col(string_null(Settings::NodeNetworkBridge))
.col(big_unsigned_null(Settings::NodeCpus))
.col(big_unsigned_null(Settings::NodeRamInMib))
.col(string_null(Settings::RootPassword))
.col(text_null(Settings::SshPublicKey))
.col(text_null(Settings::SshPrivateKey))
.to_owned())
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(
Table::drop()
.table(Settings::Table)
.to_owned())
.await
}
}
#[derive(DeriveIden)]
enum Settings {
Table,
Id,
PveMasterNode,
PveApiHost,
PveRootPassword,
PveStoragePool,
PveStorageDriver,
DnsDomain,
NodeNetworkBridge,
NodeCpus,
NodeRamInMib,
RootPassword,
SshPublicKey,
SshPrivateKey,
}

View File

@@ -5,7 +5,6 @@ pub use sea_orm_migration::prelude::*;
use sea_orm_rocket::Database;
use crate::api::Db;
mod m20241101_000001_create_settings_table;
mod m20241102_000001_create_nodes_table;
mod m20241102_000002_create_locks_table;
mod m20241103_000001_create_volumes_table;
@@ -16,7 +15,6 @@ pub struct Migrator;
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20241101_000001_create_settings_table::Migration),
Box::new(m20241102_000001_create_nodes_table::Migration),
Box::new(m20241102_000002_create_locks_table::Migration),
Box::new(m20241103_000001_create_volumes_table::Migration),

View File

@@ -26,9 +26,13 @@ impl Pool for DbPool {
let config = figment.extract::<Config>().unwrap();
let mut options: ConnectOptions = config.url.into();
debug!("max_connections: {} | min: {}", config.max_connections, config.min_connections.unwrap_or_default());
options
.max_connections(config.max_connections as u32)
.min_connections(config.min_connections.unwrap_or_default())
// .max_connections(config.max_connections as u32)
// .min_connections(config.min_connections.unwrap_or_default())
.max_connections(512)
.min_connections(128)
.connect_timeout(Duration::from_secs(config.connect_timeout))
.sqlx_logging(config.sqlx_logging);

View File

@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use uuid::Uuid;
/** Try to acquire the given lock, if it is not held. */
pub async fn try_lock<'a>(
db: &'a DatabaseConnection,
lock_type: &str,
@@ -36,6 +37,8 @@ pub async fn try_lock<'a>(
lock.map(|lock | LockHandle::new(lock, db)))
}
/** Acquire the given lock, lazy-waiting until it is available. */
pub async fn lock<'a>(
db: &'a DatabaseConnection,
lock_type: &str,
@@ -53,10 +56,14 @@ pub async fn lock<'a>(
}
}
/** Acquire the global lock for an LXC container, lazy-waiting until it is available. */
pub async fn lock_vmid<'a>(db: &'a DatabaseConnection, lock_reason: Option<&str>) -> Result<LockHandle<'a>, DbErr> {
lock(db, "global_vmid", "0", lock_reason).await
}
/** A held lock. */
#[derive(Clone)]
pub struct LockHandle<'a> {
pub lock: Model,

View File

@@ -1,4 +1,3 @@
pub mod settings;
pub mod locks;
pub mod nodes;
pub mod volumes;

View File

@@ -3,15 +3,17 @@ use std::net::TcpStream;
use ssh2::{OpenFlags, OpenType, Session};
use std::io::{Read, Write};
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use sea_orm::entity::prelude::*;
use sea_orm::QueryOrder;
use serde::{Deserialize, Serialize};
use serde::{de, Deserialize, Deserializer, Serialize};
use tokio::time::sleep;
use proxmox_api;
use log::{warn, debug};
use proxmox_api::types::VmId;
use crate::api::entity::{locks, settings};
use serde::de::Error;
use crate::api::entity::locks;
use crate::api::entity::locks::{lock, try_lock, LockHandle};
use crate::api::services::{Services, ServiceError, SshError, ssh_run_trimmed};
@@ -34,6 +36,12 @@ pub enum P5xError {
UpidFailed(String, String),
}
/**
* Acquire the first free LXC container lock.
* Useful for operations that can occur on an arbitrary node (e.g. creating a new volume).
* Lazy-wait until a lock is acquired.
*/
pub async fn lock_first_available<'a>(
db: &'a DatabaseConnection,
lock_reason: Option<&str>,
@@ -54,6 +62,11 @@ pub async fn lock_first_available<'a>(
Err(NodeLockErr::WaitTimeLimitExceeded)
}
/**
* Try to acquire the first free LXC container lock.
* Useful for operations that can occur on an arbitrary node (e.g. creating a new volume).
*/
pub async fn try_lock_first_available<'a>(
db: &'a DatabaseConnection,
lock_reason: Option<&str>,
@@ -84,6 +97,7 @@ pub async fn try_lock_first_available<'a>(
}
/** The PVE config file for an LXC container (i.e. /etc/pve/lxc/XYZ.conf) */
pub struct PveConfig {
lines: Vec<String>,
}
@@ -97,6 +111,7 @@ impl PveConfig {
PveConfig { lines }
}
/** Replace a line in the config file. */
pub fn replace<F1, F2>(
&mut self,
matcher: F1,
@@ -115,6 +130,7 @@ impl PveConfig {
});
}
/** Run a map function on every line in the config file. */
pub fn map<F>(
&mut self,
replacer: F
@@ -128,15 +144,30 @@ impl PveConfig {
.collect();
}
/**
* Get the config setting based on its key.
* Example: If key="fubar", it will find the line "fubar: something"
* and return "something"
*/
pub fn get(&self, key: &str) -> Option<String> {
let key = format!("{key}:");
self.lines
let val = self.lines
.iter()
.filter(|line| line.starts_with(&key))
.next()
.map(|s| s.to_string())
.map(|s| s.to_string())?;
let offset = key.len() + 1;
let val = val[offset..].trim();
Some(val.to_string())
}
/**
* Proxmox stores configs for resources that can have duplicates (e.g. volumes) in a predictable
* format. For example, volumes will have "volume0: ..." / "volume1: ..." / &c.
* This method will find the first free number for a given prefix.
* Using that example, next_nth("volume") would return 2.
*/
pub fn next_nth(&self, prefix: &str) -> u32 {
let res = self.lines
.iter()
@@ -178,16 +209,12 @@ pub struct Model {
}
impl Model {
fn setting<F>(&self, svc: &Services, f: F) -> Result<String, P5xError>
where
F: FnOnce(&settings::Model) -> &Option<String> {
svc.setting_req(f).map_err(P5xError::ServiceError)
}
/** Get the VM ID used by the Proxmox API library. */
pub fn vm_id(&self) -> VmId {
VmId::new(i64::from(self.pve_id)).unwrap()
}
/** Open an SSH session to this LXC container. */
pub fn ssh(&self, svc: &Services) -> Result<Session, P5xError> {
let addr = format!("{}:22", self.assigned_ip);
let tcp = TcpStream::connect(addr)
@@ -203,11 +230,10 @@ impl Model {
.map_err(SshError::ClientError)
.map_err(P5xError::SshError)?;
let pubkey = self.setting(svc, |s| &s.ssh_public_key)?;
let privkey = self.setting(svc, |s| &s.ssh_private_key)?;
log::debug!("privkey: {privkey}");
let pubkey = &svc.config.ssh_pubkey;
let privkey = &svc.config.ssh_privkey;
sess.userauth_pubkey_memory("root", Some(&pubkey), &privkey, None)
sess.userauth_pubkey_memory("root", Some(pubkey), privkey, None)
.map_err(SshError::ClientError)
.map_err(P5xError::SshError)?;
@@ -218,6 +244,7 @@ impl Model {
Ok(sess)
}
/** Load the LXC container config for this node. */
pub fn config(&self, svc: &Services) -> Result<PveConfig, P5xError> {
let pve_ssh = svc.pve_ssh(&self.pve_host)
.map_err(P5xError::ServiceError)?;
@@ -243,6 +270,7 @@ impl Model {
Ok(PveConfig::new(&contents))
}
/** Replace the LXC container config for this node. */
pub fn write_config(&self, svc: &Services, conf: &PveConfig) -> Result<(), P5xError> {
let conf = conf.to_string();
let conf = conf.as_bytes();
@@ -267,33 +295,24 @@ impl Model {
.map_err(SshError::IoError)
.map_err(P5xError::SshError)?;
/*let mut remote_file = pve_ssh.scp_send(path, 0o640, conf.len() as u64, None)
.map_err(SshError::ClientError)
.map_err(P5xError::SshError)?;
remote_file.write_all(conf).map_err(SshError::IoError).map_err(P5xError::SshError)?;*/
// Close the channel and wait for the whole content to be transferred
/*remote_file.send_eof().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.wait_eof().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.close().map_err(SshError::ClientError).map_err(P5xError::SshError)?;
remote_file.wait_close().map_err(SshError::ClientError).map_err(P5xError::SshError)?;*/
Ok(())
}
/** Try to acquire the node lock for this LXC container. Used for any operations that impact the config. */
pub async fn try_lock<'a>(&self, svc: &Services<'a>, lock_reason: Option<&str>) -> Result<Option<LockHandle<'a>>, NodeLockErr> {
try_lock(svc.db, "nodes", &self.id.to_string(), lock_reason)
.await
.map_err(NodeLockErr::DbErr)
}
/** Block until we acquire the node lock for this LXC container. Used for any operations that impact the config. */
pub async fn lock<'a>(&self, svc: &Services<'a>, lock_reason: Option<&str>) -> Result<LockHandle<'a>, NodeLockErr> {
lock(svc.db, "nodes", &self.id.to_string(), lock_reason)
.await
.map_err(NodeLockErr::DbErr)
}
/** Helper for loading, modifying, then replacing the config, while managing the lock automatically. */
pub async fn mutate_config<F>(
&self,
svc: &Services<'_>,
@@ -314,6 +333,7 @@ impl Model {
self.write_config(svc, &config)
}
/** Run an SSH command on the node and return the output of the command, with whitespace trimmed. */
pub fn ssh_run_trimmed(&self, svc: &Services, cmd: &str) -> Result<String, P5xError> {
let node_ssh = self.ssh(svc)?;
ssh_run_trimmed(&node_ssh, cmd)

View File

@@ -1,29 +0,0 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize, FromForm)]
#[sea_orm(table_name = "settings")]
pub struct Model {
#[sea_orm(primary_key)]
#[field(default = 0)]
pub id: i32,
pub pve_master_node: Option<String>,
pub pve_api_host: Option<String>,
pub pve_root_password: Option<String>,
pub pve_storage_pool: Option<String>,
pub pve_storage_driver: Option<String>,
// pub dns_domain: Option<String>,
// pub node_network_bridge: Option<String>,
// pub node_cpus: Option<u64>,
// pub node_ram_in_mib: Option<u64>,
// pub root_password: Option<String>,
#[sea_orm(column_type = "Text")]
pub ssh_public_key: Option<String>,
#[sea_orm(column_type = "Text")]
pub ssh_private_key: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@@ -5,6 +5,8 @@ use crate::api::entity::nodes;
use crate::api::entity::nodes::P5xError;
use crate::api::services::Services;
/** Given the API volume params, look up the Volume model instance. */
pub async fn resolve(
svc: &Services<'_>,
params: &VolumeParams,
@@ -37,6 +39,7 @@ pub struct Model {
}
impl Model {
/** Get the Node model for the LXC container this volume is attached to. */
pub async fn node(&self, svc: &Services<'_>) -> Result<nodes::Model, P5xError> {
nodes::Entity::find()
.filter(nodes::Column::PveId.eq(self.pve_node_id))
@@ -46,17 +49,14 @@ impl Model {
.ok_or(P5xError::BadPrecondition("Could not find node for volume: pve_node_id does not match any nodes"))
}
/** Get the name of this volume as it would appear in the LXC config file. */
pub async fn qualified_name(&self, svc: &Services<'_>) -> Result<String, P5xError> {
let node = self.node(svc).await?;
let storage = svc.setting_req(|s| &s.pve_storage_pool)
.map_err(P5xError::ServiceError)?;
let storage_type = svc.setting_req(|s| &s.pve_storage_driver)
.map_err(P5xError::ServiceError)?;
let storage = &svc.config.pve_storage_pool;
let storage_type = &svc.config.pve_storage_driver;
let disk_name = format!("pve-{}.raw", self.name);
let disk_name = self.disk_name.as_ref().unwrap_or(&disk_name);
let qn = match storage_type.as_str() {
"lvm" => format!("{storage}:{disk_name}"),
_ => format!("{storage}:{}/{disk_name}", node.pve_id),

View File

@@ -2,7 +2,7 @@ use rocket::fairing::AdHoc;
mod db;
mod route;
mod util;
pub mod util;
mod cluster;
pub mod entity;
pub use db::Db;

View File

@@ -1,51 +0,0 @@
use rocket::fairing::AdHoc;
use rocket::form::Form;
use rocket::response::{status};
use rocket_dyn_templates::{context, Template};
use sea_orm::*;
use sea_orm_rocket::Connection;
use crate::api;
use crate::api::entity::settings;
use crate::api::util::raise_500;
async fn render(db: &DatabaseConnection) -> Result<Template, status::Custom<String>> {
let settings = settings::Entity::find().order_by_desc(settings::Column::Id).one(db).await.map_err(raise_500)?;
Ok(Template::render("settings", context! {
settings: settings,
}))
}
#[get("/")]
async fn get(
conn: Connection<'_, api::Db>,
) -> Result<Template, status::Custom<String>> {
let db = conn.into_inner();
render(db).await
}
#[post("/", data = "<input>")]
async fn save(
conn: Connection<'_, api::Db>,
input: Form<settings::Model>,
) -> Result<Template, status::Custom<String>> {
let db = conn.into_inner();
settings::ActiveModel {
pve_master_node: Set(input.pve_master_node.to_owned()),
pve_api_host: Set(input.pve_api_host.to_owned()),
pve_root_password: Set(input.pve_root_password.to_owned()),
pve_storage_pool: Set(input.pve_storage_pool.to_owned()),
..Default::default()
}
.save(db)
.await
.map_err(raise_500)?;
render(db).await
}
pub(super) fn init() -> AdHoc {
AdHoc::on_ignite("Routes: /configure", |rocket| async {
rocket.mount("/configure", routes![get, save])
})
}

View File

@@ -1,11 +1,9 @@
use rocket::fairing::AdHoc;
mod configure;
mod volume;
pub(super) fn init() -> AdHoc {
AdHoc::on_ignite("Registering routes", |rocket| async {
rocket.attach(configure::init())
.attach(volume::init())
rocket.attach(volume::init())
})
}

View File

@@ -11,6 +11,7 @@ use crate::api::entity::nodes::P5xError;
use crate::api::services::Services;
use crate::api::util::raise_500;
#[get("/<name>")]
async fn get_vol(
conn: Connection<'_, api::Db>,
@@ -27,6 +28,7 @@ async fn get_vol(
Ok(Json(vol))
}
#[post("/", data = "<input>")]
async fn create_vol(
conn: Connection<'_, api::Db>,
@@ -46,11 +48,12 @@ async fn create_vol(
Ok(Json(vol.into()))
}
#[delete("/<name>")]
async fn delete_vol(
conn: Connection<'_, api::Db>,
name: &str,
) -> Result<(), status::Custom<String>> {
) -> Result<Json<serde_json::Value>, status::Custom<String>> {
let db = conn.into_inner();
let svc = Services::build(db).await.map_err(raise_500)?;
@@ -62,28 +65,30 @@ async fn delete_vol(
cluster::volume::unmount(&svc, &vol).await.map_err(raise_500)?;
cluster::volume::delete(&svc, &vol).await.map_err(raise_500)?;
Ok(())
Ok(Json(serde_json::json!({})))
}
#[post("/mount", data = "<params>")]
async fn mount_vol(
conn: Connection<'_, api::Db>,
params: Json<VolumeParams>,
) -> Result<(), status::Custom<String>> {
) -> Result<Json<serde_json::Value>, status::Custom<String>> {
let db = conn.into_inner();
let svc = Services::build(db).await.map_err(raise_500)?;
let params = params.into_inner();
cluster::volume::mount(&svc, &params).await.map_err(raise_500)?;
Ok(())
Ok(Json(serde_json::json!({})))
}
#[post("/unmount/<name>")]
async fn unmount_vol(
conn: Connection<'_, api::Db>,
name: &str,
) -> Result<(), status::Custom<String>> {
) -> Result<Json<serde_json::Value>, status::Custom<String>> {
let db = conn.into_inner();
let svc = Services::build(db).await.map_err(raise_500)?;
@@ -94,15 +99,16 @@ async fn unmount_vol(
cluster::volume::unmount(&svc, &vol).await.map_err(raise_500)?;
Ok(())
Ok(Json(serde_json::json!({})))
}
#[post("/transfer/<name>/to/<node>")]
async fn transfer_vol(
conn: Connection<'_, api::Db>,
name: &str,
node: &str,
) -> Result<(), status::Custom<String>> {
) -> Result<Json<VolumeParams>, status::Custom<String>> {
let db = conn.into_inner();
let svc = Services::build(db).await.map_err(raise_500)?;
@@ -118,11 +124,12 @@ async fn transfer_vol(
.ok_or(P5xError::BadPrecondition("Could not find a node with that name"))
.map_err(raise_500)?;
cluster::volume::transfer(&svc, &vol, &node).await.map_err(raise_500)?;
let vol = cluster::volume::transfer(&svc, &vol, &node).await.map_err(raise_500)?;
Ok(())
Ok(Json(vol.into()))
}
pub(super) fn init() -> AdHoc {
AdHoc::on_ignite("Routes: /volume", |rocket| async {
rocket.mount("/volume", routes![create_vol, delete_vol, get_vol, mount_vol, unmount_vol, transfer_vol])

View File

@@ -2,10 +2,10 @@ use std::io::Read;
use std::net::TcpStream;
use proxmox_api::nodes::node::NodeClient;
use proxmox_api::UreqClient;
use sea_orm::{DatabaseConnection, DbErr, EntityTrait, QueryOrder};
use sea_orm::{DatabaseConnection, DbErr};
use ssh2::Session;
use crate::api::entity::nodes::P5xError;
use crate::api::entity::settings;
use crate::api::util::{read_p5x_config, P5xConfig};
#[derive(Debug)]
pub enum SshError {
@@ -23,61 +23,46 @@ impl SshError {
#[derive(Debug)]
pub enum ServiceError {
MissingSetting,
PveError(proxmox_api::UreqError),
InvalidNetworkInterface,
SshError(SshError),
}
/** Helper for managing singleton services. */
pub struct Services<'a> {
pub settings: Option<settings::Model>,
pub db: &'a DatabaseConnection,
pub config: P5xConfig,
}
impl<'a> Services<'a> {
/** Create a new services instance. */
pub async fn build(db: &'a DatabaseConnection) -> Result<Services<'a>, DbErr> {
let settings = settings::Entity::find().order_by_desc(settings::Column::Id).one(db).await?;
Ok(Services {
db,
settings,
config: read_p5x_config(),
})
}
pub fn setting<F>(&self, f: F) -> Option<String>
where
F: FnOnce(&settings::Model) -> &Option<String>,
{
let model = self.settings.as_ref()?;
let res = f(model);
if let Some(res) = res {
return Some(res.clone());
}
None
}
pub fn setting_req<F>(&self, f: F) -> Result<String, ServiceError>
where
F: FnOnce(&settings::Model) -> &Option<String>
{
self.setting(f).ok_or(ServiceError::MissingSetting)
}
pub fn pve(&self) -> Result<proxmox_api::UreqClient, ServiceError> {
let host = self.setting_req(|s| &s.pve_api_host)?;
let pw = self.setting_req(|s| &s.pve_root_password)?;
/** Get a new Proxmox API client instance. */
pub fn pve(&self) -> Result<UreqClient, ServiceError> {
let host = &self.config.pve_api_host;
let pw = &self.config.pve_root_password;
let host = format!("https://{host}:8006");
let api = proxmox_api::UreqClient::new(&host, "root", "pam", &pw)
let api = UreqClient::new(&host, "root", "pam", &pw)
.map_err(ServiceError::PveError)?;
Ok(api)
}
/** Get a Proxmox Node API client instance for the given node. */
pub fn pve_node(&self, host: &str) -> Result<NodeClient<UreqClient>, ServiceError> {
let pve = self.pve()?;
Ok(proxmox_api::nodes::NodesClient::new(pve).node(host))
}
/** Given a PVE host name, get its network address. */
pub fn pve_addr(&self, host: &str) -> Result<String, ServiceError> {
let ifaces = self.pve_node(host)?
.network()
@@ -96,6 +81,7 @@ impl<'a> Services<'a> {
Ok(addr.as_str().unwrap().to_string())
}
/** Open a new SSH session to the given PVE node. */
pub fn pve_ssh(&self, host: &str) -> Result<Session, ServiceError> {
let addr = self.pve_addr(host)?;
let addr = format!("{}:22", addr);
@@ -112,8 +98,7 @@ impl<'a> Services<'a> {
.map_err(SshError::ClientError)
.map_err(ServiceError::SshError)?;
let pw = self.setting_req(|s| &s.pve_root_password)?;
sess.userauth_password("root", &pw)
sess.userauth_password("root", &self.config.pve_root_password)
.map_err(SshError::ClientError)
.map_err(ServiceError::SshError)?;
@@ -125,6 +110,8 @@ impl<'a> Services<'a> {
}
}
/** Run an SSH command and return the output as a string, with whitespace trimmed. */
pub fn ssh_run_trimmed(session: &Session, cmd: &str) -> Result<String, P5xError> {
let mut channel = session.channel_session()
.map_err(SshError::ClientError)

View File

@@ -1,7 +1,48 @@
use std::{env, fs};
use std::fmt::Debug;
use rocket::http;
use rocket::response::status;
use log::error;
use rocket::serde::Deserialize;
pub fn raise_500(e: impl Debug) -> status::Custom<String> {
status::Custom(http::Status::InternalServerError, format!("An unexpected error has occurred: {:?}", e))
/** Global config for the P5x application. */
#[derive(Deserialize)]
pub struct P5xConfig {
pub pve_host_name: String,
pub pve_api_host: String,
pub pve_root_password: String,
pub pve_storage_pool: String,
pub pve_storage_driver: String,
pub k8s_root_password: String,
pub ssh_pubkey: String,
pub ssh_privkey: String,
}
/** Read the P5xConfig instance from the corresponding environment variables. */
pub fn read_p5x_config() -> P5xConfig {
let pubkey_path = env::var("P5X_SSH_PUBKEY_PATH").expect("Missing env: P5X_SSH_PUBKEY_PATH");
let privkey_path = env::var("P5X_SSH_PRIVKEY_PATH").expect("Missing env: P5X_SSH_PRIVKEY_PATH");
let config = P5xConfig {
pve_host_name: env::var("P5X_NODE_HOSTNAME").expect("Missing env: P5X_NODE_HOSTNAME"),
pve_api_host: env::var("P5X_API_HOST").expect("Missing env: P5X_API_HOST"),
pve_root_password: env::var("P5X_API_ROOT_PASSWORD").expect("Missing env: P5X_API_ROOT_PASSWORD"),
pve_storage_pool: env::var("P5X_STORAGE_POOL").expect("Missing env: P5X_STORAGE_POOL"),
pve_storage_driver: env::var("P5X_STORAGE_DRIVER").expect("Missing env: P5X_STORAGE_DRIVER"),
k8s_root_password: env::var("P5X_K8S_ROOT_PASSWORD").expect("Missing env: P5X_K8S_ROOT_PASSWORD"),
ssh_pubkey: fs::read_to_string(&pubkey_path).expect(&format!("Could not read SSH pubkey from file: {pubkey_path}")),
ssh_privkey: fs::read_to_string(&privkey_path).expect(&format!("Could not read SSH privkey from file: {privkey_path}")),
};
config
}
/** Catch-all Rocket helper to generate an HTTP 500 response for the given error. */
pub fn raise_500(e: impl Debug) -> status::Custom<String> {
error!("Unhandled error: {e:?}");
status::Custom(http::Status::InternalServerError, format!("An unexpected error has occurred: {e:?}"))
}

View File

@@ -1,9 +1,13 @@
pub mod api;
#[macro_use] extern crate rocket;
use dotenv::dotenv;
use rocket::{Build, Rocket};
use log::{error, info};
use std::{env, process};
use rocket::figment::Figment;
use rocket::figment::providers::{Env, Format, Toml};
use rocket_dyn_templates::{ Template};
use serde::Deserialize;
fn configure_rocket() -> Rocket<Build> {
rocket::build()
@@ -13,6 +17,7 @@ fn configure_rocket() -> Rocket<Build> {
#[tokio::main]
async fn main() {
dotenv().ok();
env_logger::init();
info!(target: "p5x", "Starting p5x...");