feat: docker_recv for all docker commands

This commit is contained in:
Jack Wills
2022-04-29 16:10:14 +00:00
parent 9c85470bdc
commit 679203cf2d
7 changed files with 270 additions and 161 deletions
+9
View File
@@ -0,0 +1,9 @@
#[derive(Debug, Clone)]
pub enum DockerMessage {
Update,
Start(String),
Restart(String),
Pause(String),
Unpause(String),
Stop(String),
}
+125 -46
View File
@@ -1,22 +1,27 @@
use bollard::{
container::{ListContainersOptions, LogsOptions, Stats, StatsOptions},
container::{ListContainersOptions, LogsOptions, StartContainerOptions, Stats, StatsOptions},
Docker,
};
use futures_util::{future::join_all, StreamExt};
use parking_lot::Mutex;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use std::sync::Arc;
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use crate::{app_data::AppData, parse_args::CliArgs, ui::GuiState};
use crate::{
app_data::{AppData, DockerControls},
app_error::AppError,
parse_args::CliArgs,
ui::GuiState,
};
mod message;
pub use message::DockerMessage;
pub struct DockerData {
app_data: Arc<Mutex<AppData>>,
docker: Arc<Docker>,
gui_state: Arc<Mutex<GuiState>>,
initialised: bool,
sleep_duration: Duration,
receiver: Receiver<DockerMessage>,
timestamps: bool,
}
@@ -207,37 +212,23 @@ impl DockerData {
self.update_all_container_stats(&all_ids).await;
}
/// Initialise self, and start the updated loop
pub async fn init(
args: CliArgs,
app_data: Arc<Mutex<AppData>>,
docker: Arc<Docker>,
gui_state: Arc<Mutex<GuiState>>,
) {
if app_data.lock().get_error().is_none() {
let mut inner = Self {
app_data,
docker,
gui_state,
initialised: false,
sleep_duration: Duration::from_millis(args.docker_interval as u64),
timestamps: args.timestamp,
};
inner.initialise_container_data().await;
inner.update_loop().await;
}
}
async fn initialise_container_data(&mut self) {
let gui_state = Arc::clone(&self.gui_state);
// could also just loop while init is false, would need to move an arc mutex into here
// so instead just abort at end of function
let loading_spin = tokio::spawn(async move {
async fn loading_spin(gui_state: Arc<Mutex<GuiState>>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
gui_state.lock().next_loading();
}
});
})
}
fn stop_loading_spin(handle: JoinHandle<()>, gui_state: &Arc<Mutex<GuiState>>) {
handle.abort();
gui_state.lock().reset_loading();
}
async fn initialise_container_data(&mut self) {
let gui_state = Arc::clone(&self.gui_state);
let loading_spin = Self::loading_spin(gui_state).await;
let all_ids = self.update_all_containers().await;
self.update_all_container_stats(&all_ids).await;
@@ -255,23 +246,111 @@ impl DockerData {
self.initialised = self.app_data.lock().initialised(&all_ids);
}
self.app_data.lock().init = true;
loading_spin.abort();
self.gui_state.lock().reset_loading();
Self::stop_loading_spin(loading_spin, &self.gui_state);
}
/// Update all items, wait until all complete
/// sleep for CliArgs.docker ms before updating next
async fn update_loop(&mut self) {
loop {
let start = Instant::now();
self.update_everything().await;
let elapsed = start.elapsed();
if elapsed < self.sleep_duration {
tokio::time::sleep(self.sleep_duration - elapsed).await;
/// Handle incoming messages, container controls & all container information update
async fn message_handler(&mut self) {
while let Some(message) = self.receiver.recv().await {
let docker = Arc::clone(&self.docker);
let app_data = Arc::clone(&self.app_data);
let gui_state = Arc::clone(&self.gui_state);
match message {
DockerMessage::Pause(id) => {
let spin_gui = Arc::clone(&gui_state);
let loading_spin = Self::loading_spin(gui_state).await;
tokio::spawn(async move {
docker.pause_container(&id).await.unwrap_or_else(|_| {
app_data
.lock()
.set_error(AppError::DockerCommand(DockerControls::Pause))
});
Self::stop_loading_spin(loading_spin, &spin_gui);
});
}
DockerMessage::Restart(id) => {
let spin_gui = Arc::clone(&gui_state);
let loading_spin = Self::loading_spin(gui_state).await;
tokio::spawn(async move {
docker
.restart_container(&id, None)
.await
.unwrap_or_else(|_| {
app_data
.lock()
.set_error(AppError::DockerCommand(DockerControls::Restart))
});
Self::stop_loading_spin(loading_spin, &spin_gui);
});
}
DockerMessage::Start(id) => {
let spin_gui = Arc::clone(&gui_state);
let loading_spin = Self::loading_spin(gui_state).await;
tokio::spawn(async move {
docker
.start_container(&id, None::<StartContainerOptions<String>>)
.await
.unwrap_or_else(|_| {
app_data
.lock()
.set_error(AppError::DockerCommand(DockerControls::Start))
});
Self::stop_loading_spin(loading_spin, &spin_gui);
});
}
DockerMessage::Stop(id) => {
let spin_gui = Arc::clone(&gui_state);
let loading_spin = Self::loading_spin(gui_state).await;
tokio::spawn(async move {
docker.stop_container(&id, None).await.unwrap_or_else(|_| {
app_data
.lock()
.set_error(AppError::DockerCommand(DockerControls::Stop))
});
Self::stop_loading_spin(loading_spin, &spin_gui);
});
}
DockerMessage::Unpause(id) => {
let spin_gui = Arc::clone(&gui_state);
let loading_spin = Self::loading_spin(gui_state).await;
tokio::spawn(async move {
docker.unpause_container(&id).await.unwrap_or_else(|_| {
app_data
.lock()
.set_error(AppError::DockerCommand(DockerControls::Unpause))
});
Self::stop_loading_spin(loading_spin, &spin_gui);
});
}
DockerMessage::Update => self.update_everything().await,
}
}
}
/// Initialise self, and start the updated loop
pub async fn init(
args: CliArgs,
app_data: Arc<Mutex<AppData>>,
docker: Arc<Docker>,
gui_state: Arc<Mutex<GuiState>>,
receiver: Receiver<DockerMessage>,
) {
if app_data.lock().get_error().is_none() {
let mut inner = Self {
app_data,
docker,
gui_state,
initialised: false,
receiver,
timestamps: args.timestamp,
};
inner.initialise_container_data().await;
// todo!(" change this to recv.next()");
// inner.update_loop().await;
inner.message_handler().await;
}
}
}
// tests, use redis-test container, check logs exists, and selector of logs, and that it increases, and matches end, when you run restart on the docker containers