hydro_lang/
optional.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7use syn::parse_quote;
8
9use crate::builder::FLOW_USED_MESSAGE;
10use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker};
11use crate::ir::{HydroLeaf, HydroNode, HydroSource, TeeNode};
12use crate::location::tick::{Atomic, NoAtomic};
13use crate::location::{LocationId, NoTick, check_matching_location};
14use crate::singleton::ZipResult;
15use crate::stream::{AtLeastOnce, ExactlyOnce, NoOrder};
16use crate::unsafety::NonDet;
17use crate::{Bounded, Location, Singleton, Stream, Tick, TotalOrder, Unbounded};
18
19pub struct Optional<Type, Loc, Bound> {
20    pub(crate) location: Loc,
21    pub(crate) ir_node: RefCell<HydroNode>,
22
23    _phantom: PhantomData<(Type, Loc, Bound)>,
24}
25
26impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
27where
28    L: Location<'a>,
29{
30    fn defer_tick(self) -> Self {
31        Optional::defer_tick(self)
32    }
33}
34
35impl<'a, T, L> CycleCollection<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
36where
37    L: Location<'a>,
38{
39    type Location = Tick<L>;
40
41    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
42        Optional::new(
43            location.clone(),
44            HydroNode::CycleSource {
45                ident,
46                metadata: location.new_node_metadata::<T>(),
47            },
48        )
49    }
50}
51
52impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Optional<T, Tick<L>, Bounded>
53where
54    L: Location<'a>,
55{
56    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
57        assert_eq!(
58            self.location.id(),
59            expected_location,
60            "locations do not match"
61        );
62        self.location
63            .flow_state()
64            .borrow_mut()
65            .leaves
66            .as_mut()
67            .expect(FLOW_USED_MESSAGE)
68            .push(HydroLeaf::CycleSink {
69                ident,
70                input: Box::new(self.ir_node.into_inner()),
71                metadata: self.location.new_node_metadata::<T>(),
72            });
73    }
74}
75
76impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
77where
78    L: Location<'a>,
79{
80    type Location = Tick<L>;
81
82    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
83        Optional::new(
84            location.clone(),
85            HydroNode::CycleSource {
86                ident,
87                metadata: location.new_node_metadata::<T>(),
88            },
89        )
90    }
91}
92
93impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Optional<T, Tick<L>, Bounded>
94where
95    L: Location<'a>,
96{
97    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
98        assert_eq!(
99            self.location.id(),
100            expected_location,
101            "locations do not match"
102        );
103        self.location
104            .flow_state()
105            .borrow_mut()
106            .leaves
107            .as_mut()
108            .expect(FLOW_USED_MESSAGE)
109            .push(HydroLeaf::CycleSink {
110                ident,
111                input: Box::new(self.ir_node.into_inner()),
112                metadata: self.location.new_node_metadata::<T>(),
113            });
114    }
115}
116
117impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Optional<T, L, B>
118where
119    L: Location<'a> + NoTick,
120{
121    type Location = L;
122
123    fn create_source(ident: syn::Ident, location: L) -> Self {
124        Optional::new(
125            location.clone(),
126            HydroNode::Persist {
127                inner: Box::new(HydroNode::CycleSource {
128                    ident,
129                    metadata: location.new_node_metadata::<T>(),
130                }),
131                metadata: location.new_node_metadata::<T>(),
132            },
133        )
134    }
135}
136
137impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Optional<T, L, B>
138where
139    L: Location<'a> + NoTick,
140{
141    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
142        assert_eq!(
143            self.location.id(),
144            expected_location,
145            "locations do not match"
146        );
147        let metadata = self.location.new_node_metadata::<T>();
148        self.location
149            .flow_state()
150            .borrow_mut()
151            .leaves
152            .as_mut()
153            .expect(FLOW_USED_MESSAGE)
154            .push(HydroLeaf::CycleSink {
155                ident,
156                input: Box::new(HydroNode::Unpersist {
157                    inner: Box::new(self.ir_node.into_inner()),
158                    metadata: metadata.clone(),
159                }),
160                metadata,
161            });
162    }
163}
164
165impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
166where
167    L: Location<'a>,
168{
169    fn from(singleton: Optional<T, L, Bounded>) -> Self {
170        Optional::new(singleton.location, singleton.ir_node.into_inner())
171    }
172}
173
174impl<'a, T, L, B> From<Singleton<T, L, B>> for Optional<T, L, B>
175where
176    L: Location<'a>,
177{
178    fn from(singleton: Singleton<T, L, B>) -> Self {
179        Optional::some(singleton)
180    }
181}
182
183impl<'a, T, L, B> Clone for Optional<T, L, B>
184where
185    T: Clone,
186    L: Location<'a>,
187{
188    fn clone(&self) -> Self {
189        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
190            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
191            *self.ir_node.borrow_mut() = HydroNode::Tee {
192                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
193                metadata: self.location.new_node_metadata::<T>(),
194            };
195        }
196
197        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
198            Optional {
199                location: self.location.clone(),
200                ir_node: HydroNode::Tee {
201                    inner: TeeNode(inner.0.clone()),
202                    metadata: metadata.clone(),
203                }
204                .into(),
205                _phantom: PhantomData,
206            }
207        } else {
208            unreachable!()
209        }
210    }
211}
212
213impl<'a, T, L, B> Optional<T, L, B>
214where
215    L: Location<'a>,
216{
217    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
218        Optional {
219            location,
220            ir_node: RefCell::new(ir_node),
221            _phantom: PhantomData,
222        }
223    }
224
225    pub fn some(singleton: Singleton<T, L, B>) -> Self {
226        Optional::new(singleton.location, singleton.ir_node.into_inner())
227    }
228
229    /// Transforms the optional value by applying a function `f` to it,
230    /// continuously as the input is updated.
231    ///
232    /// Whenever the optional is empty, the output optional is also empty.
233    ///
234    /// # Example
235    /// ```rust
236    /// # use hydro_lang::*;
237    /// # use futures::StreamExt;
238    /// # tokio_test::block_on(test_util::stream_transform_test(|process| {
239    /// let tick = process.tick();
240    /// let optional = tick.optional_first_tick(q!(1));
241    /// optional.map(q!(|v| v + 1)).all_ticks()
242    /// # }, |mut stream| async move {
243    /// // 2
244    /// # assert_eq!(stream.next().await.unwrap(), 2);
245    /// # }));
246    /// ```
247    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
248    where
249        F: Fn(T) -> U + 'a,
250    {
251        let f = f.splice_fn1_ctx(&self.location).into();
252        Optional::new(
253            self.location.clone(),
254            HydroNode::Map {
255                f,
256                input: Box::new(self.ir_node.into_inner()),
257                metadata: self.location.new_node_metadata::<U>(),
258            },
259        )
260    }
261
262    pub fn flat_map_ordered<U, I, F>(
263        self,
264        f: impl IntoQuotedMut<'a, F, L>,
265    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
266    where
267        I: IntoIterator<Item = U>,
268        F: Fn(T) -> I + 'a,
269    {
270        let f = f.splice_fn1_ctx(&self.location).into();
271        Stream::new(
272            self.location.clone(),
273            HydroNode::FlatMap {
274                f,
275                input: Box::new(self.ir_node.into_inner()),
276                metadata: self.location.new_node_metadata::<U>(),
277            },
278        )
279    }
280
281    pub fn flat_map_unordered<U, I, F>(
282        self,
283        f: impl IntoQuotedMut<'a, F, L>,
284    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
285    where
286        I: IntoIterator<Item = U>,
287        F: Fn(T) -> I + 'a,
288    {
289        let f = f.splice_fn1_ctx(&self.location).into();
290        Stream::new(
291            self.location.clone(),
292            HydroNode::FlatMap {
293                f,
294                input: Box::new(self.ir_node.into_inner()),
295                metadata: self.location.new_node_metadata::<U>(),
296            },
297        )
298    }
299
300    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
301    where
302        T: IntoIterator<Item = U>,
303    {
304        self.flat_map_ordered(q!(|v| v))
305    }
306
307    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
308    where
309        T: IntoIterator<Item = U>,
310    {
311        self.flat_map_unordered(q!(|v| v))
312    }
313
314    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
315    where
316        F: Fn(&T) -> bool + 'a,
317    {
318        let f = f.splice_fn1_borrow_ctx(&self.location).into();
319        Optional::new(
320            self.location.clone(),
321            HydroNode::Filter {
322                f,
323                input: Box::new(self.ir_node.into_inner()),
324                metadata: self.location.new_node_metadata::<T>(),
325            },
326        )
327    }
328
329    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
330    where
331        F: Fn(T) -> Option<U> + 'a,
332    {
333        let f = f.splice_fn1_ctx(&self.location).into();
334        Optional::new(
335            self.location.clone(),
336            HydroNode::FilterMap {
337                f,
338                input: Box::new(self.ir_node.into_inner()),
339                metadata: self.location.new_node_metadata::<U>(),
340            },
341        )
342    }
343
344    pub fn union(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
345        check_matching_location(&self.location, &other.location);
346
347        if L::is_top_level() {
348            Optional::new(
349                self.location.clone(),
350                HydroNode::Persist {
351                    inner: Box::new(HydroNode::Chain {
352                        first: Box::new(HydroNode::Unpersist {
353                            inner: Box::new(self.ir_node.into_inner()),
354                            metadata: self.location.new_node_metadata::<T>(),
355                        }),
356                        second: Box::new(HydroNode::Unpersist {
357                            inner: Box::new(other.ir_node.into_inner()),
358                            metadata: self.location.new_node_metadata::<T>(),
359                        }),
360                        metadata: self.location.new_node_metadata::<T>(),
361                    }),
362                    metadata: self.location.new_node_metadata::<T>(),
363                },
364            )
365        } else {
366            Optional::new(
367                self.location.clone(),
368                HydroNode::Chain {
369                    first: Box::new(self.ir_node.into_inner()),
370                    second: Box::new(other.ir_node.into_inner()),
371                    metadata: self.location.new_node_metadata::<T>(),
372                },
373            )
374        }
375    }
376
377    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
378    where
379        O: Clone,
380    {
381        let other: Optional<O, L, B> = other.into();
382        check_matching_location(&self.location, &other.location);
383
384        if L::is_top_level() {
385            Optional::new(
386                self.location.clone(),
387                HydroNode::Persist {
388                    inner: Box::new(HydroNode::CrossSingleton {
389                        left: Box::new(HydroNode::Unpersist {
390                            inner: Box::new(self.ir_node.into_inner()),
391                            metadata: self.location.new_node_metadata::<T>(),
392                        }),
393                        right: Box::new(HydroNode::Unpersist {
394                            inner: Box::new(other.ir_node.into_inner()),
395                            metadata: self.location.new_node_metadata::<O>(),
396                        }),
397                        metadata: self.location.new_node_metadata::<(T, O)>(),
398                    }),
399                    metadata: self.location.new_node_metadata::<(T, O)>(),
400                },
401            )
402        } else {
403            Optional::new(
404                self.location.clone(),
405                HydroNode::CrossSingleton {
406                    left: Box::new(self.ir_node.into_inner()),
407                    right: Box::new(other.ir_node.into_inner()),
408                    metadata: self.location.new_node_metadata::<(T, O)>(),
409                },
410            )
411        }
412    }
413
414    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
415        check_matching_location(&self.location, &other.location);
416
417        if L::is_top_level() {
418            Singleton::new(
419                self.location.clone(),
420                HydroNode::Persist {
421                    inner: Box::new(HydroNode::Chain {
422                        first: Box::new(HydroNode::Unpersist {
423                            inner: Box::new(self.ir_node.into_inner()),
424                            metadata: self.location.new_node_metadata::<T>(),
425                        }),
426                        second: Box::new(HydroNode::Unpersist {
427                            inner: Box::new(other.ir_node.into_inner()),
428                            metadata: self.location.new_node_metadata::<T>(),
429                        }),
430                        metadata: self.location.new_node_metadata::<T>(),
431                    }),
432                    metadata: self.location.new_node_metadata::<T>(),
433                },
434            )
435        } else {
436            Singleton::new(
437                self.location.clone(),
438                HydroNode::Chain {
439                    first: Box::new(self.ir_node.into_inner()),
440                    second: Box::new(other.ir_node.into_inner()),
441                    metadata: self.location.new_node_metadata::<T>(),
442                },
443            )
444        }
445    }
446
447    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
448    where
449        T: Clone,
450    {
451        let none: syn::Expr = parse_quote!([::std::option::Option::None]);
452        let core_ir = HydroNode::Persist {
453            inner: Box::new(HydroNode::Source {
454                source: HydroSource::Iter(none.into()),
455                metadata: self.location.root().new_node_metadata::<Option<T>>(),
456            }),
457            metadata: self.location.new_node_metadata::<Option<T>>(),
458        };
459
460        let none_singleton = if L::is_top_level() {
461            Singleton::new(
462                self.location.clone(),
463                HydroNode::Persist {
464                    inner: Box::new(core_ir),
465                    metadata: self.location.new_node_metadata::<Option<T>>(),
466                },
467            )
468        } else {
469            Singleton::new(self.location.clone(), core_ir)
470        };
471
472        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
473    }
474
475    /// An operator which allows you to "name" a `HydroNode`.
476    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
477    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
478        {
479            let mut node = self.ir_node.borrow_mut();
480            let metadata = node.metadata_mut();
481            metadata.tag = Some(name.to_string());
482        }
483        self
484    }
485}
486
487impl<'a, T, L> Optional<T, L, Bounded>
488where
489    L: Location<'a>,
490{
491    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
492        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
493    }
494
495    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
496        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
497    }
498
499    pub fn then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded>
500    where
501        Singleton<U, L, Bounded>: ZipResult<
502                'a,
503                Optional<(), L, Bounded>,
504                Location = L,
505                Out = Optional<(U, ()), L, Bounded>,
506            >,
507    {
508        value.continue_if(self)
509    }
510
511    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce> {
512        if L::is_top_level() {
513            panic!("Converting an optional to a stream is not yet supported at the top level");
514        }
515
516        Stream::new(self.location, self.ir_node.into_inner())
517    }
518}
519
520impl<'a, T, L, B> Optional<T, Atomic<L>, B>
521where
522    L: Location<'a> + NoTick,
523{
524    /// Returns an optional value corresponding to the latest snapshot of the optional
525    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
526    /// at least all relevant data that contributed to the snapshot at tick `t`.
527    ///
528    /// # Non-Determinism
529    /// Because this picks a snapshot of a optional whose value is continuously changing,
530    /// the output optional has a non-deterministic value since the snapshot can be at an
531    /// arbitrary point in time.
532    pub fn snapshot(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
533        Optional::new(
534            self.location.clone().tick,
535            HydroNode::Unpersist {
536                inner: Box::new(self.ir_node.into_inner()),
537                metadata: self.location.new_node_metadata::<T>(),
538            },
539        )
540    }
541
542    pub fn end_atomic(self) -> Optional<T, L, B> {
543        Optional::new(self.location.tick.l, self.ir_node.into_inner())
544    }
545}
546
547impl<'a, T, L, B> Optional<T, L, B>
548where
549    L: Location<'a> + NoTick + NoAtomic,
550{
551    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
552        Optional::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
553    }
554
555    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
556    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
557    /// relevant data that contributed to the snapshot at tick `t`.
558    ///
559    /// # Non-Determinism
560    /// Because this picks a snapshot of a optional whose value is continuously changing,
561    /// the output optional has a non-deterministic value since the snapshot can be at an
562    /// arbitrary point in time.
563    pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
564        self.atomic(tick).snapshot(nondet)
565    }
566
567    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
568    /// with order corresponding to increasing prefixes of data contributing to the optional.
569    ///
570    /// # Non-Determinism
571    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
572    /// to non-deterministic batching and arrival of inputs, the output stream is
573    /// non-deterministic.
574    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
575        let tick = self.location.tick();
576        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
577    }
578
579    /// Given a time interval, returns a stream corresponding to snapshots of the optional
580    /// value taken at various points in time. Because the input optional may be
581    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
582    /// represent the value of the optional given some prefix of the streams leading up to
583    /// it.
584    ///
585    /// # Non-Determinism
586    /// The output stream is non-deterministic in which elements are sampled, since this
587    /// is controlled by a clock.
588    pub fn sample_every(
589        self,
590        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
591        nondet: NonDet,
592    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
593        let samples = self.location.source_interval(interval, nondet);
594        let tick = self.location.tick();
595
596        self.snapshot(&tick, nondet)
597            .continue_if(samples.batch(&tick, nondet).first())
598            .all_ticks()
599            .weakest_retries()
600    }
601}
602
603impl<'a, T, L> Optional<T, Tick<L>, Bounded>
604where
605    L: Location<'a>,
606{
607    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
608        Stream::new(
609            self.location.outer().clone(),
610            HydroNode::Persist {
611                inner: Box::new(self.ir_node.into_inner()),
612                metadata: self.location.new_node_metadata::<T>(),
613            },
614        )
615    }
616
617    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
618        Stream::new(
619            Atomic {
620                tick: self.location.clone(),
621            },
622            HydroNode::Persist {
623                inner: Box::new(self.ir_node.into_inner()),
624                metadata: self.location.new_node_metadata::<T>(),
625            },
626        )
627    }
628
629    pub fn latest(self) -> Optional<T, L, Unbounded> {
630        Optional::new(
631            self.location.outer().clone(),
632            HydroNode::Persist {
633                inner: Box::new(self.ir_node.into_inner()),
634                metadata: self.location.new_node_metadata::<T>(),
635            },
636        )
637    }
638
639    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
640        Optional::new(
641            Atomic {
642                tick: self.location.clone(),
643            },
644            HydroNode::Persist {
645                inner: Box::new(self.ir_node.into_inner()),
646                metadata: self.location.new_node_metadata::<T>(),
647            },
648        )
649    }
650
651    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
652        Optional::new(
653            self.location.clone(),
654            HydroNode::DeferTick {
655                input: Box::new(self.ir_node.into_inner()),
656                metadata: self.location.new_node_metadata::<T>(),
657            },
658        )
659    }
660
661    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
662        Stream::new(
663            self.location.clone(),
664            HydroNode::Persist {
665                inner: Box::new(self.ir_node.into_inner()),
666                metadata: self.location.new_node_metadata::<T>(),
667            },
668        )
669    }
670
671    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
672        Optional::new(
673            self.location.clone(),
674            HydroNode::Delta {
675                inner: Box::new(self.ir_node.into_inner()),
676                metadata: self.location.new_node_metadata::<T>(),
677            },
678        )
679    }
680}