1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28#[cfg(feature = "build")]
29use crate::compile::builder::StmtId;
30use crate::compile::builder::{CycleId, ExternalPortId};
31#[cfg(feature = "build")]
32use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35#[cfg(feature = "build")]
36use crate::singleton_ref::singleton_ref_ident;
37
38pub mod backtrace;
39use backtrace::Backtrace;
40
/// A closure expression bundled with the singleton nodes it refers to.
///
/// The `Clone` impl and `emit_tokens` in this file both panic unless every entry of
/// `singleton_refs` is a `HydroNode::Singleton`.
pub struct ClosureExpr {
    /// The expression tokens of the closure.
    pub(crate) expr: DebugExpr,
    /// Referenced nodes, each paired with a flag that `emit_tokens` reads as `is_mut`
    /// (whether the reference is emitted with a `mut` token).
    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}
52
53impl Clone for ClosureExpr {
54 fn clone(&self) -> Self {
55 Self {
56 expr: self.expr.clone(),
57 singleton_refs: self
58 .singleton_refs
59 .iter()
60 .map(|(node, is_mut)| {
61 let HydroNode::Singleton { inner, metadata } = node else {
62 panic!("singleton_refs should only contain HydroNode::Singleton");
63 };
64 (
65 HydroNode::Singleton {
66 inner: SharedNode(Rc::clone(&inner.0)),
67 metadata: metadata.clone(),
68 },
69 *is_mut,
70 )
71 })
72 .collect(),
73 }
74 }
75}
76
77impl Hash for ClosureExpr {
78 fn hash<H: Hasher>(&self, state: &mut H) {
79 self.expr.hash(state);
80 }
84}
85
86impl serde::Serialize for ClosureExpr {
87 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
88 use serde::ser::SerializeStruct;
89 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
90 s.serialize_field("expr", &self.expr)?;
91 s.serialize_field(
92 "singleton_refs",
93 &SerializableSingletonRefs(&self.singleton_refs),
94 )?;
95 s.end()
96 }
97}
98
99struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
100
101impl serde::Serialize for SerializableSingletonRefs<'_> {
102 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
103 use serde::ser::SerializeSeq;
104 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
105 for (node, is_mut) in self.0.iter() {
106 seq.serialize_element(&(node, is_mut))?;
107 }
108 seq.end()
109 }
110}
111
112impl Debug for ClosureExpr {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 Debug::fmt(&self.expr, f)
115 }
116}
117
118impl Display for ClosureExpr {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 Display::fmt(&self.expr, f)
121 }
122}
123
124impl From<syn::Expr> for ClosureExpr {
125 fn from(expr: syn::Expr) -> Self {
126 Self {
127 expr: DebugExpr(Box::new(expr)),
128 singleton_refs: Vec::new(),
129 }
130 }
131}
132
133impl From<DebugExpr> for ClosureExpr {
134 fn from(expr: DebugExpr) -> Self {
135 Self {
136 expr,
137 singleton_refs: Vec::new(),
138 }
139 }
140}
141
142impl ClosureExpr {
143 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
144 Self {
145 expr,
146 singleton_refs,
147 }
148 }
149
150 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
151 Self {
152 expr: self.expr.clone(),
153 singleton_refs: self
154 .singleton_refs
155 .iter()
156 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
157 .collect(),
158 }
159 }
160
161 pub fn transform_children(
162 &mut self,
163 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
164 seen_tees: &mut SeenSharedNodes,
165 ) {
166 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
167 transform(ref_node, seen_tees);
168 }
169 }
170
171 #[cfg(feature = "build")]
174 pub fn emit_tokens(
175 &self,
176 ident_stack: &mut Vec<syn::Ident>,
177 access_counters: &mut HashMap<*const RefCell<HydroNode>, u32>,
178 ) -> TokenStream {
179 if self.singleton_refs.is_empty() {
180 self.expr.0.to_token_stream()
181 } else {
182 assert!(
183 ident_stack.len() >= self.singleton_refs.len(),
184 "ident_stack has {} entries but expected at least {} for singleton_refs",
185 ident_stack.len(),
186 self.singleton_refs.len()
187 );
188 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
189
190 let mut let_bindings = Vec::new();
191 for ((i, (node, is_mut)), ref_ident) in
192 self.singleton_refs.iter().enumerate().zip(ref_idents)
193 {
194 let HydroNode::Singleton { inner, .. } = node else {
195 panic!("singleton_refs should only contain HydroNode::Singleton");
196 };
197 let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
198 let counter = access_counters.entry(ptr).or_insert(0);
199 let group = if *is_mut {
200 *counter += 1;
201 let g = *counter;
202 *counter += 1;
203 g
204 } else {
205 *counter
206 };
207
208 let local_ident = singleton_ref_ident(i);
210 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
211 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
212 let mut_token = is_mut.then(|| quote!(mut));
213 let binding = quote! {
214 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
215 };
216 let_bindings.push(binding);
217 }
218
219 let expr = &self.expr.0;
220 quote! {
221 {
222 #( #let_bindings )*
223 #expr
224 }
225 }
226 }
227 }
228}
229
/// Newtype around a boxed `syn::Expr` that gives it token-based `Debug`, a simplified
/// `q!(...)` `Display`, and string serialization (see the impls in this file).
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
235
236impl serde::Serialize for DebugExpr {
237 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
238 serializer.serialize_str(&self.to_string())
239 }
240}
241
242impl From<syn::Expr> for DebugExpr {
243 fn from(expr: syn::Expr) -> Self {
244 Self(Box::new(expr))
245 }
246}
247
248impl Deref for DebugExpr {
249 type Target = syn::Expr;
250
251 fn deref(&self) -> &Self::Target {
252 &self.0
253 }
254}
255
256impl ToTokens for DebugExpr {
257 fn to_tokens(&self, tokens: &mut TokenStream) {
258 self.0.to_tokens(tokens);
259 }
260}
261
262impl Debug for DebugExpr {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 write!(f, "{}", self.0.to_token_stream())
265 }
266}
267
268impl Display for DebugExpr {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 let original = self.0.as_ref().clone();
271 let simplified = simplify_q_macro(original);
272
273 write!(f, "q!({})", quote::quote!(#simplified))
276 }
277}
278
279fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
281 let mut simplifier = QMacroSimplifier::new();
284 simplifier.visit_expr_mut(&mut expr);
285
286 if let Some(simplified) = simplifier.simplified_result {
288 simplified
289 } else {
290 expr
291 }
292}
293
/// Expression visitor that looks for a `stageleft::runtime_support::*_type_hint*` call and
/// captures a closure found in its arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The first closure extracted during the visit, if any; once set, visiting stops.
    pub simplified_result: Option<syn::Expr>,
}
299
300impl QMacroSimplifier {
301 pub fn new() -> Self {
302 Self::default()
303 }
304}
305
306impl VisitMut for QMacroSimplifier {
307 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
308 if self.simplified_result.is_some() {
310 return;
311 }
312
313 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
314 && self.is_stageleft_runtime_support_call(&path_expr.path)
316 && let Some(closure) = self.extract_closure_from_args(&call.args)
318 {
319 self.simplified_result = Some(closure);
320 return;
321 }
322
323 syn::visit_mut::visit_expr_mut(self, expr);
326 }
327}
328
329impl QMacroSimplifier {
330 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
331 if let Some(last_segment) = path.segments.last() {
333 let fn_name = last_segment.ident.to_string();
334 fn_name.contains("_type_hint")
336 && path.segments.len() > 2
337 && path.segments[0].ident == "stageleft"
338 && path.segments[1].ident == "runtime_support"
339 } else {
340 false
341 }
342 }
343
344 fn extract_closure_from_args(
345 &self,
346 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
347 ) -> Option<syn::Expr> {
348 for arg in args {
350 if let syn::Expr::Closure(_) = arg {
351 return Some(arg.clone());
352 }
353 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
355 return Some(closure_expr);
356 }
357 }
358 None
359 }
360
361 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
362 let mut visitor = ClosureFinder {
363 found_closure: None,
364 prefer_inner_blocks: true,
365 };
366 visitor.visit_expr(expr);
367 visitor.found_closure
368 }
369}
370
/// Read-only visitor that records the first closure (or closure-containing inner block) it
/// encounters.
struct ClosureFinder {
    /// The expression found so far; once set, further visits return immediately.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block statement that contains a closure is captured as a whole block
    /// instead of just the closure inside it.
    prefer_inner_blocks: bool,
}
376
377impl<'ast> Visit<'ast> for ClosureFinder {
378 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
379 if self.found_closure.is_some() {
381 return;
382 }
383
384 match expr {
385 syn::Expr::Closure(_) => {
386 self.found_closure = Some(expr.clone());
387 }
388 syn::Expr::Block(block) if self.prefer_inner_blocks => {
389 for stmt in &block.block.stmts {
391 if let syn::Stmt::Expr(stmt_expr, _) = stmt
392 && let syn::Expr::Block(_) = stmt_expr
393 {
394 let mut inner_visitor = ClosureFinder {
396 found_closure: None,
397 prefer_inner_blocks: false, };
399 inner_visitor.visit_expr(stmt_expr);
400 if inner_visitor.found_closure.is_some() {
401 self.found_closure = Some(stmt_expr.clone());
403 return;
404 }
405 }
406 }
407
408 visit::visit_expr(self, expr);
410
411 if self.found_closure.is_some() {
414 }
416 }
417 _ => {
418 visit::visit_expr(self, expr);
420 }
421 }
422 }
423}
424
/// Newtype around a boxed `syn::Type` that gives it token-based `Debug` and string
/// serialization (see the impls in this file).
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
430
431impl From<syn::Type> for DebugType {
432 fn from(t: syn::Type) -> Self {
433 Self(Box::new(t))
434 }
435}
436
437impl Deref for DebugType {
438 type Target = syn::Type;
439
440 fn deref(&self) -> &Self::Target {
441 &self.0
442 }
443}
444
445impl ToTokens for DebugType {
446 fn to_tokens(&self, tokens: &mut TokenStream) {
447 self.0.to_tokens(tokens);
448 }
449}
450
451impl Debug for DebugType {
452 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453 write!(f, "{}", self.0.to_token_stream())
454 }
455}
456
457impl serde::Serialize for DebugType {
458 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
459 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
460 }
461}
462
463fn serialize_backtrace_as_span<S: serde::Serializer>(
464 backtrace: &Backtrace,
465 serializer: S,
466) -> Result<S::Ok, S::Error> {
467 match backtrace.format_span() {
468 Some(span) => serializer.serialize_some(&span),
469 None => serializer.serialize_none(),
470 }
471}
472
473fn serialize_ident<S: serde::Serializer>(
474 ident: &syn::Ident,
475 serializer: S,
476) -> Result<S::Ok, S::Error> {
477 serializer.serialize_str(&ident.to_string())
478}
479
/// Instantiation state of a network-facing node.
///
/// Nodes start as `Building`; `compile_network` replaces that with `Finalized` and panics if a
/// node is already finalized.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; carries the sink/source expressions and the pending connect closure.
    Finalized(Box<DebugInstantiateFinalized>),
}
484
485impl serde::Serialize for DebugInstantiate {
486 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
487 match self {
488 DebugInstantiate::Building => {
489 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
490 }
491 DebugInstantiate::Finalized(_) => {
492 panic!(
493 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
494 )
495 }
496 }
497 }
498}
499
/// Result of instantiating a network-facing node: the generated sink and source expressions
/// plus a one-shot closure that `connect_network` takes and invokes.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// Deferred connection step; it is an `Option` so it can be taken out and called once.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
512
513impl From<DebugInstantiateFinalized> for DebugInstantiate {
514 fn from(f: DebugInstantiateFinalized) -> Self {
515 Self::Finalized(Box::new(f))
516 }
517}
518
519impl Debug for DebugInstantiate {
520 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521 write!(f, "<network instantiate>")
522 }
523}
524
impl Hash for DebugInstantiate {
    /// Intentionally feeds nothing into the hasher: the instantiation state does not
    /// contribute to the hash of whatever contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
530
531impl Clone for DebugInstantiate {
532 fn clone(&self) -> Self {
533 match self {
534 DebugInstantiate::Building => DebugInstantiate::Building,
535 DebugInstantiate::Finalized(_) => {
536 panic!("DebugInstantiate::Finalized should not be cloned")
537 }
538 }
539 }
540}
541
/// State of a `HydroSource::ClusterMembers` source.
///
/// `compile_network` moves `Uninit` to `Stream` or `Tee` and panics if the state is already
/// one of those.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet processed by `compile_network`.
    Uninit,
    /// Set the first time a given (location, cluster key) pair is seen; holds the membership
    /// stream expression.
    Stream(DebugExpr),
    /// Set for later occurrences of an already-seen pair; `compile_network` stores the root
    /// location the source sits at, followed by the cluster's location id.
    Tee(LocationId, LocationId),
}
562
/// The kinds of source a `HydroNode::Source` can have.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression. NOTE(review): presumably a stream expression —
    /// confirm against the code generator.
    Stream(DebugExpr),
    /// NOTE(review): presumably input from an external network; not used in the visible code.
    ExternalNetwork(),
    /// Source described by an expression. NOTE(review): presumably an iterator expression —
    /// confirm against the code generator.
    Iter(DebugExpr),
    /// NOTE(review): semantics are not visible in this part of the file.
    Spin(),
    /// Membership of the cluster at the given location, together with its instantiation state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded stream input named by the identifier; `compile_network` requires the node to
    /// have `Stream` collection kind.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Embedded singleton input named by the identifier; `compile_network` requires the node
    /// to have `Singleton` collection kind.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
574
/// Backend hooks used to emit DFIR code for individual IR operations.
///
/// The implementation for `SecondaryMap<LocationKey, FlatGraphBuilder>` in this file is the
/// baseline: most hooks there emit a plain `out = in;` pass-through into the builder for the
/// relevant location.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Backend flag; the `SecondaryMap` implementation returns `false`.
    /// NOTE(review): exact meaning is not visible in this part of the file.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the mutable graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code connecting `in_ident` to `out_ident` for a batch operation.
    ///
    /// In the `SecondaryMap` implementation, bounded singleton/optional/keyed-singleton inputs
    /// get a `persist::<'static>()` stage and everything else is passed through.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the code connecting `in_ident` to `out_ident` when yielding out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at the start of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` at the end of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code for observing a non-deterministic value at `location`.
    /// NOTE(review): the role of `trusted` is not visible here; the `SecondaryMap`
    /// implementation ignores it.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code merging `first_ident` and `second_ident` into `out_ident`.
    ///
    /// The `SecondaryMap` implementation uses a `union()` with the first input on port `0`
    /// and the second on port `1`, tagged with `operator_tag`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network edge: a sink on `from` fed by `input_ident` and a source
    /// on `to` bound to `out_ident`, with optional serialize/deserialize mapping stages.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source on `on` that reads from `source_expr` into `out_ident`, with an optional
    /// deserialize mapping stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits a sink on `on` that writes `input_ident` to `sink_expr`, with an optional
    /// serialize mapping stage.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emits a hook for a fold input and returns the hook's identifier.
    /// The `SecondaryMap` implementation emits nothing and returns `None`.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Emits the code connecting `in_ident` to `out_ident` for a consistency assertion.
    /// NOTE(review): the role of `trusted` is not visible here; the `SecondaryMap`
    /// implementation ignores it.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );
}
703
/// Baseline backend: one `FlatGraphBuilder` per root location, with most hooks emitting a
/// plain `out = in;` pass-through.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// This backend always answers `false`.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (creating on demand) the builder for the root of `location`.
    ///
    /// # Panics
    /// Panics with "location was removed" if the key's entry is unavailable.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Persists bounded singleton-like inputs (which must be at top level); passes everything
    /// else through unchanged.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !needs_persist {
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Pass-through on the input location's root builder.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through on the input location's root builder.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through on the input location's root builder.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through on the builder for `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Unions the two inputs: first on port `0`, second on port `1`.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the sender half on `from` (tagged `send<tag_id>`) and the receiver half on `to`
    /// (tagged `recv<tag_id>`), inserting `map` stages when (de)serializers are supplied.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                },
                None,
                Some(send_tag.as_str()),
            ),
            None => sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> dest_sink(#sink);
                },
                None,
                Some(send_tag.as_str()),
            ),
        }

        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                },
                None,
                Some(recv_tag.as_str()),
            ),
            None => receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source);
                },
                None,
                Some(recv_tag.as_str()),
            ),
        }
    }

    /// Emits a `source_stream` on `on` (tagged `recv<tag_id>`), optionally followed by a
    /// deserializing `map`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                },
                None,
                Some(recv_tag.as_str()),
            ),
            None => receiver_builder.add_dfir(
                parse_quote! {
                    #out_ident = source_stream(#source_expr);
                },
                None,
                Some(recv_tag.as_str()),
            ),
        }
    }

    /// Emits a `dest_sink` on `on` (tagged `send<tag_id>`), optionally preceded by a
    /// serializing `map`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                },
                None,
                Some(send_tag.as_str()),
            ),
            None => sender_builder.add_dfir(
                parse_quote! {
                    #input_ident -> dest_sink(#sink_expr);
                },
                None,
                Some(send_tag.as_str()),
            ),
        }
    }

    /// This backend emits no fold hooks.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    /// Pass-through on the builder for `location`.
    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }
}
987
/// Either a DFIR backend to emit code into, or a pair of callbacks invoked instead.
///
/// `L` receives roots and `N` receives nodes; both also receive a mutable `StmtId`.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut StmtId),
    N: FnMut(&mut HydroNode, &mut StmtId),
{
    /// Emit code through the given backend.
    Builders(&'a mut dyn DfirBuilder),
    /// Call the root callback and the node callback rather than emitting code.
    Callback(L, N),
}
997
/// A root of the IR: an operation that consumes an `input` node.
///
/// Every variant carries its `input` and the operator metadata for the root itself.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Consumes the input with the expression `f`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to a port on an external location.
    ///
    /// `instantiate_fn` starts as `Building` and is finalized by `compile_network`, which also
    /// reads `to_many` and `unpaired` to decide how the port is wired.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input into the sink described by the expression `sink`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Embedded output named `ident`; `compile_network` requires the input to have `Stream`
    /// collection kind and to live on a process or cluster.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root with no extra payload. NOTE(review): presumably discards its input — confirm.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
1039
1040impl HydroRoot {
1041 #[cfg(feature = "build")]
1042 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1043 pub fn compile_network<'a, D>(
1044 &mut self,
1045 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1046 seen_tees: &mut SeenSharedNodes,
1047 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1048 processes: &SparseSecondaryMap<LocationKey, D::Process>,
1049 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1050 externals: &SparseSecondaryMap<LocationKey, D::External>,
1051 env: &mut D::InstantiateEnv,
1052 ) where
1053 D: Deploy<'a>,
1054 {
1055 let refcell_extra_stmts = RefCell::new(extra_stmts);
1056 let refcell_env = RefCell::new(env);
1057 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1058 self.transform_bottom_up(
1059 &mut |l| {
1060 if let HydroRoot::SendExternal {
1061 input,
1062 to_external_key,
1063 to_port_id,
1064 to_many,
1065 unpaired,
1066 instantiate_fn,
1067 ..
1068 } = l
1069 {
1070 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1071 DebugInstantiate::Building => {
1072 let to_node = externals
1073 .get(*to_external_key)
1074 .unwrap_or_else(|| {
1075 panic!("A external used in the graph was not instantiated: {}", to_external_key)
1076 })
1077 .clone();
1078
1079 match input.metadata().location_id.root() {
1080 &LocationId::Process(process_key) => {
1081 if *to_many {
1082 (
1083 (
1084 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1085 parse_quote!(DUMMY),
1086 ),
1087 Box::new(|| {}) as Box<dyn FnOnce()>,
1088 )
1089 } else {
1090 let from_node = processes
1091 .get(process_key)
1092 .unwrap_or_else(|| {
1093 panic!("A process used in the graph was not instantiated: {}", process_key)
1094 })
1095 .clone();
1096
1097 let sink_port = from_node.next_port();
1098 let source_port = to_node.next_port();
1099
1100 if *unpaired {
1101 use stageleft::quote_type;
1102 use tokio_util::codec::LengthDelimitedCodec;
1103
1104 to_node.register(*to_port_id, source_port.clone());
1105
1106 let _ = D::e2o_source(
1107 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1108 &to_node, &source_port,
1109 &from_node, &sink_port,
1110 "e_type::<LengthDelimitedCodec>(),
1111 format!("{}_{}", *to_external_key, *to_port_id)
1112 );
1113 }
1114
1115 (
1116 (
1117 D::o2e_sink(
1118 &from_node,
1119 &sink_port,
1120 &to_node,
1121 &source_port,
1122 format!("{}_{}", *to_external_key, *to_port_id)
1123 ),
1124 parse_quote!(DUMMY),
1125 ),
1126 if *unpaired {
1127 D::e2o_connect(
1128 &to_node,
1129 &source_port,
1130 &from_node,
1131 &sink_port,
1132 *to_many,
1133 NetworkHint::Auto,
1134 )
1135 } else {
1136 Box::new(|| {}) as Box<dyn FnOnce()>
1137 },
1138 )
1139 }
1140 }
1141 LocationId::Cluster(cluster_key) => {
1142 let from_node = clusters
1143 .get(*cluster_key)
1144 .unwrap_or_else(|| {
1145 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1146 })
1147 .clone();
1148
1149 let sink_port = from_node.next_port();
1150 let source_port = to_node.next_port();
1151
1152 if *unpaired {
1153 to_node.register(*to_port_id, source_port.clone());
1154 }
1155
1156 (
1157 (
1158 D::m2e_sink(
1159 &from_node,
1160 &sink_port,
1161 &to_node,
1162 &source_port,
1163 format!("{}_{}", *to_external_key, *to_port_id)
1164 ),
1165 parse_quote!(DUMMY),
1166 ),
1167 Box::new(|| {}) as Box<dyn FnOnce()>,
1168 )
1169 }
1170 _ => panic!()
1171 }
1172 },
1173
1174 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1175 };
1176
1177 *instantiate_fn = DebugInstantiateFinalized {
1178 sink: sink_expr,
1179 source: source_expr,
1180 connect_fn: Some(connect_fn),
1181 }
1182 .into();
1183 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1184 let element_type = match &input.metadata().collection_kind {
1185 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1186 _ => panic!("Embedded output must have Stream collection kind"),
1187 };
1188 let location_key = match input.metadata().location_id.root() {
1189 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1190 _ => panic!("Embedded output must be on a process or cluster"),
1191 };
1192 D::register_embedded_output(
1193 &mut refcell_env.borrow_mut(),
1194 location_key,
1195 ident,
1196 &element_type,
1197 );
1198 }
1199 },
1200 &mut |n| {
1201 if let HydroNode::Network {
1202 name,
1203 networking_info,
1204 input,
1205 instantiate_fn,
1206 metadata,
1207 ..
1208 } = n
1209 {
1210 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1211 DebugInstantiate::Building => instantiate_network::<D>(
1212 &mut refcell_env.borrow_mut(),
1213 input.metadata().location_id.root(),
1214 metadata.location_id.root(),
1215 processes,
1216 clusters,
1217 name.as_deref(),
1218 networking_info,
1219 ),
1220
1221 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1222 };
1223
1224 *instantiate_fn = DebugInstantiateFinalized {
1225 sink: sink_expr,
1226 source: source_expr,
1227 connect_fn: Some(connect_fn),
1228 }
1229 .into();
1230 } else if let HydroNode::ExternalInput {
1231 from_external_key,
1232 from_port_id,
1233 from_many,
1234 codec_type,
1235 port_hint,
1236 instantiate_fn,
1237 metadata,
1238 ..
1239 } = n
1240 {
1241 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1242 DebugInstantiate::Building => {
1243 let from_node = externals
1244 .get(*from_external_key)
1245 .unwrap_or_else(|| {
1246 panic!(
1247 "A external used in the graph was not instantiated: {}",
1248 from_external_key,
1249 )
1250 })
1251 .clone();
1252
1253 match metadata.location_id.root() {
1254 &LocationId::Process(process_key) => {
1255 let to_node = processes
1256 .get(process_key)
1257 .unwrap_or_else(|| {
1258 panic!("A process used in the graph was not instantiated: {}", process_key)
1259 })
1260 .clone();
1261
1262 let sink_port = from_node.next_port();
1263 let source_port = to_node.next_port();
1264
1265 from_node.register(*from_port_id, sink_port.clone());
1266
1267 (
1268 (
1269 parse_quote!(DUMMY),
1270 if *from_many {
1271 D::e2o_many_source(
1272 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1273 &to_node, &source_port,
1274 codec_type.0.as_ref(),
1275 format!("{}_{}", *from_external_key, *from_port_id)
1276 )
1277 } else {
1278 D::e2o_source(
1279 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1280 &from_node, &sink_port,
1281 &to_node, &source_port,
1282 codec_type.0.as_ref(),
1283 format!("{}_{}", *from_external_key, *from_port_id)
1284 )
1285 },
1286 ),
1287 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1288 )
1289 }
1290 LocationId::Cluster(cluster_key) => {
1291 let to_node = clusters
1292 .get(*cluster_key)
1293 .unwrap_or_else(|| {
1294 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1295 })
1296 .clone();
1297
1298 let sink_port = from_node.next_port();
1299 let source_port = to_node.next_port();
1300
1301 from_node.register(*from_port_id, sink_port.clone());
1302
1303 (
1304 (
1305 parse_quote!(DUMMY),
1306 D::e2m_source(
1307 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1308 &from_node, &sink_port,
1309 &to_node, &source_port,
1310 codec_type.0.as_ref(),
1311 format!("{}_{}", *from_external_key, *from_port_id)
1312 ),
1313 ),
1314 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1315 )
1316 }
1317 _ => panic!()
1318 }
1319 },
1320
1321 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1322 };
1323
1324 *instantiate_fn = DebugInstantiateFinalized {
1325 sink: sink_expr,
1326 source: source_expr,
1327 connect_fn: Some(connect_fn),
1328 }
1329 .into();
1330 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1331 let element_type = match &metadata.collection_kind {
1332 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1333 _ => panic!("Embedded source must have Stream collection kind"),
1334 };
1335 let location_key = match metadata.location_id.root() {
1336 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1337 _ => panic!("Embedded source must be on a process or cluster"),
1338 };
1339 D::register_embedded_stream_input(
1340 &mut refcell_env.borrow_mut(),
1341 location_key,
1342 ident,
1343 &element_type,
1344 );
1345 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1346 let element_type = match &metadata.collection_kind {
1347 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1348 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1349 };
1350 let location_key = match metadata.location_id.root() {
1351 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1352 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1353 };
1354 D::register_embedded_singleton_input(
1355 &mut refcell_env.borrow_mut(),
1356 location_key,
1357 ident,
1358 &element_type,
1359 );
1360 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1361 match state {
1362 ClusterMembersState::Uninit => {
1363 let at_location = metadata.location_id.root().clone();
1364 let key = (at_location.clone(), location_id.key());
1365 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1366 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1368 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1369 &(),
1370 );
1371 *state = ClusterMembersState::Stream(expr.into());
1372 } else {
1373 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1375 }
1376 }
1377 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1378 panic!("cluster members already finalized");
1379 }
1380 }
1381 }
1382 },
1383 seen_tees,
1384 false,
1385 );
1386 }
1387
1388 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1389 self.transform_bottom_up(
1390 &mut |l| {
1391 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1392 match instantiate_fn {
1393 DebugInstantiate::Building => panic!("network not built"),
1394
1395 DebugInstantiate::Finalized(finalized) => {
1396 (finalized.connect_fn.take().unwrap())();
1397 }
1398 }
1399 }
1400 },
1401 &mut |n| {
1402 if let HydroNode::Network { instantiate_fn, .. }
1403 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1404 {
1405 match instantiate_fn {
1406 DebugInstantiate::Building => panic!("network not built"),
1407
1408 DebugInstantiate::Finalized(finalized) => {
1409 (finalized.connect_fn.take().unwrap())();
1410 }
1411 }
1412 }
1413 },
1414 seen_tees,
1415 false,
1416 );
1417 }
1418
1419 pub fn transform_bottom_up(
1420 &mut self,
1421 transform_root: &mut impl FnMut(&mut HydroRoot),
1422 transform_node: &mut impl FnMut(&mut HydroNode),
1423 seen_tees: &mut SeenSharedNodes,
1424 check_well_formed: bool,
1425 ) {
1426 self.transform_children(
1427 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1428 seen_tees,
1429 );
1430
1431 transform_root(self);
1432 }
1433
1434 pub fn transform_children(
1435 &mut self,
1436 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1437 seen_tees: &mut SeenSharedNodes,
1438 ) {
1439 match self {
1440 HydroRoot::ForEach { input, .. }
1441 | HydroRoot::SendExternal { input, .. }
1442 | HydroRoot::DestSink { input, .. }
1443 | HydroRoot::CycleSink { input, .. }
1444 | HydroRoot::EmbeddedOutput { input, .. }
1445 | HydroRoot::Null { input, .. } => {
1446 transform(input, seen_tees);
1447 }
1448 }
1449 }
1450
1451 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1452 match self {
1453 HydroRoot::ForEach {
1454 f,
1455 input,
1456 op_metadata,
1457 } => HydroRoot::ForEach {
1458 f: f.clone(),
1459 input: Box::new(input.deep_clone(seen_tees)),
1460 op_metadata: op_metadata.clone(),
1461 },
1462 HydroRoot::SendExternal {
1463 to_external_key,
1464 to_port_id,
1465 to_many,
1466 unpaired,
1467 serialize_fn,
1468 instantiate_fn,
1469 input,
1470 op_metadata,
1471 } => HydroRoot::SendExternal {
1472 to_external_key: *to_external_key,
1473 to_port_id: *to_port_id,
1474 to_many: *to_many,
1475 unpaired: *unpaired,
1476 serialize_fn: serialize_fn.clone(),
1477 instantiate_fn: instantiate_fn.clone(),
1478 input: Box::new(input.deep_clone(seen_tees)),
1479 op_metadata: op_metadata.clone(),
1480 },
1481 HydroRoot::DestSink {
1482 sink,
1483 input,
1484 op_metadata,
1485 } => HydroRoot::DestSink {
1486 sink: sink.clone(),
1487 input: Box::new(input.deep_clone(seen_tees)),
1488 op_metadata: op_metadata.clone(),
1489 },
1490 HydroRoot::CycleSink {
1491 cycle_id,
1492 input,
1493 op_metadata,
1494 } => HydroRoot::CycleSink {
1495 cycle_id: *cycle_id,
1496 input: Box::new(input.deep_clone(seen_tees)),
1497 op_metadata: op_metadata.clone(),
1498 },
1499 HydroRoot::EmbeddedOutput {
1500 ident,
1501 input,
1502 op_metadata,
1503 } => HydroRoot::EmbeddedOutput {
1504 ident: ident.clone(),
1505 input: Box::new(input.deep_clone(seen_tees)),
1506 op_metadata: op_metadata.clone(),
1507 },
1508 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1509 input: Box::new(input.deep_clone(seen_tees)),
1510 op_metadata: op_metadata.clone(),
1511 },
1512 }
1513 }
1514
1515 #[cfg(feature = "build")]
1516 pub fn emit(
1517 &mut self,
1518 graph_builders: &mut dyn DfirBuilder,
1519 seen_tees: &mut SeenSharedNodes,
1520 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1521 next_stmt_id: &mut StmtId,
1522 fold_hooked_idents: &mut HashSet<String>,
1523 ) {
1524 self.emit_core(
1525 &mut BuildersOrCallback::<
1526 fn(&mut HydroRoot, &mut StmtId),
1527 fn(&mut HydroNode, &mut StmtId),
1528 >::Builders(graph_builders),
1529 seen_tees,
1530 built_tees,
1531 next_stmt_id,
1532 fold_hooked_idents,
1533 );
1534 }
1535
    /// Emits this root, either into DFIR graph builders or by invoking a callback.
    ///
    /// Every arm first emits the input subtree via `HydroNode::emit_core`, which yields
    /// the identifier the input is bound to. What happens next depends on the mode:
    ///
    /// * `BuildersOrCallback::Builders`: a DFIR statement consuming that identifier is
    ///   added to the builder for the input's location (or, for `SendExternal`, an
    ///   external output is created).
    /// * `BuildersOrCallback::Callback`: the root callback is invoked with `self` and the
    ///   current statement id.
    ///
    /// All variants except `CycleSink` then advance `next_stmt_id`. `CycleSink` neither
    /// invokes the callback nor advances the id.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut StmtId),
            impl FnMut(&mut HydroNode, &mut StmtId),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut StmtId,
        fold_hooked_idents: &mut HashSet<String>,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                let _ = next_stmt_id.get_and_increment();
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half is used here. A not-yet-finalized
                        // instantiation falls back to placeholder expressions.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                let _ = next_stmt_id.get_and_increment();
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                let _ = next_stmt_id.get_and_increment();
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; all other
                        // kinds flow as their plain element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        // Binds the cycle's identifier to the input through a typed
                        // `identity` operator; no statement id is attached.
                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // In callback mode a cycle sink is not reported, and (unlike the
                    // other arms) `next_stmt_id` is left untouched afterwards.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                let _ = next_stmt_id.get_and_increment();
            }

            HydroRoot::Null { input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Drains the input with a no-op consumer.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(|_| {});
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                let _ = next_stmt_id.get_and_increment();
            }
        }
    }
1758
1759 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1760 match self {
1761 HydroRoot::ForEach { op_metadata, .. }
1762 | HydroRoot::SendExternal { op_metadata, .. }
1763 | HydroRoot::DestSink { op_metadata, .. }
1764 | HydroRoot::CycleSink { op_metadata, .. }
1765 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1766 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1767 }
1768 }
1769
1770 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1771 match self {
1772 HydroRoot::ForEach { op_metadata, .. }
1773 | HydroRoot::SendExternal { op_metadata, .. }
1774 | HydroRoot::DestSink { op_metadata, .. }
1775 | HydroRoot::CycleSink { op_metadata, .. }
1776 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1777 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1778 }
1779 }
1780
1781 pub fn input(&self) -> &HydroNode {
1782 match self {
1783 HydroRoot::ForEach { input, .. }
1784 | HydroRoot::SendExternal { input, .. }
1785 | HydroRoot::DestSink { input, .. }
1786 | HydroRoot::CycleSink { input, .. }
1787 | HydroRoot::EmbeddedOutput { input, .. }
1788 | HydroRoot::Null { input, .. } => input,
1789 }
1790 }
1791
1792 pub fn input_metadata(&self) -> &HydroIrMetadata {
1793 self.input().metadata()
1794 }
1795
1796 pub fn print_root(&self) -> String {
1797 match self {
1798 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1799 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1800 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1801 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1802 HydroRoot::EmbeddedOutput { ident, .. } => {
1803 format!("EmbeddedOutput({})", ident)
1804 }
1805 HydroRoot::Null { .. } => "Null".to_owned(),
1806 }
1807 }
1808
1809 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1810 match self {
1811 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1812 transform(f);
1813 }
1814 HydroRoot::SendExternal { .. }
1815 | HydroRoot::CycleSink { .. }
1816 | HydroRoot::EmbeddedOutput { .. }
1817 | HydroRoot::Null { .. } => {}
1818 }
1819 }
1820}
1821
/// Returns the clock id of the tick that `loc` refers to, looking through any number
/// of `Atomic` wrappers; `None` when `loc` is a process or cluster.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    match loc {
        LocationId::Atomic(inner) => tick_of(inner),
        LocationId::Tick(id, _) => Some(*id),
        LocationId::Process(_) | LocationId::Cluster(_) => None,
    }
}
1830
/// Rewrites, in place, every tick clock id nested inside `loc` to the representative
/// that `uf_find` reports for it.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    match loc {
        // Leaves: nothing to rewrite.
        LocationId::Process(_) | LocationId::Cluster(_) => {}
        LocationId::Atomic(inner) => remap_location(inner, uf),
        LocationId::Tick(id, inner) => {
            let representative = uf_find(uf, *id);
            *id = representative;
            remap_location(inner, uf);
        }
    }
}
1844
/// Union-find lookup: returns the representative of `x`, re-pointing every non-root
/// entry visited along the way directly at that representative.
///
/// An id with no entry in `parent`, or one mapped to itself, is its own representative.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // Pass 1: follow parent links until reaching an id that is its own representative.
    let mut root = x;
    while let Some(&next) = parent.get(&root) {
        if next == root {
            break;
        }
        root = next;
    }

    // Pass 2: walk the same chain again, pointing each visited id straight at the root.
    let mut cursor = x;
    while cursor != root {
        // `insert` hands back the previous parent, which is the next link in the chain.
        let next = parent.insert(cursor, root).unwrap_or(root);
        cursor = next;
    }

    root
}
1855
/// Union-find merge: makes the representative of `b` the parent of the representative
/// of `a`. Does nothing when both already share a representative.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1864
/// Merges tick clock ids that are connected through `Batch`, `YieldConcat`, `Chain`,
/// `ChainFirst`, or `MergeOrdered` nodes, then rewrites every node's location to use
/// the merged representative.
///
/// Runs two bottom-up passes over `ir`: the first records which clock ids belong
/// together, the second applies `remap_location` to each node's metadata.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    /// Unions the ticks of two locations when both of them resolve to a tick.
    fn link(uf: &mut HashMap<ClockId, ClockId>, upstream: &LocationId, own: &LocationId) {
        if let (Some(a), Some(b)) = (tick_of(upstream), tick_of(own)) {
            uf_union(uf, a, b);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect equivalences between an operator's tick and its inputs' ticks.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&mut uf, &first.metadata().location_id, &metadata.location_id);
                link(&mut uf, &second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: rewrite every node's location with the representative clock ids.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1928
/// Emits every root in `ir` into per-location DFIR graph builders and returns them.
///
/// The tee bookkeeping, statement-id counter, and fold-hook set are created fresh and
/// shared across all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = StmtId::default();
    let mut fold_hooked_idents = HashSet::new();

    for root in ir.iter_mut() {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }

    builders
}
1947
/// Walks `ir` in emission order without building any graph, invoking `transform_root`
/// and `transform_node` through `emit_core`'s callback mode.
///
/// The statement-id counter starts at its default value and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
    transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut next_stmt_id = StmtId::default();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut fold_hooked_idents = HashSet::new();

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1969
1970pub fn transform_bottom_up(
1971 ir: &mut [HydroRoot],
1972 transform_root: &mut impl FnMut(&mut HydroRoot),
1973 transform_node: &mut impl FnMut(&mut HydroNode),
1974 check_well_formed: bool,
1975) {
1976 let mut seen_tees = HashMap::new();
1977 ir.iter_mut().for_each(|leaf| {
1978 leaf.transform_bottom_up(
1979 transform_root,
1980 transform_node,
1981 &mut seen_tees,
1982 check_well_formed,
1983 );
1984 });
1985}
1986
1987pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1988 let mut seen_tees = HashMap::new();
1989 ir.iter()
1990 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1991 .collect()
1992}
1993
/// Per-thread dedup state for shared nodes: `None` while no dedup scope is active,
/// otherwise the next id to hand out plus a map from shared-cell pointer to the id
/// already assigned to it.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    /// State consulted by `Debug for SharedNode`; installed by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    /// State consulted by `Serialize for SharedNode`; installed by
    /// `SerializedSharedGuard::enter`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
2003
2004pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
2005 PRINTED_TEES.with(|printed_tees| {
2006 let mut printed_tees_mut = printed_tees.borrow_mut();
2007 *printed_tees_mut = Some((0, HashMap::new()));
2008 drop(printed_tees_mut);
2009
2010 let ret = f();
2011
2012 let mut printed_tees_mut = printed_tees.borrow_mut();
2013 *printed_tees_mut = None;
2014
2015 ret
2016 })
2017}
2018
2019pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
2024 let _guard = SerializedSharedGuard::enter();
2025 f()
2026}
2027
/// Scope guard for `SERIALIZED_SHARED`: created by `enter`, and on drop writes
/// `previous` back into the thread-local.
struct SerializedSharedGuard {
    /// The thread-local state that was active when this guard was created.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
2033
2034impl SerializedSharedGuard {
2035 fn enter() -> Self {
2036 let previous = SERIALIZED_SHARED.with(|cell| {
2037 let mut guard = cell.borrow_mut();
2038 guard.replace((0, HashMap::new()))
2039 });
2040 Self { previous }
2041 }
2042}
2043
2044impl Drop for SerializedSharedGuard {
2045 fn drop(&mut self) {
2046 SERIALIZED_SHARED.with(|cell| {
2047 *cell.borrow_mut() = self.previous.take();
2048 });
2049 }
2050}
2051
/// A reference-counted, interior-mutable handle to a `HydroNode`, allowing several
/// parents to point at the same upstream node.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2053
2054impl serde::Serialize for SharedNode {
2055 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2066 SERIALIZED_SHARED.with(|cell| {
2067 let mut guard = cell.borrow_mut();
2068 let state = guard.as_mut().ok_or_else(|| {
2070 serde::ser::Error::custom(
2071 "SharedNode serialization requires an active serialize_dedup_shared scope",
2072 )
2073 })?;
2074 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2075
2076 if let Some(&id) = state.1.get(&ptr) {
2077 drop(guard);
2078 use serde::ser::SerializeMap;
2079 let mut map = serializer.serialize_map(Some(1))?;
2080 map.serialize_entry("$shared_ref", &id)?;
2081 map.end()
2082 } else {
2083 let id = state.0;
2084 state.0 += 1;
2085 state.1.insert(ptr, id);
2086 drop(guard);
2087
2088 use serde::ser::SerializeMap;
2089 let mut map = serializer.serialize_map(Some(2))?;
2090 map.serialize_entry("$shared", &id)?;
2091 map.serialize_entry("node", &*self.0.borrow())?;
2092 map.end()
2093 }
2094 })
2095 }
2096}
2097
2098impl SharedNode {
2099 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2100 Rc::as_ptr(&self.0)
2101 }
2102}
2103
2104impl Debug for SharedNode {
2105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2106 PRINTED_TEES.with(|printed_tees| {
2107 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2108 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2109
2110 if let Some(printed_tees_mut) = printed_tees_mut {
2111 if let Some(existing) = printed_tees_mut
2112 .1
2113 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2114 {
2115 write!(f, "<shared {}>", existing)
2116 } else {
2117 let next_id = printed_tees_mut.0;
2118 printed_tees_mut.0 += 1;
2119 printed_tees_mut
2120 .1
2121 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2122 drop(printed_tees_mut_borrow);
2123 write!(f, "<shared {}>: ", next_id)?;
2124 Debug::fmt(&self.0.borrow(), f)
2125 }
2126 } else {
2127 drop(printed_tees_mut_borrow);
2128 write!(f, "<shared>: ")?;
2129 Debug::fmt(&self.0.borrow(), f)
2130 }
2131 })
2132 }
2133}
2134
2135impl Hash for SharedNode {
2136 fn hash<H: Hasher>(&self, state: &mut H) {
2137 self.0.borrow_mut().hash(state);
2138 }
2139}
2140
/// Boundedness tag stored in the `bound` field of the `Stream`, `Optional`, and
/// `KeyedStream` variants of `CollectionKind`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
2146
/// Ordering tag stored in `CollectionKind::Stream::order` and
/// `CollectionKind::KeyedStream::value_order`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2152
/// Delivery tag stored in `CollectionKind::Stream::retry` and
/// `CollectionKind::KeyedStream::value_retry`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2158
/// Boundedness tag stored in `CollectionKind::KeyedSingleton::bound`.
///
/// Only the `Bounded` variant makes `CollectionKind::is_bounded` return `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2166
/// Boundedness tag stored in `CollectionKind::Singleton::bound`.
///
/// Only the `Bounded` variant makes `CollectionKind::is_bounded` return `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2173
/// Describes which kind of collection a node produces, together with its boundedness
/// tag and the Rust type(s) of its contents.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values with ordering and retry tags.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton holding an `element_type` value.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional holding an `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; ordering and retry tags apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with separate key and value types.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2203
2204impl CollectionKind {
2205 pub fn is_bounded(&self) -> bool {
2206 matches!(
2207 self,
2208 CollectionKind::Stream {
2209 bound: BoundKind::Bounded,
2210 ..
2211 } | CollectionKind::Singleton {
2212 bound: SingletonBoundKind::Bounded,
2213 ..
2214 } | CollectionKind::Optional {
2215 bound: BoundKind::Bounded,
2216 ..
2217 } | CollectionKind::KeyedStream {
2218 bound: BoundKind::Bounded,
2219 ..
2220 } | CollectionKind::KeyedSingleton {
2221 bound: KeyedSingletonBoundKind::Bounded,
2222 ..
2223 }
2224 )
2225 }
2226}
2227
/// Metadata attached to every `HydroNode`.
///
/// Its `Hash` impl writes nothing and its `PartialEq` impl always returns `true`, so
/// metadata never influences hashing or equality of the value that contains it.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node's output lives.
    pub location_id: LocationId,
    /// Kind, boundedness tag, and element type(s) of the node's output.
    pub collection_kind: CollectionKind,
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Per-operator metadata (backtrace, measurements, id).
    pub op: HydroIrOpMetadata,
}
2237
2238impl Hash for HydroIrMetadata {
2240 fn hash<H: Hasher>(&self, _: &mut H) {}
2241}
2242
2243impl PartialEq for HydroIrMetadata {
2244 fn eq(&self, _: &Self) -> bool {
2245 true
2246 }
2247}
2248
// `eq` always returns `true`, which is trivially reflexive, so `Eq` holds.
impl Eq for HydroIrMetadata {}
2250
2251impl Debug for HydroIrMetadata {
2252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2253 f.debug_struct("HydroIrMetadata")
2254 .field("location_id", &self.location_id)
2255 .field("collection_kind", &self.collection_kind)
2256 .finish()
2257 }
2258}
2259
/// Per-operator metadata: the backtrace captured at construction plus optional
/// measurement and id slots, all of which `new` initializes to `None`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Serialized under the key `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
2270
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and every optional field
    /// set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Captures a backtrace via `Backtrace::get_backtrace(2 + skip_count)` and leaves
    /// the optional fields unset.
    ///
    /// NOTE(review): the constant 2 presumably accounts for this function and
    /// `get_backtrace` itself, with `skip_count` covering extra wrapper frames such as
    /// `new`. Confirm against `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2289
2290impl Debug for HydroIrOpMetadata {
2291 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2292 f.debug_struct("HydroIrOpMetadata").finish()
2293 }
2294}
2295
2296impl Hash for HydroIrOpMetadata {
2297 fn hash<H: Hasher>(&self, _: &mut H) {}
2298}
2299
/// A node of the Hydro IR.
///
/// Except for `Placeholder`, every variant carries a `metadata` field. Upstream nodes
/// are held either in a `Box<HydroNode>` (exclusively owned) or in a `SharedNode`
/// (shared between several parents).
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Temporary stand-in swapped into a shared cell while its real contents are being
    /// transformed. `transform_children` panics when called on it.
    Placeholder,

    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// A leaf node: it has no upstream node.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A leaf node: it has no upstream node.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// A leaf node: it has no upstream node in the tree, only a `cycle_id`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// References a shared upstream node; traversals visit the shared node once and
    /// reuse the result for every other reference to it.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// References a shared upstream node, traversed the same way as `Tee`.
    Singleton {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// References a shared upstream node and additionally carries a closure `f`.
    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with a second upstream node, `watermark`.
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The one variant exempt from the location check in `transform_bottom_up`: its
    /// input may sit at a different root location than the node itself.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A leaf node: it has no upstream node.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        /// Excluded from serialization.
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AssertIsConsistent {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    UnboundSingleton {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2592
/// Traversal memo for shared nodes: maps the address of an already-visited shared
/// cell to the cell that holds its transformed contents.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a `LocationId`.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2595
2596impl HydroNode {
2597 pub fn transform_bottom_up(
2598 &mut self,
2599 transform: &mut impl FnMut(&mut HydroNode),
2600 seen_tees: &mut SeenSharedNodes,
2601 check_well_formed: bool,
2602 ) {
2603 self.transform_children(
2604 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2605 seen_tees,
2606 );
2607
2608 transform(self);
2609
2610 let self_location = self.metadata().location_id.root();
2611
2612 if check_well_formed {
2613 match &*self {
2614 HydroNode::Network { .. } => {}
2615 _ => {
2616 self.input_metadata().iter().for_each(|i| {
2617 if i.location_id.root() != self_location {
2618 panic!(
2619 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2620 i,
2621 i.location_id.root(),
2622 self,
2623 self_location
2624 )
2625 }
2626 });
2627 }
2628 }
2629 }
2630 }
2631
2632 #[inline(always)]
2633 pub fn transform_children(
2634 &mut self,
2635 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2636 seen_tees: &mut SeenSharedNodes,
2637 ) {
2638 match self {
2639 HydroNode::Placeholder => {
2640 panic!();
2641 }
2642
2643 HydroNode::Source { .. }
2644 | HydroNode::SingletonSource { .. }
2645 | HydroNode::CycleSource { .. }
2646 | HydroNode::ExternalInput { .. } => {}
2647
2648 HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2649 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2650 *inner = SharedNode(transformed.clone());
2651 } else {
2652 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2653 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2654 let mut orig = inner.0.replace(HydroNode::Placeholder);
2655 transform(&mut orig, seen_tees);
2656 *transformed_cell.borrow_mut() = orig;
2657 *inner = SharedNode(transformed_cell);
2658 }
2659 }
2660
2661 HydroNode::Partition { inner, f, .. } => {
2662 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2663 *inner = SharedNode(transformed.clone());
2664 } else {
2665 f.transform_children(&mut transform, seen_tees);
2666 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2667 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2668 let mut orig = inner.0.replace(HydroNode::Placeholder);
2669 transform(&mut orig, seen_tees);
2670 *transformed_cell.borrow_mut() = orig;
2671 *inner = SharedNode(transformed_cell);
2672 }
2673 }
2674
2675 HydroNode::Cast { inner, .. }
2676 | HydroNode::ObserveNonDet { inner, .. }
2677 | HydroNode::BeginAtomic { inner, .. }
2678 | HydroNode::EndAtomic { inner, .. }
2679 | HydroNode::Batch { inner, .. }
2680 | HydroNode::YieldConcat { inner, .. }
2681 | HydroNode::UnboundSingleton { inner, .. }
2682 | HydroNode::AssertIsConsistent { inner, .. } => {
2683 transform(inner.as_mut(), seen_tees);
2684 }
2685
2686 HydroNode::Chain { first, second, .. } => {
2687 transform(first.as_mut(), seen_tees);
2688 transform(second.as_mut(), seen_tees);
2689 }
2690
2691 HydroNode::MergeOrdered { first, second, .. } => {
2692 transform(first.as_mut(), seen_tees);
2693 transform(second.as_mut(), seen_tees);
2694 }
2695
2696 HydroNode::ChainFirst { first, second, .. } => {
2697 transform(first.as_mut(), seen_tees);
2698 transform(second.as_mut(), seen_tees);
2699 }
2700
2701 HydroNode::CrossSingleton { left, right, .. }
2702 | HydroNode::CrossProduct { left, right, .. }
2703 | HydroNode::Join { left, right, .. }
2704 | HydroNode::JoinHalf { left, right, .. } => {
2705 transform(left.as_mut(), seen_tees);
2706 transform(right.as_mut(), seen_tees);
2707 }
2708
2709 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2710 transform(pos.as_mut(), seen_tees);
2711 transform(neg.as_mut(), seen_tees);
2712 }
2713
2714 HydroNode::Map { f, input, .. } => {
2715 f.transform_children(&mut transform, seen_tees);
2716 transform(input.as_mut(), seen_tees);
2717 }
2718 HydroNode::FlatMap { f, input, .. }
2719 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2720 | HydroNode::Filter { f, input, .. }
2721 | HydroNode::FilterMap { f, input, .. }
2722 | HydroNode::Inspect { f, input, .. }
2723 | HydroNode::Reduce { f, input, .. }
2724 | HydroNode::ReduceKeyed { f, input, .. } => {
2725 f.transform_children(&mut transform, seen_tees);
2726 transform(input.as_mut(), seen_tees);
2727 }
2728 HydroNode::ReduceKeyedWatermark {
2729 f,
2730 input,
2731 watermark,
2732 ..
2733 } => {
2734 f.transform_children(&mut transform, seen_tees);
2735 transform(input.as_mut(), seen_tees);
2736 transform(watermark.as_mut(), seen_tees);
2737 }
2738 HydroNode::Fold {
2739 init, acc, input, ..
2740 }
2741 | HydroNode::Scan {
2742 init, acc, input, ..
2743 }
2744 | HydroNode::ScanAsyncBlocking {
2745 init, acc, input, ..
2746 }
2747 | HydroNode::FoldKeyed {
2748 init, acc, input, ..
2749 } => {
2750 init.transform_children(&mut transform, seen_tees);
2751 acc.transform_children(&mut transform, seen_tees);
2752 transform(input.as_mut(), seen_tees);
2753 }
2754 HydroNode::ResolveFutures { input, .. }
2755 | HydroNode::ResolveFuturesBlocking { input, .. }
2756 | HydroNode::ResolveFuturesOrdered { input, .. }
2757 | HydroNode::Sort { input, .. }
2758 | HydroNode::DeferTick { input, .. }
2759 | HydroNode::Enumerate { input, .. }
2760 | HydroNode::Unique { input, .. }
2761 | HydroNode::Network { input, .. }
2762 | HydroNode::Counter { input, .. } => {
2763 transform(input.as_mut(), seen_tees);
2764 }
2765 }
2766 }
2767
2768 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2769 match self {
2770 HydroNode::Placeholder => HydroNode::Placeholder,
2771 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2772 inner: Box::new(inner.deep_clone(seen_tees)),
2773 metadata: metadata.clone(),
2774 },
2775 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2776 inner: Box::new(inner.deep_clone(seen_tees)),
2777 metadata: metadata.clone(),
2778 },
2779 HydroNode::ObserveNonDet {
2780 inner,
2781 trusted,
2782 metadata,
2783 } => HydroNode::ObserveNonDet {
2784 inner: Box::new(inner.deep_clone(seen_tees)),
2785 trusted: *trusted,
2786 metadata: metadata.clone(),
2787 },
2788 HydroNode::AssertIsConsistent {
2789 inner,
2790 trusted,
2791 metadata,
2792 } => HydroNode::AssertIsConsistent {
2793 inner: Box::new(inner.deep_clone(seen_tees)),
2794 trusted: *trusted,
2795 metadata: metadata.clone(),
2796 },
2797 HydroNode::Source { source, metadata } => HydroNode::Source {
2798 source: source.clone(),
2799 metadata: metadata.clone(),
2800 },
2801 HydroNode::SingletonSource {
2802 value,
2803 first_tick_only,
2804 metadata,
2805 } => HydroNode::SingletonSource {
2806 value: value.clone(),
2807 first_tick_only: *first_tick_only,
2808 metadata: metadata.clone(),
2809 },
2810 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2811 cycle_id: *cycle_id,
2812 metadata: metadata.clone(),
2813 },
2814 HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2815 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2816 SharedNode(transformed.clone())
2817 } else {
2818 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2819 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2820 let cloned = inner.0.borrow().deep_clone(seen_tees);
2821 *new_rc.borrow_mut() = cloned;
2822 SharedNode(new_rc)
2823 };
2824 if matches!(self, HydroNode::Singleton { .. }) {
2825 HydroNode::Singleton {
2826 inner: cloned_inner,
2827 metadata: metadata.clone(),
2828 }
2829 } else {
2830 HydroNode::Tee {
2831 inner: cloned_inner,
2832 metadata: metadata.clone(),
2833 }
2834 }
2835 }
2836 HydroNode::Partition {
2837 inner,
2838 f,
2839 is_true,
2840 metadata,
2841 } => {
2842 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2843 HydroNode::Partition {
2844 inner: SharedNode(transformed.clone()),
2845 f: f.deep_clone(seen_tees),
2846 is_true: *is_true,
2847 metadata: metadata.clone(),
2848 }
2849 } else {
2850 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2851 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2852 let cloned = inner.0.borrow().deep_clone(seen_tees);
2853 *new_rc.borrow_mut() = cloned;
2854 HydroNode::Partition {
2855 inner: SharedNode(new_rc),
2856 f: f.deep_clone(seen_tees),
2857 is_true: *is_true,
2858 metadata: metadata.clone(),
2859 }
2860 }
2861 }
2862 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2863 inner: Box::new(inner.deep_clone(seen_tees)),
2864 metadata: metadata.clone(),
2865 },
2866 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2867 inner: Box::new(inner.deep_clone(seen_tees)),
2868 metadata: metadata.clone(),
2869 },
2870 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2871 inner: Box::new(inner.deep_clone(seen_tees)),
2872 metadata: metadata.clone(),
2873 },
2874 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2875 inner: Box::new(inner.deep_clone(seen_tees)),
2876 metadata: metadata.clone(),
2877 },
2878 HydroNode::Chain {
2879 first,
2880 second,
2881 metadata,
2882 } => HydroNode::Chain {
2883 first: Box::new(first.deep_clone(seen_tees)),
2884 second: Box::new(second.deep_clone(seen_tees)),
2885 metadata: metadata.clone(),
2886 },
2887 HydroNode::MergeOrdered {
2888 first,
2889 second,
2890 metadata,
2891 } => HydroNode::MergeOrdered {
2892 first: Box::new(first.deep_clone(seen_tees)),
2893 second: Box::new(second.deep_clone(seen_tees)),
2894 metadata: metadata.clone(),
2895 },
2896 HydroNode::ChainFirst {
2897 first,
2898 second,
2899 metadata,
2900 } => HydroNode::ChainFirst {
2901 first: Box::new(first.deep_clone(seen_tees)),
2902 second: Box::new(second.deep_clone(seen_tees)),
2903 metadata: metadata.clone(),
2904 },
2905 HydroNode::CrossProduct {
2906 left,
2907 right,
2908 metadata,
2909 } => HydroNode::CrossProduct {
2910 left: Box::new(left.deep_clone(seen_tees)),
2911 right: Box::new(right.deep_clone(seen_tees)),
2912 metadata: metadata.clone(),
2913 },
2914 HydroNode::CrossSingleton {
2915 left,
2916 right,
2917 metadata,
2918 } => HydroNode::CrossSingleton {
2919 left: Box::new(left.deep_clone(seen_tees)),
2920 right: Box::new(right.deep_clone(seen_tees)),
2921 metadata: metadata.clone(),
2922 },
2923 HydroNode::Join {
2924 left,
2925 right,
2926 metadata,
2927 } => HydroNode::Join {
2928 left: Box::new(left.deep_clone(seen_tees)),
2929 right: Box::new(right.deep_clone(seen_tees)),
2930 metadata: metadata.clone(),
2931 },
2932 HydroNode::JoinHalf {
2933 left,
2934 right,
2935 metadata,
2936 } => HydroNode::JoinHalf {
2937 left: Box::new(left.deep_clone(seen_tees)),
2938 right: Box::new(right.deep_clone(seen_tees)),
2939 metadata: metadata.clone(),
2940 },
2941 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2942 pos: Box::new(pos.deep_clone(seen_tees)),
2943 neg: Box::new(neg.deep_clone(seen_tees)),
2944 metadata: metadata.clone(),
2945 },
2946 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2947 pos: Box::new(pos.deep_clone(seen_tees)),
2948 neg: Box::new(neg.deep_clone(seen_tees)),
2949 metadata: metadata.clone(),
2950 },
2951 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2952 input: Box::new(input.deep_clone(seen_tees)),
2953 metadata: metadata.clone(),
2954 },
2955 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2956 HydroNode::ResolveFuturesBlocking {
2957 input: Box::new(input.deep_clone(seen_tees)),
2958 metadata: metadata.clone(),
2959 }
2960 }
2961 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2962 HydroNode::ResolveFuturesOrdered {
2963 input: Box::new(input.deep_clone(seen_tees)),
2964 metadata: metadata.clone(),
2965 }
2966 }
2967 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2968 f: f.deep_clone(seen_tees),
2969 input: Box::new(input.deep_clone(seen_tees)),
2970 metadata: metadata.clone(),
2971 },
2972 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2973 f: f.deep_clone(seen_tees),
2974 input: Box::new(input.deep_clone(seen_tees)),
2975 metadata: metadata.clone(),
2976 },
2977 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2978 HydroNode::FlatMapStreamBlocking {
2979 f: f.deep_clone(seen_tees),
2980 input: Box::new(input.deep_clone(seen_tees)),
2981 metadata: metadata.clone(),
2982 }
2983 }
2984 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2985 f: f.deep_clone(seen_tees),
2986 input: Box::new(input.deep_clone(seen_tees)),
2987 metadata: metadata.clone(),
2988 },
2989 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2990 f: f.deep_clone(seen_tees),
2991 input: Box::new(input.deep_clone(seen_tees)),
2992 metadata: metadata.clone(),
2993 },
2994 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2995 input: Box::new(input.deep_clone(seen_tees)),
2996 metadata: metadata.clone(),
2997 },
2998 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2999 input: Box::new(input.deep_clone(seen_tees)),
3000 metadata: metadata.clone(),
3001 },
3002 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3003 f: f.deep_clone(seen_tees),
3004 input: Box::new(input.deep_clone(seen_tees)),
3005 metadata: metadata.clone(),
3006 },
3007 HydroNode::Unique { input, metadata } => HydroNode::Unique {
3008 input: Box::new(input.deep_clone(seen_tees)),
3009 metadata: metadata.clone(),
3010 },
3011 HydroNode::Sort { input, metadata } => HydroNode::Sort {
3012 input: Box::new(input.deep_clone(seen_tees)),
3013 metadata: metadata.clone(),
3014 },
3015 HydroNode::Fold {
3016 init,
3017 acc,
3018 input,
3019 metadata,
3020 } => HydroNode::Fold {
3021 init: init.deep_clone(seen_tees),
3022 acc: acc.deep_clone(seen_tees),
3023 input: Box::new(input.deep_clone(seen_tees)),
3024 metadata: metadata.clone(),
3025 },
3026 HydroNode::Scan {
3027 init,
3028 acc,
3029 input,
3030 metadata,
3031 } => HydroNode::Scan {
3032 init: init.deep_clone(seen_tees),
3033 acc: acc.deep_clone(seen_tees),
3034 input: Box::new(input.deep_clone(seen_tees)),
3035 metadata: metadata.clone(),
3036 },
3037 HydroNode::ScanAsyncBlocking {
3038 init,
3039 acc,
3040 input,
3041 metadata,
3042 } => HydroNode::ScanAsyncBlocking {
3043 init: init.deep_clone(seen_tees),
3044 acc: acc.deep_clone(seen_tees),
3045 input: Box::new(input.deep_clone(seen_tees)),
3046 metadata: metadata.clone(),
3047 },
3048 HydroNode::FoldKeyed {
3049 init,
3050 acc,
3051 input,
3052 metadata,
3053 } => HydroNode::FoldKeyed {
3054 init: init.deep_clone(seen_tees),
3055 acc: acc.deep_clone(seen_tees),
3056 input: Box::new(input.deep_clone(seen_tees)),
3057 metadata: metadata.clone(),
3058 },
3059 HydroNode::ReduceKeyedWatermark {
3060 f,
3061 input,
3062 watermark,
3063 metadata,
3064 } => HydroNode::ReduceKeyedWatermark {
3065 f: f.deep_clone(seen_tees),
3066 input: Box::new(input.deep_clone(seen_tees)),
3067 watermark: Box::new(watermark.deep_clone(seen_tees)),
3068 metadata: metadata.clone(),
3069 },
3070 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3071 f: f.deep_clone(seen_tees),
3072 input: Box::new(input.deep_clone(seen_tees)),
3073 metadata: metadata.clone(),
3074 },
3075 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3076 f: f.deep_clone(seen_tees),
3077 input: Box::new(input.deep_clone(seen_tees)),
3078 metadata: metadata.clone(),
3079 },
3080 HydroNode::Network {
3081 name,
3082 networking_info,
3083 serialize_fn,
3084 instantiate_fn,
3085 deserialize_fn,
3086 input,
3087 metadata,
3088 } => HydroNode::Network {
3089 name: name.clone(),
3090 networking_info: networking_info.clone(),
3091 serialize_fn: serialize_fn.clone(),
3092 instantiate_fn: instantiate_fn.clone(),
3093 deserialize_fn: deserialize_fn.clone(),
3094 input: Box::new(input.deep_clone(seen_tees)),
3095 metadata: metadata.clone(),
3096 },
3097 HydroNode::ExternalInput {
3098 from_external_key,
3099 from_port_id,
3100 from_many,
3101 codec_type,
3102 port_hint,
3103 instantiate_fn,
3104 deserialize_fn,
3105 metadata,
3106 } => HydroNode::ExternalInput {
3107 from_external_key: *from_external_key,
3108 from_port_id: *from_port_id,
3109 from_many: *from_many,
3110 codec_type: codec_type.clone(),
3111 port_hint: *port_hint,
3112 instantiate_fn: instantiate_fn.clone(),
3113 deserialize_fn: deserialize_fn.clone(),
3114 metadata: metadata.clone(),
3115 },
3116 HydroNode::Counter {
3117 tag,
3118 duration,
3119 prefix,
3120 input,
3121 metadata,
3122 } => HydroNode::Counter {
3123 tag: tag.clone(),
3124 duration: duration.clone(),
3125 prefix: prefix.clone(),
3126 input: Box::new(input.deep_clone(seen_tees)),
3127 metadata: metadata.clone(),
3128 },
3129 }
3130 }
3131
3132 #[cfg(feature = "build")]
3133 pub fn emit_core(
3134 &mut self,
3135 builders_or_callback: &mut BuildersOrCallback<
3136 impl FnMut(&mut HydroRoot, &mut StmtId),
3137 impl FnMut(&mut HydroNode, &mut StmtId),
3138 >,
3139 seen_tees: &mut SeenSharedNodes,
3140 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3141 next_stmt_id: &mut StmtId,
3142 fold_hooked_idents: &mut HashSet<String>,
3143 ) -> syn::Ident {
3144 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3145 let mut singleton_access_counters: HashMap<*const RefCell<HydroNode>, u32> = HashMap::new();
3146
3147 self.transform_bottom_up(
3148 &mut |node: &mut HydroNode| {
3149 let out_location = node.metadata().location_id.clone();
3150 match node {
3151 HydroNode::Placeholder => {
3152 panic!()
3153 }
3154
3155 HydroNode::Cast { .. } => {
3156 match builders_or_callback {
3159 BuildersOrCallback::Builders(_) => {}
3160 BuildersOrCallback::Callback(_, node_callback) => {
3161 node_callback(node, next_stmt_id);
3162 }
3163 }
3164
3165 let _ = next_stmt_id.get_and_increment();
3166 }
3168
3169 HydroNode::UnboundSingleton { .. } => {
3170 let inner_ident = ident_stack.pop().unwrap();
3171
3172 let out_ident =
3173 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3174
3175 match builders_or_callback {
3176 BuildersOrCallback::Builders(graph_builders) => {
3177 if graph_builders.singleton_intermediates() {
3178 let builder = graph_builders.get_dfir_mut(&out_location);
3179 builder.add_dfir(
3180 parse_quote! {
3181 #out_ident = #inner_ident;
3182 },
3183 None,
3184 None,
3185 );
3186 } else {
3187 let builder = graph_builders.get_dfir_mut(&out_location);
3188 builder.add_dfir(
3189 parse_quote! {
3190 #out_ident = #inner_ident -> persist::<'static>();
3191 },
3192 None,
3193 None,
3194 );
3195 }
3196 }
3197 BuildersOrCallback::Callback(_, node_callback) => {
3198 node_callback(node, next_stmt_id);
3199 }
3200 }
3201
3202 let _ = next_stmt_id.get_and_increment();
3203
3204 ident_stack.push(out_ident);
3205 }
3206
3207 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3208 let inner_ident = ident_stack.pop().unwrap();
3209
3210 let out_ident =
3211 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3212
3213 match builders_or_callback {
3214 BuildersOrCallback::Builders(graph_builders) => {
3215 graph_builders.assert_is_consistent(
3216 *trusted,
3217 &inner.metadata().location_id,
3218 inner_ident,
3219 &out_ident,
3220 );
3221 }
3222 BuildersOrCallback::Callback(_, node_callback) => {
3223 node_callback(node, next_stmt_id);
3224 }
3225 }
3226
3227 let _ = next_stmt_id.get_and_increment();
3228
3229 ident_stack.push(out_ident);
3230 }
3231
3232 HydroNode::ObserveNonDet {
3233 inner,
3234 trusted,
3235 metadata,
3236 ..
3237 } => {
3238 let inner_ident = ident_stack.pop().unwrap();
3239
3240 let observe_ident =
3241 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3242
3243 match builders_or_callback {
3244 BuildersOrCallback::Builders(graph_builders) => {
3245 graph_builders.observe_nondet(
3246 *trusted,
3247 &inner.metadata().location_id,
3248 inner_ident,
3249 &inner.metadata().collection_kind,
3250 &observe_ident,
3251 &metadata.collection_kind,
3252 &metadata.op,
3253 );
3254 }
3255 BuildersOrCallback::Callback(_, node_callback) => {
3256 node_callback(node, next_stmt_id);
3257 }
3258 }
3259
3260 let _ = next_stmt_id.get_and_increment();
3261
3262 ident_stack.push(observe_ident);
3263 }
3264
3265 HydroNode::Batch {
3266 inner, metadata, ..
3267 } => {
3268 let inner_ident = ident_stack.pop().unwrap();
3269
3270 let batch_ident =
3271 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3272
3273 match builders_or_callback {
3274 BuildersOrCallback::Builders(graph_builders) => {
3275 graph_builders.batch(
3276 inner_ident,
3277 &inner.metadata().location_id,
3278 &inner.metadata().collection_kind,
3279 &batch_ident,
3280 &out_location,
3281 &metadata.op,
3282 fold_hooked_idents,
3283 );
3284 }
3285 BuildersOrCallback::Callback(_, node_callback) => {
3286 node_callback(node, next_stmt_id);
3287 }
3288 }
3289
3290 let _ = next_stmt_id.get_and_increment();
3291
3292 ident_stack.push(batch_ident);
3293 }
3294
3295 HydroNode::YieldConcat { inner, .. } => {
3296 let inner_ident = ident_stack.pop().unwrap();
3297
3298 let yield_ident =
3299 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3300
3301 match builders_or_callback {
3302 BuildersOrCallback::Builders(graph_builders) => {
3303 graph_builders.yield_from_tick(
3304 inner_ident,
3305 &inner.metadata().location_id,
3306 &inner.metadata().collection_kind,
3307 &yield_ident,
3308 &out_location,
3309 );
3310 }
3311 BuildersOrCallback::Callback(_, node_callback) => {
3312 node_callback(node, next_stmt_id);
3313 }
3314 }
3315
3316 let _ = next_stmt_id.get_and_increment();
3317
3318 ident_stack.push(yield_ident);
3319 }
3320
3321 HydroNode::BeginAtomic { inner, metadata } => {
3322 let inner_ident = ident_stack.pop().unwrap();
3323
3324 let begin_ident =
3325 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3326
3327 match builders_or_callback {
3328 BuildersOrCallback::Builders(graph_builders) => {
3329 graph_builders.begin_atomic(
3330 inner_ident,
3331 &inner.metadata().location_id,
3332 &inner.metadata().collection_kind,
3333 &begin_ident,
3334 &out_location,
3335 &metadata.op,
3336 );
3337 }
3338 BuildersOrCallback::Callback(_, node_callback) => {
3339 node_callback(node, next_stmt_id);
3340 }
3341 }
3342
3343 let _ = next_stmt_id.get_and_increment();
3344
3345 ident_stack.push(begin_ident);
3346 }
3347
3348 HydroNode::EndAtomic { inner, .. } => {
3349 let inner_ident = ident_stack.pop().unwrap();
3350
3351 let end_ident =
3352 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3353
3354 match builders_or_callback {
3355 BuildersOrCallback::Builders(graph_builders) => {
3356 graph_builders.end_atomic(
3357 inner_ident,
3358 &inner.metadata().location_id,
3359 &inner.metadata().collection_kind,
3360 &end_ident,
3361 );
3362 }
3363 BuildersOrCallback::Callback(_, node_callback) => {
3364 node_callback(node, next_stmt_id);
3365 }
3366 }
3367
3368 let _ = next_stmt_id.get_and_increment();
3369
3370 ident_stack.push(end_ident);
3371 }
3372
3373 HydroNode::Source {
3374 source, metadata, ..
3375 } => {
3376 if let HydroSource::ExternalNetwork() = source {
3377 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3378 } else {
3379 let source_ident =
3380 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3381
3382 let source_stmt = match source {
3383 HydroSource::Stream(expr) => {
3384 debug_assert!(metadata.location_id.is_top_level());
3385 parse_quote! {
3386 #source_ident = source_stream(#expr);
3387 }
3388 }
3389
3390 HydroSource::ExternalNetwork() => {
3391 unreachable!()
3392 }
3393
3394 HydroSource::Iter(expr) => {
3395 if metadata.location_id.is_top_level() {
3396 parse_quote! {
3397 #source_ident = source_iter(#expr);
3398 }
3399 } else {
3400 parse_quote! {
3402 #source_ident = source_iter(#expr) -> persist::<'static>();
3403 }
3404 }
3405 }
3406
3407 HydroSource::Spin() => {
3408 debug_assert!(metadata.location_id.is_top_level());
3409 parse_quote! {
3410 #source_ident = spin();
3411 }
3412 }
3413
3414 HydroSource::ClusterMembers(target_loc, state) => {
3415 debug_assert!(metadata.location_id.is_top_level());
3416
3417 let members_tee_ident = syn::Ident::new(
3418 &format!(
3419 "__cluster_members_tee_{}_{}",
3420 metadata.location_id.root().key(),
3421 target_loc.key(),
3422 ),
3423 Span::call_site(),
3424 );
3425
3426 match state {
3427 ClusterMembersState::Stream(d) => {
3428 parse_quote! {
3429 #members_tee_ident = source_stream(#d) -> tee();
3430 #source_ident = #members_tee_ident;
3431 }
3432 },
3433 ClusterMembersState::Uninit => syn::parse_quote! {
3434 #source_ident = source_stream(DUMMY);
3435 },
3436 ClusterMembersState::Tee(..) => parse_quote! {
3437 #source_ident = #members_tee_ident;
3438 },
3439 }
3440 }
3441
3442 HydroSource::Embedded(ident) => {
3443 parse_quote! {
3444 #source_ident = source_stream(#ident);
3445 }
3446 }
3447
3448 HydroSource::EmbeddedSingleton(ident) => {
3449 parse_quote! {
3450 #source_ident = source_iter([#ident]);
3451 }
3452 }
3453 };
3454
3455 match builders_or_callback {
3456 BuildersOrCallback::Builders(graph_builders) => {
3457 let builder = graph_builders.get_dfir_mut(&out_location);
3458 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3459 }
3460 BuildersOrCallback::Callback(_, node_callback) => {
3461 node_callback(node, next_stmt_id);
3462 }
3463 }
3464
3465 let _ = next_stmt_id.get_and_increment();
3466
3467 ident_stack.push(source_ident);
3468 }
3469 }
3470
3471 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3472 let source_ident =
3473 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3474
3475 match builders_or_callback {
3476 BuildersOrCallback::Builders(graph_builders) => {
3477 let builder = graph_builders.get_dfir_mut(&out_location);
3478
3479 if *first_tick_only {
3480 assert!(
3481 !metadata.location_id.is_top_level(),
3482 "first_tick_only SingletonSource must be inside a tick"
3483 );
3484 }
3485
3486 if *first_tick_only
3487 || (metadata.location_id.is_top_level()
3488 && metadata.collection_kind.is_bounded())
3489 {
3490 builder.add_dfir(
3491 parse_quote! {
3492 #source_ident = source_iter([#value]);
3493 },
3494 None,
3495 Some(&next_stmt_id.to_string()),
3496 );
3497 } else {
3498 builder.add_dfir(
3499 parse_quote! {
3500 #source_ident = source_iter([#value]) -> persist::<'static>();
3501 },
3502 None,
3503 Some(&next_stmt_id.to_string()),
3504 );
3505 }
3506 }
3507 BuildersOrCallback::Callback(_, node_callback) => {
3508 node_callback(node, next_stmt_id);
3509 }
3510 }
3511
3512 let _ = next_stmt_id.get_and_increment();
3513
3514 ident_stack.push(source_ident);
3515 }
3516
3517 HydroNode::CycleSource { cycle_id, .. } => {
3518 let ident = cycle_id.as_ident();
3519
3520 match builders_or_callback {
3521 BuildersOrCallback::Builders(_) => {}
3522 BuildersOrCallback::Callback(_, node_callback) => {
3523 node_callback(node, next_stmt_id);
3524 }
3525 }
3526
3527 let _ = next_stmt_id.get_and_increment();
3529
3530 ident_stack.push(ident);
3531 }
3532
3533 HydroNode::Tee { inner, .. } => {
3534 let ret_ident = if let Some(built_idents) =
3535 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3536 {
3537 match builders_or_callback {
3538 BuildersOrCallback::Builders(_) => {}
3539 BuildersOrCallback::Callback(_, node_callback) => {
3540 node_callback(node, next_stmt_id);
3541 }
3542 }
3543
3544 built_idents[0].clone()
3545 } else {
3546 let inner_ident = ident_stack.pop().unwrap();
3549
3550 let tee_ident =
3551 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3552
3553 built_tees.insert(
3554 inner.0.as_ref() as *const RefCell<HydroNode>,
3555 vec![tee_ident.clone()],
3556 );
3557
3558 match builders_or_callback {
3559 BuildersOrCallback::Builders(graph_builders) => {
3560 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3572 fold_hooked_idents.insert(tee_ident.to_string());
3573 }
3574 let builder = graph_builders.get_dfir_mut(&out_location);
3575 builder.add_dfir(
3576 parse_quote! {
3577 #tee_ident = #inner_ident -> tee();
3578 },
3579 None,
3580 Some(&next_stmt_id.to_string()),
3581 );
3582 }
3583 BuildersOrCallback::Callback(_, node_callback) => {
3584 node_callback(node, next_stmt_id);
3585 }
3586 }
3587
3588 tee_ident
3589 };
3590
3591 let _ = next_stmt_id.get_and_increment();
3595 ident_stack.push(ret_ident);
3596 }
3597
3598 HydroNode::Singleton { inner, .. } => {
3599 let ret_ident = if let Some(built_idents) =
3600 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3601 {
3602 built_idents[0].clone()
3603 } else {
3604 let inner_ident = ident_stack.pop().unwrap();
3605
3606 let singleton_ident =
3607 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3608
3609 built_tees.insert(
3610 inner.0.as_ref() as *const RefCell<HydroNode>,
3611 vec![singleton_ident.clone()],
3612 );
3613
3614 match builders_or_callback {
3615 BuildersOrCallback::Builders(graph_builders) => {
3616 let builder = graph_builders.get_dfir_mut(&out_location);
3617 builder.add_dfir(
3618 parse_quote! {
3619 #singleton_ident = #inner_ident -> singleton();
3620 },
3621 None,
3622 Some(&next_stmt_id.to_string()),
3623 );
3624 }
3625 BuildersOrCallback::Callback(_, node_callback) => {
3626 node_callback(node, next_stmt_id);
3627 }
3628 }
3629
3630 singleton_ident
3631 };
3632
3633 let _ = next_stmt_id.get_and_increment();
3636 ident_stack.push(ret_ident);
3637 }
3638
3639 HydroNode::Partition {
3640 inner, f, is_true, ..
3641 } => {
3642 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3644 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3645 match builders_or_callback {
3646 BuildersOrCallback::Builders(_) => {}
3647 BuildersOrCallback::Callback(_, node_callback) => {
3648 node_callback(node, next_stmt_id);
3649 }
3650 }
3651
3652 let idx = if is_true { 0 } else { 1 };
3653 built_idents[idx].clone()
3654 } else {
3655 let inner_ident = ident_stack.pop().unwrap();
3658 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
3659
3660 let partition_ident = syn::Ident::new(
3661 &format!("stream_{}_partition", *next_stmt_id),
3662 Span::call_site(),
3663 );
3664 let true_ident = syn::Ident::new(
3665 &format!("stream_{}_true", *next_stmt_id),
3666 Span::call_site(),
3667 );
3668 let false_ident = syn::Ident::new(
3669 &format!("stream_{}_false", *next_stmt_id),
3670 Span::call_site(),
3671 );
3672
3673 built_tees.insert(
3674 ptr,
3675 vec![true_ident.clone(), false_ident.clone()],
3676 );
3677
3678 match builders_or_callback {
3679 BuildersOrCallback::Builders(graph_builders) => {
3680 let builder = graph_builders.get_dfir_mut(&out_location);
3681 builder.add_dfir(
3682 parse_quote! {
3683 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3684 #true_ident = #partition_ident[0];
3685 #false_ident = #partition_ident[1];
3686 },
3687 None,
3688 Some(&next_stmt_id.to_string()),
3689 );
3690 }
3691 BuildersOrCallback::Callback(_, node_callback) => {
3692 node_callback(node, next_stmt_id);
3693 }
3694 }
3695
3696 if is_true { true_ident } else { false_ident }
3697 };
3698
3699 let _ = next_stmt_id.get_and_increment();
3700 ident_stack.push(ret_ident);
3701 }
3702
3703 HydroNode::Chain { .. } => {
3704 let second_ident = ident_stack.pop().unwrap();
3706 let first_ident = ident_stack.pop().unwrap();
3707
3708 let chain_ident =
3709 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3710
3711 match builders_or_callback {
3712 BuildersOrCallback::Builders(graph_builders) => {
3713 let builder = graph_builders.get_dfir_mut(&out_location);
3714 builder.add_dfir(
3715 parse_quote! {
3716 #chain_ident = chain();
3717 #first_ident -> [0]#chain_ident;
3718 #second_ident -> [1]#chain_ident;
3719 },
3720 None,
3721 Some(&next_stmt_id.to_string()),
3722 );
3723 }
3724 BuildersOrCallback::Callback(_, node_callback) => {
3725 node_callback(node, next_stmt_id);
3726 }
3727 }
3728
3729 let _ = next_stmt_id.get_and_increment();
3730
3731 ident_stack.push(chain_ident);
3732 }
3733
3734 HydroNode::MergeOrdered { first, metadata, .. } => {
3735 let second_ident = ident_stack.pop().unwrap();
3736 let first_ident = ident_stack.pop().unwrap();
3737
3738 let merge_ident =
3739 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3740
3741 match builders_or_callback {
3742 BuildersOrCallback::Builders(graph_builders) => {
3743 graph_builders.merge_ordered(
3744 &first.metadata().location_id,
3745 first_ident,
3746 second_ident,
3747 &merge_ident,
3748 &first.metadata().collection_kind,
3749 &metadata.op,
3750 Some(&next_stmt_id.to_string()),
3751 );
3752 }
3753 BuildersOrCallback::Callback(_, node_callback) => {
3754 node_callback(node, next_stmt_id);
3755 }
3756 }
3757
3758 let _ = next_stmt_id.get_and_increment();
3759
3760 ident_stack.push(merge_ident);
3761 }
3762
3763 HydroNode::ChainFirst { .. } => {
3764 let second_ident = ident_stack.pop().unwrap();
3765 let first_ident = ident_stack.pop().unwrap();
3766
3767 let chain_ident =
3768 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3769
3770 match builders_or_callback {
3771 BuildersOrCallback::Builders(graph_builders) => {
3772 let builder = graph_builders.get_dfir_mut(&out_location);
3773 builder.add_dfir(
3774 parse_quote! {
3775 #chain_ident = chain_first_n(1);
3776 #first_ident -> [0]#chain_ident;
3777 #second_ident -> [1]#chain_ident;
3778 },
3779 None,
3780 Some(&next_stmt_id.to_string()),
3781 );
3782 }
3783 BuildersOrCallback::Callback(_, node_callback) => {
3784 node_callback(node, next_stmt_id);
3785 }
3786 }
3787
3788 let _ = next_stmt_id.get_and_increment();
3789
3790 ident_stack.push(chain_ident);
3791 }
3792
3793 HydroNode::CrossSingleton { right, .. } => {
3794 let right_ident = ident_stack.pop().unwrap();
3795 let left_ident = ident_stack.pop().unwrap();
3796
3797 let cross_ident =
3798 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3799
3800 match builders_or_callback {
3801 BuildersOrCallback::Builders(graph_builders) => {
3802 let builder = graph_builders.get_dfir_mut(&out_location);
3803
3804 if right.metadata().location_id.is_top_level()
3805 && right.metadata().collection_kind.is_bounded()
3806 {
3807 builder.add_dfir(
3808 parse_quote! {
3809 #cross_ident = cross_singleton::<'static>();
3810 #left_ident -> [input]#cross_ident;
3811 #right_ident -> [single]#cross_ident;
3812 },
3813 None,
3814 Some(&next_stmt_id.to_string()),
3815 );
3816 } else {
3817 builder.add_dfir(
3818 parse_quote! {
3819 #cross_ident = cross_singleton();
3820 #left_ident -> [input]#cross_ident;
3821 #right_ident -> [single]#cross_ident;
3822 },
3823 None,
3824 Some(&next_stmt_id.to_string()),
3825 );
3826 }
3827 }
3828 BuildersOrCallback::Callback(_, node_callback) => {
3829 node_callback(node, next_stmt_id);
3830 }
3831 }
3832
3833 let _ = next_stmt_id.get_and_increment();
3834
3835 ident_stack.push(cross_ident);
3836 }
3837
3838 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3839 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3840 parse_quote!(cross_join_multiset)
3841 } else {
3842 parse_quote!(join_multiset)
3843 };
3844
3845 let (HydroNode::CrossProduct { left, right, .. }
3846 | HydroNode::Join { left, right, .. }) = node
3847 else {
3848 unreachable!()
3849 };
3850
3851 let is_top_level = left.metadata().location_id.is_top_level()
3852 && right.metadata().location_id.is_top_level();
3853 let left_lifetime = if left.metadata().location_id.is_top_level() {
3854 quote!('static)
3855 } else {
3856 quote!('tick)
3857 };
3858
3859 let right_lifetime = if right.metadata().location_id.is_top_level() {
3860 quote!('static)
3861 } else {
3862 quote!('tick)
3863 };
3864
3865 let right_ident = ident_stack.pop().unwrap();
3866 let left_ident = ident_stack.pop().unwrap();
3867
3868 let stream_ident =
3869 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3870
3871 match builders_or_callback {
3872 BuildersOrCallback::Builders(graph_builders) => {
3873 let builder = graph_builders.get_dfir_mut(&out_location);
3874 builder.add_dfir(
3875 if is_top_level {
3876 parse_quote! {
3879 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3880 #left_ident -> [0]#stream_ident;
3881 #right_ident -> [1]#stream_ident;
3882 }
3883 } else {
3884 parse_quote! {
3885 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3886 #left_ident -> [0]#stream_ident;
3887 #right_ident -> [1]#stream_ident;
3888 }
3889 }
3890 ,
3891 None,
3892 Some(&next_stmt_id.to_string()),
3893 );
3894 }
3895 BuildersOrCallback::Callback(_, node_callback) => {
3896 node_callback(node, next_stmt_id);
3897 }
3898 }
3899
3900 let _ = next_stmt_id.get_and_increment();
3901
3902 ident_stack.push(stream_ident);
3903 }
3904
3905 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3906 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3907 parse_quote!(difference)
3908 } else {
3909 parse_quote!(anti_join)
3910 };
3911
3912 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3913 node
3914 else {
3915 unreachable!()
3916 };
3917
3918 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3919 quote!('static)
3920 } else {
3921 quote!('tick)
3922 };
3923
3924 let neg_ident = ident_stack.pop().unwrap();
3925 let pos_ident = ident_stack.pop().unwrap();
3926
3927 let stream_ident =
3928 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3929
3930 match builders_or_callback {
3931 BuildersOrCallback::Builders(graph_builders) => {
3932 let builder = graph_builders.get_dfir_mut(&out_location);
3933 builder.add_dfir(
3934 parse_quote! {
3935 #stream_ident = #operator::<'tick, #neg_lifetime>();
3936 #pos_ident -> [pos]#stream_ident;
3937 #neg_ident -> [neg]#stream_ident;
3938 },
3939 None,
3940 Some(&next_stmt_id.to_string()),
3941 );
3942 }
3943 BuildersOrCallback::Callback(_, node_callback) => {
3944 node_callback(node, next_stmt_id);
3945 }
3946 }
3947
3948 let _ = next_stmt_id.get_and_increment();
3949
3950 ident_stack.push(stream_ident);
3951 }
3952
3953 HydroNode::JoinHalf { .. } => {
3954 let HydroNode::JoinHalf { right, .. } = node else {
3955 unreachable!()
3956 };
3957
3958 assert!(
3959 right.metadata().collection_kind.is_bounded(),
3960 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3961 right.metadata().collection_kind
3962 );
3963
3964 let build_lifetime = if right.metadata().location_id.is_top_level() {
3965 quote!('static)
3966 } else {
3967 quote!('tick)
3968 };
3969
3970 let build_ident = ident_stack.pop().unwrap();
3971 let probe_ident = ident_stack.pop().unwrap();
3972
3973 let stream_ident =
3974 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3975
3976 match builders_or_callback {
3977 BuildersOrCallback::Builders(graph_builders) => {
3978 let builder = graph_builders.get_dfir_mut(&out_location);
3979 builder.add_dfir(
3980 parse_quote! {
3981 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3982 #probe_ident -> [probe]#stream_ident;
3983 #build_ident -> [build]#stream_ident;
3984 },
3985 None,
3986 Some(&next_stmt_id.to_string()),
3987 );
3988 }
3989 BuildersOrCallback::Callback(_, node_callback) => {
3990 node_callback(node, next_stmt_id);
3991 }
3992 }
3993
3994 let _ = next_stmt_id.get_and_increment();
3995
3996 ident_stack.push(stream_ident);
3997 }
3998
3999 HydroNode::ResolveFutures { .. } => {
4000 let input_ident = ident_stack.pop().unwrap();
4001
4002 let futures_ident =
4003 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4004
4005 match builders_or_callback {
4006 BuildersOrCallback::Builders(graph_builders) => {
4007 let builder = graph_builders.get_dfir_mut(&out_location);
4008 builder.add_dfir(
4009 parse_quote! {
4010 #futures_ident = #input_ident -> resolve_futures();
4011 },
4012 None,
4013 Some(&next_stmt_id.to_string()),
4014 );
4015 }
4016 BuildersOrCallback::Callback(_, node_callback) => {
4017 node_callback(node, next_stmt_id);
4018 }
4019 }
4020
4021 let _ = next_stmt_id.get_and_increment();
4022
4023 ident_stack.push(futures_ident);
4024 }
4025
4026 HydroNode::ResolveFuturesBlocking { .. } => {
4027 let input_ident = ident_stack.pop().unwrap();
4028
4029 let futures_ident =
4030 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4031
4032 match builders_or_callback {
4033 BuildersOrCallback::Builders(graph_builders) => {
4034 let builder = graph_builders.get_dfir_mut(&out_location);
4035 builder.add_dfir(
4036 parse_quote! {
4037 #futures_ident = #input_ident -> resolve_futures_blocking();
4038 },
4039 None,
4040 Some(&next_stmt_id.to_string()),
4041 );
4042 }
4043 BuildersOrCallback::Callback(_, node_callback) => {
4044 node_callback(node, next_stmt_id);
4045 }
4046 }
4047
4048 let _ = next_stmt_id.get_and_increment();
4049
4050 ident_stack.push(futures_ident);
4051 }
4052
4053 HydroNode::ResolveFuturesOrdered { .. } => {
4054 let input_ident = ident_stack.pop().unwrap();
4055
4056 let futures_ident =
4057 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4058
4059 match builders_or_callback {
4060 BuildersOrCallback::Builders(graph_builders) => {
4061 let builder = graph_builders.get_dfir_mut(&out_location);
4062 builder.add_dfir(
4063 parse_quote! {
4064 #futures_ident = #input_ident -> resolve_futures_ordered();
4065 },
4066 None,
4067 Some(&next_stmt_id.to_string()),
4068 );
4069 }
4070 BuildersOrCallback::Callback(_, node_callback) => {
4071 node_callback(node, next_stmt_id);
4072 }
4073 }
4074
4075 let _ = next_stmt_id.get_and_increment();
4076
4077 ident_stack.push(futures_ident);
4078 }
4079
4080 HydroNode::Map { f, .. } => {
4081 let input_ident = ident_stack.pop().unwrap();
4083 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4084
4085 let map_ident =
4086 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4087
4088 match builders_or_callback {
4089 BuildersOrCallback::Builders(graph_builders) => {
4090 let builder = graph_builders.get_dfir_mut(&out_location);
4091 builder.add_dfir(
4092 parse_quote! {
4093 #map_ident = #input_ident -> map(#f_tokens);
4094 },
4095 None,
4096 Some(&next_stmt_id.to_string()),
4097 );
4098 }
4099 BuildersOrCallback::Callback(_, node_callback) => {
4100 node_callback(node, next_stmt_id);
4101 }
4102 }
4103
4104 let _ = next_stmt_id.get_and_increment();
4105
4106 ident_stack.push(map_ident);
4107 }
4108
4109 HydroNode::FlatMap { f, .. } => {
4110 let input_ident = ident_stack.pop().unwrap();
4111 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4112
4113 let flat_map_ident =
4114 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4115
4116 match builders_or_callback {
4117 BuildersOrCallback::Builders(graph_builders) => {
4118 let builder = graph_builders.get_dfir_mut(&out_location);
4119 builder.add_dfir(
4120 parse_quote! {
4121 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4122 },
4123 None,
4124 Some(&next_stmt_id.to_string()),
4125 );
4126 }
4127 BuildersOrCallback::Callback(_, node_callback) => {
4128 node_callback(node, next_stmt_id);
4129 }
4130 }
4131
4132 let _ = next_stmt_id.get_and_increment();
4133
4134 ident_stack.push(flat_map_ident);
4135 }
4136
4137 HydroNode::FlatMapStreamBlocking { f, .. } => {
4138 let input_ident = ident_stack.pop().unwrap();
4139 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4140
4141 let flat_map_stream_blocking_ident =
4142 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4143
4144 match builders_or_callback {
4145 BuildersOrCallback::Builders(graph_builders) => {
4146 let builder = graph_builders.get_dfir_mut(&out_location);
4147 builder.add_dfir(
4148 parse_quote! {
4149 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4150 },
4151 None,
4152 Some(&next_stmt_id.to_string()),
4153 );
4154 }
4155 BuildersOrCallback::Callback(_, node_callback) => {
4156 node_callback(node, next_stmt_id);
4157 }
4158 }
4159
4160 let _ = next_stmt_id.get_and_increment();
4161
4162 ident_stack.push(flat_map_stream_blocking_ident);
4163 }
4164
4165 HydroNode::Filter { f, .. } => {
4166 let input_ident = ident_stack.pop().unwrap();
4167 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4168
4169 let filter_ident =
4170 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4171
4172 match builders_or_callback {
4173 BuildersOrCallback::Builders(graph_builders) => {
4174 let builder = graph_builders.get_dfir_mut(&out_location);
4175 builder.add_dfir(
4176 parse_quote! {
4177 #filter_ident = #input_ident -> filter(#f_tokens);
4178 },
4179 None,
4180 Some(&next_stmt_id.to_string()),
4181 );
4182 }
4183 BuildersOrCallback::Callback(_, node_callback) => {
4184 node_callback(node, next_stmt_id);
4185 }
4186 }
4187
4188 let _ = next_stmt_id.get_and_increment();
4189
4190 ident_stack.push(filter_ident);
4191 }
4192
4193 HydroNode::FilterMap { f, .. } => {
4194 let input_ident = ident_stack.pop().unwrap();
4195 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4196
4197 let filter_map_ident =
4198 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4199
4200 match builders_or_callback {
4201 BuildersOrCallback::Builders(graph_builders) => {
4202 let builder = graph_builders.get_dfir_mut(&out_location);
4203 builder.add_dfir(
4204 parse_quote! {
4205 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4206 },
4207 None,
4208 Some(&next_stmt_id.to_string()),
4209 );
4210 }
4211 BuildersOrCallback::Callback(_, node_callback) => {
4212 node_callback(node, next_stmt_id);
4213 }
4214 }
4215
4216 let _ = next_stmt_id.get_and_increment();
4217
4218 ident_stack.push(filter_map_ident);
4219 }
4220
4221 HydroNode::Sort { .. } => {
4222 let input_ident = ident_stack.pop().unwrap();
4223
4224 let sort_ident =
4225 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4226
4227 match builders_or_callback {
4228 BuildersOrCallback::Builders(graph_builders) => {
4229 let builder = graph_builders.get_dfir_mut(&out_location);
4230 builder.add_dfir(
4231 parse_quote! {
4232 #sort_ident = #input_ident -> sort();
4233 },
4234 None,
4235 Some(&next_stmt_id.to_string()),
4236 );
4237 }
4238 BuildersOrCallback::Callback(_, node_callback) => {
4239 node_callback(node, next_stmt_id);
4240 }
4241 }
4242
4243 let _ = next_stmt_id.get_and_increment();
4244
4245 ident_stack.push(sort_ident);
4246 }
4247
4248 HydroNode::DeferTick { .. } => {
4249 let input_ident = ident_stack.pop().unwrap();
4250
4251 let defer_tick_ident =
4252 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4253
4254 match builders_or_callback {
4255 BuildersOrCallback::Builders(graph_builders) => {
4256 let builder = graph_builders.get_dfir_mut(&out_location);
4257 builder.add_dfir(
4258 parse_quote! {
4259 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4260 },
4261 None,
4262 Some(&next_stmt_id.to_string()),
4263 );
4264 }
4265 BuildersOrCallback::Callback(_, node_callback) => {
4266 node_callback(node, next_stmt_id);
4267 }
4268 }
4269
4270 let _ = next_stmt_id.get_and_increment();
4271
4272 ident_stack.push(defer_tick_ident);
4273 }
4274
4275 HydroNode::Enumerate { input, .. } => {
4276 let input_ident = ident_stack.pop().unwrap();
4277
4278 let enumerate_ident =
4279 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4280
4281 match builders_or_callback {
4282 BuildersOrCallback::Builders(graph_builders) => {
4283 let builder = graph_builders.get_dfir_mut(&out_location);
4284 let lifetime = if input.metadata().location_id.is_top_level() {
4285 quote!('static)
4286 } else {
4287 quote!('tick)
4288 };
4289 builder.add_dfir(
4290 parse_quote! {
4291 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4292 },
4293 None,
4294 Some(&next_stmt_id.to_string()),
4295 );
4296 }
4297 BuildersOrCallback::Callback(_, node_callback) => {
4298 node_callback(node, next_stmt_id);
4299 }
4300 }
4301
4302 let _ = next_stmt_id.get_and_increment();
4303
4304 ident_stack.push(enumerate_ident);
4305 }
4306
4307 HydroNode::Inspect { f, .. } => {
4308 let input_ident = ident_stack.pop().unwrap();
4309 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4310
4311 let inspect_ident =
4312 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4313
4314 match builders_or_callback {
4315 BuildersOrCallback::Builders(graph_builders) => {
4316 let builder = graph_builders.get_dfir_mut(&out_location);
4317 builder.add_dfir(
4318 parse_quote! {
4319 #inspect_ident = #input_ident -> inspect(#f_tokens);
4320 },
4321 None,
4322 Some(&next_stmt_id.to_string()),
4323 );
4324 }
4325 BuildersOrCallback::Callback(_, node_callback) => {
4326 node_callback(node, next_stmt_id);
4327 }
4328 }
4329
4330 let _ = next_stmt_id.get_and_increment();
4331
4332 ident_stack.push(inspect_ident);
4333 }
4334
4335 HydroNode::Unique { input, .. } => {
4336 let input_ident = ident_stack.pop().unwrap();
4337
4338 let unique_ident =
4339 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4340
4341 match builders_or_callback {
4342 BuildersOrCallback::Builders(graph_builders) => {
4343 let builder = graph_builders.get_dfir_mut(&out_location);
4344 let lifetime = if input.metadata().location_id.is_top_level() {
4345 quote!('static)
4346 } else {
4347 quote!('tick)
4348 };
4349
4350 builder.add_dfir(
4351 parse_quote! {
4352 #unique_ident = #input_ident -> unique::<#lifetime>();
4353 },
4354 None,
4355 Some(&next_stmt_id.to_string()),
4356 );
4357 }
4358 BuildersOrCallback::Callback(_, node_callback) => {
4359 node_callback(node, next_stmt_id);
4360 }
4361 }
4362
4363 let _ = next_stmt_id.get_and_increment();
4364
4365 ident_stack.push(unique_ident);
4366 }
4367
4368 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4369 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4370 if input.metadata().location_id.is_top_level()
4371 && input.metadata().collection_kind.is_bounded()
4372 {
4373 parse_quote!(fold_no_replay)
4374 } else {
4375 parse_quote!(fold)
4376 }
4377 } else if matches!(node, HydroNode::Scan { .. }) {
4378 parse_quote!(scan)
4379 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4380 parse_quote!(scan_async_blocking)
4381 } else if let HydroNode::FoldKeyed { input, .. } = node {
4382 if input.metadata().location_id.is_top_level()
4383 && input.metadata().collection_kind.is_bounded()
4384 {
4385 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4386 } else {
4387 parse_quote!(fold_keyed)
4388 }
4389 } else {
4390 unreachable!()
4391 };
4392
4393 let (HydroNode::Fold { input, .. }
4394 | HydroNode::FoldKeyed { input, .. }
4395 | HydroNode::Scan { input, .. }
4396 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4397 else {
4398 unreachable!()
4399 };
4400
4401 let lifetime = if input.metadata().location_id.is_top_level() {
4402 quote!('static)
4403 } else {
4404 quote!('tick)
4405 };
4406
4407 let input_ident = ident_stack.pop().unwrap();
4408
4409 let (HydroNode::Fold { init, acc, .. }
4410 | HydroNode::FoldKeyed { init, acc, .. }
4411 | HydroNode::Scan { init, acc, .. }
4412 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4413 else {
4414 unreachable!()
4415 };
4416
4417 let acc_tokens = acc.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4418 let init_tokens = init.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4419
4420 let fold_ident =
4421 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4422
4423 match builders_or_callback {
4424 BuildersOrCallback::Builders(graph_builders) => {
4425 if matches!(node, HydroNode::Fold { .. })
4426 && node.metadata().location_id.is_top_level()
4427 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4428 && graph_builders.singleton_intermediates()
4429 && !node.metadata().collection_kind.is_bounded()
4430 {
4431 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4432 let hooked_input_ident = graph_builders.emit_fold_hook(
4433 &input.metadata().location_id,
4434 &input_ident,
4435 &input.metadata().collection_kind,
4436 &node.metadata().op,
4437 );
4438
4439 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4440 let acc: syn::Expr = parse_quote!({
4441 let mut __inner = #acc_tokens;
4442 move |__state, __batch: Vec<_>| {
4443 if __batch.is_empty() {
4444 return None;
4445 }
4446 for __value in __batch {
4447 __inner(__state, __value);
4448 }
4449 Some(__state.clone())
4450 }
4451 });
4452 (hooked, acc)
4453 } else {
4454 let acc: syn::Expr = parse_quote!({
4455 let mut __inner = #acc_tokens;
4456 move |__state, __value| {
4457 __inner(__state, __value);
4458 Some(__state.clone())
4459 }
4460 });
4461 (&input_ident, acc)
4462 };
4463
4464 let builder = graph_builders.get_dfir_mut(&out_location);
4465 builder.add_dfir(
4466 parse_quote! {
4467 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4468 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4469 #fold_ident = chain();
4470 },
4471 None,
4472 Some(&next_stmt_id.to_string()),
4473 );
4474
4475 if hooked_input_ident.is_some() {
4476 fold_hooked_idents.insert(fold_ident.to_string());
4477 }
4478 } else if matches!(node, HydroNode::FoldKeyed { .. })
4479 && node.metadata().location_id.is_top_level()
4480 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4481 && graph_builders.singleton_intermediates()
4482 && !node.metadata().collection_kind.is_bounded()
4483 {
4484 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4485 let hooked_input_ident = graph_builders.emit_fold_hook(
4486 &input.metadata().location_id,
4487 &input_ident,
4488 &input.metadata().collection_kind,
4489 &node.metadata().op,
4490 );
4491 let builder = graph_builders.get_dfir_mut(&out_location);
4492
4493 let wrapped_acc: syn::Expr = parse_quote!({
4494 let mut __init = #init_tokens;
4495 let mut __inner = #acc_tokens;
4496 move |__state, __kv: (_, _)| {
4497 let __state = __state
4499 .entry(::std::clone::Clone::clone(&__kv.0))
4500 .or_insert_with(|| (__init)());
4501 __inner(__state, __kv.1);
4502 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4503 }
4504 });
4505
4506 if let Some(hooked_input_ident) = hooked_input_ident {
4507 builder.add_dfir(
4508 parse_quote! {
4509 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4510 },
4511 None,
4512 Some(&next_stmt_id.to_string()),
4513 );
4514
4515 fold_hooked_idents.insert(fold_ident.to_string());
4516 } else {
4517 builder.add_dfir(
4518 parse_quote! {
4519 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4520 },
4521 None,
4522 Some(&next_stmt_id.to_string()),
4523 );
4524 }
4525 } else if (matches!(node, HydroNode::Fold { .. })
4526 || matches!(node, HydroNode::FoldKeyed { .. }))
4527 && !node.metadata().location_id.is_top_level()
4528 && graph_builders.singleton_intermediates()
4529 {
4530 let input_ref = match &*node {
4531 HydroNode::Fold { input, .. } => input,
4532 HydroNode::FoldKeyed { input, .. } => input,
4533 _ => unreachable!(),
4534 };
4535 let hooked_input_ident = graph_builders.emit_fold_hook(
4536 &input_ref.metadata().location_id,
4537 &input_ident,
4538 &input_ref.metadata().collection_kind,
4539 &node.metadata().op,
4540 );
4541
4542 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4543 let builder = graph_builders.get_dfir_mut(&out_location);
4544 builder.add_dfir(
4545 parse_quote! {
4546 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4547 },
4548 None,
4549 Some(&next_stmt_id.to_string()),
4550 );
4551 } else {
4552 let builder = graph_builders.get_dfir_mut(&out_location);
4553 builder.add_dfir(
4554 parse_quote! {
4555 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4556 },
4557 None,
4558 Some(&next_stmt_id.to_string()),
4559 );
4560 }
4561 }
4562 BuildersOrCallback::Callback(_, node_callback) => {
4563 node_callback(node, next_stmt_id);
4564 }
4565 }
4566
4567 let _ = next_stmt_id.get_and_increment();
4568
4569 ident_stack.push(fold_ident);
4570 }
4571
4572 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4573 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4574 if input.metadata().location_id.is_top_level()
4575 && input.metadata().collection_kind.is_bounded()
4576 {
4577 parse_quote!(reduce_no_replay)
4578 } else {
4579 parse_quote!(reduce)
4580 }
4581 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4582 if input.metadata().location_id.is_top_level()
4583 && input.metadata().collection_kind.is_bounded()
4584 {
4585 todo!(
4586 "Calling keyed reduce on a top-level bounded collection is not supported"
4587 )
4588 } else {
4589 parse_quote!(reduce_keyed)
4590 }
4591 } else {
4592 unreachable!()
4593 };
4594
4595 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4596 else {
4597 unreachable!()
4598 };
4599
4600 let lifetime = if input.metadata().location_id.is_top_level() {
4601 quote!('static)
4602 } else {
4603 quote!('tick)
4604 };
4605
4606 let input_ident = ident_stack.pop().unwrap();
4607
4608 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4609 else {
4610 unreachable!()
4611 };
4612
4613 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4614
4615 let reduce_ident =
4616 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4617
4618 match builders_or_callback {
4619 BuildersOrCallback::Builders(graph_builders) => {
4620 if matches!(node, HydroNode::Reduce { .. })
4621 && node.metadata().location_id.is_top_level()
4622 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4623 && graph_builders.singleton_intermediates()
4624 && !node.metadata().collection_kind.is_bounded()
4625 {
4626 todo!(
4627 "Reduce with optional intermediates is not yet supported in simulator"
4628 );
4629 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4630 && node.metadata().location_id.is_top_level()
4631 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4632 && graph_builders.singleton_intermediates()
4633 && !node.metadata().collection_kind.is_bounded()
4634 {
4635 todo!(
4636 "Reduce keyed with optional intermediates is not yet supported in simulator"
4637 );
4638 } else {
4639 let builder = graph_builders.get_dfir_mut(&out_location);
4640 builder.add_dfir(
4641 parse_quote! {
4642 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4643 },
4644 None,
4645 Some(&next_stmt_id.to_string()),
4646 );
4647 }
4648 }
4649 BuildersOrCallback::Callback(_, node_callback) => {
4650 node_callback(node, next_stmt_id);
4651 }
4652 }
4653
4654 let _ = next_stmt_id.get_and_increment();
4655
4656 ident_stack.push(reduce_ident);
4657 }
4658
4659 HydroNode::ReduceKeyedWatermark {
4660 f,
4661 input,
4662 metadata,
4663 ..
4664 } => {
4665 let lifetime = if input.metadata().location_id.is_top_level() {
4666 quote!('static)
4667 } else {
4668 quote!('tick)
4669 };
4670
4671 let watermark_ident = ident_stack.pop().unwrap();
4673 let input_ident = ident_stack.pop().unwrap();
4674 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4675
4676 let chain_ident = syn::Ident::new(
4677 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4678 Span::call_site(),
4679 );
4680
4681 let fold_ident =
4682 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4683
4684 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4685 && input.metadata().collection_kind.is_bounded()
4686 {
4687 parse_quote!(fold_no_replay)
4688 } else {
4689 parse_quote!(fold)
4690 };
4691
4692 match builders_or_callback {
4693 BuildersOrCallback::Builders(graph_builders) => {
4694 if metadata.location_id.is_top_level()
4695 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4696 && graph_builders.singleton_intermediates()
4697 && !metadata.collection_kind.is_bounded()
4698 {
4699 todo!(
4700 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4701 )
4702 } else {
4703 let builder = graph_builders.get_dfir_mut(&out_location);
4704 builder.add_dfir(
4705 parse_quote! {
4706 #chain_ident = chain();
4707 #input_ident
4708 -> map(|x| (Some(x), None))
4709 -> [0]#chain_ident;
4710 #watermark_ident
4711 -> map(|watermark| (None, Some(watermark)))
4712 -> [1]#chain_ident;
4713
4714 #fold_ident = #chain_ident
4715 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4716 let __reduce_keyed_fn = #f_tokens;
4717 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4718 if let Some((k, v)) = opt_payload {
4719 if let Some(curr_watermark) = *opt_curr_watermark {
4720 if k < curr_watermark {
4721 return;
4722 }
4723 }
4724 match map.entry(k) {
4725 ::std::collections::hash_map::Entry::Vacant(e) => {
4726 e.insert(v);
4727 }
4728 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4729 __reduce_keyed_fn(e.get_mut(), v);
4730 }
4731 }
4732 } else {
4733 let watermark = opt_watermark.unwrap();
4734 if let Some(curr_watermark) = *opt_curr_watermark {
4735 if watermark <= curr_watermark {
4736 return;
4737 }
4738 }
4739 map.retain(|k, _| *k >= watermark);
4740 *opt_curr_watermark = Some(watermark);
4741 }
4742 }
4743 })
4744 -> flat_map(|(map, _curr_watermark)| map);
4745 },
4746 None,
4747 Some(&next_stmt_id.to_string()),
4748 );
4749 }
4750 }
4751 BuildersOrCallback::Callback(_, node_callback) => {
4752 node_callback(node, next_stmt_id);
4753 }
4754 }
4755
4756 let _ = next_stmt_id.get_and_increment();
4757
4758 ident_stack.push(fold_ident);
4759 }
4760
4761 HydroNode::Network {
4762 networking_info,
4763 serialize_fn: serialize_pipeline,
4764 instantiate_fn,
4765 deserialize_fn: deserialize_pipeline,
4766 input,
4767 ..
4768 } => {
4769 let input_ident = ident_stack.pop().unwrap();
4770
4771 let receiver_stream_ident =
4772 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4773
4774 match builders_or_callback {
4775 BuildersOrCallback::Builders(graph_builders) => {
4776 let (sink_expr, source_expr) = match instantiate_fn {
4777 DebugInstantiate::Building => (
4778 syn::parse_quote!(DUMMY_SINK),
4779 syn::parse_quote!(DUMMY_SOURCE),
4780 ),
4781
4782 DebugInstantiate::Finalized(finalized) => {
4783 (finalized.sink.clone(), finalized.source.clone())
4784 }
4785 };
4786
4787 graph_builders.create_network(
4788 &input.metadata().location_id,
4789 &out_location,
4790 input_ident,
4791 &receiver_stream_ident,
4792 serialize_pipeline.as_ref(),
4793 sink_expr,
4794 source_expr,
4795 deserialize_pipeline.as_ref(),
4796 *next_stmt_id,
4797 networking_info,
4798 );
4799 }
4800 BuildersOrCallback::Callback(_, node_callback) => {
4801 node_callback(node, next_stmt_id);
4802 }
4803 }
4804
4805 let _ = next_stmt_id.get_and_increment();
4806
4807 ident_stack.push(receiver_stream_ident);
4808 }
4809
4810 HydroNode::ExternalInput {
4811 instantiate_fn,
4812 deserialize_fn: deserialize_pipeline,
4813 ..
4814 } => {
4815 let receiver_stream_ident =
4816 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4817
4818 match builders_or_callback {
4819 BuildersOrCallback::Builders(graph_builders) => {
4820 let (_, source_expr) = match instantiate_fn {
4821 DebugInstantiate::Building => (
4822 syn::parse_quote!(DUMMY_SINK),
4823 syn::parse_quote!(DUMMY_SOURCE),
4824 ),
4825
4826 DebugInstantiate::Finalized(finalized) => {
4827 (finalized.sink.clone(), finalized.source.clone())
4828 }
4829 };
4830
4831 graph_builders.create_external_source(
4832 &out_location,
4833 source_expr,
4834 &receiver_stream_ident,
4835 deserialize_pipeline.as_ref(),
4836 *next_stmt_id,
4837 );
4838 }
4839 BuildersOrCallback::Callback(_, node_callback) => {
4840 node_callback(node, next_stmt_id);
4841 }
4842 }
4843
4844 let _ = next_stmt_id.get_and_increment();
4845
4846 ident_stack.push(receiver_stream_ident);
4847 }
4848
4849 HydroNode::Counter {
4850 tag,
4851 duration,
4852 prefix,
4853 ..
4854 } => {
4855 let input_ident = ident_stack.pop().unwrap();
4856
4857 let counter_ident =
4858 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4859
4860 match builders_or_callback {
4861 BuildersOrCallback::Builders(graph_builders) => {
4862 let arg = format!("{}({})", prefix, tag);
4863 let builder = graph_builders.get_dfir_mut(&out_location);
4864 builder.add_dfir(
4865 parse_quote! {
4866 #counter_ident = #input_ident -> _counter(#arg, #duration);
4867 },
4868 None,
4869 Some(&next_stmt_id.to_string()),
4870 );
4871 }
4872 BuildersOrCallback::Callback(_, node_callback) => {
4873 node_callback(node, next_stmt_id);
4874 }
4875 }
4876
4877 let _ = next_stmt_id.get_and_increment();
4878
4879 ident_stack.push(counter_ident);
4880 }
4881 }
4882 },
4883 seen_tees,
4884 false,
4885 );
4886
4887 let ret = ident_stack
4888 .pop()
4889 .expect("ident_stack should have exactly one element after traversal");
4890 assert!(
4891 ident_stack.is_empty(),
4892 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4893 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4894 ident_stack.len()
4895 );
4896 ret
4897 }
4898
4899 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4900 match self {
4901 HydroNode::Placeholder => {
4902 panic!()
4903 }
4904 HydroNode::Cast { .. }
4905 | HydroNode::ObserveNonDet { .. }
4906 | HydroNode::UnboundSingleton { .. }
4907 | HydroNode::AssertIsConsistent { .. } => {}
4908 HydroNode::Source { source, .. } => match source {
4909 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4910 HydroSource::ExternalNetwork()
4911 | HydroSource::Spin()
4912 | HydroSource::ClusterMembers(_, _)
4913 | HydroSource::Embedded(_)
4914 | HydroSource::EmbeddedSingleton(_) => {} },
4916 HydroNode::SingletonSource { value, .. } => {
4917 transform(value);
4918 }
4919 HydroNode::CycleSource { .. }
4920 | HydroNode::Tee { .. }
4921 | HydroNode::Singleton { .. }
4922 | HydroNode::YieldConcat { .. }
4923 | HydroNode::BeginAtomic { .. }
4924 | HydroNode::EndAtomic { .. }
4925 | HydroNode::Batch { .. }
4926 | HydroNode::Chain { .. }
4927 | HydroNode::MergeOrdered { .. }
4928 | HydroNode::ChainFirst { .. }
4929 | HydroNode::CrossProduct { .. }
4930 | HydroNode::CrossSingleton { .. }
4931 | HydroNode::ResolveFutures { .. }
4932 | HydroNode::ResolveFuturesBlocking { .. }
4933 | HydroNode::ResolveFuturesOrdered { .. }
4934 | HydroNode::Join { .. }
4935 | HydroNode::JoinHalf { .. }
4936 | HydroNode::Difference { .. }
4937 | HydroNode::AntiJoin { .. }
4938 | HydroNode::DeferTick { .. }
4939 | HydroNode::Enumerate { .. }
4940 | HydroNode::Unique { .. }
4941 | HydroNode::Sort { .. } => {}
4942 HydroNode::Map { f, .. }
4943 | HydroNode::FlatMap { f, .. }
4944 | HydroNode::FlatMapStreamBlocking { f, .. }
4945 | HydroNode::Filter { f, .. }
4946 | HydroNode::FilterMap { f, .. }
4947 | HydroNode::Inspect { f, .. }
4948 | HydroNode::Partition { f, .. }
4949 | HydroNode::Reduce { f, .. }
4950 | HydroNode::ReduceKeyed { f, .. }
4951 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4952 transform(&mut f.expr);
4953 }
4954 HydroNode::Fold { init, acc, .. }
4955 | HydroNode::Scan { init, acc, .. }
4956 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4957 | HydroNode::FoldKeyed { init, acc, .. } => {
4958 transform(&mut init.expr);
4959 transform(&mut acc.expr);
4960 }
4961 HydroNode::Network {
4962 serialize_fn,
4963 deserialize_fn,
4964 ..
4965 } => {
4966 if let Some(serialize_fn) = serialize_fn {
4967 transform(serialize_fn);
4968 }
4969 if let Some(deserialize_fn) = deserialize_fn {
4970 transform(deserialize_fn);
4971 }
4972 }
4973 HydroNode::ExternalInput { deserialize_fn, .. } => {
4974 if let Some(deserialize_fn) = deserialize_fn {
4975 transform(deserialize_fn);
4976 }
4977 }
4978 HydroNode::Counter { duration, .. } => {
4979 transform(duration);
4980 }
4981 }
4982 }
4983
4984 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4985 &self.metadata().op
4986 }
4987
4988 pub fn metadata(&self) -> &HydroIrMetadata {
4989 match self {
4990 HydroNode::Placeholder => {
4991 panic!()
4992 }
4993 HydroNode::Cast { metadata, .. }
4994 | HydroNode::ObserveNonDet { metadata, .. }
4995 | HydroNode::AssertIsConsistent { metadata, .. }
4996 | HydroNode::UnboundSingleton { metadata, .. }
4997 | HydroNode::Source { metadata, .. }
4998 | HydroNode::SingletonSource { metadata, .. }
4999 | HydroNode::CycleSource { metadata, .. }
5000 | HydroNode::Tee { metadata, .. }
5001 | HydroNode::Singleton { metadata, .. }
5002 | HydroNode::Partition { metadata, .. }
5003 | HydroNode::YieldConcat { metadata, .. }
5004 | HydroNode::BeginAtomic { metadata, .. }
5005 | HydroNode::EndAtomic { metadata, .. }
5006 | HydroNode::Batch { metadata, .. }
5007 | HydroNode::Chain { metadata, .. }
5008 | HydroNode::MergeOrdered { metadata, .. }
5009 | HydroNode::ChainFirst { metadata, .. }
5010 | HydroNode::CrossProduct { metadata, .. }
5011 | HydroNode::CrossSingleton { metadata, .. }
5012 | HydroNode::Join { metadata, .. }
5013 | HydroNode::JoinHalf { metadata, .. }
5014 | HydroNode::Difference { metadata, .. }
5015 | HydroNode::AntiJoin { metadata, .. }
5016 | HydroNode::ResolveFutures { metadata, .. }
5017 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5018 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5019 | HydroNode::Map { metadata, .. }
5020 | HydroNode::FlatMap { metadata, .. }
5021 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5022 | HydroNode::Filter { metadata, .. }
5023 | HydroNode::FilterMap { metadata, .. }
5024 | HydroNode::DeferTick { metadata, .. }
5025 | HydroNode::Enumerate { metadata, .. }
5026 | HydroNode::Inspect { metadata, .. }
5027 | HydroNode::Unique { metadata, .. }
5028 | HydroNode::Sort { metadata, .. }
5029 | HydroNode::Scan { metadata, .. }
5030 | HydroNode::ScanAsyncBlocking { metadata, .. }
5031 | HydroNode::Fold { metadata, .. }
5032 | HydroNode::FoldKeyed { metadata, .. }
5033 | HydroNode::Reduce { metadata, .. }
5034 | HydroNode::ReduceKeyed { metadata, .. }
5035 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5036 | HydroNode::ExternalInput { metadata, .. }
5037 | HydroNode::Network { metadata, .. }
5038 | HydroNode::Counter { metadata, .. } => metadata,
5039 }
5040 }
5041
5042 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5043 &mut self.metadata_mut().op
5044 }
5045
5046 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5047 match self {
5048 HydroNode::Placeholder => {
5049 panic!()
5050 }
5051 HydroNode::Cast { metadata, .. }
5052 | HydroNode::ObserveNonDet { metadata, .. }
5053 | HydroNode::AssertIsConsistent { metadata, .. }
5054 | HydroNode::UnboundSingleton { metadata, .. }
5055 | HydroNode::Source { metadata, .. }
5056 | HydroNode::SingletonSource { metadata, .. }
5057 | HydroNode::CycleSource { metadata, .. }
5058 | HydroNode::Tee { metadata, .. }
5059 | HydroNode::Singleton { metadata, .. }
5060 | HydroNode::Partition { metadata, .. }
5061 | HydroNode::YieldConcat { metadata, .. }
5062 | HydroNode::BeginAtomic { metadata, .. }
5063 | HydroNode::EndAtomic { metadata, .. }
5064 | HydroNode::Batch { metadata, .. }
5065 | HydroNode::Chain { metadata, .. }
5066 | HydroNode::MergeOrdered { metadata, .. }
5067 | HydroNode::ChainFirst { metadata, .. }
5068 | HydroNode::CrossProduct { metadata, .. }
5069 | HydroNode::CrossSingleton { metadata, .. }
5070 | HydroNode::Join { metadata, .. }
5071 | HydroNode::JoinHalf { metadata, .. }
5072 | HydroNode::Difference { metadata, .. }
5073 | HydroNode::AntiJoin { metadata, .. }
5074 | HydroNode::ResolveFutures { metadata, .. }
5075 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5076 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5077 | HydroNode::Map { metadata, .. }
5078 | HydroNode::FlatMap { metadata, .. }
5079 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5080 | HydroNode::Filter { metadata, .. }
5081 | HydroNode::FilterMap { metadata, .. }
5082 | HydroNode::DeferTick { metadata, .. }
5083 | HydroNode::Enumerate { metadata, .. }
5084 | HydroNode::Inspect { metadata, .. }
5085 | HydroNode::Unique { metadata, .. }
5086 | HydroNode::Sort { metadata, .. }
5087 | HydroNode::Scan { metadata, .. }
5088 | HydroNode::ScanAsyncBlocking { metadata, .. }
5089 | HydroNode::Fold { metadata, .. }
5090 | HydroNode::FoldKeyed { metadata, .. }
5091 | HydroNode::Reduce { metadata, .. }
5092 | HydroNode::ReduceKeyed { metadata, .. }
5093 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5094 | HydroNode::ExternalInput { metadata, .. }
5095 | HydroNode::Network { metadata, .. }
5096 | HydroNode::Counter { metadata, .. } => metadata,
5097 }
5098 }
5099
5100 pub fn input(&self) -> Vec<&HydroNode> {
5101 match self {
5102 HydroNode::Placeholder => {
5103 panic!()
5104 }
5105 HydroNode::Source { .. }
5106 | HydroNode::SingletonSource { .. }
5107 | HydroNode::ExternalInput { .. }
5108 | HydroNode::CycleSource { .. }
5109 | HydroNode::Tee { .. }
5110 | HydroNode::Singleton { .. }
5111 | HydroNode::Partition { .. } => {
5112 vec![]
5114 }
5115 HydroNode::Cast { inner, .. }
5116 | HydroNode::ObserveNonDet { inner, .. }
5117 | HydroNode::YieldConcat { inner, .. }
5118 | HydroNode::BeginAtomic { inner, .. }
5119 | HydroNode::EndAtomic { inner, .. }
5120 | HydroNode::Batch { inner, .. }
5121 | HydroNode::UnboundSingleton { inner, .. }
5122 | HydroNode::AssertIsConsistent { inner, .. } => {
5123 vec![inner]
5124 }
5125 HydroNode::Chain { first, second, .. } => {
5126 vec![first, second]
5127 }
5128 HydroNode::MergeOrdered { first, second, .. } => {
5129 vec![first, second]
5130 }
5131 HydroNode::ChainFirst { first, second, .. } => {
5132 vec![first, second]
5133 }
5134 HydroNode::CrossProduct { left, right, .. }
5135 | HydroNode::CrossSingleton { left, right, .. }
5136 | HydroNode::Join { left, right, .. }
5137 | HydroNode::JoinHalf { left, right, .. } => {
5138 vec![left, right]
5139 }
5140 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5141 vec![pos, neg]
5142 }
5143 HydroNode::Map { input, .. }
5144 | HydroNode::FlatMap { input, .. }
5145 | HydroNode::FlatMapStreamBlocking { input, .. }
5146 | HydroNode::Filter { input, .. }
5147 | HydroNode::FilterMap { input, .. }
5148 | HydroNode::Sort { input, .. }
5149 | HydroNode::DeferTick { input, .. }
5150 | HydroNode::Enumerate { input, .. }
5151 | HydroNode::Inspect { input, .. }
5152 | HydroNode::Unique { input, .. }
5153 | HydroNode::Network { input, .. }
5154 | HydroNode::Counter { input, .. }
5155 | HydroNode::ResolveFutures { input, .. }
5156 | HydroNode::ResolveFuturesBlocking { input, .. }
5157 | HydroNode::ResolveFuturesOrdered { input, .. }
5158 | HydroNode::Fold { input, .. }
5159 | HydroNode::FoldKeyed { input, .. }
5160 | HydroNode::Reduce { input, .. }
5161 | HydroNode::ReduceKeyed { input, .. }
5162 | HydroNode::Scan { input, .. }
5163 | HydroNode::ScanAsyncBlocking { input, .. } => {
5164 vec![input]
5165 }
5166 HydroNode::ReduceKeyedWatermark {
5167 input, watermark, ..
5168 } => {
5169 vec![input, watermark]
5170 }
5171 }
5172 }
5173
5174 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5175 self.input()
5176 .iter()
5177 .map(|input_node| input_node.metadata())
5178 .collect()
5179 }
5180
5181 pub fn is_shared_with_others(&self) -> bool {
5185 match self {
5186 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5187 Rc::strong_count(&inner.0) > 1
5188 }
5189 HydroNode::Singleton { .. } => false,
5192 _ => false,
5193 }
5194 }
5195
5196 pub fn print_root(&self) -> String {
5197 match self {
5198 HydroNode::Placeholder => {
5199 panic!()
5200 }
5201 HydroNode::Cast { .. } => "Cast()".to_owned(),
5202 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5203 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5204 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5205 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5206 HydroNode::SingletonSource {
5207 value,
5208 first_tick_only,
5209 ..
5210 } => format!(
5211 "SingletonSource({:?}, first_tick_only={})",
5212 value, first_tick_only
5213 ),
5214 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5215 HydroNode::Tee { inner, .. } => {
5216 format!("Tee({})", inner.0.borrow().print_root())
5217 }
5218 HydroNode::Singleton { inner, .. } => {
5219 format!("Singleton({})", inner.0.borrow().print_root())
5220 }
5221 HydroNode::Partition { f, is_true, .. } => {
5222 format!("Partition({:?}, is_true={})", f, is_true)
5223 }
5224 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5225 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5226 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5227 HydroNode::Batch { .. } => "Batch()".to_owned(),
5228 HydroNode::Chain { first, second, .. } => {
5229 format!("Chain({}, {})", first.print_root(), second.print_root())
5230 }
5231 HydroNode::MergeOrdered { first, second, .. } => {
5232 format!(
5233 "MergeOrdered({}, {})",
5234 first.print_root(),
5235 second.print_root()
5236 )
5237 }
5238 HydroNode::ChainFirst { first, second, .. } => {
5239 format!(
5240 "ChainFirst({}, {})",
5241 first.print_root(),
5242 second.print_root()
5243 )
5244 }
5245 HydroNode::CrossProduct { left, right, .. } => {
5246 format!(
5247 "CrossProduct({}, {})",
5248 left.print_root(),
5249 right.print_root()
5250 )
5251 }
5252 HydroNode::CrossSingleton { left, right, .. } => {
5253 format!(
5254 "CrossSingleton({}, {})",
5255 left.print_root(),
5256 right.print_root()
5257 )
5258 }
5259 HydroNode::Join { left, right, .. } => {
5260 format!("Join({}, {})", left.print_root(), right.print_root())
5261 }
5262 HydroNode::JoinHalf { left, right, .. } => {
5263 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5264 }
5265 HydroNode::Difference { pos, neg, .. } => {
5266 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5267 }
5268 HydroNode::AntiJoin { pos, neg, .. } => {
5269 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5270 }
5271 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5272 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5273 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5274 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5275 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5276 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5277 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5278 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5279 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5280 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5281 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5282 HydroNode::Unique { .. } => "Unique()".to_owned(),
5283 HydroNode::Sort { .. } => "Sort()".to_owned(),
5284 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5285 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5286 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5287 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5288 }
5289 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5290 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5291 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5292 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5293 HydroNode::Network { .. } => "Network()".to_owned(),
5294 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5295 HydroNode::Counter { tag, duration, .. } => {
5296 format!("Counter({:?}, {:?})", tag, duration)
5297 }
5298 }
5299 }
5300}
5301
/// Asks the deploy backend `D` for the sink/source expressions and the connect
/// callback of a network channel from `from_location` to `to_location`.
///
/// Both endpoints are looked up in `processes` or `clusters` by their
/// [`LocationKey`], `next_port()` is called on the sender and then on the
/// receiver, and the matching `D::*_sink_source` / `D::*_connect` pair is invoked:
///
/// | from    | to      | backend functions |
/// |---------|---------|-------------------|
/// | process | process | `o2o_*`           |
/// | process | cluster | `o2m_*`           |
/// | cluster | process | `m2o_*`           |
/// | cluster | cluster | `m2m_*`           |
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// Panics if either endpoint is missing from `processes` / `clusters`, or if
/// either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches an owned handle to the process registered under `key`.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetches an owned handle to the cluster registered under `key`.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    // In every arm the sender is resolved before the receiver and the sender's
    // port is allocated before the receiver's, so lookup panics and port
    // numbering keep a fixed order.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only process and cluster locations are handled above; fail with a
        // diagnostic rather than a bare panic for anything else.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(
            "network endpoints must be a process or a cluster, \
             but a tick or atomic location was given"
        ),
    };
    (sink, source, connect_fn)
}
5443
5444#[cfg(test)]
5445mod serde_test;
5446
// Unit tests for the IR types' memory footprint and for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Pins `size_of::<HydroNode>()` to an exact byte count. The test is ignored
    // when the `build` feature is off, since the expected size assumes the
    // feature-gated fields are present.
    // NOTE(review): presumably a guard against accidental growth of the enum;
    // update the constant deliberately when the layout changes.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 264);
    }

    // Same size pin for `HydroRoot`, ignored under the same condition.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    // A plain expression (here `x + y`) must come back from `simplify_q_macro`
    // unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Feeds a real `q!` closure, spliced with a unit context, through
    // `simplify_q_macro` and snapshot-checks the resulting token string.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Snapshot-checks a `q!` body that is a block ending in a `move` closure,
    // i.e. one whose source does not begin with the closure's `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}