hydro_lang/graph/
render.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::fmt::Write;
4
5use auto_impl::auto_impl;
6
7pub use super::graphviz::{HydroDot, escape_dot};
8// Re-export specific implementations
9pub use super::mermaid::{HydroMermaid, escape_mermaid};
10pub use super::reactflow::HydroReactFlow;
11use crate::ir::{DebugExpr, HydroLeaf, HydroNode, HydroSource};
12
13/// Label for a graph node - can be either a static string or contain expressions.
14#[derive(Debug, Clone)]
15pub enum NodeLabel {
16    /// A static string label
17    Static(String),
18    /// A label with an operation name and expression arguments
19    WithExprs {
20        op_name: String,
21        exprs: Vec<DebugExpr>,
22    },
23}
24
25impl NodeLabel {
26    /// Create a static label
27    pub fn static_label(s: String) -> Self {
28        Self::Static(s)
29    }
30
31    /// Create a label for an operation with multiple expression
32    pub fn with_exprs(op_name: String, exprs: Vec<DebugExpr>) -> Self {
33        Self::WithExprs { op_name, exprs }
34    }
35}
36
37impl std::fmt::Display for NodeLabel {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::Static(s) => write!(f, "{}", s),
41            Self::WithExprs { op_name, exprs } => {
42                if exprs.is_empty() {
43                    write!(f, "{}()", op_name)
44                } else {
45                    let expr_strs: Vec<_> = exprs.iter().map(|e| e.to_string()).collect();
46                    write!(f, "{}({})", op_name, expr_strs.join(", "))
47                }
48            }
49        }
50    }
51}
52
53/// Base struct for text-based graph writers that use indentation.
54/// Contains common fields shared by DOT and Mermaid writers.
55pub struct IndentedGraphWriter<W> {
56    pub write: W,
57    pub indent: usize,
58    pub config: HydroWriteConfig,
59}
60
61impl<W> IndentedGraphWriter<W> {
62    /// Create a new writer with default configuration.
63    pub fn new(write: W) -> Self {
64        Self {
65            write,
66            indent: 0,
67            config: HydroWriteConfig::default(),
68        }
69    }
70
71    /// Create a new writer with the given configuration.
72    pub fn new_with_config(write: W, config: &HydroWriteConfig) -> Self {
73        Self {
74            write,
75            indent: 0,
76            config: config.clone(),
77        }
78    }
79}
80
81impl<W: Write> IndentedGraphWriter<W> {
82    /// Write an indented line using the current indentation level.
83    pub fn writeln_indented(&mut self, content: &str) -> Result<(), std::fmt::Error> {
84        writeln!(self.write, "{b:i$}{content}", b = "", i = self.indent)
85    }
86}
87
88/// Common error type used by all graph writers.
89pub type GraphWriteError = std::fmt::Error;
90
91/// Trait for writing textual representations of Hydro IR graphs, i.e. mermaid or dot graphs.
92#[auto_impl(&mut, Box)]
93pub trait HydroGraphWrite {
94    /// Error type emitted by writing.
95    type Err: Error;
96
97    /// Begin the graph. First method called.
98    fn write_prologue(&mut self) -> Result<(), Self::Err>;
99
100    /// Write a node definition with styling.
101    fn write_node_definition(
102        &mut self,
103        node_id: usize,
104        node_label: &NodeLabel,
105        node_type: HydroNodeType,
106        location_id: Option<usize>,
107        location_type: Option<&str>,
108    ) -> Result<(), Self::Err>;
109
110    /// Write an edge between nodes with optional labeling.
111    fn write_edge(
112        &mut self,
113        src_id: usize,
114        dst_id: usize,
115        edge_type: HydroEdgeType,
116        label: Option<&str>,
117    ) -> Result<(), Self::Err>;
118
119    /// Begin writing a location grouping (process/cluster).
120    fn write_location_start(
121        &mut self,
122        location_id: usize,
123        location_type: &str,
124    ) -> Result<(), Self::Err>;
125
126    /// Write a node within a location.
127    fn write_node(&mut self, node_id: usize) -> Result<(), Self::Err>;
128
129    /// End writing a location grouping.
130    fn write_location_end(&mut self) -> Result<(), Self::Err>;
131
132    /// End the graph. Last method called.
133    fn write_epilogue(&mut self) -> Result<(), Self::Err>;
134}
135
136/// Types of nodes in Hydro IR for styling purposes.
137#[derive(Debug, Clone, Copy)]
138pub enum HydroNodeType {
139    Source,
140    Transform,
141    Join,
142    Aggregation,
143    Network,
144    Sink,
145    Tee,
146}
147
148/// Types of edges in Hydro IR.
149#[derive(Debug, Clone, Copy)]
150pub enum HydroEdgeType {
151    Stream,
152    Persistent,
153    Network,
154    Cycle,
155}
156
157/// Configuration for graph writing.
158#[derive(Debug, Clone)]
159pub struct HydroWriteConfig {
160    pub show_metadata: bool,
161    pub show_location_groups: bool,
162    pub use_short_labels: bool,
163    pub process_id_name: Vec<(usize, String)>,
164    pub cluster_id_name: Vec<(usize, String)>,
165    pub external_id_name: Vec<(usize, String)>,
166}
167
168impl Default for HydroWriteConfig {
169    fn default() -> Self {
170        Self {
171            show_metadata: false,
172            show_location_groups: true,
173            use_short_labels: true, // Default to short labels for all renderers
174            process_id_name: vec![],
175            cluster_id_name: vec![],
176            external_id_name: vec![],
177        }
178    }
179}
180
181/// Graph structure tracker for Hydro IR rendering.
182#[derive(Debug, Default)]
183pub struct HydroGraphStructure {
184    pub nodes: HashMap<usize, (NodeLabel, HydroNodeType, Option<usize>)>, /* node_id -> (label, type, location) */
185    pub edges: Vec<(usize, usize, HydroEdgeType, Option<String>)>, // (src, dst, edge_type, label)
186    pub locations: HashMap<usize, String>,                         // location_id -> location_type
187    pub next_node_id: usize,
188}
189
190impl HydroGraphStructure {
191    pub fn new() -> Self {
192        Self::default()
193    }
194
195    pub fn add_node(
196        &mut self,
197        label: NodeLabel,
198        node_type: HydroNodeType,
199        location: Option<usize>,
200    ) -> usize {
201        let node_id = self.next_node_id;
202        self.next_node_id += 1;
203        self.nodes.insert(node_id, (label, node_type, location));
204        node_id
205    }
206
207    pub fn add_edge(
208        &mut self,
209        src: usize,
210        dst: usize,
211        edge_type: HydroEdgeType,
212        label: Option<String>,
213    ) {
214        self.edges.push((src, dst, edge_type, label));
215    }
216
217    pub fn add_location(&mut self, location_id: usize, location_type: String) {
218        self.locations.insert(location_id, location_type);
219    }
220}
221
222/// Function to extract an op_name from a print_root() result for use in labels.
223pub fn extract_op_name(full_label: String) -> String {
224    full_label
225        .split('(')
226        .next()
227        .unwrap_or("unknown")
228        .to_string()
229        .to_lowercase()
230}
231
232/// Extract a short, readable label from the full token stream label using print_root() style naming
233pub fn extract_short_label(full_label: &str) -> String {
234    // Use the same logic as extract_op_name but handle the specific cases we need for UI display
235    if let Some(op_name) = full_label.split('(').next() {
236        let base_name = op_name.to_lowercase();
237        match base_name.as_str() {
238            // Handle special cases for UI display
239            "source" => {
240                if full_label.contains("Iter") {
241                    "source_iter".to_string()
242                } else if full_label.contains("Stream") {
243                    "source_stream".to_string()
244                } else if full_label.contains("ExternalNetwork") {
245                    "external_network".to_string()
246                } else if full_label.contains("Spin") {
247                    "spin".to_string()
248                } else {
249                    "source".to_string()
250                }
251            }
252            "network" => {
253                if full_label.contains("deser") {
254                    "network(recv)".to_string()
255                } else if full_label.contains("ser") {
256                    "network(send)".to_string()
257                } else {
258                    "network".to_string()
259                }
260            }
261            // For all other cases, just use the lowercase base name (same as extract_op_name)
262            _ => base_name,
263        }
264    } else {
265        // Fallback for labels that don't follow the pattern
266        if full_label.len() > 20 {
267            format!("{}...", &full_label[..17])
268        } else {
269            full_label.to_string()
270        }
271    }
272}
273
274/// Helper function to extract location ID and type from metadata.
275fn extract_location_id(metadata: &crate::ir::HydroIrMetadata) -> (Option<usize>, Option<String>) {
276    use crate::location::LocationId;
277    match &metadata.location_kind {
278        LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
279        LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
280        LocationId::Tick(_, inner) => match inner.as_ref() {
281            LocationId::Process(id) => (Some(*id), Some("Process".to_string())),
282            LocationId::Cluster(id) => (Some(*id), Some("Cluster".to_string())),
283            _ => (None, None),
284        },
285    }
286}
287
288/// Helper function to set up location in structure from metadata.
289fn setup_location(
290    structure: &mut HydroGraphStructure,
291    metadata: &crate::ir::HydroIrMetadata,
292) -> Option<usize> {
293    let (location_id, location_type) = extract_location_id(metadata);
294    if let (Some(loc_id), Some(loc_type)) = (location_id, location_type) {
295        structure.add_location(loc_id, loc_type);
296    }
297    location_id
298}
299
300impl HydroLeaf {
301    /// Core graph writing logic that works with any GraphWrite implementation.
302    pub fn write_graph<W>(
303        &self,
304        mut graph_write: W,
305        config: &HydroWriteConfig,
306    ) -> Result<(), W::Err>
307    where
308        W: HydroGraphWrite,
309    {
310        let mut structure = HydroGraphStructure::new();
311        let mut seen_tees = HashMap::new();
312
313        // Build the graph structure by traversing the IR
314        let _sink_id = self.build_graph_structure(&mut structure, &mut seen_tees, config);
315
316        // Write the graph
317        graph_write.write_prologue()?;
318
319        // Write node definitions
320        for (&node_id, (label, node_type, location)) in &structure.nodes {
321            let (location_id, location_type) = if let Some(loc_id) = location {
322                (
323                    Some(*loc_id),
324                    structure.locations.get(loc_id).map(|s| s.as_str()),
325                )
326            } else {
327                (None, None)
328            };
329
330            // Check if this is a label that came from an expression-containing operation
331            // We can detect this by looking for the pattern "op_name(...)" and checking if we have the original expressions
332            graph_write.write_node_definition(
333                node_id,
334                label,
335                *node_type,
336                location_id,
337                location_type,
338            )?;
339        }
340
341        // Group nodes by location if requested
342        if config.show_location_groups {
343            let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
344            for (&node_id, (_, _, location)) in &structure.nodes {
345                if let Some(location_id) = location {
346                    nodes_by_location
347                        .entry(*location_id)
348                        .or_default()
349                        .push(node_id);
350                }
351            }
352
353            for (&location_id, node_ids) in &nodes_by_location {
354                if let Some(location_type) = structure.locations.get(&location_id) {
355                    graph_write.write_location_start(location_id, location_type)?;
356                    for &node_id in node_ids {
357                        graph_write.write_node(node_id)?;
358                    }
359                    graph_write.write_location_end()?;
360                }
361            }
362        }
363
364        // Write edges
365        for (src_id, dst_id, edge_type, label) in &structure.edges {
366            graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
367        }
368
369        graph_write.write_epilogue()?;
370        Ok(())
371    }
372
373    /// Build the graph structure by traversing the IR tree.
374    pub fn build_graph_structure(
375        &self,
376        structure: &mut HydroGraphStructure,
377        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
378        config: &HydroWriteConfig,
379    ) -> usize {
380        // Helper function for sink nodes to reduce duplication
381        fn build_sink_node(
382            structure: &mut HydroGraphStructure,
383            seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
384            config: &HydroWriteConfig,
385            input: &HydroNode,
386            metadata: Option<&crate::ir::HydroIrMetadata>,
387            label: NodeLabel,
388            edge_type: HydroEdgeType,
389        ) -> usize {
390            let input_id = input.build_graph_structure(structure, seen_tees, config);
391            let location_id = metadata.and_then(|m| setup_location(structure, m));
392            let sink_id = structure.add_node(label, HydroNodeType::Sink, location_id);
393            structure.add_edge(input_id, sink_id, edge_type, None);
394            sink_id
395        }
396
397        match self {
398            // Sink operations with Stream edges - grouped by edge type
399            HydroLeaf::ForEach { f, input, metadata } => build_sink_node(
400                structure,
401                seen_tees,
402                config,
403                input,
404                Some(metadata),
405                NodeLabel::with_exprs("for_each".to_string(), vec![f.clone()]),
406                HydroEdgeType::Stream,
407            ),
408
409            HydroLeaf::SendExternal {
410                to_external_id,
411                to_key,
412                input,
413                ..
414            } => build_sink_node(
415                structure,
416                seen_tees,
417                config,
418                input,
419                None,
420                NodeLabel::with_exprs(
421                    format!("send_external({}:{})", to_external_id, to_key),
422                    vec![],
423                ),
424                HydroEdgeType::Stream,
425            ),
426
427            HydroLeaf::DestSink {
428                sink,
429                input,
430                metadata,
431            } => build_sink_node(
432                structure,
433                seen_tees,
434                config,
435                input,
436                Some(metadata),
437                NodeLabel::with_exprs("dest_sink".to_string(), vec![sink.clone()]),
438                HydroEdgeType::Stream,
439            ),
440
441            // Sink operation with Cycle edge - grouped by edge type
442            HydroLeaf::CycleSink {
443                ident,
444                input,
445                metadata,
446                ..
447            } => build_sink_node(
448                structure,
449                seen_tees,
450                config,
451                input,
452                Some(metadata),
453                NodeLabel::static_label(format!("cycle_sink({})", ident)),
454                HydroEdgeType::Cycle,
455            ),
456        }
457    }
458}
459
460impl HydroNode {
461    /// Build the graph structure recursively for this node.
462    pub fn build_graph_structure(
463        &self,
464        structure: &mut HydroGraphStructure,
465        seen_tees: &mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
466        config: &HydroWriteConfig,
467    ) -> usize {
468        use crate::location::LocationId;
469
470        // Helper functions to reduce duplication, categorized by input/expression patterns
471
472        /// Common parameters for transform builder functions to reduce argument count
473        struct TransformParams<'a> {
474            structure: &'a mut HydroGraphStructure,
475            seen_tees: &'a mut HashMap<*const std::cell::RefCell<HydroNode>, usize>,
476            config: &'a HydroWriteConfig,
477            input: &'a HydroNode,
478            metadata: &'a crate::ir::HydroIrMetadata,
479            op_name: String,
480            node_type: HydroNodeType,
481            edge_type: HydroEdgeType,
482        }
483
484        // Single-input transform with no expressions
485        fn build_simple_transform(params: TransformParams) -> usize {
486            let input_id = params.input.build_graph_structure(
487                params.structure,
488                params.seen_tees,
489                params.config,
490            );
491            let location_id = setup_location(params.structure, params.metadata);
492            let node_id = params.structure.add_node(
493                NodeLabel::Static(params.op_name.to_string()),
494                params.node_type,
495                location_id,
496            );
497            params
498                .structure
499                .add_edge(input_id, node_id, params.edge_type, None);
500            node_id
501        }
502
503        // Single-input transform with one expression
504        fn build_single_expr_transform(params: TransformParams, expr: &DebugExpr) -> usize {
505            let input_id = params.input.build_graph_structure(
506                params.structure,
507                params.seen_tees,
508                params.config,
509            );
510            let location_id = setup_location(params.structure, params.metadata);
511            let node_id = params.structure.add_node(
512                NodeLabel::with_exprs(params.op_name.to_string(), vec![expr.clone()]),
513                params.node_type,
514                location_id,
515            );
516            params
517                .structure
518                .add_edge(input_id, node_id, params.edge_type, None);
519            node_id
520        }
521
522        // Single-input transform with two expressions
523        fn build_dual_expr_transform(
524            params: TransformParams,
525            expr1: &DebugExpr,
526            expr2: &DebugExpr,
527        ) -> usize {
528            let input_id = params.input.build_graph_structure(
529                params.structure,
530                params.seen_tees,
531                params.config,
532            );
533            let location_id = setup_location(params.structure, params.metadata);
534            let node_id = params.structure.add_node(
535                NodeLabel::with_exprs(
536                    params.op_name.to_string(),
537                    vec![expr1.clone(), expr2.clone()],
538                ),
539                params.node_type,
540                location_id,
541            );
542            params
543                .structure
544                .add_edge(input_id, node_id, params.edge_type, None);
545            node_id
546        }
547
548        // Helper function for source nodes
549        fn build_source_node(
550            structure: &mut HydroGraphStructure,
551            metadata: &crate::ir::HydroIrMetadata,
552            label: String,
553        ) -> usize {
554            let location_id = setup_location(structure, metadata);
555            structure.add_node(NodeLabel::Static(label), HydroNodeType::Source, location_id)
556        }
557
558        match self {
559            HydroNode::Placeholder => structure.add_node(
560                NodeLabel::Static("PLACEHOLDER".to_string()),
561                HydroNodeType::Transform,
562                None,
563            ),
564
565            HydroNode::Source {
566                source, metadata, ..
567            } => {
568                let label = match source {
569                    HydroSource::Stream(expr) => format!("source_stream({})", expr),
570                    HydroSource::ExternalNetwork() => "external_network()".to_string(),
571                    HydroSource::Iter(expr) => format!("source_iter({})", expr),
572                    HydroSource::Spin() => "spin()".to_string(),
573                };
574                build_source_node(structure, metadata, label)
575            }
576
577            HydroNode::ExternalInput {
578                from_external_id,
579                from_key,
580                metadata,
581                ..
582            } => build_source_node(
583                structure,
584                metadata,
585                format!("external_input({}:{})", from_external_id, from_key),
586            ),
587
588            HydroNode::CycleSource {
589                ident, metadata, ..
590            } => build_source_node(structure, metadata, format!("cycle_source({})", ident)),
591
592            HydroNode::Tee { inner, metadata } => {
593                let ptr = inner.as_ptr();
594                if let Some(&existing_id) = seen_tees.get(&ptr) {
595                    return existing_id;
596                }
597
598                let input_id = inner
599                    .0
600                    .borrow()
601                    .build_graph_structure(structure, seen_tees, config);
602                let location_id = setup_location(structure, metadata);
603
604                let tee_id = structure.add_node(
605                    NodeLabel::Static(extract_op_name(self.print_root())),
606                    HydroNodeType::Tee,
607                    location_id,
608                );
609
610                seen_tees.insert(ptr, tee_id);
611
612                structure.add_edge(input_id, tee_id, HydroEdgeType::Stream, None);
613
614                tee_id
615            }
616
617            // Transform operations with Stream edges - grouped by node/edge type
618            HydroNode::Delta { inner, metadata }
619            | HydroNode::DeferTick {
620                input: inner,
621                metadata,
622            }
623            | HydroNode::Enumerate {
624                input: inner,
625                metadata,
626                ..
627            }
628            | HydroNode::Unique {
629                input: inner,
630                metadata,
631            }
632            | HydroNode::ResolveFutures {
633                input: inner,
634                metadata,
635            }
636            | HydroNode::ResolveFuturesOrdered {
637                input: inner,
638                metadata,
639            } => build_simple_transform(TransformParams {
640                structure,
641                seen_tees,
642                config,
643                input: inner,
644                metadata,
645                op_name: extract_op_name(self.print_root()),
646                node_type: HydroNodeType::Transform,
647                edge_type: HydroEdgeType::Stream,
648            }),
649
650            // Transform operation with Persistent edge - grouped by node/edge type
651            HydroNode::Persist { inner, metadata } => build_simple_transform(TransformParams {
652                structure,
653                seen_tees,
654                config,
655                input: inner,
656                metadata,
657                op_name: extract_op_name(self.print_root()),
658                node_type: HydroNodeType::Transform,
659                edge_type: HydroEdgeType::Persistent,
660            }),
661
662            // Aggregation operation with Stream edge - grouped by node/edge type
663            HydroNode::Sort {
664                input: inner,
665                metadata,
666            } => build_simple_transform(TransformParams {
667                structure,
668                seen_tees,
669                config,
670                input: inner,
671                metadata,
672                op_name: extract_op_name(self.print_root()),
673                node_type: HydroNodeType::Aggregation,
674                edge_type: HydroEdgeType::Stream,
675            }),
676
677            // Single-expression Transform operations - grouped by node type
678            HydroNode::Map { f, input, metadata }
679            | HydroNode::Filter { f, input, metadata }
680            | HydroNode::FlatMap { f, input, metadata }
681            | HydroNode::FilterMap { f, input, metadata }
682            | HydroNode::Inspect { f, input, metadata } => build_single_expr_transform(
683                TransformParams {
684                    structure,
685                    seen_tees,
686                    config,
687                    input,
688                    metadata,
689                    op_name: extract_op_name(self.print_root()),
690                    node_type: HydroNodeType::Transform,
691                    edge_type: HydroEdgeType::Stream,
692                },
693                f,
694            ),
695
696            // Single-expression Aggregation operations - grouped by node type
697            HydroNode::Reduce { f, input, metadata }
698            | HydroNode::ReduceKeyed { f, input, metadata } => build_single_expr_transform(
699                TransformParams {
700                    structure,
701                    seen_tees,
702                    config,
703                    input,
704                    metadata,
705                    op_name: extract_op_name(self.print_root()),
706                    node_type: HydroNodeType::Aggregation,
707                    edge_type: HydroEdgeType::Stream,
708                },
709                f,
710            ),
711
712            // Join-like operations with left/right edge labels - grouped by edge labeling
713            HydroNode::Join {
714                left,
715                right,
716                metadata,
717            }
718            | HydroNode::CrossProduct {
719                left,
720                right,
721                metadata,
722            }
723            | HydroNode::CrossSingleton {
724                left,
725                right,
726                metadata,
727            } => {
728                let left_id = left.build_graph_structure(structure, seen_tees, config);
729                let right_id = right.build_graph_structure(structure, seen_tees, config);
730                let location_id = setup_location(structure, metadata);
731                let node_id = structure.add_node(
732                    NodeLabel::Static(extract_op_name(self.print_root())),
733                    HydroNodeType::Join,
734                    location_id,
735                );
736                structure.add_edge(
737                    left_id,
738                    node_id,
739                    HydroEdgeType::Stream,
740                    Some("left".to_string()),
741                );
742                structure.add_edge(
743                    right_id,
744                    node_id,
745                    HydroEdgeType::Stream,
746                    Some("right".to_string()),
747                );
748                node_id
749            }
750
751            // Join-like operations with pos/neg edge labels - grouped by edge labeling
752            HydroNode::Difference {
753                pos: left,
754                neg: right,
755                metadata,
756            }
757            | HydroNode::AntiJoin {
758                pos: left,
759                neg: right,
760                metadata,
761            } => {
762                let left_id = left.build_graph_structure(structure, seen_tees, config);
763                let right_id = right.build_graph_structure(structure, seen_tees, config);
764                let location_id = setup_location(structure, metadata);
765                let node_id = structure.add_node(
766                    NodeLabel::Static(extract_op_name(self.print_root())),
767                    HydroNodeType::Join,
768                    location_id,
769                );
770                structure.add_edge(
771                    left_id,
772                    node_id,
773                    HydroEdgeType::Stream,
774                    Some("pos".to_string()),
775                );
776                structure.add_edge(
777                    right_id,
778                    node_id,
779                    HydroEdgeType::Stream,
780                    Some("neg".to_string()),
781                );
782                node_id
783            }
784
785            // Dual expression transforms - consolidated using pattern matching
786            HydroNode::Fold {
787                init,
788                acc,
789                input,
790                metadata,
791            }
792            | HydroNode::FoldKeyed {
793                init,
794                acc,
795                input,
796                metadata,
797            }
798            | HydroNode::Scan {
799                init,
800                acc,
801                input,
802                metadata,
803            } => {
804                let node_type = HydroNodeType::Aggregation; // All are aggregation operations
805
806                build_dual_expr_transform(
807                    TransformParams {
808                        structure,
809                        seen_tees,
810                        config,
811                        input,
812                        metadata,
813                        op_name: extract_op_name(self.print_root()),
814                        node_type,
815                        edge_type: HydroEdgeType::Stream,
816                    },
817                    init,
818                    acc,
819                )
820            }
821
822            // Combination of join and transform
823            HydroNode::ReduceKeyedWatermark {
824                f,
825                input,
826                watermark,
827                metadata,
828            } => {
829                let input_id = input.build_graph_structure(structure, seen_tees, config);
830                let watermark_id = watermark.build_graph_structure(structure, seen_tees, config);
831                let location_id = setup_location(structure, metadata);
832                let join_node_id = structure.add_node(
833                    NodeLabel::Static(extract_op_name(self.print_root())),
834                    HydroNodeType::Join,
835                    location_id,
836                );
837                structure.add_edge(
838                    input_id,
839                    join_node_id,
840                    HydroEdgeType::Stream,
841                    Some("input".to_string()),
842                );
843                structure.add_edge(
844                    watermark_id,
845                    join_node_id,
846                    HydroEdgeType::Stream,
847                    Some("watermark".to_string()),
848                );
849
850                let node_id = structure.add_node(
851                    NodeLabel::with_exprs(
852                        extract_op_name(self.print_root()).to_string(),
853                        vec![f.clone()],
854                    ),
855                    HydroNodeType::Aggregation,
856                    location_id,
857                );
858                structure.add_edge(join_node_id, node_id, HydroEdgeType::Stream, None);
859                node_id
860            }
861
862            HydroNode::Network {
863                serialize_fn,
864                deserialize_fn,
865                input,
866                metadata,
867                ..
868            } => {
869                let input_id = input.build_graph_structure(structure, seen_tees, config);
870                let _from_location_id = setup_location(structure, metadata);
871
872                let to_location_id = match metadata.location_kind.root() {
873                    LocationId::Process(id) => {
874                        structure.add_location(*id, "Process".to_string());
875                        Some(*id)
876                    }
877                    LocationId::Cluster(id) => {
878                        structure.add_location(*id, "Cluster".to_string());
879                        Some(*id)
880                    }
881                    _ => None,
882                };
883
884                let mut label = "network(".to_string();
885                if serialize_fn.is_some() {
886                    label.push_str("ser");
887                }
888                if deserialize_fn.is_some() {
889                    if serialize_fn.is_some() {
890                        label.push_str(" + ");
891                    }
892                    label.push_str("deser");
893                }
894                label.push(')');
895
896                let network_id = structure.add_node(
897                    NodeLabel::Static(label),
898                    HydroNodeType::Network,
899                    to_location_id,
900                );
901                structure.add_edge(
902                    input_id,
903                    network_id,
904                    HydroEdgeType::Network,
905                    Some(format!("to {:?}", to_location_id)),
906                );
907                network_id
908            }
909
910            // Handle remaining node types
911            HydroNode::Unpersist { inner, .. } => {
912                // Unpersist is typically optimized away, just pass through
913                inner.build_graph_structure(structure, seen_tees, config)
914            }
915
916            HydroNode::Chain {
917                first,
918                second,
919                metadata,
920            } => {
921                let first_id = first.build_graph_structure(structure, seen_tees, config);
922                let second_id = second.build_graph_structure(structure, seen_tees, config);
923                let location_id = setup_location(structure, metadata);
924                let chain_id = structure.add_node(
925                    NodeLabel::Static(extract_op_name(self.print_root())),
926                    HydroNodeType::Transform,
927                    location_id,
928                );
929                structure.add_edge(
930                    first_id,
931                    chain_id,
932                    HydroEdgeType::Stream,
933                    Some("first".to_string()),
934                );
935                structure.add_edge(
936                    second_id,
937                    chain_id,
938                    HydroEdgeType::Stream,
939                    Some("second".to_string()),
940                );
941                chain_id
942            }
943
944            HydroNode::Counter {
945                tag: _,
946                duration,
947                input,
948                metadata,
949            } => build_single_expr_transform(
950                TransformParams {
951                    structure,
952                    seen_tees,
953                    config,
954                    input,
955                    metadata,
956                    op_name: extract_op_name(self.print_root()),
957                    node_type: HydroNodeType::Transform,
958                    edge_type: HydroEdgeType::Stream,
959                },
960                duration,
961            ),
962        }
963    }
964}
965
966/// Utility functions for rendering multiple leaves as a single graph.
967/// Macro to reduce duplication in render functions.
968macro_rules! render_hydro_ir {
969    ($name:ident, $write_fn:ident) => {
970        pub fn $name(leaves: &[HydroLeaf], config: &HydroWriteConfig) -> String {
971            let mut output = String::new();
972            $write_fn(&mut output, leaves, config).unwrap();
973            output
974        }
975    };
976}
977
978/// Macro to reduce duplication in write functions.
979macro_rules! write_hydro_ir {
980    ($name:ident, $writer_type:ty, $constructor:expr) => {
981        pub fn $name(
982            output: impl std::fmt::Write,
983            leaves: &[HydroLeaf],
984            config: &HydroWriteConfig,
985        ) -> std::fmt::Result {
986            let mut graph_write: $writer_type = $constructor(output, config);
987            write_hydro_ir_graph(&mut graph_write, leaves, config)
988        }
989    };
990}
991
992render_hydro_ir!(render_hydro_ir_mermaid, write_hydro_ir_mermaid);
993write_hydro_ir!(
994    write_hydro_ir_mermaid,
995    HydroMermaid<_>,
996    HydroMermaid::new_with_config
997);
998
999render_hydro_ir!(render_hydro_ir_dot, write_hydro_ir_dot);
1000write_hydro_ir!(write_hydro_ir_dot, HydroDot<_>, HydroDot::new_with_config);
1001
1002render_hydro_ir!(render_hydro_ir_reactflow, write_hydro_ir_reactflow);
1003write_hydro_ir!(
1004    write_hydro_ir_reactflow,
1005    HydroReactFlow<_>,
1006    HydroReactFlow::new
1007);
1008
1009fn write_hydro_ir_graph<W>(
1010    mut graph_write: W,
1011    leaves: &[HydroLeaf],
1012    config: &HydroWriteConfig,
1013) -> Result<(), W::Err>
1014where
1015    W: HydroGraphWrite,
1016{
1017    let mut structure = HydroGraphStructure::new();
1018    let mut seen_tees = HashMap::new();
1019
1020    // Build the graph structure for all leaves
1021    for leaf in leaves {
1022        leaf.build_graph_structure(&mut structure, &mut seen_tees, config);
1023    }
1024
1025    // Write the graph using the same logic as individual leaves
1026    graph_write.write_prologue()?;
1027
1028    for (&node_id, (label, node_type, location)) in &structure.nodes {
1029        let (location_id, location_type) = if let Some(loc_id) = location {
1030            (
1031                Some(*loc_id),
1032                structure.locations.get(loc_id).map(|s| s.as_str()),
1033            )
1034        } else {
1035            (None, None)
1036        };
1037        graph_write.write_node_definition(
1038            node_id,
1039            label,
1040            *node_type,
1041            location_id,
1042            location_type,
1043        )?;
1044    }
1045
1046    if config.show_location_groups {
1047        let mut nodes_by_location: HashMap<usize, Vec<usize>> = HashMap::new();
1048        for (&node_id, (_, _, location)) in &structure.nodes {
1049            if let Some(location_id) = location {
1050                nodes_by_location
1051                    .entry(*location_id)
1052                    .or_default()
1053                    .push(node_id);
1054            }
1055        }
1056
1057        for (&location_id, node_ids) in &nodes_by_location {
1058            if let Some(location_type) = structure.locations.get(&location_id) {
1059                graph_write.write_location_start(location_id, location_type)?;
1060                for &node_id in node_ids {
1061                    graph_write.write_node(node_id)?;
1062                }
1063                graph_write.write_location_end()?;
1064            }
1065        }
1066    }
1067
1068    for (src_id, dst_id, edge_type, label) in &structure.edges {
1069        graph_write.write_edge(*src_id, *dst_id, *edge_type, label.as_deref())?;
1070    }
1071
1072    graph_write.write_epilogue()?;
1073    Ok(())
1074}