1use std::marker::PhantomData;
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6use syn::parse_quote;
7
8use crate::ir::{DebugInstantiate, HydroLeaf, HydroNode};
9use crate::keyed_singleton::KeyedSingleton;
10use crate::keyed_stream::KeyedStream;
11use crate::location::external_process::ExternalBincodeStream;
12use crate::location::tick::NoAtomic;
13use crate::location::{MembershipEvent, NoTick};
14use crate::staging_util::get_this_crate;
15use crate::stream::ExactlyOnce;
16use crate::{
17 Cluster, External, Location, MemberId, NonDet, Process, Stream, TotalOrder, Unbounded, nondet,
18};
19
20fn track_membership<'a, C, L: Location<'a> + NoTick + NoAtomic>(
22 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
23) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
24 membership
25 .fold(
26 q!(|| false),
27 q!(|present, event| {
28 match event {
29 MembershipEvent::Joined => *present = true,
30 MembershipEvent::Left => *present = false,
31 }
32 }),
33 )
34 .filter_map(q!(|v| if v { Some(()) } else { None }))
35}
36
37pub fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
38 let root = get_this_crate();
39
40 if is_demux {
41 parse_quote! {
42 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::MemberId<_>, #t_type), _>(
43 |(id, data)| {
44 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
45 }
46 )
47 }
48 } else {
49 parse_quote! {
50 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
51 |data| {
52 #root::runtime_support::bincode::serialize(&data).unwrap().into()
53 }
54 )
55 }
56 }
57}
58
59fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
60 serialize_bincode_with_type(is_demux, "e_type::<T>())
61}
62
63pub fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
64 let root = get_this_crate();
65
66 if let Some(c_type) = tagged {
67 parse_quote! {
68 |res| {
69 let (id, b) = res.unwrap();
70 (#root::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
71 }
72 }
73 } else {
74 parse_quote! {
75 |res| {
76 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
77 }
78 }
79 }
80}
81
82pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
83 deserialize_bincode_with_type(tagged, "e_type::<T>())
84}
85
86impl<'a, T, L, B, O, R> Stream<T, Cluster<'a, L>, B, O, R> {
87 pub fn send_bincode<L2>(
88 self,
89 other: &Process<'a, L2>,
90 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
91 where
92 T: Serialize + DeserializeOwned,
93 {
94 let serialize_pipeline = Some(serialize_bincode::<T>(false));
95
96 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
97
98 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
99 other.clone(),
100 HydroNode::Network {
101 serialize_fn: serialize_pipeline.map(|e| e.into()),
102 instantiate_fn: DebugInstantiate::Building,
103 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
104 input: Box::new(self.ir_node.into_inner()),
105 metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
106 },
107 );
108
109 raw_stream.into_keyed()
110 }
111
112 pub fn broadcast_bincode<L2: 'a>(
113 self,
114 other: &Cluster<'a, L2>,
115 nondet_membership: NonDet,
116 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
117 where
118 T: Clone + Serialize + DeserializeOwned,
119 {
120 let ids = track_membership(self.location.source_cluster_members(other));
121 let join_tick = self.location.tick();
122 let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
123
124 current_members
125 .weaker_retries()
126 .assume_ordering::<TotalOrder>(
127 nondet!(),
128 )
129 .cross_product_nested_loop(
130 self.batch(&join_tick, nondet_membership)
131 .assume_ordering::<TotalOrder>(
132 nondet!(),
133 ),
134 )
135 .assume_ordering::<O>(nondet!())
136 .all_ticks()
137 .demux_bincode(other)
138 }
139}
140
141impl<'a, T, L, L2, B, O, R> Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R> {
142 pub fn demux_bincode(
143 self,
144 other: &Cluster<'a, L2>,
145 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
146 where
147 T: Serialize + DeserializeOwned,
148 {
149 self.into_keyed().demux_bincode(other)
150 }
151}
152
153impl<'a, T, L, L2, B, O, R> KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R> {
154 pub fn demux_bincode(
155 self,
156 other: &Cluster<'a, L2>,
157 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
158 where
159 T: Serialize + DeserializeOwned,
160 {
161 let serialize_pipeline = Some(serialize_bincode::<T>(true));
162
163 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
164
165 Stream::new(
166 other.clone(),
167 HydroNode::Network {
168 serialize_fn: serialize_pipeline.map(|e| e.into()),
169 instantiate_fn: DebugInstantiate::Building,
170 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
171 input: Box::new(self.underlying.ir_node.into_inner()),
172 metadata: other.new_node_metadata::<T>(),
173 },
174 )
175 }
176}
177
178impl<'a, T, L, B> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
179 pub fn round_robin_bincode<L2: 'a>(
180 self,
181 other: &Cluster<'a, L2>,
182 nondet_membership: NonDet,
183 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
184 where
185 T: Serialize + DeserializeOwned,
186 {
187 let ids = track_membership(self.location.source_cluster_members(other));
188 let join_tick = self.location.tick();
189 let current_members = ids
190 .snapshot(&join_tick, nondet_membership)
191 .keys()
192 .assume_ordering(
193 nondet!(),
194 )
195 .collect_vec();
196
197 self.enumerate()
198 .batch(&join_tick, nondet_membership)
199 .cross_singleton(current_members)
200 .map(q!(|(data, members)| (
201 members[data.0 % members.len()],
202 data.1
203 )))
204 .all_ticks()
205 .demux_bincode(other)
206 }
207}
208
209impl<'a, T, L, L2, B, O, R> Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R> {
210 pub fn demux_bincode(
211 self,
212 other: &Cluster<'a, L2>,
213 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
214 where
215 T: Serialize + DeserializeOwned,
216 {
217 self.into_keyed().demux_bincode(other)
218 }
219}
220
221impl<'a, T, L, L2, B, O, R> KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R> {
222 pub fn demux_bincode(
223 self,
224 other: &Cluster<'a, L2>,
225 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
226 where
227 T: Serialize + DeserializeOwned,
228 {
229 let serialize_pipeline = Some(serialize_bincode::<T>(true));
230
231 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
232
233 let raw_stream: Stream<(MemberId<L>, T), Cluster<'a, L2>, Unbounded, O, R> = Stream::new(
234 other.clone(),
235 HydroNode::Network {
236 serialize_fn: serialize_pipeline.map(|e| e.into()),
237 instantiate_fn: DebugInstantiate::Building,
238 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
239 input: Box::new(self.underlying.ir_node.into_inner()),
240 metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
241 },
242 );
243
244 raw_stream.into_keyed()
245 }
246}
247
248impl<'a, T, L, B, O, R> Stream<T, Process<'a, L>, B, O, R> {
249 pub fn send_bincode<L2>(
250 self,
251 other: &Process<'a, L2>,
252 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
253 where
254 T: Serialize + DeserializeOwned,
255 {
256 let serialize_pipeline = Some(serialize_bincode::<T>(false));
257
258 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
259
260 Stream::new(
261 other.clone(),
262 HydroNode::Network {
263 serialize_fn: serialize_pipeline.map(|e| e.into()),
264 instantiate_fn: DebugInstantiate::Building,
265 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
266 input: Box::new(self.ir_node.into_inner()),
267 metadata: other.new_node_metadata::<T>(),
268 },
269 )
270 }
271
272 pub fn broadcast_bincode<L2: 'a>(
273 self,
274 other: &Cluster<'a, L2>,
275 nondet_membership: NonDet,
276 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
277 where
278 T: Clone + Serialize + DeserializeOwned,
279 {
280 let ids = track_membership(self.location.source_cluster_members(other));
281 let join_tick = self.location.tick();
282 let current_members = ids.snapshot(&join_tick, nondet_membership).keys();
283
284 current_members
285 .weaker_retries()
286 .assume_ordering::<TotalOrder>(
287 nondet!(),
288 )
289 .cross_product_nested_loop(
290 self.batch(&join_tick, nondet_membership)
291 .assume_ordering::<TotalOrder>(
292 nondet!(),
293 ),
294 )
295 .assume_ordering::<O>(nondet!())
296 .all_ticks()
297 .demux_bincode(other)
298 }
299
300 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
301 where
302 T: Serialize + DeserializeOwned,
303 {
304 let serialize_pipeline = Some(serialize_bincode::<T>(false));
305
306 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
307
308 let external_key = flow_state_borrow.next_external_out;
309 flow_state_borrow.next_external_out += 1;
310
311 let leaves = flow_state_borrow.leaves.as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled()");
312
313 leaves.push(HydroLeaf::SendExternal {
314 to_external_id: other.id,
315 to_key: external_key,
316 to_many: false,
317 serialize_fn: serialize_pipeline.map(|e| e.into()),
318 instantiate_fn: DebugInstantiate::Building,
319 input: Box::new(HydroNode::Unpersist {
320 inner: Box::new(self.ir_node.into_inner()),
321 metadata: self.location.new_node_metadata::<T>(),
322 }),
323 });
324
325 ExternalBincodeStream {
326 process_id: other.id,
327 port_id: external_key,
328 _phantom: PhantomData,
329 }
330 }
331}