diff --git a/Cargo.toml b/Cargo.toml index 2621213..1fd235d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,9 @@ categories = ["command-line-utilities"] [dependencies] anyhow = "1.0" -bollard = "0.15" +# bollard = "0.15" +bollard = { git = "https://www.github.com/fussybeaver/bollard.git", rev = "cef1cd5" } + cansi = "2.2" clap = { version = "4.4", features = ["derive", "unicode", "color"] } crossterm = "0.27" diff --git a/src/app_data/container_state.rs b/src/app_data/container_state.rs index 08f0785..4ca9b72 100644 --- a/src/app_data/container_state.rs +++ b/src/app_data/container_state.rs @@ -121,6 +121,9 @@ pub enum State { } impl State { + pub const fn is_alive(self) -> bool { + matches!(self, Self::Running) + } pub const fn get_color(self) -> Color { match self { Self::Paused => Color::Yellow, @@ -158,6 +161,12 @@ impl From<&str> for State { } } +impl From> for State { + fn from(input: Option) -> Self { + input.map_or(Self::Unknown, |input| Self::from(input.as_str())) + } +} + impl fmt::Display for State { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let disp = match self { @@ -216,7 +225,7 @@ impl fmt::Display for DockerControls { Self::Restart => "restart", Self::Start => "start", Self::Stop => "stop", - Self::Unpause => "unpause", + Self::Unpause => "resume", }; write!(f, "{disp}") } diff --git a/src/docker_data/mod.rs b/src/docker_data/mod.rs index 0f69d97..b88d915 100644 --- a/src/docker_data/mod.rs +++ b/src/docker_data/mod.rs @@ -10,13 +10,19 @@ use futures_util::StreamExt; use parking_lot::Mutex; use std::{ collections::HashMap, - sync::{atomic::AtomicBool, Arc}, + sync::{ + atomic::{AtomicBool, AtomicUsize}, + Arc, + }, +}; +use tokio::{ + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, }; -use tokio::{sync::mpsc::Receiver, task::JoinHandle}; use uuid::Uuid; use crate::{ - app_data::{AppData, ContainerId, DockerControls}, + app_data::{AppData, ContainerId, DockerControls, State}, app_error::AppError, parse_args::CliArgs, ui::{GuiState, Status}, @@ -50,6 +56,11 @@ impl Binate { } } +// struct Init { +// done: , +// len: +// } + pub struct DockerData { app_data: Arc>, args: CliArgs, @@ -57,6 +68,7 @@ pub struct DockerData { docker: Arc, gui_state: Arc>, is_running: Arc, + init: Option>, receiver: Receiver, spawns: Arc>>>, } @@ -96,67 +108,93 @@ impl DockerData { app_data: Arc>, docker: Arc, id: ContainerId, - is_running: bool, + init: Option<(Arc, usize)>, + state: State, spawn_id: SpawnId, spawns: Arc>>>, ) { + // if dead and !init then inspect! - let mut stream = docker - .stats( - id.get(), - Some(StatsOptions { - stream: false, - one_shot: !is_running, - }), - ) - .take(1); + if state.is_alive() || init.is_some() { + // // if state == State::Paused && init.is_some() { + // // app_data.lock().debug_string.push_str("is paused"); - while let Some(Ok(stats)) = stream.next().await { - let mem_stat = stats.memory_stats.usage.unwrap_or_default(); - let mem_limit = stats.memory_stats.limit.unwrap_or_default(); + // // if let Ok(result) = docker.inspect_container(id.get(), Some(InspectContainerOptions{size:false})).await { + // // let mem_limit = format!("{}", result.host_config.map_or(0, |i|i.memory.unwrap_or_default())); - let op_key = stats - .networks - .as_ref() - .and_then(|networks| networks.keys().next().cloned()); + // // app_data.lock().debug_string.push_str(&mem_limit); + // // } - let cpu_stats = Self::calculate_usage(&stats); + // // }else if state.is_alive() || init.is_some() { + let mut stream = docker + .stats( + id.get(), + Some(StatsOptions { + stream: false, + one_shot: false, + }), + ) + .take(1); - let (rx, tx) = if let Some(key) = op_key { - stats + // let a = stream.next().await; + // app_data.lock().debug_string.push_str(&format!("{:?}", a.is_some())); + // let bb = a.unwrap().unwrap(); + + // // } + + // app_data.lock().debug_string.push_str("jkl"); + + while let Some(Ok(stats)) = stream.next().await { + let mem_stat = if state.is_alive() { + Some(stats.memory_stats.usage.unwrap_or_default()) + } else { + None + }; + + let mem_limit = stats.memory_stats.limit.unwrap_or_default(); + + let op_key = stats .networks - .unwrap_or_default() - .get(&key) - .map_or((0, 0), |f| (f.rx_bytes, f.tx_bytes)) - } else { - (0, 0) - }; + .as_ref() + .and_then(|networks| networks.keys().next().cloned()); + + let cpu_stats = if state.is_alive() { + Some(Self::calculate_usage(&stats)) + } else { + None + }; + let (rx, tx) = if let Some(key) = op_key { + stats + .networks + .unwrap_or_default() + .get(&key) + .map_or((0, 0), |f| (f.rx_bytes, f.tx_bytes)) + } else { + (0, 0) + }; - if is_running { - app_data.lock().update_stats( - &id, - Some(cpu_stats), - Some(mem_stat), - mem_limit, - rx, - tx, - ); - } else { app_data .lock() - .update_stats(&id, None, None, mem_limit, rx, tx); + .update_stats(&id, cpu_stats, mem_stat, mem_limit, rx, tx); } - spawns.lock().remove(&spawn_id); + } + spawns.lock().remove(&spawn_id); + if let Some((target, _)) = init { + target.fetch_add(1, std::sync::atomic::Ordering::SeqCst); } } /// Update all stats, spawn each container into own tokio::spawn thread - fn update_all_container_stats(&mut self, all_ids: &[(bool, ContainerId)]) { - for (is_running, id) in all_ids { + fn update_all_container_stats(&mut self, all_ids: &[(State, ContainerId)]) { + // let thing =all_ids.len(); + for (state, id) in all_ids { + // let init = self.init.as_ref().map_or_else(|| None, |x| Some((Arc::clone(x), all_ids.len()))); let docker = Arc::clone(&self.docker); let app_data = Arc::clone(&self.app_data); let spawns = Arc::clone(&self.spawns); let spawn_id = SpawnId::Stats((id.clone(), self.binate)); + + let init = self.init.as_ref().map(|i| (Arc::clone(i), all_ids.len())); self.spawns .lock() .entry(spawn_id.clone()) @@ -165,7 +203,8 @@ impl DockerData { app_data, docker, id.clone(), - *is_running, + init, + *state, spawn_id, spawns, )) @@ -177,7 +216,7 @@ impl DockerData { /// Get all current containers, handle into ContainerItem in the app_data struct rather than here /// Just make sure that items sent are guaranteed to have an id /// If in a containerised runtime, will ignore any container that uses the `/app/oxker` as an entry point, unless the `-s` flag is set - pub async fn update_all_containers(&mut self) -> Vec<(bool, ContainerId)> { + pub async fn update_all_containers(&mut self) -> Vec<(State, ContainerId)> { let containers = self .docker .list_containers(Some(ListContainersOptions:: { @@ -212,13 +251,7 @@ impl DockerData { output .into_iter() .filter_map(|i| { - i.id.map(|id| { - ( - i.state == Some("running".to_owned()) - || i.state == Some("restarting".to_owned()), - ContainerId::from(id.as_str()), - ) - }) + i.id.map(|id| (State::from(i.state), ContainerId::from(id.as_str()))) }) .collect::>() } @@ -253,7 +286,7 @@ impl DockerData { } /// Update all logs, spawn each container into own tokio::spawn thread - fn init_all_logs(&mut self, all_ids: &[(bool, ContainerId)]) { + fn init_all_logs(&mut self, all_ids: &[(State, ContainerId)]) { for (_, id) in all_ids { let docker = Arc::clone(&self.docker); let app_data = Arc::clone(&self.app_data); @@ -275,6 +308,7 @@ impl DockerData { .lock() .entry(SpawnId::Log(container.id.clone())) .or_insert_with(|| { + // TODO make a struct that can create this data let app_data = Arc::clone(&self.app_data); let docker = Arc::clone(&self.docker); let id = container.id.clone(); @@ -291,17 +325,17 @@ impl DockerData { self.gui_state.lock().status_push(Status::Init); let loading_uuid = Uuid::new_v4(); let loading_handle = GuiState::start_loading_animation(&self.gui_state, loading_uuid); - // let handle = self.gui_state.lock().st - let all_ids = self.update_all_containers().await; self.update_all_container_stats(&all_ids); self.init_all_logs(&all_ids); - // wait until all logs have initialised - while !self.app_data.lock().initialised(&all_ids) { + while let Some(x) = self.init.as_ref() { tokio::time::sleep(std::time::Duration::from_millis(100)).await; + if x.load(std::sync::atomic::Ordering::SeqCst) == all_ids.len() { + self.init = None; + } } self.gui_state .lock() @@ -422,11 +456,26 @@ impl DockerData { } } + /// Send an update message every x ms, where x is the args.docker_interval + fn croner(args: &CliArgs, docker_tx: Sender) { + let update_duration = std::time::Duration::from_millis(u64::from(args.docker_interval)); + let mut now = std::time::Instant::now(); + tokio::spawn(async move { + loop { + let to_sleep = update_duration.saturating_sub(now.elapsed()); + tokio::time::sleep(to_sleep).await; + docker_tx.send(DockerMessage::Update).await.ok(); + now = std::time::Instant::now(); + } + }); + } + /// Initialise self, and start the message receiving loop pub async fn init( app_data: Arc>, docker: Docker, docker_rx: Receiver, + docker_tx: Sender, gui_state: Arc>, is_running: Arc, ) { @@ -434,15 +483,17 @@ impl DockerData { if app_data.lock().get_error().is_none() { let mut inner = Self { app_data, - args, + args: args.clone(), binate: Binate::One, docker: Arc::new(docker), gui_state, + init: Some(Arc::new(AtomicUsize::new(0))), is_running, receiver: docker_rx, spawns: Arc::new(Mutex::new(HashMap::new())), }; inner.initialise_container_data().await; + Self::croner(&args, docker_tx); inner.message_handler().await; } }