From 68369a7e4be7916a51dc127d3a107bdb667f50a5 Mon Sep 17 00:00:00 2001 From: Roman Moisieiev Date: Fri, 12 Sep 2025 14:04:07 +0100 Subject: [PATCH 1/8] Remove unnecessary Mutex --- src/channel.rs | 7 ++++--- src/sync/crossbeam.rs | 4 +++- src/sync/flume.rs | 4 +++- src/sync/std.rs | 3 ++- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 8da830b..8452487 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use crate::error::Error; @@ -12,7 +13,7 @@ where { pub(crate) request_tx: ST, pub(crate) response_rx: SR, - pub(crate) closed: Mutex, + pub(crate) closed: AtomicBool, pub(crate) _phantom: std::marker::PhantomData, } @@ -127,7 +128,7 @@ where /// Get the current value from the producer pub fn get(&self) -> Result { // Check if locally marked as closed - if *self.closed.lock().unwrap() { + if self.closed.load(Ordering::Acquire) { return Err(Error::ChannelClosed); } @@ -152,7 +153,7 @@ where /// Close the channel from the consumer side pub fn close(&self) -> Result<(), Error> { // Mark locally as closed - *self.closed.lock().unwrap() = true; + self.closed.store(true, Ordering::Release); // Send close request self.request_tx diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index c20217c..20bc162 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicBool; + #[cfg(feature = "sync-crossbeam")] use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; @@ -70,7 +72,7 @@ impl CrossbeamSuck { let sucker = crate::Sucker { request_tx, response_rx, - closed: std::sync::Mutex::new(false), + closed: AtomicBool::new(false), _phantom: std::marker::PhantomData, }; diff --git a/src/sync/flume.rs b/src/sync/flume.rs index 9176b14..5208cf5 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicBool; + #[cfg(feature = "sync-flume")] use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; @@ -70,7 +72,7 @@ impl FlumeSuck { let sucker = crate::Sucker { request_tx, response_rx, - closed: std::sync::Mutex::new(false), + closed: AtomicBool::new(false), _phantom: std::marker::PhantomData, }; diff --git a/src/sync/std.rs b/src/sync/std.rs index d0458a1..d735ebb 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -1,5 +1,6 @@ use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; +use std::sync::atomic::AtomicBool; #[cfg(feature = "sync-std")] use std::sync::mpsc; @@ -68,7 +69,7 @@ impl StdSuck { let sucker = crate::Sucker { request_tx, response_rx, - closed: std::sync::Mutex::new(false), + closed: AtomicBool::new(false), _phantom: std::marker::PhantomData, }; From 32b7aa65e67659c3ee318aae8144b26f61dee136 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Mon, 15 Sep 2025 10:07:42 +0100 Subject: [PATCH 2/8] fix: correct toolchain in flake --- flake.nix | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/flake.nix b/flake.nix index a6030b1..06a1c34 100644 --- a/flake.nix +++ b/flake.nix @@ -20,11 +20,17 @@ flake-utils.lib.eachDefaultSystem (system: let overlays = [(import rust-overlay)]; pkgs = import nixpkgs {inherit system overlays;}; - rustToolchain = pkgs.pkgsBuildHost.rust-bin.stable.latest.default; + rustToolchain = pkgs.pkgsBuildHost.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; + rustToolchainNightly = pkgs.pkgsBuildHost.rust-bin.nightly.latest.default; tools = with pkgs; [cargo-nextest]; - nativeBuildInputs = with pkgs; [rustToolchain pkg-config] ++ tools; + nativeBuildInputs = with pkgs; [rustToolchain rustToolchainNightly pkg-config] ++ tools; in with pkgs; { - devShells.default = mkShell {inherit nativeBuildInputs;}; + devShells.default = mkShell { + inherit nativeBuildInputs; + shellHook = '' + export CARGO_NIGHTLY="${rustToolchainNightly}/bin/cargo" + ''; + }; }); } From ca4825552f816fb025d7840e47ae229c8b3e517e Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Mon, 15 Sep 2025 11:38:00 +0100 Subject: [PATCH 3/8] feat: remove closed flag from ChannelState --- src/channel.rs | 27 ++++++++------------------- src/sync/crossbeam.rs | 5 +---- src/sync/flume.rs | 5 +---- src/sync/std.rs | 5 +---- src/types.rs | 10 +++++----- 5 files changed, 16 insertions(+), 36 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 8452487..158e21a 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,5 +1,4 @@ use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; use crate::error::Error; use crate::sync::traits::{ChannelReceiver, ChannelSender}; @@ -25,7 +24,7 @@ where { pub(crate) request_rx: SR, pub(crate) response_tx: ST, - pub(crate) state: Arc>>, + pub(crate) state: ChannelState, pub(crate) _phantom: std::marker::PhantomData, } @@ -38,10 +37,7 @@ where /// Set a fixed value pub fn set_static(&self, value: T) -> Result<(), Error> { let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - if state.closed { - return Err(Error::ChannelClosed); - } - state.source = ValueSource::Static(value); + *state = ValueSource::Static(value); Ok(()) } @@ -51,18 +47,14 @@ where F: Fn() -> T + Send + Sync + 'static, { let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - if state.closed { - return Err(Error::ChannelClosed); - } - state.source = ValueSource::Dynamic(Box::new(closure)); + *state = ValueSource::Dynamic(Box::new(closure)); Ok(()) } /// Close the channel pub fn close(&self) -> Result<(), Error> { let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - state.closed = true; - state.source = ValueSource::None; + *state = ValueSource::Cleared; Ok(()) } @@ -80,8 +72,7 @@ where Ok(Request::Close) => { // Close channel let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - state.closed = true; - state.source = ValueSource::None; + *state = ValueSource::Cleared; break; } Err(_) => { @@ -95,11 +86,8 @@ where fn handle_get_value(&self) -> Result, Error> { let state = self.state.lock().map_err(|_| Error::InternalError)?; - if state.closed { - return Ok(Response::Closed); - } - match &state.source { + match &*state { ValueSource::Static(value) => Ok(Response::Value(value.clone())), ValueSource::Dynamic(closure) => { let value = self.execute_closure_safely(closure); @@ -108,7 +96,8 @@ where Err(_) => Ok(Response::NoSource), // Closure execution failed } } - ValueSource::None => Ok(Response::NoSource), + ValueSource::None => Ok(Response::NoSource), // No source was ever set + ValueSource::Cleared => Ok(Response::Closed), // Channel was closed (source was set then cleared) } } diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index 20bc162..35a99f4 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -64,10 +64,7 @@ impl CrossbeamSuck { let (request_tx, request_rx) = CrossbeamChannel::create_request_channel(); let (response_tx, response_rx) = CrossbeamChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ChannelState { - source: crate::types::ValueSource::None, - closed: false, - })); + let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); let sucker = crate::Sucker { request_tx, diff --git a/src/sync/flume.rs b/src/sync/flume.rs index 5208cf5..14a0a79 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -64,10 +64,7 @@ impl FlumeSuck { let (request_tx, request_rx) = FlumeChannel::create_request_channel(); let (response_tx, response_rx) = FlumeChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ChannelState { - source: crate::types::ValueSource::None, - closed: false, - })); + let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); let sucker = crate::Sucker { request_tx, diff --git a/src/sync/std.rs b/src/sync/std.rs index d735ebb..2038543 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -61,10 +61,7 @@ impl StdSuck { let (request_tx, request_rx) = StdChannel::create_request_channel(); let (response_tx, response_rx) = StdChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ChannelState { - source: crate::types::ValueSource::None, - closed: false, - })); + let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); let sucker = crate::Sucker { request_tx, diff --git a/src/types.rs b/src/types.rs index 0e82f9c..203a743 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,3 +1,5 @@ +use std::sync::{Arc, Mutex}; + /// Request messages sent from consumer to producer pub enum Request { GetValue, @@ -15,11 +17,9 @@ pub enum Response { pub(crate) enum ValueSource { Static(T), Dynamic(Box T + Send + Sync + 'static>), - None, + None, // Never set + Cleared, // Was set but cleared (closed) } /// Internal channel state shared between producer and consumer -pub(crate) struct ChannelState { - pub(crate) source: ValueSource, - pub(crate) closed: bool, -} +pub(crate) type ChannelState = Arc>>; From 4dd7df50e188a2a3e2490debcad4552ea451fd51 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Tue, 16 Sep 2025 08:26:24 +0100 Subject: [PATCH 4/8] feat: add internal constructor for `Sucker`/`Sourcer` --- src/channel.rs | 48 +++++++++++++++++++++++++++++++++++-------- src/sync/crossbeam.rs | 17 ++------------- src/sync/flume.rs | 17 ++------------- src/sync/std.rs | 16 ++------------- 4 files changed, 46 insertions(+), 52 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 158e21a..3ce9e7d 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -10,10 +10,26 @@ where ST: ChannelSender, SR: ChannelReceiver>, { - pub(crate) request_tx: ST, - pub(crate) response_rx: SR, - pub(crate) closed: AtomicBool, - pub(crate) _phantom: std::marker::PhantomData, + request_tx: ST, + response_rx: SR, + closed: AtomicBool, + _phantom: std::marker::PhantomData, +} + +impl Sucker +where + ST: ChannelSender, + SR: ChannelReceiver>, +{ + /// Create a new Sucker instance + pub(crate) fn new(request_tx: ST, response_rx: SR) -> Self { + Self { + request_tx, + response_rx, + closed: AtomicBool::new(false), + _phantom: std::marker::PhantomData, + } + } } /// The producer side of the channel that provides values @@ -22,10 +38,26 @@ where SR: ChannelReceiver, ST: ChannelSender>, { - pub(crate) request_rx: SR, - pub(crate) response_tx: ST, - pub(crate) state: ChannelState, - pub(crate) _phantom: std::marker::PhantomData, + request_rx: SR, + response_tx: ST, + state: ChannelState, + _phantom: std::marker::PhantomData, +} + +impl Sourcer +where + SR: ChannelReceiver, + ST: ChannelSender>, +{ + /// Create a new Sourcer instance + pub(crate) fn new(request_rx: SR, response_tx: ST, state: ChannelState) -> Self { + Self { + request_rx, + response_tx, + state, + _phantom: std::marker::PhantomData, + } + } } impl Sourcer diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index 35a99f4..99c6a9c 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -1,5 +1,3 @@ -use std::sync::atomic::AtomicBool; - #[cfg(feature = "sync-crossbeam")] use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; @@ -66,19 +64,8 @@ impl CrossbeamSuck { let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); - let sucker = crate::Sucker { - request_tx, - response_rx, - closed: AtomicBool::new(false), - _phantom: std::marker::PhantomData, - }; - - let sourcer = crate::Sourcer { - request_rx, - response_tx, - state: std::sync::Arc::clone(&state), - _phantom: std::marker::PhantomData, - }; + let sucker = crate::Sucker::new(request_tx, response_rx); + let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); (sucker, sourcer) } diff --git a/src/sync/flume.rs b/src/sync/flume.rs index 14a0a79..a493513 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -1,5 +1,3 @@ -use std::sync::atomic::AtomicBool; - #[cfg(feature = "sync-flume")] use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; @@ -66,19 +64,8 @@ impl FlumeSuck { let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); - let sucker = crate::Sucker { - request_tx, - response_rx, - closed: AtomicBool::new(false), - _phantom: std::marker::PhantomData, - }; - - let sourcer = crate::Sourcer { - request_rx, - response_tx, - state: std::sync::Arc::clone(&state), - _phantom: std::marker::PhantomData, - }; + let sucker = crate::Sucker::new(request_tx, response_rx); + let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); (sucker, sourcer) } diff --git a/src/sync/std.rs b/src/sync/std.rs index 2038543..f58a845 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -1,6 +1,5 @@ use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; -use std::sync::atomic::AtomicBool; #[cfg(feature = "sync-std")] use std::sync::mpsc; @@ -63,19 +62,8 @@ impl StdSuck { let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); - let sucker = crate::Sucker { - request_tx, - response_rx, - closed: AtomicBool::new(false), - _phantom: std::marker::PhantomData, - }; - - let sourcer = crate::Sourcer { - request_rx, - response_tx, - state: std::sync::Arc::clone(&state), - _phantom: std::marker::PhantomData, - }; + let sucker = crate::Sucker::new(request_tx, response_rx); + let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); (sucker, sourcer) } From bb5950cd76ee2d22744d1df31add20a1a2180152 Mon Sep 17 00:00:00 2001 From: CordlessCoder Date: Wed, 17 Sep 2025 01:23:25 +0100 Subject: [PATCH 5/8] Relax dynamic source requirement from Fn to FnMut --- src/channel.rs | 6 +++--- src/types.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 3ce9e7d..531c0bc 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -117,9 +117,9 @@ where } fn handle_get_value(&self) -> Result, Error> { - let state = self.state.lock().map_err(|_| Error::InternalError)?; + let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - match &*state { + match &mut *state { ValueSource::Static(value) => Ok(Response::Value(value.clone())), ValueSource::Dynamic(closure) => { let value = self.execute_closure_safely(closure); @@ -135,7 +135,7 @@ where fn execute_closure_safely( &self, - closure: &dyn Fn() -> T, + closure: &mut dyn FnMut() -> T, ) -> Result> { std::panic::catch_unwind(std::panic::AssertUnwindSafe(closure)) } diff --git a/src/types.rs b/src/types.rs index 203a743..32eb625 100644 --- a/src/types.rs +++ b/src/types.rs @@ -16,7 +16,7 @@ pub enum Response { /// Represents the source of values: either static or dynamic pub(crate) enum ValueSource { Static(T), - Dynamic(Box T + Send + Sync + 'static>), + Dynamic(Box T + Send + Sync + 'static>), None, // Never set Cleared, // Was set but cleared (closed) } From 016fe5302d4c346c2a25a1ee578557bff3781f29 Mon Sep 17 00:00:00 2001 From: CordlessCoder Date: Wed, 17 Sep 2025 01:34:54 +0100 Subject: [PATCH 6/8] Allow for completely lock-free access to Static values, and closed channels. --- Cargo.toml | 1 + src/channel.rs | 23 ++++++++++++----------- src/sync/crossbeam.rs | 7 +++++-- src/sync/flume.rs | 8 ++++++-- src/sync/std.rs | 8 ++++++-- src/types.rs | 8 +++++--- 6 files changed, 35 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 94fe60b..ada7cc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ exclude = ["flake.nix", "flake.lock", ".envrc", "cliff.toml", "release-plz.toml" thiserror = "2.0" flume = { version = "0.11", optional = true } crossbeam-channel = { version = "0.5", optional = true } +arc-swap = "1.7.1" [features] default = ["all"] diff --git a/src/channel.rs b/src/channel.rs index 531c0bc..06e1dfe 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,4 +1,5 @@ use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; use crate::error::Error; use crate::sync::traits::{ChannelReceiver, ChannelSender}; @@ -68,8 +69,7 @@ where { /// Set a fixed value pub fn set_static(&self, value: T) -> Result<(), Error> { - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - *state = ValueSource::Static(value); + self.state.swap(Arc::new(ValueSource::Static(value))); Ok(()) } @@ -78,15 +78,16 @@ where where F: Fn() -> T + Send + Sync + 'static, { - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - *state = ValueSource::Dynamic(Box::new(closure)); + self.state + .swap(Arc::new(ValueSource::Dynamic(Mutex::new(Box::new( + closure, + ))))); Ok(()) } /// Close the channel pub fn close(&self) -> Result<(), Error> { - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - *state = ValueSource::Cleared; + self.state.swap(Arc::new(ValueSource::Cleared)); Ok(()) } @@ -103,8 +104,7 @@ where } Ok(Request::Close) => { // Close channel - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - *state = ValueSource::Cleared; + self.close()?; break; } Err(_) => { @@ -117,12 +117,13 @@ where } fn handle_get_value(&self) -> Result, Error> { - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; + let state = self.state.load(); - match &mut *state { + match &**state { ValueSource::Static(value) => Ok(Response::Value(value.clone())), ValueSource::Dynamic(closure) => { - let value = self.execute_closure_safely(closure); + let mut closure = closure.lock().unwrap(); + let value = self.execute_closure_safely(&mut *closure); match value { Ok(v) => Ok(Response::Value(v)), Err(_) => Ok(Response::NoSource), // Closure execution failed diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index 99c6a9c..4e72624 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + #[cfg(feature = "sync-crossbeam")] use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; +use arc_swap::ArcSwap; use crossbeam_channel; type CrossbeamSucker = @@ -62,10 +65,10 @@ impl CrossbeamSuck { let (request_tx, request_rx) = CrossbeamChannel::create_request_channel(); let (response_tx, response_rx) = CrossbeamChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); + let state = ArcSwap::new(Arc::new(crate::types::ValueSource::None)); let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); + let sourcer = crate::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/sync/flume.rs b/src/sync/flume.rs index a493513..78ec2a5 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + #[cfg(feature = "sync-flume")] use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; +use arc_swap::ArcSwap; use flume; type FlumeSucker = @@ -62,10 +65,11 @@ impl FlumeSuck { let (request_tx, request_rx) = FlumeChannel::create_request_channel(); let (response_tx, response_rx) = FlumeChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); + let state = Arc::new(crate::types::ValueSource::None); + let state = ArcSwap::new(state); let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); + let sourcer = crate::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/sync/std.rs b/src/sync/std.rs index f58a845..8184a2b 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -1,5 +1,8 @@ +use arc_swap::ArcSwap; + use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; +use std::sync::Arc; #[cfg(feature = "sync-std")] use std::sync::mpsc; @@ -60,10 +63,11 @@ impl StdSuck { let (request_tx, request_rx) = StdChannel::create_request_channel(); let (response_tx, response_rx) = StdChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); + let state = Arc::new(crate::types::ValueSource::None); + let state = ArcSwap::new(state); let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); + let sourcer = crate::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/types.rs b/src/types.rs index 32eb625..d20e022 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,4 +1,6 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; + +use arc_swap::ArcSwap; /// Request messages sent from consumer to producer pub enum Request { @@ -16,10 +18,10 @@ pub enum Response { /// Represents the source of values: either static or dynamic pub(crate) enum ValueSource { Static(T), - Dynamic(Box T + Send + Sync + 'static>), + Dynamic(Mutex T + Send + Sync + 'static>>), None, // Never set Cleared, // Was set but cleared (closed) } /// Internal channel state shared between producer and consumer -pub(crate) type ChannelState = Arc>>; +pub(crate) type ChannelState = ArcSwap>; From 7c8fc42f277c501c19e004232c83815627803f6b Mon Sep 17 00:00:00 2001 From: Roman Moisieiev Date: Wed, 8 Oct 2025 11:39:28 +0100 Subject: [PATCH 7/8] Use separate .set and .set_mut methods --- src/channel.rs | 21 +++++++++++++++++++-- src/types.rs | 3 ++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 06e1dfe..3f889e1 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -73,13 +73,23 @@ where Ok(()) } - /// Set a closure + /// Set a closure that implements [Fn] pub fn set(&self, closure: F) -> Result<(), Error> where F: Fn() -> T + Send + Sync + 'static, { self.state - .swap(Arc::new(ValueSource::Dynamic(Mutex::new(Box::new( + .swap(Arc::new(ValueSource::Dynamic(Box::new(closure)))); + Ok(()) + } + + /// Set a closure that implements [FnMut] + pub fn set_mut(&self, closure: F) -> Result<(), Error> + where + F: FnMut() -> T + Send + Sync + 'static, + { + self.state + .swap(Arc::new(ValueSource::DynamicMut(Mutex::new(Box::new( closure, ))))); Ok(()) @@ -122,6 +132,13 @@ where match &**state { ValueSource::Static(value) => Ok(Response::Value(value.clone())), ValueSource::Dynamic(closure) => { + let value = self.execute_closure_safely(&mut || closure()); + match value { + Ok(v) => Ok(Response::Value(v)), + Err(_) => Ok(Response::NoSource), // Closure execution failed + } + } + ValueSource::DynamicMut(closure) => { let mut closure = closure.lock().unwrap(); let value = self.execute_closure_safely(&mut *closure); match value { diff --git a/src/types.rs b/src/types.rs index d20e022..937ce51 100644 --- a/src/types.rs +++ b/src/types.rs @@ -18,7 +18,8 @@ pub enum Response { /// Represents the source of values: either static or dynamic pub(crate) enum ValueSource { Static(T), - Dynamic(Mutex T + Send + Sync + 'static>>), + DynamicMut(Mutex T + Send + Sync + 'static>>), + Dynamic(Box T + Send + Sync + 'static>), None, // Never set Cleared, // Was set but cleared (closed) } From 4c13facc96602cbbc1ee439612fbb047aed2ebb4 Mon Sep 17 00:00:00 2001 From: Roman Moisieiev Date: Thu, 9 Oct 2025 11:02:20 +0100 Subject: [PATCH 8/8] Relax T: Clone bound to only apply to static values --- src/channel.rs | 20 ++++++++++++++++---- src/types.rs | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 3f889e1..c3c8159 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -63,13 +63,19 @@ where impl Sourcer where - T: Clone + Send + 'static, + T: Send + 'static, SR: ChannelReceiver, ST: ChannelSender>, { /// Set a fixed value - pub fn set_static(&self, value: T) -> Result<(), Error> { - self.state.swap(Arc::new(ValueSource::Static(value))); + pub fn set_static(&self, val: T) -> Result<(), Error> + where + T: Clone, + { + self.state.swap(Arc::new(ValueSource::Static { + val, + clone: T::clone, + })); Ok(()) } @@ -130,7 +136,13 @@ where let state = self.state.load(); match &**state { - ValueSource::Static(value) => Ok(Response::Value(value.clone())), + ValueSource::Static { val, clone } => { + let value = self.execute_closure_safely(&mut || clone(val)); + match value { + Ok(v) => Ok(Response::Value(v)), + Err(_) => Ok(Response::NoSource), // Closure execution failed + } + } ValueSource::Dynamic(closure) => { let value = self.execute_closure_safely(&mut || closure()); match value { diff --git a/src/types.rs b/src/types.rs index 937ce51..244584b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -17,7 +17,7 @@ pub enum Response { /// Represents the source of values: either static or dynamic pub(crate) enum ValueSource { - Static(T), + Static { val: T, clone: fn(&T) -> T }, DynamicMut(Mutex T + Send + Sync + 'static>>), Dynamic(Box T + Send + Sync + 'static>), None, // Never set