From bb5950cd76ee2d22744d1df31add20a1a2180152 Mon Sep 17 00:00:00 2001 From: CordlessCoder Date: Wed, 17 Sep 2025 01:23:25 +0100 Subject: [PATCH 1/5] Relax dynamic source requirement from Fn to FnMut --- src/channel.rs | 6 +++--- src/types.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 3ce9e7d..531c0bc 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -117,9 +117,9 @@ where } fn handle_get_value(&self) -> Result, Error> { - let state = self.state.lock().map_err(|_| Error::InternalError)?; + let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - match &*state { + match &mut *state { ValueSource::Static(value) => Ok(Response::Value(value.clone())), ValueSource::Dynamic(closure) => { let value = self.execute_closure_safely(closure); @@ -135,7 +135,7 @@ where fn execute_closure_safely( &self, - closure: &dyn Fn() -> T, + closure: &mut dyn FnMut() -> T, ) -> Result> { std::panic::catch_unwind(std::panic::AssertUnwindSafe(closure)) } diff --git a/src/types.rs b/src/types.rs index 203a743..32eb625 100644 --- a/src/types.rs +++ b/src/types.rs @@ -16,7 +16,7 @@ pub enum Response { /// Represents the source of values: either static or dynamic pub(crate) enum ValueSource { Static(T), - Dynamic(Box T + Send + Sync + 'static>), + Dynamic(Box T + Send + Sync + 'static>), None, // Never set Cleared, // Was set but cleared (closed) } From 016fe5302d4c346c2a25a1ee578557bff3781f29 Mon Sep 17 00:00:00 2001 From: CordlessCoder Date: Wed, 17 Sep 2025 01:34:54 +0100 Subject: [PATCH 2/5] Allow for completely lock-free access to Static values, and closed channels. --- Cargo.toml | 1 + src/channel.rs | 23 ++++++++++++----------- src/sync/crossbeam.rs | 7 +++++-- src/sync/flume.rs | 8 ++++++-- src/sync/std.rs | 8 ++++++-- src/types.rs | 8 +++++--- 6 files changed, 35 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 94fe60b..ada7cc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ exclude = ["flake.nix", "flake.lock", ".envrc", "cliff.toml", "release-plz.toml" thiserror = "2.0" flume = { version = "0.11", optional = true } crossbeam-channel = { version = "0.5", optional = true } +arc-swap = "1.7.1" [features] default = ["all"] diff --git a/src/channel.rs b/src/channel.rs index 531c0bc..06e1dfe 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,4 +1,5 @@ use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; use crate::error::Error; use crate::sync::traits::{ChannelReceiver, ChannelSender}; @@ -68,8 +69,7 @@ where { /// Set a fixed value pub fn set_static(&self, value: T) -> Result<(), Error> { - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - *state = ValueSource::Static(value); + self.state.swap(Arc::new(ValueSource::Static(value))); Ok(()) } @@ -78,15 +78,16 @@ where where F: Fn() -> T + Send + Sync + 'static, { - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - *state = ValueSource::Dynamic(Box::new(closure)); + self.state + .swap(Arc::new(ValueSource::Dynamic(Mutex::new(Box::new( + closure, + ))))); Ok(()) } /// Close the channel pub fn close(&self) -> Result<(), Error> { - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - *state = ValueSource::Cleared; + self.state.swap(Arc::new(ValueSource::Cleared)); Ok(()) } @@ -103,8 +104,7 @@ where } Ok(Request::Close) => { // Close channel - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; - *state = ValueSource::Cleared; + self.close()?; break; } Err(_) => { @@ -117,12 +117,13 @@ where } fn handle_get_value(&self) -> Result, Error> { - let mut state = self.state.lock().map_err(|_| Error::InternalError)?; + let state = self.state.load(); - match &mut *state { + match &**state { ValueSource::Static(value) => Ok(Response::Value(value.clone())), ValueSource::Dynamic(closure) => { - let value = self.execute_closure_safely(closure); + let mut closure = closure.lock().unwrap(); + let value = self.execute_closure_safely(&mut *closure); match value { Ok(v) => Ok(Response::Value(v)), Err(_) => Ok(Response::NoSource), // Closure execution failed diff --git a/src/sync/crossbeam.rs b/src/sync/crossbeam.rs index 99c6a9c..4e72624 100644 --- a/src/sync/crossbeam.rs +++ b/src/sync/crossbeam.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + #[cfg(feature = "sync-crossbeam")] use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; +use arc_swap::ArcSwap; use crossbeam_channel; type CrossbeamSucker = @@ -62,10 +65,10 @@ impl CrossbeamSuck { let (request_tx, request_rx) = CrossbeamChannel::create_request_channel(); let (response_tx, response_rx) = CrossbeamChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); + let state = ArcSwap::new(Arc::new(crate::types::ValueSource::None)); let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); + let sourcer = crate::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/sync/flume.rs b/src/sync/flume.rs index a493513..78ec2a5 100644 --- a/src/sync/flume.rs +++ b/src/sync/flume.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + #[cfg(feature = "sync-flume")] use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; +use arc_swap::ArcSwap; use flume; type FlumeSucker = @@ -62,10 +65,11 @@ impl FlumeSuck { let (request_tx, request_rx) = FlumeChannel::create_request_channel(); let (response_tx, response_rx) = FlumeChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); + let state = Arc::new(crate::types::ValueSource::None); + let state = ArcSwap::new(state); let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); + let sourcer = crate::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/sync/std.rs b/src/sync/std.rs index f58a845..8184a2b 100644 --- a/src/sync/std.rs +++ b/src/sync/std.rs @@ -1,5 +1,8 @@ +use arc_swap::ArcSwap; + use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType}; use crate::types; +use std::sync::Arc; #[cfg(feature = "sync-std")] use std::sync::mpsc; @@ -60,10 +63,11 @@ impl StdSuck { let (request_tx, request_rx) = StdChannel::create_request_channel(); let (response_tx, response_rx) = StdChannel::create_response_channel::(); - let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ValueSource::None)); + let state = Arc::new(crate::types::ValueSource::None); + let state = ArcSwap::new(state); let sucker = crate::Sucker::new(request_tx, response_rx); - let sourcer = crate::Sourcer::new(request_rx, response_tx, std::sync::Arc::clone(&state)); + let sourcer = crate::Sourcer::new(request_rx, response_tx, state); (sucker, sourcer) } diff --git a/src/types.rs b/src/types.rs index 32eb625..d20e022 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,4 +1,6 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; + +use arc_swap::ArcSwap; /// Request messages sent from consumer to producer pub enum Request { @@ -16,10 +18,10 @@ pub enum Response { /// Represents the source of values: either static or dynamic pub(crate) enum ValueSource { Static(T), - Dynamic(Box T + Send + Sync + 'static>), + Dynamic(Mutex T + Send + Sync + 'static>>), None, // Never set Cleared, // Was set but cleared (closed) } /// Internal channel state shared between producer and consumer -pub(crate) type ChannelState = Arc>>; +pub(crate) type ChannelState = ArcSwap>; From 7c8fc42f277c501c19e004232c83815627803f6b Mon Sep 17 00:00:00 2001 From: Roman Moisieiev Date: Wed, 8 Oct 2025 11:39:28 +0100 Subject: [PATCH 3/5] Use separate .set and .set_mut methods --- src/channel.rs | 21 +++++++++++++++++++-- src/types.rs | 3 ++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 06e1dfe..3f889e1 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -73,13 +73,23 @@ where Ok(()) } - /// Set a closure + /// Set a closure that implements [Fn] pub fn set(&self, closure: F) -> Result<(), Error> where F: Fn() -> T + Send + Sync + 'static, { self.state - .swap(Arc::new(ValueSource::Dynamic(Mutex::new(Box::new( + .swap(Arc::new(ValueSource::Dynamic(Box::new(closure)))); + Ok(()) + } + + /// Set a closure that implements [FnMut] + pub fn set_mut(&self, closure: F) -> Result<(), Error> + where + F: FnMut() -> T + Send + Sync + 'static, + { + self.state + .swap(Arc::new(ValueSource::DynamicMut(Mutex::new(Box::new( closure, ))))); Ok(()) @@ -122,6 +132,13 @@ where match &**state { ValueSource::Static(value) => Ok(Response::Value(value.clone())), ValueSource::Dynamic(closure) => { + let value = self.execute_closure_safely(&mut || closure()); + match value { + Ok(v) => Ok(Response::Value(v)), + Err(_) => Ok(Response::NoSource), // Closure execution failed + } + } + ValueSource::DynamicMut(closure) => { let mut closure = closure.lock().unwrap(); let value = self.execute_closure_safely(&mut *closure); match value { diff --git a/src/types.rs b/src/types.rs index d20e022..937ce51 100644 --- a/src/types.rs +++ b/src/types.rs @@ -18,7 +18,8 @@ pub enum Response { /// Represents the source of values: either static or dynamic pub(crate) enum ValueSource { Static(T), - Dynamic(Mutex T + Send + Sync + 'static>>), + DynamicMut(Mutex T + Send + Sync + 'static>>), + Dynamic(Box T + Send + Sync + 'static>), None, // Never set Cleared, // Was set but cleared (closed) } From 4c13facc96602cbbc1ee439612fbb047aed2ebb4 Mon Sep 17 00:00:00 2001 From: Roman Moisieiev Date: Thu, 9 Oct 2025 11:02:20 +0100 Subject: [PATCH 4/5] Relax T: Clone bound to only apply to static values --- src/channel.rs | 20 ++++++++++++++++---- src/types.rs | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 3f889e1..c3c8159 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -63,13 +63,19 @@ where impl Sourcer where - T: Clone + Send + 'static, + T: Send + 'static, SR: ChannelReceiver, ST: ChannelSender>, { /// Set a fixed value - pub fn set_static(&self, value: T) -> Result<(), Error> { - self.state.swap(Arc::new(ValueSource::Static(value))); + pub fn set_static(&self, val: T) -> Result<(), Error> + where + T: Clone, + { + self.state.swap(Arc::new(ValueSource::Static { + val, + clone: T::clone, + })); Ok(()) } @@ -130,7 +136,13 @@ where let state = self.state.load(); match &**state { - ValueSource::Static(value) => Ok(Response::Value(value.clone())), + ValueSource::Static { val, clone } => { + let value = self.execute_closure_safely(&mut || clone(val)); + match value { + Ok(v) => Ok(Response::Value(v)), + Err(_) => Ok(Response::NoSource), // Closure execution failed + } + } ValueSource::Dynamic(closure) => { let value = self.execute_closure_safely(&mut || closure()); match value { diff --git a/src/types.rs b/src/types.rs index 937ce51..244584b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -17,7 +17,7 @@ pub enum Response { /// Represents the source of values: either static or dynamic pub(crate) enum ValueSource { - Static(T), + Static { val: T, clone: fn(&T) -> T }, DynamicMut(Mutex T + Send + Sync + 'static>>), Dynamic(Box T + Send + Sync + 'static>), None, // Never set From 4cd5c65ea5cd8cdbe366c1000be1c9ec8d533354 Mon Sep 17 00:00:00 2001 From: Callum Leslie Date: Tue, 14 Oct 2025 09:18:04 +0100 Subject: [PATCH 5/5] chore: release v0.0.3 --- CHANGELOG.md | 10 ++++++++++ Cargo.toml | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25e85a7..aa68df1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [0.0.3] - 2025-10-14 + +### 🚀 Features + +- Remove closed flag from ChannelState +- Add internal constructor for `Sucker`/`Sourcer` + +### 🐛 Bug Fixes + +- Correct toolchain in flake ## [0.0.2] - 2025-09-04 ### 🚀 Features diff --git a/Cargo.toml b/Cargo.toml index ada7cc2..0c7bdbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "suck" description = "Suck data up through a channel" authors = ["Callum Leslie "] -version = "0.0.2" +version = "0.0.3" edition = "2024" documentation = "https://docs.rs/suck" homepage = "https://github.com/callumio/suck"