hydro_lang/
keyed_stream.rs

1use std::hash::Hash;
2use std::marker::PhantomData;
3
4use stageleft::{IntoQuotedMut, QuotedWithContext, q};
5
6use crate::cycle::{CycleCollection, CycleComplete, ForwardRefMarker};
7use crate::ir::HydroNode;
8use crate::keyed_singleton::KeyedSingleton;
9use crate::location::tick::NoAtomic;
10use crate::location::{LocationId, NoTick, check_matching_location};
11use crate::manual_expr::ManualExpr;
12use crate::stream::{ExactlyOnce, MinRetries};
13use crate::unsafety::NonDet;
14use crate::*;
15
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`,
/// where the order of keys is non-deterministic but the order *within* each group may
/// be deterministic.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: the boundedness marker, forwarded unchanged to the underlying [`Stream`]
///   (for example [`Unbounded`])
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::stream::AtLeastOnce`])
pub struct KeyedStream<K, V, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> {
    /// The backing stream of `(key, value)` pairs. It is always typed as [`NoOrder`]
    /// because ordering is only tracked per group, through the `Order` parameter.
    pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retries>,
    /// Zero-sized marker that records the per-group `Order` guarantee in the type.
    pub(crate) _phantom_order: PhantomData<Order>,
}
32
33impl<'a, K, V, L, B, R> From<KeyedStream<K, V, L, B, TotalOrder, R>>
34    for KeyedStream<K, V, L, B, NoOrder, R>
35where
36    L: Location<'a>,
37{
38    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
39        KeyedStream {
40            underlying: stream.underlying,
41            _phantom_order: Default::default(),
42        }
43    }
44}
45
46impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound, Order, Retries> Clone
47    for KeyedStream<K, V, Loc, Bound, Order, Retries>
48{
49    fn clone(&self) -> Self {
50        KeyedStream {
51            underlying: self.underlying.clone(),
52            _phantom_order: PhantomData,
53        }
54    }
55}
56
57impl<'a, K, V, L, B, O, R> CycleCollection<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
58where
59    L: Location<'a> + NoTick,
60{
61    type Location = L;
62
63    fn create_source(ident: syn::Ident, location: L) -> Self {
64        Stream::create_source(ident, location).into_keyed()
65    }
66}
67
68impl<'a, K, V, L, B, O, R> CycleComplete<'a, ForwardRefMarker> for KeyedStream<K, V, L, B, O, R>
69where
70    L: Location<'a> + NoTick,
71{
72    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
73        self.underlying.complete(ident, expected_location);
74    }
75}
76
77impl<'a, K, V, L: Location<'a>, B, O, R> KeyedStream<K, V, L, B, O, R> {
78    /// Explicitly "casts" the keyed stream to a type with a different ordering
79    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
80    /// by the type-system.
81    ///
82    /// # Non-Determinism
83    /// This function is used as an escape hatch, and any mistakes in the
84    /// provided ordering guarantee will propagate into the guarantees
85    /// for the rest of the program.
86    pub fn assume_ordering<O2>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
87        KeyedStream {
88            underlying: self.underlying,
89            _phantom_order: PhantomData,
90        }
91    }
92
93    /// Explicitly "casts" the keyed stream to a type with a different retries
94    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
95    /// be proven by the type-system.
96    ///
97    /// # Non-Determinism
98    /// This function is used as an escape hatch, and any mistakes in the
99    /// provided retries guarantee will propagate into the guarantees
100    /// for the rest of the program.
101    pub fn assume_retries<R2>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
102        KeyedStream {
103            underlying: self.underlying.assume_retries::<R2>(nondet),
104            _phantom_order: PhantomData,
105        }
106    }
107
108    /// Flattens the keyed stream into a single stream of key-value pairs, with non-deterministic
109    /// element ordering.
110    ///
111    /// # Example
112    /// ```rust
113    /// # use hydro_lang::*;
114    /// # use futures::StreamExt;
115    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
116    /// process
117    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
118    ///     .into_keyed()
119    ///     .entries()
120    /// # }, |mut stream| async move {
121    /// // (1, 2), (1, 3), (2, 4) in any order
122    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
123    /// #     assert_eq!(stream.next().await.unwrap(), w);
124    /// # }
125    /// # }));
126    /// ```
127    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
128        self.underlying
129    }
130
131    /// Flattens the keyed stream into a single stream of only the values, with non-deterministic
132    /// element ordering.
133    ///
134    /// # Example
135    /// ```rust
136    /// # use hydro_lang::*;
137    /// # use futures::StreamExt;
138    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
139    /// process
140    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
141    ///     .into_keyed()
142    ///     .values()
143    /// # }, |mut stream| async move {
144    /// // 2, 3, 4 in any order
145    /// # for w in vec![2, 3, 4] {
146    /// #     assert_eq!(stream.next().await.unwrap(), w);
147    /// # }
148    /// # }));
149    /// ```
150    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
151        self.underlying.map(q!(|(_, v)| v))
152    }
153
    /// Transforms each value by invoking `f` on each element, with keys staying the same
    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect`] instead.
    ///
    /// The per-group ordering (`O`) and retries (`R`) markers of the input are carried
    /// over to the output unchanged.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map(q!(|v| v + 1))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [5] }
    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> U + 'a,
    {
        // Re-wrap `f` as a `ManualExpr` that splices it against the location context it
        // is handed, so it can be referenced by name from the quoted closure below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        KeyedStream {
            // Apply the user function to the value only; the key passes through untouched.
            underlying: self.underlying.map(q!({
                let orig = f;
                move |(k, v)| (k, orig(v))
            })),
            _phantom_order: Default::default(),
        }
    }
190
    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    ///
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`KeyedStream::inspect_with_key`] instead.
    ///
    /// `K` must be [`Clone`] because `f` takes the key by value while the original key is
    /// still needed to label the output.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .map_with_key(q!(|(k, v)| k + v))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3, 4], 2: [6] }
    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> U + 'a,
        K: Clone,
    {
        // Re-wrap `f` as a `ManualExpr` that splices it against the location context it
        // is handed, so it can be referenced by name from the quoted closure below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        KeyedStream {
            // Hand `f` a clone of the key and keep the original to pair with its output.
            underlying: self.underlying.map(q!({
                let orig = f;
                move |(k, v)| {
                    let out = orig((k.clone(), v));
                    (k, out)
                }
            })),
            _phantom_order: Default::default(),
        }
    }
234
    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
    /// `f`, preserving the order of the elements within the group.
    ///
    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`KeyedStream::filter_map`] instead. If the predicate needs the key, see
    /// [`KeyedStream::filter_with_key`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .filter(q!(|&x| x > 2))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [3], 2: [4] }
    /// # for w in vec![(1, 3), (2, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&V) -> bool + 'a,
    {
        // Re-wrap `f` (spliced in its borrowing form) as a `ManualExpr` so it can be
        // referenced by name from the quoted closure below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        KeyedStream {
            // The predicate only sees the value; the key is ignored.
            underlying: self.underlying.filter(q!({
                let orig = f;
                move |(_k, v)| orig(v)
            })),
            _phantom_order: Default::default(),
        }
    }
272
273    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
274    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
275    ///
276    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
277    /// not modify or take ownership of the values. If you need to modify the values while filtering
278    /// use [`KeyedStream::filter_map_with_key`] instead.
279    ///
280    /// # Example
281    /// ```rust
282    /// # use hydro_lang::*;
283    /// # use futures::StreamExt;
284    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
285    /// process
286    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
287    ///     .into_keyed()
288    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
289    /// #   .entries()
290    /// # }, |mut stream| async move {
291    /// // { 1: [3], 2: [4] }
292    /// # for w in vec![(1, 3), (2, 4)] {
293    /// #     assert_eq!(stream.next().await.unwrap(), w);
294    /// # }
295    /// # }));
296    /// ```
297    pub fn filter_with_key<F>(
298        self,
299        f: impl IntoQuotedMut<'a, F, L> + Copy,
300    ) -> KeyedStream<K, V, L, B, O, R>
301    where
302        F: Fn(&(K, V)) -> bool + 'a,
303    {
304        KeyedStream {
305            underlying: self.underlying.filter(f),
306            _phantom_order: Default::default(),
307        }
308    }
309
    /// An operator that both filters and maps each value, with keys staying the same.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
    ///     .into_keyed()
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 1: [2], 2: [4] }
    /// # for w in vec![(1, 2), (2, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_map<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn(V) -> Option<U> + 'a,
    {
        // Re-wrap `f` as a `ManualExpr` that splices it against the location context it
        // is handed, so it can be referenced by name from the quoted closure below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        KeyedStream {
            // Run `f` on the value; when it yields `Some`, re-attach the original key.
            underlying: self.underlying.filter_map(q!({
                let orig = f;
                move |(k, v)| orig(v).map(|o| (k, o))
            })),
            _phantom_order: Default::default(),
        }
    }
347
    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// `K` must be [`Clone`] because `f` takes the key by value while the original key is
    /// still needed to label the output.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
    ///     .into_keyed()
    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// // { 2: [2] }
    /// # for w in vec![(2, 2)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn filter_map_with_key<U, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> KeyedStream<K, U, L, B, O, R>
    where
        F: Fn((K, V)) -> Option<U> + 'a,
        K: Clone,
    {
        // Re-wrap `f` as a `ManualExpr` that splices it against the location context it
        // is handed, so it can be referenced by name from the quoted closure below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
        KeyedStream {
            // Hand `f` a clone of the key; the original key labels any `Some` output.
            underlying: self.underlying.filter_map(q!({
                let orig = f;
                move |(k, v)| {
                    let out = orig((k.clone(), v));
                    out.map(|o| (k, o))
                }
            })),
            _phantom_order: Default::default(),
        }
    }
389
    /// An operator which allows you to "inspect" each element of a stream without
    /// modifying it. The closure `f` is called on a reference to each value. This is
    /// mainly useful for debugging, and should not be used to generate side-effects.
    /// If you also need to see the key, use [`KeyedStream::inspect_with_key`].
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
    ///     .into_keyed()
    ///     .inspect(q!(|v| println!("{}", v)))
    /// #   .entries()
    /// # }, |mut stream| async move {
    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
    where
        F: Fn(&V) + 'a,
    {
        // Re-wrap `f` (spliced in its borrowing form) as a `ManualExpr` so it can be
        // referenced by name from the quoted closure below.
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
        KeyedStream {
            // Only the value is shown to `f`; the key is ignored.
            underlying: self.underlying.inspect(q!({
                let orig = f;
                move |(_k, v)| orig(v)
            })),
            _phantom_order: Default::default(),
        }
    }
423
424    /// An operator which allows you to "inspect" each element of a stream without
425    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
426    /// mainly useful for debugging, and should not be used to generate side-effects.
427    ///
428    /// # Example
429    /// ```rust
430    /// # use hydro_lang::*;
431    /// # use futures::StreamExt;
432    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
433    /// process
434    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
435    ///     .into_keyed()
436    ///     .inspect(q!(|v| println!("{}", v)))
437    /// #   .entries()
438    /// # }, |mut stream| async move {
439    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
440    /// #     assert_eq!(stream.next().await.unwrap(), w);
441    /// # }
442    /// # }));
443    /// ```
444    pub fn inspect_with_key<F>(
445        self,
446        f: impl IntoQuotedMut<'a, F, L>,
447    ) -> KeyedStream<K, V, L, B, O, R>
448    where
449        F: Fn(&(K, V)) + 'a,
450    {
451        KeyedStream {
452            underlying: self.underlying.inspect(f),
453            _phantom_order: Default::default(),
454        }
455    }
456
457    /// An operator which allows you to "name" a `HydroNode`.
458    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
459    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
460        {
461            let mut node = self.underlying.ir_node.borrow_mut();
462            let metadata = node.metadata_mut();
463            metadata.tag = Some(name.to_string());
464        }
465        self
466    }
467}
468
469impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O, R> KeyedStream<K, V, L, Unbounded, O, R> {
470    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
471    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
472    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
473    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
474    ///
475    /// Currently, both input streams must be [`Unbounded`].
476    ///
477    /// # Example
478    /// ```rust
479    /// # use hydro_lang::*;
480    /// # use futures::StreamExt;
481    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
482    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
483    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
484    /// numbers1.interleave(numbers2)
485    /// #   .entries()
486    /// # }, |mut stream| async move {
487    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
488    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
489    /// #     assert_eq!(stream.next().await.unwrap(), w);
490    /// # }
491    /// # }));
492    /// ```
493    pub fn interleave<O2, R2: MinRetries<R>>(
494        self,
495        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
496    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, R::Min>
497    where
498        R: MinRetries<R2, Min = R2::Min>,
499    {
500        self.entries().interleave(other.entries()).into_keyed()
501    }
502}
503
504impl<'a, K, V, L, B> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
505where
506    K: Eq + Hash,
507    L: Location<'a>,
508{
509    /// A special case of [`Stream::scan`] for keyd streams. For each key group the values are transformed via the `f` combinator.
510    ///
511    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
512    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
513    /// early by returning `None`.
514    ///
515    /// The function takes a mutable reference to the accumulator and the current element, and returns
516    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
517    /// If the function returns `None`, the stream is terminated and no more elements are processed.
518    ///
519    /// # Example
520    /// ```rust
521    /// # use hydro_lang::*;
522    /// # use futures::StreamExt;
523    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
524    /// process
525    ///     .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
526    ///     .into_keyed()
527    ///     .scan(
528    ///         q!(|| 0),
529    ///         q!(|acc, x| {
530    ///             *acc += x;
531    ///             Some(*acc)
532    ///         }),
533    ///     )
534    /// #   .entries()
535    /// # }, |mut stream| async move {
536    /// // Output: { 0: [1, 3], 1: [3, 7] }
537    /// # for w in vec![(0, 1), (0, 3), (1, 3), (1, 7)] {
538    /// #     assert_eq!(stream.next().await.unwrap(), w);
539    /// # }
540    /// # }));
541    /// ```
542    pub fn scan<A, U, I, F>(
543        self,
544        init: impl IntoQuotedMut<'a, I, L> + Copy,
545        f: impl IntoQuotedMut<'a, F, L> + Copy,
546    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
547    where
548        K: Clone,
549        I: Fn() -> A + 'a,
550        F: Fn(&mut A, V) -> Option<U> + 'a,
551    {
552        KeyedStream {
553            underlying: self
554                .underlying
555                .assume_ordering::<TotalOrder>(
556                    nondet!(/** keyed scan does not rely on order of keys */),
557                )
558                .scan_keyed(init, f)
559                .into(),
560            _phantom_order: Default::default(),
561        }
562    }
563
564    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed in-order across the values
565    /// in each group. But the aggregation function returns a boolean, which when true indicates that the aggregated
566    /// result is complete and can be released to downstream computation. Unlike [`Stream::fold_keyed`], this means that
567    /// even if the input stream is [`crate::Unbounded`], the outputs of the fold can be processed like normal stream elements.
568    ///
569    /// # Example
570    /// ```rust
571    /// # use hydro_lang::*;
572    /// # use futures::StreamExt;
573    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
574    /// process
575    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
576    ///     .into_keyed()
577    ///     .fold_early_stop(
578    ///         q!(|| 0),
579    ///         q!(|acc, x| {
580    ///             *acc += x;
581    ///             x % 2 == 0
582    ///         }),
583    ///     )
584    /// #   .entries()
585    /// # }, |mut stream| async move {
586    /// // Output: { 0: 2, 1: 9 }
587    /// # for w in vec![(0, 2), (1, 9)] {
588    /// #     assert_eq!(stream.next().await.unwrap(), w);
589    /// # }
590    /// # }));
591    /// ```
592    pub fn fold_early_stop<A, I, F>(
593        self,
594        init: impl IntoQuotedMut<'a, I, L> + Copy,
595        f: impl IntoQuotedMut<'a, F, L> + Copy,
596    ) -> KeyedStream<K, A, L, B, TotalOrder, ExactlyOnce>
597    where
598        K: Clone,
599        I: Fn() -> A + 'a,
600        F: Fn(&mut A, V) -> bool + 'a,
601    {
602        KeyedStream {
603            underlying: {
604                self.underlying
605                    .assume_ordering::<TotalOrder>(
606                        nondet!(/** keyed scan does not rely on order of keys */),
607                    )
608                    .fold_keyed_early_stop(init, f)
609                    .into()
610            },
611            _phantom_order: Default::default(),
612        }
613    }
614
615    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
616    ///
617    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
618    /// to depend on the order of elements in the group.
619    ///
620    /// If the input and output value types are the same and do not require initialization then use
621    /// [`KeyedStream::reduce`].
622    ///
623    /// # Example
624    /// ```rust
625    /// # use hydro_lang::*;
626    /// # use futures::StreamExt;
627    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
628    /// let tick = process.tick();
629    /// let numbers = process
630    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
631    ///     .into_keyed();
632    /// let batch = numbers.batch(&tick, nondet!(/** test */));
633    /// batch
634    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
635    ///     .entries()
636    ///     .all_ticks()
637    /// # }, |mut stream| async move {
638    /// // (1, 5), (2, 7)
639    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
640    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
641    /// # }));
642    /// ```
643    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
644        self,
645        init: impl IntoQuotedMut<'a, I, L>,
646        comb: impl IntoQuotedMut<'a, F, L>,
647    ) -> KeyedSingleton<K, A, L, B> {
648        let init = init.splice_fn0_ctx(&self.underlying.location).into();
649        let comb = comb
650            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
651            .into();
652
653        let out_ir = HydroNode::FoldKeyed {
654            init,
655            acc: comb,
656            input: Box::new(self.underlying.ir_node.into_inner()),
657            metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
658        };
659
660        KeyedSingleton {
661            underlying: Stream::new(self.underlying.location, out_ir),
662        }
663    }
664
665    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
666    ///
667    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
668    /// to depend on the order of elements in the stream.
669    ///
670    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
671    ///
672    /// # Example
673    /// ```rust
674    /// # use hydro_lang::*;
675    /// # use futures::StreamExt;
676    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
677    /// let tick = process.tick();
678    /// let numbers = process
679    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
680    ///     .into_keyed();
681    /// let batch = numbers.batch(&tick, nondet!(/** test */));
682    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
683    /// # }, |mut stream| async move {
684    /// // (1, 5), (2, 7)
685    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
686    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
687    /// # }));
688    /// ```
689    pub fn reduce<F: Fn(&mut V, V) + 'a>(
690        self,
691        comb: impl IntoQuotedMut<'a, F, L>,
692    ) -> KeyedSingleton<K, V, L, B> {
693        let f = comb
694            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
695            .into();
696
697        let out_ir = HydroNode::ReduceKeyed {
698            f,
699            input: Box::new(self.underlying.ir_node.into_inner()),
700            metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
701        };
702
703        KeyedSingleton {
704            underlying: Stream::new(self.underlying.location, out_ir),
705        }
706    }
707
    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
    ///
    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
    /// to depend on the order of elements in the stream.
    ///
    /// The watermark is passed as `other`: anything convertible into a bounded [`Optional`]
    /// living in a [`Tick`] of this stream's root location.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let watermark = tick.singleton(q!(1));
    /// let numbers = process
    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
    ///     .into_keyed();
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch
    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
    ///     .entries()
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // (2, 204)
    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
    /// # }));
    /// ```
    pub fn reduce_watermark<O, F>(
        self,
        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
        comb: impl IntoQuotedMut<'a, F, L>,
    ) -> KeyedSingleton<K, V, L, B>
    where
        O: Clone,
        F: Fn(&mut V, V) + 'a,
    {
        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
        // Compare this stream's root location with the location enclosing the watermark's
        // tick. NOTE(review): presumably this panics on a mismatch; confirm against
        // `check_matching_location`.
        check_matching_location(&self.underlying.location.root(), other.location.outer());
        // Splice the combinator against this stream's location.
        let f = comb
            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
            .into();

        // The location is cloned for the new stream because it is borrowed again below
        // to create the node metadata.
        let out_ir = Stream::new(
            self.underlying.location.clone(),
            HydroNode::ReduceKeyedWatermark {
                f,
                input: Box::new(self.underlying.ir_node.into_inner()),
                watermark: Box::new(other.ir_node.into_inner()),
                metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
            },
        );

        KeyedSingleton { underlying: out_ir }
    }
760}
761
762impl<'a, K, V, L, B, O> KeyedStream<K, V, L, B, O, ExactlyOnce>
763where
764    K: Eq + Hash,
765    L: Location<'a>,
766{
767    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
768    ///
769    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
770    ///
771    /// If the input and output value types are the same and do not require initialization then use
772    /// [`KeyedStream::reduce_commutative`].
773    ///
774    /// # Example
775    /// ```rust
776    /// # use hydro_lang::*;
777    /// # use futures::StreamExt;
778    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
779    /// let tick = process.tick();
780    /// let numbers = process
781    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
782    ///     .into_keyed();
783    /// let batch = numbers.batch(&tick, nondet!(/** test */));
784    /// batch
785    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
786    ///     .entries()
787    ///     .all_ticks()
788    /// # }, |mut stream| async move {
789    /// // (1, 5), (2, 7)
790    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
791    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
792    /// # }));
793    /// ```
794    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
795        self,
796        init: impl IntoQuotedMut<'a, I, L>,
797        comb: impl IntoQuotedMut<'a, F, L>,
798    ) -> KeyedSingleton<K, A, L, B> {
799        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
800            .fold(init, comb)
801    }
802
803    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
804    ///
805    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
806    ///
807    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
808    ///
809    /// # Example
810    /// ```rust
811    /// # use hydro_lang::*;
812    /// # use futures::StreamExt;
813    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
814    /// let tick = process.tick();
815    /// let numbers = process
816    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
817    ///     .into_keyed();
818    /// let batch = numbers.batch(&tick, nondet!(/** test */));
819    /// batch
820    ///     .reduce_commutative(q!(|acc, x| *acc += x))
821    ///     .entries()
822    ///     .all_ticks()
823    /// # }, |mut stream| async move {
824    /// // (1, 5), (2, 7)
825    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
826    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
827    /// # }));
828    /// ```
829    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
830        self,
831        comb: impl IntoQuotedMut<'a, F, L>,
832    ) -> KeyedSingleton<K, V, L, B> {
833        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
834            .reduce(comb)
835    }
836
837    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
838    ///
839    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
840    ///
841    /// # Example
842    /// ```rust
843    /// # use hydro_lang::*;
844    /// # use futures::StreamExt;
845    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
846    /// let tick = process.tick();
847    /// let watermark = tick.singleton(q!(1));
848    /// let numbers = process
849    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
850    ///     .into_keyed();
851    /// let batch = numbers.batch(&tick, nondet!(/** test */));
852    /// batch
853    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
854    ///     .entries()
855    ///     .all_ticks()
856    /// # }, |mut stream| async move {
857    /// // (2, 204)
858    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
859    /// # }));
860    /// ```
861    pub fn reduce_watermark_commutative<O2, F>(
862        self,
863        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
864        comb: impl IntoQuotedMut<'a, F, L>,
865    ) -> KeyedSingleton<K, V, L, B>
866    where
867        O2: Clone,
868        F: Fn(&mut V, V) + 'a,
869    {
870        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
871            .reduce_watermark(other, comb)
872    }
873}
874
875impl<'a, K, V, L, B, R> KeyedStream<K, V, L, B, TotalOrder, R>
876where
877    K: Eq + Hash,
878    L: Location<'a>,
879{
880    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
881    ///
882    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
883    ///
884    /// If the input and output value types are the same and do not require initialization then use
885    /// [`KeyedStream::reduce_idempotent`].
886    ///
887    /// # Example
888    /// ```rust
889    /// # use hydro_lang::*;
890    /// # use futures::StreamExt;
891    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
892    /// let tick = process.tick();
893    /// let numbers = process
894    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
895    ///     .into_keyed();
896    /// let batch = numbers.batch(&tick, nondet!(/** test */));
897    /// batch
898    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
899    ///     .entries()
900    ///     .all_ticks()
901    /// # }, |mut stream| async move {
902    /// // (1, false), (2, true)
903    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
904    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
905    /// # }));
906    /// ```
907    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
908        self,
909        init: impl IntoQuotedMut<'a, I, L>,
910        comb: impl IntoQuotedMut<'a, F, L>,
911    ) -> KeyedSingleton<K, A, L, B> {
912        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
913            .fold(init, comb)
914    }
915
916    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
917    ///
918    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
919    ///
920    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
921    ///
922    /// # Example
923    /// ```rust
924    /// # use hydro_lang::*;
925    /// # use futures::StreamExt;
926    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
927    /// let tick = process.tick();
928    /// let numbers = process
929    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
930    ///     .into_keyed();
931    /// let batch = numbers.batch(&tick, nondet!(/** test */));
932    /// batch
933    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
934    ///     .entries()
935    ///     .all_ticks()
936    /// # }, |mut stream| async move {
937    /// // (1, false), (2, true)
938    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
939    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
940    /// # }));
941    /// ```
942    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
943        self,
944        comb: impl IntoQuotedMut<'a, F, L>,
945    ) -> KeyedSingleton<K, V, L, B> {
946        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
947            .reduce(comb)
948    }
949
950    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
951    ///
952    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
953    ///
954    /// # Example
955    /// ```rust
956    /// # use hydro_lang::*;
957    /// # use futures::StreamExt;
958    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
959    /// let tick = process.tick();
960    /// let watermark = tick.singleton(q!(1));
961    /// let numbers = process
962    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
963    ///     .into_keyed();
964    /// let batch = numbers.batch(&tick, nondet!(/** test */));
965    /// batch
966    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
967    ///     .entries()
968    ///     .all_ticks()
969    /// # }, |mut stream| async move {
970    /// // (2, true)
971    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
972    /// # }));
973    /// ```
974    pub fn reduce_watermark_idempotent<O2, F>(
975        self,
976        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
977        comb: impl IntoQuotedMut<'a, F, L>,
978    ) -> KeyedSingleton<K, V, L, B>
979    where
980        O2: Clone,
981        F: Fn(&mut V, V) + 'a,
982    {
983        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
984            .reduce_watermark(other, comb)
985    }
986}
987
988impl<'a, K, V, L, B, O, R> KeyedStream<K, V, L, B, O, R>
989where
990    K: Eq + Hash,
991    L: Location<'a>,
992{
993    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
994    ///
995    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
996    /// as there may be non-deterministic duplicates.
997    ///
998    /// If the input and output value types are the same and do not require initialization then use
999    /// [`KeyedStream::reduce_commutative_idempotent`].
1000    ///
1001    /// # Example
1002    /// ```rust
1003    /// # use hydro_lang::*;
1004    /// # use futures::StreamExt;
1005    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1006    /// let tick = process.tick();
1007    /// let numbers = process
1008    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1009    ///     .into_keyed();
1010    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1011    /// batch
1012    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1013    ///     .entries()
1014    ///     .all_ticks()
1015    /// # }, |mut stream| async move {
1016    /// // (1, false), (2, true)
1017    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1018    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1019    /// # }));
1020    /// ```
1021    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1022        self,
1023        init: impl IntoQuotedMut<'a, I, L>,
1024        comb: impl IntoQuotedMut<'a, F, L>,
1025    ) -> KeyedSingleton<K, A, L, B> {
1026        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1027            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1028            .fold(init, comb)
1029    }
1030
1031    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1032    ///
1033    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1034    /// as there may be non-deterministic duplicates.
1035    ///
1036    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1037    ///
1038    /// # Example
1039    /// ```rust
1040    /// # use hydro_lang::*;
1041    /// # use futures::StreamExt;
1042    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1043    /// let tick = process.tick();
1044    /// let numbers = process
1045    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1046    ///     .into_keyed();
1047    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1048    /// batch
1049    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1050    ///     .entries()
1051    ///     .all_ticks()
1052    /// # }, |mut stream| async move {
1053    /// // (1, false), (2, true)
1054    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1055    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1056    /// # }));
1057    /// ```
1058    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1059        self,
1060        comb: impl IntoQuotedMut<'a, F, L>,
1061    ) -> KeyedSingleton<K, V, L, B> {
1062        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1063            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1064            .reduce(comb)
1065    }
1066
1067    /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1068    ///
1069    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1070    /// as there may be non-deterministic duplicates.
1071    ///
1072    /// # Example
1073    /// ```rust
1074    /// # use hydro_lang::*;
1075    /// # use futures::StreamExt;
1076    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1077    /// let tick = process.tick();
1078    /// let watermark = tick.singleton(q!(1));
1079    /// let numbers = process
1080    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1081    ///     .into_keyed();
1082    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1083    /// batch
1084    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1085    ///     .entries()
1086    ///     .all_ticks()
1087    /// # }, |mut stream| async move {
1088    /// // (2, true)
1089    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1090    /// # }));
1091    /// ```
1092    pub fn reduce_watermark_commutative_idempotent<O2, F>(
1093        self,
1094        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1095        comb: impl IntoQuotedMut<'a, F, L>,
1096    ) -> KeyedSingleton<K, V, L, B>
1097    where
1098        O2: Clone,
1099        F: Fn(&mut V, V) + 'a,
1100    {
1101        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1102            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1103            .reduce_watermark(other, comb)
1104    }
1105
1106    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1107    /// whose keys are not in the bounded stream.
1108    ///
1109    /// # Example
1110    /// ```rust
1111    /// # use hydro_lang::*;
1112    /// # use futures::StreamExt;
1113    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1114    /// let tick = process.tick();
1115    /// let keyed_stream = process
1116    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1117    ///     .batch(&tick, nondet!(/** test */))
1118    ///     .into_keyed();
1119    /// let keys_to_remove = process
1120    ///     .source_iter(q!(vec![1, 2]))
1121    ///     .batch(&tick, nondet!(/** test */));
1122    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1123    /// #   .entries()
1124    /// # }, |mut stream| async move {
1125    /// // { 3: ['c'], 4: ['d'] }
1126    /// # for w in vec![(3, 'c'), (4, 'd')] {
1127    /// #     assert_eq!(stream.next().await.unwrap(), w);
1128    /// # }
1129    /// # }));
1130    pub fn filter_key_not_in<O2, R2>(self, other: Stream<K, L, Bounded, O2, R2>) -> Self {
1131        KeyedStream {
1132            underlying: self.entries().anti_join(other),
1133            _phantom_order: Default::default(),
1134        }
1135    }
1136}
1137
1138impl<'a, K, V, L, B, O, R> KeyedStream<K, V, L, B, O, R>
1139where
1140    L: Location<'a> + NoTick + NoAtomic,
1141{
1142    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1143        KeyedStream {
1144            underlying: self.underlying.atomic(tick),
1145            _phantom_order: Default::default(),
1146        }
1147    }
1148
1149    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1150    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1151    /// the order of the input.
1152    ///
1153    /// # Non-Determinism
1154    /// The batch boundaries are non-deterministic and may change across executions.
1155    pub fn batch(
1156        self,
1157        tick: &Tick<L>,
1158        nondet: NonDet,
1159    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1160        self.atomic(tick).batch(nondet)
1161    }
1162}
1163
1164impl<'a, K, V, L, B, O, R> KeyedStream<K, V, Atomic<L>, B, O, R>
1165where
1166    L: Location<'a> + NoTick + NoAtomic,
1167{
1168    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1169    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1170    /// the order of the input.
1171    ///
1172    /// # Non-Determinism
1173    /// The batch boundaries are non-deterministic and may change across executions.
1174    pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1175        KeyedStream {
1176            underlying: self.underlying.batch(nondet),
1177            _phantom_order: Default::default(),
1178        }
1179    }
1180}
1181
1182impl<'a, K, V, L, O, R> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1183where
1184    L: Location<'a>,
1185{
1186    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1187        KeyedStream {
1188            underlying: self.underlying.all_ticks(),
1189            _phantom_order: Default::default(),
1190        }
1191    }
1192}
1193
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::location::Location;
    use crate::{FlowBuilder, nondet};

    /// Reduces a static keyed input under a constant watermark of `1` and checks that the first
    /// entry observed externally is the aggregate for key `2` (102 + 102 = 204).
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let ext = builder.external::<()>();

        let tick = process.tick();
        let watermark = tick.singleton(q!(1));

        let sums_port = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(watermark, q!(|acc, v| *acc += v))
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&ext);

        let deployed = builder
            .with_process(&process, deployment.Localhost())
            .with_external(&ext, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect the receiver before starting so no output is missed.
        let mut results = deployed.connect_source_bincode(sums_port).await;

        deployment.start().await.unwrap();

        assert_eq!(results.next().await.unwrap(), (2, 204));
    }

    /// Same aggregation, but the watermark starts at `1` and is incremented by one on every tick
    /// through a cycle, and an extra `(3, 103)` element is only released once the external
    /// trigger fires.
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let ext = builder.external::<()>();
        let (trigger_port, trigger_stream) = process.source_external_bincode(&ext);

        let tick = process.tick();
        // Watermark for the next tick is the current watermark plus one.
        let (watermark_cycle, watermark) = tick.cycle_with_initial(tick.singleton(q!(1)));
        let advanced_watermark = watermark.clone().map(q!(|v| v + 1));
        watermark_cycle.complete_next_tick(advanced_watermark);

        // `(3, 103)` is gated on the first trigger element seen in a tick.
        let gated_input = process
            .source_iter(q!([(3, 103)]))
            .batch(&tick, nondet!(/** test */))
            .continue_if(
                trigger_stream
                    .clone()
                    .batch(&tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let sums_port = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(gated_input)
            .into_keyed()
            .reduce_watermark_commutative(watermark, q!(|acc, v| *acc += v))
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&ext);

        let deployed = builder
            .with_default_optimize()
            .with_process(&process, deployment.Localhost())
            .with_external(&ext, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_sink = deployed.connect_sink_bincode(trigger_port).await;
        let mut results = deployed.connect_source_bincode(sums_port).await;

        deployment.start().await.unwrap();

        assert_eq!(results.next().await.unwrap(), (2, 204));

        trigger_sink.send(()).await.unwrap();

        // NOTE(review): the next entry being key 3 presumably relies on key 2 having been
        // dropped by the advancing watermark by now; confirm against `reduce_watermark`.
        assert_eq!(results.next().await.unwrap(), (3, 103));
    }
}