diff --git a/Cargo.lock b/Cargo.lock index a8ea256..55aa956 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -855,6 +855,7 @@ dependencies = [ "parking_lot", "ratatui", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 12b1974..298cbd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ futures-util = "0.3" parking_lot = { version = "0.12" } ratatui = "0.26" tokio = { version = "1.37", features = ["full"] } +tokio-util = "0.7.10" tracing = "0.1" tracing-subscriber = "0.3" uuid = { version = "1.8", features = ["fast-rng", "v4"] } diff --git a/src/exec.rs b/src/exec.rs index 06b51e1..37c6df8 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -1,6 +1,6 @@ use std::{ io::{Read, Stdout, Write}, - sync::{atomic::AtomicBool, Arc}, + sync::{atomic::AtomicBool, mpsc::Sender, Arc}, }; use bollard::{ @@ -11,7 +11,11 @@ use crossterm::terminal::enable_raw_mode; use futures_util::StreamExt; use parking_lot::Mutex; use ratatui::{backend::CrosstermBackend, Terminal}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncWriteExt}, +}; +use tokio_util::sync::CancellationToken; use crate::{ app_data::{AppData, ContainerId, State}, @@ -80,24 +84,32 @@ pub fn tty_readable() -> bool { .is_ok() } +async fn tty_read_loop(mut f: File, tx: Sender, cancel_token: CancellationToken) { + loop { + let mut buf = [0]; + if tokio::time::timeout(std::time::Duration::from_millis(10), f.read_exact(&mut buf)) + .await + .is_ok() + && tx.send(buf[0]).is_err() + { + cancel_token.cancel(); + } + } +} /// Async tty reading, spawned into its own tokio thread -fn tty(run: Arc) -> Option { +/// This should be a cancel token +fn tty(cancel_token: &CancellationToken) -> Option { if tty_readable() { let (tx, rx) = std::sync::mpsc::channel(); + let cancel_token = cancel_token.to_owned(); tokio::spawn(async move { - if let Ok(mut f) = tokio::fs::File::open(TTY).await { - while run.load(std::sync::atomic::Ordering::SeqCst) { - let mut buf = [0]; - if tokio::time::timeout( - std::time::Duration::from_millis(10), - f.read_exact(&mut buf), - ) - .await - .is_ok() - && tx.send(buf[0]).is_err() - { - run.store(false, std::sync::atomic::Ordering::SeqCst); - } + if let Ok(f) = tokio::fs::File::open(TTY).await { + let c_1 = cancel_token.clone(); + let c_2 = cancel_token.clone(); + + tokio::select! { + () = c_1.cancelled() => (), + () = tty_read_loop(f, tx, c_2) => (), } } }); @@ -217,7 +229,7 @@ impl ExecMode { docker: &Arc, terminal_size: Option, ) -> Result<(), AppError> { - let run = Arc::new(AtomicBool::new(true)); + let cancel_token = CancellationToken::new(); if let Ok(exec_result) = docker .create_exec( @@ -246,21 +258,17 @@ impl ExecMode { ) .await { - if let Some(async_tty) = tty(Arc::clone(&run)) { - let run_thread = Arc::clone(&run); + if let Some(async_tty) = tty(&cancel_token) { tokio::spawn(async move { enable_raw_mode().ok(); let mut stdout = std::io::stdout(); stdout.write_all(CURSOR_POS.as_bytes()).ok(); stdout.flush().ok(); - - while run_thread.load(std::sync::atomic::Ordering::SeqCst) { - while let Some(Ok(x)) = output.next().await { - stdout.write_all(&x.into_bytes()).ok(); - stdout.flush().ok(); - } - run_thread.store(false, std::sync::atomic::Ordering::SeqCst); + while let Some(Ok(x)) = output.next().await { + stdout.write_all(&x.into_bytes()).ok(); + stdout.flush().ok(); } + cancel_token.cancel(); }); if let Some(terminal_size) = terminal_size {