From 3e8a541daa87a0b50802825abffb66ba28e3d955 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 20:21:04 +0000 Subject: [PATCH 1/8] test: set_mut tests --- src/sync/crossbeam.rs | 32 ++++++++++++++++++++++++++++++++ src/sync/flume.rs | 32 ++++++++++++++++++++++++++++++++ src/sync/std.rs | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index 4e72624..cb8e6a9 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -132,6 +132,38 @@ mod tests { producer_handle.join().unwrap(); } + #[test] + fn test_mut_closure_value() { + let (sucker, sourcer) = CrossbeamSuck::::pair(); + + // Start producer + let producer_handle = std::thread::spawn(move || { + let mut count = 0; + sourcer + .set_mut(move || { + count += 1; + count + }) + .unwrap(); + sourcer.run().unwrap(); + }); + + // Ensure consumer gets incrementing values from the mutable closure + let value1 = sucker.get().unwrap(); + assert_eq!(value1, 1); + + let value2 = sucker.get().unwrap(); + assert_eq!(value2, 2); + + let value3 = sucker.get().unwrap(); + assert_eq!(value3, 3); + + // Close consumer + sucker.close().unwrap(); + + producer_handle.join().unwrap(); + } + #[test] fn test_no_source_error() { let (sucker, sourcer) = CrossbeamSuck::::pair(); diff --git a/src/sync/flume.rs b/src/sync/flume.rs index 78ec2a5..e201d91 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -133,6 +133,38 @@ mod tests { producer_handle.join().unwrap(); } + #[test] + fn test_mut_closure_value() { + let (sucker, sourcer) = FlumeSuck::::pair(); + + // Start producer + let producer_handle = std::thread::spawn(move || { + let mut count = 0; + sourcer + .set_mut(move || { + count += 1; + count + }) + .unwrap(); + sourcer.run().unwrap(); + }); + + // Ensure consumer gets incrementing values from the mutable closure + let value1 = sucker.get().unwrap(); + assert_eq!(value1, 1); + + let value2 = sucker.get().unwrap(); + assert_eq!(value2, 2); + + let value3 = sucker.get().unwrap(); + assert_eq!(value3, 3); + + // Close consumer + sucker.close().unwrap(); + + producer_handle.join().unwrap(); + } + #[test] fn test_no_source_error() { let (sucker, sourcer) = FlumeSuck::::pair(); diff --git a/src/sync/std.rs b/src/sync/std.rs index 8184a2b..8568b98 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -131,6 +131,38 @@ mod tests { producer_handle.join().unwrap(); } + #[test] + fn test_mut_closure_value() { + let (sucker, sourcer) = StdSuck::::pair(); + + // Start producer + let producer_handle = std::thread::spawn(move || { + let mut count = 0; + sourcer + .set_mut(move || { + count += 1; + count + }) + .unwrap(); + sourcer.run().unwrap(); + }); + + // Ensure consumer gets incrementing values from the mutable closure + let value1 = sucker.get().unwrap(); + assert_eq!(value1, 1); + + let value2 = sucker.get().unwrap(); + assert_eq!(value2, 2); + + let value3 = sucker.get().unwrap(); + assert_eq!(value3, 3); + + // Close consumer + sucker.close().unwrap(); + + producer_handle.join().unwrap(); + } + #[test] fn test_no_source_error() { let (sucker, sourcer) = StdSuck::::pair(); From 56d0889b37c140404e778ff672c67ea0b9eafa96 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 20:43:15 +0000 Subject: [PATCH 2/8] test: increase code coverage of failure paths --- src/sync/std.rs | 114 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/src/sync/std.rs b/src/sync/std.rs index 8568b98..809d4b7 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -77,8 +77,18 @@ impl StdSuck { mod tests { use super::*; use crate::Error; + use crate::sync::traits::ChannelType; use std::thread; + #[derive(Debug)] + struct PanicOnClone; + + impl Clone for PanicOnClone { + fn clone(&self) -> Self { + panic!("intentional panic from Clone"); + } + } + #[test] fn test_pre_computed_value() { let (sucker, sourcer) = StdSuck::::pair(); @@ -218,6 +228,110 @@ mod tests { let _ = producer_handle.join(); } + #[test] + fn test_run_breaks_when_response_receiver_is_dropped() { + let (request_tx, request_rx) = StdChannel::create_request_channel(); + let (response_tx, response_rx) = StdChannel::create_response_channel::(); + drop(response_rx); + + let state = Arc::new(crate::types::ValueSource::None); + let state = ArcSwap::new(state); + let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + sourcer.set_static(42).unwrap(); + + let producer_handle = thread::spawn(move || sourcer.run().unwrap()); + + request_tx.send(crate::types::Request::GetValue).unwrap(); + + producer_handle.join().unwrap(); + } + + #[test] + fn test_run_breaks_when_request_sender_is_dropped() { + let (request_tx, request_rx) = StdChannel::create_request_channel(); + let (response_tx, _response_rx) = StdChannel::create_response_channel::(); + drop(request_tx); + + let state = Arc::new(crate::types::ValueSource::None); + let state = ArcSwap::new(state); + let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + + sourcer.run().unwrap(); + } + + #[test] + fn test_static_source_panic_returns_no_source() { + let (sucker, sourcer) = StdSuck::::pair(); + + let producer_handle = thread::spawn(move || { + sourcer.set_static(PanicOnClone).unwrap(); + sourcer.run().unwrap(); + }); + + let result = sucker.get(); + assert!(matches!(result, Err(Error::NoSource))); + + sucker.close().unwrap(); + producer_handle.join().unwrap(); + } + + #[test] + fn test_dynamic_source_panic_returns_no_source() { + let (sucker, sourcer) = StdSuck::::pair(); + + let producer_handle = thread::spawn(move || { + sourcer + .set(|| -> i32 { + panic!("intentional panic from Fn source"); + }) + .unwrap(); + sourcer.run().unwrap(); + }); + + let result = sucker.get(); + assert!(matches!(result, Err(Error::NoSource))); + + sucker.close().unwrap(); + producer_handle.join().unwrap(); + } + + #[test] + fn test_dynamic_mut_source_panic_returns_no_source() { + let (sucker, sourcer) = StdSuck::::pair(); + + let producer_handle = thread::spawn(move || { + sourcer + .set_mut(|| -> i32 { + panic!("intentional panic from FnMut source"); + }) + .unwrap(); + sourcer.run().unwrap(); + }); + + let result = sucker.get(); + assert!(matches!(result, Err(Error::NoSource))); + + sucker.close().unwrap(); + producer_handle.join().unwrap(); + } + + #[test] + fn test_cleared_source_returns_channel_closed() { + let (sucker, sourcer) = StdSuck::::pair(); + + let producer_handle = thread::spawn(move || { + sourcer.set_static(42).unwrap(); + sourcer.close().unwrap(); + sourcer.run().unwrap(); + }); + + let result = sucker.get(); + assert!(matches!(result, Err(Error::ChannelClosed))); + + sucker.close().unwrap(); + producer_handle.join().unwrap(); + } + #[test] fn test_is_closed() { let (sucker, sourcer) = StdSuck::::pair(); From 863ca61215df372313fa1330d61686988341cbdd Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 20:51:20 +0000 Subject: [PATCH 3/8] feat: implement asynchronous channel support with tokio integration --- Cargo.toml | 8 +- rust-toolchain.toml | 3 - src/async_channel.rs | 196 +++++++++++++++++++++++++++++++++++++ src/asynchronous/mod.rs | 7 ++ src/asynchronous/tokio.rs | 113 +++++++++++++++++++++ src/asynchronous/traits.rs | 27 +++++ src/channel.rs | 2 +- src/lib.rs | 7 ++ src/sync/crossbeam.rs | 2 +- src/sync/flume.rs | 2 +- src/sync/std.rs | 4 +- src/sync/traits.rs | 24 +---- src/traits.rs | 23 +++++ 13 files changed, 385 insertions(+), 33 deletions(-) delete mode 100644 rust-toolchain.toml create mode 100644 src/async_channel.rs create mode 100644 src/asynchronous/mod.rs create mode 100644 src/asynchronous/tokio.rs create mode 100644 src/asynchronous/traits.rs create mode 100644 src/traits.rs diff --git a/Cargo.toml b/Cargo.toml index a4277f3..aec3b50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,20 +18,24 @@ thiserror = "2.0" flume = { version = "0.12", optional = true } crossbeam-channel = { version = "0.5", optional = true } arc-swap = "1.7.1" +tokio = { version = "1.48", features = ["sync", "macros", "rt-multi-thread"], optional = true } +async-trait = { version = "0.1", optional = true } [features] default = ["all"] sync = [] -async = [] +async = ["dep:async-trait"] sync-std = ["sync"] sync-flume = ["sync", "dep:flume"] sync-crossbeam = ["sync", "dep:crossbeam-channel"] +async-tokio = ["async", "dep:tokio"] all-sync = ["sync-std", "sync-flume", "sync-crossbeam"] +all-async = ["async-tokio"] -all = ["all-sync"] +all = ["all-sync", "all-async"] [lib] diff --git a/rust-toolchain.toml b/rust-toolchain.toml deleted file mode 100644 index 02cb8fc..0000000 --- a/rust-toolchain.toml +++ /dev/null @@ -1,3 +0,0 @@ -[toolchain] -channel = "stable" -profile = "default" diff --git a/src/async_channel.rs b/src/async_channel.rs new file mode 100644 index 0000000..532cac7 --- /dev/null +++ b/src/async_channel.rs @@ -0,0 +1,196 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +use crate::asynchronous::traits::{AsyncChannelReceiver, AsyncChannelSender}; +use crate::error::Error; +use crate::types::{ChannelState, Request, Response, ValueSource}; + +/// The consumer side of the channel that requests values asynchronously. +pub struct AsyncSucker +where + ST: AsyncChannelSender, + SR: AsyncChannelReceiver>, +{ + request_tx: ST, + response_rx: SR, + closed: AtomicBool, + _phantom: std::marker::PhantomData, +} + +impl AsyncSucker +where + ST: AsyncChannelSender, + SR: AsyncChannelReceiver>, +{ + pub(crate) fn new(request_tx: ST, response_rx: SR) -> Self { + Self { + request_tx, + response_rx, + closed: AtomicBool::new(false), + _phantom: std::marker::PhantomData, + } + } +} + +/// The producer side of the channel that provides values asynchronously. +pub struct AsyncSourcer +where + SR: AsyncChannelReceiver, + ST: AsyncChannelSender>, +{ + request_rx: SR, + response_tx: ST, + state: ChannelState, + _phantom: std::marker::PhantomData, +} + +impl AsyncSourcer +where + SR: AsyncChannelReceiver, + ST: AsyncChannelSender>, +{ + pub(crate) fn new(request_rx: SR, response_tx: ST, state: ChannelState) -> Self { + Self { + request_rx, + response_tx, + state, + _phantom: std::marker::PhantomData, + } + } +} + +impl AsyncSourcer +where + T: Send + 'static, + SR: AsyncChannelReceiver, + ST: AsyncChannelSender>, +{ + pub fn set_static(&self, val: T) -> Result<(), Error> + where + T: Clone, + { + self.state.swap(Arc::new(ValueSource::Static { + val, + clone: T::clone, + })); + Ok(()) + } + + pub fn set(&self, closure: F) -> Result<(), Error> + where + F: Fn() -> T + Send + Sync + 'static, + { + self.state + .swap(Arc::new(ValueSource::Dynamic(Box::new(closure)))); + Ok(()) + } + + pub fn set_mut(&self, closure: F) -> Result<(), Error> + where + F: FnMut() -> T + Send + Sync + 'static, + { + self.state + .swap(Arc::new(ValueSource::DynamicMut(Mutex::new(Box::new( + closure, + ))))); + Ok(()) + } + + pub fn close(&self) -> Result<(), Error> { + self.state.swap(Arc::new(ValueSource::Cleared)); + Ok(()) + } + + pub async fn run(self) -> Result<(), Error> { + loop { + match self.request_rx.recv().await { + Ok(Request::GetValue) => { + let response = self.handle_get_value()?; + if self.response_tx.send(response).await.is_err() { + break; + } + } + Ok(Request::Close) => { + self.close()?; + break; + } + Err(_) => break, + } + } + Ok(()) + } + + fn handle_get_value(&self) -> Result, Error> { + let state = self.state.load(); + + match &**state { + ValueSource::Static { val, clone } => { + let value = self.execute_closure_safely(&mut || clone(val)); + match value { + Ok(v) => Ok(Response::Value(v)), + Err(_) => Ok(Response::NoSource), + } + } + ValueSource::Dynamic(closure) => { + let value = self.execute_closure_safely(&mut || closure()); + match value { + Ok(v) => Ok(Response::Value(v)), + Err(_) => Ok(Response::NoSource), + } + } + ValueSource::DynamicMut(closure) => { + let mut closure = closure.lock().unwrap(); + let value = self.execute_closure_safely(&mut *closure); + match value { + Ok(v) => Ok(Response::Value(v)), + Err(_) => Ok(Response::NoSource), + } + } + ValueSource::None => Ok(Response::NoSource), + ValueSource::Cleared => Ok(Response::Closed), + } + } + + fn execute_closure_safely( + &self, + closure: &mut dyn FnMut() -> T, + ) -> Result> { + std::panic::catch_unwind(std::panic::AssertUnwindSafe(closure)) + } +} + +impl AsyncSucker +where + ST: AsyncChannelSender, + SR: AsyncChannelReceiver>, +{ + pub async fn get(&self) -> Result { + if self.closed.load(Ordering::Acquire) { + return Err(Error::ChannelClosed); + } + + self.request_tx + .send(Request::GetValue) + .await + .map_err(|_| Error::ProducerDisconnected)?; + + match self.response_rx.recv().await { + Ok(Response::Value(value)) => Ok(value), + Ok(Response::NoSource) => Err(Error::NoSource), + Ok(Response::Closed) => Err(Error::ChannelClosed), + Err(_) => Err(Error::ProducerDisconnected), + } + } + + pub async fn is_closed(&self) -> bool { + self.request_tx.send(Request::GetValue).await.is_err() + } + + pub async fn close(&self) -> Result<(), Error> { + self.closed.store(true, Ordering::Release); + self.request_tx + .send(Request::Close) + .await + .map_err(|_| Error::ProducerDisconnected) + } +} diff --git a/src/asynchronous/mod.rs b/src/asynchronous/mod.rs new file mode 100644 index 0000000..fcaa130 --- /dev/null +++ b/src/asynchronous/mod.rs @@ -0,0 +1,7 @@ +pub mod traits; + +#[cfg(feature = "async-tokio")] +pub mod tokio; + +#[cfg(feature = "async-tokio")] +pub use tokio::TokioSuck; diff --git a/src/asynchronous/tokio.rs b/src/asynchronous/tokio.rs new file mode 100644 index 0000000..10d4cf9 --- /dev/null +++ b/src/asynchronous/tokio.rs @@ -0,0 +1,113 @@ +use std::sync::Arc; + +use arc_swap::ArcSwap; +use async_trait::async_trait; +use tokio::sync::{Mutex, mpsc}; + +use crate::asynchronous::traits::{ + AsyncChannelReceiver, AsyncChannelSender, AsyncChannelType, ChannelError, +}; +use crate::types; + +type TokioSucker = + crate::AsyncSucker, TokioReceiver>>; +type TokioSourcer = + crate::AsyncSourcer, TokioSender>>; + +pub struct TokioSender(mpsc::UnboundedSender); +pub struct TokioReceiver(Mutex>); + +#[async_trait] +impl AsyncChannelSender for TokioSender { + async fn send(&self, msg: T) -> Result<(), ChannelError> { + self.0 + .send(msg) + .map_err(|_| ChannelError::ProducerDisconnected) + } +} + +#[async_trait] +impl AsyncChannelReceiver for TokioReceiver { + async fn recv(&self) -> Result { + let mut receiver = self.0.lock().await; + receiver + .recv() + .await + .ok_or(ChannelError::ProducerDisconnected) + } +} + +pub struct TokioChannel; + +impl AsyncChannelType for TokioChannel { + type Sender = TokioSender; + type Receiver = TokioReceiver; + + fn create_request_channel() -> (Self::Sender, Self::Receiver) { + let (tx, rx) = mpsc::unbounded_channel(); + (TokioSender(tx), TokioReceiver(Mutex::new(rx))) + } + + fn create_response_channel() -> ( + Self::Sender>, + Self::Receiver>, + ) { + let (tx, rx) = mpsc::unbounded_channel(); + (TokioSender(tx), TokioReceiver(Mutex::new(rx))) + } +} + +pub struct TokioSuck { + _phantom: std::marker::PhantomData, +} + +impl TokioSuck { + pub fn pair() -> (TokioSucker, TokioSourcer) + where + T: Clone + Send + 'static, + { + let (request_tx, request_rx) = TokioChannel::create_request_channel(); + let (response_tx, response_rx) = TokioChannel::create_response_channel::(); + let state = ArcSwap::new(Arc::new(crate::types::ValueSource::None)); + + let sucker = crate::AsyncSucker::new(request_tx, response_rx); + let sourcer = crate::AsyncSourcer::new(request_rx, response_tx, state); + + (sucker, sourcer) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Error; + + #[tokio::test] + async fn test_pre_computed_value() { + let (sucker, sourcer) = TokioSuck::::pair(); + + let producer = tokio::spawn(async move { + sourcer.set_static(42).unwrap(); + sourcer.run().await.unwrap(); + }); + + let value = sucker.get().await.unwrap(); + assert_eq!(value, 42); + sucker.close().await.unwrap(); + producer.await.unwrap(); + } + + #[tokio::test] + async fn test_no_source_error() { + let (sucker, sourcer) = TokioSuck::::pair(); + + let producer = tokio::spawn(async move { + sourcer.run().await.unwrap(); + }); + + let result = sucker.get().await; + assert!(matches!(result, Err(Error::NoSource))); + sucker.close().await.unwrap(); + producer.await.unwrap(); + } +} diff --git a/src/asynchronous/traits.rs b/src/asynchronous/traits.rs new file mode 100644 index 0000000..67b6ec3 --- /dev/null +++ b/src/asynchronous/traits.rs @@ -0,0 +1,27 @@ +use async_trait::async_trait; + +pub use crate::error::Error as ChannelError; + +#[async_trait] +pub trait AsyncChannelSender: Send + Sync { + async fn send(&self, msg: T) -> Result<(), ChannelError>; +} + +#[async_trait] +pub trait AsyncChannelReceiver: Send + Sync { + async fn recv(&self) -> Result; +} + +pub trait AsyncChannelType { + type Sender: AsyncChannelSender; + type Receiver: AsyncChannelReceiver; + + fn create_request_channel() -> ( + Self::Sender, + Self::Receiver, + ); + fn create_response_channel() -> ( + Self::Sender>, + Self::Receiver>, + ); +} diff --git a/src/channel.rs b/src/channel.rs index c3c8159..391725b 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use crate::error::Error; -use crate::sync::traits::{ChannelReceiver, ChannelSender}; +use crate::traits::{ChannelReceiver, ChannelSender}; use crate::types::{ChannelState, Request, Response, ValueSource}; /// The consumer side of the channel that requests values diff --git a/src/lib.rs b/src/lib.rs index 867d924..76c3e87 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,19 @@ #![doc = include_str!("../README.md")] #![cfg_attr(docsrs, feature(doc_auto_cfg))] +#[cfg(feature = "async")] +pub mod async_channel; pub mod channel; pub mod error; +pub mod traits; +#[cfg(feature = "async")] +pub mod asynchronous; #[cfg(feature = "sync")] pub mod sync; pub mod types; +#[cfg(feature = "async")] +pub use async_channel::{AsyncSourcer, AsyncSucker}; pub use channel::{Sourcer, Sucker}; pub use error::Error; diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index cb8e6a9..4f2ff02 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -1,7 +1,7 @@ use std::sync::Arc; #[cfg(feature = "sync-crossbeam")] -use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; +use crate::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; use arc_swap::ArcSwap; use crossbeam_channel; diff --git a/src/sync/flume.rs b/src/sync/flume.rs index e201d91..2fe4a22 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -1,7 +1,7 @@ use std::sync::Arc; #[cfg(feature = "sync-flume")] -use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; +use crate::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; use arc_swap::ArcSwap; use flume; diff --git a/src/sync/std.rs b/src/sync/std.rs index 809d4b7..692055e 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -1,6 +1,6 @@ use arc_swap::ArcSwap; -use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; +use crate::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; use std::sync::Arc; #[cfg(feature = "sync-std")] @@ -77,7 +77,7 @@ impl StdSuck { mod tests { use super::*; use crate::Error; - use crate::sync::traits::ChannelType; + use crate::traits::ChannelType; use std::thread; #[derive(Debug)] diff --git a/src/sync/traits.rs b/src/sync/traits.rs index d2d1fa5..5d9bf6c 100644 --- a/src/sync/traits.rs +++ b/src/sync/traits.rs @@ -1,23 +1 @@ -pub use crate::error::Error as ChannelError; - -pub trait ChannelSender { - fn send(&self, msg: T) -> Result<(), ChannelError>; -} - -pub trait ChannelReceiver { - fn recv(&self) -> Result; -} - -pub trait ChannelType { - type Sender: ChannelSender; - type Receiver: ChannelReceiver; - - fn create_request_channel() -> ( - Self::Sender, - Self::Receiver, - ); - fn create_response_channel() -> ( - Self::Sender>, - Self::Receiver>, - ); -} +pub use crate::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; diff --git a/src/traits.rs b/src/traits.rs new file mode 100644 index 0000000..d2d1fa5 --- /dev/null +++ b/src/traits.rs @@ -0,0 +1,23 @@ +pub use crate::error::Error as ChannelError; + +pub trait ChannelSender { + fn send(&self, msg: T) -> Result<(), ChannelError>; +} + +pub trait ChannelReceiver { + fn recv(&self) -> Result; +} + +pub trait ChannelType { + type Sender: ChannelSender; + type Receiver: ChannelReceiver; + + fn create_request_channel() -> ( + Self::Sender, + Self::Receiver, + ); + fn create_response_channel() -> ( + Self::Sender>, + Self::Receiver>, + ); +} From 79ad51772db277935793f744d7d7b5af39bc4d77 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 20:53:58 +0000 Subject: [PATCH 4/8] chore: remove unused traits module --- src/sync/mod.rs | 2 -- src/sync/traits.rs | 1 - 2 files changed, 3 deletions(-) delete mode 100644 src/sync/traits.rs diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8feab0d..8462cd4 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,5 +1,3 @@ -pub mod traits; - #[cfg(feature = "sync-crossbeam")] pub mod crossbeam; #[cfg(feature = "sync-flume")] diff --git a/src/sync/traits.rs b/src/sync/traits.rs deleted file mode 100644 index 5d9bf6c..0000000 --- a/src/sync/traits.rs +++ /dev/null @@ -1 +0,0 @@ -pub use crate::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; From 72b87fd6e0a90fe4cd583ab0124c00a885dc608a Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 20:56:09 +0000 Subject: [PATCH 5/8] chore: reorganize module exports for async and sync features --- src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 76c3e87..739a837 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,17 +3,21 @@ #[cfg(feature = "async")] pub mod async_channel; +#[cfg(feature = "sync")] pub mod channel; pub mod error; +#[cfg(feature = "sync")] pub mod traits; #[cfg(feature = "async")] pub mod asynchronous; #[cfg(feature = "sync")] pub mod sync; +#[cfg(any(feature = "sync", feature = "async"))] pub mod types; #[cfg(feature = "async")] pub use async_channel::{AsyncSourcer, AsyncSucker}; +#[cfg(feature = "sync")] pub use channel::{Sourcer, Sucker}; pub use error::Error; From 8366421edeb845aaeb2ea4f46971e825a6004315 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 20:58:17 +0000 Subject: [PATCH 6/8] refactor: move traits to sync module and update imports --- src/channel.rs | 2 +- src/lib.rs | 2 -- src/sync/crossbeam.rs | 2 +- src/sync/flume.rs | 2 +- src/sync/mod.rs | 2 ++ src/sync/std.rs | 4 ++-- src/{ => sync}/traits.rs | 0 7 files changed, 7 insertions(+), 7 deletions(-) rename src/{ => sync}/traits.rs (100%) diff --git a/src/channel.rs b/src/channel.rs index 391725b..c3c8159 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use crate::error::Error; -use crate::traits::{ChannelReceiver, ChannelSender}; +use crate::sync::traits::{ChannelReceiver, ChannelSender}; use crate::types::{ChannelState, Request, Response, ValueSource}; /// The consumer side of the channel that requests values diff --git a/src/lib.rs b/src/lib.rs index 739a837..d5df465 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,8 +6,6 @@ pub mod async_channel; #[cfg(feature = "sync")] pub mod channel; pub mod error; -#[cfg(feature = "sync")] -pub mod traits; #[cfg(feature = "async")] pub mod asynchronous; diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index 4f2ff02..cb8e6a9 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -1,7 +1,7 @@ use std::sync::Arc; #[cfg(feature = "sync-crossbeam")] -use crate::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; +use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; use arc_swap::ArcSwap; use crossbeam_channel; diff --git a/src/sync/flume.rs b/src/sync/flume.rs index 2fe4a22..e201d91 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -1,7 +1,7 @@ use std::sync::Arc; #[cfg(feature = "sync-flume")] -use crate::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; +use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; use arc_swap::ArcSwap; use flume; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8462cd4..8feab0d 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,3 +1,5 @@ +pub mod traits; + #[cfg(feature = "sync-crossbeam")] pub mod crossbeam; #[cfg(feature = "sync-flume")] diff --git a/src/sync/std.rs b/src/sync/std.rs index 692055e..809d4b7 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -1,6 +1,6 @@ use arc_swap::ArcSwap; -use crate::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; +use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; use std::sync::Arc; #[cfg(feature = "sync-std")] @@ -77,7 +77,7 @@ impl StdSuck { mod tests { use super::*; use crate::Error; - use crate::traits::ChannelType; + use crate::sync::traits::ChannelType; use std::thread; #[derive(Debug)] diff --git a/src/traits.rs b/src/sync/traits.rs similarity index 100% rename from src/traits.rs rename to src/sync/traits.rs From a8d73eba4b4c77e54ffa0598b13d0def38be66b3 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 21:01:46 +0000 Subject: [PATCH 7/8] refactor: reorganize channel modules and implement async/sync structures --- .../channel.rs} | 0 src/asynchronous/mod.rs | 1 + src/asynchronous/tokio.rs | 19 +++++++++++++------ src/lib.rs | 10 +++------- src/{ => sync}/channel.rs | 0 src/sync/crossbeam.rs | 18 ++++++++++++------ src/sync/flume.rs | 13 ++++++++----- src/sync/mod.rs | 1 + src/sync/std.rs | 14 ++++++++------ 9 files changed, 46 insertions(+), 30 deletions(-) rename src/{async_channel.rs => asynchronous/channel.rs} (100%) rename src/{ => sync}/channel.rs (100%) diff --git a/src/async_channel.rs b/src/asynchronous/channel.rs similarity index 100% rename from src/async_channel.rs rename to src/asynchronous/channel.rs diff --git a/src/asynchronous/mod.rs b/src/asynchronous/mod.rs index fcaa130..9a55d57 100644 --- a/src/asynchronous/mod.rs +++ b/src/asynchronous/mod.rs @@ -1,3 +1,4 @@ +pub mod channel; pub mod traits; #[cfg(feature = "async-tokio")] diff --git a/src/asynchronous/tokio.rs b/src/asynchronous/tokio.rs index 10d4cf9..31f7c70 100644 --- a/src/asynchronous/tokio.rs +++ b/src/asynchronous/tokio.rs @@ -9,10 +9,16 @@ use crate::asynchronous::traits::{ }; use crate::types; -type TokioSucker = - crate::AsyncSucker, TokioReceiver>>; -type TokioSourcer = - crate::AsyncSourcer, TokioSender>>; +type TokioSucker = crate::asynchronous::channel::AsyncSucker< + T, + TokioSender, + TokioReceiver>, +>; +type TokioSourcer = crate::asynchronous::channel::AsyncSourcer< + T, + TokioReceiver, + TokioSender>, +>; pub struct TokioSender(mpsc::UnboundedSender); pub struct TokioReceiver(Mutex>); @@ -70,8 +76,9 @@ impl TokioSuck { let (response_tx, response_rx) = TokioChannel::create_response_channel::(); let state = ArcSwap::new(Arc::new(crate::types::ValueSource::None)); - let sucker = crate::AsyncSucker::new(request_tx, response_rx); - let sourcer = crate::AsyncSourcer::new(request_rx, response_tx, state); + let sucker = crate::asynchronous::channel::AsyncSucker::new(request_tx, response_rx); + let sourcer = + crate::asynchronous::channel::AsyncSourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/lib.rs b/src/lib.rs index d5df465..3ea0acc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,6 @@ #![doc = include_str!("../README.md")] #![cfg_attr(docsrs, feature(doc_auto_cfg))] -#[cfg(feature = "async")] -pub mod async_channel; -#[cfg(feature = "sync")] -pub mod channel; pub mod error; #[cfg(feature = "async")] @@ -15,7 +11,7 @@ pub mod sync; pub mod types; #[cfg(feature = "async")] -pub use async_channel::{AsyncSourcer, AsyncSucker}; -#[cfg(feature = "sync")] -pub use channel::{Sourcer, Sucker}; +pub use asynchronous::channel::{AsyncSourcer, AsyncSucker}; pub use error::Error; +#[cfg(feature = "sync")] +pub use sync::channel::{Sourcer, Sucker}; diff --git a/src/channel.rs b/src/sync/channel.rs similarity index 100% rename from src/channel.rs rename to src/sync/channel.rs diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index cb8e6a9..f0d2ccc 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -6,10 +6,16 @@ use crate::types; use arc_swap::ArcSwap; use crossbeam_channel; -type CrossbeamSucker = - crate::Sucker, CrossbeamReceiver>>; -type CrossbeamSourcer = - crate::Sourcer, CrossbeamSender>>; +type CrossbeamSucker = crate::sync::channel::Sucker< + T, + CrossbeamSender, + CrossbeamReceiver>, +>; +type CrossbeamSourcer = crate::sync::channel::Sourcer< + T, + CrossbeamReceiver, + CrossbeamSender>, +>; /// Internal sender type for crossbeam backend pub struct CrossbeamSender(crossbeam_channel::Sender); @@ -67,8 +73,8 @@ impl CrossbeamSuck { let state = ArcSwap::new(Arc::new(crate::types::ValueSource::None)); - let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sucker = crate::sync::channel::Sucker::new(request_tx, response_rx); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/sync/flume.rs b/src/sync/flume.rs index e201d91..5304693 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -7,9 +7,12 @@ use arc_swap::ArcSwap; use flume; type FlumeSucker = - crate::Sucker, FlumeReceiver>>; -type FlumeSourcer = - crate::Sourcer, FlumeSender>>; + crate::sync::channel::Sucker, FlumeReceiver>>; +type FlumeSourcer = crate::sync::channel::Sourcer< + T, + FlumeReceiver, + FlumeSender>, +>; /// Internal sender type for flume backend pub struct FlumeSender(flume::Sender); @@ -68,8 +71,8 @@ impl FlumeSuck { let state = Arc::new(crate::types::ValueSource::None); let state = ArcSwap::new(state); - let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sucker = crate::sync::channel::Sucker::new(request_tx, response_rx); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8feab0d..49153b3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,3 +1,4 @@ +pub mod channel; pub mod traits; #[cfg(feature = "sync-crossbeam")] diff --git a/src/sync/std.rs b/src/sync/std.rs index 809d4b7..cf2b06a 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -6,8 +6,10 @@ use std::sync::Arc; #[cfg(feature = "sync-std")] use std::sync::mpsc; -type StdSucker = crate::Sucker, StdReceiver>>; -type StdSourcer = crate::Sourcer, StdSender>>; +type StdSucker = + crate::sync::channel::Sucker, StdReceiver>>; +type StdSourcer = + crate::sync::channel::Sourcer, StdSender>>; /// Internal sender type for std backend pub struct StdSender(mpsc::Sender); @@ -66,8 +68,8 @@ impl StdSuck { let state = Arc::new(crate::types::ValueSource::None); let state = ArcSwap::new(state); - let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sucker = crate::sync::channel::Sucker::new(request_tx, response_rx); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } @@ -236,7 +238,7 @@ mod tests { let state = Arc::new(crate::types::ValueSource::None); let state = ArcSwap::new(state); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); sourcer.set_static(42).unwrap(); let producer_handle = thread::spawn(move || sourcer.run().unwrap()); @@ -254,7 +256,7 @@ mod tests { let state = Arc::new(crate::types::ValueSource::None); let state = ArcSwap::new(state); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); sourcer.run().unwrap(); } From f8effc1d29291d1a1b2d7e174df3df63abde516d Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 21:11:23 +0000 Subject: [PATCH 8/8] chore: release v0.0.3 --- CHANGELOG.md | 26 ++++++++++++++++++++++++++ Cargo.toml | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25e85a7..90f7b45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,32 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [0.0.3] - 2026-03-04 + +### ๐Ÿš€ Features + +- Remove closed flag from ChannelState +- Add internal constructor for `Sucker`/`Sourcer` +- Implement asynchronous channel support with tokio integration + +### ๐Ÿ› Bug Fixes + +- Correct toolchain in flake + +### ๐Ÿšœ Refactor + +- Move traits to sync module and update imports +- Reorganize channel modules and implement async/sync structures + +### ๐Ÿงช Testing + +- Set_mut tests +- Increase code coverage of failure paths + +### โš™๏ธ Miscellaneous Tasks + +- Remove unused traits module +- Reorganize module exports for async and sync features ## [0.0.2] - 2025-09-04 ### ๐Ÿš€ Features diff --git a/Cargo.toml b/Cargo.toml index aec3b50..49df714 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "suck" description = "Suck data up through a channel" authors = ["Callum Leslie "] -version = "0.0.2" +version = "0.0.3" edition = "2024" documentation = "https://docs.rs/suck" homepage = "https://github.com/callumio/suck"