From a8d73eba4b4c77e54ffa0598b13d0def38be66b3 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Wed, 4 Mar 2026 21:01:46 +0000 Subject: [PATCH] refactor: reorganize channel modules and implement async/sync structures --- .../channel.rs} | 0 src/asynchronous/mod.rs | 1 + src/asynchronous/tokio.rs | 19 +++++++++++++------ src/lib.rs | 10 +++------- src/{ => sync}/channel.rs | 0 src/sync/crossbeam.rs | 18 ++++++++++++------ src/sync/flume.rs | 13 ++++++++----- src/sync/mod.rs | 1 + src/sync/std.rs | 14 ++++++++------ 9 files changed, 46 insertions(+), 30 deletions(-) rename src/{async_channel.rs => asynchronous/channel.rs} (100%) rename src/{ => sync}/channel.rs (100%) diff --git a/src/async_channel.rs b/src/asynchronous/channel.rs similarity index 100% rename from src/async_channel.rs rename to src/asynchronous/channel.rs diff --git a/src/asynchronous/mod.rs b/src/asynchronous/mod.rs index fcaa130..9a55d57 100644 --- a/src/asynchronous/mod.rs +++ b/src/asynchronous/mod.rs @@ -1,3 +1,4 @@ +pub mod channel; pub mod traits; #[cfg(feature = "async-tokio")] diff --git a/src/asynchronous/tokio.rs b/src/asynchronous/tokio.rs index 10d4cf9..31f7c70 100644 --- a/src/asynchronous/tokio.rs +++ b/src/asynchronous/tokio.rs @@ -9,10 +9,16 @@ use crate::asynchronous::traits::{ }; use crate::types; -type TokioSucker = - crate::AsyncSucker, TokioReceiver>>; -type TokioSourcer = - crate::AsyncSourcer, TokioSender>>; +type TokioSucker = crate::asynchronous::channel::AsyncSucker< + T, + TokioSender, + TokioReceiver>, +>; +type TokioSourcer = crate::asynchronous::channel::AsyncSourcer< + T, + TokioReceiver, + TokioSender>, +>; pub struct TokioSender(mpsc::UnboundedSender); pub struct TokioReceiver(Mutex>); @@ -70,8 +76,9 @@ impl TokioSuck { let (response_tx, response_rx) = TokioChannel::create_response_channel::(); let state = ArcSwap::new(Arc::new(crate::types::ValueSource::None)); - let sucker = crate::AsyncSucker::new(request_tx, response_rx); - let sourcer = crate::AsyncSourcer::new(request_rx, response_tx, state); + let sucker = crate::asynchronous::channel::AsyncSucker::new(request_tx, response_rx); + let sourcer = + crate::asynchronous::channel::AsyncSourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/lib.rs b/src/lib.rs index d5df465..3ea0acc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,6 @@ #![doc = include_str!("../README.md")] #![cfg_attr(docsrs, feature(doc_auto_cfg))] -#[cfg(feature = "async")] -pub mod async_channel; -#[cfg(feature = "sync")] -pub mod channel; pub mod error; #[cfg(feature = "async")] @@ -15,7 +11,7 @@ pub mod sync; pub mod types; #[cfg(feature = "async")] -pub use async_channel::{AsyncSourcer, AsyncSucker}; -#[cfg(feature = "sync")] -pub use channel::{Sourcer, Sucker}; +pub use asynchronous::channel::{AsyncSourcer, AsyncSucker}; pub use error::Error; +#[cfg(feature = "sync")] +pub use sync::channel::{Sourcer, Sucker}; diff --git a/src/channel.rs b/src/sync/channel.rs similarity index 100% rename from src/channel.rs rename to src/sync/channel.rs diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index cb8e6a9..f0d2ccc 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -6,10 +6,16 @@ use crate::types; use arc_swap::ArcSwap; use crossbeam_channel; -type CrossbeamSucker = - crate::Sucker, CrossbeamReceiver>>; -type CrossbeamSourcer = - crate::Sourcer, CrossbeamSender>>; +type CrossbeamSucker = crate::sync::channel::Sucker< + T, + CrossbeamSender, + CrossbeamReceiver>, +>; +type CrossbeamSourcer = crate::sync::channel::Sourcer< + T, + CrossbeamReceiver, + CrossbeamSender>, +>; /// Internal sender type for crossbeam backend pub struct CrossbeamSender(crossbeam_channel::Sender); @@ -67,8 +73,8 @@ impl CrossbeamSuck { let state = ArcSwap::new(Arc::new(crate::types::ValueSource::None)); - let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sucker = crate::sync::channel::Sucker::new(request_tx, response_rx); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/sync/flume.rs b/src/sync/flume.rs index e201d91..5304693 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -7,9 +7,12 @@ use arc_swap::ArcSwap; use flume; type FlumeSucker = - crate::Sucker, FlumeReceiver>>; -type FlumeSourcer = - crate::Sourcer, FlumeSender>>; + crate::sync::channel::Sucker, FlumeReceiver>>; +type FlumeSourcer = crate::sync::channel::Sourcer< + T, + FlumeReceiver, + FlumeSender>, +>; /// Internal sender type for flume backend pub struct FlumeSender(flume::Sender); @@ -68,8 +71,8 @@ impl FlumeSuck { let state = Arc::new(crate::types::ValueSource::None); let state = ArcSwap::new(state); - let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sucker = crate::sync::channel::Sucker::new(request_tx, response_rx); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 8feab0d..49153b3 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,3 +1,4 @@ +pub mod channel; pub mod traits; #[cfg(feature = "sync-crossbeam")] diff --git a/src/sync/std.rs b/src/sync/std.rs index 809d4b7..cf2b06a 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -6,8 +6,10 @@ use std::sync::Arc; #[cfg(feature = "sync-std")] use std::sync::mpsc; -type StdSucker = crate::Sucker, StdReceiver>>; -type StdSourcer = crate::Sourcer, StdSender>>; +type StdSucker = + crate::sync::channel::Sucker, StdReceiver>>; +type StdSourcer = + crate::sync::channel::Sourcer, StdSender>>; /// Internal sender type for std backend pub struct StdSender(mpsc::Sender); @@ -66,8 +68,8 @@ impl StdSuck { let state = Arc::new(crate::types::ValueSource::None); let state = ArcSwap::new(state); - let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sucker = crate::sync::channel::Sucker::new(request_tx, response_rx); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } @@ -236,7 +238,7 @@ mod tests { let state = Arc::new(crate::types::ValueSource::None); let state = ArcSwap::new(state); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); sourcer.set_static(42).unwrap(); let producer_handle = thread::spawn(move || sourcer.run().unwrap()); @@ -254,7 +256,7 @@ mod tests { let state = Arc::new(crate::types::ValueSource::None); let state = ArcSwap::new(state); - let sourcer = crate::Sourcer::new(request_rx, response_tx, state); + let sourcer = crate::sync::channel::Sourcer::new(request_rx, response_tx, state); sourcer.run().unwrap(); }