Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// A closure expression bundled with any singleton references it captures.
///
/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
/// alongside the closure's expression. This allows per-closure tracking of singleton
/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
///
/// The `Hash`, `Debug` and `Display` impls in this file look only at `expr`; the
/// captured references are carried along for cloning, serialization and code-gen.
pub struct ClosureExpr {
    /// The closure's expression.
    pub(crate) expr: DebugExpr,
    /// Each entry is `(HydroNode::Reference, is_mut: bool)`.
    /// The index in the Vec determines the ident name via [`handoff_ref_ident`].
    /// The `access_counter` was assigned at staging time in code order.
    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}
51
52impl Clone for ClosureExpr {
53    fn clone(&self) -> Self {
54        Self {
55            expr: self.expr.clone(),
56            singleton_refs: self
57                .singleton_refs
58                .iter()
59                .map(|(node, is_mut)| {
60                    let HydroNode::Reference {
61                        inner,
62                        kind,
63                        access_counter,
64                        metadata,
65                    } = node
66                    else {
67                        panic!("singleton_refs should only contain HydroNode::Reference");
68                    };
69                    (
70                        HydroNode::Reference {
71                            inner: SharedNode(Rc::clone(&inner.0)),
72                            kind: *kind,
73                            access_counter: access_counter.freeze(),
74                            metadata: metadata.clone(),
75                        },
76                        *is_mut,
77                    )
78                })
79                .collect(),
80        }
81    }
82}
83
84impl Hash for ClosureExpr {
85    fn hash<H: Hasher>(&self, state: &mut H) {
86        self.expr.hash(state);
87        // singleton_refs are structural children (like HydroIrMetadata), not
88        // identity-defining. Two closures with the same expr but different
89        // captured refs are the same closure text — the refs only affect codegen.
90    }
91}
92
93impl serde::Serialize for ClosureExpr {
94    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95        use serde::ser::SerializeStruct;
96        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97        s.serialize_field("expr", &self.expr)?;
98        s.serialize_field(
99            "singleton_refs",
100            &SerializableSingletonRefs(&self.singleton_refs),
101        )?;
102        s.end()
103    }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110        use serde::ser::SerializeSeq;
111        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112        for (node, is_mut) in self.0.iter() {
113            seq.serialize_element(&(node, is_mut))?;
114        }
115        seq.end()
116    }
117}
118
119impl Debug for ClosureExpr {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        Debug::fmt(&self.expr, f)
122    }
123}
124
125impl Display for ClosureExpr {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        Display::fmt(&self.expr, f)
128    }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132    fn from(expr: syn::Expr) -> Self {
133        Self {
134            expr: DebugExpr(Box::new(expr)),
135            singleton_refs: Vec::new(),
136        }
137    }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141    fn from(expr: DebugExpr) -> Self {
142        Self {
143            expr,
144            singleton_refs: Vec::new(),
145        }
146    }
147}
148
149impl ClosureExpr {
150    pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151        Self {
152            expr,
153            singleton_refs,
154        }
155    }
156
157    pub fn has_mut_ref(&self) -> bool {
158        self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159    }
160
161    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162        Self {
163            expr: self.expr.clone(),
164            singleton_refs: self
165                .singleton_refs
166                .iter()
167                .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168                .collect(),
169        }
170    }
171
172    pub fn transform_children(
173        &mut self,
174        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175        seen_tees: &mut SeenSharedNodes,
176    ) {
177        for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178            transform(ref_node, seen_tees);
179        }
180    }
181
182    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
183    /// replacing local singleton ref idents with `#{N} dfir_ident` or `#{N} mut dfir_ident` references.
184    #[cfg(feature = "build")]
185    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186        if self.singleton_refs.is_empty() {
187            self.expr.0.to_token_stream()
188        } else {
189            assert!(
190                ident_stack.len() >= self.singleton_refs.len(),
191                "ident_stack has {} entries but expected at least {} for singleton_refs",
192                ident_stack.len(),
193                self.singleton_refs.len()
194            );
195            let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197            let mut let_bindings = Vec::new();
198            for ((i, (ref_node, is_mut)), ref_ident) in
199                self.singleton_refs.iter().enumerate().zip(ref_idents)
200            {
201                let HydroNode::Reference { access_counter, .. } = ref_node else {
202                    panic!("ClosureExpression expected references to `HydroNode::Reference`");
203                };
204                let group = access_counter.frozen_group();
205                // TODO(mingwei): proper spanning?
206                let local_ident = handoff_ref_ident(i);
207                let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208                let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209                let mut_token = is_mut.then(|| quote!(mut));
210                let binding = quote! {
211                    let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212                };
213                let_bindings.push(binding);
214            }
215
216            let expr = &self.expr.0;
217            quote! {
218                {
219                    #( #let_bindings )*
220                    #expr
221                }
222            }
223        }
224    }
225}
226
/// Wrapper that displays only the tokens of a parsed expr.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235        serializer.serialize_str(&self.to_string())
236    }
237}
238
239impl From<syn::Expr> for DebugExpr {
240    fn from(expr: syn::Expr) -> Self {
241        Self(Box::new(expr))
242    }
243}
244
245impl Deref for DebugExpr {
246    type Target = syn::Expr;
247
248    fn deref(&self) -> &Self::Target {
249        &self.0
250    }
251}
252
253impl ToTokens for DebugExpr {
254    fn to_tokens(&self, tokens: &mut TokenStream) {
255        self.0.to_tokens(tokens);
256    }
257}
258
259impl Debug for DebugExpr {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        write!(f, "{}", self.0.to_token_stream())
262    }
263}
264
265impl Display for DebugExpr {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        let original = self.0.as_ref().clone();
268        let simplified = simplify_q_macro(original);
269
270        // For now, just use quote formatting without trying to parse as a statement
271        // This avoids the syn::parse_quote! issues entirely
272        write!(f, "q!({})", quote::quote!(#simplified))
273    }
274}
275
276/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
277fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278    if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279        // Look for calls to stageleft::runtime_support::fn*
280        && is_stageleft_runtime_support_call(&path_expr.path)
281        && let syn::Expr::Block(b) = &call.args[0]
282        && b.block.stmts.len() == 3
283        && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284    // skip the first two, which are imports
285    {
286        let mut e = e.clone();
287        while let syn::Expr::Block(ref mut block) = e
288            && block.block.stmts.len() == 1
289            && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290        {
291            e = inner_e;
292        }
293
294        e
295    } else {
296        expr
297    }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301    // Check if this is a call to stageleft::runtime_support::fn*
302    if let Some(last_segment) = path.segments.last() {
303        let fn_name = last_segment.ident.to_string();
304        path.segments.len() > 2
305            && path.segments[0].ident == "stageleft"
306            && path.segments[1].ident == "runtime_support"
307            && fn_name.contains("_type_hint")
308    } else {
309        false
310    }
311}
312
/// Debug displays the type's tokens.
///
/// Also serializes as the same token string (see the `serde::Serialize` impl).
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320    fn from(t: syn::Type) -> Self {
321        Self(Box::new(t))
322    }
323}
324
325impl Deref for DebugType {
326    type Target = syn::Type;
327
328    fn deref(&self) -> &Self::Target {
329        &self.0
330    }
331}
332
333impl ToTokens for DebugType {
334    fn to_tokens(&self, tokens: &mut TokenStream) {
335        self.0.to_tokens(tokens);
336    }
337}
338
339impl Debug for DebugType {
340    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341        write!(f, "{}", self.0.to_token_stream())
342    }
343}
344
345impl serde::Serialize for DebugType {
346    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348    }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352    backtrace: &Backtrace,
353    serializer: S,
354) -> Result<S::Ok, S::Error> {
355    match backtrace.format_span() {
356        Some(span) => serializer.serialize_some(&span),
357        None => serializer.serialize_none(),
358    }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362    ident: &syn::Ident,
363    serializer: S,
364) -> Result<S::Ok, S::Error> {
365    serializer.serialize_str(&ident.to_string())
366}
367
/// Instantiation state attached to network-related IR nodes.
///
/// Debug-prints as an opaque placeholder, contributes nothing to hashes, and
/// can only be cloned or serialized while still [`Self::Building`].
pub enum DebugInstantiate {
    /// No instantiation data has been attached yet.
    Building,
    /// Holds the finalized instantiation data, boxed.
    Finalized(Box<DebugInstantiateFinalized>),
}
372
373impl serde::Serialize for DebugInstantiate {
374    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375        match self {
376            DebugInstantiate::Building => {
377                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378            }
379            DebugInstantiate::Finalized(_) => {
380                panic!(
381                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382                )
383            }
384        }
385    }
386}
387
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    // NOTE(review): presumably the expression for the sending side of the
    // connection; confirm against the code that builds this struct.
    sink: syn::Expr,
    // NOTE(review): presumably the expression for the receiving side of the
    // connection; confirm against the code that builds this struct.
    source: syn::Expr,
    // Optional one-shot callback. NOTE(review): the name suggests it performs
    // the actual connection; confirm against the caller.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402    fn from(f: DebugInstantiateFinalized) -> Self {
403        Self::Finalized(Box::new(f))
404    }
405}
406
407impl Debug for DebugInstantiate {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        write!(f, "<network instantiate>")
410    }
411}
412
impl Hash for DebugInstantiate {
    /// Writes nothing to the hasher: every value, whatever its variant,
    /// contributes identically (i.e. not at all) to an enclosing hash.
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Do nothing
    }
}
418
419impl Clone for DebugInstantiate {
420    fn clone(&self) -> Self {
421        match self {
422            DebugInstantiate::Building => DebugInstantiate::Building,
423            DebugInstantiate::Finalized(_) => {
424                panic!("DebugInstantiate::Finalized should not be cloned")
425            }
426        }
427    }
428}
429
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet instantiated.
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    // Field order matters: `.0` is the root of the location the node lives at,
    // `.1` is the location of the cluster whose membership is observed.
    Tee(LocationId, LocationId),
}
450
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression.
    // NOTE(review): presumably the expression evaluates to an async stream; confirm.
    Stream(DebugExpr),
    /// Source fed by an external network connection; carries no payload here.
    // NOTE(review): connection details appear to live on the owning node; confirm.
    ExternalNetwork(),
    /// Source described by an expression.
    // NOTE(review): presumably the expression evaluates to an iterator; confirm.
    Iter(DebugExpr),
    /// Payload-free source.
    // NOTE(review): semantics are not visible in this part of the file; confirm.
    Spin(),
    /// Membership of a cluster: the cluster's location plus the instantiation
    /// state tracked by [`ClusterMembersState`].
    ClusterMembers(LocationId, ClusterMembersState),
    /// Source identified by an ident (serialized as its string form).
    // NOTE(review): presumably an input supplied by embedding code; confirm.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Singleton counterpart of [`Self::Embedded`], also identified by an ident.
    // NOTE(review): exact semantics are not visible here; confirm.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
462
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
///
/// Throughout, `in_ident` / `input_ident` name an already-emitted DFIR value and
/// `out_ident` names the value the method is expected to define.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the DFIR that carries `in_ident` (at `in_location`) into
    /// `out_ident` (at `out_location`) across a batch boundary.
    ///
    /// The production builder in this file forwards the value unchanged, except
    /// for bounded singleton-like kinds, which go through `persist::<'static>()`.
    // NOTE(review): `fold_hooked_idents` is unused by the production builder;
    // presumably consumed by the simulator. Confirm.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the DFIR that carries `in_ident` out of a tick into `out_ident`.
    ///
    /// The production builder in this file forwards the value unchanged.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the DFIR for entering an atomic section, defining `out_ident`
    /// from `in_ident`.
    ///
    /// The production builder in this file forwards the value unchanged.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR for leaving an atomic section, defining `out_ident`
    /// from `in_ident`.
    ///
    /// The production builder in this file forwards the value unchanged.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the DFIR for a point where non-determinism is observed, turning
    /// `in_ident` (of kind `in_kind`) into `out_ident` (of kind `out_kind`).
    ///
    /// The production builder in this file forwards the value unchanged.
    // NOTE(review): the meaning of `trusted` is not visible in this file; confirm.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the DFIR that combines `first_ident` and `second_ident` into
    /// `out_ident`.
    ///
    /// The production builder in this file emits a `union()` with the first
    /// input on port `0` and the second on port `1`, passing `operator_tag`
    /// through to the graph builder.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network channel: a sender at `from` that feeds
    /// `input_ident` into `sink`, and a receiver at `to` that defines
    /// `out_ident` from `source`.
    ///
    /// When present, `serialize` is mapped over the data before the sink and
    /// `deserialize` after the source. `tag_id` is used to build distinct
    /// operator tags for the send and receive sides.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a receiver at `on` that defines `out_ident` from `source_expr`,
    /// optionally mapping `deserialize` over the incoming data.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits a sender at `on` that feeds `input_ident` into `sink_expr`,
    /// optionally mapping `serialize` over the outgoing data.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
    /// Returns the new input ident to use for the fold if a hook was emitted.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Inserts necessary code to validate a manual assertion that at this point the
    /// input live collection is consistent. In production, this is a no-op, but in simulation
    /// this will (not yet implemented) inject assertions that validate consistency.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Observes non-determinism introduced by a mut closure operating on a non-strict
    /// (unordered / at-least-once) input. In production this is identity; in simulation
    /// it delegates to `observe_nondet` with the strict output kind.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the sending side of a versioned network channel towards `dest`.
    ///
    /// Each entry of `senders` is `(location, ident, optional serializer)`.
    /// The production builder in this file treats a call as unreachable: the
    /// corresponding node is only produced by the multi-version simulator
    /// merge pass.
    fn create_versioned_network_fork(
        &mut self,
        channel_id: u32,
        dest: &LocationId,
        senders: Vec<(LocationId, syn::Ident, Option<DebugExpr>)>,
        tag_id: StmtId,
    );

    /// Emits the receiving side of a versioned network channel, defining
    /// `out_ident` at `dest`.
    ///
    /// The production builder in this file treats a call as unreachable: the
    /// corresponding node is only produced by the multi-version simulator
    /// merge pass.
    fn create_versioned_network(
        &mut self,
        channel_id: u32,
        source: &LocationId,
        dest: &LocationId,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );
}
621
/// Production (non-simulation) builder: one [`FlatGraphBuilder`] per root
/// location. Most hooks are identity pass-throughs here.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (or lazily creates) the builder keyed by the root of `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Forwards the input, persisting it when it is a bounded singleton-like kind.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !needs_persist {
            forward_ident_unchanged(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Identity in production.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        forward_ident_unchanged(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Identity in production.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        forward_ident_unchanged(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Identity in production.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        forward_ident_unchanged(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Identity in production.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        forward_ident_unchanged(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    /// Emits a `union()` fed by the first input on port 0 and the second on port 1.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the sender at `from` and the receiver at `to`.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        let sender_builder = self.get_dfir_mut(from);
        sender_builder.add_dfir(
            match serialize {
                Some(serialize_pipeline) => parse_quote! {
                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                },
                None => parse_quote! {
                    #input_ident -> dest_sink(#sink);
                },
            },
            None,
            Some(send_tag.as_str()),
        );

        let receiver_builder = self.get_dfir_mut(to);
        receiver_builder.add_dfir(
            match deserialize {
                Some(deserialize_pipeline) => parse_quote! {
                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                },
                None => parse_quote! {
                    #out_ident = source_stream(#source);
                },
            },
            None,
            Some(recv_tag.as_str()),
        );
    }

    /// Emits a `source_stream`, optionally followed by a deserializing `map`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let recv_tag = format!("recv{tag_id}");

        let receiver_builder = self.get_dfir_mut(on);
        receiver_builder.add_dfir(
            match deserialize {
                Some(deserialize_pipeline) => parse_quote! {
                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                },
                None => parse_quote! {
                    #out_ident = source_stream(#source_expr);
                },
            },
            None,
            Some(recv_tag.as_str()),
        );
    }

    /// Emits a `dest_sink`, optionally preceded by a serializing `map`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");

        let sender_builder = self.get_dfir_mut(on);
        sender_builder.add_dfir(
            match serialize {
                Some(serialize_fn) => parse_quote! {
                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                },
                None => parse_quote! {
                    #input_ident -> dest_sink(#sink_expr);
                },
            },
            None,
            Some(send_tag.as_str()),
        );
    }

    /// Never emits a hook in production.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    /// Identity in production.
    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        forward_ident_unchanged(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    /// Identity in production.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        forward_ident_unchanged(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    /// Not supported by this builder; reaching it is a bug.
    fn create_versioned_network_fork(
        &mut self,
        _channel_id: u32,
        _dest: &LocationId,
        _senders: Vec<(LocationId, syn::Ident, Option<DebugExpr>)>,
        _tag_id: StmtId,
    ) {
        unreachable!(
            "HydroNode::VersionedNetworkFork is only produced by the multi-version simulator merge \
             pass and cannot be emitted by the non-simulation builder"
        );
    }

    /// Not supported by this builder; reaching it is a bug.
    fn create_versioned_network(
        &mut self,
        _channel_id: u32,
        _source: &LocationId,
        _dest: &LocationId,
        _out_ident: &syn::Ident,
        _deserialize: Option<&DebugExpr>,
        _tag_id: StmtId,
    ) {
        unreachable!(
            "HydroNode::VersionedNetwork is only produced by the multi-version simulator merge \
             pass and cannot be emitted by the non-simulation builder"
        );
    }
}

/// Emits `out_ident = in_ident;` into `builder`, with no loop and no operator tag.
#[cfg(feature = "build")]
fn forward_ident_unchanged(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}
951
/// Either a DFIR builder to emit code into, or a pair of callbacks to invoke
/// instead.
///
/// `L` is called with a [`HydroRoot`] and `N` with a [`HydroNode`]; both also
/// receive the statement-id counter.
// NOTE(review): presumably the callback form lets a traversal visit roots and
// nodes without generating code; confirm against the emit functions.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
{
    /// Emit DFIR through the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback (`L`) and node callback (`N`).
    Callback(L, N),
}
961
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
///
/// Every variant owns its upstream pipeline in `input` and carries
/// per-operator metadata in `op_metadata`.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Terminates the pipeline with the closure `f`.
    // NOTE(review): presumably `f` is run on each element of `input`; confirm against code-gen.
    ForEach {
        f: ClosureExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the pipeline's output to a port on an external location.
    // NOTE(review): the meaning of `to_many` / `unpaired` is not visible in this
    // part of the file; confirm against `compile_network` and code-gen.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Terminates the pipeline in the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Terminates the pipeline in the cycle identified by `cycle_id`.
    // NOTE(review): presumably paired with a cycle source of the same id; confirm.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Output identified by `ident` (serialized as its string form).
    // NOTE(review): presumably consumed by embedding code; confirm.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root with no parameters beyond its input.
    // NOTE(review): presumably discards the pipeline's output; confirm against code-gen.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
1003
1004impl HydroRoot {
    /// Instantiates the networking for this root and every node reachable from it.
    ///
    /// The tree is walked bottom-up via [`HydroRoot::transform_bottom_up`]. Along the way:
    ///
    /// - `HydroRoot::SendExternal`, `HydroNode::Network` and `HydroNode::ExternalInput` have
    ///   their `instantiate_fn` switched from `DebugInstantiate::Building` to a finalized
    ///   value holding the sink expression, the source expression and a connect closure
    ///   (later run by `connect_network`).
    /// - `HydroRoot::EmbeddedOutput` and the embedded `HydroNode::Source` variants are
    ///   registered with the deploy backend `D` through `env`.
    /// - `HydroSource::ClusterMembers` sources are resolved to either a membership stream
    ///   (first occurrence of an `(at, target)` pair in `seen_cluster_members`) or a tee of it.
    ///
    /// `extra_stmts` collects additional per-location statements produced by the backend's
    /// source constructors. `processes`, `clusters` and `externals` map location keys to the
    /// backend's instantiated nodes.
    ///
    /// # Panics
    ///
    /// Panics if a referenced process, cluster or external is missing from its map, if an
    /// element was already finalized, if a location is neither a process nor a cluster where
    /// one is required, if an embedded input/output has the wrong collection kind, or if a
    /// `SendExternal` root is encountered without the `tokio` feature.
    #[cfg(feature = "build")]
    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
    pub fn compile_network<'a, D>(
        &mut self,
        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
        seen_tees: &mut SeenSharedNodes,
        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
        processes: &SparseSecondaryMap<LocationKey, D::Process>,
        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
        externals: &SparseSecondaryMap<LocationKey, D::External>,
        env: &mut D::InstantiateEnv,
    ) where
        D: Deploy<'a>,
    {
        // The root closure and the node closure are both alive for the whole traversal,
        // so the state they mutate is shared through `RefCell`s and borrowed on demand.
        let refcell_extra_stmts = RefCell::new(extra_stmts);
        let refcell_env = RefCell::new(env);
        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
        self.transform_bottom_up(
            &mut |l| {
                if let HydroRoot::SendExternal {
                    #[cfg(feature = "tokio")]
                    input,
                    #[cfg(feature = "tokio")]
                    to_external_key,
                    #[cfg(feature = "tokio")]
                    to_port_id,
                    #[cfg(feature = "tokio")]
                    to_many,
                    #[cfg(feature = "tokio")]
                    unpaired,
                    #[cfg(feature = "tokio")]
                    instantiate_fn,
                    ..
                } = l
                {
                    // Only the sink half is meaningful for an external send; the source half
                    // is filled with a `DUMMY` placeholder in every branch below.
                    #[cfg(feature = "tokio")]
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let to_node = externals
                                .get(*to_external_key)
                                .unwrap_or_else(|| {
                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
                                })
                                .clone();

                            match input.metadata().location_id.root() {
                                &LocationId::Process(process_key) => {
                                    if *to_many {
                                        (
                                            (
                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
                                                parse_quote!(DUMMY),
                                            ),
                                            Box::new(|| {}) as Box<dyn FnOnce()>,
                                        )
                                    } else {
                                        let from_node = processes
                                            .get(process_key)
                                            .unwrap_or_else(|| {
                                                panic!("A process used in the graph was not instantiated: {}", process_key)
                                            })
                                            .clone();

                                        let sink_port = from_node.next_port();
                                        let source_port = to_node.next_port();

                                        // An unpaired send registers the external's port itself and
                                        // creates the external-to-process source; its result is discarded.
                                        if *unpaired {
                                            use stageleft::quote_type;
                                            use tokio_util::codec::LengthDelimitedCodec;

                                            to_node.register(*to_port_id, source_port.clone());

                                            let _ = D::e2o_source(
                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                                &to_node, &source_port,
                                                &from_node, &sink_port,
                                                &quote_type::<LengthDelimitedCodec>(),
                                                format!("{}_{}", *to_external_key, *to_port_id)
                                            );
                                        }

                                        (
                                            (
                                                D::o2e_sink(
                                                    &from_node,
                                                    &sink_port,
                                                    &to_node,
                                                    &source_port,
                                                    format!("{}_{}", *to_external_key, *to_port_id)
                                                ),
                                                parse_quote!(DUMMY),
                                            ),
                                            if *unpaired {
                                                D::e2o_connect(
                                                    &to_node,
                                                    &source_port,
                                                    &from_node,
                                                    &sink_port,
                                                    *to_many,
                                                    NetworkHint::Auto,
                                                )
                                            } else {
                                                Box::new(|| {}) as Box<dyn FnOnce()>
                                            },
                                        )
                                    }
                                }
                                LocationId::Cluster(cluster_key) => {
                                    let from_node = clusters
                                        .get(*cluster_key)
                                        .unwrap_or_else(|| {
                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
                                        })
                                        .clone();

                                    let sink_port = from_node.next_port();
                                    let source_port = to_node.next_port();

                                    if *unpaired {
                                        to_node.register(*to_port_id, source_port.clone());
                                    }

                                    (
                                        (
                                            D::m2e_sink(
                                                &from_node,
                                                &sink_port,
                                                &to_node,
                                                &source_port,
                                                format!("{}_{}", *to_external_key, *to_port_id)
                                            ),
                                            parse_quote!(DUMMY),
                                        ),
                                        Box::new(|| {}) as Box<dyn FnOnce()>,
                                    )
                                }
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    #[cfg(not(feature = "tokio"))]
                    {
                        panic!("Cannot instantiate external inputs without tokio");
                    };

                    #[cfg(feature = "tokio")]
                    {
                        *instantiate_fn = DebugInstantiateFinalized {
                            sink: sink_expr,
                            source: source_expr,
                            connect_fn: Some(connect_fn),
                        }
                        .into();
                    };
                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
                    let element_type = match &input.metadata().collection_kind {
                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
                        _ => panic!("Embedded output must have Stream collection kind"),
                    };
                    let location_key = match input.metadata().location_id.root() {
                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
                        _ => panic!("Embedded output must be on a process or cluster"),
                    };
                    D::register_embedded_output(
                        &mut refcell_env.borrow_mut(),
                        location_key,
                        ident,
                        &element_type,
                    );
                }
            },
            &mut |n| {
                if let HydroNode::Network {
                    name,
                    networking_info,
                    input,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    // The sender is the root location of the input; the receiver is this node's own.
                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => instantiate_network::<D>(
                            &mut refcell_env.borrow_mut(),
                            input.metadata().location_id.root(),
                            metadata.location_id.root(),
                            processes,
                            clusters,
                            name.as_deref(),
                            networking_info,
                        ),

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::ExternalInput {
                    from_external_key,
                    from_port_id,
                    from_many,
                    codec_type,
                    port_hint,
                    instantiate_fn,
                    metadata,
                    ..
                } = n
                {
                    // Mirror image of `SendExternal`: here the sink half is the `DUMMY`
                    // placeholder and the source half is built by the backend.
                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
                        DebugInstantiate::Building => {
                            let from_node = externals
                                .get(*from_external_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "A external used in the graph was not instantiated: {}",
                                        from_external_key,
                                    )
                                })
                                .clone();

                            match metadata.location_id.root() {
                                &LocationId::Process(process_key) => {
                                    let to_node = processes
                                        .get(process_key)
                                        .unwrap_or_else(|| {
                                            panic!("A process used in the graph was not instantiated: {}", process_key)
                                        })
                                        .clone();

                                    let sink_port = from_node.next_port();
                                    let source_port = to_node.next_port();

                                    from_node.register(*from_port_id, sink_port.clone());

                                    (
                                        (
                                            parse_quote!(DUMMY),
                                            if *from_many {
                                                D::e2o_many_source(
                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_key, *from_port_id)
                                                )
                                            } else {
                                                D::e2o_source(
                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
                                                    &from_node, &sink_port,
                                                    &to_node, &source_port,
                                                    codec_type.0.as_ref(),
                                                    format!("{}_{}", *from_external_key, *from_port_id)
                                                )
                                            },
                                        ),
                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
                                    )
                                }
                                LocationId::Cluster(cluster_key) => {
                                    let to_node = clusters
                                        .get(*cluster_key)
                                        .unwrap_or_else(|| {
                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
                                        })
                                        .clone();

                                    let sink_port = from_node.next_port();
                                    let source_port = to_node.next_port();

                                    from_node.register(*from_port_id, sink_port.clone());

                                    (
                                        (
                                            parse_quote!(DUMMY),
                                            D::e2m_source(
                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
                                                &from_node, &sink_port,
                                                &to_node, &source_port,
                                                codec_type.0.as_ref(),
                                                format!("{}_{}", *from_external_key, *from_port_id)
                                            ),
                                        ),
                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
                                    )
                                }
                                _ => panic!()
                            }
                        },

                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                    };

                    *instantiate_fn = DebugInstantiateFinalized {
                        sink: sink_expr,
                        source: source_expr,
                        connect_fn: Some(connect_fn),
                    }
                    .into();
                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
                    let element_type = match &metadata.collection_kind {
                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
                        _ => panic!("Embedded source must have Stream collection kind"),
                    };
                    let location_key = match metadata.location_id.root() {
                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
                        _ => panic!("Embedded source must be on a process or cluster"),
                    };
                    D::register_embedded_stream_input(
                        &mut refcell_env.borrow_mut(),
                        location_key,
                        ident,
                        &element_type,
                    );
                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
                    let element_type = match &metadata.collection_kind {
                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
                    };
                    let location_key = match metadata.location_id.root() {
                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
                    };
                    D::register_embedded_singleton_input(
                        &mut refcell_env.borrow_mut(),
                        location_key,
                        ident,
                        &element_type,
                    );
                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
                    match state {
                        ClusterMembersState::Uninit => {
                            let at_location = metadata.location_id.root().clone();
                            let key = (at_location.clone(), location_id.key());
                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
                                // First occurrence: call cluster_membership_stream and mark as Stream.
                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
                                    &(),
                                );
                                *state = ClusterMembersState::Stream(expr.into());
                            } else {
                                // Already instantiated for this (at, target) pair: just tee.
                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
                            }
                        }
                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
                            panic!("cluster members already finalized");
                        }
                    }
                }
            },
            seen_tees,
            false,
        );
    }
1366
1367    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1368        self.transform_bottom_up(
1369            &mut |l| {
1370                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1371                    match instantiate_fn {
1372                        DebugInstantiate::Building => panic!("network not built"),
1373
1374                        DebugInstantiate::Finalized(finalized) => {
1375                            (finalized.connect_fn.take().unwrap())();
1376                        }
1377                    }
1378                }
1379            },
1380            &mut |n| {
1381                if let HydroNode::Network { instantiate_fn, .. }
1382                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1383                {
1384                    match instantiate_fn {
1385                        DebugInstantiate::Building => panic!("network not built"),
1386
1387                        DebugInstantiate::Finalized(finalized) => {
1388                            (finalized.connect_fn.take().unwrap())();
1389                        }
1390                    }
1391                }
1392            },
1393            seen_tees,
1394            false,
1395        );
1396    }
1397
1398    pub fn transform_bottom_up(
1399        &mut self,
1400        transform_root: &mut impl FnMut(&mut HydroRoot),
1401        transform_node: &mut impl FnMut(&mut HydroNode),
1402        seen_tees: &mut SeenSharedNodes,
1403        check_well_formed: bool,
1404    ) {
1405        self.transform_children(
1406            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1407            seen_tees,
1408        );
1409
1410        transform_root(self);
1411    }
1412
1413    pub fn transform_children(
1414        &mut self,
1415        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1416        seen_tees: &mut SeenSharedNodes,
1417    ) {
1418        match self {
1419            HydroRoot::ForEach { f, input, .. } => {
1420                f.transform_children(&mut transform, seen_tees);
1421                transform(input, seen_tees);
1422            }
1423            HydroRoot::SendExternal { input, .. }
1424            | HydroRoot::DestSink { input, .. }
1425            | HydroRoot::CycleSink { input, .. }
1426            | HydroRoot::EmbeddedOutput { input, .. }
1427            | HydroRoot::Null { input, .. } => {
1428                transform(input, seen_tees);
1429            }
1430        }
1431    }
1432
1433    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1434        match self {
1435            HydroRoot::ForEach {
1436                f,
1437                input,
1438                op_metadata,
1439            } => HydroRoot::ForEach {
1440                f: f.deep_clone(seen_tees),
1441                input: Box::new(input.deep_clone(seen_tees)),
1442                op_metadata: op_metadata.clone(),
1443            },
1444            HydroRoot::SendExternal {
1445                to_external_key,
1446                to_port_id,
1447                to_many,
1448                unpaired,
1449                serialize_fn,
1450                instantiate_fn,
1451                input,
1452                op_metadata,
1453            } => HydroRoot::SendExternal {
1454                to_external_key: *to_external_key,
1455                to_port_id: *to_port_id,
1456                to_many: *to_many,
1457                unpaired: *unpaired,
1458                serialize_fn: serialize_fn.clone(),
1459                instantiate_fn: instantiate_fn.clone(),
1460                input: Box::new(input.deep_clone(seen_tees)),
1461                op_metadata: op_metadata.clone(),
1462            },
1463            HydroRoot::DestSink {
1464                sink,
1465                input,
1466                op_metadata,
1467            } => HydroRoot::DestSink {
1468                sink: sink.clone(),
1469                input: Box::new(input.deep_clone(seen_tees)),
1470                op_metadata: op_metadata.clone(),
1471            },
1472            HydroRoot::CycleSink {
1473                cycle_id,
1474                input,
1475                op_metadata,
1476            } => HydroRoot::CycleSink {
1477                cycle_id: *cycle_id,
1478                input: Box::new(input.deep_clone(seen_tees)),
1479                op_metadata: op_metadata.clone(),
1480            },
1481            HydroRoot::EmbeddedOutput {
1482                ident,
1483                input,
1484                op_metadata,
1485            } => HydroRoot::EmbeddedOutput {
1486                ident: ident.clone(),
1487                input: Box::new(input.deep_clone(seen_tees)),
1488                op_metadata: op_metadata.clone(),
1489            },
1490            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1491                input: Box::new(input.deep_clone(seen_tees)),
1492                op_metadata: op_metadata.clone(),
1493            },
1494        }
1495    }
1496
1497    #[cfg(feature = "build")]
1498    pub fn emit(
1499        &mut self,
1500        graph_builders: &mut dyn DfirBuilder,
1501        seen_tees: &mut SeenSharedNodes,
1502        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1503        next_stmt_id: &mut crate::Counter<StmtId>,
1504        fold_hooked_idents: &mut HashSet<String>,
1505    ) {
1506        self.emit_core(
1507            &mut BuildersOrCallback::<
1508                fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1509                fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1510            >::Builders(graph_builders),
1511            seen_tees,
1512            built_tees,
1513            next_stmt_id,
1514            fold_hooked_idents,
1515        );
1516    }
1517
1518    #[cfg(feature = "build")]
1519    pub fn emit_core(
1520        &mut self,
1521        builders_or_callback: &mut BuildersOrCallback<
1522            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
1523            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
1524        >,
1525        seen_tees: &mut SeenSharedNodes,
1526        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1527        next_stmt_id: &mut crate::Counter<StmtId>,
1528        fold_hooked_idents: &mut HashSet<String>,
1529    ) {
1530        match self {
1531            HydroRoot::ForEach { f, input, .. } => {
1532                let input_ident = input.emit_core(
1533                    builders_or_callback,
1534                    seen_tees,
1535                    built_tees,
1536                    next_stmt_id,
1537                    fold_hooked_idents,
1538                );
1539
1540                let stmt_id = next_stmt_id.get_and_increment();
1541
1542                match builders_or_callback {
1543                    BuildersOrCallback::Builders(graph_builders) => {
1544                        let mut ident_stack: Vec<syn::Ident> = Vec::new();
1545
1546                        // Look up each captured ref's ident from built_tees
1547                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
1548                            let HydroNode::Reference { inner, .. } = ref_node else {
1549                                panic!("singleton_refs should only contain HydroNode::Reference");
1550                            };
1551                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1552                            let idents = built_tees.get(&ptr).expect(
1553                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
1554                            );
1555                            ident_stack.push(idents[0].clone());
1556                        }
1557
1558                        let f_tokens = f.emit_tokens(&mut ident_stack);
1559
1560                        graph_builders
1561                            .get_dfir_mut(&input.metadata().location_id)
1562                            .add_dfir(
1563                                parse_quote! {
1564                                    #input_ident -> for_each(#f_tokens);
1565                                },
1566                                None,
1567                                Some(&stmt_id.to_string()),
1568                            );
1569                    }
1570                    BuildersOrCallback::Callback(leaf_callback, _) => {
1571                        leaf_callback(self, next_stmt_id);
1572                    }
1573                }
1574            }
1575
1576            HydroRoot::SendExternal {
1577                serialize_fn,
1578                instantiate_fn,
1579                input,
1580                ..
1581            } => {
1582                let input_ident = input.emit_core(
1583                    builders_or_callback,
1584                    seen_tees,
1585                    built_tees,
1586                    next_stmt_id,
1587                    fold_hooked_idents,
1588                );
1589
1590                let stmt_id = next_stmt_id.get_and_increment();
1591
1592                match builders_or_callback {
1593                    BuildersOrCallback::Builders(graph_builders) => {
1594                        let (sink_expr, _) = match instantiate_fn {
1595                            DebugInstantiate::Building => (
1596                                syn::parse_quote!(DUMMY_SINK),
1597                                syn::parse_quote!(DUMMY_SOURCE),
1598                            ),
1599
1600                            DebugInstantiate::Finalized(finalized) => {
1601                                (finalized.sink.clone(), finalized.source.clone())
1602                            }
1603                        };
1604
1605                        graph_builders.create_external_output(
1606                            &input.metadata().location_id,
1607                            sink_expr,
1608                            &input_ident,
1609                            serialize_fn.as_ref(),
1610                            stmt_id,
1611                        );
1612                    }
1613                    BuildersOrCallback::Callback(leaf_callback, _) => {
1614                        leaf_callback(self, next_stmt_id);
1615                    }
1616                }
1617            }
1618
1619            HydroRoot::DestSink { sink, input, .. } => {
1620                let input_ident = input.emit_core(
1621                    builders_or_callback,
1622                    seen_tees,
1623                    built_tees,
1624                    next_stmt_id,
1625                    fold_hooked_idents,
1626                );
1627
1628                let stmt_id = next_stmt_id.get_and_increment();
1629
1630                match builders_or_callback {
1631                    BuildersOrCallback::Builders(graph_builders) => {
1632                        graph_builders
1633                            .get_dfir_mut(&input.metadata().location_id)
1634                            .add_dfir(
1635                                parse_quote! {
1636                                    #input_ident -> dest_sink(#sink);
1637                                },
1638                                None,
1639                                Some(&stmt_id.to_string()),
1640                            );
1641                    }
1642                    BuildersOrCallback::Callback(leaf_callback, _) => {
1643                        leaf_callback(self, next_stmt_id);
1644                    }
1645                }
1646            }
1647
1648            HydroRoot::CycleSink {
1649                cycle_id, input, ..
1650            } => {
1651                let input_ident = input.emit_core(
1652                    builders_or_callback,
1653                    seen_tees,
1654                    built_tees,
1655                    next_stmt_id,
1656                    fold_hooked_idents,
1657                );
1658
1659                match builders_or_callback {
1660                    BuildersOrCallback::Builders(graph_builders) => {
1661                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1662                            CollectionKind::KeyedSingleton {
1663                                key_type,
1664                                value_type,
1665                                ..
1666                            }
1667                            | CollectionKind::KeyedStream {
1668                                key_type,
1669                                value_type,
1670                                ..
1671                            } => {
1672                                parse_quote!((#key_type, #value_type))
1673                            }
1674                            CollectionKind::Stream { element_type, .. }
1675                            | CollectionKind::Singleton { element_type, .. }
1676                            | CollectionKind::Optional { element_type, .. } => {
1677                                parse_quote!(#element_type)
1678                            }
1679                        };
1680
1681                        let cycle_id_ident = cycle_id.as_ident();
1682                        graph_builders
1683                            .get_dfir_mut(&input.metadata().location_id)
1684                            .add_dfir(
1685                                parse_quote! {
1686                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1687                                },
1688                                None,
1689                                None,
1690                            );
1691                    }
1692                    // No ID, no callback
1693                    BuildersOrCallback::Callback(_, _) => {}
1694                }
1695            }
1696
1697            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1698                let input_ident = input.emit_core(
1699                    builders_or_callback,
1700                    seen_tees,
1701                    built_tees,
1702                    next_stmt_id,
1703                    fold_hooked_idents,
1704                );
1705
1706                let stmt_id = next_stmt_id.get_and_increment();
1707
1708                match builders_or_callback {
1709                    BuildersOrCallback::Builders(graph_builders) => {
1710                        graph_builders
1711                            .get_dfir_mut(&input.metadata().location_id)
1712                            .add_dfir(
1713                                parse_quote! {
1714                                    #input_ident -> for_each(&mut #ident);
1715                                },
1716                                None,
1717                                Some(&stmt_id.to_string()),
1718                            );
1719                    }
1720                    BuildersOrCallback::Callback(leaf_callback, _) => {
1721                        leaf_callback(self, next_stmt_id);
1722                    }
1723                }
1724            }
1725
1726            HydroRoot::Null { input, .. } => {
1727                let input_ident = input.emit_core(
1728                    builders_or_callback,
1729                    seen_tees,
1730                    built_tees,
1731                    next_stmt_id,
1732                    fold_hooked_idents,
1733                );
1734
1735                let stmt_id = next_stmt_id.get_and_increment();
1736
1737                match builders_or_callback {
1738                    BuildersOrCallback::Builders(graph_builders) => {
1739                        graph_builders
1740                            .get_dfir_mut(&input.metadata().location_id)
1741                            .add_dfir(
1742                                parse_quote! {
1743                                    #input_ident -> for_each(|_| {});
1744                                },
1745                                None,
1746                                Some(&stmt_id.to_string()),
1747                            );
1748                    }
1749                    BuildersOrCallback::Callback(leaf_callback, _) => {
1750                        leaf_callback(self, next_stmt_id);
1751                    }
1752                }
1753            }
1754        }
1755    }
1756
1757    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1758        match self {
1759            HydroRoot::ForEach { op_metadata, .. }
1760            | HydroRoot::SendExternal { op_metadata, .. }
1761            | HydroRoot::DestSink { op_metadata, .. }
1762            | HydroRoot::CycleSink { op_metadata, .. }
1763            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1764            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1765        }
1766    }
1767
1768    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1769        match self {
1770            HydroRoot::ForEach { op_metadata, .. }
1771            | HydroRoot::SendExternal { op_metadata, .. }
1772            | HydroRoot::DestSink { op_metadata, .. }
1773            | HydroRoot::CycleSink { op_metadata, .. }
1774            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1775            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1776        }
1777    }
1778
1779    pub fn input(&self) -> &HydroNode {
1780        match self {
1781            HydroRoot::ForEach { input, .. }
1782            | HydroRoot::SendExternal { input, .. }
1783            | HydroRoot::DestSink { input, .. }
1784            | HydroRoot::CycleSink { input, .. }
1785            | HydroRoot::EmbeddedOutput { input, .. }
1786            | HydroRoot::Null { input, .. } => input,
1787        }
1788    }
1789
    /// Returns the metadata of this root's input node.
    ///
    /// Shorthand for `self.input().metadata()`.
    pub fn input_metadata(&self) -> &HydroIrMetadata {
        self.input().metadata()
    }
1793
1794    pub fn print_root(&self) -> String {
1795        match self {
1796            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1797            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1798            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1799            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1800            HydroRoot::EmbeddedOutput { ident, .. } => {
1801                format!("EmbeddedOutput({})", ident)
1802            }
1803            HydroRoot::Null { .. } => "Null".to_owned(),
1804        }
1805    }
1806
1807    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1808        match self {
1809            HydroRoot::ForEach { f, .. } => {
1810                transform(&mut f.expr);
1811            }
1812            HydroRoot::DestSink { sink, .. } => {
1813                transform(sink);
1814            }
1815            HydroRoot::SendExternal { .. }
1816            | HydroRoot::CycleSink { .. }
1817            | HydroRoot::EmbeddedOutput { .. }
1818            | HydroRoot::Null { .. } => {}
1819        }
1820    }
1821}
1822
/// Returns the clock id of the tick that `loc` belongs to, if any.
///
/// `Atomic` wrappers are peeled off until a `Tick` is found (its id is
/// returned) or any other location is reached (`None`).
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            LocationId::Atomic(inner) => current = &**inner,
            _ => return None,
        }
    }
}
1831
/// Rewrites every tick id inside `loc` to its union-find representative.
///
/// `Tick` ids are replaced with `uf_find(uf, id)`, then the location nested
/// inside a `Tick` or `Atomic` is processed the same way. `Process` and
/// `Cluster` locations end the walk.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested: &mut LocationId = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => &mut **inner,
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            &mut **inner
        }
    };
    remap_location(nested, uf);
}
1845
1846#[cfg(feature = "build")]
1847fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1848    let p = *parent.get(&x).unwrap_or(&x);
1849    if p == x {
1850        return x;
1851    }
1852    let root = uf_find(parent, p);
1853    parent.insert(x, root);
1854    root
1855}
1856
/// Merges the union-find classes containing `a` and `b`.
///
/// When the two ids already share a representative nothing changes; otherwise
/// the representative of `a` is linked under the representative of `b`.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1865
/// Unifies tick ids that are connected through the IR and rewrites node
/// locations to use one representative id per group.
///
/// Two bottom-up passes are made over `ir`:
///
/// 1. For every `Batch` / `YieldConcat` node, the tick of its input is merged
///    with the tick of the node itself; for every `Chain` / `ChainFirst` /
///    `MergeOrdered` node, the tick of each of its two inputs is merged with
///    the tick of the node. A pair is only merged when both locations resolve
///    to a tick (see `tick_of`).
/// 2. The `location_id` in every node's metadata is rewritten so that each
///    tick id is replaced by its representative.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Merges the tick classes of two locations; a no-op unless both are in a tick.
    let mut link = |from: &LocationId, to: &LocationId| {
        if let (Some(a), Some(b)) = (tick_of(from), tick_of(to)) {
            uf_union(&mut uf, a, b);
        }
    };

    // Pass 1: collect unifications.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&first.metadata().location_id, &metadata.location_id);
                link(&second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: rewrite all LocationIds to their representative tick.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1929
/// Emits every root in `ir` and returns the resulting per-location builders.
///
/// All roots share one set of bookkeeping state (tee tracking, the statement
/// id counter and the set of fold-hooked identifiers), so it carries over from
/// one root to the next.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut graph_builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut fold_hooked_idents = HashSet::new();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut graph_builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    graph_builders
}
1948
/// Walks `ir` in emission order, invoking callbacks instead of building DFIR.
///
/// The same `emit_core` traversal used for code generation is run with
/// `BuildersOrCallback::Callback`, so `transform_root` and `transform_node`
/// are handed each root / node together with the running statement-id counter.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut next_stmt_id = crate::Counter::<StmtId>::default();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut fold_hooked_idents = HashSet::new();

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1970
1971pub fn transform_bottom_up(
1972    ir: &mut [HydroRoot],
1973    transform_root: &mut impl FnMut(&mut HydroRoot),
1974    transform_node: &mut impl FnMut(&mut HydroNode),
1975    check_well_formed: bool,
1976) {
1977    let mut seen_tees = HashMap::new();
1978    ir.iter_mut().for_each(|leaf| {
1979        leaf.transform_bottom_up(
1980            transform_root,
1981            transform_node,
1982            &mut seen_tees,
1983            check_well_formed,
1984        );
1985    });
1986}
1987
1988pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1989    let mut seen_tees = HashMap::new();
1990    ir.iter()
1991        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1992        .collect()
1993}
1994
/// State of one shared-node deduplication scope:
/// `(next id to hand out, node pointer → id already assigned)`.
/// `None` means no scope is currently active on this thread.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    /// Tracks shared nodes already printed by the `Debug` impl of `SharedNode`,
    /// so that repeated occurrences are printed as a `<shared N>` back-reference.
    /// Activated by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
    /// emits the full subtree only once and uses a `{"$shared_ref": N}`
    /// back-reference on subsequent encounters, preventing infinite loops.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
2004
2005pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
2006    PRINTED_TEES.with(|printed_tees| {
2007        let mut printed_tees_mut = printed_tees.borrow_mut();
2008        *printed_tees_mut = Some((0, HashMap::new()));
2009        drop(printed_tees_mut);
2010
2011        let ret = f();
2012
2013        let mut printed_tees_mut = printed_tees.borrow_mut();
2014        *printed_tees_mut = None;
2015
2016        ret
2017    })
2018}
2019
2020/// Runs `f` with a fresh shared-node deduplication scope for serialization.
2021/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
2022/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
2023/// back-reference.  The tracking state is restored when `f` returns or panics.
2024pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
2025    let _guard = SerializedSharedGuard::enter();
2026    f()
2027}
2028
2029/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
2030/// making `serialize_dedup_shared` re-entrant and panic-safe.
2031struct SerializedSharedGuard {
2032    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
2033}
2034
2035impl SerializedSharedGuard {
2036    fn enter() -> Self {
2037        let previous = SERIALIZED_SHARED.with(|cell| {
2038            let mut guard = cell.borrow_mut();
2039            guard.replace((0, HashMap::new()))
2040        });
2041        Self { previous }
2042    }
2043}
2044
2045impl Drop for SerializedSharedGuard {
2046    fn drop(&mut self) {
2047        SERIALIZED_SHARED.with(|cell| {
2048            *cell.borrow_mut() = self.previous.take();
2049        });
2050    }
2051}
2052
/// A reference-counted, interior-mutable handle to a `HydroNode`.
///
/// Several `SharedNode`s may wrap the same `Rc` and therefore point to the
/// same underlying node.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2054
2055impl serde::Serialize for SharedNode {
2056    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2057    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2058    /// same subtree every time and, if the graph ever contains a cycle, loop
2059    /// forever.
2060    ///
2061    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2062    /// integer id.  The first time we see a pointer we assign it the next id and
2063    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2064    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2065    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2066    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2067        SERIALIZED_SHARED.with(|cell| {
2068            let mut guard = cell.borrow_mut();
2069            // (next_id, pointer → assigned_id)
2070            let state = guard.as_mut().ok_or_else(|| {
2071                serde::ser::Error::custom(
2072                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2073                )
2074            })?;
2075            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2076
2077            if let Some(&id) = state.1.get(&ptr) {
2078                drop(guard);
2079                use serde::ser::SerializeMap;
2080                let mut map = serializer.serialize_map(Some(1))?;
2081                map.serialize_entry("$shared_ref", &id)?;
2082                map.end()
2083            } else {
2084                let id = state.0;
2085                state.0 += 1;
2086                state.1.insert(ptr, id);
2087                drop(guard);
2088
2089                use serde::ser::SerializeMap;
2090                let mut map = serializer.serialize_map(Some(2))?;
2091                map.serialize_entry("$shared", &id)?;
2092                map.serialize_entry("node", &*self.0.borrow())?;
2093                map.end()
2094            }
2095        })
2096    }
2097}
2098
impl SharedNode {
    /// Returns the address of the shared `RefCell` inside the `Rc`.
    ///
    /// Two `SharedNode`s wrapping clones of the same `Rc` return the same
    /// pointer, so it can serve as an identity key for the underlying node.
    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
        Rc::as_ptr(&self.0)
    }
}
2104
2105impl Debug for SharedNode {
2106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2107        PRINTED_TEES.with(|printed_tees| {
2108            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2109            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2110
2111            if let Some(printed_tees_mut) = printed_tees_mut {
2112                if let Some(existing) = printed_tees_mut
2113                    .1
2114                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2115                {
2116                    write!(f, "<shared {}>", existing)
2117                } else {
2118                    let next_id = printed_tees_mut.0;
2119                    printed_tees_mut.0 += 1;
2120                    printed_tees_mut
2121                        .1
2122                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2123                    drop(printed_tees_mut_borrow);
2124                    write!(f, "<shared {}>: ", next_id)?;
2125                    Debug::fmt(&self.0.borrow(), f)
2126                }
2127            } else {
2128                drop(printed_tees_mut_borrow);
2129                write!(f, "<shared>: ")?;
2130                Debug::fmt(&self.0.borrow(), f)
2131            }
2132        })
2133    }
2134}
2135
2136impl Hash for SharedNode {
2137    fn hash<H: Hasher>(&self, state: &mut H) {
2138        self.0.borrow_mut().hash(state);
2139    }
2140}
2141
/// Tracks singleton access groups for a `HydroNode::Reference`.
///
/// A counter starts out `Counting`. Each call to [`AccessCounter::next_group`]
/// hands back a `Frozen` snapshot holding the group assigned to that access:
/// mutable accesses get a group of their own, immutable accesses share the
/// current one.
#[derive(Debug)]
pub enum AccessCounter {
    /// Live counter that groups can still be drawn from.
    Counting(Cell<u32>),
    /// Fixed group number; no further counting is possible.
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a live counter starting at group 0.
    pub fn new() -> Self {
        Self::Counting(Cell::new(0))
    }

    /// Assigns the group for the next access and returns it as a frozen counter.
    ///
    /// * Immutable access (`is_mut == false`): returns the current count and
    ///   leaves the counter unchanged.
    /// * Mutable access (`is_mut == true`): returns `current + 1` and advances
    ///   the counter to `current + 2`, so the group is shared with neither the
    ///   accesses before it nor the ones after it.
    ///
    /// # Panics
    ///
    /// Panics when called on a `Frozen` counter.
    pub fn next_group(&self, is_mut: bool) -> Self {
        let counter = match self {
            Self::Counting(counter) => counter,
            Self::Frozen(_) => panic!("Cannot count on `AccessCounter::Frozen`"),
        };

        let current = counter.get();
        if is_mut {
            counter.set(current + 2);
            Self::Frozen(current + 1)
        } else {
            Self::Frozen(current)
        }
    }

    /// Returns a frozen copy holding the current count, preventing further counting.
    pub fn freeze(&self) -> Self {
        let snapshot = match self {
            Self::Frozen(group) => *group,
            Self::Counting(counter) => counter.get(),
        };
        Self::Frozen(snapshot)
    }

    /// Returns the group stored in a frozen counter.
    ///
    /// # Panics
    ///
    /// Panics when the counter is still `Counting`.
    pub fn frozen_group(&self) -> u32 {
        match self {
            Self::Frozen(group) => *group,
            Self::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}

impl Default for AccessCounter {
    /// Same as [`AccessCounter::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Hash for AccessCounter {
    /// Deliberately feeds nothing into the hasher.
    fn hash<H: Hasher>(&self, _hasher: &mut H) {
        // Access counter does not participate in hashing — it is runtime bookkeeping.
    }
}
2201
2202impl serde::Serialize for AccessCounter {
2203    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2204        let count = match self {
2205            AccessCounter::Counting(count) => count.get(),
2206            AccessCounter::Frozen(count) => *count,
2207        };
2208        count.serialize(serializer)
2209    }
2210}
2211
/// Boundedness marker used as the `bound` of `CollectionKind::Stream`,
/// `CollectionKind::Optional` and `CollectionKind::KeyedStream`.
///
/// `CollectionKind::is_bounded` reports `true` for those kinds only when the
/// marker is `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
2217
/// Ordering marker recorded as `order` on `CollectionKind::Stream` and as
/// `value_order` on `CollectionKind::KeyedStream`.
///
/// `TotalOrder` is one of the two conditions `CollectionKind::is_strict`
/// requires of a stream.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2223
/// Retry marker recorded as `retry` on `CollectionKind::Stream` and as
/// `value_retry` on `CollectionKind::KeyedStream`.
///
/// `ExactlyOnce` is one of the two conditions `CollectionKind::is_strict`
/// requires of a stream.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2229
/// Boundedness marker used as the `bound` of `CollectionKind::KeyedSingleton`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`; all other
/// variants count as not bounded there.
// NOTE(review): the precise meaning of the intermediate variants is not
// visible in this part of the file — confirm against the keyed-singleton API.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicKeys,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2238
/// Boundedness marker used as the `bound` of `CollectionKind::Singleton`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`;
/// `Unbounded` and `Monotonic` both count as not bounded there.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2245
/// Describes the kind of collection attached to an IR node's metadata,
/// together with its recorded guarantees and element types.
///
/// Keyed kinds carry separate `key_type` / `value_type`; unkeyed kinds carry a
/// single `element_type`.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream with boundedness, ordering and retry markers.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton with its own boundedness marker.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional value with a boundedness marker.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with its own boundedness marker.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2275
2276impl CollectionKind {
2277    pub fn is_bounded(&self) -> bool {
2278        matches!(
2279            self,
2280            CollectionKind::Stream {
2281                bound: BoundKind::Bounded,
2282                ..
2283            } | CollectionKind::Singleton {
2284                bound: SingletonBoundKind::Bounded,
2285                ..
2286            } | CollectionKind::Optional {
2287                bound: BoundKind::Bounded,
2288                ..
2289            } | CollectionKind::KeyedStream {
2290                bound: BoundKind::Bounded,
2291                ..
2292            } | CollectionKind::KeyedSingleton {
2293                bound: KeyedSingletonBoundKind::Bounded,
2294                ..
2295            }
2296        )
2297    }
2298
2299    /// Returns whether this collection kind is already "strict" (TotalOrder + ExactlyOnce),
2300    /// meaning no non-determinism needs to be observed for mut closures.
2301    pub fn is_strict(&self) -> bool {
2302        match self {
2303            CollectionKind::Stream { order, retry, .. } => {
2304                *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2305            }
2306            CollectionKind::KeyedStream {
2307                value_order,
2308                value_retry,
2309                ..
2310            } => {
2311                *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2312            }
2313            // Singletons/Optionals/KeyedSingletons do not have observable
2314            // non-determinism other than snapshots / batching
2315            CollectionKind::Singleton { .. }
2316            | CollectionKind::Optional { .. }
2317            | CollectionKind::KeyedSingleton { .. } => true,
2318        }
2319    }
2320
2321    /// Creates a "strict" version of this kind with TotalOrder and ExactlyOnce.
2322    pub fn strict_kind(&self) -> CollectionKind {
2323        match self {
2324            CollectionKind::Stream {
2325                bound,
2326                element_type,
2327                ..
2328            } => CollectionKind::Stream {
2329                bound: bound.clone(),
2330                order: StreamOrder::TotalOrder,
2331                retry: StreamRetry::ExactlyOnce,
2332                element_type: element_type.clone(),
2333            },
2334            CollectionKind::KeyedStream {
2335                bound,
2336                key_type,
2337                value_type,
2338                ..
2339            } => CollectionKind::KeyedStream {
2340                bound: bound.clone(),
2341                value_order: StreamOrder::TotalOrder,
2342                value_retry: StreamRetry::ExactlyOnce,
2343                key_type: key_type.clone(),
2344                value_type: value_type.clone(),
2345            },
2346            other => other.clone(),
2347        }
2348    }
2349}
2350
/// Metadata attached to a node of the Hydro IR.
///
/// The `Hash` and `PartialEq` impls below deliberately ignore every field, so
/// metadata never influences the hash or equality of whatever contains it.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location associated with the node; root emission uses the input's
    /// `location_id` to pick the DFIR builder to add statements to.
    pub location_id: LocationId,
    /// Kind of collection recorded for the node (see `CollectionKind`).
    pub collection_kind: CollectionKind,
    // NOTE(review): the meaning of the optional fields below is not visible in
    // this part of the file — confirm against the code that populates them.
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata (see `HydroIrOpMetadata`).
    pub op: HydroIrOpMetadata,
}

// HydroIrMetadata shouldn't be used to hash or compare
impl Hash for HydroIrMetadata {
    /// Feeds nothing into the hasher.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}

impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}

impl Debug for HydroIrMetadata {
    /// Prints only `location_id` and `collection_kind`; the remaining fields
    /// are omitted from debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HydroIrMetadata")
            .field("location_id", &self.location_id)
            .field("collection_kind", &self.collection_kind)
            .finish()
    }
}
2382
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created. Serialized under the
    /// name `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    // NOTE(review): units and producers of the fields below are not visible in
    // this part of the file; all start out as `None` in `new_with_skip`.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}

impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional
    /// fields set to `None`.
    ///
    /// Equivalent to `new_with_skip(1)`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Creates metadata whose backtrace is captured with `2 + skip_count`
    /// passed to `Backtrace::get_backtrace`.
    // NOTE(review): presumably the argument is the number of innermost frames
    // to drop (this function and its caller, plus `skip_count` more) — confirm
    // against `Backtrace::get_backtrace`. The value depends on the exact call
    // depth, so do not inline or wrap these constructors without adjusting it.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}

impl Debug for HydroIrOpMetadata {
    /// Prints only the struct name; no fields are included in debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HydroIrOpMetadata").finish()
    }
}

impl Hash for HydroIrOpMetadata {
    /// Feeds nothing into the hasher, so operator metadata never affects hashes.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2422
2423/// An intermediate node in a Hydro graph, which consumes data
2424/// from upstream nodes and emits data to downstream nodes.
/// A node in the Hydro IR.
///
/// Every variant except [`HydroNode::Placeholder`] carries a `HydroIrMetadata`. Upstream
/// nodes are held either as an owned `Box<HydroNode>` or as a `SharedNode`; traversals such as
/// `transform_children` and `deep_clone` deduplicate `SharedNode`s by pointer so that a shared
/// upstream is processed only once.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Temporary stand-in written into a shared cell while its real node is moved out during
    /// a traversal. `transform_children` panics if asked to traverse a `Placeholder`.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// Leaf node described by a `HydroSource`; it has no upstream `HydroNode`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node defined by a `value` expression; it has no upstream `HydroNode`.
    // NOTE(review): `first_tick_only` presumably restricts emission to the first tick; confirm
    // against the codegen for this variant.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node identified by a `CycleId`; it has no upstream `HydroNode`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// A consumer of a shared upstream node.
    ///
    /// Traversals process `inner` the first time its cell is encountered and re-point any
    /// later node that holds the same cell at the already-processed result.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// A reference materialization point. Wraps a SharedNode so that:
    /// - The pipe output delivers data to one consumer
    /// - `#var` references can borrow the value from the slot
    ///
    /// In DFIR codegen, emits `ident = inner_ident -> singleton()` or `-> optional()` or
    /// `-> handoff()` depending on `kind`.
    ///
    /// Uses the same `built_tees` dedup pattern as `Tee`.
    Reference {
        inner: SharedNode,
        kind: crate::handoff_ref::HandoffRefKind,
        access_counter: AccessCounter,
        metadata: HydroIrMetadata,
    },

    /// A consumer of a shared upstream node that also carries a closure `f`.
    ///
    /// The shared `inner` is deduplicated by traversals in the same way as `Tee`.
    // NOTE(review): `is_true` presumably selects which outcome of the predicate `f` this node
    // receives; confirm against the codegen for this variant.
    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `inner`.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `inner`.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `inner`.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `inner`.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Binary node over `first` and `second`; traversals visit `first` before `second`.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Binary node over `first` and `second`; traversals visit `first` before `second`.
    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Binary node over `first` and `second`; traversals visit `first` before `second`.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Binary node over `left` and `right`; traversals visit `left` before `right`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Binary node over `left` and `right`; traversals visit `left` before `right`.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Binary node over `left` and `right`; traversals visit `left` before `right`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Asymmetric join where the right (build) side is bounded.
    /// The build side is accumulated (stratum-delayed) into a hash table,
    /// then the left (probe) side streams through preserving its ordering.
    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Binary node over `pos` and `neg`; traversals visit `pos` before `neg`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Binary node over `pos` and `neg`; traversals visit `pos` before `neg`.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input`.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`.
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`.
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input`, parameterized by the closure `f`.
    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by the closure `f`.
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by the closure `f`.
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by the closure `f`.
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by the closure `f`.
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input`.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`.
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by the closure `f`.
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input`.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input`.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by two closures: `init` and `acc`.
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input`, parameterized by two closures: `init` and `acc`.
    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by two closures: `init` and `acc`.
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by two closures: `init` and `acc`.
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input`, parameterized by the closure `f`.
    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Unary node over `input`, parameterized by the closure `f`.
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Node with two upstream nodes, `input` and `watermark`, parameterized by the closure `f`.
    /// Traversals visit `input` before `watermark`.
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input` with optional serialize / deserialize expressions.
    ///
    /// This is the only variant that the well-formedness check in `transform_bottom_up`
    /// exempts from requiring its inputs to share its own root location.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Node with one upstream node per entry of `senders`.
    ///
    /// Traversals bind each `senders` entry as `(version, sender, serialize)`.
    VersionedNetworkFork {
        channel_id: u32,
        channel_name: String,
        senders: Vec<(u32, Box<HydroNode>, Option<DebugExpr>)>,
        metadata: HydroIrMetadata,
    },

    /// A consumer of a shared `fork` node, tagged with a `version`.
    ///
    /// The shared `fork` is deduplicated by `transform_children` in the same way as `Tee`.
    VersionedNetwork {
        fork: SharedNode,
        version: u32,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node fed from an external location and port; it has no upstream `HydroNode`.
    ///
    /// `port_hint` is excluded from serialization via `#[serde(skip)]`.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `input`, configured by `tag`, `duration` and `prefix`.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `inner` with a `trusted` flag.
    AssertIsConsistent {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// Unary node over `inner`.
    UnboundSingleton {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2732
/// Memo table used while traversing shared nodes: maps the address of an original shared
/// `RefCell<HydroNode>` to the cell that replaces it, so that each shared node is processed
/// only once (see `HydroNode::transform_children` and `HydroNode::deep_clone`).
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared `RefCell<HydroNode>` to a `LocationId`.
// NOTE(review): presumably the location recorded for an already-visited shared node; confirm
// against the callers that populate this map.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2735
/// Conditionally routes `in_ident` through an `observe_for_mut` step.
///
/// The step is needed only when the closure `f` holds a mutable singleton reference and
/// `in_kind` is not strict. In that case a fresh statement id is taken from `next_stmt_id`
/// (even when no graph builders are present), a new `stream_<id>` ident is created, and, if
/// `builders_or_callback` holds builders, the `observe_for_mut` node is emitted into them. The
/// new ident is returned.
///
/// When the step is not needed, `in_ident` is returned untouched and no statement id is
/// consumed.
#[cfg(feature = "build")]
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    let needs_observe = f.has_mut_ref() && !in_kind.is_strict();
    if !needs_observe {
        return in_ident;
    }

    // The id is reserved unconditionally so that numbering stays the same whether or not
    // builders are present.
    let stmt_id = next_stmt_id.get_and_increment();
    let out_ident = syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    if let BuildersOrCallback::Builders(graph_builders) = builders_or_callback {
        graph_builders.observe_for_mut(in_location, in_ident, in_kind, &out_ident, op_meta);
    }

    out_ident
}
2764
2765impl HydroNode {
2766    pub fn transform_bottom_up(
2767        &mut self,
2768        transform: &mut impl FnMut(&mut HydroNode),
2769        seen_tees: &mut SeenSharedNodes,
2770        check_well_formed: bool,
2771    ) {
2772        self.transform_children(
2773            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2774            seen_tees,
2775        );
2776
2777        transform(self);
2778
2779        let self_location = self.metadata().location_id.root();
2780
2781        if check_well_formed {
2782            match &*self {
2783                HydroNode::Network { .. } => {}
2784                _ => {
2785                    self.input_metadata().iter().for_each(|i| {
2786                        if i.location_id.root() != self_location {
2787                            panic!(
2788                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2789                                i,
2790                                i.location_id.root(),
2791                                self,
2792                                self_location
2793                            )
2794                        }
2795                    });
2796                }
2797            }
2798        }
2799    }
2800
2801    #[inline(always)]
2802    pub fn transform_children(
2803        &mut self,
2804        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2805        seen_tees: &mut SeenSharedNodes,
2806    ) {
2807        match self {
2808            HydroNode::Placeholder => {
2809                panic!();
2810            }
2811
2812            HydroNode::Source { .. }
2813            | HydroNode::SingletonSource { .. }
2814            | HydroNode::CycleSource { .. }
2815            | HydroNode::ExternalInput { .. } => {}
2816
2817            HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2818                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2819                    *inner = SharedNode(transformed.clone());
2820                } else {
2821                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2822                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2823                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2824                    transform(&mut orig, seen_tees);
2825                    *transformed_cell.borrow_mut() = orig;
2826                    *inner = SharedNode(transformed_cell);
2827                }
2828            }
2829
2830            HydroNode::Partition { inner, f, .. } => {
2831                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2832                    *inner = SharedNode(transformed.clone());
2833                } else {
2834                    f.transform_children(&mut transform, seen_tees);
2835                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2836                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2837                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2838                    transform(&mut orig, seen_tees);
2839                    *transformed_cell.borrow_mut() = orig;
2840                    *inner = SharedNode(transformed_cell);
2841                }
2842            }
2843
2844            HydroNode::Cast { inner, .. }
2845            | HydroNode::ObserveNonDet { inner, .. }
2846            | HydroNode::BeginAtomic { inner, .. }
2847            | HydroNode::EndAtomic { inner, .. }
2848            | HydroNode::Batch { inner, .. }
2849            | HydroNode::YieldConcat { inner, .. }
2850            | HydroNode::UnboundSingleton { inner, .. }
2851            | HydroNode::AssertIsConsistent { inner, .. } => {
2852                transform(inner.as_mut(), seen_tees);
2853            }
2854
2855            HydroNode::Chain { first, second, .. } => {
2856                transform(first.as_mut(), seen_tees);
2857                transform(second.as_mut(), seen_tees);
2858            }
2859
2860            HydroNode::MergeOrdered { first, second, .. } => {
2861                transform(first.as_mut(), seen_tees);
2862                transform(second.as_mut(), seen_tees);
2863            }
2864
2865            HydroNode::ChainFirst { first, second, .. } => {
2866                transform(first.as_mut(), seen_tees);
2867                transform(second.as_mut(), seen_tees);
2868            }
2869
2870            HydroNode::CrossSingleton { left, right, .. }
2871            | HydroNode::CrossProduct { left, right, .. }
2872            | HydroNode::Join { left, right, .. }
2873            | HydroNode::JoinHalf { left, right, .. } => {
2874                transform(left.as_mut(), seen_tees);
2875                transform(right.as_mut(), seen_tees);
2876            }
2877
2878            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2879                transform(pos.as_mut(), seen_tees);
2880                transform(neg.as_mut(), seen_tees);
2881            }
2882
2883            HydroNode::Map { f, input, .. } => {
2884                f.transform_children(&mut transform, seen_tees);
2885                transform(input.as_mut(), seen_tees);
2886            }
2887            HydroNode::FlatMap { f, input, .. }
2888            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2889            | HydroNode::Filter { f, input, .. }
2890            | HydroNode::FilterMap { f, input, .. }
2891            | HydroNode::Inspect { f, input, .. }
2892            | HydroNode::Reduce { f, input, .. }
2893            | HydroNode::ReduceKeyed { f, input, .. } => {
2894                f.transform_children(&mut transform, seen_tees);
2895                transform(input.as_mut(), seen_tees);
2896            }
2897            HydroNode::ReduceKeyedWatermark {
2898                f,
2899                input,
2900                watermark,
2901                ..
2902            } => {
2903                f.transform_children(&mut transform, seen_tees);
2904                transform(input.as_mut(), seen_tees);
2905                transform(watermark.as_mut(), seen_tees);
2906            }
2907            HydroNode::Fold {
2908                init, acc, input, ..
2909            }
2910            | HydroNode::Scan {
2911                init, acc, input, ..
2912            }
2913            | HydroNode::ScanAsyncBlocking {
2914                init, acc, input, ..
2915            }
2916            | HydroNode::FoldKeyed {
2917                init, acc, input, ..
2918            } => {
2919                init.transform_children(&mut transform, seen_tees);
2920                acc.transform_children(&mut transform, seen_tees);
2921                transform(input.as_mut(), seen_tees);
2922            }
2923            HydroNode::ResolveFutures { input, .. }
2924            | HydroNode::ResolveFuturesBlocking { input, .. }
2925            | HydroNode::ResolveFuturesOrdered { input, .. }
2926            | HydroNode::Sort { input, .. }
2927            | HydroNode::DeferTick { input, .. }
2928            | HydroNode::Enumerate { input, .. }
2929            | HydroNode::Unique { input, .. }
2930            | HydroNode::Network { input, .. }
2931            | HydroNode::Counter { input, .. } => {
2932                transform(input.as_mut(), seen_tees);
2933            }
2934
2935            HydroNode::VersionedNetworkFork { senders, .. } => {
2936                for (_version, sender, _serialize) in senders.iter_mut() {
2937                    transform(sender.as_mut(), seen_tees);
2938                }
2939            }
2940
2941            HydroNode::VersionedNetwork { fork, .. } => {
2942                if let Some(transformed) = seen_tees.get(&fork.as_ptr()) {
2943                    *fork = SharedNode(transformed.clone());
2944                } else {
2945                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2946                    seen_tees.insert(fork.as_ptr(), transformed_cell.clone());
2947                    let mut orig = fork.0.replace(HydroNode::Placeholder);
2948                    transform(&mut orig, seen_tees);
2949                    *transformed_cell.borrow_mut() = orig;
2950                    *fork = SharedNode(transformed_cell);
2951                }
2952            }
2953        }
2954    }
2955
2956    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2957        match self {
2958            HydroNode::Placeholder => HydroNode::Placeholder,
2959            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2960                inner: Box::new(inner.deep_clone(seen_tees)),
2961                metadata: metadata.clone(),
2962            },
2963            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2964                inner: Box::new(inner.deep_clone(seen_tees)),
2965                metadata: metadata.clone(),
2966            },
2967            HydroNode::ObserveNonDet {
2968                inner,
2969                trusted,
2970                metadata,
2971            } => HydroNode::ObserveNonDet {
2972                inner: Box::new(inner.deep_clone(seen_tees)),
2973                trusted: *trusted,
2974                metadata: metadata.clone(),
2975            },
2976            HydroNode::AssertIsConsistent {
2977                inner,
2978                trusted,
2979                metadata,
2980            } => HydroNode::AssertIsConsistent {
2981                inner: Box::new(inner.deep_clone(seen_tees)),
2982                trusted: *trusted,
2983                metadata: metadata.clone(),
2984            },
2985            HydroNode::Source { source, metadata } => HydroNode::Source {
2986                source: source.clone(),
2987                metadata: metadata.clone(),
2988            },
2989            HydroNode::SingletonSource {
2990                value,
2991                first_tick_only,
2992                metadata,
2993            } => HydroNode::SingletonSource {
2994                value: value.clone(),
2995                first_tick_only: *first_tick_only,
2996                metadata: metadata.clone(),
2997            },
2998            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2999                cycle_id: *cycle_id,
3000                metadata: metadata.clone(),
3001            },
3002            HydroNode::Tee { inner, metadata }
3003            | HydroNode::Reference {
3004                inner, metadata, ..
3005            } => {
3006                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
3007                    SharedNode(transformed.clone())
3008                } else {
3009                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3010                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
3011                    let cloned = inner.0.borrow().deep_clone(seen_tees);
3012                    *new_rc.borrow_mut() = cloned;
3013                    SharedNode(new_rc)
3014                };
3015                if let HydroNode::Reference {
3016                    kind,
3017                    access_counter,
3018                    ..
3019                } = self
3020                {
3021                    HydroNode::Reference {
3022                        inner: cloned_inner,
3023                        kind: *kind,
3024                        access_counter: access_counter.freeze(),
3025                        metadata: metadata.clone(),
3026                    }
3027                } else {
3028                    HydroNode::Tee {
3029                        inner: cloned_inner,
3030                        metadata: metadata.clone(),
3031                    }
3032                }
3033            }
3034            HydroNode::Partition {
3035                inner,
3036                f,
3037                is_true,
3038                metadata,
3039            } => {
3040                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
3041                    HydroNode::Partition {
3042                        inner: SharedNode(transformed.clone()),
3043                        f: f.deep_clone(seen_tees),
3044                        is_true: *is_true,
3045                        metadata: metadata.clone(),
3046                    }
3047                } else {
3048                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3049                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
3050                    let cloned = inner.0.borrow().deep_clone(seen_tees);
3051                    *new_rc.borrow_mut() = cloned;
3052                    HydroNode::Partition {
3053                        inner: SharedNode(new_rc),
3054                        f: f.deep_clone(seen_tees),
3055                        is_true: *is_true,
3056                        metadata: metadata.clone(),
3057                    }
3058                }
3059            }
3060            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
3061                inner: Box::new(inner.deep_clone(seen_tees)),
3062                metadata: metadata.clone(),
3063            },
3064            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
3065                inner: Box::new(inner.deep_clone(seen_tees)),
3066                metadata: metadata.clone(),
3067            },
3068            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
3069                inner: Box::new(inner.deep_clone(seen_tees)),
3070                metadata: metadata.clone(),
3071            },
3072            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
3073                inner: Box::new(inner.deep_clone(seen_tees)),
3074                metadata: metadata.clone(),
3075            },
3076            HydroNode::Chain {
3077                first,
3078                second,
3079                metadata,
3080            } => HydroNode::Chain {
3081                first: Box::new(first.deep_clone(seen_tees)),
3082                second: Box::new(second.deep_clone(seen_tees)),
3083                metadata: metadata.clone(),
3084            },
3085            HydroNode::MergeOrdered {
3086                first,
3087                second,
3088                metadata,
3089            } => HydroNode::MergeOrdered {
3090                first: Box::new(first.deep_clone(seen_tees)),
3091                second: Box::new(second.deep_clone(seen_tees)),
3092                metadata: metadata.clone(),
3093            },
3094            HydroNode::ChainFirst {
3095                first,
3096                second,
3097                metadata,
3098            } => HydroNode::ChainFirst {
3099                first: Box::new(first.deep_clone(seen_tees)),
3100                second: Box::new(second.deep_clone(seen_tees)),
3101                metadata: metadata.clone(),
3102            },
3103            HydroNode::CrossProduct {
3104                left,
3105                right,
3106                metadata,
3107            } => HydroNode::CrossProduct {
3108                left: Box::new(left.deep_clone(seen_tees)),
3109                right: Box::new(right.deep_clone(seen_tees)),
3110                metadata: metadata.clone(),
3111            },
3112            HydroNode::CrossSingleton {
3113                left,
3114                right,
3115                metadata,
3116            } => HydroNode::CrossSingleton {
3117                left: Box::new(left.deep_clone(seen_tees)),
3118                right: Box::new(right.deep_clone(seen_tees)),
3119                metadata: metadata.clone(),
3120            },
3121            HydroNode::Join {
3122                left,
3123                right,
3124                metadata,
3125            } => HydroNode::Join {
3126                left: Box::new(left.deep_clone(seen_tees)),
3127                right: Box::new(right.deep_clone(seen_tees)),
3128                metadata: metadata.clone(),
3129            },
3130            HydroNode::JoinHalf {
3131                left,
3132                right,
3133                metadata,
3134            } => HydroNode::JoinHalf {
3135                left: Box::new(left.deep_clone(seen_tees)),
3136                right: Box::new(right.deep_clone(seen_tees)),
3137                metadata: metadata.clone(),
3138            },
3139            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3140                pos: Box::new(pos.deep_clone(seen_tees)),
3141                neg: Box::new(neg.deep_clone(seen_tees)),
3142                metadata: metadata.clone(),
3143            },
3144            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3145                pos: Box::new(pos.deep_clone(seen_tees)),
3146                neg: Box::new(neg.deep_clone(seen_tees)),
3147                metadata: metadata.clone(),
3148            },
3149            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3150                input: Box::new(input.deep_clone(seen_tees)),
3151                metadata: metadata.clone(),
3152            },
3153            HydroNode::ResolveFuturesBlocking { input, metadata } => {
3154                HydroNode::ResolveFuturesBlocking {
3155                    input: Box::new(input.deep_clone(seen_tees)),
3156                    metadata: metadata.clone(),
3157                }
3158            }
3159            HydroNode::ResolveFuturesOrdered { input, metadata } => {
3160                HydroNode::ResolveFuturesOrdered {
3161                    input: Box::new(input.deep_clone(seen_tees)),
3162                    metadata: metadata.clone(),
3163                }
3164            }
3165            HydroNode::Map { f, input, metadata } => HydroNode::Map {
3166                f: f.deep_clone(seen_tees),
3167                input: Box::new(input.deep_clone(seen_tees)),
3168                metadata: metadata.clone(),
3169            },
3170            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3171                f: f.deep_clone(seen_tees),
3172                input: Box::new(input.deep_clone(seen_tees)),
3173                metadata: metadata.clone(),
3174            },
3175            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3176                HydroNode::FlatMapStreamBlocking {
3177                    f: f.deep_clone(seen_tees),
3178                    input: Box::new(input.deep_clone(seen_tees)),
3179                    metadata: metadata.clone(),
3180                }
3181            }
3182            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3183                f: f.deep_clone(seen_tees),
3184                input: Box::new(input.deep_clone(seen_tees)),
3185                metadata: metadata.clone(),
3186            },
3187            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3188                f: f.deep_clone(seen_tees),
3189                input: Box::new(input.deep_clone(seen_tees)),
3190                metadata: metadata.clone(),
3191            },
3192            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3193                input: Box::new(input.deep_clone(seen_tees)),
3194                metadata: metadata.clone(),
3195            },
3196            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3197                input: Box::new(input.deep_clone(seen_tees)),
3198                metadata: metadata.clone(),
3199            },
3200            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3201                f: f.deep_clone(seen_tees),
3202                input: Box::new(input.deep_clone(seen_tees)),
3203                metadata: metadata.clone(),
3204            },
3205            HydroNode::Unique { input, metadata } => HydroNode::Unique {
3206                input: Box::new(input.deep_clone(seen_tees)),
3207                metadata: metadata.clone(),
3208            },
3209            HydroNode::Sort { input, metadata } => HydroNode::Sort {
3210                input: Box::new(input.deep_clone(seen_tees)),
3211                metadata: metadata.clone(),
3212            },
3213            HydroNode::Fold {
3214                init,
3215                acc,
3216                input,
3217                metadata,
3218            } => HydroNode::Fold {
3219                init: init.deep_clone(seen_tees),
3220                acc: acc.deep_clone(seen_tees),
3221                input: Box::new(input.deep_clone(seen_tees)),
3222                metadata: metadata.clone(),
3223            },
3224            HydroNode::Scan {
3225                init,
3226                acc,
3227                input,
3228                metadata,
3229            } => HydroNode::Scan {
3230                init: init.deep_clone(seen_tees),
3231                acc: acc.deep_clone(seen_tees),
3232                input: Box::new(input.deep_clone(seen_tees)),
3233                metadata: metadata.clone(),
3234            },
3235            HydroNode::ScanAsyncBlocking {
3236                init,
3237                acc,
3238                input,
3239                metadata,
3240            } => HydroNode::ScanAsyncBlocking {
3241                init: init.deep_clone(seen_tees),
3242                acc: acc.deep_clone(seen_tees),
3243                input: Box::new(input.deep_clone(seen_tees)),
3244                metadata: metadata.clone(),
3245            },
3246            HydroNode::FoldKeyed {
3247                init,
3248                acc,
3249                input,
3250                metadata,
3251            } => HydroNode::FoldKeyed {
3252                init: init.deep_clone(seen_tees),
3253                acc: acc.deep_clone(seen_tees),
3254                input: Box::new(input.deep_clone(seen_tees)),
3255                metadata: metadata.clone(),
3256            },
3257            HydroNode::ReduceKeyedWatermark {
3258                f,
3259                input,
3260                watermark,
3261                metadata,
3262            } => HydroNode::ReduceKeyedWatermark {
3263                f: f.deep_clone(seen_tees),
3264                input: Box::new(input.deep_clone(seen_tees)),
3265                watermark: Box::new(watermark.deep_clone(seen_tees)),
3266                metadata: metadata.clone(),
3267            },
3268            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3269                f: f.deep_clone(seen_tees),
3270                input: Box::new(input.deep_clone(seen_tees)),
3271                metadata: metadata.clone(),
3272            },
3273            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3274                f: f.deep_clone(seen_tees),
3275                input: Box::new(input.deep_clone(seen_tees)),
3276                metadata: metadata.clone(),
3277            },
3278            HydroNode::Network {
3279                name,
3280                networking_info,
3281                serialize_fn,
3282                instantiate_fn,
3283                deserialize_fn,
3284                input,
3285                metadata,
3286            } => HydroNode::Network {
3287                name: name.clone(),
3288                networking_info: networking_info.clone(),
3289                serialize_fn: serialize_fn.clone(),
3290                instantiate_fn: instantiate_fn.clone(),
3291                deserialize_fn: deserialize_fn.clone(),
3292                input: Box::new(input.deep_clone(seen_tees)),
3293                metadata: metadata.clone(),
3294            },
3295            HydroNode::ExternalInput {
3296                from_external_key,
3297                from_port_id,
3298                from_many,
3299                codec_type,
3300                port_hint,
3301                instantiate_fn,
3302                deserialize_fn,
3303                metadata,
3304            } => HydroNode::ExternalInput {
3305                from_external_key: *from_external_key,
3306                from_port_id: *from_port_id,
3307                from_many: *from_many,
3308                codec_type: codec_type.clone(),
3309                port_hint: *port_hint,
3310                instantiate_fn: instantiate_fn.clone(),
3311                deserialize_fn: deserialize_fn.clone(),
3312                metadata: metadata.clone(),
3313            },
3314            HydroNode::Counter {
3315                tag,
3316                duration,
3317                prefix,
3318                input,
3319                metadata,
3320            } => HydroNode::Counter {
3321                tag: tag.clone(),
3322                duration: duration.clone(),
3323                prefix: prefix.clone(),
3324                input: Box::new(input.deep_clone(seen_tees)),
3325                metadata: metadata.clone(),
3326            },
3327            HydroNode::VersionedNetworkFork {
3328                channel_id,
3329                channel_name,
3330                senders,
3331                metadata,
3332            } => HydroNode::VersionedNetworkFork {
3333                channel_id: *channel_id,
3334                channel_name: channel_name.clone(),
3335                senders: senders
3336                    .iter()
3337                    .map(|(version, sender, serialize)| {
3338                        (
3339                            *version,
3340                            Box::new(sender.deep_clone(seen_tees)),
3341                            serialize.clone(),
3342                        )
3343                    })
3344                    .collect(),
3345                metadata: metadata.clone(),
3346            },
3347            HydroNode::VersionedNetwork {
3348                fork,
3349                version,
3350                deserialize_fn,
3351                metadata,
3352            } => {
3353                let cloned_fork = if let Some(transformed) = seen_tees.get(&fork.as_ptr()) {
3354                    SharedNode(transformed.clone())
3355                } else {
3356                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
3357                    seen_tees.insert(fork.as_ptr(), new_rc.clone());
3358                    let cloned = fork.0.borrow().deep_clone(seen_tees);
3359                    *new_rc.borrow_mut() = cloned;
3360                    SharedNode(new_rc)
3361                };
3362                HydroNode::VersionedNetwork {
3363                    fork: cloned_fork,
3364                    version: *version,
3365                    deserialize_fn: deserialize_fn.clone(),
3366                    metadata: metadata.clone(),
3367                }
3368            }
3369        }
3370    }
3371
3372    #[cfg(feature = "build")]
3373    pub fn emit_core(
3374        &mut self,
3375        builders_or_callback: &mut BuildersOrCallback<
3376            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3377            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3378        >,
3379        seen_tees: &mut SeenSharedNodes,
3380        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3381        next_stmt_id: &mut crate::Counter<StmtId>,
3382        fold_hooked_idents: &mut HashSet<String>,
3383    ) -> syn::Ident {
3384        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3385
3386        self.transform_bottom_up(
3387            &mut |node: &mut HydroNode| {
3388                let out_location = node.metadata().location_id.clone();
3389                match node {
3390                    HydroNode::Placeholder => {
3391                        panic!()
3392                    }
3393
3394                    HydroNode::Cast { .. } => {
3395                        // Cast passes through the input ident unchanged
3396                        // The input ident is already on the stack from processing the child
3397                        let _ = next_stmt_id.get_and_increment();
3398                        match builders_or_callback {
3399                            BuildersOrCallback::Builders(_) => {}
3400                            BuildersOrCallback::Callback(_, node_callback) => {
3401                                node_callback(node, next_stmt_id);
3402                            }
3403                        }
3404                        // input_ident stays on stack as output
3405                    }
3406
3407                    HydroNode::UnboundSingleton { .. } => {
3408                        let inner_ident = ident_stack.pop().unwrap();
3409
3410                        let stmt_id = next_stmt_id.get_and_increment();
3411                        let out_ident =
3412                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3413
3414                        match builders_or_callback {
3415                            BuildersOrCallback::Builders(graph_builders) => {
3416                                if graph_builders.singleton_intermediates() {
3417                                    let builder = graph_builders.get_dfir_mut(&out_location);
3418                                    builder.add_dfir(
3419                                        parse_quote! {
3420                                            #out_ident = #inner_ident;
3421                                        },
3422                                        None,
3423                                        None,
3424                                    );
3425                                } else {
3426                                    let builder = graph_builders.get_dfir_mut(&out_location);
3427                                    builder.add_dfir(
3428                                        parse_quote! {
3429                                            #out_ident = #inner_ident -> persist::<'static>();
3430                                        },
3431                                        None,
3432                                        None,
3433                                    );
3434                                }
3435                            }
3436                            BuildersOrCallback::Callback(_, node_callback) => {
3437                                node_callback(node, next_stmt_id);
3438                            }
3439                        }
3440
3441                        ident_stack.push(out_ident);
3442                    }
3443
3444                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3445                        let inner_ident = ident_stack.pop().unwrap();
3446
3447                        let stmt_id = next_stmt_id.get_and_increment();
3448                        let out_ident =
3449                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3450
3451                        match builders_or_callback {
3452                            BuildersOrCallback::Builders(graph_builders) => {
3453                                graph_builders.assert_is_consistent(
3454                                    *trusted,
3455                                    &inner.metadata().location_id,
3456                                    inner_ident,
3457                                    &out_ident,
3458                                );
3459                            }
3460                            BuildersOrCallback::Callback(_, node_callback) => {
3461                                node_callback(node, next_stmt_id);
3462                            }
3463                        }
3464
3465                        ident_stack.push(out_ident);
3466                    }
3467
3468                    HydroNode::ObserveNonDet {
3469                        inner,
3470                        trusted,
3471                        metadata,
3472                        ..
3473                    } => {
3474                        let inner_ident = ident_stack.pop().unwrap();
3475
3476                        let stmt_id = next_stmt_id.get_and_increment();
3477                        let observe_ident =
3478                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3479
3480                        match builders_or_callback {
3481                            BuildersOrCallback::Builders(graph_builders) => {
3482                                graph_builders.observe_nondet(
3483                                    *trusted,
3484                                    &inner.metadata().location_id,
3485                                    inner_ident,
3486                                    &inner.metadata().collection_kind,
3487                                    &observe_ident,
3488                                    &metadata.collection_kind,
3489                                    &metadata.op,
3490                                );
3491                            }
3492                            BuildersOrCallback::Callback(_, node_callback) => {
3493                                node_callback(node, next_stmt_id);
3494                            }
3495                        }
3496
3497                        ident_stack.push(observe_ident);
3498                    }
3499
3500                    HydroNode::Batch {
3501                        inner, metadata, ..
3502                    } => {
3503                        let inner_ident = ident_stack.pop().unwrap();
3504
3505                        let stmt_id = next_stmt_id.get_and_increment();
3506                        let batch_ident =
3507                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3508
3509                        match builders_or_callback {
3510                            BuildersOrCallback::Builders(graph_builders) => {
3511                                graph_builders.batch(
3512                                    inner_ident,
3513                                    &inner.metadata().location_id,
3514                                    &inner.metadata().collection_kind,
3515                                    &batch_ident,
3516                                    &out_location,
3517                                    &metadata.op,
3518                                    fold_hooked_idents,
3519                                );
3520                            }
3521                            BuildersOrCallback::Callback(_, node_callback) => {
3522                                node_callback(node, next_stmt_id);
3523                            }
3524                        }
3525
3526                        ident_stack.push(batch_ident);
3527                    }
3528
3529                    HydroNode::YieldConcat { inner, .. } => {
3530                        let inner_ident = ident_stack.pop().unwrap();
3531
3532                        let stmt_id = next_stmt_id.get_and_increment();
3533                        let yield_ident =
3534                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3535
3536                        match builders_or_callback {
3537                            BuildersOrCallback::Builders(graph_builders) => {
3538                                graph_builders.yield_from_tick(
3539                                    inner_ident,
3540                                    &inner.metadata().location_id,
3541                                    &inner.metadata().collection_kind,
3542                                    &yield_ident,
3543                                    &out_location,
3544                                );
3545                            }
3546                            BuildersOrCallback::Callback(_, node_callback) => {
3547                                node_callback(node, next_stmt_id);
3548                            }
3549                        }
3550
3551                        ident_stack.push(yield_ident);
3552                    }
3553
3554                    HydroNode::BeginAtomic { inner, metadata } => {
3555                        let inner_ident = ident_stack.pop().unwrap();
3556
3557                        let stmt_id = next_stmt_id.get_and_increment();
3558                        let begin_ident =
3559                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3560
3561                        match builders_or_callback {
3562                            BuildersOrCallback::Builders(graph_builders) => {
3563                                graph_builders.begin_atomic(
3564                                    inner_ident,
3565                                    &inner.metadata().location_id,
3566                                    &inner.metadata().collection_kind,
3567                                    &begin_ident,
3568                                    &out_location,
3569                                    &metadata.op,
3570                                );
3571                            }
3572                            BuildersOrCallback::Callback(_, node_callback) => {
3573                                node_callback(node, next_stmt_id);
3574                            }
3575                        }
3576
3577                        ident_stack.push(begin_ident);
3578                    }
3579
3580                    HydroNode::EndAtomic { inner, .. } => {
3581                        let inner_ident = ident_stack.pop().unwrap();
3582
3583                        let stmt_id = next_stmt_id.get_and_increment();
3584                        let end_ident =
3585                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3586
3587                        match builders_or_callback {
3588                            BuildersOrCallback::Builders(graph_builders) => {
3589                                graph_builders.end_atomic(
3590                                    inner_ident,
3591                                    &inner.metadata().location_id,
3592                                    &inner.metadata().collection_kind,
3593                                    &end_ident,
3594                                );
3595                            }
3596                            BuildersOrCallback::Callback(_, node_callback) => {
3597                                node_callback(node, next_stmt_id);
3598                            }
3599                        }
3600
3601                        ident_stack.push(end_ident);
3602                    }
3603
3604                    HydroNode::Source {
3605                        source, metadata, ..
3606                    } => {
3607                        if let HydroSource::ExternalNetwork() = source {
3608                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3609                        } else {
3610                            let stmt_id = next_stmt_id.get_and_increment();
3611                            let source_ident =
3612                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3613
3614                            let source_stmt = match source {
3615                                HydroSource::Stream(expr) => {
3616                                    debug_assert!(metadata.location_id.is_top_level());
3617                                    parse_quote! {
3618                                        #source_ident = source_stream(#expr);
3619                                    }
3620                                }
3621
3622                                HydroSource::ExternalNetwork() => {
3623                                    unreachable!()
3624                                }
3625
3626                                HydroSource::Iter(expr) => {
3627                                    if metadata.location_id.is_top_level() {
3628                                        parse_quote! {
3629                                            #source_ident = source_iter(#expr);
3630                                        }
3631                                    } else {
3632                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
3633                                        parse_quote! {
3634                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3635                                        }
3636                                    }
3637                                }
3638
3639                                HydroSource::Spin() => {
3640                                    debug_assert!(metadata.location_id.is_top_level());
3641                                    parse_quote! {
3642                                        #source_ident = spin();
3643                                    }
3644                                }
3645
3646                                HydroSource::ClusterMembers(target_loc, state) => {
3647                                    debug_assert!(metadata.location_id.is_top_level());
3648
3649                                    let members_tee_ident = syn::Ident::new(
3650                                        &format!(
3651                                            "__cluster_members_tee_{}_{}",
3652                                            metadata.location_id.root().key(),
3653                                            target_loc.key(),
3654                                        ),
3655                                        Span::call_site(),
3656                                    );
3657
3658                                    match state {
3659                                        ClusterMembersState::Stream(d) => {
3660                                            parse_quote! {
3661                                                #members_tee_ident = source_stream(#d) -> tee();
3662                                                #source_ident = #members_tee_ident;
3663                                            }
3664                                        },
3665                                        ClusterMembersState::Uninit => syn::parse_quote! {
3666                                            #source_ident = source_stream(DUMMY);
3667                                        },
3668                                        ClusterMembersState::Tee(..) => parse_quote! {
3669                                            #source_ident = #members_tee_ident;
3670                                        },
3671                                    }
3672                                }
3673
3674                                HydroSource::Embedded(ident) => {
3675                                    parse_quote! {
3676                                        #source_ident = source_stream(#ident);
3677                                    }
3678                                }
3679
3680                                HydroSource::EmbeddedSingleton(ident) => {
3681                                    parse_quote! {
3682                                        #source_ident = source_iter([#ident]);
3683                                    }
3684                                }
3685                            };
3686
3687                            match builders_or_callback {
3688                                BuildersOrCallback::Builders(graph_builders) => {
3689                                    let builder = graph_builders.get_dfir_mut(&out_location);
3690                                    builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3691                                }
3692                                BuildersOrCallback::Callback(_, node_callback) => {
3693                                    node_callback(node, next_stmt_id);
3694                                }
3695                            }
3696
3697                            ident_stack.push(source_ident);
3698                        }
3699                    }
3700
3701                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3702                        let stmt_id = next_stmt_id.get_and_increment();
3703                        let source_ident =
3704                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3705
3706                        match builders_or_callback {
3707                            BuildersOrCallback::Builders(graph_builders) => {
3708                                let builder = graph_builders.get_dfir_mut(&out_location);
3709
3710                                if *first_tick_only {
3711                                    assert!(
3712                                        !metadata.location_id.is_top_level(),
3713                                        "first_tick_only SingletonSource must be inside a tick"
3714                                    );
3715                                }
3716
3717                                if *first_tick_only
3718                                    || (metadata.location_id.is_top_level()
3719                                        && metadata.collection_kind.is_bounded())
3720                                {
3721                                    builder.add_dfir(
3722                                        parse_quote! {
3723                                            #source_ident = source_iter([#value]);
3724                                        },
3725                                        None,
3726                                        Some(&stmt_id.to_string()),
3727                                    );
3728                                } else {
3729                                    builder.add_dfir(
3730                                        parse_quote! {
3731                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3732                                        },
3733                                        None,
3734                                        Some(&stmt_id.to_string()),
3735                                    );
3736                                }
3737                            }
3738                            BuildersOrCallback::Callback(_, node_callback) => {
3739                                node_callback(node, next_stmt_id);
3740                            }
3741                        }
3742
3743                        ident_stack.push(source_ident);
3744                    }
3745
3746                    HydroNode::CycleSource { cycle_id, .. } => {
3747                        let ident = cycle_id.as_ident();
3748
3749                        // consume a stmt id even though we did not emit anything so that we can instrument this
3750                        let _ = next_stmt_id.get_and_increment();
3751
3752                        match builders_or_callback {
3753                            BuildersOrCallback::Builders(_) => {}
3754                            BuildersOrCallback::Callback(_, node_callback) => {
3755                                node_callback(node, next_stmt_id);
3756                            }
3757                        }
3758
3759                        ident_stack.push(ident);
3760                    }
3761
3762                    HydroNode::Tee { inner, .. } => {
3763                        // we consume a stmt id regardless of if we emit the tee() operator,
3764                        // so that during rewrites we touch all recipients of the tee()
3765                        let stmt_id = next_stmt_id.get_and_increment();
3766
3767                        let ret_ident = if let Some(built_idents) =
3768                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3769                        {
3770                            match builders_or_callback {
3771                                BuildersOrCallback::Builders(_) => {}
3772                                BuildersOrCallback::Callback(_, node_callback) => {
3773                                    node_callback(node, next_stmt_id);
3774                                }
3775                            }
3776
3777                            built_idents[0].clone()
3778                        } else {
3779                            // The inner node was already processed by transform_bottom_up,
3780                            // so its ident is on the stack
3781                            let inner_ident = ident_stack.pop().unwrap();
3782
3783                            let tee_ident =
3784                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3785
3786                            built_tees.insert(
3787                                inner.0.as_ref() as *const RefCell<HydroNode>,
3788                                vec![tee_ident.clone()],
3789                            );
3790
3791                            match builders_or_callback {
3792                                BuildersOrCallback::Builders(graph_builders) => {
3793                                    // NOTE: With `forward_ref`, the fold codegen may not have
3794                                    // run yet when we reach this tee, so `fold_hooked_idents`
3795                                    // might not contain the inner ident. In that case we won't
3796                                    // propagate the "hooked" status to the tee and the
3797                                    // downstream singleton batch will use the normal
3798                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3799                                    // This is not a soundness issue: the fallback hook still
3800                                    // produces correct behavior, just with a redundant decision
3801                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3802                                    // fix ordering so forward_ref folds are always processed
3803                                    // before their downstream tees.
3804                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3805                                        fold_hooked_idents.insert(tee_ident.to_string());
3806                                    }
3807                                    let builder = graph_builders.get_dfir_mut(&out_location);
3808                                    builder.add_dfir(
3809                                        parse_quote! {
3810                                            #tee_ident = #inner_ident -> tee();
3811                                        },
3812                                        None,
3813                                        Some(&stmt_id.to_string()),
3814                                    );
3815                                }
3816                                BuildersOrCallback::Callback(_, node_callback) => {
3817                                    node_callback(node, next_stmt_id);
3818                                }
3819                            }
3820
3821                            tee_ident
3822                        };
3823
3824                        ident_stack.push(ret_ident);
3825                    }
3826
3827                    HydroNode::Reference { inner, kind, .. } => {
3828                        // we consume a stmt id regardless of if we emit the operator,
3829                        // so that during rewrites we touch all recipients
3830                        let stmt_id = next_stmt_id.get_and_increment();
3831
3832                        let ret_ident = if let Some(built_idents) =
3833                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3834                        {
3835                            built_idents[0].clone()
3836                        } else {
3837                            let inner_ident = ident_stack.pop().unwrap();
3838
3839                            let ref_ident =
3840                                syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3841
3842                            built_tees.insert(
3843                                inner.0.as_ref() as *const RefCell<HydroNode>,
3844                                vec![ref_ident.clone()],
3845                            );
3846
3847                            match builders_or_callback {
3848                                BuildersOrCallback::Builders(graph_builders) => {
3849                                    let builder = graph_builders.get_dfir_mut(&out_location);
3850                                    let op_ident = syn::Ident::new(
3851                                        match kind {
3852                                            crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3853                                            crate::handoff_ref::HandoffRefKind::Optional => "optional",
3854                                            crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3855                                        },
3856                                        Span::call_site(),
3857                                    );
3858                                    builder.add_dfir(
3859                                        parse_quote! {
3860                                            #ref_ident = #inner_ident -> #op_ident();
3861                                        },
3862                                        None,
3863                                        Some(&stmt_id.to_string()),
3864                                    );
3865                                }
3866                                BuildersOrCallback::Callback(_, node_callback) => {
3867                                    node_callback(node, next_stmt_id);
3868                                }
3869                            }
3870
3871                            ref_ident
3872                        };
3873
3874                        ident_stack.push(ret_ident);
3875                    }
3876
3877                    HydroNode::Partition {
3878                        inner, f, is_true, metadata,
3879                    } => {
3880                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3881                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3882                        let stmt_id = next_stmt_id.get_and_increment();
3883
3884                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3885                            match builders_or_callback {
3886                                BuildersOrCallback::Builders(_) => {}
3887                                BuildersOrCallback::Callback(_, node_callback) => {
3888                                    node_callback(node, next_stmt_id);
3889                                }
3890                            }
3891
3892                            let idx = if is_true { 0 } else { 1 };
3893                            built_idents[idx].clone()
3894                        } else {
3895                            // The inner node was already processed by transform_bottom_up,
3896                            // so its ident is on the stack
3897                            let inner_ident = ident_stack.pop().unwrap();
3898                            let f_tokens = f.emit_tokens(&mut ident_stack);
3899
3900                            let inner_ident = {
3901                                let inner_borrow = inner.0.borrow();
3902                                maybe_observe_for_mut(
3903                                    f, inner_ident,
3904                                    &inner_borrow.metadata().location_id,
3905                                    &inner_borrow.metadata().collection_kind,
3906                                    &metadata.op,
3907                                    builders_or_callback, next_stmt_id,
3908                                )
3909                            };
3910
3911                            let partition_ident = syn::Ident::new(
3912                                &format!("stream_{}_partition", stmt_id),
3913                                Span::call_site(),
3914                            );
3915                            let true_ident = syn::Ident::new(
3916                                &format!("stream_{}_true", stmt_id),
3917                                Span::call_site(),
3918                            );
3919                            let false_ident = syn::Ident::new(
3920                                &format!("stream_{}_false", stmt_id),
3921                                Span::call_site(),
3922                            );
3923
3924                            built_tees.insert(
3925                                ptr,
3926                                vec![true_ident.clone(), false_ident.clone()],
3927                            );
3928
3929                            let stmt_id = next_stmt_id.get_and_increment();
3930                            match builders_or_callback {
3931                                BuildersOrCallback::Builders(graph_builders) => {
3932                                    let builder = graph_builders.get_dfir_mut(&out_location);
3933                                    builder.add_dfir(
3934                                        parse_quote! {
3935                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3936                                            #true_ident = #partition_ident[0];
3937                                            #false_ident = #partition_ident[1];
3938                                        },
3939                                        None,
3940                                        Some(&stmt_id.to_string()),
3941                                    );
3942                                }
3943                                BuildersOrCallback::Callback(_, node_callback) => {
3944                                    node_callback(node, next_stmt_id);
3945                                }
3946                            }
3947
3948                            if is_true { true_ident } else { false_ident }
3949                        };
3950
3951                        ident_stack.push(ret_ident);
3952                    }
3953
3954                    HydroNode::Chain { .. } => {
3955                        // Children are processed left-to-right, so second is on top
3956                        let second_ident = ident_stack.pop().unwrap();
3957                        let first_ident = ident_stack.pop().unwrap();
3958
3959                        let stmt_id = next_stmt_id.get_and_increment();
3960                        let chain_ident =
3961                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3962
3963                        match builders_or_callback {
3964                            BuildersOrCallback::Builders(graph_builders) => {
3965                                let builder = graph_builders.get_dfir_mut(&out_location);
3966                                builder.add_dfir(
3967                                    parse_quote! {
3968                                        #chain_ident = chain();
3969                                        #first_ident -> [0]#chain_ident;
3970                                        #second_ident -> [1]#chain_ident;
3971                                    },
3972                                    None,
3973                                    Some(&stmt_id.to_string()),
3974                                );
3975                            }
3976                            BuildersOrCallback::Callback(_, node_callback) => {
3977                                node_callback(node, next_stmt_id);
3978                            }
3979                        }
3980
3981                        ident_stack.push(chain_ident);
3982                    }
3983
3984                    HydroNode::MergeOrdered { first, metadata, .. } => {
3985                        let second_ident = ident_stack.pop().unwrap();
3986                        let first_ident = ident_stack.pop().unwrap();
3987
3988                        let stmt_id = next_stmt_id.get_and_increment();
3989                        let merge_ident =
3990                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3991
3992                        match builders_or_callback {
3993                            BuildersOrCallback::Builders(graph_builders) => {
3994                                graph_builders.merge_ordered(
3995                                    &first.metadata().location_id,
3996                                    first_ident,
3997                                    second_ident,
3998                                    &merge_ident,
3999                                    &first.metadata().collection_kind,
4000                                    &metadata.op,
4001                                    Some(&stmt_id.to_string()),
4002                                );
4003                            }
4004                            BuildersOrCallback::Callback(_, node_callback) => {
4005                                node_callback(node, next_stmt_id);
4006                            }
4007                        }
4008
4009                        ident_stack.push(merge_ident);
4010                    }
4011
4012                    HydroNode::ChainFirst { .. } => {
4013                        let second_ident = ident_stack.pop().unwrap();
4014                        let first_ident = ident_stack.pop().unwrap();
4015
4016                        let stmt_id = next_stmt_id.get_and_increment();
4017                        let chain_ident =
4018                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4019
4020                        match builders_or_callback {
4021                            BuildersOrCallback::Builders(graph_builders) => {
4022                                let builder = graph_builders.get_dfir_mut(&out_location);
4023                                builder.add_dfir(
4024                                    parse_quote! {
4025                                        #chain_ident = chain_first_n(1);
4026                                        #first_ident -> [0]#chain_ident;
4027                                        #second_ident -> [1]#chain_ident;
4028                                    },
4029                                    None,
4030                                    Some(&stmt_id.to_string()),
4031                                );
4032                            }
4033                            BuildersOrCallback::Callback(_, node_callback) => {
4034                                node_callback(node, next_stmt_id);
4035                            }
4036                        }
4037
4038                        ident_stack.push(chain_ident);
4039                    }
4040
4041                    HydroNode::CrossSingleton { right, .. } => {
4042                        let right_ident = ident_stack.pop().unwrap();
4043                        let left_ident = ident_stack.pop().unwrap();
4044
4045                        let stmt_id = next_stmt_id.get_and_increment();
4046                        let cross_ident =
4047                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4048
4049                        match builders_or_callback {
4050                            BuildersOrCallback::Builders(graph_builders) => {
4051                                let builder = graph_builders.get_dfir_mut(&out_location);
4052
4053                                if right.metadata().location_id.is_top_level()
4054                                    && right.metadata().collection_kind.is_bounded()
4055                                {
4056                                    builder.add_dfir(
4057                                        parse_quote! {
4058                                            #cross_ident = cross_singleton::<'static>();
4059                                            #left_ident -> [input]#cross_ident;
4060                                            #right_ident -> [single]#cross_ident;
4061                                        },
4062                                        None,
4063                                        Some(&stmt_id.to_string()),
4064                                    );
4065                                } else {
4066                                    builder.add_dfir(
4067                                        parse_quote! {
4068                                            #cross_ident = cross_singleton();
4069                                            #left_ident -> [input]#cross_ident;
4070                                            #right_ident -> [single]#cross_ident;
4071                                        },
4072                                        None,
4073                                        Some(&stmt_id.to_string()),
4074                                    );
4075                                }
4076                            }
4077                            BuildersOrCallback::Callback(_, node_callback) => {
4078                                node_callback(node, next_stmt_id);
4079                            }
4080                        }
4081
4082                        ident_stack.push(cross_ident);
4083                    }
4084
4085                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
4086                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
4087                            parse_quote!(cross_join_multiset)
4088                        } else {
4089                            parse_quote!(join_multiset)
4090                        };
4091
4092                        let (HydroNode::CrossProduct { left, right, .. }
4093                        | HydroNode::Join { left, right, .. }) = node
4094                        else {
4095                            unreachable!()
4096                        };
4097
4098                        let is_top_level = left.metadata().location_id.is_top_level()
4099                            && right.metadata().location_id.is_top_level();
4100                        let left_lifetime = if left.metadata().location_id.is_top_level() {
4101                            quote!('static)
4102                        } else {
4103                            quote!('tick)
4104                        };
4105
4106                        let right_lifetime = if right.metadata().location_id.is_top_level() {
4107                            quote!('static)
4108                        } else {
4109                            quote!('tick)
4110                        };
4111
4112                        let right_ident = ident_stack.pop().unwrap();
4113                        let left_ident = ident_stack.pop().unwrap();
4114
4115                        let stmt_id = next_stmt_id.get_and_increment();
4116                        let stream_ident =
4117                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4118
4119                        match builders_or_callback {
4120                            BuildersOrCallback::Builders(graph_builders) => {
4121                                let builder = graph_builders.get_dfir_mut(&out_location);
4122                                builder.add_dfir(
4123                                    if is_top_level {
4124                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
4125                                        // a multiset_delta() to negate the replay behavior
4126                                        parse_quote! {
4127                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
4128                                            #left_ident -> [0]#stream_ident;
4129                                            #right_ident -> [1]#stream_ident;
4130                                        }
4131                                    } else {
4132                                        parse_quote! {
4133                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
4134                                            #left_ident -> [0]#stream_ident;
4135                                            #right_ident -> [1]#stream_ident;
4136                                        }
4137                                    }
4138                                    ,
4139                                    None,
4140                                    Some(&stmt_id.to_string()),
4141                                );
4142                            }
4143                            BuildersOrCallback::Callback(_, node_callback) => {
4144                                node_callback(node, next_stmt_id);
4145                            }
4146                        }
4147
4148                        ident_stack.push(stream_ident);
4149                    }
4150
4151                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4152                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4153                            parse_quote!(difference)
4154                        } else {
4155                            parse_quote!(anti_join)
4156                        };
4157
4158                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4159                            node
4160                        else {
4161                            unreachable!()
4162                        };
4163
4164                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4165                            quote!('static)
4166                        } else {
4167                            quote!('tick)
4168                        };
4169
4170                        let neg_ident = ident_stack.pop().unwrap();
4171                        let pos_ident = ident_stack.pop().unwrap();
4172
4173                        let stmt_id = next_stmt_id.get_and_increment();
4174                        let stream_ident =
4175                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4176
4177                        match builders_or_callback {
4178                            BuildersOrCallback::Builders(graph_builders) => {
4179                                let builder = graph_builders.get_dfir_mut(&out_location);
4180                                builder.add_dfir(
4181                                    parse_quote! {
4182                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
4183                                        #pos_ident -> [pos]#stream_ident;
4184                                        #neg_ident -> [neg]#stream_ident;
4185                                    },
4186                                    None,
4187                                    Some(&stmt_id.to_string()),
4188                                );
4189                            }
4190                            BuildersOrCallback::Callback(_, node_callback) => {
4191                                node_callback(node, next_stmt_id);
4192                            }
4193                        }
4194
4195                        ident_stack.push(stream_ident);
4196                    }
4197
4198                    HydroNode::JoinHalf { .. } => {
4199                        let HydroNode::JoinHalf { right, .. } = node else {
4200                            unreachable!()
4201                        };
4202
4203                        assert!(
4204                            right.metadata().collection_kind.is_bounded(),
4205                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4206                            right.metadata().collection_kind
4207                        );
4208
4209                        let build_lifetime = if right.metadata().location_id.is_top_level() {
4210                            quote!('static)
4211                        } else {
4212                            quote!('tick)
4213                        };
4214
4215                        let build_ident = ident_stack.pop().unwrap();
4216                        let probe_ident = ident_stack.pop().unwrap();
4217
4218                        let stmt_id = next_stmt_id.get_and_increment();
4219                        let stream_ident =
4220                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4221
4222                        match builders_or_callback {
4223                            BuildersOrCallback::Builders(graph_builders) => {
4224                                let builder = graph_builders.get_dfir_mut(&out_location);
4225                                builder.add_dfir(
4226                                    parse_quote! {
4227                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4228                                        #probe_ident -> [probe]#stream_ident;
4229                                        #build_ident -> [build]#stream_ident;
4230                                    },
4231                                    None,
4232                                    Some(&stmt_id.to_string()),
4233                                );
4234                            }
4235                            BuildersOrCallback::Callback(_, node_callback) => {
4236                                node_callback(node, next_stmt_id);
4237                            }
4238                        }
4239
4240                        ident_stack.push(stream_ident);
4241                    }
4242
4243                    HydroNode::ResolveFutures { .. } => {
4244                        let input_ident = ident_stack.pop().unwrap();
4245
4246                        let stmt_id = next_stmt_id.get_and_increment();
4247                        let futures_ident =
4248                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4249
4250                        match builders_or_callback {
4251                            BuildersOrCallback::Builders(graph_builders) => {
4252                                let builder = graph_builders.get_dfir_mut(&out_location);
4253                                builder.add_dfir(
4254                                    parse_quote! {
4255                                        #futures_ident = #input_ident -> resolve_futures();
4256                                    },
4257                                    None,
4258                                    Some(&stmt_id.to_string()),
4259                                );
4260                            }
4261                            BuildersOrCallback::Callback(_, node_callback) => {
4262                                node_callback(node, next_stmt_id);
4263                            }
4264                        }
4265
4266                        ident_stack.push(futures_ident);
4267                    }
4268
4269                    HydroNode::ResolveFuturesBlocking { .. } => {
4270                        let input_ident = ident_stack.pop().unwrap();
4271
4272                        let stmt_id = next_stmt_id.get_and_increment();
4273                        let futures_ident =
4274                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4275
4276                        match builders_or_callback {
4277                            BuildersOrCallback::Builders(graph_builders) => {
4278                                let builder = graph_builders.get_dfir_mut(&out_location);
4279                                builder.add_dfir(
4280                                    parse_quote! {
4281                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4282                                    },
4283                                    None,
4284                                    Some(&stmt_id.to_string()),
4285                                );
4286                            }
4287                            BuildersOrCallback::Callback(_, node_callback) => {
4288                                node_callback(node, next_stmt_id);
4289                            }
4290                        }
4291
4292                        ident_stack.push(futures_ident);
4293                    }
4294
4295                    HydroNode::ResolveFuturesOrdered { .. } => {
4296                        let input_ident = ident_stack.pop().unwrap();
4297
4298                        let stmt_id = next_stmt_id.get_and_increment();
4299                        let futures_ident =
4300                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4301
4302                        match builders_or_callback {
4303                            BuildersOrCallback::Builders(graph_builders) => {
4304                                let builder = graph_builders.get_dfir_mut(&out_location);
4305                                builder.add_dfir(
4306                                    parse_quote! {
4307                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4308                                    },
4309                                    None,
4310                                    Some(&stmt_id.to_string()),
4311                                );
4312                            }
4313                            BuildersOrCallback::Callback(_, node_callback) => {
4314                                node_callback(node, next_stmt_id);
4315                            }
4316                        }
4317
4318                        ident_stack.push(futures_ident);
4319                    }
4320
4321                    HydroNode::Map {
4322                        f,
4323                        input,
4324                        metadata,
4325                    } => {
4326                        // Pop input ident (pushed last by transform_children).
4327                        let input_ident = ident_stack.pop().unwrap();
4328                        let f_tokens = f.emit_tokens(&mut ident_stack);
4329
4330                        let input_ident = maybe_observe_for_mut(
4331                            f,
4332                            input_ident,
4333                            &input.metadata().location_id,
4334                            &input.metadata().collection_kind,
4335                            &metadata.op,
4336                            builders_or_callback,
4337                            next_stmt_id,
4338                        );
4339
4340                        let stmt_id = next_stmt_id.get_and_increment();
4341                        let map_ident =
4342                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4343
4344                        match builders_or_callback {
4345                            BuildersOrCallback::Builders(graph_builders) => {
4346                                let builder = graph_builders.get_dfir_mut(&out_location);
4347                                builder.add_dfir(
4348                                    parse_quote! {
4349                                        #map_ident = #input_ident -> map(#f_tokens);
4350                                    },
4351                                    None,
4352                                    Some(&stmt_id.to_string()),
4353                                );
4354                            }
4355                            BuildersOrCallback::Callback(_, node_callback) => {
4356                                node_callback(node, next_stmt_id);
4357                            }
4358                        }
4359
4360                        ident_stack.push(map_ident);
4361                    }
4362
4363                    HydroNode::FlatMap { f, input, metadata } => {
4364                        let input_ident = ident_stack.pop().unwrap();
4365                        let f_tokens = f.emit_tokens(&mut ident_stack);
4366
4367                        let input_ident = maybe_observe_for_mut(
4368                            f, input_ident,
4369                            &input.metadata().location_id,
4370                            &input.metadata().collection_kind,
4371                            &metadata.op,
4372                            builders_or_callback, next_stmt_id,
4373                        );
4374
4375                        let stmt_id = next_stmt_id.get_and_increment();
4376                        let flat_map_ident =
4377                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4378
4379                        match builders_or_callback {
4380                            BuildersOrCallback::Builders(graph_builders) => {
4381                                let builder = graph_builders.get_dfir_mut(&out_location);
4382                                builder.add_dfir(
4383                                    parse_quote! {
4384                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4385                                    },
4386                                    None,
4387                                    Some(&stmt_id.to_string()),
4388                                );
4389                            }
4390                            BuildersOrCallback::Callback(_, node_callback) => {
4391                                node_callback(node, next_stmt_id);
4392                            }
4393                        }
4394
4395                        ident_stack.push(flat_map_ident);
4396                    }
4397
4398                    HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4399                        let input_ident = ident_stack.pop().unwrap();
4400                        let f_tokens = f.emit_tokens(&mut ident_stack);
4401
4402                        let input_ident = maybe_observe_for_mut(
4403                            f, input_ident,
4404                            &input.metadata().location_id,
4405                            &input.metadata().collection_kind,
4406                            &metadata.op,
4407                            builders_or_callback, next_stmt_id,
4408                        );
4409
4410                        let stmt_id = next_stmt_id.get_and_increment();
4411                        let flat_map_stream_blocking_ident =
4412                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4413
4414                        match builders_or_callback {
4415                            BuildersOrCallback::Builders(graph_builders) => {
4416                                let builder = graph_builders.get_dfir_mut(&out_location);
4417                                builder.add_dfir(
4418                                    parse_quote! {
4419                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4420                                    },
4421                                    None,
4422                                    Some(&stmt_id.to_string()),
4423                                );
4424                            }
4425                            BuildersOrCallback::Callback(_, node_callback) => {
4426                                node_callback(node, next_stmt_id);
4427                            }
4428                        }
4429
4430                        ident_stack.push(flat_map_stream_blocking_ident);
4431                    }
4432
4433                    HydroNode::Filter { f, input, metadata } => {
4434                        let input_ident = ident_stack.pop().unwrap();
4435                        let f_tokens = f.emit_tokens(&mut ident_stack);
4436
4437                        let input_ident = maybe_observe_for_mut(
4438                            f, input_ident,
4439                            &input.metadata().location_id,
4440                            &input.metadata().collection_kind,
4441                            &metadata.op,
4442                            builders_or_callback, next_stmt_id,
4443                        );
4444
4445                        let stmt_id = next_stmt_id.get_and_increment();
4446                        let filter_ident =
4447                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4448
4449                        match builders_or_callback {
4450                            BuildersOrCallback::Builders(graph_builders) => {
4451                                let builder = graph_builders.get_dfir_mut(&out_location);
4452                                builder.add_dfir(
4453                                    parse_quote! {
4454                                        #filter_ident = #input_ident -> filter(#f_tokens);
4455                                    },
4456                                    None,
4457                                    Some(&stmt_id.to_string()),
4458                                );
4459                            }
4460                            BuildersOrCallback::Callback(_, node_callback) => {
4461                                node_callback(node, next_stmt_id);
4462                            }
4463                        }
4464
4465                        ident_stack.push(filter_ident);
4466                    }
4467
4468                    HydroNode::FilterMap { f, input, metadata } => {
4469                        let input_ident = ident_stack.pop().unwrap();
4470                        let f_tokens = f.emit_tokens(&mut ident_stack);
4471
4472                        let input_ident = maybe_observe_for_mut(
4473                            f, input_ident,
4474                            &input.metadata().location_id,
4475                            &input.metadata().collection_kind,
4476                            &metadata.op,
4477                            builders_or_callback, next_stmt_id,
4478                        );
4479
4480                        let stmt_id = next_stmt_id.get_and_increment();
4481                        let filter_map_ident =
4482                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4483
4484                        match builders_or_callback {
4485                            BuildersOrCallback::Builders(graph_builders) => {
4486                                let builder = graph_builders.get_dfir_mut(&out_location);
4487                                builder.add_dfir(
4488                                    parse_quote! {
4489                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4490                                    },
4491                                    None,
4492                                    Some(&stmt_id.to_string()),
4493                                );
4494                            }
4495                            BuildersOrCallback::Callback(_, node_callback) => {
4496                                node_callback(node, next_stmt_id);
4497                            }
4498                        }
4499
4500                        ident_stack.push(filter_map_ident);
4501                    }
4502
4503                    HydroNode::Sort { .. } => {
4504                        let input_ident = ident_stack.pop().unwrap();
4505
4506                        let stmt_id = next_stmt_id.get_and_increment();
4507                        let sort_ident =
4508                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4509
4510                        match builders_or_callback {
4511                            BuildersOrCallback::Builders(graph_builders) => {
4512                                let builder = graph_builders.get_dfir_mut(&out_location);
4513                                builder.add_dfir(
4514                                    parse_quote! {
4515                                        #sort_ident = #input_ident -> sort();
4516                                    },
4517                                    None,
4518                                    Some(&stmt_id.to_string()),
4519                                );
4520                            }
4521                            BuildersOrCallback::Callback(_, node_callback) => {
4522                                node_callback(node, next_stmt_id);
4523                            }
4524                        }
4525
4526                        ident_stack.push(sort_ident);
4527                    }
4528
4529                    HydroNode::DeferTick { .. } => {
4530                        let input_ident = ident_stack.pop().unwrap();
4531
4532                        let stmt_id = next_stmt_id.get_and_increment();
4533                        let defer_tick_ident =
4534                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4535
4536                        match builders_or_callback {
4537                            BuildersOrCallback::Builders(graph_builders) => {
4538                                let builder = graph_builders.get_dfir_mut(&out_location);
4539                                builder.add_dfir(
4540                                    parse_quote! {
4541                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4542                                    },
4543                                    None,
4544                                    Some(&stmt_id.to_string()),
4545                                );
4546                            }
4547                            BuildersOrCallback::Callback(_, node_callback) => {
4548                                node_callback(node, next_stmt_id);
4549                            }
4550                        }
4551
4552                        ident_stack.push(defer_tick_ident);
4553                    }
4554
4555                    HydroNode::Enumerate { input, .. } => {
4556                        let input_ident = ident_stack.pop().unwrap();
4557
4558                        let stmt_id = next_stmt_id.get_and_increment();
4559                        let enumerate_ident =
4560                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4561
4562                        match builders_or_callback {
4563                            BuildersOrCallback::Builders(graph_builders) => {
4564                                let builder = graph_builders.get_dfir_mut(&out_location);
4565                                let lifetime = if input.metadata().location_id.is_top_level() {
4566                                    quote!('static)
4567                                } else {
4568                                    quote!('tick)
4569                                };
4570                                builder.add_dfir(
4571                                    parse_quote! {
4572                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4573                                    },
4574                                    None,
4575                                    Some(&stmt_id.to_string()),
4576                                );
4577                            }
4578                            BuildersOrCallback::Callback(_, node_callback) => {
4579                                node_callback(node, next_stmt_id);
4580                            }
4581                        }
4582
4583                        ident_stack.push(enumerate_ident);
4584                    }
4585
4586                    HydroNode::Inspect { f, input, metadata } => {
4587                        let input_ident = ident_stack.pop().unwrap();
4588                        let f_tokens = f.emit_tokens(&mut ident_stack);
4589
4590                        let input_ident = maybe_observe_for_mut(
4591                            f, input_ident,
4592                            &input.metadata().location_id,
4593                            &input.metadata().collection_kind,
4594                            &metadata.op,
4595                            builders_or_callback, next_stmt_id,
4596                        );
4597
4598                        let stmt_id = next_stmt_id.get_and_increment();
4599                        let inspect_ident =
4600                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4601
4602                        match builders_or_callback {
4603                            BuildersOrCallback::Builders(graph_builders) => {
4604                                let builder = graph_builders.get_dfir_mut(&out_location);
4605                                builder.add_dfir(
4606                                    parse_quote! {
4607                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4608                                    },
4609                                    None,
4610                                    Some(&stmt_id.to_string()),
4611                                );
4612                            }
4613                            BuildersOrCallback::Callback(_, node_callback) => {
4614                                node_callback(node, next_stmt_id);
4615                            }
4616                        }
4617
4618                        ident_stack.push(inspect_ident);
4619                    }
4620
4621                    HydroNode::Unique { input, .. } => {
4622                        let input_ident = ident_stack.pop().unwrap();
4623
4624                        let stmt_id = next_stmt_id.get_and_increment();
4625                        let unique_ident =
4626                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4627
4628                        match builders_or_callback {
4629                            BuildersOrCallback::Builders(graph_builders) => {
4630                                let builder = graph_builders.get_dfir_mut(&out_location);
4631                                let lifetime = if input.metadata().location_id.is_top_level() {
4632                                    quote!('static)
4633                                } else {
4634                                    quote!('tick)
4635                                };
4636
4637                                builder.add_dfir(
4638                                    parse_quote! {
4639                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4640                                    },
4641                                    None,
4642                                    Some(&stmt_id.to_string()),
4643                                );
4644                            }
4645                            BuildersOrCallback::Callback(_, node_callback) => {
4646                                node_callback(node, next_stmt_id);
4647                            }
4648                        }
4649
4650                        ident_stack.push(unique_ident);
4651                    }
4652
4653                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
                            // Shared lowering for `Fold`, `FoldKeyed`, `Scan` and
                            // `ScanAsyncBlocking`: picks the DFIR operator, emits the `init`/`acc`
                            // closures, adds one statement named `stream_<stmt_id>` and pushes
                            // that ident back onto `ident_stack`.
                            //
                            // Operator choice: `Fold` over a top-level, bounded input uses
                            // `fold_no_replay`, otherwise `fold`; the two scans map to `scan` and
                            // `scan_async_blocking`; `FoldKeyed` uses `fold_keyed`, except that a
                            // top-level, bounded input is unimplemented and panics via `todo!`.
4654                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4655                            if input.metadata().location_id.is_top_level()
4656                                && input.metadata().collection_kind.is_bounded()
4657                            {
4658                                parse_quote!(fold_no_replay)
4659                            } else {
4660                                parse_quote!(fold)
4661                            }
4662                        } else if matches!(node, HydroNode::Scan { .. }) {
4663                            parse_quote!(scan)
4664                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4665                            parse_quote!(scan_async_blocking)
4666                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4667                            if input.metadata().location_id.is_top_level()
4668                                && input.metadata().collection_kind.is_bounded()
4669                            {
4670                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4671                            } else {
4672                                parse_quote!(fold_keyed)
4673                            }
4674                        } else {
4675                            unreachable!()
4676                        };
4677
4678                        let (HydroNode::Fold { input, .. }
4679                        | HydroNode::FoldKeyed { input, .. }
4680                        | HydroNode::Scan { input, .. }
4681                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4682                        else {
4683                            unreachable!()
4684                        };
4685
                            // Lifetime argument of the operator: `'static` when the input lives at
                            // a top-level location, `'tick` otherwise.
4686                        let lifetime = if input.metadata().location_id.is_top_level() {
4687                            quote!('static)
4688                        } else {
4689                            quote!('tick)
4690                        };
4691
4692                        let input_ident = ident_stack.pop().unwrap();
4693
4694                        let (HydroNode::Fold { init, acc, .. }
4695                        | HydroNode::FoldKeyed { init, acc, .. }
4696                        | HydroNode::Scan { init, acc, .. }
4697                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4698                        else {
4699                            unreachable!()
4700                        };
4701
                            // `acc` is emitted before `init`; both calls receive `ident_stack`.
                            // NOTE(review): presumably this mirrors the order in which captured
                            // references sit on the stack — confirm against `emit_tokens`.
4702                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4703                        let init_tokens = init.emit_tokens(&mut ident_stack);
4704
4705                        let stmt_id = next_stmt_id.get_and_increment();
4706                        let fold_ident =
4707                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4708
4709                        match builders_or_callback {
4710                            BuildersOrCallback::Builders(graph_builders) => {
4711                                if matches!(node, HydroNode::Fold { .. })
4712                                    && node.metadata().location_id.is_top_level()
4713                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4714                                    && graph_builders.singleton_intermediates()
4715                                    && !node.metadata().collection_kind.is_bounded()
4716                                {
                                        // Case 1: `Fold` whose own location is top-level and not
                                        // `Atomic`, whose collection kind is not bounded, while the
                                        // builders report `singleton_intermediates()`. Instead of the
                                        // plain operator this emits the initial value followed by a
                                        // `scan` that yields a clone of the state after each update.
4717                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4718                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4719                                        &input.metadata().location_id,
4720                                        &input_ident,
4721                                        &input.metadata().collection_kind,
4722                                        &node.metadata().op,
4723                                    );
4724
                                        // With a hooked input each element is a `Vec` batch: the
                                        // whole batch is folded and the state is emitted once; an
                                        // empty batch emits nothing. Without a hook the state is
                                        // emitted after every single value.
4725                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4726                                        let acc: syn::Expr = parse_quote!({
4727                                            let mut __inner = #acc_tokens;
4728                                            move |__state, __batch: Vec<_>| {
4729                                                if __batch.is_empty() {
4730                                                    return None;
4731                                                }
4732                                                for __value in __batch {
4733                                                    __inner(__state, __value);
4734                                                }
4735                                                Some(__state.clone())
4736                                            }
4737                                        });
4738                                        (hooked, acc)
4739                                    } else {
4740                                        let acc: syn::Expr = parse_quote!({
4741                                            let mut __inner = #acc_tokens;
4742                                            move |__state, __value| {
4743                                                __inner(__state, __value);
4744                                                Some(__state.clone())
4745                                            }
4746                                        });
4747                                        (&input_ident, acc)
4748                                    };
4749
                                        // `init` is used twice: invoked once for the leading
                                        // `source_iter` element and passed to `scan` as its
                                        // initializer; the two are joined with `chain()`.
4750                                    let builder = graph_builders.get_dfir_mut(&out_location);
4751                                    builder.add_dfir(
4752                                        parse_quote! {
4753                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4754                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4755                                            #fold_ident = chain();
4756                                        },
4757                                        None,
4758                                        Some(&stmt_id.to_string()),
4759                                    );
4760
                                        // Record outputs that were produced from a hooked input.
4761                                    if hooked_input_ident.is_some() {
4762                                        fold_hooked_idents.insert(fold_ident.to_string());
4763                                    }
4764                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4765                                    && node.metadata().location_id.is_top_level()
4766                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4767                                    && graph_builders.singleton_intermediates()
4768                                    && !node.metadata().collection_kind.is_bounded()
4769                                {
                                        // Case 2: the same conditions for `FoldKeyed`. The `scan`
                                        // state is a `HashMap`; each `(key, value)` input updates
                                        // the key's entry (created through `init` when absent) and
                                        // emits `(key, clone of that entry)`.
4770                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4771                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4772                                        &input.metadata().location_id,
4773                                        &input_ident,
4774                                        &input.metadata().collection_kind,
4775                                        &node.metadata().op,
4776                                    );
4777                                    let builder = graph_builders.get_dfir_mut(&out_location);
4778
4779                                    let wrapped_acc: syn::Expr = parse_quote!({
4780                                        let mut __init = #init_tokens;
4781                                        let mut __inner = #acc_tokens;
4782                                        move |__state, __kv: (_, _)| {
4783                                            // TODO(shadaj): we can avoid the clone when the entry exists
4784                                            let __state = __state
4785                                                .entry(::std::clone::Clone::clone(&__kv.0))
4786                                                .or_insert_with(|| (__init)());
4787                                            __inner(__state, __kv.1);
4788                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4789                                        }
4790                                    });
4791
                                        // A hooked input is passed through `flatten()` before the
                                        // `scan`, and its output ident is recorded.
4792                                    if let Some(hooked_input_ident) = hooked_input_ident {
4793                                        builder.add_dfir(
4794                                            parse_quote! {
4795                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4796                                            },
4797                                            None,
4798                                            Some(&stmt_id.to_string()),
4799                                        );
4800
4801                                        fold_hooked_idents.insert(fold_ident.to_string());
4802                                    } else {
4803                                        builder.add_dfir(
4804                                            parse_quote! {
4805                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4806                                            },
4807                                            None,
4808                                            Some(&stmt_id.to_string()),
4809                                        );
4810                                    }
4811                                } else if (matches!(node, HydroNode::Fold { .. })
4812                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4813                                    && !node.metadata().location_id.is_top_level()
4814                                    && graph_builders.singleton_intermediates()
4815                                {
                                        // Case 3: `Fold`/`FoldKeyed` at a location that is not
                                        // top-level, with `singleton_intermediates()`: the regular
                                        // operator is kept, but it reads from the ident returned by
                                        // `emit_fold_hook` when there is one.
4816                                    let input_ref = match &*node {
4817                                        HydroNode::Fold { input, .. } => input,
4818                                        HydroNode::FoldKeyed { input, .. } => input,
4819                                        _ => unreachable!(),
4820                                    };
4821                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4822                                        &input_ref.metadata().location_id,
4823                                        &input_ident,
4824                                        &input_ref.metadata().collection_kind,
4825                                        &node.metadata().op,
4826                                    );
4827
4828                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4829                                    let builder = graph_builders.get_dfir_mut(&out_location);
4830                                    builder.add_dfir(
4831                                        parse_quote! {
4832                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4833                                        },
4834                                        None,
4835                                        Some(&stmt_id.to_string()),
4836                                    );
4837                                } else {
                                        // Default: the selected operator applied directly to the
                                        // original input with the emitted `init` and `acc`.
4838                                    let builder = graph_builders.get_dfir_mut(&out_location);
4839                                    builder.add_dfir(
4840                                        parse_quote! {
4841                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4842                                        },
4843                                        None,
4844                                        Some(&stmt_id.to_string()),
4845                                    );
4846                                }
4847                            }
                                // Callback mode emits no DFIR; the node is handed to the callback.
4848                            BuildersOrCallback::Callback(_, node_callback) => {
4849                                node_callback(node, next_stmt_id);
4850                            }
4851                        }
4852
4853                        ident_stack.push(fold_ident);
4854                    }
4855
4856                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4857                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4858                            if input.metadata().location_id.is_top_level()
4859                                && input.metadata().collection_kind.is_bounded()
4860                            {
4861                                parse_quote!(reduce_no_replay)
4862                            } else {
4863                                parse_quote!(reduce)
4864                            }
4865                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4866                            if input.metadata().location_id.is_top_level()
4867                                && input.metadata().collection_kind.is_bounded()
4868                            {
4869                                todo!(
4870                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4871                                )
4872                            } else {
4873                                parse_quote!(reduce_keyed)
4874                            }
4875                        } else {
4876                            unreachable!()
4877                        };
4878
4879                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4880                        else {
4881                            unreachable!()
4882                        };
4883
4884                        let lifetime = if input.metadata().location_id.is_top_level() {
4885                            quote!('static)
4886                        } else {
4887                            quote!('tick)
4888                        };
4889
4890                        let input_ident = ident_stack.pop().unwrap();
4891
4892                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4893                        else {
4894                            unreachable!()
4895                        };
4896
4897                        let f_tokens = f.emit_tokens(&mut ident_stack);
4898
4899                        let stmt_id = next_stmt_id.get_and_increment();
4900                        let reduce_ident =
4901                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4902
4903                        match builders_or_callback {
4904                            BuildersOrCallback::Builders(graph_builders) => {
4905                                if matches!(node, HydroNode::Reduce { .. })
4906                                    && node.metadata().location_id.is_top_level()
4907                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4908                                    && graph_builders.singleton_intermediates()
4909                                    && !node.metadata().collection_kind.is_bounded()
4910                                {
4911                                    todo!(
4912                                        "Reduce with optional intermediates is not yet supported in simulator"
4913                                    );
4914                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4915                                    && node.metadata().location_id.is_top_level()
4916                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4917                                    && graph_builders.singleton_intermediates()
4918                                    && !node.metadata().collection_kind.is_bounded()
4919                                {
4920                                    todo!(
4921                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4922                                    );
4923                                } else {
4924                                    let builder = graph_builders.get_dfir_mut(&out_location);
4925                                    builder.add_dfir(
4926                                        parse_quote! {
4927                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4928                                        },
4929                                        None,
4930                                        Some(&stmt_id.to_string()),
4931                                    );
4932                                }
4933                            }
4934                            BuildersOrCallback::Callback(_, node_callback) => {
4935                                node_callback(node, next_stmt_id);
4936                            }
4937                        }
4938
4939                        ident_stack.push(reduce_ident);
4940                    }
4941
4942                    HydroNode::ReduceKeyedWatermark {
4943                        f,
4944                        input,
4945                        metadata,
4946                        ..
4947                    } => {
4948                        let lifetime = if input.metadata().location_id.is_top_level() {
4949                            quote!('static)
4950                        } else {
4951                            quote!('tick)
4952                        };
4953
4954                        // watermark is processed second, so it's on top
4955                        let watermark_ident = ident_stack.pop().unwrap();
4956                        let input_ident = ident_stack.pop().unwrap();
4957                        let f_tokens = f.emit_tokens(&mut ident_stack);
4958
4959                        let stmt_id = next_stmt_id.get_and_increment();
4960                        let chain_ident = syn::Ident::new(
4961                            &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4962                            Span::call_site(),
4963                        );
4964
4965                        let fold_ident =
4966                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4967
4968                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4969                            && input.metadata().collection_kind.is_bounded()
4970                        {
4971                            parse_quote!(fold_no_replay)
4972                        } else {
4973                            parse_quote!(fold)
4974                        };
4975
4976                        match builders_or_callback {
4977                            BuildersOrCallback::Builders(graph_builders) => {
4978                                if metadata.location_id.is_top_level()
4979                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4980                                    && graph_builders.singleton_intermediates()
4981                                    && !metadata.collection_kind.is_bounded()
4982                                {
4983                                    todo!(
4984                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4985                                    )
4986                                } else {
4987                                    let builder = graph_builders.get_dfir_mut(&out_location);
4988                                    builder.add_dfir(
4989                                        parse_quote! {
4990                                            #chain_ident = chain();
4991                                            #input_ident
4992                                                -> map(|x| (Some(x), None))
4993                                                -> [0]#chain_ident;
4994                                            #watermark_ident
4995                                                -> map(|watermark| (None, Some(watermark)))
4996                                                -> [1]#chain_ident;
4997
4998                                            #fold_ident = #chain_ident
4999                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
5000                                                    let __reduce_keyed_fn = #f_tokens;
5001                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
5002                                                        if let Some((k, v)) = opt_payload {
5003                                                            if let Some(curr_watermark) = *opt_curr_watermark {
5004                                                                if k < curr_watermark {
5005                                                                    return;
5006                                                                }
5007                                                            }
5008                                                            match map.entry(k) {
5009                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
5010                                                                    e.insert(v);
5011                                                                }
5012                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
5013                                                                    __reduce_keyed_fn(e.get_mut(), v);
5014                                                                }
5015                                                            }
5016                                                        } else {
5017                                                            let watermark = opt_watermark.unwrap();
5018                                                            if let Some(curr_watermark) = *opt_curr_watermark {
5019                                                                if watermark <= curr_watermark {
5020                                                                    return;
5021                                                                }
5022                                                            }
5023                                                            map.retain(|k, _| *k >= watermark);
5024                                                            *opt_curr_watermark = Some(watermark);
5025                                                        }
5026                                                    }
5027                                                })
5028                                                -> flat_map(|(map, _curr_watermark)| map);
5029                                        },
5030                                        None,
5031                                        Some(&stmt_id.to_string()),
5032                                    );
5033                                }
5034                            }
5035                            BuildersOrCallback::Callback(_, node_callback) => {
5036                                node_callback(node, next_stmt_id);
5037                            }
5038                        }
5039
5040                        ident_stack.push(fold_ident);
5041                    }
5042
5043                    HydroNode::Network {
5044                        networking_info,
5045                        serialize_fn: serialize_pipeline,
5046                        instantiate_fn,
5047                        deserialize_fn: deserialize_pipeline,
5048                        input,
5049                        ..
5050                    } => {
5051                        let input_ident = ident_stack.pop().unwrap();
5052
5053                        let stmt_id = next_stmt_id.get_and_increment();
5054                        let receiver_stream_ident =
5055                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5056
5057                        match builders_or_callback {
5058                            BuildersOrCallback::Builders(graph_builders) => {
5059                                let (sink_expr, source_expr) = match instantiate_fn {
5060                                    DebugInstantiate::Building => (
5061                                        syn::parse_quote!(DUMMY_SINK),
5062                                        syn::parse_quote!(DUMMY_SOURCE),
5063                                    ),
5064
5065                                    DebugInstantiate::Finalized(finalized) => {
5066                                        (finalized.sink.clone(), finalized.source.clone())
5067                                    }
5068                                };
5069
5070                                graph_builders.create_network(
5071                                    &input.metadata().location_id,
5072                                    &out_location,
5073                                    input_ident,
5074                                    &receiver_stream_ident,
5075                                    serialize_pipeline.as_ref(),
5076                                    sink_expr,
5077                                    source_expr,
5078                                    deserialize_pipeline.as_ref(),
5079                                    stmt_id,
5080                                    networking_info,
5081                                );
5082                            }
5083                            BuildersOrCallback::Callback(_, node_callback) => {
5084                                node_callback(node, next_stmt_id);
5085                            }
5086                        }
5087
5088                        ident_stack.push(receiver_stream_ident);
5089                    }
5090
5091                    HydroNode::ExternalInput {
5092                        instantiate_fn,
5093                        deserialize_fn: deserialize_pipeline,
5094                        ..
5095                    } => {
5096                        let stmt_id = next_stmt_id.get_and_increment();
5097                        let receiver_stream_ident =
5098                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5099
5100                        match builders_or_callback {
5101                            BuildersOrCallback::Builders(graph_builders) => {
5102                                let (_, source_expr) = match instantiate_fn {
5103                                    DebugInstantiate::Building => (
5104                                        syn::parse_quote!(DUMMY_SINK),
5105                                        syn::parse_quote!(DUMMY_SOURCE),
5106                                    ),
5107
5108                                    DebugInstantiate::Finalized(finalized) => {
5109                                        (finalized.sink.clone(), finalized.source.clone())
5110                                    }
5111                                };
5112
5113                                graph_builders.create_external_source(
5114                                    &out_location,
5115                                    source_expr,
5116                                    &receiver_stream_ident,
5117                                    deserialize_pipeline.as_ref(),
5118                                    stmt_id,
5119                                );
5120                            }
5121                            BuildersOrCallback::Callback(_, node_callback) => {
5122                                node_callback(node, next_stmt_id);
5123                            }
5124                        }
5125
5126                        ident_stack.push(receiver_stream_ident);
5127                    }
5128
5129                    HydroNode::Counter {
5130                        tag,
5131                        duration,
5132                        prefix,
5133                        ..
5134                    } => {
5135                        let input_ident = ident_stack.pop().unwrap();
5136
5137                        let stmt_id = next_stmt_id.get_and_increment();
5138                        let counter_ident =
5139                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5140
5141                        match builders_or_callback {
5142                            BuildersOrCallback::Builders(graph_builders) => {
5143                                let arg = format!("{}({})", prefix, tag);
5144                                let builder = graph_builders.get_dfir_mut(&out_location);
5145                                builder.add_dfir(
5146                                    parse_quote! {
5147                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
5148                                    },
5149                                    None,
5150                                    Some(&stmt_id.to_string()),
5151                                );
5152                            }
5153                            BuildersOrCallback::Callback(_, node_callback) => {
5154                                node_callback(node, next_stmt_id);
5155                            }
5156                        }
5157
5158                        ident_stack.push(counter_ident);
5159                    }
5160
5161                    HydroNode::VersionedNetworkFork {
5162                        channel_id,
5163                        senders,
5164                        metadata,
5165                        ..
5166                    } => {
5167                        // sender idents are pushed in order of the 'senders' member.
5168                        let split_at = ident_stack.len() - senders.len();
5169                        let sender_idents = ident_stack.split_off(split_at);
5170
5171                        let stmt_id = next_stmt_id.get_and_increment();
5172
5173                        match builders_or_callback {
5174                            BuildersOrCallback::Builders(graph_builders) => {
5175                                let sender_args: Vec<(LocationId, syn::Ident, Option<DebugExpr>)> =
5176                                    senders
5177                                        .iter()
5178                                        .zip(sender_idents)
5179                                        .map(|((_version, sender, serialize), ident)| {
5180                                            (
5181                                                sender.metadata().location_id.clone(),
5182                                                ident,
5183                                                serialize.clone(),
5184                                            )
5185                                        })
5186                                        .collect();
5187                                graph_builders.create_versioned_network_fork(
5188                                    *channel_id,
5189                                    &metadata.location_id,
5190                                    sender_args,
5191                                    stmt_id,
5192                                );
5193                            }
5194                            BuildersOrCallback::Callback(_, node_callback) => {
5195                                node_callback(node, next_stmt_id);
5196                            }
5197                        }
5198                    }
5199
5200                    HydroNode::VersionedNetwork {
5201                        fork,
5202                        deserialize_fn,
5203                        metadata,
5204                        ..
5205                    } => {
5206                        let stmt_id = next_stmt_id.get_and_increment();
5207                        let receiver_stream_ident =
5208                            syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5209
5210                        // The wire element type is determined by the channel's *source* kind, which
5211                        // all senders share; read it from the shared fork's first sender.
5212                        let (channel_id, source_loc) = {
5213                            let fork_ref = fork.0.borrow();
5214                            let HydroNode::VersionedNetworkFork {
5215                                channel_id,
5216                                senders,
5217                                ..
5218                            } = &*fork_ref
5219                            else {
5220                                unreachable!("VersionedNetwork.fork must be a VersionedNetworkFork");
5221                            };
5222                            let source_loc = senders
5223                                .first()
5224                                .map(|(_v, sender, _s)| sender.metadata().location_id.clone())
5225                                .expect("a VersionedNetworkFork always has at least one sender");
5226                            (*channel_id, source_loc)
5227                        };
5228
5229                        match builders_or_callback {
5230                            BuildersOrCallback::Builders(graph_builders) => {
5231                                graph_builders.create_versioned_network(
5232                                    channel_id,
5233                                    &source_loc,
5234                                    &metadata.location_id,
5235                                    &receiver_stream_ident,
5236                                    deserialize_fn.as_ref(),
5237                                    stmt_id,
5238                                );
5239                            }
5240                            BuildersOrCallback::Callback(_, node_callback) => {
5241                                node_callback(node, next_stmt_id);
5242                            }
5243                        }
5244
5245                        ident_stack.push(receiver_stream_ident);
5246                    }
5247                }
5248            },
5249            seen_tees,
5250            false,
5251        );
5252
5253        let ret = ident_stack
5254            .pop()
5255            .expect("ident_stack should have exactly one element after traversal");
5256        assert!(
5257            ident_stack.is_empty(),
5258            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5259             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5260            ident_stack.len()
5261        );
5262        ret
5263    }
5264
5265    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5266        match self {
5267            HydroNode::Placeholder => {
5268                panic!()
5269            }
5270            HydroNode::Cast { .. }
5271            | HydroNode::ObserveNonDet { .. }
5272            | HydroNode::UnboundSingleton { .. }
5273            | HydroNode::AssertIsConsistent { .. } => {}
5274            HydroNode::Source { source, .. } => match source {
5275                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5276                HydroSource::ExternalNetwork()
5277                | HydroSource::Spin()
5278                | HydroSource::ClusterMembers(_, _)
5279                | HydroSource::Embedded(_)
5280                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
5281            },
5282            HydroNode::SingletonSource { value, .. } => {
5283                transform(value);
5284            }
5285            HydroNode::CycleSource { .. }
5286            | HydroNode::Tee { .. }
5287            | HydroNode::Reference { .. }
5288            | HydroNode::YieldConcat { .. }
5289            | HydroNode::BeginAtomic { .. }
5290            | HydroNode::EndAtomic { .. }
5291            | HydroNode::Batch { .. }
5292            | HydroNode::Chain { .. }
5293            | HydroNode::MergeOrdered { .. }
5294            | HydroNode::ChainFirst { .. }
5295            | HydroNode::CrossProduct { .. }
5296            | HydroNode::CrossSingleton { .. }
5297            | HydroNode::ResolveFutures { .. }
5298            | HydroNode::ResolveFuturesBlocking { .. }
5299            | HydroNode::ResolveFuturesOrdered { .. }
5300            | HydroNode::Join { .. }
5301            | HydroNode::JoinHalf { .. }
5302            | HydroNode::Difference { .. }
5303            | HydroNode::AntiJoin { .. }
5304            | HydroNode::DeferTick { .. }
5305            | HydroNode::Enumerate { .. }
5306            | HydroNode::Unique { .. }
5307            | HydroNode::Sort { .. }
5308            | HydroNode::VersionedNetworkFork { .. }
5309            | HydroNode::VersionedNetwork { .. } => {}
5310            HydroNode::Map { f, .. }
5311            | HydroNode::FlatMap { f, .. }
5312            | HydroNode::FlatMapStreamBlocking { f, .. }
5313            | HydroNode::Filter { f, .. }
5314            | HydroNode::FilterMap { f, .. }
5315            | HydroNode::Inspect { f, .. }
5316            | HydroNode::Partition { f, .. }
5317            | HydroNode::Reduce { f, .. }
5318            | HydroNode::ReduceKeyed { f, .. }
5319            | HydroNode::ReduceKeyedWatermark { f, .. } => {
5320                transform(&mut f.expr);
5321            }
5322            HydroNode::Fold { init, acc, .. }
5323            | HydroNode::Scan { init, acc, .. }
5324            | HydroNode::ScanAsyncBlocking { init, acc, .. }
5325            | HydroNode::FoldKeyed { init, acc, .. } => {
5326                transform(&mut init.expr);
5327                transform(&mut acc.expr);
5328            }
5329            HydroNode::Network {
5330                serialize_fn,
5331                deserialize_fn,
5332                ..
5333            } => {
5334                if let Some(serialize_fn) = serialize_fn {
5335                    transform(serialize_fn);
5336                }
5337                if let Some(deserialize_fn) = deserialize_fn {
5338                    transform(deserialize_fn);
5339                }
5340            }
5341            HydroNode::ExternalInput { deserialize_fn, .. } => {
5342                if let Some(deserialize_fn) = deserialize_fn {
5343                    transform(deserialize_fn);
5344                }
5345            }
5346            HydroNode::Counter { duration, .. } => {
5347                transform(duration);
5348            }
5349        }
5350    }
5351
5352    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5353        &self.metadata().op
5354    }
5355
5356    pub fn metadata(&self) -> &HydroIrMetadata {
5357        match self {
5358            HydroNode::Placeholder => {
5359                panic!()
5360            }
5361            HydroNode::VersionedNetworkFork { metadata, .. }
5362            | HydroNode::VersionedNetwork { metadata, .. } => metadata,
5363            HydroNode::Cast { metadata, .. }
5364            | HydroNode::ObserveNonDet { metadata, .. }
5365            | HydroNode::AssertIsConsistent { metadata, .. }
5366            | HydroNode::UnboundSingleton { metadata, .. }
5367            | HydroNode::Source { metadata, .. }
5368            | HydroNode::SingletonSource { metadata, .. }
5369            | HydroNode::CycleSource { metadata, .. }
5370            | HydroNode::Tee { metadata, .. }
5371            | HydroNode::Reference { metadata, .. }
5372            | HydroNode::Partition { metadata, .. }
5373            | HydroNode::YieldConcat { metadata, .. }
5374            | HydroNode::BeginAtomic { metadata, .. }
5375            | HydroNode::EndAtomic { metadata, .. }
5376            | HydroNode::Batch { metadata, .. }
5377            | HydroNode::Chain { metadata, .. }
5378            | HydroNode::MergeOrdered { metadata, .. }
5379            | HydroNode::ChainFirst { metadata, .. }
5380            | HydroNode::CrossProduct { metadata, .. }
5381            | HydroNode::CrossSingleton { metadata, .. }
5382            | HydroNode::Join { metadata, .. }
5383            | HydroNode::JoinHalf { metadata, .. }
5384            | HydroNode::Difference { metadata, .. }
5385            | HydroNode::AntiJoin { metadata, .. }
5386            | HydroNode::ResolveFutures { metadata, .. }
5387            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5388            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5389            | HydroNode::Map { metadata, .. }
5390            | HydroNode::FlatMap { metadata, .. }
5391            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5392            | HydroNode::Filter { metadata, .. }
5393            | HydroNode::FilterMap { metadata, .. }
5394            | HydroNode::DeferTick { metadata, .. }
5395            | HydroNode::Enumerate { metadata, .. }
5396            | HydroNode::Inspect { metadata, .. }
5397            | HydroNode::Unique { metadata, .. }
5398            | HydroNode::Sort { metadata, .. }
5399            | HydroNode::Scan { metadata, .. }
5400            | HydroNode::ScanAsyncBlocking { metadata, .. }
5401            | HydroNode::Fold { metadata, .. }
5402            | HydroNode::FoldKeyed { metadata, .. }
5403            | HydroNode::Reduce { metadata, .. }
5404            | HydroNode::ReduceKeyed { metadata, .. }
5405            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5406            | HydroNode::ExternalInput { metadata, .. }
5407            | HydroNode::Network { metadata, .. }
5408            | HydroNode::Counter { metadata, .. } => metadata,
5409        }
5410    }
5411
5412    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5413        &mut self.metadata_mut().op
5414    }
5415
5416    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5417        match self {
5418            HydroNode::Placeholder => {
5419                panic!()
5420            }
5421            HydroNode::VersionedNetworkFork { metadata, .. }
5422            | HydroNode::VersionedNetwork { metadata, .. } => metadata,
5423            HydroNode::Cast { metadata, .. }
5424            | HydroNode::ObserveNonDet { metadata, .. }
5425            | HydroNode::AssertIsConsistent { metadata, .. }
5426            | HydroNode::UnboundSingleton { metadata, .. }
5427            | HydroNode::Source { metadata, .. }
5428            | HydroNode::SingletonSource { metadata, .. }
5429            | HydroNode::CycleSource { metadata, .. }
5430            | HydroNode::Tee { metadata, .. }
5431            | HydroNode::Reference { metadata, .. }
5432            | HydroNode::Partition { metadata, .. }
5433            | HydroNode::YieldConcat { metadata, .. }
5434            | HydroNode::BeginAtomic { metadata, .. }
5435            | HydroNode::EndAtomic { metadata, .. }
5436            | HydroNode::Batch { metadata, .. }
5437            | HydroNode::Chain { metadata, .. }
5438            | HydroNode::MergeOrdered { metadata, .. }
5439            | HydroNode::ChainFirst { metadata, .. }
5440            | HydroNode::CrossProduct { metadata, .. }
5441            | HydroNode::CrossSingleton { metadata, .. }
5442            | HydroNode::Join { metadata, .. }
5443            | HydroNode::JoinHalf { metadata, .. }
5444            | HydroNode::Difference { metadata, .. }
5445            | HydroNode::AntiJoin { metadata, .. }
5446            | HydroNode::ResolveFutures { metadata, .. }
5447            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5448            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5449            | HydroNode::Map { metadata, .. }
5450            | HydroNode::FlatMap { metadata, .. }
5451            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5452            | HydroNode::Filter { metadata, .. }
5453            | HydroNode::FilterMap { metadata, .. }
5454            | HydroNode::DeferTick { metadata, .. }
5455            | HydroNode::Enumerate { metadata, .. }
5456            | HydroNode::Inspect { metadata, .. }
5457            | HydroNode::Unique { metadata, .. }
5458            | HydroNode::Sort { metadata, .. }
5459            | HydroNode::Scan { metadata, .. }
5460            | HydroNode::ScanAsyncBlocking { metadata, .. }
5461            | HydroNode::Fold { metadata, .. }
5462            | HydroNode::FoldKeyed { metadata, .. }
5463            | HydroNode::Reduce { metadata, .. }
5464            | HydroNode::ReduceKeyed { metadata, .. }
5465            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5466            | HydroNode::ExternalInput { metadata, .. }
5467            | HydroNode::Network { metadata, .. }
5468            | HydroNode::Counter { metadata, .. } => metadata,
5469        }
5470    }
5471
5472    pub fn input(&self) -> Vec<&HydroNode> {
5473        match self {
5474            HydroNode::Placeholder => {
5475                panic!()
5476            }
5477            HydroNode::Source { .. }
5478            | HydroNode::SingletonSource { .. }
5479            | HydroNode::ExternalInput { .. }
5480            | HydroNode::CycleSource { .. }
5481            | HydroNode::Tee { .. }
5482            | HydroNode::Reference { .. }
5483            | HydroNode::Partition { .. }
5484            | HydroNode::VersionedNetwork { .. } => {
5485                // Tee/Partition/VersionedNetwork find their input in separate special ways
5486                vec![]
5487            }
5488            HydroNode::Cast { inner, .. }
5489            | HydroNode::ObserveNonDet { inner, .. }
5490            | HydroNode::YieldConcat { inner, .. }
5491            | HydroNode::BeginAtomic { inner, .. }
5492            | HydroNode::EndAtomic { inner, .. }
5493            | HydroNode::Batch { inner, .. }
5494            | HydroNode::UnboundSingleton { inner, .. }
5495            | HydroNode::AssertIsConsistent { inner, .. } => {
5496                vec![inner]
5497            }
5498            HydroNode::Chain { first, second, .. } => {
5499                vec![first, second]
5500            }
5501            HydroNode::MergeOrdered { first, second, .. } => {
5502                vec![first, second]
5503            }
5504            HydroNode::ChainFirst { first, second, .. } => {
5505                vec![first, second]
5506            }
5507            HydroNode::CrossProduct { left, right, .. }
5508            | HydroNode::CrossSingleton { left, right, .. }
5509            | HydroNode::Join { left, right, .. }
5510            | HydroNode::JoinHalf { left, right, .. } => {
5511                vec![left, right]
5512            }
5513            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5514                vec![pos, neg]
5515            }
5516            HydroNode::Map { input, .. }
5517            | HydroNode::FlatMap { input, .. }
5518            | HydroNode::FlatMapStreamBlocking { input, .. }
5519            | HydroNode::Filter { input, .. }
5520            | HydroNode::FilterMap { input, .. }
5521            | HydroNode::Sort { input, .. }
5522            | HydroNode::DeferTick { input, .. }
5523            | HydroNode::Enumerate { input, .. }
5524            | HydroNode::Inspect { input, .. }
5525            | HydroNode::Unique { input, .. }
5526            | HydroNode::Network { input, .. }
5527            | HydroNode::Counter { input, .. }
5528            | HydroNode::ResolveFutures { input, .. }
5529            | HydroNode::ResolveFuturesBlocking { input, .. }
5530            | HydroNode::ResolveFuturesOrdered { input, .. }
5531            | HydroNode::Fold { input, .. }
5532            | HydroNode::FoldKeyed { input, .. }
5533            | HydroNode::Reduce { input, .. }
5534            | HydroNode::ReduceKeyed { input, .. }
5535            | HydroNode::Scan { input, .. }
5536            | HydroNode::ScanAsyncBlocking { input, .. } => {
5537                vec![input]
5538            }
5539            HydroNode::ReduceKeyedWatermark {
5540                input, watermark, ..
5541            } => {
5542                vec![input, watermark]
5543            }
5544            HydroNode::VersionedNetworkFork { senders, .. } => senders
5545                .iter()
5546                .map(|(_version, sender, _serialize)| sender.as_ref())
5547                .collect(),
5548        }
5549    }
5550
5551    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5552        self.input()
5553            .iter()
5554            .map(|input_node| input_node.metadata())
5555            .collect()
5556    }
5557
5558    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5559    /// has other live references, meaning the upstream is already driven
5560    /// by another consumer and does not need a Null sink.
5561    pub fn is_shared_with_others(&self) -> bool {
5562        match self {
5563            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5564                Rc::strong_count(&inner.0) > 1
5565            }
5566            // A zero-output reference node is valid in DFIR (it drains itself at
5567            // end of tick), so it doesn't need to be driven by another consumer.
5568            HydroNode::Reference { .. } => false,
5569            _ => false,
5570        }
5571    }
5572
5573    pub fn print_root(&self) -> String {
5574        match self {
5575            HydroNode::Placeholder => {
5576                panic!()
5577            }
5578            HydroNode::Cast { .. } => "Cast()".to_owned(),
5579            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5580            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5581            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5582            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5583            HydroNode::SingletonSource {
5584                value,
5585                first_tick_only,
5586                ..
5587            } => format!(
5588                "SingletonSource({:?}, first_tick_only={})",
5589                value, first_tick_only
5590            ),
5591            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5592            HydroNode::Tee { inner, .. } => {
5593                format!("Tee({})", inner.0.borrow().print_root())
5594            }
5595            HydroNode::Reference { inner, kind, .. } => {
5596                format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5597            }
5598            HydroNode::Partition { f, is_true, .. } => {
5599                format!("Partition({:?}, is_true={})", f, is_true)
5600            }
5601            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5602            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5603            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5604            HydroNode::Batch { .. } => "Batch()".to_owned(),
5605            HydroNode::Chain { first, second, .. } => {
5606                format!("Chain({}, {})", first.print_root(), second.print_root())
5607            }
5608            HydroNode::MergeOrdered { first, second, .. } => {
5609                format!(
5610                    "MergeOrdered({}, {})",
5611                    first.print_root(),
5612                    second.print_root()
5613                )
5614            }
5615            HydroNode::ChainFirst { first, second, .. } => {
5616                format!(
5617                    "ChainFirst({}, {})",
5618                    first.print_root(),
5619                    second.print_root()
5620                )
5621            }
5622            HydroNode::CrossProduct { left, right, .. } => {
5623                format!(
5624                    "CrossProduct({}, {})",
5625                    left.print_root(),
5626                    right.print_root()
5627                )
5628            }
5629            HydroNode::CrossSingleton { left, right, .. } => {
5630                format!(
5631                    "CrossSingleton({}, {})",
5632                    left.print_root(),
5633                    right.print_root()
5634                )
5635            }
5636            HydroNode::Join { left, right, .. } => {
5637                format!("Join({}, {})", left.print_root(), right.print_root())
5638            }
5639            HydroNode::JoinHalf { left, right, .. } => {
5640                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5641            }
5642            HydroNode::Difference { pos, neg, .. } => {
5643                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5644            }
5645            HydroNode::AntiJoin { pos, neg, .. } => {
5646                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5647            }
5648            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5649            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5650            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5651            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5652            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5653            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5654            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5655            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5656            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5657            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5658            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5659            HydroNode::Unique { .. } => "Unique()".to_owned(),
5660            HydroNode::Sort { .. } => "Sort()".to_owned(),
5661            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5662            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5663            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5664                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5665            }
5666            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5667            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5668            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5669            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5670            HydroNode::Network { .. } => "Network()".to_owned(),
5671            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5672            HydroNode::Counter { tag, duration, .. } => {
5673                format!("Counter({:?}, {:?})", tag, duration)
5674            }
5675            HydroNode::VersionedNetworkFork {
5676                channel_name,
5677                senders,
5678                ..
5679            } => {
5680                let versions: Vec<u32> = senders.iter().map(|(v, _, _)| *v).collect();
5681                format!(
5682                    "VersionedNetworkFork({}, senders={:?})",
5683                    channel_name, versions
5684                )
5685            }
5686            HydroNode::VersionedNetwork { version, .. } => {
5687                format!("VersionedNetwork(v{})", version)
5688            }
5689        }
5690    }
5691}
5692
/// Instantiates one network channel between two top-level locations.
///
/// The `(from_location, to_location)` pair selects which family of [`Deploy`] hooks is called:
///
/// * process → process: `o2o_sink_source` / `o2o_connect`
/// * process → cluster: `o2m_sink_source` / `o2m_connect`
/// * cluster → process: `m2o_sink_source` / `m2o_connect`
/// * cluster → cluster: `m2m_sink_source` / `m2m_connect`
///
/// For each endpoint the node is looked up in `processes` or `clusters` and cloned, then a port
/// is requested from it with `next_port()` (sender first, receiver second). `env`, `name` and
/// `networking_info` are forwarded unchanged to the `*_sink_source` hook.
///
/// Returns `(sink, source, connect_fn)`: the expression pair produced by the `*_sink_source`
/// hook, followed by the boxed closure produced by the matching `*_connect` hook. This function
/// never invokes that closure; running it is left to the caller.
///
/// # Panics
///
/// * If a referenced process or cluster has no entry in `processes` / `clusters`.
/// * If either endpoint is a `LocationId::Tick` or `LocationId::Atomic`; only processes and
///   clusters can be network endpoints here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches an owned handle to an instantiated process, panicking if it is missing.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetches an owned handle to an instantiated cluster, panicking if it is missing.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    // In every arm the order is: look up sender, look up receiver, allocate the sender port,
    // allocate the receiver port, build the sink/source pair, then build the connect closure.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Tick and Atomic locations are not valid network endpoints; report both sides so the
        // offending location can be identified from the panic message.
        (LocationId::Tick(..) | LocationId::Atomic(..), _)
        | (_, LocationId::Tick(..) | LocationId::Atomic(..)) => panic!(
            "network endpoints must be a process or a cluster, got from={:?}, to={:?}",
            from_location, to_location
        ),
    };
    (sink, source, connect_fn)
}
5834
5835#[cfg(test)]
5836mod serde_test;
5837
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so that growth of the enum is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(std::mem::size_of::<HydroNode>(), 264);
    }

    /// Pins the in-memory size of `HydroRoot` so that growth of the enum is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(std::mem::size_of::<HydroRoot>(), 136);
    }

    /// An expression with no `q!` machinery in it must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        assert_eq!(simplify_q_macro(plain.clone()), plain);
    }

    /// Feeds a real spliced `q!` closure through the simplifier and snapshots the output.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // The visitor is expected to recognize the stageleft::runtime_support::fn_* pattern
        // in the spliced expression and collapse it back to a q!(...) form.
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as above, but the quoted body is a block that ends in a closure rather than
    /// starting with a `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}