refactor: DockerData refactors

Use a croner in the docker_data, instead of in the ui thread, as this thread will be paused when in exec mode.

is_initilised is again done in docker_data, after stats have been calculated

use bollard from git, waiting for new release due to Docker changes
This commit is contained in:
Jack Wills
2023-11-15 16:12:18 +00:00
parent 6a4cf6490d
commit 650aa0fc91
3 changed files with 122 additions and 60 deletions
+109 -58
View File
@@ -10,13 +10,19 @@ use futures_util::StreamExt;
use parking_lot::Mutex;
use std::{
collections::HashMap,
sync::{atomic::AtomicBool, Arc},
sync::{
atomic::{AtomicBool, AtomicUsize},
Arc,
},
};
use tokio::{
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use uuid::Uuid;
use crate::{
app_data::{AppData, ContainerId, DockerControls},
app_data::{AppData, ContainerId, DockerControls, State},
app_error::AppError,
parse_args::CliArgs,
ui::{GuiState, Status},
@@ -50,6 +56,11 @@ impl Binate {
}
}
// struct Init {
// done: ,
// len:
// }
pub struct DockerData {
app_data: Arc<Mutex<AppData>>,
args: CliArgs,
@@ -57,6 +68,7 @@ pub struct DockerData {
docker: Arc<Docker>,
gui_state: Arc<Mutex<GuiState>>,
is_running: Arc<AtomicBool>,
init: Option<Arc<AtomicUsize>>,
receiver: Receiver<DockerMessage>,
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
}
@@ -96,67 +108,93 @@ impl DockerData {
app_data: Arc<Mutex<AppData>>,
docker: Arc<Docker>,
id: ContainerId,
is_running: bool,
init: Option<(Arc<AtomicUsize>, usize)>,
state: State,
spawn_id: SpawnId,
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
) {
// if dead and !init then inspect!
let mut stream = docker
.stats(
id.get(),
Some(StatsOptions {
stream: false,
one_shot: !is_running,
}),
)
.take(1);
if state.is_alive() || init.is_some() {
// // if state == State::Paused && init.is_some() {
// // app_data.lock().debug_string.push_str("is paused");
while let Some(Ok(stats)) = stream.next().await {
let mem_stat = stats.memory_stats.usage.unwrap_or_default();
let mem_limit = stats.memory_stats.limit.unwrap_or_default();
// // if let Ok(result) = docker.inspect_container(id.get(), Some(InspectContainerOptions{size:false})).await {
// // let mem_limit = format!("{}", result.host_config.map_or(0, |i|i.memory.unwrap_or_default()));
let op_key = stats
.networks
.as_ref()
.and_then(|networks| networks.keys().next().cloned());
// // app_data.lock().debug_string.push_str(&mem_limit);
// // }
let cpu_stats = Self::calculate_usage(&stats);
// // }else if state.is_alive() || init.is_some() {
let mut stream = docker
.stats(
id.get(),
Some(StatsOptions {
stream: false,
one_shot: false,
}),
)
.take(1);
let (rx, tx) = if let Some(key) = op_key {
stats
// let a = stream.next().await;
// app_data.lock().debug_string.push_str(&format!("{:?}", a.is_some()));
// let bb = a.unwrap().unwrap();
// // }
// app_data.lock().debug_string.push_str("jkl");
while let Some(Ok(stats)) = stream.next().await {
let mem_stat = if state.is_alive() {
Some(stats.memory_stats.usage.unwrap_or_default())
} else {
None
};
let mem_limit = stats.memory_stats.limit.unwrap_or_default();
let op_key = stats
.networks
.unwrap_or_default()
.get(&key)
.map_or((0, 0), |f| (f.rx_bytes, f.tx_bytes))
} else {
(0, 0)
};
.as_ref()
.and_then(|networks| networks.keys().next().cloned());
let cpu_stats = if state.is_alive() {
Some(Self::calculate_usage(&stats))
} else {
None
};
let (rx, tx) = if let Some(key) = op_key {
stats
.networks
.unwrap_or_default()
.get(&key)
.map_or((0, 0), |f| (f.rx_bytes, f.tx_bytes))
} else {
(0, 0)
};
if is_running {
app_data.lock().update_stats(
&id,
Some(cpu_stats),
Some(mem_stat),
mem_limit,
rx,
tx,
);
} else {
app_data
.lock()
.update_stats(&id, None, None, mem_limit, rx, tx);
.update_stats(&id, cpu_stats, mem_stat, mem_limit, rx, tx);
}
spawns.lock().remove(&spawn_id);
}
spawns.lock().remove(&spawn_id);
if let Some((target, _)) = init {
target.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
/// Update all stats, spawn each container into own tokio::spawn thread
fn update_all_container_stats(&mut self, all_ids: &[(bool, ContainerId)]) {
for (is_running, id) in all_ids {
fn update_all_container_stats(&mut self, all_ids: &[(State, ContainerId)]) {
// let thing =all_ids.len();
for (state, id) in all_ids {
// let init = self.init.as_ref().map_or_else(|| None, |x| Some((Arc::clone(x), all_ids.len())));
let docker = Arc::clone(&self.docker);
let app_data = Arc::clone(&self.app_data);
let spawns = Arc::clone(&self.spawns);
let spawn_id = SpawnId::Stats((id.clone(), self.binate));
let init = self.init.as_ref().map(|i| (Arc::clone(i), all_ids.len()));
self.spawns
.lock()
.entry(spawn_id.clone())
@@ -165,7 +203,8 @@ impl DockerData {
app_data,
docker,
id.clone(),
*is_running,
init,
*state,
spawn_id,
spawns,
))
@@ -177,7 +216,7 @@ impl DockerData {
/// Get all current containers, handle into ContainerItem in the app_data struct rather than here
/// Just make sure that items sent are guaranteed to have an id
/// If in a containerised runtime, will ignore any container that uses the `/app/oxker` as an entry point, unless the `-s` flag is set
pub async fn update_all_containers(&mut self) -> Vec<(bool, ContainerId)> {
pub async fn update_all_containers(&mut self) -> Vec<(State, ContainerId)> {
let containers = self
.docker
.list_containers(Some(ListContainersOptions::<String> {
@@ -212,13 +251,7 @@ impl DockerData {
output
.into_iter()
.filter_map(|i| {
i.id.map(|id| {
(
i.state == Some("running".to_owned())
|| i.state == Some("restarting".to_owned()),
ContainerId::from(id.as_str()),
)
})
i.id.map(|id| (State::from(i.state), ContainerId::from(id.as_str())))
})
.collect::<Vec<_>>()
}
@@ -253,7 +286,7 @@ impl DockerData {
}
/// Update all logs, spawn each container into own tokio::spawn thread
fn init_all_logs(&mut self, all_ids: &[(bool, ContainerId)]) {
fn init_all_logs(&mut self, all_ids: &[(State, ContainerId)]) {
for (_, id) in all_ids {
let docker = Arc::clone(&self.docker);
let app_data = Arc::clone(&self.app_data);
@@ -275,6 +308,7 @@ impl DockerData {
.lock()
.entry(SpawnId::Log(container.id.clone()))
.or_insert_with(|| {
// TODO make a struct that can create this data
let app_data = Arc::clone(&self.app_data);
let docker = Arc::clone(&self.docker);
let id = container.id.clone();
@@ -291,17 +325,17 @@ impl DockerData {
self.gui_state.lock().status_push(Status::Init);
let loading_uuid = Uuid::new_v4();
let loading_handle = GuiState::start_loading_animation(&self.gui_state, loading_uuid);
// let handle = self.gui_state.lock().st
let all_ids = self.update_all_containers().await;
self.update_all_container_stats(&all_ids);
self.init_all_logs(&all_ids);
// wait until all logs have initialised
while !self.app_data.lock().initialised(&all_ids) {
while let Some(x) = self.init.as_ref() {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
if x.load(std::sync::atomic::Ordering::SeqCst) == all_ids.len() {
self.init = None;
}
}
self.gui_state
.lock()
@@ -422,11 +456,26 @@ impl DockerData {
}
}
/// Send an update message every x ms, where x is the args.docker_interval
fn croner(args: &CliArgs, docker_tx: Sender<DockerMessage>) {
let update_duration = std::time::Duration::from_millis(u64::from(args.docker_interval));
let mut now = std::time::Instant::now();
tokio::spawn(async move {
loop {
let to_sleep = update_duration.saturating_sub(now.elapsed());
tokio::time::sleep(to_sleep).await;
docker_tx.send(DockerMessage::Update).await.ok();
now = std::time::Instant::now();
}
});
}
/// Initialise self, and start the message receiving loop
pub async fn init(
app_data: Arc<Mutex<AppData>>,
docker: Docker,
docker_rx: Receiver<DockerMessage>,
docker_tx: Sender<DockerMessage>,
gui_state: Arc<Mutex<GuiState>>,
is_running: Arc<AtomicBool>,
) {
@@ -434,15 +483,17 @@ impl DockerData {
if app_data.lock().get_error().is_none() {
let mut inner = Self {
app_data,
args,
args: args.clone(),
binate: Binate::One,
docker: Arc::new(docker),
gui_state,
init: Some(Arc::new(AtomicUsize::new(0))),
is_running,
receiver: docker_rx,
spawns: Arc::new(Mutex::new(HashMap::new())),
};
inner.initialise_container_data().await;
Self::croner(&args, docker_tx);
inner.message_handler().await;
}
}