feat: spawn docker exec commands into own thread
This commit is contained in:
+86
-83
@@ -3,7 +3,7 @@ use bollard::{
|
|||||||
service::ContainerSummary,
|
service::ContainerSummary,
|
||||||
Docker,
|
Docker,
|
||||||
};
|
};
|
||||||
use futures_util::{Future, StreamExt};
|
use futures_util::StreamExt;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
@@ -33,7 +33,7 @@ enum SpawnId {
|
|||||||
|
|
||||||
/// Cpu & Mem stats take twice as long as the update interval to get a value, so will have two being executed at the same time
|
/// Cpu & Mem stats take twice as long as the update interval to get a value, so will have two being executed at the same time
|
||||||
/// SpawnId::Stats takes container_id and binate value to enable both cycles of the same container_id to be inserted into the hashmap
|
/// SpawnId::Stats takes container_id and binate value to enable both cycles of the same container_id to be inserted into the hashmap
|
||||||
/// Binate value is toggled when all join handles have been spawned off
|
/// Binate value is toggled when all handles have been spawned off
|
||||||
/// Also effectively means that if the docker_update interval minimum will be 1000ms
|
/// Also effectively means that if the docker_update interval minimum will be 1000ms
|
||||||
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
|
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)]
|
||||||
enum Binate {
|
enum Binate {
|
||||||
@@ -56,7 +56,6 @@ pub struct DockerData {
|
|||||||
binate: Binate,
|
binate: Binate,
|
||||||
docker: Arc<Docker>,
|
docker: Arc<Docker>,
|
||||||
gui_state: Arc<Mutex<GuiState>>,
|
gui_state: Arc<Mutex<GuiState>>,
|
||||||
initialised: bool,
|
|
||||||
is_running: Arc<AtomicBool>,
|
is_running: Arc<AtomicBool>,
|
||||||
receiver: Receiver<DockerMessage>,
|
receiver: Receiver<DockerMessage>,
|
||||||
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
|
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
|
||||||
@@ -94,12 +93,12 @@ impl DockerData {
|
|||||||
/// don't take &self, so that can tokio::spawn into it's own thread
|
/// don't take &self, so that can tokio::spawn into it's own thread
|
||||||
/// remove if from spawns hashmap when complete
|
/// remove if from spawns hashmap when complete
|
||||||
async fn update_container_stat(
|
async fn update_container_stat(
|
||||||
|
app_data: Arc<Mutex<AppData>>,
|
||||||
docker: Arc<Docker>,
|
docker: Arc<Docker>,
|
||||||
id: ContainerId,
|
id: ContainerId,
|
||||||
app_data: Arc<Mutex<AppData>>,
|
|
||||||
is_running: bool,
|
is_running: bool,
|
||||||
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
|
|
||||||
spawn_id: SpawnId,
|
spawn_id: SpawnId,
|
||||||
|
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
|
||||||
) {
|
) {
|
||||||
let mut stream = docker
|
let mut stream = docker
|
||||||
.stats(
|
.stats(
|
||||||
@@ -156,18 +155,18 @@ impl DockerData {
|
|||||||
let docker = Arc::clone(&self.docker);
|
let docker = Arc::clone(&self.docker);
|
||||||
let app_data = Arc::clone(&self.app_data);
|
let app_data = Arc::clone(&self.app_data);
|
||||||
let spawns = Arc::clone(&self.spawns);
|
let spawns = Arc::clone(&self.spawns);
|
||||||
let spawn_key = SpawnId::Stats((id.clone(), self.binate));
|
let spawn_id = SpawnId::Stats((id.clone(), self.binate));
|
||||||
self.spawns
|
self.spawns
|
||||||
.lock()
|
.lock()
|
||||||
.entry(spawn_key.clone())
|
.entry(spawn_id.clone())
|
||||||
.or_insert_with(|| {
|
.or_insert_with(|| {
|
||||||
tokio::spawn(Self::update_container_stat(
|
tokio::spawn(Self::update_container_stat(
|
||||||
|
app_data,
|
||||||
docker,
|
docker,
|
||||||
id.clone(),
|
id.clone(),
|
||||||
app_data,
|
|
||||||
*is_running,
|
*is_running,
|
||||||
|
spawn_id,
|
||||||
spawns,
|
spawns,
|
||||||
spawn_key,
|
|
||||||
))
|
))
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -223,19 +222,17 @@ impl DockerData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Update single container logs
|
/// Update single container logs
|
||||||
/// don't take &self, so that can tokio::spawn into it's own thread
|
/// remove it from spawns hashmap when complete
|
||||||
/// remove if from spawns hashmap when complete
|
|
||||||
async fn update_log(
|
async fn update_log(
|
||||||
|
app_data: Arc<Mutex<AppData>>,
|
||||||
docker: Arc<Docker>,
|
docker: Arc<Docker>,
|
||||||
id: ContainerId,
|
id: ContainerId,
|
||||||
timestamps: bool,
|
|
||||||
since: u64,
|
since: u64,
|
||||||
app_data: Arc<Mutex<AppData>>,
|
|
||||||
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
|
spawns: Arc<Mutex<HashMap<SpawnId, JoinHandle<()>>>>,
|
||||||
) {
|
) {
|
||||||
let options = Some(LogsOptions::<String> {
|
let options = Some(LogsOptions::<String> {
|
||||||
stdout: true,
|
stdout: true,
|
||||||
timestamps,
|
timestamps: true,
|
||||||
since: i64::try_from(since).unwrap_or_default(),
|
since: i64::try_from(since).unwrap_or_default(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
});
|
});
|
||||||
@@ -243,16 +240,14 @@ impl DockerData {
|
|||||||
let mut logs = docker.logs(id.get(), options);
|
let mut logs = docker.logs(id.get(), options);
|
||||||
let mut output = vec![];
|
let mut output = vec![];
|
||||||
|
|
||||||
while let Some(value) = logs.next().await {
|
while let Some(Ok(value)) = logs.next().await {
|
||||||
if let Ok(data) = value {
|
let data = value.to_string();
|
||||||
let log_string = data.to_string();
|
if !data.trim().is_empty() {
|
||||||
if !log_string.trim().is_empty() {
|
output.push(data);
|
||||||
output.push(log_string);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
spawns.lock().remove(&SpawnId::Log(id.clone()));
|
spawns.lock().remove(&SpawnId::Log(id.clone()));
|
||||||
app_data.lock().update_log_by_id(&output, &id);
|
app_data.lock().update_log_by_id(output, &id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update all logs, spawn each container into own tokio::spawn thread
|
/// Update all logs, spawn each container into own tokio::spawn thread
|
||||||
@@ -264,14 +259,7 @@ impl DockerData {
|
|||||||
let key = SpawnId::Log(id.clone());
|
let key = SpawnId::Log(id.clone());
|
||||||
self.spawns.lock().insert(
|
self.spawns.lock().insert(
|
||||||
key,
|
key,
|
||||||
tokio::spawn(Self::update_log(
|
tokio::spawn(Self::update_log(app_data, docker, id.clone(), 0, spawns)),
|
||||||
docker,
|
|
||||||
id.clone(),
|
|
||||||
self.args.timestamp,
|
|
||||||
0,
|
|
||||||
app_data,
|
|
||||||
spawns,
|
|
||||||
)),
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -290,11 +278,10 @@ impl DockerData {
|
|||||||
let app_data = Arc::clone(&self.app_data);
|
let app_data = Arc::clone(&self.app_data);
|
||||||
let spawns = Arc::clone(&self.spawns);
|
let spawns = Arc::clone(&self.spawns);
|
||||||
tokio::spawn(Self::update_log(
|
tokio::spawn(Self::update_log(
|
||||||
|
app_data,
|
||||||
docker,
|
docker,
|
||||||
container.id.clone(),
|
container.id.clone(),
|
||||||
self.args.timestamp,
|
|
||||||
container.last_updated,
|
container.last_updated,
|
||||||
app_data,
|
|
||||||
spawns,
|
spawns,
|
||||||
))
|
))
|
||||||
});
|
});
|
||||||
@@ -305,8 +292,8 @@ impl DockerData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Animate the loading icon
|
/// Animate the loading icon
|
||||||
async fn loading_spin(&mut self, loading_uuid: Uuid) -> JoinHandle<()> {
|
async fn loading_spin(loading_uuid: Uuid, gui_state: &Arc<Mutex<GuiState>>) -> JoinHandle<()> {
|
||||||
let gui_state = Arc::clone(&self.gui_state);
|
let gui_state = Arc::clone(&gui_state);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
@@ -316,89 +303,106 @@ impl DockerData {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Stop the loading_spin function, and reset gui loading status
|
/// Stop the loading_spin function, and reset gui loading status
|
||||||
fn stop_loading_spin(&mut self, handle: &JoinHandle<()>, loading_uuid: Uuid) {
|
fn stop_loading_spin(
|
||||||
|
gui_state: &Arc<Mutex<GuiState>>,
|
||||||
|
handle: &JoinHandle<()>,
|
||||||
|
loading_uuid: Uuid,
|
||||||
|
) {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
self.gui_state.lock().remove_loading(loading_uuid);
|
gui_state.lock().remove_loading(loading_uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Initialize docker container data, before any messages are received
|
/// Initialize docker container data, before any messages are received
|
||||||
async fn initialise_container_data(&mut self) {
|
async fn initialise_container_data(&mut self) {
|
||||||
self.gui_state.lock().status_push(Status::Init);
|
self.gui_state.lock().status_push(Status::Init);
|
||||||
let loading_uuid = Uuid::new_v4();
|
let loading_uuid = Uuid::new_v4();
|
||||||
let loading_spin = self.loading_spin(loading_uuid).await;
|
let loading_spin = Self::loading_spin(loading_uuid, &Arc::clone(&self.gui_state)).await;
|
||||||
|
|
||||||
let all_ids = self.update_all_containers().await;
|
let all_ids = self.update_all_containers().await;
|
||||||
|
|
||||||
self.update_all_container_stats(&all_ids);
|
self.update_all_container_stats(&all_ids);
|
||||||
|
|
||||||
// Maybe only do a single one at first?
|
|
||||||
self.init_all_logs(&all_ids);
|
self.init_all_logs(&all_ids);
|
||||||
|
|
||||||
if all_ids.is_empty() {
|
|
||||||
self.initialised = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait until all logs have initialised
|
// wait until all logs have initialised
|
||||||
while !self.initialised {
|
while !self.app_data.lock().initialised(&all_ids) {
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
self.initialised = self.app_data.lock().initialised(&all_ids);
|
|
||||||
}
|
}
|
||||||
self.gui_state.lock().status_del(Status::Init);
|
self.gui_state.lock().status_del(Status::Init);
|
||||||
self.stop_loading_spin(&loading_spin, loading_uuid);
|
Self::stop_loading_spin(&self.gui_state, &loading_spin, loading_uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set the global error as the docker error, and set gui_state to error
|
/// Set the global error as the docker error, and set gui_state to error
|
||||||
fn set_error(&mut self, error: DockerControls) {
|
fn set_error(
|
||||||
self.app_data
|
app_data: &Arc<Mutex<AppData>>,
|
||||||
.lock()
|
error: DockerControls,
|
||||||
.set_error(AppError::DockerCommand(error));
|
gui_state: &Arc<Mutex<GuiState>>,
|
||||||
self.gui_state.lock().status_push(Status::Error);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Execute a docker command, will start and stop the loading spinner, and set correct error
|
|
||||||
async fn exec_docker(
|
|
||||||
&mut self,
|
|
||||||
docker_fn: impl Future<Output = Result<(), bollard::errors::Error>> + Send,
|
|
||||||
control: DockerControls,
|
|
||||||
) {
|
) {
|
||||||
let uuid = Uuid::new_v4();
|
app_data.lock().set_error(AppError::DockerCommand(error));
|
||||||
let loading_spin = self.loading_spin(uuid).await;
|
gui_state.lock().status_push(Status::Error);
|
||||||
if docker_fn.await.is_err() {
|
|
||||||
self.set_error(control);
|
|
||||||
};
|
|
||||||
self.stop_loading_spin(&loading_spin, uuid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle incoming messages, container controls & all container information update
|
/// Handle incoming messages, container controls & all container information update
|
||||||
|
/// Spawn dowcker commands off into own thread
|
||||||
async fn message_handler(&mut self) {
|
async fn message_handler(&mut self) {
|
||||||
while let Some(message) = self.receiver.recv().await {
|
while let Some(message) = self.receiver.recv().await {
|
||||||
let docker = Arc::clone(&self.docker);
|
let docker = Arc::clone(&self.docker);
|
||||||
|
let gui_state = Arc::clone(&self.gui_state);
|
||||||
|
let app_data = Arc::clone(&self.app_data);
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
match message {
|
match message {
|
||||||
DockerMessage::Pause(id) => {
|
DockerMessage::Pause(id) => {
|
||||||
self.exec_docker(docker.pause_container(id.get()), DockerControls::Pause)
|
tokio::spawn(async move {
|
||||||
.await;
|
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
|
||||||
|
if docker.pause_container(id.get()).await.is_err() {
|
||||||
|
Self::set_error(&app_data, DockerControls::Pause, &gui_state);
|
||||||
|
}
|
||||||
|
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
|
||||||
|
});
|
||||||
|
self.update_everything().await;
|
||||||
}
|
}
|
||||||
DockerMessage::Restart(id) => {
|
DockerMessage::Restart(id) => {
|
||||||
self.exec_docker(
|
tokio::spawn(async move {
|
||||||
docker.restart_container(id.get(), None),
|
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
|
||||||
DockerControls::Restart,
|
if docker.restart_container(id.get(), None).await.is_err() {
|
||||||
)
|
Self::set_error(&app_data, DockerControls::Restart, &gui_state);
|
||||||
.await;
|
}
|
||||||
|
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
|
||||||
|
});
|
||||||
|
self.update_everything().await;
|
||||||
}
|
}
|
||||||
DockerMessage::Start(id) => {
|
DockerMessage::Start(id) => {
|
||||||
self.exec_docker(
|
tokio::spawn(async move {
|
||||||
docker.start_container(id.get(), None::<StartContainerOptions<String>>),
|
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
|
||||||
DockerControls::Start,
|
if docker
|
||||||
)
|
.start_container(id.get(), None::<StartContainerOptions<String>>)
|
||||||
.await;
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
Self::set_error(&app_data, DockerControls::Start, &gui_state);
|
||||||
|
}
|
||||||
|
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
|
||||||
|
});
|
||||||
|
self.update_everything().await;
|
||||||
}
|
}
|
||||||
DockerMessage::Stop(id) => {
|
DockerMessage::Stop(id) => {
|
||||||
self.exec_docker(docker.stop_container(id.get(), None), DockerControls::Stop)
|
tokio::spawn(async move {
|
||||||
.await;
|
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
|
||||||
|
if docker.stop_container(id.get(), None).await.is_err() {
|
||||||
|
Self::set_error(&app_data, DockerControls::Stop, &gui_state);
|
||||||
|
}
|
||||||
|
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
|
||||||
|
});
|
||||||
|
self.update_everything().await;
|
||||||
}
|
}
|
||||||
DockerMessage::Unpause(id) => {
|
DockerMessage::Unpause(id) => {
|
||||||
self.exec_docker(docker.unpause_container(id.get()), DockerControls::Unpause)
|
tokio::spawn(async move {
|
||||||
.await;
|
let loading_spin = Self::loading_spin(uuid, &gui_state).await;
|
||||||
|
if docker.unpause_container(id.get()).await.is_err() {
|
||||||
|
Self::set_error(&app_data, DockerControls::Unpause, &gui_state);
|
||||||
|
}
|
||||||
|
Self::stop_loading_spin(&gui_state, &loading_spin, uuid);
|
||||||
|
});
|
||||||
self.update_everything().await;
|
self.update_everything().await;
|
||||||
}
|
}
|
||||||
DockerMessage::Update => self.update_everything().await,
|
DockerMessage::Update => self.update_everything().await,
|
||||||
@@ -418,8 +422,8 @@ impl DockerData {
|
|||||||
pub async fn init(
|
pub async fn init(
|
||||||
app_data: Arc<Mutex<AppData>>,
|
app_data: Arc<Mutex<AppData>>,
|
||||||
docker: Docker,
|
docker: Docker,
|
||||||
|
docker_rx: Receiver<DockerMessage>,
|
||||||
gui_state: Arc<Mutex<GuiState>>,
|
gui_state: Arc<Mutex<GuiState>>,
|
||||||
receiver: Receiver<DockerMessage>,
|
|
||||||
is_running: Arc<AtomicBool>,
|
is_running: Arc<AtomicBool>,
|
||||||
) {
|
) {
|
||||||
let args = app_data.lock().args;
|
let args = app_data.lock().args;
|
||||||
@@ -427,13 +431,12 @@ impl DockerData {
|
|||||||
let mut inner = Self {
|
let mut inner = Self {
|
||||||
app_data,
|
app_data,
|
||||||
args,
|
args,
|
||||||
|
binate: Binate::One,
|
||||||
docker: Arc::new(docker),
|
docker: Arc::new(docker),
|
||||||
gui_state,
|
gui_state,
|
||||||
initialised: false,
|
|
||||||
receiver,
|
|
||||||
spawns: Arc::new(Mutex::new(HashMap::new())),
|
|
||||||
is_running,
|
is_running,
|
||||||
binate: Binate::One,
|
receiver: docker_rx,
|
||||||
|
spawns: Arc::new(Mutex::new(HashMap::new())),
|
||||||
};
|
};
|
||||||
inner.initialise_container_data().await;
|
inner.initialise_container_data().await;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user