Merge branch 'refactor/cancel_token' into dev

This commit is contained in:
Jack Wills
2024-04-17 11:25:51 +00:00
3 changed files with 36 additions and 26 deletions
Generated
+1
View File
@@ -855,6 +855,7 @@ dependencies = [
"parking_lot",
"ratatui",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
+1
View File
@@ -36,6 +36,7 @@ futures-util = "0.3"
parking_lot = { version = "0.12" }
ratatui = "0.26"
tokio = { version = "1.37", features = ["full"] }
tokio-util = "0.7.10"
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1.8", features = ["fast-rng", "v4"] }
+29 -21
View File
@@ -1,6 +1,6 @@
use std::{
io::{Read, Stdout, Write},
sync::{atomic::AtomicBool, Arc},
sync::{atomic::AtomicBool, mpsc::Sender, Arc},
};
use bollard::{
@@ -11,7 +11,11 @@ use crossterm::terminal::enable_raw_mode;
use futures_util::StreamExt;
use parking_lot::Mutex;
use ratatui::{backend::CrosstermBackend, Terminal};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
};
use tokio_util::sync::CancellationToken;
use crate::{
app_data::{AppData, ContainerId, State},
@@ -80,26 +84,34 @@ pub fn tty_readable() -> bool {
.is_ok()
}
/// Async tty reading, spawned into its own tokio thread
fn tty(run: Arc<AtomicBool>) -> Option<AsyncTTY> {
if tty_readable() {
let (tx, rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
if let Ok(mut f) = tokio::fs::File::open(TTY).await {
while run.load(std::sync::atomic::Ordering::SeqCst) {
async fn tty_read_loop(mut f: File, tx: Sender<u8>, cancel_token: CancellationToken) {
loop {
let mut buf = [0];
if tokio::time::timeout(
std::time::Duration::from_millis(10),
f.read_exact(&mut buf),
)
if tokio::time::timeout(std::time::Duration::from_millis(10), f.read_exact(&mut buf))
.await
.is_ok()
&& tx.send(buf[0]).is_err()
{
run.store(false, std::sync::atomic::Ordering::SeqCst);
cancel_token.cancel();
}
}
}
/// Async tty reading, spawned into its own tokio thread
/// This should be a cancel token
fn tty(cancel_token: &CancellationToken) -> Option<AsyncTTY> {
if tty_readable() {
let (tx, rx) = std::sync::mpsc::channel();
let cancel_token = cancel_token.to_owned();
tokio::spawn(async move {
if let Ok(f) = tokio::fs::File::open(TTY).await {
let c_1 = cancel_token.clone();
let c_2 = cancel_token.clone();
tokio::select! {
() = c_1.cancelled() => (),
() = tty_read_loop(f, tx, c_2) => (),
}
}
});
Some(AsyncTTY { rx })
} else {
@@ -217,7 +229,7 @@ impl ExecMode {
docker: &Arc<Docker>,
terminal_size: Option<TerminalSize>,
) -> Result<(), AppError> {
let run = Arc::new(AtomicBool::new(true));
let cancel_token = CancellationToken::new();
if let Ok(exec_result) = docker
.create_exec(
@@ -246,21 +258,17 @@ impl ExecMode {
)
.await
{
if let Some(async_tty) = tty(Arc::clone(&run)) {
let run_thread = Arc::clone(&run);
if let Some(async_tty) = tty(&cancel_token) {
tokio::spawn(async move {
enable_raw_mode().ok();
let mut stdout = std::io::stdout();
stdout.write_all(CURSOR_POS.as_bytes()).ok();
stdout.flush().ok();
while run_thread.load(std::sync::atomic::Ordering::SeqCst) {
while let Some(Ok(x)) = output.next().await {
stdout.write_all(&x.into_bytes()).ok();
stdout.flush().ok();
}
run_thread.store(false, std::sync::atomic::Ordering::SeqCst);
}
cancel_token.cancel();
});
if let Some(terminal_size) = terminal_size {