hydro_lang/
singleton.rs

1use std::cell::RefCell;
2use std::marker::PhantomData;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use stageleft::{IntoQuotedMut, QuotedWithContext, q};
7
8use crate::builder::FLOW_USED_MESSAGE;
9use crate::cycle::{
10    CycleCollection, CycleCollectionWithInitial, CycleComplete, DeferTick, ForwardRefMarker,
11    TickCycleMarker,
12};
13use crate::ir::{HydroLeaf, HydroNode, TeeNode};
14use crate::location::tick::{Atomic, NoAtomic};
15use crate::location::{Location, LocationId, NoTick, Tick, check_matching_location};
16use crate::stream::{AtLeastOnce, ExactlyOnce};
17use crate::unsafety::NonDet;
18use crate::{Bounded, NoOrder, Optional, Stream, TotalOrder, Unbounded};
19
20pub struct Singleton<Type, Loc, Bound> {
21    pub(crate) location: Loc,
22    pub(crate) ir_node: RefCell<HydroNode>,
23
24    _phantom: PhantomData<(Type, Loc, Bound)>,
25}
26
27impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
28where
29    L: Location<'a>,
30{
31    fn from(singleton: Singleton<T, L, Bounded>) -> Self {
32        Singleton::new(singleton.location, singleton.ir_node.into_inner())
33    }
34}
35
36impl<'a, T, L> DeferTick for Singleton<T, Tick<L>, Bounded>
37where
38    L: Location<'a>,
39{
40    fn defer_tick(self) -> Self {
41        Singleton::defer_tick(self)
42    }
43}
44
45impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
46where
47    L: Location<'a>,
48{
49    type Location = Tick<L>;
50
51    fn create_source(ident: syn::Ident, initial: Self, location: Tick<L>) -> Self {
52        Singleton::new(
53            location.clone(),
54            HydroNode::Chain {
55                first: Box::new(HydroNode::CycleSource {
56                    ident,
57                    metadata: location.new_node_metadata::<T>(),
58                }),
59                second: initial
60                    .continue_if(location.optional_first_tick(q!(())))
61                    .ir_node
62                    .into_inner()
63                    .into(),
64                metadata: location.new_node_metadata::<T>(),
65            },
66        )
67    }
68}
69
70impl<'a, T, L> CycleComplete<'a, TickCycleMarker> for Singleton<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75        assert_eq!(
76            self.location.id(),
77            expected_location,
78            "locations do not match"
79        );
80        self.location
81            .flow_state()
82            .borrow_mut()
83            .leaves
84            .as_mut()
85            .expect(FLOW_USED_MESSAGE)
86            .push(HydroLeaf::CycleSink {
87                ident,
88                input: Box::new(self.ir_node.into_inner()),
89                metadata: self.location.new_node_metadata::<T>(),
90            });
91    }
92}
93
94impl<'a, T, L> CycleCollection<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
95where
96    L: Location<'a>,
97{
98    type Location = Tick<L>;
99
100    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
101        Singleton::new(
102            location.clone(),
103            HydroNode::CycleSource {
104                ident,
105                metadata: location.new_node_metadata::<T>(),
106            },
107        )
108    }
109}
110
111impl<'a, T, L> CycleComplete<'a, ForwardRefMarker> for Singleton<T, Tick<L>, Bounded>
112where
113    L: Location<'a>,
114{
115    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
116        assert_eq!(
117            self.location.id(),
118            expected_location,
119            "locations do not match"
120        );
121        self.location
122            .flow_state()
123            .borrow_mut()
124            .leaves
125            .as_mut()
126            .expect(FLOW_USED_MESSAGE)
127            .push(HydroLeaf::CycleSink {
128                ident,
129                input: Box::new(self.ir_node.into_inner()),
130                metadata: self.location.new_node_metadata::<T>(),
131            });
132    }
133}
134
135impl<'a, T, L, B> CycleCollection<'a, ForwardRefMarker> for Singleton<T, L, B>
136where
137    L: Location<'a> + NoTick,
138{
139    type Location = L;
140
141    fn create_source(ident: syn::Ident, location: L) -> Self {
142        Singleton::new(
143            location.clone(),
144            HydroNode::Persist {
145                inner: Box::new(HydroNode::CycleSource {
146                    ident,
147                    metadata: location.new_node_metadata::<T>(),
148                }),
149                metadata: location.new_node_metadata::<T>(),
150            },
151        )
152    }
153}
154
155impl<'a, T, L, B> CycleComplete<'a, ForwardRefMarker> for Singleton<T, L, B>
156where
157    L: Location<'a> + NoTick,
158{
159    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
160        assert_eq!(
161            self.location.id(),
162            expected_location,
163            "locations do not match"
164        );
165        let metadata = self.location.new_node_metadata::<T>();
166        self.location
167            .flow_state()
168            .borrow_mut()
169            .leaves
170            .as_mut()
171            .expect(FLOW_USED_MESSAGE)
172            .push(HydroLeaf::CycleSink {
173                ident,
174                input: Box::new(HydroNode::Unpersist {
175                    inner: Box::new(self.ir_node.into_inner()),
176                    metadata: metadata.clone(),
177                }),
178                metadata,
179            });
180    }
181}
182
183impl<'a, T, L, B> Clone for Singleton<T, L, B>
184where
185    T: Clone,
186    L: Location<'a>,
187{
188    fn clone(&self) -> Self {
189        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
190            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
191            *self.ir_node.borrow_mut() = HydroNode::Tee {
192                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
193                metadata: self.location.new_node_metadata::<T>(),
194            };
195        }
196
197        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
198            Singleton {
199                location: self.location.clone(),
200                ir_node: HydroNode::Tee {
201                    inner: TeeNode(inner.0.clone()),
202                    metadata: metadata.clone(),
203                }
204                .into(),
205                _phantom: PhantomData,
206            }
207        } else {
208            unreachable!()
209        }
210    }
211}
212
213impl<'a, T, L, B> Singleton<T, L, B>
214where
215    L: Location<'a>,
216{
217    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
218        Singleton {
219            location,
220            ir_node: RefCell::new(ir_node),
221            _phantom: PhantomData,
222        }
223    }
224
225    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
226    where
227        F: Fn(T) -> U + 'a,
228    {
229        let f = f.splice_fn1_ctx(&self.location).into();
230        Singleton::new(
231            self.location.clone(),
232            HydroNode::Map {
233                f,
234                input: Box::new(self.ir_node.into_inner()),
235                metadata: self.location.new_node_metadata::<U>(),
236            },
237        )
238    }
239
240    pub fn flat_map_ordered<U, I, F>(
241        self,
242        f: impl IntoQuotedMut<'a, F, L>,
243    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
244    where
245        I: IntoIterator<Item = U>,
246        F: Fn(T) -> I + 'a,
247    {
248        let f = f.splice_fn1_ctx(&self.location).into();
249        Stream::new(
250            self.location.clone(),
251            HydroNode::FlatMap {
252                f,
253                input: Box::new(self.ir_node.into_inner()),
254                metadata: self.location.new_node_metadata::<U>(),
255            },
256        )
257    }
258
259    pub fn flat_map_unordered<U, I, F>(
260        self,
261        f: impl IntoQuotedMut<'a, F, L>,
262    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
263    where
264        I: IntoIterator<Item = U>,
265        F: Fn(T) -> I + 'a,
266    {
267        let f = f.splice_fn1_ctx(&self.location).into();
268        Stream::new(
269            self.location.clone(),
270            HydroNode::FlatMap {
271                f,
272                input: Box::new(self.ir_node.into_inner()),
273                metadata: self.location.new_node_metadata::<U>(),
274            },
275        )
276    }
277
278    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
279    where
280        T: IntoIterator<Item = U>,
281    {
282        self.flat_map_ordered(q!(|x| x))
283    }
284
285    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
286    where
287        T: IntoIterator<Item = U>,
288    {
289        self.flat_map_unordered(q!(|x| x))
290    }
291
292    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
293    where
294        F: Fn(&T) -> bool + 'a,
295    {
296        let f = f.splice_fn1_borrow_ctx(&self.location).into();
297        Optional::new(
298            self.location.clone(),
299            HydroNode::Filter {
300                f,
301                input: Box::new(self.ir_node.into_inner()),
302                metadata: self.location.new_node_metadata::<T>(),
303            },
304        )
305    }
306
307    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
308    where
309        F: Fn(T) -> Option<U> + 'a,
310    {
311        let f = f.splice_fn1_ctx(&self.location).into();
312        Optional::new(
313            self.location.clone(),
314            HydroNode::FilterMap {
315                f,
316                input: Box::new(self.ir_node.into_inner()),
317                metadata: self.location.new_node_metadata::<U>(),
318            },
319        )
320    }
321
322    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
323    where
324        Self: ZipResult<'a, O, Location = L>,
325    {
326        check_matching_location(&self.location, &Self::other_location(&other));
327
328        if L::is_top_level() {
329            let left_ir_node = self.ir_node.into_inner();
330            let left_ir_node_metadata = left_ir_node.metadata().clone();
331            let right_ir_node = Self::other_ir_node(other);
332            let right_ir_node_metadata = right_ir_node.metadata().clone();
333
334            Self::make(
335                self.location.clone(),
336                HydroNode::Persist {
337                    inner: Box::new(HydroNode::CrossSingleton {
338                        left: Box::new(HydroNode::Unpersist {
339                            inner: Box::new(left_ir_node),
340                            metadata: left_ir_node_metadata,
341                        }),
342                        right: Box::new(HydroNode::Unpersist {
343                            inner: Box::new(right_ir_node),
344                            metadata: right_ir_node_metadata,
345                        }),
346                        metadata: self
347                            .location
348                            .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
349                    }),
350                    metadata: self
351                        .location
352                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
353                },
354            )
355        } else {
356            Self::make(
357                self.location.clone(),
358                HydroNode::CrossSingleton {
359                    left: Box::new(self.ir_node.into_inner()),
360                    right: Box::new(Self::other_ir_node(other)),
361                    metadata: self
362                        .location
363                        .new_node_metadata::<<Self as ZipResult<'a, O>>::ElementType>(),
364                },
365            )
366        }
367    }
368
369    pub fn continue_if<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
370    where
371        Self: ZipResult<
372                'a,
373                Optional<(), L, Bounded>,
374                Location = L,
375                Out = Optional<(T, ()), L, Bounded>,
376            >,
377    {
378        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
379    }
380
381    pub fn continue_unless<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded>
382    where
383        Singleton<T, L, B>: ZipResult<
384                'a,
385                Optional<(), L, Bounded>,
386                Location = L,
387                Out = Optional<(T, ()), L, Bounded>,
388            >,
389    {
390        self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0)))
391    }
392
393    /// An operator which allows you to "name" a `HydroNode`.
394    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
395    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
396        {
397            let mut node = self.ir_node.borrow_mut();
398            let metadata = node.metadata_mut();
399            metadata.tag = Some(name.to_string());
400        }
401        self
402    }
403}
404
405impl<'a, T, L, B> Singleton<T, Atomic<L>, B>
406where
407    L: Location<'a> + NoTick,
408{
409    /// Returns a singleton value corresponding to the latest snapshot of the singleton
410    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
411    /// at least all relevant data that contributed to the snapshot at tick `t`.
412    ///
413    /// # Non-Determinism
414    /// Because this picks a snapshot of a singleton whose value is continuously changing,
415    /// the output singleton has a non-deterministic value since the snapshot can be at an
416    /// arbitrary point in time.
417    pub fn snapshot(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
418        Singleton::new(
419            self.location.clone().tick,
420            HydroNode::Unpersist {
421                inner: Box::new(self.ir_node.into_inner()),
422                metadata: self.location.new_node_metadata::<T>(),
423            },
424        )
425    }
426
427    pub fn end_atomic(self) -> Optional<T, L, B> {
428        Optional::new(self.location.tick.l, self.ir_node.into_inner())
429    }
430}
431
432impl<'a, T, L, B> Singleton<T, L, B>
433where
434    L: Location<'a> + NoTick + NoAtomic,
435{
436    pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
437        Singleton::new(Atomic { tick: tick.clone() }, self.ir_node.into_inner())
438    }
439
440    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
441    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
442    /// relevant data that contributed to the snapshot at tick `t`.
443    ///
444    /// # Non-Determinism
445    /// Because this picks a snapshot of a singleton whose value is continuously changing,
446    /// the output singleton has a non-deterministic value since the snapshot can be at an
447    /// arbitrary point in time.
448    pub fn snapshot(self, tick: &Tick<L>, nondet: NonDet) -> Singleton<T, Tick<L>, Bounded>
449    where
450        L: NoTick,
451    {
452        self.atomic(tick).snapshot(nondet)
453    }
454
455    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
456    /// with order corresponding to increasing prefixes of data contributing to the singleton.
457    ///
458    /// # Non-Determinism
459    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
460    /// to non-deterministic batching and arrival of inputs, the output stream is
461    /// non-deterministic.
462    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
463        let tick = self.location.tick();
464        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
465    }
466
467    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
468    /// value taken at various points in time. Because the input singleton may be
469    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
470    /// represent the value of the singleton given some prefix of the streams leading up to
471    /// it.
472    ///
473    /// # Non-Determinism
474    /// The output stream is non-deterministic in which elements are sampled, since this
475    /// is controlled by a clock.
476    pub fn sample_every(
477        self,
478        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
479        nondet: NonDet,
480    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce> {
481        let samples = self.location.source_interval(interval, nondet);
482        let tick = self.location.tick();
483
484        self.snapshot(&tick, nondet)
485            .continue_if(samples.batch(&tick, nondet).first())
486            .all_ticks()
487            .weakest_retries()
488    }
489}
490
491impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
492where
493    L: Location<'a>,
494{
495    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
496        Stream::new(
497            self.location.outer().clone(),
498            HydroNode::Persist {
499                inner: Box::new(self.ir_node.into_inner()),
500                metadata: self.location.new_node_metadata::<T>(),
501            },
502        )
503    }
504
505    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
506        Stream::new(
507            Atomic {
508                tick: self.location.clone(),
509            },
510            HydroNode::Persist {
511                inner: Box::new(self.ir_node.into_inner()),
512                metadata: self.location.new_node_metadata::<T>(),
513            },
514        )
515    }
516
517    pub fn latest(self) -> Singleton<T, L, Unbounded> {
518        Singleton::new(
519            self.location.outer().clone(),
520            HydroNode::Persist {
521                inner: Box::new(self.ir_node.into_inner()),
522                metadata: self.location.new_node_metadata::<T>(),
523            },
524        )
525    }
526
527    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
528        Singleton::new(
529            Atomic {
530                tick: self.location.clone(),
531            },
532            HydroNode::Persist {
533                inner: Box::new(self.ir_node.into_inner()),
534                metadata: self.location.new_node_metadata::<T>(),
535            },
536        )
537    }
538
539    pub fn defer_tick(self) -> Singleton<T, Tick<L>, Bounded> {
540        Singleton::new(
541            self.location.clone(),
542            HydroNode::DeferTick {
543                input: Box::new(self.ir_node.into_inner()),
544                metadata: self.location.new_node_metadata::<T>(),
545            },
546        )
547    }
548
549    pub fn persist(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
550        Stream::new(
551            self.location.clone(),
552            HydroNode::Persist {
553                inner: Box::new(self.ir_node.into_inner()),
554                metadata: self.location.new_node_metadata::<T>(),
555            },
556        )
557    }
558
559    pub fn delta(self) -> Optional<T, Tick<L>, Bounded> {
560        Optional::new(
561            self.location.clone(),
562            HydroNode::Delta {
563                inner: Box::new(self.ir_node.into_inner()),
564                metadata: self.location.new_node_metadata::<T>(),
565            },
566        )
567    }
568
569    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
570        Stream::new(self.location, self.ir_node.into_inner())
571    }
572}
573
574pub trait ZipResult<'a, Other> {
575    type Out;
576    type ElementType;
577    type Location;
578
579    fn other_location(other: &Other) -> Self::Location;
580    fn other_ir_node(other: Other) -> HydroNode;
581
582    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
583}
584
585impl<'a, T, U, L, B> ZipResult<'a, Singleton<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
586where
587    U: Clone,
588    L: Location<'a>,
589{
590    type Out = Singleton<(T, U), Tick<L>, B>;
591    type ElementType = (T, U);
592    type Location = Tick<L>;
593
594    fn other_location(other: &Singleton<U, Tick<L>, B>) -> Tick<L> {
595        other.location.clone()
596    }
597
598    fn other_ir_node(other: Singleton<U, Tick<L>, B>) -> HydroNode {
599        other.ir_node.into_inner()
600    }
601
602    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
603        Singleton::new(location, ir_node)
604    }
605}
606
607impl<'a, T, U, L, B> ZipResult<'a, Optional<U, Tick<L>, B>> for Singleton<T, Tick<L>, B>
608where
609    U: Clone,
610    L: Location<'a>,
611{
612    type Out = Optional<(T, U), Tick<L>, B>;
613    type ElementType = (T, U);
614    type Location = Tick<L>;
615
616    fn other_location(other: &Optional<U, Tick<L>, B>) -> Tick<L> {
617        other.location.clone()
618    }
619
620    fn other_ir_node(other: Optional<U, Tick<L>, B>) -> HydroNode {
621        other.ir_node.into_inner()
622    }
623
624    fn make(location: Tick<L>, ir_node: HydroNode) -> Self::Out {
625        Optional::new(location, ir_node)
626    }
627}
628
629#[cfg(test)]
630mod tests {
631    use futures::{SinkExt, StreamExt};
632    use hydro_deploy::Deployment;
633    use stageleft::q;
634
635    use crate::*;
636
637    #[tokio::test]
638    async fn tick_cycle_cardinality() {
639        let mut deployment = Deployment::new();
640
641        let flow = FlowBuilder::new();
642        let node = flow.process::<()>();
643        let external = flow.external::<()>();
644
645        let (input_send, input) = node.source_external_bincode(&external);
646
647        let node_tick = node.tick();
648        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
649        let counts = singleton
650            .clone()
651            .into_stream()
652            .count()
653            .continue_if(input.batch(&node_tick, nondet!(/** testing */)).first())
654            .all_ticks()
655            .send_bincode_external(&external);
656        complete_cycle.complete_next_tick(singleton);
657
658        let nodes = flow
659            .with_process(&node, deployment.Localhost())
660            .with_external(&external, deployment.Localhost())
661            .deploy(&mut deployment);
662
663        deployment.deploy().await.unwrap();
664
665        let mut tick_trigger = nodes.connect_sink_bincode(input_send).await;
666        let mut external_out = nodes.connect_source_bincode(counts).await;
667
668        deployment.start().await.unwrap();
669
670        tick_trigger.send(()).await.unwrap();
671
672        assert_eq!(external_out.next().await.unwrap(), 1);
673
674        tick_trigger.send(()).await.unwrap();
675
676        assert_eq!(external_out.next().await.unwrap(), 1);
677    }
678}