hydro_lang/
keyed_singleton.rs

1use std::hash::Hash;
2
3use stageleft::{IntoQuotedMut, QuotedWithContext, q};
4
5use crate::cycle::{CycleCollection, CycleComplete, ForwardRefMarker};
6use crate::location::tick::NoAtomic;
7use crate::location::{LocationId, NoTick};
8use crate::manual_expr::ManualExpr;
9use crate::stream::ExactlyOnce;
10use crate::unsafety::NonDet;
11use crate::{
12    Atomic, Bounded, KeyedStream, Location, NoOrder, Optional, Singleton, Stream, Tick, TotalOrder,
13    Unbounded, nondet,
14};
15
16pub struct KeyedSingleton<K, V, Loc, Bound> {
17    pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, ExactlyOnce>,
18}
19
20impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound> Clone for KeyedSingleton<K, V, Loc, Bound> {
21    fn clone(&self) -> Self {
22        KeyedSingleton {
23            underlying: self.underlying.clone(),
24        }
25    }
26}
27
28impl<'a, K, V, L, B> CycleCollection<'a, ForwardRefMarker> for KeyedSingleton<K, V, L, B>
29where
30    L: Location<'a> + NoTick,
31{
32    type Location = L;
33
34    fn create_source(ident: syn::Ident, location: L) -> Self {
35        KeyedSingleton {
36            underlying: Stream::create_source(ident, location),
37        }
38    }
39}
40
41impl<'a, K, V, L, B> CycleComplete<'a, ForwardRefMarker> for KeyedSingleton<K, V, L, B>
42where
43    L: Location<'a> + NoTick,
44{
45    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
46        self.underlying.complete(ident, expected_location);
47    }
48}
49
50impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
51    pub fn entries(self) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce> {
52        self.underlying
53    }
54
55    pub fn values(self) -> Stream<V, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
56        self.underlying.map(q!(|(_, v)| v))
57    }
58
59    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
60        self.underlying.map(q!(|(k, _)| k))
61    }
62}
63
64impl<'a, K, V, L: Location<'a>, B> KeyedSingleton<K, V, L, B> {
65    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
66    where
67        F: Fn(V) -> U + 'a,
68    {
69        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
70        KeyedSingleton {
71            underlying: self.underlying.map(q!({
72                let orig = f;
73                move |(k, v)| (k, orig(v))
74            })),
75        }
76    }
77
78    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
79    where
80        F: Fn(&V) -> bool + 'a,
81    {
82        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
83        KeyedSingleton {
84            underlying: self.underlying.filter(q!({
85                let orig = f;
86                move |(_k, v)| orig(v)
87            })),
88        }
89    }
90
91    pub fn filter_map<F, U>(
92        self,
93        f: impl IntoQuotedMut<'a, F, L> + Copy,
94    ) -> KeyedSingleton<K, U, L, B>
95    where
96        F: Fn(V) -> Option<U> + 'a,
97    {
98        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
99        KeyedSingleton {
100            underlying: self.underlying.filter_map(q!({
101                let orig = f;
102                move |(k, v)| orig(v).map(|v| (k, v))
103            })),
104        }
105    }
106
107    pub fn key_count(self) -> Singleton<usize, L, B> {
108        self.underlying.count()
109    }
110
111    /// An operator which allows you to "name" a `HydroNode`.
112    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
113    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
114        {
115            let mut node = self.underlying.ir_node.borrow_mut();
116            let metadata = node.metadata_mut();
117            metadata.tag = Some(name.to_string());
118        }
119        self
120    }
121}
122
123impl<'a, K: Hash + Eq, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
124    /// Gets the value associated with a specific key from the keyed singleton.
125    ///
126    /// # Example
127    /// ```rust
128    /// # use hydro_lang::*;
129    /// # use futures::StreamExt;
130    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
131    /// let tick = process.tick();
132    /// let keyed_data = process
133    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
134    ///     .into_keyed()
135    ///     .batch(&tick, nondet!(/** test */))
136    ///     .fold(q!(|| 0), q!(|acc, x| *acc = x));
137    /// let key = tick.singleton(q!(1));
138    /// keyed_data.get(key).all_ticks()
139    /// # }, |mut stream| async move {
140    /// // 2
141    /// # assert_eq!(stream.next().await.unwrap(), 2);
142    /// # }));
143    /// ```
144    pub fn get(self, key: Singleton<K, Tick<L>, Bounded>) -> Optional<V, Tick<L>, Bounded> {
145        self.entries()
146            .join(key.into_stream().map(q!(|k| (k, ()))))
147            .map(q!(|(_, (v, _))| v))
148            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
149            .first()
150    }
151
152    /// Given a keyed stream of lookup requests, where the key is the lookup and the value
153    /// is some additional metadata, emits a keyed stream of lookup results where the key
154    /// is the same as before, but the value is a tuple of the lookup result and the metadata
155    /// of the request.
156    ///
157    /// # Example
158    /// ```rust
159    /// # use hydro_lang::*;
160    /// # use futures::StreamExt;
161    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
162    /// let tick = process.tick();
163    /// let keyed_data = process
164    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
165    ///     .into_keyed()
166    ///     .batch(&tick, nondet!(/** test */))
167    ///     .fold(q!(|| 0), q!(|acc, x| *acc = x));
168    /// let other_data = process
169    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
170    ///     .into_keyed()
171    ///     .batch(&tick, nondet!(/** test */));
172    /// keyed_data.get_many(other_data).entries().all_ticks()
173    /// # }, |mut stream| async move {
174    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
175    /// # let mut results = vec![];
176    /// # for _ in 0..3 {
177    /// #     results.push(stream.next().await.unwrap());
178    /// # }
179    /// # results.sort();
180    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
181    /// # }));
182    /// ```
183    pub fn get_many<O2, R2, V2>(
184        self,
185        with: KeyedStream<K, V2, Tick<L>, Bounded, O2, R2>,
186    ) -> KeyedStream<K, (V, V2), Tick<L>, Bounded, NoOrder, R2> {
187        self.entries()
188            .weaker_retries()
189            .join(with.entries())
190            .into_keyed()
191    }
192
193    pub fn latest(self) -> KeyedSingleton<K, V, L, Unbounded> {
194        KeyedSingleton {
195            underlying: Stream::new(
196                self.underlying.location.outer().clone(),
197                // no need to persist due to top-level replay
198                self.underlying.ir_node.into_inner(),
199            ),
200        }
201    }
202}
203
204impl<'a, K, V, L, B> KeyedSingleton<K, V, L, B>
205where
206    L: Location<'a> + NoTick + NoAtomic,
207{
208    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
209        KeyedSingleton {
210            underlying: self.underlying.atomic(tick),
211        }
212    }
213
214    /// Given a tick, returns a keyed singleton with a entries consisting of keys with
215    /// snapshots of the value singleton.
216    ///
217    /// # Non-Determinism
218    /// Because this picks a snapshot of each singleton whose value is continuously changing,
219    /// the output singleton has a non-deterministic value since each snapshot can be at an
220    /// arbitrary point in time.
221    pub fn snapshot(
222        self,
223        tick: &Tick<L>,
224        nondet: NonDet,
225    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
226        self.atomic(tick).snapshot(nondet)
227    }
228}
229
230impl<'a, K, V, L, B> KeyedSingleton<K, V, Atomic<L>, B>
231where
232    L: Location<'a> + NoTick + NoAtomic,
233{
234    /// Returns a keyed singleton with a entries consisting of keys with snapshots of the value
235    /// singleton being atomically processed.
236    ///
237    /// # Non-Determinism
238    /// Because this picks a snapshot of each singleton whose value is continuously changing,
239    /// each output singleton has a non-deterministic value since each snapshot can be at an
240    /// arbitrary point in time.
241    pub fn snapshot(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
242        KeyedSingleton {
243            underlying: Stream::new(
244                self.underlying.location.tick,
245                // no need to unpersist due to top-level replay
246                self.underlying.ir_node.into_inner(),
247            ),
248        }
249    }
250}