hydro_lang/stream.rs
1use std::cell::RefCell;
2use std::collections::HashMap;
3use std::future::Future;
4use std::hash::Hash;
5use std::marker::PhantomData;
6use std::ops::Deref;
7use std::rc::Rc;
8
9use stageleft::{IntoQuotedMut, QuotedWithContext, q};
10use syn::parse_quote;
11use tokio::time::Instant;
12
13use crate::builder::FLOW_USED_MESSAGE;
14use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
15use crate::ir::{HydroLeaf, HydroNode, TeeNode};
16use crate::keyed_stream::KeyedStream;
17use crate::location::tick::{Atomic, NoAtomic};
18use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
19use crate::manual_expr::ManualExpr;
20use crate::unsafety::NonDet;
21use crate::*;
22
23pub mod networking;
24
/// Type-level marker for a stream whose elements are totally ordered, which
/// means that there are no sources of non-determinism (other than intentional
/// ones) that will affect the order of elements.
///
/// This is an uninhabited enum: it is only ever used as the `Order` type
/// parameter of [`Stream`] and never instantiated.
pub enum TotalOrder {}
29
/// Type-level marker for a stream with no ordering guarantee, which means
/// that the order of elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited enum: it is only ever used as the `Order` type
/// parameter of [`Stream`] and never instantiated.
pub enum NoOrder {}
36
/// Helper trait for determining the weakest of two orderings.
///
/// Implemented for the ordering markers [`TotalOrder`] and [`NoOrder`]; the
/// trait is sealed, so no implementations can be added outside this crate.
#[sealed::sealed]
pub trait MinOrder<Other> {
    /// The weaker of the two orderings (`Self` and `Other`).
    type Min;
}
43
// Reflexive case: combining an ordering with itself leaves it unchanged.
#[sealed::sealed]
impl<T> MinOrder<T> for T {
    type Min = T;
}
48
// `TotalOrder` combined with `NoOrder` degrades to `NoOrder`.
#[sealed::sealed]
impl MinOrder<NoOrder> for TotalOrder {
    type Min = NoOrder;
}
53
// `NoOrder` combined with `TotalOrder` stays `NoOrder` (symmetric to the impl above).
#[sealed::sealed]
impl MinOrder<TotalOrder> for NoOrder {
    type Min = NoOrder;
}
58
/// Type-level marker for a stream with deterministic message cardinality,
/// with no possibility of duplicates.
///
/// This is an uninhabited enum: it is only ever used as the `Retries` type
/// parameter of [`Stream`] and never instantiated.
pub enum ExactlyOnce {}
62
/// Type-level marker for a stream with non-deterministic message cardinality,
/// which means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited enum: it is only ever used as the `Retries` type
/// parameter of [`Stream`] and never instantiated.
pub enum AtLeastOnce {}
66
/// Helper trait for determining the weakest of two retry guarantees.
///
/// Implemented for the retry markers [`ExactlyOnce`] and [`AtLeastOnce`]; the
/// trait is sealed, so no implementations can be added outside this crate.
#[sealed::sealed]
pub trait MinRetries<Other> {
    /// The weaker of the two retry guarantees (`Self` and `Other`).
    type Min;
}
73
// Reflexive case: combining a retry guarantee with itself leaves it unchanged.
#[sealed::sealed]
impl<T> MinRetries<T> for T {
    type Min = T;
}
78
// `AtLeastOnce` combined with `ExactlyOnce` stays `AtLeastOnce`.
#[sealed::sealed]
impl MinRetries<ExactlyOnce> for AtLeastOnce {
    type Min = AtLeastOnce;
}
83
// `ExactlyOnce` combined with `AtLeastOnce` degrades to `AtLeastOnce` (symmetric to the impl above).
#[sealed::sealed]
impl MinRetries<AtLeastOnce> for ExactlyOnce {
    type Min = AtLeastOnce;
}
88
/// A stream of elements of type `Type`, whose ordering and retry guarantees
/// are tracked in its type parameters.
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`]
///   or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`]
///   or [`NoOrder`] (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either
///   [`ExactlyOnce`] or [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<Type, Loc, Bound, Order = TotalOrder, Retries = ExactlyOnce> {
    // The location value this stream belongs to.
    pub(crate) location: Loc,
    // The IR node producing this stream. Kept in a `RefCell` because `Clone`
    // rewrites the node in place through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retries)>,
}
104
105impl<'a, T, L, O, R> From<Stream<T, L, Bounded, O, R>> for Stream<T, L, Unbounded, O, R>
106where
107 L: Location<'a>,
108{
109 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
110 Stream {
111 location: stream.location,
112 ir_node: stream.ir_node,
113 _phantom: PhantomData,
114 }
115 }
116}
117
118impl<'a, T, L, B, R> From<Stream<T, L, B, TotalOrder, R>> for Stream<T, L, B, NoOrder, R>
119where
120 L: Location<'a>,
121{
122 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
123 Stream {
124 location: stream.location,
125 ir_node: stream.ir_node,
126 _phantom: PhantomData,
127 }
128 }
129}
130
131impl<'a, T, L, B, O> From<Stream<T, L, B, O, ExactlyOnce>> for Stream<T, L, B, O, AtLeastOnce>
132where
133 L: Location<'a>,
134{
135 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
136 Stream {
137 location: stream.location,
138 ir_node: stream.ir_node,
139 _phantom: PhantomData,
140 }
141 }
142}
143
// `DeferTick` for bounded streams living inside a `Tick`: forwards to the
// `defer_tick` function reachable through the `Stream::` path.
impl<'a, T, L, O, R> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Path-qualified call (rather than `self.defer_tick()`) so the target
        // is explicit. NOTE(review): presumably the inherent method defined
        // elsewhere in this file — confirm.
        Stream::defer_tick(self)
    }
}
152
153impl<'a, T, L, O, R> CycleCollection<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
154where
155 L: Location<'a>,
156{
157 type Location = Tick<L>;
158
159 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
160 Stream::new(
161 location.clone(),
162 HydroNode::CycleSource {
163 ident,
164 metadata: location.new_node_metadata::<T>(),
165 },
166 )
167 }
168}
169
170impl<'a, T, L, O, R> CycleComplete<'a, TickCycleMarker> for Stream<T, Tick<L>, Bounded, O, R>
171where
172 L: Location<'a>,
173{
174 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
175 assert_eq!(
176 self.location.id(),
177 expected_location,
178 "locations do not match"
179 );
180 self.location
181 .flow_state()
182 .borrow_mut()
183 .leaves
184 .as_mut()
185 .expect(FLOW_USED_MESSAGE)
186 .push(HydroLeaf::CycleSink {
187 ident,
188 input: Box::new(self.ir_node.into_inner()),
189 metadata: self.location.new_node_metadata::<T>(),
190 });
191 }
192}
193
194impl<'a, T, L, B, O, R> CycleCollection<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
195where
196 L: Location<'a> + NoTick,
197{
198 type Location = L;
199
200 fn create_source(ident: syn::Ident, location: L) -> Self {
201 Stream::new(
202 location.clone(),
203 HydroNode::Persist {
204 inner: Box::new(HydroNode::CycleSource {
205 ident,
206 metadata: location.new_node_metadata::<T>(),
207 }),
208 metadata: location.new_node_metadata::<T>(),
209 },
210 )
211 }
212}
213
214impl<'a, T, L, B, O, R> CycleComplete<'a, ForwardRefMarker> for Stream<T, L, B, O, R>
215where
216 L: Location<'a> + NoTick,
217{
218 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
219 assert_eq!(
220 self.location.id(),
221 expected_location,
222 "locations do not match"
223 );
224 let metadata = self.location.new_node_metadata::<T>();
225 self.location
226 .flow_state()
227 .borrow_mut()
228 .leaves
229 .as_mut()
230 .expect(FLOW_USED_MESSAGE)
231 .push(HydroLeaf::CycleSink {
232 ident,
233 input: Box::new(HydroNode::Unpersist {
234 inner: Box::new(self.ir_node.into_inner()),
235 metadata: metadata.clone(),
236 }),
237 metadata,
238 });
239 }
240}
241
242impl<'a, T, L, B, O, R> Clone for Stream<T, L, B, O, R>
243where
244 T: Clone,
245 L: Location<'a>,
246{
247 fn clone(&self) -> Self {
248 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
249 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
250 *self.ir_node.borrow_mut() = HydroNode::Tee {
251 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
252 metadata: self.location.new_node_metadata::<T>(),
253 };
254 }
255
256 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
257 Stream {
258 location: self.location.clone(),
259 ir_node: HydroNode::Tee {
260 inner: TeeNode(inner.0.clone()),
261 metadata: metadata.clone(),
262 }
263 .into(),
264 _phantom: PhantomData,
265 }
266 } else {
267 unreachable!()
268 }
269 }
270}
271
272impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
273where
274 L: Location<'a>,
275{
276 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
277 Stream {
278 location,
279 ir_node: RefCell::new(ir_node),
280 _phantom: PhantomData,
281 }
282 }
283
284 /// Produces a stream based on invoking `f` on each element.
285 /// If you do not want to modify the stream and instead only want to view
286 /// each item use [`Stream::inspect`] instead.
287 ///
288 /// # Example
289 /// ```rust
290 /// # use hydro_lang::*;
291 /// # use futures::StreamExt;
292 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
293 /// let words = process.source_iter(q!(vec!["hello", "world"]));
294 /// words.map(q!(|x| x.to_uppercase()))
295 /// # }, |mut stream| async move {
296 /// # for w in vec!["HELLO", "WORLD"] {
297 /// # assert_eq!(stream.next().await.unwrap(), w);
298 /// # }
299 /// # }));
300 /// ```
301 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
302 where
303 F: Fn(T) -> U + 'a,
304 {
305 let f = f.splice_fn1_ctx(&self.location).into();
306 Stream::new(
307 self.location.clone(),
308 HydroNode::Map {
309 f,
310 input: Box::new(self.ir_node.into_inner()),
311 metadata: self.location.new_node_metadata::<U>(),
312 },
313 )
314 }
315
316 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
317 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
318 /// for the output type `U` must produce items in a **deterministic** order.
319 ///
320 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
321 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
322 ///
323 /// # Example
324 /// ```rust
325 /// # use hydro_lang::*;
326 /// # use futures::StreamExt;
327 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
328 /// process
329 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
330 /// .flat_map_ordered(q!(|x| x))
331 /// # }, |mut stream| async move {
332 /// // 1, 2, 3, 4
333 /// # for w in (1..5) {
334 /// # assert_eq!(stream.next().await.unwrap(), w);
335 /// # }
336 /// # }));
337 /// ```
338 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
339 where
340 I: IntoIterator<Item = U>,
341 F: Fn(T) -> I + 'a,
342 {
343 let f = f.splice_fn1_ctx(&self.location).into();
344 Stream::new(
345 self.location.clone(),
346 HydroNode::FlatMap {
347 f,
348 input: Box::new(self.ir_node.into_inner()),
349 metadata: self.location.new_node_metadata::<U>(),
350 },
351 )
352 }
353
354 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
355 /// for the output type `U` to produce items in any order.
356 ///
357 /// # Example
358 /// ```rust
359 /// # use hydro_lang::{*, stream::ExactlyOnce};
360 /// # use futures::StreamExt;
361 /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
362 /// process
363 /// .source_iter(q!(vec![
364 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
365 /// std::collections::HashSet::from_iter(vec![3, 4]),
366 /// ]))
367 /// .flat_map_unordered(q!(|x| x))
368 /// # }, |mut stream| async move {
369 /// // 1, 2, 3, 4, but in no particular order
370 /// # let mut results = Vec::new();
371 /// # for w in (1..5) {
372 /// # results.push(stream.next().await.unwrap());
373 /// # }
374 /// # results.sort();
375 /// # assert_eq!(results, vec![1, 2, 3, 4]);
376 /// # }));
377 /// ```
378 pub fn flat_map_unordered<U, I, F>(
379 self,
380 f: impl IntoQuotedMut<'a, F, L>,
381 ) -> Stream<U, L, B, NoOrder, R>
382 where
383 I: IntoIterator<Item = U>,
384 F: Fn(T) -> I + 'a,
385 {
386 let f = f.splice_fn1_ctx(&self.location).into();
387 Stream::new(
388 self.location.clone(),
389 HydroNode::FlatMap {
390 f,
391 input: Box::new(self.ir_node.into_inner()),
392 metadata: self.location.new_node_metadata::<U>(),
393 },
394 )
395 }
396
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// Equivalent to [`Stream::flat_map_ordered`] with the identity closure.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }
423
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// Equivalent to [`Stream::flat_map_unordered`] with the identity closure, so the
    /// result is always a [`NoOrder`] stream.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::{*, stream::ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }
453
454 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
455 /// `f`, preserving the order of the elements.
456 ///
457 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
458 /// not modify or take ownership of the values. If you need to modify the values while filtering
459 /// use [`Stream::filter_map`] instead.
460 ///
461 /// # Example
462 /// ```rust
463 /// # use hydro_lang::*;
464 /// # use futures::StreamExt;
465 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
466 /// process
467 /// .source_iter(q!(vec![1, 2, 3, 4]))
468 /// .filter(q!(|&x| x > 2))
469 /// # }, |mut stream| async move {
470 /// // 3, 4
471 /// # for w in (3..5) {
472 /// # assert_eq!(stream.next().await.unwrap(), w);
473 /// # }
474 /// # }));
475 /// ```
476 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
477 where
478 F: Fn(&T) -> bool + 'a,
479 {
480 let f = f.splice_fn1_borrow_ctx(&self.location).into();
481 Stream::new(
482 self.location.clone(),
483 HydroNode::Filter {
484 f,
485 input: Box::new(self.ir_node.into_inner()),
486 metadata: self.location.new_node_metadata::<T>(),
487 },
488 )
489 }
490
491 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
492 ///
493 /// # Example
494 /// ```rust
495 /// # use hydro_lang::*;
496 /// # use futures::StreamExt;
497 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
498 /// process
499 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
500 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
501 /// # }, |mut stream| async move {
502 /// // 1, 2
503 /// # for w in (1..3) {
504 /// # assert_eq!(stream.next().await.unwrap(), w);
505 /// # }
506 /// # }));
507 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
508 where
509 F: Fn(T) -> Option<U> + 'a,
510 {
511 let f = f.splice_fn1_ctx(&self.location).into();
512 Stream::new(
513 self.location.clone(),
514 HydroNode::FilterMap {
515 f,
516 input: Box::new(self.ir_node.into_inner()),
517 metadata: self.location.new_node_metadata::<U>(),
518 },
519 )
520 }
521
522 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
523 /// where `x` is the final value of `other`, a bounded [`Singleton`].
524 ///
525 /// # Example
526 /// ```rust
527 /// # use hydro_lang::*;
528 /// # use futures::StreamExt;
529 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
530 /// let tick = process.tick();
531 /// let batch = process
532 /// .source_iter(q!(vec![1, 2, 3, 4]))
533 /// .batch(&tick, nondet!(/** test */));
534 /// let count = batch.clone().count(); // `count()` returns a singleton
535 /// batch.cross_singleton(count).all_ticks()
536 /// # }, |mut stream| async move {
537 /// // (1, 4), (2, 4), (3, 4), (4, 4)
538 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
539 /// # assert_eq!(stream.next().await.unwrap(), w);
540 /// # }
541 /// # }));
542 pub fn cross_singleton<O2>(
543 self,
544 other: impl Into<Optional<O2, L, Bounded>>,
545 ) -> Stream<(T, O2), L, B, O, R>
546 where
547 O2: Clone,
548 {
549 let other: Optional<O2, L, Bounded> = other.into();
550 check_matching_location(&self.location, &other.location);
551
552 Stream::new(
553 self.location.clone(),
554 HydroNode::CrossSingleton {
555 left: Box::new(self.ir_node.into_inner()),
556 right: Box::new(other.ir_node.into_inner()),
557 metadata: self.location.new_node_metadata::<(T, O2)>(),
558 },
559 )
560 }
561
562 /// Allow this stream through if the argument (a Bounded Optional) is non-empty, otherwise the output is empty.
563 pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
564 self.cross_singleton(signal.map(q!(|_u| ())))
565 .map(q!(|(d, _signal)| d))
566 }
567
568 /// Allow this stream through if the argument (a Bounded Optional) is empty, otherwise the output is empty.
569 pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
570 self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
571 }
572
573 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
574 /// tupled pairs in a non-deterministic order.
575 ///
576 /// # Example
577 /// ```rust
578 /// # use hydro_lang::*;
579 /// # use std::collections::HashSet;
580 /// # use futures::StreamExt;
581 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
582 /// let tick = process.tick();
583 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
584 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
585 /// stream1.cross_product(stream2)
586 /// # }, |mut stream| async move {
587 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
588 /// # stream.map(|i| assert!(expected.contains(&i)));
589 /// # }));
590 pub fn cross_product<T2, O2>(
591 self,
592 other: Stream<T2, L, B, O2, R>,
593 ) -> Stream<(T, T2), L, B, NoOrder, R>
594 where
595 T: Clone,
596 T2: Clone,
597 {
598 check_matching_location(&self.location, &other.location);
599
600 Stream::new(
601 self.location.clone(),
602 HydroNode::CrossProduct {
603 left: Box::new(self.ir_node.into_inner()),
604 right: Box::new(other.ir_node.into_inner()),
605 metadata: self.location.new_node_metadata::<(T, T2)>(),
606 },
607 )
608 }
609
610 /// Takes one stream as input and filters out any duplicate occurrences. The output
611 /// contains all unique values from the input.
612 ///
613 /// # Example
614 /// ```rust
615 /// # use hydro_lang::*;
616 /// # use futures::StreamExt;
617 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
618 /// let tick = process.tick();
619 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
620 /// # }, |mut stream| async move {
621 /// # for w in vec![1, 2, 3, 4] {
622 /// # assert_eq!(stream.next().await.unwrap(), w);
623 /// # }
624 /// # }));
625 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
626 where
627 T: Eq + Hash,
628 {
629 Stream::new(
630 self.location.clone(),
631 HydroNode::Unique {
632 input: Box::new(self.ir_node.into_inner()),
633 metadata: self.location.new_node_metadata::<T>(),
634 },
635 )
636 }
637
638 /// Outputs everything in this stream that is *not* contained in the `other` stream.
639 ///
640 /// The `other` stream must be [`Bounded`], since this function will wait until
641 /// all its elements are available before producing any output.
642 /// # Example
643 /// ```rust
644 /// # use hydro_lang::*;
645 /// # use futures::StreamExt;
646 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
647 /// let tick = process.tick();
648 /// let stream = process
649 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
650 /// .batch(&tick, nondet!(/** test */));
651 /// let batch = process
652 /// .source_iter(q!(vec![1, 2]))
653 /// .batch(&tick, nondet!(/** test */));
654 /// stream.filter_not_in(batch).all_ticks()
655 /// # }, |mut stream| async move {
656 /// # for w in vec![3, 4] {
657 /// # assert_eq!(stream.next().await.unwrap(), w);
658 /// # }
659 /// # }));
660 pub fn filter_not_in<O2>(
661 self,
662 other: Stream<T, L, Bounded, O2, R>,
663 ) -> Stream<T, L, Bounded, O, R>
664 where
665 T: Eq + Hash,
666 {
667 check_matching_location(&self.location, &other.location);
668
669 Stream::new(
670 self.location.clone(),
671 HydroNode::Difference {
672 pos: Box::new(self.ir_node.into_inner()),
673 neg: Box::new(other.ir_node.into_inner()),
674 metadata: self.location.new_node_metadata::<T>(),
675 },
676 )
677 }
678
679 /// An operator which allows you to "inspect" each element of a stream without
680 /// modifying it. The closure `f` is called on a reference to each item. This is
681 /// mainly useful for debugging, and should not be used to generate side-effects.
682 ///
683 /// # Example
684 /// ```rust
685 /// # use hydro_lang::*;
686 /// # use futures::StreamExt;
687 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
688 /// let nums = process.source_iter(q!(vec![1, 2]));
689 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
690 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
691 /// # }, |mut stream| async move {
692 /// # for w in vec![1, 2] {
693 /// # assert_eq!(stream.next().await.unwrap(), w);
694 /// # }
695 /// # }));
696 /// ```
697 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<T, L, B, O, R>
698 where
699 F: Fn(&T) + 'a,
700 {
701 let f = f.splice_fn1_borrow_ctx(&self.location).into();
702
703 if L::is_top_level() {
704 Stream::new(
705 self.location.clone(),
706 HydroNode::Persist {
707 inner: Box::new(HydroNode::Inspect {
708 f,
709 input: Box::new(HydroNode::Unpersist {
710 inner: Box::new(self.ir_node.into_inner()),
711 metadata: self.location.new_node_metadata::<T>(),
712 }),
713 metadata: self.location.new_node_metadata::<T>(),
714 }),
715 metadata: self.location.new_node_metadata::<T>(),
716 },
717 )
718 } else {
719 Stream::new(
720 self.location.clone(),
721 HydroNode::Inspect {
722 f,
723 input: Box::new(self.ir_node.into_inner()),
724 metadata: self.location.new_node_metadata::<T>(),
725 },
726 )
727 }
728 }
729
730 /// An operator which allows you to "name" a `HydroNode`.
731 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
732 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
733 {
734 let mut node = self.ir_node.borrow_mut();
735 let metadata = node.metadata_mut();
736 metadata.tag = Some(name.to_string());
737 }
738 self
739 }
740
741 /// Explicitly "casts" the stream to a type with a different ordering
742 /// guarantee. Useful in unsafe code where the ordering cannot be proven
743 /// by the type-system.
744 ///
745 /// # Non-Determinism
746 /// This function is used as an escape hatch, and any mistakes in the
747 /// provided ordering guarantee will propagate into the guarantees
748 /// for the rest of the program.
749 pub fn assume_ordering<O2>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
750 Stream::new(self.location, self.ir_node.into_inner())
751 }
752
753 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
754 /// which is always safe because that is the weakest possible guarantee.
755 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
756 let nondet = nondet!(/** this is a weaker odering guarantee, so it is safe to assume */);
757 self.assume_ordering::<NoOrder>(nondet)
758 }
759
760 /// Explicitly "casts" the stream to a type with a different retries
761 /// guarantee. Useful in unsafe code where the lack of retries cannot
762 /// be proven by the type-system.
763 ///
764 /// # Non-Determinism
765 /// This function is used as an escape hatch, and any mistakes in the
766 /// provided retries guarantee will propagate into the guarantees
767 /// for the rest of the program.
768 pub fn assume_retries<R2>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
769 Stream::new(self.location, self.ir_node.into_inner())
770 }
771
772 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
773 /// which is always safe because that is the weakest possible guarantee.
774 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
775 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
776 self.assume_retries::<AtLeastOnce>(nondet)
777 }
778
779 /// Weakens the retries guarantee provided by the stream to be the weaker of the
780 /// current guarantee and `R2`. This is safe because the output guarantee will
781 /// always be weaker than the input.
782 pub fn weaken_retries<R2>(self) -> Stream<T, L, B, O, <R as MinRetries<R2>>::Min>
783 where
784 R: MinRetries<R2>,
785 {
786 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
787 self.assume_retries::<<R as MinRetries<R2>>::Min>(nondet)
788 }
789}
790
791impl<'a, T, L, B, O> Stream<T, L, B, O, ExactlyOnce>
792where
793 L: Location<'a>,
794{
795 /// Given a stream with [`ExactlyOnce`] retry guarantees, weakens it to an arbitrary guarantee
796 /// `R2`, which is safe because all guarantees are equal to or weaker than [`ExactlyOnce`]
797 pub fn weaker_retries<R2>(self) -> Stream<T, L, B, O, R2> {
798 self.assume_retries(
799 nondet!(/** any retry ordering is the same or weaker than ExactlyOnce */),
800 )
801 }
802}
803
// Operators available only on streams whose elements are references.
impl<'a, T, L, B, O, R> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// Turns a stream of `&T` into a stream of owned `T`, keeping the
    /// boundedness, ordering and retry guarantees of the input.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        self.map(q!(|d| d.clone()))
    }
}
830
831impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
832where
833 L: Location<'a>,
834{
835 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
836 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
837 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
838 ///
839 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
840 /// and there may be duplicates.
841 ///
842 /// # Example
843 /// ```rust
844 /// # use hydro_lang::*;
845 /// # use futures::StreamExt;
846 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
847 /// let tick = process.tick();
848 /// let bools = process.source_iter(q!(vec![false, true, false]));
849 /// let batch = bools.batch(&tick, nondet!(/** test */));
850 /// batch
851 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
852 /// .all_ticks()
853 /// # }, |mut stream| async move {
854 /// // true
855 /// # assert_eq!(stream.next().await.unwrap(), true);
856 /// # }));
857 /// ```
858 pub fn fold_commutative_idempotent<A, I, F>(
859 self,
860 init: impl IntoQuotedMut<'a, I, L>,
861 comb: impl IntoQuotedMut<'a, F, L>,
862 ) -> Singleton<A, L, B>
863 where
864 I: Fn() -> A + 'a,
865 F: Fn(&mut A, T),
866 {
867 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
868 self.assume_ordering(nondet)
869 .assume_retries(nondet)
870 .fold(init, comb)
871 }
872
873 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
874 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
875 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
876 /// reference, so that it can be modified in place.
877 ///
878 /// The `comb` closure must be **commutative** AND **idempotent**, as the order of input items is not guaranteed
879 /// and there may be duplicates.
880 ///
881 /// # Example
882 /// ```rust
883 /// # use hydro_lang::*;
884 /// # use futures::StreamExt;
885 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
886 /// let tick = process.tick();
887 /// let bools = process.source_iter(q!(vec![false, true, false]));
888 /// let batch = bools.batch(&tick, nondet!(/** test */));
889 /// batch
890 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
891 /// .all_ticks()
892 /// # }, |mut stream| async move {
893 /// // true
894 /// # assert_eq!(stream.next().await.unwrap(), true);
895 /// # }));
896 /// ```
897 pub fn reduce_commutative_idempotent<F>(
898 self,
899 comb: impl IntoQuotedMut<'a, F, L>,
900 ) -> Optional<T, L, B>
901 where
902 F: Fn(&mut T, T) + 'a,
903 {
904 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
905 self.assume_ordering(nondet)
906 .assume_retries(nondet)
907 .reduce(comb)
908 }
909
    /// Computes the maximum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// Implemented with [`Stream::reduce_commutative_idempotent`], so it is
    /// available for streams with any ordering and retry guarantee.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.max().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Replace the accumulator only when the incoming element is strictly greater.
        self.reduce_commutative_idempotent(q!(|curr, new| {
            if new > *curr {
                *curr = new;
            }
        }))
    }
937
938 /// Computes the maximum element in the stream as an [`Optional`], where the
939 /// maximum is determined according to the `key` function. The [`Optional`] will
940 /// be empty until the first element in the input arrives.
941 ///
942 /// # Example
943 /// ```rust
944 /// # use hydro_lang::*;
945 /// # use futures::StreamExt;
946 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
947 /// let tick = process.tick();
948 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
949 /// let batch = numbers.batch(&tick, nondet!(/** test */));
950 /// batch.max_by_key(q!(|x| -x)).all_ticks()
951 /// # }, |mut stream| async move {
952 /// // 1
953 /// # assert_eq!(stream.next().await.unwrap(), 1);
954 /// # }));
955 /// ```
956 pub fn max_by_key<K, F>(self, key: impl IntoQuotedMut<'a, F, L> + Copy) -> Optional<T, L, B>
957 where
958 K: Ord,
959 F: Fn(&T) -> K + 'a,
960 {
961 let f = key.splice_fn1_borrow_ctx(&self.location);
962
963 let wrapped: syn::Expr = parse_quote!({
964 let key_fn = #f;
965 move |curr, new| {
966 if key_fn(&new) > key_fn(&*curr) {
967 *curr = new;
968 }
969 }
970 });
971
972 let mut core = HydroNode::Reduce {
973 f: wrapped.into(),
974 input: Box::new(self.ir_node.into_inner()),
975 metadata: self.location.new_node_metadata::<T>(),
976 };
977
978 if L::is_top_level() {
979 core = HydroNode::Persist {
980 inner: Box::new(core),
981 metadata: self.location.new_node_metadata::<T>(),
982 };
983 }
984
985 Optional::new(self.location, core)
986 }
987
988 /// Computes the minimum element in the stream as an [`Optional`], which
989 /// will be empty until the first element in the input arrives.
990 ///
991 /// # Example
992 /// ```rust
993 /// # use hydro_lang::*;
994 /// # use futures::StreamExt;
995 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
996 /// let tick = process.tick();
997 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
998 /// let batch = numbers.batch(&tick, nondet!(/** test */));
999 /// batch.min().all_ticks()
1000 /// # }, |mut stream| async move {
1001 /// // 1
1002 /// # assert_eq!(stream.next().await.unwrap(), 1);
1003 /// # }));
1004 /// ```
1005 pub fn min(self) -> Optional<T, L, B>
1006 where
1007 T: Ord,
1008 {
1009 self.reduce_commutative_idempotent(q!(|curr, new| {
1010 if new < *curr {
1011 *curr = new;
1012 }
1013 }))
1014 }
1015}
1016
1017impl<'a, T, L, B, O> Stream<T, L, B, O, ExactlyOnce>
1018where
1019 L: Location<'a>,
1020{
1021 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1022 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1023 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1024 ///
1025 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1026 ///
1027 /// # Example
1028 /// ```rust
1029 /// # use hydro_lang::*;
1030 /// # use futures::StreamExt;
1031 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1032 /// let tick = process.tick();
1033 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1034 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1035 /// batch
1036 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
1037 /// .all_ticks()
1038 /// # }, |mut stream| async move {
1039 /// // 10
1040 /// # assert_eq!(stream.next().await.unwrap(), 10);
1041 /// # }));
1042 /// ```
1043 pub fn fold_commutative<A, I, F>(
1044 self,
1045 init: impl IntoQuotedMut<'a, I, L>,
1046 comb: impl IntoQuotedMut<'a, F, L>,
1047 ) -> Singleton<A, L, B>
1048 where
1049 I: Fn() -> A + 'a,
1050 F: Fn(&mut A, T),
1051 {
1052 let nondet = nondet!(/** the combinator function is commutative */);
1053 self.assume_ordering(nondet).fold(init, comb)
1054 }
1055
1056 /// Combines elements of the stream into a [`Optional`], by starting with the first element in the stream,
1057 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1058 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1059 /// reference, so that it can be modified in place.
1060 ///
1061 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1062 ///
1063 /// # Example
1064 /// ```rust
1065 /// # use hydro_lang::*;
1066 /// # use futures::StreamExt;
1067 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1068 /// let tick = process.tick();
1069 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1070 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1071 /// batch
1072 /// .reduce_commutative(q!(|curr, new| *curr += new))
1073 /// .all_ticks()
1074 /// # }, |mut stream| async move {
1075 /// // 10
1076 /// # assert_eq!(stream.next().await.unwrap(), 10);
1077 /// # }));
1078 /// ```
1079 pub fn reduce_commutative<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1080 where
1081 F: Fn(&mut T, T) + 'a,
1082 {
1083 let nondet = nondet!(/** the combinator function is commutative */);
1084 self.assume_ordering(nondet).reduce(comb)
1085 }
1086
1087 /// Computes the number of elements in the stream as a [`Singleton`].
1088 ///
1089 /// # Example
1090 /// ```rust
1091 /// # use hydro_lang::*;
1092 /// # use futures::StreamExt;
1093 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1094 /// let tick = process.tick();
1095 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1096 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1097 /// batch.count().all_ticks()
1098 /// # }, |mut stream| async move {
1099 /// // 4
1100 /// # assert_eq!(stream.next().await.unwrap(), 4);
1101 /// # }));
1102 /// ```
1103 pub fn count(self) -> Singleton<usize, L, B> {
1104 self.fold_commutative(q!(|| 0usize), q!(|count, _| *count += 1))
1105 }
1106}
1107
1108impl<'a, T, L, B, R> Stream<T, L, B, TotalOrder, R>
1109where
1110 L: Location<'a>,
1111{
1112 /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1113 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1114 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1115 ///
1116 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1117 ///
1118 /// # Example
1119 /// ```rust
1120 /// # use hydro_lang::*;
1121 /// # use futures::StreamExt;
1122 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1123 /// let tick = process.tick();
1124 /// let bools = process.source_iter(q!(vec![false, true, false]));
1125 /// let batch = bools.batch(&tick, nondet!(/** test */));
1126 /// batch
1127 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1128 /// .all_ticks()
1129 /// # }, |mut stream| async move {
1130 /// // true
1131 /// # assert_eq!(stream.next().await.unwrap(), true);
1132 /// # }));
1133 /// ```
1134 pub fn fold_idempotent<A, I, F>(
1135 self,
1136 init: impl IntoQuotedMut<'a, I, L>,
1137 comb: impl IntoQuotedMut<'a, F, L>,
1138 ) -> Singleton<A, L, B>
1139 where
1140 I: Fn() -> A + 'a,
1141 F: Fn(&mut A, T),
1142 {
1143 let nondet = nondet!(/** the combinator function is idempotent */);
1144 self.assume_retries(nondet).fold(init, comb)
1145 }
1146
1147 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1148 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1149 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1150 /// reference, so that it can be modified in place.
1151 ///
1152 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1153 ///
1154 /// # Example
1155 /// ```rust
1156 /// # use hydro_lang::*;
1157 /// # use futures::StreamExt;
1158 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1159 /// let tick = process.tick();
1160 /// let bools = process.source_iter(q!(vec![false, true, false]));
1161 /// let batch = bools.batch(&tick, nondet!(/** test */));
1162 /// batch.reduce_idempotent(q!(|acc, x| *acc |= x)).all_ticks()
1163 /// # }, |mut stream| async move {
1164 /// // true
1165 /// # assert_eq!(stream.next().await.unwrap(), true);
1166 /// # }));
1167 /// ```
1168 pub fn reduce_idempotent<F>(self, comb: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
1169 where
1170 F: Fn(&mut T, T) + 'a,
1171 {
1172 let nondet = nondet!(/** the combinator function is idempotent */);
1173 self.assume_retries(nondet).reduce(comb)
1174 }
1175
1176 /// Computes the first element in the stream as an [`Optional`], which
1177 /// will be empty until the first element in the input arrives.
1178 ///
1179 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1180 /// re-ordering of elements may cause the first element to change.
1181 ///
1182 /// # Example
1183 /// ```rust
1184 /// # use hydro_lang::*;
1185 /// # use futures::StreamExt;
1186 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1187 /// let tick = process.tick();
1188 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1189 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1190 /// batch.first().all_ticks()
1191 /// # }, |mut stream| async move {
1192 /// // 1
1193 /// # assert_eq!(stream.next().await.unwrap(), 1);
1194 /// # }));
1195 /// ```
1196 pub fn first(self) -> Optional<T, L, B> {
1197 self.reduce_idempotent(q!(|_, _| {}))
1198 }
1199
1200 /// Computes the last element in the stream as an [`Optional`], which
1201 /// will be empty until an element in the input arrives.
1202 ///
1203 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1204 /// re-ordering of elements may cause the last element to change.
1205 ///
1206 /// # Example
1207 /// ```rust
1208 /// # use hydro_lang::*;
1209 /// # use futures::StreamExt;
1210 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1211 /// let tick = process.tick();
1212 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1213 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1214 /// batch.last().all_ticks()
1215 /// # }, |mut stream| async move {
1216 /// // 4
1217 /// # assert_eq!(stream.next().await.unwrap(), 4);
1218 /// # }));
1219 /// ```
1220 pub fn last(self) -> Optional<T, L, B> {
1221 self.reduce_idempotent(q!(|curr, new| *curr = new))
1222 }
1223}
1224
1225impl<'a, T, L, B> Stream<T, L, B, TotalOrder, ExactlyOnce>
1226where
1227 L: Location<'a>,
1228{
1229 /// Returns a stream with the current count tupled with each element in the input stream.
1230 ///
1231 /// # Example
1232 /// ```rust
1233 /// # use hydro_lang::{*, stream::ExactlyOnce};
1234 /// # use futures::StreamExt;
1235 /// # tokio_test::block_on(test_util::stream_transform_test::<_, _, TotalOrder, ExactlyOnce>(|process| {
1236 /// let tick = process.tick();
1237 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1238 /// numbers.enumerate()
1239 /// # }, |mut stream| async move {
1240 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1241 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1242 /// # assert_eq!(stream.next().await.unwrap(), w);
1243 /// # }
1244 /// # }));
1245 /// ```
1246 pub fn enumerate(self) -> Stream<(usize, T), L, B, TotalOrder, ExactlyOnce> {
1247 if L::is_top_level() {
1248 Stream::new(
1249 self.location.clone(),
1250 HydroNode::Persist {
1251 inner: Box::new(HydroNode::Enumerate {
1252 is_static: true,
1253 input: Box::new(HydroNode::Unpersist {
1254 inner: Box::new(self.ir_node.into_inner()),
1255 metadata: self.location.new_node_metadata::<T>(),
1256 }),
1257 metadata: self.location.new_node_metadata::<(usize, T)>(),
1258 }),
1259 metadata: self.location.new_node_metadata::<(usize, T)>(),
1260 },
1261 )
1262 } else {
1263 Stream::new(
1264 self.location.clone(),
1265 HydroNode::Enumerate {
1266 is_static: false,
1267 input: Box::new(self.ir_node.into_inner()),
1268 metadata: self.location.new_node_metadata::<(usize, T)>(),
1269 },
1270 )
1271 }
1272 }
1273
1274 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1275 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1276 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1277 ///
1278 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1279 /// to depend on the order of elements in the stream.
1280 ///
1281 /// # Example
1282 /// ```rust
1283 /// # use hydro_lang::*;
1284 /// # use futures::StreamExt;
1285 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1286 /// let tick = process.tick();
1287 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1288 /// let batch = words.batch(&tick, nondet!(/** test */));
1289 /// batch
1290 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1291 /// .all_ticks()
1292 /// # }, |mut stream| async move {
1293 /// // "HELLOWORLD"
1294 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1295 /// # }));
1296 /// ```
1297 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
1298 self,
1299 init: impl IntoQuotedMut<'a, I, L>,
1300 comb: impl IntoQuotedMut<'a, F, L>,
1301 ) -> Singleton<A, L, B> {
1302 let init = init.splice_fn0_ctx(&self.location).into();
1303 let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1304
1305 let mut core = HydroNode::Fold {
1306 init,
1307 acc: comb,
1308 input: Box::new(self.ir_node.into_inner()),
1309 metadata: self.location.new_node_metadata::<A>(),
1310 };
1311
1312 if L::is_top_level() {
1313 // top-level (possibly unbounded) singletons are represented as
1314 // a stream which produces all values from all ticks every tick,
1315 // so Unpersist will always give the lastest aggregation
1316 core = HydroNode::Persist {
1317 inner: Box::new(core),
1318 metadata: self.location.new_node_metadata::<A>(),
1319 };
1320 }
1321
1322 Singleton::new(self.location, core)
1323 }
1324
1325 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B> {
1326 self.fold(
1327 q!(|| vec![]),
1328 q!(|acc, v| {
1329 acc.push(v);
1330 }),
1331 )
1332 }
1333
1334 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1335 /// and emitting each intermediate result.
1336 ///
1337 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1338 /// containing all intermediate accumulated values. The scan operation can also terminate early
1339 /// by returning `None`.
1340 ///
1341 /// The function takes a mutable reference to the accumulator and the current element, and returns
1342 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1343 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1344 ///
1345 /// # Examples
1346 ///
1347 /// Basic usage - running sum:
1348 /// ```rust
1349 /// # use hydro_lang::*;
1350 /// # use futures::StreamExt;
1351 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1352 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1353 /// q!(|| 0),
1354 /// q!(|acc, x| {
1355 /// *acc += x;
1356 /// Some(*acc)
1357 /// }),
1358 /// )
1359 /// # }, |mut stream| async move {
1360 /// // Output: 1, 3, 6, 10
1361 /// # for w in vec![1, 3, 6, 10] {
1362 /// # assert_eq!(stream.next().await.unwrap(), w);
1363 /// # }
1364 /// # }));
1365 /// ```
1366 ///
1367 /// Early termination example:
1368 /// ```rust
1369 /// # use hydro_lang::*;
1370 /// # use futures::StreamExt;
1371 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1372 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1373 /// q!(|| 1),
1374 /// q!(|state, x| {
1375 /// *state = *state * x;
1376 /// if *state > 6 {
1377 /// None // Terminate the stream
1378 /// } else {
1379 /// Some(-*state)
1380 /// }
1381 /// }),
1382 /// )
1383 /// # }, |mut stream| async move {
1384 /// // Output: -1, -2, -6
1385 /// # for w in vec![-1, -2, -6] {
1386 /// # assert_eq!(stream.next().await.unwrap(), w);
1387 /// # }
1388 /// # }));
1389 /// ```
1390 pub fn scan<A, U, I, F>(
1391 self,
1392 init: impl IntoQuotedMut<'a, I, L>,
1393 f: impl IntoQuotedMut<'a, F, L>,
1394 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1395 where
1396 I: Fn() -> A + 'a,
1397 F: Fn(&mut A, T) -> Option<U> + 'a,
1398 {
1399 let init = init.splice_fn0_ctx(&self.location).into();
1400 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1401
1402 if L::is_top_level() {
1403 Stream::new(
1404 self.location.clone(),
1405 HydroNode::Persist {
1406 inner: Box::new(HydroNode::Scan {
1407 init,
1408 acc: f,
1409 input: Box::new(HydroNode::Unpersist {
1410 inner: Box::new(self.ir_node.into_inner()),
1411 metadata: self.location.new_node_metadata::<U>(),
1412 }),
1413 metadata: self.location.new_node_metadata::<U>(),
1414 }),
1415 metadata: self.location.new_node_metadata::<U>(),
1416 },
1417 )
1418 } else {
1419 Stream::new(
1420 self.location.clone(),
1421 HydroNode::Scan {
1422 init,
1423 acc: f,
1424 input: Box::new(self.ir_node.into_inner()),
1425 metadata: self.location.new_node_metadata::<U>(),
1426 },
1427 )
1428 }
1429 }
1430
1431 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1432 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1433 /// until the first element in the input arrives.
1434 ///
1435 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1436 /// to depend on the order of elements in the stream.
1437 ///
1438 /// # Example
1439 /// ```rust
1440 /// # use hydro_lang::*;
1441 /// # use futures::StreamExt;
1442 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1443 /// let tick = process.tick();
1444 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1445 /// let batch = words.batch(&tick, nondet!(/** test */));
1446 /// batch
1447 /// .map(q!(|x| x.to_string()))
1448 /// .reduce(q!(|curr, new| curr.push_str(&new)))
1449 /// .all_ticks()
1450 /// # }, |mut stream| async move {
1451 /// // "HELLOWORLD"
1452 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1453 /// # }));
1454 /// ```
1455 pub fn reduce<F: Fn(&mut T, T) + 'a>(
1456 self,
1457 comb: impl IntoQuotedMut<'a, F, L>,
1458 ) -> Optional<T, L, B> {
1459 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1460 let mut core = HydroNode::Reduce {
1461 f,
1462 input: Box::new(self.ir_node.into_inner()),
1463 metadata: self.location.new_node_metadata::<T>(),
1464 };
1465
1466 if L::is_top_level() {
1467 core = HydroNode::Persist {
1468 inner: Box::new(core),
1469 metadata: self.location.new_node_metadata::<T>(),
1470 };
1471 }
1472
1473 Optional::new(self.location, core)
1474 }
1475}
1476
1477impl<'a, T, L: Location<'a> + NoTick + NoAtomic, O, R> Stream<T, L, Unbounded, O, R> {
1478 /// Produces a new stream that interleaves the elements of the two input streams.
1479 /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1480 ///
1481 /// Currently, both input streams must be [`Unbounded`]. When the streams are
1482 /// [`Bounded`], you can use [`Stream::chain`] instead.
1483 ///
1484 /// # Example
1485 /// ```rust
1486 /// # use hydro_lang::*;
1487 /// # use futures::StreamExt;
1488 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1489 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1490 /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1491 /// # }, |mut stream| async move {
1492 /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1493 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1494 /// # assert_eq!(stream.next().await.unwrap(), w);
1495 /// # }
1496 /// # }));
1497 /// ```
1498 pub fn interleave<O2, R2: MinRetries<R>>(
1499 self,
1500 other: Stream<T, L, Unbounded, O2, R2>,
1501 ) -> Stream<T, L, Unbounded, NoOrder, R::Min>
1502 where
1503 R: MinRetries<R2, Min = R2::Min>,
1504 {
1505 let tick = self.location.tick();
1506 // Because the outputs are unordered, we can interleave batches from both streams.
1507 let nondet_batch_interleaving = nondet!(/** output stream is NoOrder, can interleave */);
1508 self.batch(&tick, nondet_batch_interleaving)
1509 .weakest_ordering()
1510 .weaken_retries::<R2>()
1511 .chain(
1512 other
1513 .batch(&tick, nondet_batch_interleaving)
1514 .weakest_ordering()
1515 .weaken_retries::<R>(),
1516 )
1517 .all_ticks()
1518 }
1519}
1520
1521impl<'a, T, L, O, R> Stream<T, L, Bounded, O, R>
1522where
1523 L: Location<'a>,
1524{
1525 /// Produces a new stream that emits the input elements in sorted order.
1526 ///
1527 /// The input stream can have any ordering guarantee, but the output stream
1528 /// will have a [`TotalOrder`] guarantee. This operator will block until all
1529 /// elements in the input stream are available, so it requires the input stream
1530 /// to be [`Bounded`].
1531 ///
1532 /// # Example
1533 /// ```rust
1534 /// # use hydro_lang::*;
1535 /// # use futures::StreamExt;
1536 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1537 /// let tick = process.tick();
1538 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1539 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1540 /// batch.sort().all_ticks()
1541 /// # }, |mut stream| async move {
1542 /// // 1, 2, 3, 4
1543 /// # for w in (1..5) {
1544 /// # assert_eq!(stream.next().await.unwrap(), w);
1545 /// # }
1546 /// # }));
1547 /// ```
1548 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1549 where
1550 T: Ord,
1551 {
1552 Stream::new(
1553 self.location.clone(),
1554 HydroNode::Sort {
1555 input: Box::new(self.ir_node.into_inner()),
1556 metadata: self.location.new_node_metadata::<T>(),
1557 },
1558 )
1559 }
1560
1561 /// Produces a new stream that first emits the elements of the `self` stream,
1562 /// and then emits the elements of the `other` stream. The output stream has
1563 /// a [`TotalOrder`] guarantee if and only if both input streams have a
1564 /// [`TotalOrder`] guarantee.
1565 ///
1566 /// Currently, both input streams must be [`Bounded`]. This operator will block
1567 /// on the first stream until all its elements are available. In a future version,
1568 /// we will relax the requirement on the `other` stream.
1569 ///
1570 /// # Example
1571 /// ```rust
1572 /// # use hydro_lang::*;
1573 /// # use futures::StreamExt;
1574 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1575 /// let tick = process.tick();
1576 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1577 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1578 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1579 /// # }, |mut stream| async move {
1580 /// // 2, 3, 4, 5, 1, 2, 3, 4
1581 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1582 /// # assert_eq!(stream.next().await.unwrap(), w);
1583 /// # }
1584 /// # }));
1585 /// ```
1586 pub fn chain<O2>(self, other: Stream<T, L, Bounded, O2, R>) -> Stream<T, L, Bounded, O::Min, R>
1587 where
1588 O: MinOrder<O2>,
1589 {
1590 check_matching_location(&self.location, &other.location);
1591
1592 Stream::new(
1593 self.location.clone(),
1594 HydroNode::Chain {
1595 first: Box::new(self.ir_node.into_inner()),
1596 second: Box::new(other.ir_node.into_inner()),
1597 metadata: self.location.new_node_metadata::<T>(),
1598 },
1599 )
1600 }
1601
1602 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
1603 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
1604 /// because this is compiled into a nested loop.
1605 pub fn cross_product_nested_loop<T2, O2>(
1606 self,
1607 other: Stream<T2, L, Bounded, O2, R>,
1608 ) -> Stream<(T, T2), L, Bounded, O::Min, R>
1609 where
1610 T: Clone,
1611 T2: Clone,
1612 O: MinOrder<O2>,
1613 {
1614 check_matching_location(&self.location, &other.location);
1615
1616 Stream::new(
1617 self.location.clone(),
1618 HydroNode::CrossProduct {
1619 left: Box::new(self.ir_node.into_inner()),
1620 right: Box::new(other.ir_node.into_inner()),
1621 metadata: self.location.new_node_metadata::<(T, T2)>(),
1622 },
1623 )
1624 }
1625}
1626
1627impl<'a, K, V1, L, B, O, R> Stream<(K, V1), L, B, O, R>
1628where
1629 L: Location<'a>,
1630{
1631 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
1632 /// by equi-joining the two streams on the key attribute `K`.
1633 ///
1634 /// # Example
1635 /// ```rust
1636 /// # use hydro_lang::*;
1637 /// # use std::collections::HashSet;
1638 /// # use futures::StreamExt;
1639 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1640 /// let tick = process.tick();
1641 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
1642 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
1643 /// stream1.join(stream2)
1644 /// # }, |mut stream| async move {
1645 /// // (1, ('a', 'x')), (2, ('b', 'y'))
1646 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
1647 /// # stream.map(|i| assert!(expected.contains(&i)));
1648 /// # }));
1649 pub fn join<V2, O2>(
1650 self,
1651 n: Stream<(K, V2), L, B, O2, R>,
1652 ) -> Stream<(K, (V1, V2)), L, B, NoOrder, R>
1653 where
1654 K: Eq + Hash,
1655 {
1656 check_matching_location(&self.location, &n.location);
1657
1658 Stream::new(
1659 self.location.clone(),
1660 HydroNode::Join {
1661 left: Box::new(self.ir_node.into_inner()),
1662 right: Box::new(n.ir_node.into_inner()),
1663 metadata: self.location.new_node_metadata::<(K, (V1, V2))>(),
1664 },
1665 )
1666 }
1667
1668 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
1669 /// computes the anti-join of the items in the input -- i.e. returns
1670 /// unique items in the first input that do not have a matching key
1671 /// in the second input.
1672 ///
1673 /// # Example
1674 /// ```rust
1675 /// # use hydro_lang::*;
1676 /// # use futures::StreamExt;
1677 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1678 /// let tick = process.tick();
1679 /// let stream = process
1680 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1681 /// .batch(&tick, nondet!(/** test */));
1682 /// let batch = process
1683 /// .source_iter(q!(vec![1, 2]))
1684 /// .batch(&tick, nondet!(/** test */));
1685 /// stream.anti_join(batch).all_ticks()
1686 /// # }, |mut stream| async move {
1687 /// # for w in vec![(3, 'c'), (4, 'd')] {
1688 /// # assert_eq!(stream.next().await.unwrap(), w);
1689 /// # }
1690 /// # }));
1691 pub fn anti_join<O2, R2>(self, n: Stream<K, L, Bounded, O2, R2>) -> Stream<(K, V1), L, B, O, R>
1692 where
1693 K: Eq + Hash,
1694 {
1695 check_matching_location(&self.location, &n.location);
1696
1697 Stream::new(
1698 self.location.clone(),
1699 HydroNode::AntiJoin {
1700 pos: Box::new(self.ir_node.into_inner()),
1701 neg: Box::new(n.ir_node.into_inner()),
1702 metadata: self.location.new_node_metadata::<(K, V1)>(),
1703 },
1704 )
1705 }
1706}
1707
1708impl<'a, K, V, L: Location<'a>, B, O, R> Stream<(K, V), L, B, O, R> {
1709 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
1710 KeyedStream {
1711 underlying: self.weakest_ordering(),
1712 _phantom_order: Default::default(),
1713 }
1714 }
1715}
1716
1717impl<'a, K, V, L, B> Stream<(K, V), L, B, TotalOrder, ExactlyOnce>
1718where
1719 K: Eq + Hash,
1720 L: Location<'a>,
1721{
    /// A special case of [`Stream::scan`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
    /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
    /// in the second element are transformed via the `f` combinator.
    ///
    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan_keyed` produces a new
    /// stream containing all intermediate accumulated values paired with the key.
    ///
    /// The function takes a mutable reference to the accumulator for the element's key and the current value,
    /// and returns an `Option<U>`. If the function returns `Some(value)`, `(key, value)` is emitted to the
    /// output stream. If the function returns `None`, nothing is emitted for that element and the accumulator
    /// stored for its key is discarded, so a later element with the same key starts again from a fresh
    /// `init` value.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 1), (0, 2), (1, 3), (1, 4)]))
    ///     .scan_keyed(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             Some(*acc)
    ///         }),
    ///     )
    /// # }, |mut stream| async move {
    /// // Output: (0, 1), (0, 3), (1, 3), (1, 7)
    /// # for w in vec![(0, 1), (0, 3), (1, 3), (1, 7)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn scan_keyed<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<(K, U), L, B, TotalOrder, ExactlyOnce>
    where
        K: Clone,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> Option<U> + 'a,
    {
        // Re-wrap the user closures as `ManualExpr`s with explicit function types so that
        // they can be referenced by name from inside the quoted closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // The scan state is a map from key to that key's accumulator.
        self.scan(
            q!(|| HashMap::new()),
            q!(move |acc, (k, v)| {
                // Look up the accumulator for this key, creating it via `init` on first use.
                let existing_state = acc.entry(k.clone()).or_insert_with(&init);
                if let Some(out) = f(existing_state, v) {
                    Some(Some((k, out)))
                } else {
                    // `None` from `f` drops this key's state; the outer `Some` keeps
                    // the underlying scan itself running.
                    acc.remove(&k);
                    Some(None)
                }
            }),
        )
        .flatten_ordered()
    }
1781
    /// Like [`Stream::fold_keyed`], in the spirit of SQL's GROUP BY and aggregation constructs. But the aggregation
    /// function returns a boolean, which when true indicates that the aggregated result is complete and can be
    /// released to downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input stream
    /// is [`Unbounded`], the outputs of the fold can be processed like normal stream elements.
    ///
    /// Once a key's result has been released, the accumulator stored for that key is removed, so a later
    /// element with the same key starts a new aggregation from a fresh `init` value.
    ///
    /// # Example
    /// ```rust
    /// # use hydro_lang::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
    ///     .fold_keyed_early_stop(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             x % 2 == 0
    ///         }),
    ///     )
    /// # }, |mut stream| async move {
    /// // Output: (0, 2), (1, 9)
    /// # for w in vec![(0, 2), (1, 9)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// ```
    pub fn fold_keyed_early_stop<A, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<(K, A), L, B, TotalOrder, ExactlyOnce>
    where
        K: Clone,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, V) -> bool + 'a,
    {
        // Re-wrap the user closures as `ManualExpr`s with explicit function types so that
        // they can be referenced by name from inside the quoted closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
        // The scan state is a map from key to that key's in-progress accumulator.
        self.scan(
            q!(|| HashMap::new()),
            q!(move |acc, (k, v)| {
                // Look up the accumulator for this key, creating it via `init` on first use.
                let existing_state = acc.entry(k.clone()).or_insert_with(&init);
                if f(existing_state, v) {
                    // The entry was just inserted or found above, so the removal cannot miss.
                    let out = acc.remove(&k).unwrap();
                    Some(Some((k, out)))
                } else {
                    // Not complete yet: emit nothing, keep accumulating.
                    Some(None)
                }
            }),
        )
        .flatten_ordered()
    }
1834}
1835
1836impl<'a, K, V, L> Stream<(K, V), Tick<L>, Bounded, TotalOrder, ExactlyOnce>
1837where
1838 K: Eq + Hash,
1839 L: Location<'a>,
1840{
1841 #[deprecated = "use .into_keyed().fold(...) instead"]
1842 /// A special case of [`Stream::fold`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1843 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1844 /// in the second element are accumulated via the `comb` closure.
1845 ///
1846 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1847 /// to depend on the order of elements in the stream.
1848 ///
1849 /// If the input and output value types are the same and do not require initialization then use
1850 /// [`Stream::reduce_keyed`].
1851 ///
1852 /// # Example
1853 /// ```rust
1854 /// # use hydro_lang::*;
1855 /// # use futures::StreamExt;
1856 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1857 /// let tick = process.tick();
1858 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1859 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1860 /// batch
1861 /// .fold_keyed(q!(|| 0), q!(|acc, x| *acc += x))
1862 /// .all_ticks()
1863 /// # }, |mut stream| async move {
1864 /// // (1, 5), (2, 7)
1865 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1866 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1867 /// # }));
1868 /// ```
1869 pub fn fold_keyed<A, I, F>(
1870 self,
1871 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1872 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1873 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1874 where
1875 I: Fn() -> A + 'a,
1876 F: Fn(&mut A, V) + 'a,
1877 {
1878 self.into_keyed().fold(init, comb).entries()
1879 }
1880
1881 #[deprecated = "use .into_keyed().reduce(...) instead"]
1882 /// A special case of [`Stream::reduce`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
1883 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
1884 /// in the second element are accumulated via the `comb` closure.
1885 ///
1886 /// The input stream must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
1887 /// to depend on the order of elements in the stream.
1888 ///
1889 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed`].
1890 ///
1891 /// # Example
1892 /// ```rust
1893 /// # use hydro_lang::*;
1894 /// # use futures::StreamExt;
1895 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1896 /// let tick = process.tick();
1897 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1898 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1899 /// batch.reduce_keyed(q!(|acc, x| *acc += x)).all_ticks()
1900 /// # }, |mut stream| async move {
1901 /// // (1, 5), (2, 7)
1902 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1903 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1904 /// # }));
1905 /// ```
1906 pub fn reduce_keyed<F>(
1907 self,
1908 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1909 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1910 where
1911 F: Fn(&mut V, V) + 'a,
1912 {
1913 let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();
1914
1915 Stream::new(
1916 self.location.clone(),
1917 HydroNode::ReduceKeyed {
1918 f,
1919 input: Box::new(self.ir_node.into_inner()),
1920 metadata: self.location.new_node_metadata::<(K, V)>(),
1921 },
1922 )
1923 }
1924}
1925
1926impl<'a, K, V, L, O, R> Stream<(K, V), Tick<L>, Bounded, O, R>
1927where
1928 K: Eq + Hash,
1929 L: Location<'a>,
1930{
1931 #[deprecated = "use .into_keyed().fold_commutative_idempotent(...) instead"]
1932 /// A special case of [`Stream::fold_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
1933 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
1934 /// in the second element are accumulated via the `comb` closure.
1935 ///
1936 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1937 /// as there may be non-deterministic duplicates.
1938 ///
1939 /// If the input and output value types are the same and do not require initialization then use
1940 /// [`Stream::reduce_keyed_commutative_idempotent`].
1941 ///
1942 /// # Example
1943 /// ```rust
1944 /// # use hydro_lang::*;
1945 /// # use futures::StreamExt;
1946 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1947 /// let tick = process.tick();
1948 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
1949 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1950 /// batch
1951 /// .fold_keyed_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1952 /// .all_ticks()
1953 /// # }, |mut stream| async move {
1954 /// // (1, false), (2, true)
1955 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1956 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1957 /// # }));
1958 /// ```
1959 pub fn fold_keyed_commutative_idempotent<A, I, F>(
1960 self,
1961 init: impl IntoQuotedMut<'a, I, Tick<L>>,
1962 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
1963 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
1964 where
1965 I: Fn() -> A + 'a,
1966 F: Fn(&mut A, V) + 'a,
1967 {
1968 self.into_keyed()
1969 .fold_commutative_idempotent(init, comb)
1970 .entries()
1971 }
1972
1973 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
1974 /// # Example
1975 /// ```rust
1976 /// # use hydro_lang::*;
1977 /// # use futures::StreamExt;
1978 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
1979 /// let tick = process.tick();
1980 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
1981 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1982 /// batch.keys().all_ticks()
1983 /// # }, |mut stream| async move {
1984 /// // 1, 2
1985 /// # assert_eq!(stream.next().await.unwrap(), 1);
1986 /// # assert_eq!(stream.next().await.unwrap(), 2);
1987 /// # }));
1988 /// ```
1989 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
1990 self.into_keyed()
1991 .fold_commutative_idempotent(q!(|| ()), q!(|_, _| {}))
1992 .keys()
1993 }
1994
1995 #[deprecated = "use .into_keyed().reduce_commutative_idempotent(...) instead"]
1996 /// A special case of [`Stream::reduce_commutative_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
1997 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
1998 /// in the second element are accumulated via the `comb` closure.
1999 ///
2000 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
2001 /// as there may be non-deterministic duplicates.
2002 ///
2003 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative_idempotent`].
2004 ///
2005 /// # Example
2006 /// ```rust
2007 /// # use hydro_lang::*;
2008 /// # use futures::StreamExt;
2009 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2010 /// let tick = process.tick();
2011 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2012 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2013 /// batch
2014 /// .reduce_keyed_commutative_idempotent(q!(|acc, x| *acc |= x))
2015 /// .all_ticks()
2016 /// # }, |mut stream| async move {
2017 /// // (1, false), (2, true)
2018 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2019 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2020 /// # }));
2021 /// ```
2022 pub fn reduce_keyed_commutative_idempotent<F>(
2023 self,
2024 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2025 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2026 where
2027 F: Fn(&mut V, V) + 'a,
2028 {
2029 self.into_keyed()
2030 .reduce_commutative_idempotent(comb)
2031 .entries()
2032 }
2033}
2034
2035impl<'a, K, V, L, O> Stream<(K, V), Tick<L>, Bounded, O, ExactlyOnce>
2036where
2037 K: Eq + Hash,
2038 L: Location<'a>,
2039{
2040 #[deprecated = "use .into_keyed().fold_commutative(...) instead"]
2041 /// A special case of [`Stream::fold_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2042 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2043 /// in the second element are accumulated via the `comb` closure.
2044 ///
2045 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2046 ///
2047 /// If the input and output value types are the same and do not require initialization then use
2048 /// [`Stream::reduce_keyed_commutative`].
2049 ///
2050 /// # Example
2051 /// ```rust
2052 /// # use hydro_lang::*;
2053 /// # use futures::StreamExt;
2054 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2055 /// let tick = process.tick();
2056 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2057 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2058 /// batch
2059 /// .fold_keyed_commutative(q!(|| 0), q!(|acc, x| *acc += x))
2060 /// .all_ticks()
2061 /// # }, |mut stream| async move {
2062 /// // (1, 5), (2, 7)
2063 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2064 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2065 /// # }));
2066 /// ```
2067 pub fn fold_keyed_commutative<A, I, F>(
2068 self,
2069 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2070 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2071 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2072 where
2073 I: Fn() -> A + 'a,
2074 F: Fn(&mut A, V) + 'a,
2075 {
2076 self.into_keyed().fold_commutative(init, comb).entries()
2077 }
2078
2079 #[deprecated = "use .into_keyed().reduce_commutative(...) instead"]
2080 /// A special case of [`Stream::reduce_commutative`], in the spirit of SQL's GROUP BY and aggregation constructs. The input
2081 /// tuples are partitioned into groups by the first element ("keys"), and for each group the values
2082 /// in the second element are accumulated via the `comb` closure.
2083 ///
2084 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
2085 ///
2086 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_commutative`].
2087 ///
2088 /// # Example
2089 /// ```rust
2090 /// # use hydro_lang::*;
2091 /// # use futures::StreamExt;
2092 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2093 /// let tick = process.tick();
2094 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2095 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2096 /// batch
2097 /// .reduce_keyed_commutative(q!(|acc, x| *acc += x))
2098 /// .all_ticks()
2099 /// # }, |mut stream| async move {
2100 /// // (1, 5), (2, 7)
2101 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
2102 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
2103 /// # }));
2104 /// ```
2105 pub fn reduce_keyed_commutative<F>(
2106 self,
2107 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2108 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2109 where
2110 F: Fn(&mut V, V) + 'a,
2111 {
2112 self.into_keyed().reduce_commutative(comb).entries()
2113 }
2114}
2115
2116impl<'a, K, V, L, R> Stream<(K, V), Tick<L>, Bounded, TotalOrder, R>
2117where
2118 K: Eq + Hash,
2119 L: Location<'a>,
2120{
2121 #[deprecated = "use .into_keyed().fold_idempotent(...) instead"]
2122 /// A special case of [`Stream::fold_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2123 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2124 /// in the second element are accumulated via the `comb` closure.
2125 ///
2126 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
2127 ///
2128 /// If the input and output value types are the same and do not require initialization then use
2129 /// [`Stream::reduce_keyed_idempotent`].
2130 ///
2131 /// # Example
2132 /// ```rust
2133 /// # use hydro_lang::*;
2134 /// # use futures::StreamExt;
2135 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2136 /// let tick = process.tick();
2137 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2138 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2139 /// batch
2140 /// .fold_keyed_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
2141 /// .all_ticks()
2142 /// # }, |mut stream| async move {
2143 /// // (1, false), (2, true)
2144 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2145 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2146 /// # }));
2147 /// ```
2148 pub fn fold_keyed_idempotent<A, I, F>(
2149 self,
2150 init: impl IntoQuotedMut<'a, I, Tick<L>>,
2151 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2152 ) -> Stream<(K, A), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2153 where
2154 I: Fn() -> A + 'a,
2155 F: Fn(&mut A, V) + 'a,
2156 {
2157 self.into_keyed().fold_idempotent(init, comb).entries()
2158 }
2159
2160 #[deprecated = "use .into_keyed().reduce_idempotent(...) instead"]
2161 /// A special case of [`Stream::reduce_idempotent`], in the spirit of SQL's GROUP BY and aggregation constructs.
2162 /// The input tuples are partitioned into groups by the first element ("keys"), and for each group the values
2163 /// in the second element are accumulated via the `comb` closure.
2164 ///
2165 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
2166 ///
2167 /// If you need the accumulated value to have a different type than the input, use [`Stream::fold_keyed_idempotent`].
2168 ///
2169 /// # Example
2170 /// ```rust
2171 /// # use hydro_lang::*;
2172 /// # use futures::StreamExt;
2173 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2174 /// let tick = process.tick();
2175 /// let numbers = process.source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]));
2176 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2177 /// batch
2178 /// .reduce_keyed_idempotent(q!(|acc, x| *acc |= x))
2179 /// .all_ticks()
2180 /// # }, |mut stream| async move {
2181 /// // (1, false), (2, true)
2182 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
2183 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2184 /// # }));
2185 /// ```
2186 pub fn reduce_keyed_idempotent<F>(
2187 self,
2188 comb: impl IntoQuotedMut<'a, F, Tick<L>>,
2189 ) -> Stream<(K, V), Tick<L>, Bounded, NoOrder, ExactlyOnce>
2190 where
2191 F: Fn(&mut V, V) + 'a,
2192 {
2193 self.into_keyed().reduce_idempotent(comb).entries()
2194 }
2195}
2196
2197impl<'a, T, L, B, O, R> Stream<T, Atomic<L>, B, O, R>
2198where
2199 L: Location<'a> + NoTick,
2200{
2201 /// Returns a stream corresponding to the latest batch of elements being atomically
2202 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2203 /// the order of the input.
2204 ///
2205 /// # Non-Determinism
2206 /// The batch boundaries are non-deterministic and may change across executions.
2207 pub fn batch(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2208 Stream::new(
2209 self.location.clone().tick,
2210 HydroNode::Unpersist {
2211 inner: Box::new(self.ir_node.into_inner()),
2212 metadata: self.location.new_node_metadata::<T>(),
2213 },
2214 )
2215 }
2216
2217 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2218 Stream::new(self.location.tick.l, self.ir_node.into_inner())
2219 }
2220
2221 pub fn atomic_source(&self) -> Tick<L> {
2222 self.location.tick.clone()
2223 }
2224}
2225
2226impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
2227where
2228 L: Location<'a> + NoTick + NoAtomic,
2229{
2230 pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
2231 Stream::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
2232 }
2233
2234 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2235 /// Future outputs are produced as available, regardless of input arrival order.
2236 ///
2237 /// # Example
2238 /// ```rust
2239 /// # use std::collections::HashSet;
2240 /// # use futures::StreamExt;
2241 /// # use hydro_lang::*;
2242 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2243 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2244 /// .map(q!(|x| async move {
2245 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2246 /// x
2247 /// }))
2248 /// .resolve_futures()
2249 /// # },
2250 /// # |mut stream| async move {
2251 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2252 /// # let mut output = HashSet::new();
2253 /// # for _ in 1..10 {
2254 /// # output.insert(stream.next().await.unwrap());
2255 /// # }
2256 /// # assert_eq!(
2257 /// # output,
2258 /// # HashSet::<i32>::from_iter(1..10)
2259 /// # );
2260 /// # },
2261 /// # ));
2262 pub fn resolve_futures<T2>(self) -> Stream<T2, L, B, NoOrder, R>
2263 where
2264 T: Future<Output = T2>,
2265 {
2266 Stream::new(
2267 self.location.clone(),
2268 HydroNode::ResolveFutures {
2269 input: Box::new(self.ir_node.into_inner()),
2270 metadata: self.location.new_node_metadata::<T2>(),
2271 },
2272 )
2273 }
2274
2275 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2276 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2277 /// the order of the input.
2278 ///
2279 /// # Non-Determinism
2280 /// The batch boundaries are non-deterministic and may change across executions.
2281 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2282 self.atomic(tick).batch(nondet)
2283 }
2284
2285 /// Given a time interval, returns a stream corresponding to samples taken from the
2286 /// stream roughly at that interval. The output will have elements in the same order
2287 /// as the input, but with arbitrary elements skipped between samples. There is also
2288 /// no guarantee on the exact timing of the samples.
2289 ///
2290 /// # Non-Determinism
2291 /// The output stream is non-deterministic in which elements are sampled, since this
2292 /// is controlled by a clock.
2293 pub fn sample_every(
2294 self,
2295 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
2296 nondet: NonDet,
2297 ) -> Stream<T, L, Unbounded, O, AtLeastOnce> {
2298 let samples = self.location.source_interval(interval, nondet);
2299
2300 let tick = self.location.tick();
2301 self.batch(&tick, nondet)
2302 .continue_if(samples.batch(&tick, nondet).first())
2303 .all_ticks()
2304 .weakest_retries()
2305 }
2306
2307 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
2308 /// stream has not emitted a value since that duration.
2309 ///
2310 /// # Non-Determinism
2311 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2312 /// samples take place, timeouts may be non-deterministically generated or missed,
2313 /// and the notification of the timeout may be delayed as well. There is also no
2314 /// guarantee on how long the [`Optional`] will have a value after the timeout is
2315 /// detected based on when the next sample is taken.
2316 pub fn timeout(
2317 self,
2318 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
2319 nondet: NonDet,
2320 ) -> Optional<(), L, Unbounded> {
2321 let tick = self.location.tick();
2322
2323 let latest_received = self.assume_retries(nondet).fold_commutative(
2324 q!(|| None),
2325 q!(|latest, _| {
2326 *latest = Some(Instant::now());
2327 }),
2328 );
2329
2330 latest_received
2331 .snapshot(&tick, nondet)
2332 .filter_map(q!(move |latest_received| {
2333 if let Some(latest_received) = latest_received {
2334 if Instant::now().duration_since(latest_received) > duration {
2335 Some(())
2336 } else {
2337 None
2338 }
2339 } else {
2340 Some(())
2341 }
2342 }))
2343 .latest()
2344 }
2345}
2346
2347impl<'a, F, T, L, B, O, R> Stream<F, L, B, O, R>
2348where
2349 L: Location<'a> + NoTick + NoAtomic,
2350 F: Future<Output = T>,
2351{
2352 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2353 /// Future outputs are produced in the same order as the input stream.
2354 ///
2355 /// # Example
2356 /// ```rust
2357 /// # use std::collections::HashSet;
2358 /// # use futures::StreamExt;
2359 /// # use hydro_lang::*;
2360 /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
2361 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2362 /// .map(q!(|x| async move {
2363 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2364 /// x
2365 /// }))
2366 /// .resolve_futures_ordered()
2367 /// # },
2368 /// # |mut stream| async move {
2369 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2370 /// # let mut output = Vec::new();
2371 /// # for _ in 1..10 {
2372 /// # output.push(stream.next().await.unwrap());
2373 /// # }
2374 /// # assert_eq!(
2375 /// # output,
2376 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2377 /// # );
2378 /// # },
2379 /// # ));
2380 pub fn resolve_futures_ordered(self) -> Stream<T, L, B, O, R> {
2381 Stream::new(
2382 self.location.clone(),
2383 HydroNode::ResolveFuturesOrdered {
2384 input: Box::new(self.ir_node.into_inner()),
2385 metadata: self.location.new_node_metadata::<T>(),
2386 },
2387 )
2388 }
2389}
2390
2391impl<'a, T, L, B, O, R> Stream<T, L, B, O, R>
2392where
2393 L: Location<'a> + NoTick,
2394{
2395 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) {
2396 let f = f.splice_fn1_ctx(&self.location).into();
2397 let metadata = self.location.new_node_metadata::<T>();
2398 self.location
2399 .flow_state()
2400 .borrow_mut()
2401 .leaves
2402 .as_mut()
2403 .expect(FLOW_USED_MESSAGE)
2404 .push(HydroLeaf::ForEach {
2405 input: Box::new(HydroNode::Unpersist {
2406 inner: Box::new(self.ir_node.into_inner()),
2407 metadata: metadata.clone(),
2408 }),
2409 f,
2410 metadata,
2411 });
2412 }
2413
2414 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
2415 where
2416 S: 'a + futures::Sink<T> + Unpin,
2417 {
2418 self.location
2419 .flow_state()
2420 .borrow_mut()
2421 .leaves
2422 .as_mut()
2423 .expect(FLOW_USED_MESSAGE)
2424 .push(HydroLeaf::DestSink {
2425 sink: sink.splice_typed_ctx(&self.location).into(),
2426 input: Box::new(self.ir_node.into_inner()),
2427 metadata: self.location.new_node_metadata::<T>(),
2428 });
2429 }
2430}
2431
2432impl<'a, T, L, O, R> Stream<T, Tick<L>, Bounded, O, R>
2433where
2434 L: Location<'a>,
2435{
2436 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2437 Stream::new(
2438 self.location.outer().clone(),
2439 HydroNode::Persist {
2440 inner: Box::new(self.ir_node.into_inner()),
2441 metadata: self.location.new_node_metadata::<T>(),
2442 },
2443 )
2444 }
2445
2446 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2447 Stream::new(
2448 Atomic {
2449 tick: self.location.clone(),
2450 },
2451 HydroNode::Persist {
2452 inner: Box::new(self.ir_node.into_inner()),
2453 metadata: self.location.new_node_metadata::<T>(),
2454 },
2455 )
2456 }
2457
2458 pub fn persist(self) -> Stream<T, Tick<L>, Bounded, O, R>
2459 where
2460 T: Clone,
2461 {
2462 Stream::new(
2463 self.location.clone(),
2464 HydroNode::Persist {
2465 inner: Box::new(self.ir_node.into_inner()),
2466 metadata: self.location.new_node_metadata::<T>(),
2467 },
2468 )
2469 }
2470
2471 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2472 Stream::new(
2473 self.location.clone(),
2474 HydroNode::DeferTick {
2475 input: Box::new(self.ir_node.into_inner()),
2476 metadata: self.location.new_node_metadata::<T>(),
2477 },
2478 )
2479 }
2480
2481 pub fn delta(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2482 Stream::new(
2483 self.location.clone(),
2484 HydroNode::Delta {
2485 inner: Box::new(self.ir_node.into_inner()),
2486 metadata: self.location.new_node_metadata::<T>(),
2487 },
2488 )
2489 }
2490}
2491
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use hydro_deploy::Deployment;
    use serde::{Deserialize, Serialize};
    use stageleft::q;

    use crate::FlowBuilder;
    use crate::location::Location;

    // Marker types used to tag the processes (and the external) in the flow.
    struct P1 {}
    struct P2 {}

    /// Payload sent between processes in `first_ten_distributed`.
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    /// Sends the numbers `0..10` from one process through a second one to an external
    /// port and checks that they arrive there in order.
    #[tokio::test]
    async fn first_ten_distributed() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let sender = flow.process::<P1>();
        let relay = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = sender.source_iter(q!(0..10));
        let payloads = numbers.map(q!(|n| SendOverNetwork { n }));
        let relayed = payloads.send_bincode(&relay);
        let out_port = relayed.send_bincode_external(&external);

        let nodes = flow
            .with_process(&sender, deployment.Localhost())
            .with_process(&relay, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect_source_bincode(out_port).await;

        deployment.start().await.unwrap();

        for expected in 0..10 {
            let received = external_out.next().await.unwrap();
            assert_eq!(received.n, expected);
        }
    }

    /// Checks that taking `first()` of a three-element stream and counting the result
    /// yields `1`.
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let elements = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered();
        let counted = elements.first().into_stream().count();
        let count = counted.all_ticks().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect_source_bincode(count).await;

        deployment.start().await.unwrap();

        let observed = external_out.next().await.unwrap();
        assert_eq!(observed, 1);
    }
}