feat: spawn docker updates into own thread

Collects spawns into a hashmap, then on next update if spawn exists in hash map, don't bother to run another update
This commit is contained in:
Jack Wills
2022-07-23 02:46:38 +00:00
parent f5504c47c5
commit d0f617820c
7 changed files with 128 additions and 83 deletions
+5 -17
View File
@@ -10,7 +10,7 @@ use super::Header;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct StatefulList<T> { pub struct StatefulList<T> {
pub state: ListState, pub state: ListState,
// HASH MAP! // todo BTreeMap
pub items: Vec<T>, pub items: Vec<T>,
} }
@@ -84,7 +84,6 @@ impl<T> StatefulList<T> {
} }
/// States of the container /// States of the container
// / impl ord
#[derive(Clone, Debug, PartialEq, PartialOrd)] #[derive(Clone, Debug, PartialEq, PartialOrd)]
pub enum State { pub enum State {
Dead, Dead,
@@ -96,17 +95,6 @@ pub enum State {
Unknown, Unknown,
} }
// impl Ord for State {
// fn cmp(&self, other: &Self) -> Ordering {
// match (self, other) {
// (Self::Dead)
// // (_, Foo::B) => Ordering::Less,
// // (Foo::A { val: l }, Foo::A { val: r }) => l.cmp(&r),
// // (Foo::B, _) => Ordering::Greater,
// }
// }
// }
impl State { impl State {
pub fn get_color(&self) -> Color { pub fn get_color(&self) -> Color {
match self { match self {
@@ -331,8 +319,8 @@ pub struct ContainerItem {
pub mem_limit: ByteStats, pub mem_limit: ByteStats,
pub mem_stats: VecDeque<ByteStats>, pub mem_stats: VecDeque<ByteStats>,
pub name: String, pub name: String,
pub net_rx: ByteStats, pub rx: ByteStats,
pub net_tx: ByteStats, pub tx: ByteStats,
pub state: State, pub state: State,
pub status: String, pub status: String,
} }
@@ -355,8 +343,8 @@ impl ContainerItem {
mem_limit: ByteStats::new(0), mem_limit: ByteStats::new(0),
mem_stats: VecDeque::with_capacity(60), mem_stats: VecDeque::with_capacity(60),
name, name,
net_rx: ByteStats::new(0), rx: ByteStats::new(0),
net_tx: ByteStats::new(0), tx: ByteStats::new(0),
state, state,
status, status,
} }
+14 -26
View File
@@ -233,24 +233,12 @@ impl AppData {
SortedOrder::Desc => self.containers.items.sort_by(|a, b| b.name.cmp(&a.name)), SortedOrder::Desc => self.containers.items.sort_by(|a, b| b.name.cmp(&a.name)),
}, },
Header::Rx => match so { Header::Rx => match so {
SortedOrder::Asc => self SortedOrder::Asc => self.containers.items.sort_by(|a, b| a.rx.cmp(&b.rx)),
.containers SortedOrder::Desc => self.containers.items.sort_by(|a, b| b.rx.cmp(&a.rx)),
.items
.sort_by(|a, b| a.net_rx.cmp(&b.net_rx)),
SortedOrder::Desc => self
.containers
.items
.sort_by(|a, b| b.net_rx.cmp(&a.net_rx)),
}, },
Header::Tx => match so { Header::Tx => match so {
SortedOrder::Asc => self SortedOrder::Asc => self.containers.items.sort_by(|a, b| a.tx.cmp(&b.tx)),
.containers SortedOrder::Desc => self.containers.items.sort_by(|a, b| b.tx.cmp(&a.tx)),
.items
.sort_by(|a, b| a.net_tx.cmp(&b.net_tx)),
SortedOrder::Desc => self
.containers
.items
.sort_by(|a, b| b.net_tx.cmp(&a.net_tx)),
}, },
} }
} }
@@ -341,8 +329,8 @@ impl AppData {
container.mem_limit container.mem_limit
)); ));
let net_rx_count = count(&container.net_rx.to_string()); let net_rx_count = count(&container.rx.to_string());
let net_tx_count = count(&container.net_tx.to_string()); let net_tx_count = count(&container.tx.to_string());
let image_count = count(&container.image); let image_count = count(&container.image);
let name_count = count(&container.name); let name_count = count(&container.name);
let state_count = count(&container.state.to_string()); let state_count = count(&container.state.to_string());
@@ -415,8 +403,8 @@ impl AppData {
container.mem_stats.push_back(ByteStats::new(mem)); container.mem_stats.push_back(ByteStats::new(mem));
} }
container.net_rx.update(rx); container.rx.update(rx);
container.net_tx.update(tx); container.tx.update(tx);
container.mem_limit.update(mem_limit); container.mem_limit.update(mem_limit);
} }
} }
@@ -527,10 +515,10 @@ impl AppData {
self.logs_parsed = true; self.logs_parsed = true;
} }
/// Update all containers logs, should only be used on first initialisation // /// Update all containers logs, should only be used on first initialisation
pub fn update_all_logs(&mut self, all_logs: Vec<Vec<String>>) { // pub fn update_all_logs(&mut self, all_logs: Vec<Vec<String>>) {
for (index, output) in all_logs.into_iter().enumerate() { // for (index, output) in all_logs.into_iter().enumerate() {
self.update_log_by_index(output, index); // self.update_log_by_index(output, index);
} // }
} // }
} }
+1
View File
@@ -6,4 +6,5 @@ pub enum DockerMessage {
Pause(String), Pause(String),
Unpause(String), Unpause(String),
Stop(String), Stop(String),
Quit,
} }
+62 -23
View File
@@ -2,9 +2,15 @@ use bollard::{
container::{ListContainersOptions, LogsOptions, StartContainerOptions, Stats, StatsOptions}, container::{ListContainersOptions, LogsOptions, StartContainerOptions, Stats, StatsOptions},
Docker, Docker,
}; };
use futures_util::{future::join_all, StreamExt}; use futures_util::StreamExt;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::sync::Arc; use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::{sync::mpsc::Receiver, task::JoinHandle}; use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use crate::{ use crate::{
@@ -23,6 +29,9 @@ pub struct DockerData {
initialised: bool, initialised: bool,
receiver: Receiver<DockerMessage>, receiver: Receiver<DockerMessage>,
timestamps: bool, timestamps: bool,
spawns: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
// log_spawns: Arc<Mutex<HashMap<String, JoinHandle<Vec<String>>>>>,
is_running: Arc<AtomicBool>,
} }
impl DockerData { impl DockerData {
@@ -60,6 +69,7 @@ impl DockerData {
id: String, id: String,
app_data: Arc<Mutex<AppData>>, app_data: Arc<Mutex<AppData>>,
is_running: bool, is_running: bool,
spawns: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
) { ) {
let mut stream = docker let mut stream = docker
.stats( .stats(
@@ -108,6 +118,7 @@ impl DockerData {
.update_stats(id.clone(), None, None, mem_limit, rx, tx); .update_stats(id.clone(), None, None, mem_limit, rx, tx);
} }
} }
spawns.lock().remove(&id);
} }
/// Update all stats, spawn each container into own tokio::spawn thread /// Update all stats, spawn each container into own tokio::spawn thread
@@ -115,11 +126,19 @@ impl DockerData {
for (is_running, id) in all_ids.iter() { for (is_running, id) in all_ids.iter() {
let docker = Arc::clone(&self.docker); let docker = Arc::clone(&self.docker);
let app_data = Arc::clone(&self.app_data); let app_data = Arc::clone(&self.app_data);
let spawns = Arc::clone(&self.spawns);
let is_running = *is_running; let is_running = *is_running;
let id = id.to_owned(); let id = id.to_owned();
tokio::spawn(Self::update_container_stat(
docker, id, app_data, is_running, let spawn_contains_id = spawns.lock().contains_key(&id);
)); if !spawn_contains_id {
self.spawns.lock().insert(
id.to_owned(),
tokio::spawn(Self::update_container_stat(
docker, id, app_data, is_running, spawns,
)),
);
}
} }
} }
@@ -168,7 +187,10 @@ impl DockerData {
id: String, id: String,
timestamps: bool, timestamps: bool,
since: i64, since: i64,
) -> Vec<String> { app_data: Arc<Mutex<AppData>>,
spawns: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
index: usize
) {
let options = Some(LogsOptions::<String> { let options = Some(LogsOptions::<String> {
stdout: true, stdout: true,
timestamps, timestamps,
@@ -188,38 +210,44 @@ impl DockerData {
} }
} }
} }
output
app_data.lock().update_log_by_index(output, index);
spawns.lock().remove(&id);
} }
// async fn stop(&self) {
// self.docker.
// }
/// Update all logs, spawn each container into own tokio::spawn thread /// Update all logs, spawn each container into own tokio::spawn thread
async fn init_all_logs(&mut self, all_ids: &[(bool, String)]) { async fn init_all_logs(&mut self, all_ids: &[(bool, String)]) {
let mut handles = vec![]; // let mut handles = vec![];
for (_, id) in all_ids.iter() { for (index, (_, id)) in all_ids.iter().enumerate() {
let docker = Arc::clone(&self.docker); let docker = Arc::clone(&self.docker);
let timestamps = self.timestamps; let timestamps = self.timestamps;
let id = id.to_owned(); let id = id.to_owned();
handles.push(Self::update_log(docker, id, timestamps, 0)); let app_data = Arc::clone(&self.app_data);
let spawns = Arc::clone(&self.spawns);
self.spawns.lock().insert(id.to_owned(), tokio::spawn(Self::update_log(docker, id, timestamps, 0, app_data, spawns, index)));
} }
let all_logs = join_all(handles).await;
self.app_data.lock().update_all_logs(all_logs);
} }
async fn update_everything(&mut self) { async fn update_everything(&mut self) {
let all_ids = self.update_all_containers().await; let all_ids = self.update_all_containers().await;
let optional_index = self.app_data.lock().get_selected_log_index(); let optional_index = self.app_data.lock().get_selected_log_index();
if let Some(index) = optional_index { if let Some(index) = optional_index {
let id = self.app_data.lock().containers.items[index].id.to_owned(); let id = self.app_data.lock().containers.items[index].id.to_owned();
let since = self.app_data.lock().containers.items[index].last_updated as i64;
let docker = Arc::clone(&self.docker); let running = self.spawns.lock().contains_key(&id);
let timestamps = self.timestamps;
let logs = Self::update_log(docker, id, timestamps, since).await; if !running {
self.app_data.lock().update_log_by_index(logs, index); let since = self.app_data.lock().containers.items[index].last_updated as i64;
let docker = Arc::clone(&self.docker);
let timestamps = self.timestamps;
let app_data = Arc::clone(&self.app_data);
let spawns = Arc::clone(&self.spawns);
self.spawns.lock().insert(id.to_owned(), tokio::spawn(Self::update_log(docker, id, timestamps, since, app_data, spawns, index)));
}
}; };
self.update_all_container_stats(&all_ids).await; self.update_all_container_stats(&all_ids).await;
@@ -324,6 +352,14 @@ impl DockerData {
self.update_everything().await self.update_everything().await
} }
DockerMessage::Update => self.update_everything().await, DockerMessage::Update => self.update_everything().await,
DockerMessage::Quit => {
self.spawns
.lock()
.values()
.into_iter()
.for_each(|i| i.abort());
self.is_running.store(false, Ordering::SeqCst);
}
} }
} }
} }
@@ -335,6 +371,7 @@ impl DockerData {
docker: Arc<Docker>, docker: Arc<Docker>,
gui_state: Arc<Mutex<GuiState>>, gui_state: Arc<Mutex<GuiState>>,
receiver: Receiver<DockerMessage>, receiver: Receiver<DockerMessage>,
is_running: Arc<AtomicBool>,
) { ) {
if app_data.lock().get_error().is_none() { if app_data.lock().get_error().is_none() {
let mut inner = Self { let mut inner = Self {
@@ -343,7 +380,9 @@ impl DockerData {
gui_state, gui_state,
initialised: false, initialised: false,
receiver, receiver,
spawns: Arc::new(Mutex::new(HashMap::new())),
timestamps: args.timestamp, timestamps: args.timestamp,
is_running,
}; };
inner.initialise_container_data().await; inner.initialise_container_data().await;
+41 -14
View File
@@ -77,6 +77,7 @@ impl InputHandler {
} }
} }
/// Mouse button
fn m_button(&mut self) { fn m_button(&mut self) {
if self.mouse_capture { if self.mouse_capture {
match execute!(std::io::stdout(), DisableMouseCapture) { match execute!(std::io::stdout(), DisableMouseCapture) {
@@ -120,13 +121,21 @@ impl InputHandler {
let mut output = Some((header.to_owned(), SortedOrder::Desc)); let mut output = Some((header.to_owned(), SortedOrder::Desc));
let mut locked_data = self.app_data.lock(); let mut locked_data = self.app_data.lock();
if let Some((h, order)) = locked_data.get_sorted().as_ref() { if let Some((h, order)) = locked_data.get_sorted().as_ref() {
if &SortedOrder::Desc == order && h == &header { if &SortedOrder::Desc == order && h == &header {
output = Some((header, SortedOrder::Asc)) output = Some((header, SortedOrder::Asc))
} }
} }
locked_data.set_sorted(output) locked_data.set_sorted(output)
} }
/// Send a quit message to docker, to abort all spawns, if error, quit here instead
async fn quit(&self) {
match self.docker_sender.send(DockerMessage::Quit).await {
Ok(_) => (),
Err(_) => self.is_running.store(false, Ordering::SeqCst),
}
}
/// Handle any keyboard button events /// Handle any keyboard button events
async fn button_press(&mut self, key_code: KeyCode) { async fn button_press(&mut self, key_code: KeyCode) {
let show_error = self.app_data.lock().show_error; let show_error = self.app_data.lock().show_error;
@@ -134,9 +143,7 @@ impl InputHandler {
if show_error { if show_error {
match key_code { match key_code {
KeyCode::Char('q') => { KeyCode::Char('q') => self.quit().await,
self.is_running.store(false, Ordering::SeqCst);
}
KeyCode::Char('c') => { KeyCode::Char('c') => {
self.app_data.lock().show_error = false; self.app_data.lock().show_error = false;
self.app_data.lock().remove_error(); self.app_data.lock().remove_error();
@@ -145,7 +152,7 @@ impl InputHandler {
} }
} else if show_info { } else if show_info {
match key_code { match key_code {
KeyCode::Char('q') => self.is_running.store(false, Ordering::SeqCst), KeyCode::Char('q') => self.quit().await,
KeyCode::Char('h') => self.gui_state.lock().show_help = false, KeyCode::Char('h') => self.gui_state.lock().show_help = false,
KeyCode::Char('m') => self.m_button(), KeyCode::Char('m') => self.m_button(),
_ => (), _ => (),
@@ -162,16 +169,36 @@ impl InputHandler {
KeyCode::Char('7') => self.sort(Header::Image), KeyCode::Char('7') => self.sort(Header::Image),
KeyCode::Char('8') => self.sort(Header::Rx), KeyCode::Char('8') => self.sort(Header::Rx),
KeyCode::Char('9') => self.sort(Header::Tx), KeyCode::Char('9') => self.sort(Header::Tx),
KeyCode::Char('q') => self.is_running.store(false, Ordering::SeqCst), KeyCode::Char('q') => self.quit().await,
KeyCode::Char('h') => self.gui_state.lock().show_help = true, KeyCode::Char('h') => self.gui_state.lock().show_help = true,
KeyCode::Char('m') => self.m_button(), KeyCode::Char('m') => self.m_button(),
KeyCode::Tab => { KeyCode::Tab => {
// TODO if no containers, skip controls panel // Skip control panel if no containers, could be refactored
self.gui_state.lock().next_panel(); let has_containers = self.app_data.lock().get_container_len() == 0;
let is_containers =
self.gui_state.lock().selected_panel == SelectablePanel::Containers;
let count = if has_containers && is_containers {
2
} else {
1
};
for _ in 0..count {
self.gui_state.lock().next_panel();
}
} }
KeyCode::BackTab => { KeyCode::BackTab => {
// TODO if no containers, skip controls panel // Skip control panel if no containers, could be refactored
self.gui_state.lock().previous_panel(); let has_containers = self.app_data.lock().get_container_len() == 0;
let is_containers =
self.gui_state.lock().selected_panel == SelectablePanel::Logs;
let count = if has_containers && is_containers {
2
} else {
1
};
for _ in 0..count {
self.gui_state.lock().previous_panel();
}
} }
KeyCode::Home => { KeyCode::Home => {
let mut locked_data = self.app_data.lock(); let mut locked_data = self.app_data.lock();
@@ -274,7 +301,7 @@ impl InputHandler {
} }
} }
/// Change state of selected container /// Change state to next, depending which panel is currently in focus
fn next(&mut self) { fn next(&mut self) {
let mut locked_data = self.app_data.lock(); let mut locked_data = self.app_data.lock();
match self.gui_state.lock().selected_panel { match self.gui_state.lock().selected_panel {
@@ -284,7 +311,7 @@ impl InputHandler {
}; };
} }
/// Change state of selected container /// Change state to previous, depending which panel is currently in focus
fn previous(&mut self) { fn previous(&mut self) {
let mut locked_data = self.app_data.lock(); let mut locked_data = self.app_data.lock();
match self.gui_state.lock().selected_panel { match self.gui_state.lock().selected_panel {
+3 -1
View File
@@ -26,6 +26,7 @@ async fn main() {
let args = CliArgs::new(); let args = CliArgs::new();
let app_data = Arc::new(Mutex::new(AppData::default(args.clone()))); let app_data = Arc::new(Mutex::new(AppData::default(args.clone())));
let gui_state = Arc::new(Mutex::new(GuiState::default())); let gui_state = Arc::new(Mutex::new(GuiState::default()));
let is_running = Arc::new(AtomicBool::new(true));
let docker_args = args.clone(); let docker_args = args.clone();
let docker_app_data = Arc::clone(&app_data); let docker_app_data = Arc::clone(&app_data);
@@ -38,12 +39,14 @@ async fn main() {
match docker.ping().await { match docker.ping().await {
Ok(_) => { Ok(_) => {
let docker = Arc::clone(&docker); let docker = Arc::clone(&docker);
let is_running = Arc::clone(&is_running);
tokio::spawn(DockerData::init( tokio::spawn(DockerData::init(
docker_args, docker_args,
docker_app_data, docker_app_data,
docker, docker,
docker_gui_state, docker_gui_state,
docker_rx, docker_rx,
is_running,
)); ));
} }
Err(_) => app_data.lock().set_error(AppError::DockerConnect), Err(_) => app_data.lock().set_error(AppError::DockerConnect),
@@ -53,7 +56,6 @@ async fn main() {
let (input_sx, input_rx) = tokio::sync::mpsc::channel(16); let (input_sx, input_rx) = tokio::sync::mpsc::channel(16);
let is_running = Arc::new(AtomicBool::new(true));
let input_is_running = Arc::clone(&is_running); let input_is_running = Arc::clone(&is_running);
let input_gui_state = Arc::clone(&gui_state); let input_gui_state = Arc::clone(&gui_state);
let input_docker_sender = docker_sx.clone(); let input_docker_sender = docker_sx.clone();
+2 -2
View File
@@ -180,11 +180,11 @@ pub fn draw_containers<B: Backend>(
blue, blue,
), ),
Span::styled( Span::styled(
format!("{}{:>width$}", MARGIN, i.net_rx, width = widths.net_rx.1), format!("{}{:>width$}", MARGIN, i.rx, width = widths.net_rx.1),
Style::default().fg(Color::Rgb(255, 233, 193)), Style::default().fg(Color::Rgb(255, 233, 193)),
), ),
Span::styled( Span::styled(
format!("{}{:>width$}", MARGIN, i.net_tx, width = widths.net_tx.1), format!("{}{:>width$}", MARGIN, i.tx, width = widths.net_tx.1),
Style::default().fg(Color::Rgb(205, 140, 140)), Style::default().fg(Color::Rgb(205, 140, 140)),
), ),
]); ]);