mirror of
https://github.com/callumio/suck.git
synced 2025-12-17 03:29:21 +00:00
feat: add multiple channel providers
Each provider is enabled via a feature flag. The currently implemented providers are: - std::mpsc - flume - crossbeam_channel
This commit is contained in:
parent
6b754e13e8
commit
faa5ba23c5
10 changed files with 761 additions and 189 deletions
19
Cargo.toml
19
Cargo.toml
|
|
@ -15,5 +15,24 @@ exclude = ["flake.nix", "flake.lock", ".envrc", "cliff.toml", "release-plz.toml"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
thiserror = "2.0"
|
thiserror = "2.0"
|
||||||
|
flume = { version = "0.11", optional = true }
|
||||||
|
crossbeam-channel = { version = "0.5", optional = true }
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = ["all"]
|
||||||
|
|
||||||
|
sync = []
|
||||||
|
async = []
|
||||||
|
|
||||||
|
sync-std = ["sync"]
|
||||||
|
sync-flume = ["sync", "dep:flume"]
|
||||||
|
sync-crossbeam = ["sync", "dep:crossbeam-channel"]
|
||||||
|
|
||||||
|
all-sync = ["sync-std", "sync-flume", "sync-crossbeam"]
|
||||||
|
|
||||||
|
all = ["all-sync"]
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
|
|
||||||
|
[package.metadata.docs.rs]
|
||||||
|
all-features = true
|
||||||
|
|
|
||||||
|
|
@ -32,11 +32,11 @@ suck = "*"
|
||||||
## Quick Start
|
## Quick Start
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
use suck::SuckPair;
|
use suck::sync::StdSuck;
|
||||||
|
|
||||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
// Create a pair
|
// Create a pair (using default std backend)
|
||||||
let (sucker, sourcer) = SuckPair::<i32>::pair();
|
let (sucker, sourcer) = StdSuck::<i32>::pair();
|
||||||
|
|
||||||
// Start producer in a thread
|
// Start producer in a thread
|
||||||
let producer = std::thread::spawn(move || {
|
let producer = std::thread::spawn(move || {
|
||||||
|
|
|
||||||
|
|
@ -1,61 +1,38 @@
|
||||||
use std::sync::mpsc;
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
use crate::sync::traits::{ChannelReceiver, ChannelSender};
|
||||||
use crate::types::{ChannelState, Request, Response, ValueSource};
|
use crate::types::{ChannelState, Request, Response, ValueSource};
|
||||||
|
|
||||||
/// The consumer side of the channel that requests values
|
/// The consumer side of the channel that requests values
|
||||||
pub struct Sucker<T> {
|
pub struct Sucker<T, ST, SR>
|
||||||
request_tx: mpsc::Sender<Request>,
|
where
|
||||||
response_rx: mpsc::Receiver<Response<T>>,
|
ST: ChannelSender<Request>,
|
||||||
closed: Mutex<bool>,
|
SR: ChannelReceiver<Response<T>>,
|
||||||
|
{
|
||||||
|
pub(crate) request_tx: ST,
|
||||||
|
pub(crate) response_rx: SR,
|
||||||
|
pub(crate) closed: Mutex<bool>,
|
||||||
|
pub(crate) _phantom: std::marker::PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The producer side of the channel that provides values
|
/// The producer side of the channel that provides values
|
||||||
pub struct Sourcer<T> {
|
pub struct Sourcer<T, SR, ST>
|
||||||
request_rx: mpsc::Receiver<Request>,
|
where
|
||||||
response_tx: mpsc::Sender<Response<T>>,
|
SR: ChannelReceiver<Request>,
|
||||||
state: Arc<Mutex<ChannelState<T>>>,
|
ST: ChannelSender<Response<T>>,
|
||||||
|
{
|
||||||
|
pub(crate) request_rx: SR,
|
||||||
|
pub(crate) response_tx: ST,
|
||||||
|
pub(crate) state: Arc<Mutex<ChannelState<T>>>,
|
||||||
|
pub(crate) _phantom: std::marker::PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper type for creating Sucker and Sourcer instances
|
impl<T, SR, ST> Sourcer<T, SR, ST>
|
||||||
pub struct SuckPair<T> {
|
|
||||||
_phantom: std::marker::PhantomData<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> SuckPair<T> {
|
|
||||||
/// Create a new suck pair
|
|
||||||
pub fn pair() -> (Sucker<T>, Sourcer<T>)
|
|
||||||
where
|
|
||||||
T: Clone + Send + 'static,
|
|
||||||
{
|
|
||||||
let (request_tx, request_rx) = mpsc::channel();
|
|
||||||
let (response_tx, response_rx) = mpsc::channel();
|
|
||||||
|
|
||||||
let state = Arc::new(Mutex::new(ChannelState {
|
|
||||||
source: ValueSource::None,
|
|
||||||
closed: false,
|
|
||||||
}));
|
|
||||||
|
|
||||||
let sucker = Sucker {
|
|
||||||
request_tx,
|
|
||||||
response_rx,
|
|
||||||
closed: Mutex::new(false),
|
|
||||||
};
|
|
||||||
|
|
||||||
let sourcer = Sourcer {
|
|
||||||
request_rx,
|
|
||||||
response_tx,
|
|
||||||
state: Arc::clone(&state),
|
|
||||||
};
|
|
||||||
|
|
||||||
(sucker, sourcer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Sourcer<T>
|
|
||||||
where
|
where
|
||||||
T: Clone + Send + 'static,
|
T: Clone + Send + 'static,
|
||||||
|
SR: ChannelReceiver<Request>,
|
||||||
|
ST: ChannelSender<Response<T>>,
|
||||||
{
|
{
|
||||||
/// Set a fixed value
|
/// Set a fixed value
|
||||||
pub fn set_static(&self, value: T) -> Result<(), Error> {
|
pub fn set_static(&self, value: T) -> Result<(), Error> {
|
||||||
|
|
@ -142,7 +119,11 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Sucker<T> {
|
impl<T, ST, SR> Sucker<T, ST, SR>
|
||||||
|
where
|
||||||
|
ST: ChannelSender<Request>,
|
||||||
|
SR: ChannelReceiver<Response<T>>,
|
||||||
|
{
|
||||||
/// Get the current value from the producer
|
/// Get the current value from the producer
|
||||||
pub fn get(&self) -> Result<T, Error> {
|
pub fn get(&self) -> Result<T, Error> {
|
||||||
// Check if locally marked as closed
|
// Check if locally marked as closed
|
||||||
|
|
|
||||||
141
src/lib.rs
141
src/lib.rs
|
|
@ -1,145 +1,14 @@
|
||||||
#![doc = include_str!("../README.md")]
|
#![doc = include_str!("../README.md")]
|
||||||
|
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
|
||||||
|
|
||||||
pub mod channel;
|
pub mod channel;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
|
|
||||||
|
#[cfg(feature = "sync")]
|
||||||
|
pub mod sync;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
|
|
||||||
// Re-exports
|
pub use channel::{Sourcer, Sucker};
|
||||||
pub use channel::{Sourcer, SuckPair, Sucker};
|
|
||||||
pub use error::Error;
|
pub use error::Error;
|
||||||
pub use types::ValueSource;
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_pre_computed_value() {
|
|
||||||
let (sucker, sourcer) = SuckPair::<i32>::pair();
|
|
||||||
|
|
||||||
// Start producer
|
|
||||||
let producer_handle = thread::spawn(move || {
|
|
||||||
sourcer.set_static(42).unwrap();
|
|
||||||
sourcer.run().unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
// Ensure consumer gets the value
|
|
||||||
let value = sucker.get().unwrap();
|
|
||||||
assert_eq!(value, 42);
|
|
||||||
|
|
||||||
// Close consumer
|
|
||||||
sucker.close().unwrap();
|
|
||||||
|
|
||||||
producer_handle.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_closure_value() {
|
|
||||||
let (sucker, sourcer) = SuckPair::<i32>::pair();
|
|
||||||
|
|
||||||
// Start producer
|
|
||||||
let producer_handle = std::thread::spawn(move || {
|
|
||||||
let counter = std::sync::Arc::new(std::sync::Mutex::new(0));
|
|
||||||
let counter_clone = std::sync::Arc::clone(&counter);
|
|
||||||
sourcer
|
|
||||||
.set(move || {
|
|
||||||
let mut count = counter_clone.lock().unwrap();
|
|
||||||
*count += 1;
|
|
||||||
*count
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
sourcer.run().unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
// Ensure consumer gets the value
|
|
||||||
let value1 = sucker.get().unwrap();
|
|
||||||
assert_eq!(value1, 1);
|
|
||||||
|
|
||||||
// Ensure consumer gets the next value
|
|
||||||
let value2 = sucker.get().unwrap();
|
|
||||||
assert_eq!(value2, 2);
|
|
||||||
|
|
||||||
// Close consumer
|
|
||||||
sucker.close().unwrap();
|
|
||||||
|
|
||||||
producer_handle.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_no_source_error() {
|
|
||||||
let (sucker, sourcer) = SuckPair::<i32>::pair();
|
|
||||||
|
|
||||||
// Start producer
|
|
||||||
let producer_handle = thread::spawn(move || {
|
|
||||||
sourcer.run().unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
// Consumer should get NoSource error
|
|
||||||
let result = sucker.get();
|
|
||||||
assert!(matches!(result, Err(Error::NoSource)));
|
|
||||||
|
|
||||||
// Close consumer
|
|
||||||
sucker.close().unwrap();
|
|
||||||
|
|
||||||
producer_handle.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_channel_closed_error() {
|
|
||||||
let (sucker, sourcer) = SuckPair::<i32>::pair();
|
|
||||||
|
|
||||||
// Start producer
|
|
||||||
let producer_handle = thread::spawn(move || {
|
|
||||||
sourcer.set_static(42).unwrap();
|
|
||||||
sourcer.run().unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
// Close consumer
|
|
||||||
sucker.close().unwrap();
|
|
||||||
|
|
||||||
let result = sucker.get();
|
|
||||||
assert!(matches!(result, Err(Error::ChannelClosed)));
|
|
||||||
|
|
||||||
producer_handle.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_producer_disconnection_error() {
|
|
||||||
let (sucker, sourcer) = SuckPair::<i32>::pair();
|
|
||||||
|
|
||||||
// Start producer
|
|
||||||
let producer_handle = thread::spawn(move || {
|
|
||||||
sourcer.set_static(42).unwrap();
|
|
||||||
// Simulate crash
|
|
||||||
panic!("Producer crashed!");
|
|
||||||
});
|
|
||||||
|
|
||||||
let result = sucker.get();
|
|
||||||
assert!(matches!(result, Err(Error::ProducerDisconnected)));
|
|
||||||
|
|
||||||
let _ = producer_handle.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_is_closed() {
|
|
||||||
let (sucker, sourcer) = SuckPair::<i32>::pair();
|
|
||||||
|
|
||||||
assert!(!sucker.is_closed());
|
|
||||||
|
|
||||||
// Start producer
|
|
||||||
let producer_handle = thread::spawn(move || {
|
|
||||||
sourcer.set_static(42).unwrap();
|
|
||||||
sourcer.run().unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
// Get one value
|
|
||||||
let _ = sucker.get().unwrap();
|
|
||||||
assert!(!sucker.is_closed());
|
|
||||||
|
|
||||||
// Close and check
|
|
||||||
sucker.close().unwrap();
|
|
||||||
|
|
||||||
producer_handle.join().unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
221
src/sync/crossbeam.rs
Normal file
221
src/sync/crossbeam.rs
Normal file
|
|
@ -0,0 +1,221 @@
|
||||||
|
#[cfg(feature = "sync-crossbeam")]
|
||||||
|
use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType};
|
||||||
|
use crate::types;
|
||||||
|
use crossbeam_channel;
|
||||||
|
|
||||||
|
type CrossbeamSucker<T> = crate::Sucker<T, CrossbeamSender<types::Request>, CrossbeamReceiver<types::Response<T>>>;
|
||||||
|
type CrossbeamSourcer<T> = crate::Sourcer<T, CrossbeamReceiver<types::Request>, CrossbeamSender<types::Response<T>>>;
|
||||||
|
|
||||||
|
/// Internal sender type for crossbeam backend
|
||||||
|
pub struct CrossbeamSender<T>(crossbeam_channel::Sender<T>);
|
||||||
|
/// Internal receiver type for crossbeam backend
|
||||||
|
pub struct CrossbeamReceiver<T>(crossbeam_channel::Receiver<T>);
|
||||||
|
|
||||||
|
impl<T> ChannelSender<T> for CrossbeamSender<T> {
|
||||||
|
fn send(&self, msg: T) -> Result<(), ChannelError> {
|
||||||
|
self.0
|
||||||
|
.send(msg)
|
||||||
|
.map_err(|_| ChannelError::ProducerDisconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ChannelReceiver<T> for CrossbeamReceiver<T> {
|
||||||
|
fn recv(&self) -> Result<T, ChannelError> {
|
||||||
|
self.0
|
||||||
|
.recv()
|
||||||
|
.map_err(|_| ChannelError::ProducerDisconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Internal channel type for crossbeam backend
|
||||||
|
pub struct CrossbeamChannel;
|
||||||
|
|
||||||
|
impl ChannelType for CrossbeamChannel {
|
||||||
|
type Sender<T> = CrossbeamSender<T>;
|
||||||
|
type Receiver<T> = CrossbeamReceiver<T>;
|
||||||
|
|
||||||
|
fn create_request_channel() -> (Self::Sender<types::Request>, Self::Receiver<types::Request>) {
|
||||||
|
let (tx, rx) = crossbeam_channel::unbounded();
|
||||||
|
(CrossbeamSender(tx), CrossbeamReceiver(rx))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_response_channel<T>() -> (
|
||||||
|
Self::Sender<types::Response<T>>,
|
||||||
|
Self::Receiver<types::Response<T>>,
|
||||||
|
) {
|
||||||
|
let (tx, rx) = crossbeam_channel::unbounded();
|
||||||
|
(CrossbeamSender(tx), CrossbeamReceiver(rx))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CrossbeamSuck<T> {
|
||||||
|
_phantom: std::marker::PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> CrossbeamSuck<T> {
|
||||||
|
pub fn pair() -> (CrossbeamSucker<T>, CrossbeamSourcer<T>)
|
||||||
|
where
|
||||||
|
T: Clone + Send + 'static,
|
||||||
|
{
|
||||||
|
let (request_tx, request_rx) = CrossbeamChannel::create_request_channel();
|
||||||
|
let (response_tx, response_rx) = CrossbeamChannel::create_response_channel::<T>();
|
||||||
|
|
||||||
|
let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ChannelState {
|
||||||
|
source: crate::types::ValueSource::None,
|
||||||
|
closed: false,
|
||||||
|
}));
|
||||||
|
|
||||||
|
let sucker = crate::Sucker {
|
||||||
|
request_tx,
|
||||||
|
response_rx,
|
||||||
|
closed: std::sync::Mutex::new(false),
|
||||||
|
_phantom: std::marker::PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
|
let sourcer = crate::Sourcer {
|
||||||
|
request_rx,
|
||||||
|
response_tx,
|
||||||
|
state: std::sync::Arc::clone(&state),
|
||||||
|
_phantom: std::marker::PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
|
(sucker, sourcer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::Error;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pre_computed_value() {
|
||||||
|
let (sucker, sourcer) = CrossbeamSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Ensure consumer gets the value
|
||||||
|
let value = sucker.get().unwrap();
|
||||||
|
assert_eq!(value, 42);
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_closure_value() {
|
||||||
|
let (sucker, sourcer) = CrossbeamSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = std::thread::spawn(move || {
|
||||||
|
let counter = std::sync::Arc::new(std::sync::Mutex::new(0));
|
||||||
|
let counter_clone = std::sync::Arc::clone(&counter);
|
||||||
|
sourcer
|
||||||
|
.set(move || {
|
||||||
|
let mut count = counter_clone.lock().unwrap();
|
||||||
|
*count += 1;
|
||||||
|
*count
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Ensure consumer gets the value
|
||||||
|
let value1 = sucker.get().unwrap();
|
||||||
|
assert_eq!(value1, 1);
|
||||||
|
|
||||||
|
// Ensure consumer gets the next value
|
||||||
|
let value2 = sucker.get().unwrap();
|
||||||
|
assert_eq!(value2, 2);
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_no_source_error() {
|
||||||
|
let (sucker, sourcer) = CrossbeamSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Consumer should get NoSource error
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::NoSource)));
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_channel_closed_error() {
|
||||||
|
let (sucker, sourcer) = CrossbeamSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::ChannelClosed)));
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_producer_disconnection_error() {
|
||||||
|
let (sucker, sourcer) = CrossbeamSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
// Simulate crash
|
||||||
|
panic!("Producer crashed!");
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::ProducerDisconnected)));
|
||||||
|
|
||||||
|
let _ = producer_handle.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_is_closed() {
|
||||||
|
let (sucker, sourcer) = CrossbeamSuck::<i32>::pair();
|
||||||
|
|
||||||
|
assert!(!sucker.is_closed());
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Get one value
|
||||||
|
let _ = sucker.get().unwrap();
|
||||||
|
assert!(!sucker.is_closed());
|
||||||
|
|
||||||
|
// Close and check
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
221
src/sync/flume.rs
Normal file
221
src/sync/flume.rs
Normal file
|
|
@ -0,0 +1,221 @@
|
||||||
|
#[cfg(feature = "sync-flume")]
|
||||||
|
use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType};
|
||||||
|
use crate::types;
|
||||||
|
use flume;
|
||||||
|
|
||||||
|
type FlumeSucker<T> = crate::Sucker<T, FlumeSender<types::Request>, FlumeReceiver<types::Response<T>>>;
|
||||||
|
type FlumeSourcer<T> = crate::Sourcer<T, FlumeReceiver<types::Request>, FlumeSender<types::Response<T>>>;
|
||||||
|
|
||||||
|
/// Internal sender type for flume backend
|
||||||
|
pub struct FlumeSender<T>(flume::Sender<T>);
|
||||||
|
/// Internal receiver type for flume backend
|
||||||
|
pub struct FlumeReceiver<T>(flume::Receiver<T>);
|
||||||
|
|
||||||
|
impl<T> ChannelSender<T> for FlumeSender<T> {
|
||||||
|
fn send(&self, msg: T) -> Result<(), ChannelError> {
|
||||||
|
self.0
|
||||||
|
.send(msg)
|
||||||
|
.map_err(|_| ChannelError::ProducerDisconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ChannelReceiver<T> for FlumeReceiver<T> {
|
||||||
|
fn recv(&self) -> Result<T, ChannelError> {
|
||||||
|
self.0
|
||||||
|
.recv()
|
||||||
|
.map_err(|_| ChannelError::ProducerDisconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Internal channel type for flume backend
|
||||||
|
pub struct FlumeChannel;
|
||||||
|
|
||||||
|
impl ChannelType for FlumeChannel {
|
||||||
|
type Sender<T> = FlumeSender<T>;
|
||||||
|
type Receiver<T> = FlumeReceiver<T>;
|
||||||
|
|
||||||
|
fn create_request_channel() -> (Self::Sender<types::Request>, Self::Receiver<types::Request>) {
|
||||||
|
let (tx, rx) = flume::unbounded();
|
||||||
|
(FlumeSender(tx), FlumeReceiver(rx))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_response_channel<T>() -> (
|
||||||
|
Self::Sender<types::Response<T>>,
|
||||||
|
Self::Receiver<types::Response<T>>,
|
||||||
|
) {
|
||||||
|
let (tx, rx) = flume::unbounded();
|
||||||
|
(FlumeSender(tx), FlumeReceiver(rx))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FlumeSuck<T> {
|
||||||
|
_phantom: std::marker::PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> FlumeSuck<T> {
|
||||||
|
pub fn pair() -> (FlumeSucker<T>, FlumeSourcer<T>)
|
||||||
|
where
|
||||||
|
T: Clone + Send + 'static,
|
||||||
|
{
|
||||||
|
let (request_tx, request_rx) = FlumeChannel::create_request_channel();
|
||||||
|
let (response_tx, response_rx) = FlumeChannel::create_response_channel::<T>();
|
||||||
|
|
||||||
|
let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ChannelState {
|
||||||
|
source: crate::types::ValueSource::None,
|
||||||
|
closed: false,
|
||||||
|
}));
|
||||||
|
|
||||||
|
let sucker = crate::Sucker {
|
||||||
|
request_tx,
|
||||||
|
response_rx,
|
||||||
|
closed: std::sync::Mutex::new(false),
|
||||||
|
_phantom: std::marker::PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
|
let sourcer = crate::Sourcer {
|
||||||
|
request_rx,
|
||||||
|
response_tx,
|
||||||
|
state: std::sync::Arc::clone(&state),
|
||||||
|
_phantom: std::marker::PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
|
(sucker, sourcer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::Error;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pre_computed_value() {
|
||||||
|
let (sucker, sourcer) = FlumeSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Ensure consumer gets the value
|
||||||
|
let value = sucker.get().unwrap();
|
||||||
|
assert_eq!(value, 42);
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_closure_value() {
|
||||||
|
let (sucker, sourcer) = FlumeSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = std::thread::spawn(move || {
|
||||||
|
let counter = std::sync::Arc::new(std::sync::Mutex::new(0));
|
||||||
|
let counter_clone = std::sync::Arc::clone(&counter);
|
||||||
|
sourcer
|
||||||
|
.set(move || {
|
||||||
|
let mut count = counter_clone.lock().unwrap();
|
||||||
|
*count += 1;
|
||||||
|
*count
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Ensure consumer gets the value
|
||||||
|
let value1 = sucker.get().unwrap();
|
||||||
|
assert_eq!(value1, 1);
|
||||||
|
|
||||||
|
// Ensure consumer gets the next value
|
||||||
|
let value2 = sucker.get().unwrap();
|
||||||
|
assert_eq!(value2, 2);
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_no_source_error() {
|
||||||
|
let (sucker, sourcer) = FlumeSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Consumer should get NoSource error
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::NoSource)));
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_channel_closed_error() {
|
||||||
|
let (sucker, sourcer) = FlumeSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::ChannelClosed)));
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_producer_disconnection_error() {
|
||||||
|
let (sucker, sourcer) = FlumeSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
// Simulate crash
|
||||||
|
panic!("Producer crashed!");
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::ProducerDisconnected)));
|
||||||
|
|
||||||
|
let _ = producer_handle.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_is_closed() {
|
||||||
|
let (sucker, sourcer) = FlumeSuck::<i32>::pair();
|
||||||
|
|
||||||
|
assert!(!sucker.is_closed());
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Get one value
|
||||||
|
let _ = sucker.get().unwrap();
|
||||||
|
assert!(!sucker.is_closed());
|
||||||
|
|
||||||
|
// Close and check
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
17
src/sync/mod.rs
Normal file
17
src/sync/mod.rs
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
pub mod traits;
|
||||||
|
|
||||||
|
#[cfg(feature = "sync-crossbeam")]
|
||||||
|
pub mod crossbeam;
|
||||||
|
#[cfg(feature = "sync-flume")]
|
||||||
|
pub mod flume;
|
||||||
|
#[cfg(feature = "sync-std")]
|
||||||
|
pub mod std;
|
||||||
|
|
||||||
|
#[cfg(feature = "sync-flume")]
|
||||||
|
pub use flume::FlumeSuck;
|
||||||
|
|
||||||
|
#[cfg(feature = "sync-crossbeam")]
|
||||||
|
pub use crossbeam::CrossbeamSuck;
|
||||||
|
|
||||||
|
#[cfg(feature = "sync-std")]
|
||||||
|
pub use std::StdSuck;
|
||||||
221
src/sync/std.rs
Normal file
221
src/sync/std.rs
Normal file
|
|
@ -0,0 +1,221 @@
|
||||||
|
use crate::sync::traits::{ChannelError, ChannelReceiver, ChannelSender, ChannelType};
|
||||||
|
use crate::types;
|
||||||
|
#[cfg(feature = "sync-std")]
|
||||||
|
use std::sync::mpsc;
|
||||||
|
|
||||||
|
type StdSucker<T> = crate::Sucker<T, StdSender<types::Request>, StdReceiver<types::Response<T>>>;
|
||||||
|
type StdSourcer<T> = crate::Sourcer<T, StdReceiver<types::Request>, StdSender<types::Response<T>>>;
|
||||||
|
|
||||||
|
/// Internal sender type for std backend
|
||||||
|
pub struct StdSender<T>(mpsc::Sender<T>);
|
||||||
|
/// Internal receiver type for std backend
|
||||||
|
pub struct StdReceiver<T>(mpsc::Receiver<T>);
|
||||||
|
|
||||||
|
impl<T> ChannelSender<T> for StdSender<T> {
|
||||||
|
fn send(&self, msg: T) -> Result<(), ChannelError> {
|
||||||
|
self.0
|
||||||
|
.send(msg)
|
||||||
|
.map_err(|_| ChannelError::ProducerDisconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ChannelReceiver<T> for StdReceiver<T> {
|
||||||
|
fn recv(&self) -> Result<T, ChannelError> {
|
||||||
|
self.0
|
||||||
|
.recv()
|
||||||
|
.map_err(|_| ChannelError::ProducerDisconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Internal channel type for std backend
|
||||||
|
pub struct StdChannel;
|
||||||
|
|
||||||
|
impl ChannelType for StdChannel {
|
||||||
|
type Sender<T> = StdSender<T>;
|
||||||
|
type Receiver<T> = StdReceiver<T>;
|
||||||
|
|
||||||
|
fn create_request_channel() -> (Self::Sender<types::Request>, Self::Receiver<types::Request>) {
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
(StdSender(tx), StdReceiver(rx))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_response_channel<T>() -> (
|
||||||
|
Self::Sender<types::Response<T>>,
|
||||||
|
Self::Receiver<types::Response<T>>,
|
||||||
|
) {
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
(StdSender(tx), StdReceiver(rx))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct StdSuck<T> {
|
||||||
|
_phantom: std::marker::PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> StdSuck<T> {
|
||||||
|
pub fn pair() -> (StdSucker<T>, StdSourcer<T>)
|
||||||
|
where
|
||||||
|
T: Clone + Send + 'static,
|
||||||
|
{
|
||||||
|
let (request_tx, request_rx) = StdChannel::create_request_channel();
|
||||||
|
let (response_tx, response_rx) = StdChannel::create_response_channel::<T>();
|
||||||
|
|
||||||
|
let state = std::sync::Arc::new(std::sync::Mutex::new(crate::types::ChannelState {
|
||||||
|
source: crate::types::ValueSource::None,
|
||||||
|
closed: false,
|
||||||
|
}));
|
||||||
|
|
||||||
|
let sucker = crate::Sucker {
|
||||||
|
request_tx,
|
||||||
|
response_rx,
|
||||||
|
closed: std::sync::Mutex::new(false),
|
||||||
|
_phantom: std::marker::PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
|
let sourcer = crate::Sourcer {
|
||||||
|
request_rx,
|
||||||
|
response_tx,
|
||||||
|
state: std::sync::Arc::clone(&state),
|
||||||
|
_phantom: std::marker::PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
|
(sucker, sourcer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::Error;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pre_computed_value() {
|
||||||
|
let (sucker, sourcer) = StdSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Ensure consumer gets the value
|
||||||
|
let value = sucker.get().unwrap();
|
||||||
|
assert_eq!(value, 42);
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_closure_value() {
|
||||||
|
let (sucker, sourcer) = StdSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = std::thread::spawn(move || {
|
||||||
|
let counter = std::sync::Arc::new(std::sync::Mutex::new(0));
|
||||||
|
let counter_clone = std::sync::Arc::clone(&counter);
|
||||||
|
sourcer
|
||||||
|
.set(move || {
|
||||||
|
let mut count = counter_clone.lock().unwrap();
|
||||||
|
*count += 1;
|
||||||
|
*count
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Ensure consumer gets the value
|
||||||
|
let value1 = sucker.get().unwrap();
|
||||||
|
assert_eq!(value1, 1);
|
||||||
|
|
||||||
|
// Ensure consumer gets the next value
|
||||||
|
let value2 = sucker.get().unwrap();
|
||||||
|
assert_eq!(value2, 2);
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_no_source_error() {
|
||||||
|
let (sucker, sourcer) = StdSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Consumer should get NoSource error
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::NoSource)));
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_channel_closed_error() {
|
||||||
|
let (sucker, sourcer) = StdSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Close consumer
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::ChannelClosed)));
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_producer_disconnection_error() {
|
||||||
|
let (sucker, sourcer) = StdSuck::<i32>::pair();
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
// Simulate crash
|
||||||
|
panic!("Producer crashed!");
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = sucker.get();
|
||||||
|
assert!(matches!(result, Err(Error::ProducerDisconnected)));
|
||||||
|
|
||||||
|
let _ = producer_handle.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_is_closed() {
|
||||||
|
let (sucker, sourcer) = StdSuck::<i32>::pair();
|
||||||
|
|
||||||
|
assert!(!sucker.is_closed());
|
||||||
|
|
||||||
|
// Start producer
|
||||||
|
let producer_handle = thread::spawn(move || {
|
||||||
|
sourcer.set_static(42).unwrap();
|
||||||
|
sourcer.run().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Get one value
|
||||||
|
let _ = sucker.get().unwrap();
|
||||||
|
assert!(!sucker.is_closed());
|
||||||
|
|
||||||
|
// Close and check
|
||||||
|
sucker.close().unwrap();
|
||||||
|
|
||||||
|
producer_handle.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
23
src/sync/traits.rs
Normal file
23
src/sync/traits.rs
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
pub use crate::error::Error as ChannelError;
|
||||||
|
|
||||||
|
pub trait ChannelSender<T> {
|
||||||
|
fn send(&self, msg: T) -> Result<(), ChannelError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait ChannelReceiver<T> {
|
||||||
|
fn recv(&self) -> Result<T, ChannelError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait ChannelType {
|
||||||
|
type Sender<T>: ChannelSender<T>;
|
||||||
|
type Receiver<T>: ChannelReceiver<T>;
|
||||||
|
|
||||||
|
fn create_request_channel() -> (
|
||||||
|
Self::Sender<crate::types::Request>,
|
||||||
|
Self::Receiver<crate::types::Request>,
|
||||||
|
);
|
||||||
|
fn create_response_channel<T>() -> (
|
||||||
|
Self::Sender<crate::types::Response<T>>,
|
||||||
|
Self::Receiver<crate::types::Response<T>>,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
@ -12,14 +12,14 @@ pub enum Response<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents the source of values: either static or dynamic
|
/// Represents the source of values: either static or dynamic
|
||||||
pub enum ValueSource<T> {
|
pub(crate) enum ValueSource<T> {
|
||||||
Static(T),
|
Static(T),
|
||||||
Dynamic(Box<dyn Fn() -> T + Send + Sync + 'static>),
|
Dynamic(Box<dyn Fn() -> T + Send + Sync + 'static>),
|
||||||
None,
|
None,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Internal channel state shared between producer and consumer
|
/// Internal channel state shared between producer and consumer
|
||||||
pub struct ChannelState<T> {
|
pub(crate) struct ChannelState<T> {
|
||||||
pub source: ValueSource<T>,
|
pub(crate) source: ValueSource<T>,
|
||||||
pub closed: bool,
|
pub(crate) closed: bool,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue