feat: add multiple channel providers

Each provider is enabled via a feature flag.
The currently implemented providers are:
- std::mpsc
- flume
- crossbeam_channel
This commit is contained in:
Callum Leslie 2025-09-04 09:37:55 +01:00
parent babcd7f4f7
commit f3fe41aa82
Signed by: cleslie
GPG key ID: D382C4AFEECEAA90
10 changed files with 762 additions and 191 deletions

View file

@ -1,61 +1,38 @@
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use crate::error::Error;
use crate::sync::traits::{ChannelReceiver, ChannelSender};
use crate::types::{ChannelState, Request, Response, ValueSource};
/// The consumer side of the channel that requests values
pub struct Sucker<T> {
request_tx: mpsc::Sender<Request>,
response_rx: mpsc::Receiver<Response<T>>,
closed: Mutex<bool>,
pub struct Sucker<T, ST, SR>
where
ST: ChannelSender<Request>,
SR: ChannelReceiver<Response<T>>,
{
pub(crate) request_tx: ST,
pub(crate) response_rx: SR,
pub(crate) closed: Mutex<bool>,
pub(crate) _phantom: std::marker::PhantomData<T>,
}
/// The producer side of the channel that provides values
pub struct Sourcer<T> {
request_rx: mpsc::Receiver<Request>,
response_tx: mpsc::Sender<Response<T>>,
state: Arc<Mutex<ChannelState<T>>>,
pub struct Sourcer<T, SR, ST>
where
SR: ChannelReceiver<Request>,
ST: ChannelSender<Response<T>>,
{
pub(crate) request_rx: SR,
pub(crate) response_tx: ST,
pub(crate) state: Arc<Mutex<ChannelState<T>>>,
pub(crate) _phantom: std::marker::PhantomData<T>,
}
/// Helper type for creating Sucker and Sourcer instances
pub struct SuckPair<T> {
_phantom: std::marker::PhantomData<T>,
}
impl<T> SuckPair<T> {
/// Create a new suck pair
pub fn pair() -> (Sucker<T>, Sourcer<T>)
where
T: Clone + Send + 'static,
{
let (request_tx, request_rx) = mpsc::channel();
let (response_tx, response_rx) = mpsc::channel();
let state = Arc::new(Mutex::new(ChannelState {
source: ValueSource::None,
closed: false,
}));
let sucker = Sucker {
request_tx,
response_rx,
closed: Mutex::new(false),
};
let sourcer = Sourcer {
request_rx,
response_tx,
state: Arc::clone(&state),
};
(sucker, sourcer)
}
}
impl<T> Sourcer<T>
impl<T, SR, ST> Sourcer<T, SR, ST>
where
T: Clone + Send + 'static,
SR: ChannelReceiver<Request>,
ST: ChannelSender<Response<T>>,
{
/// Set a fixed value
pub fn set_static(&self, value: T) -> Result<(), Error> {
@ -142,7 +119,11 @@ where
}
}
impl<T> Sucker<T> {
impl<T, ST, SR> Sucker<T, ST, SR>
where
ST: ChannelSender<Request>,
SR: ChannelReceiver<Response<T>>,
{
/// Get the current value from the producer
pub fn get(&self) -> Result<T, Error> {
// Check if locally marked as closed