Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::Atomic;
25use crate::location::{Location, Tick, TopLevel, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28    ApplyMonotoneStream, ApplyOrderPreservingSingleton, Proved, SingletonMapFuncAlgebra,
29};
30
31/// A marker trait indicating which components of a [`Singleton`] may change.
32///
33/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
34/// includes an additional variant [`Monotonic`], which means that the value will only grow.
35pub trait SingletonBound {
36    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
37    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
38
39    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
40    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
41
42    /// Returns the [`SingletonBoundKind`] corresponding to this type.
43    fn bound_kind() -> SingletonBoundKind;
44}
45
46impl SingletonBound for Unbounded {
47    type UnderlyingBound = Unbounded;
48
49    type StreamToMonotone = Monotonic;
50
51    fn bound_kind() -> SingletonBoundKind {
52        SingletonBoundKind::Unbounded
53    }
54}
55
56impl SingletonBound for Bounded {
57    type UnderlyingBound = Bounded;
58
59    type StreamToMonotone = Bounded;
60
61    fn bound_kind() -> SingletonBoundKind {
62        SingletonBoundKind::Bounded
63    }
64}
65
66/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
67pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70    type UnderlyingBound = Unbounded;
71
72    type StreamToMonotone = Monotonic;
73
74    fn bound_kind() -> SingletonBoundKind {
75        SingletonBoundKind::Monotonic
76    }
77}
78
79#[sealed]
80#[diagnostic::on_unimplemented(
81    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
82    label = "required here",
83    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
84)]
85/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
86pub trait IsMonotonic: SingletonBound {}
87
88#[sealed]
89#[diagnostic::do_not_recommend]
90impl IsMonotonic for Monotonic {}
91
92#[sealed]
93#[diagnostic::do_not_recommend]
94impl<B: IsBounded> IsMonotonic for B {}
95
96/// A single Rust value that can asynchronously change over time.
97///
98/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
99/// [`Unbounded`], the value will asynchronously change over time.
100///
101/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
102/// a single number that will asynchronously change as events are processed. Singletons also appear
103/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
104/// such as getting the length of a batch of requests.
105///
106/// Type Parameters:
107/// - `Type`: the type of the value in this singleton
108/// - `Loc`: the [`Location`] where the singleton is materialized
109/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
110pub struct Singleton<Type, Loc, Bound: SingletonBound> {
111    pub(crate) location: Loc,
112    pub(crate) ir_node: RefCell<HydroNode>,
113    pub(crate) flow_state: FlowState,
114
115    _phantom: PhantomData<(Type, Loc, Bound)>,
116}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119    fn drop(&mut self) {
120        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123                input: Box::new(ir_node),
124                op_metadata: HydroIrOpMetadata::new(),
125            });
126        }
127    }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132    T: Clone,
133    L: Location<'a>,
134{
135    fn from(value: Singleton<T, L, Bounded>) -> Self {
136        let location = value.location().clone();
137        Singleton::new(
138            location.clone(),
139            HydroNode::UnboundSingleton {
140                inner: Box::new(value.ir_node.replace(HydroNode::Placeholder)),
141                metadata: location
142                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
143            },
144        )
145    }
146}
147
148impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
149where
150    L: Location<'a>,
151{
152    type Location = Tick<L>;
153
154    fn location(&self) -> &Self::Location {
155        self.location()
156    }
157
158    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
159        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
160            location.clone(),
161            HydroNode::DeferTick {
162                input: Box::new(HydroNode::CycleSource {
163                    cycle_id,
164                    metadata: location.new_node_metadata(Self::collection_kind()),
165                }),
166                metadata: location
167                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
168            },
169        );
170
171        from_previous_tick.unwrap_or(initial)
172    }
173}
174
175impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
176where
177    L: Location<'a>,
178{
179    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
180        assert_eq!(
181            Location::id(&self.location),
182            expected_location,
183            "locations do not match"
184        );
185        self.location
186            .flow_state()
187            .borrow_mut()
188            .push_root(HydroRoot::CycleSink {
189                cycle_id,
190                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
191                op_metadata: HydroIrOpMetadata::new(),
192            });
193    }
194}
195
196impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
197where
198    L: Location<'a>,
199{
200    type Location = L;
201
202    fn create_source(cycle_id: CycleId, location: L) -> Self {
203        Singleton::new(
204            location.clone(),
205            HydroNode::CycleSource {
206                cycle_id,
207                metadata: location.new_node_metadata(Self::collection_kind()),
208            },
209        )
210    }
211}
212
213impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
214where
215    L: Location<'a>,
216{
217    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
218        assert_eq!(
219            Location::id(&self.location),
220            expected_location,
221            "locations do not match"
222        );
223        self.location
224            .flow_state()
225            .borrow_mut()
226            .push_root(HydroRoot::CycleSink {
227                cycle_id,
228                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
229                op_metadata: HydroIrOpMetadata::new(),
230            });
231    }
232}
233
234impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
235where
236    T: Clone,
237    L: Location<'a>,
238{
239    fn clone(&self) -> Self {
240        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
241            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
242            *self.ir_node.borrow_mut() = HydroNode::Tee {
243                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
244                metadata: self.location.new_node_metadata(Self::collection_kind()),
245            };
246        }
247
248        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
249            Singleton {
250                location: self.location.clone(),
251                flow_state: self.flow_state.clone(),
252                ir_node: HydroNode::Tee {
253                    inner: SharedNode(inner.0.clone()),
254                    metadata: metadata.clone(),
255                }
256                .into(),
257                _phantom: PhantomData,
258            }
259        } else {
260            unreachable!()
261        }
262    }
263}
264
265#[cfg(stageleft_runtime)]
266fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
267    me: Singleton<T, Tick<L>, B>,
268    other: Optional<O, Tick<L>, B::UnderlyingBound>,
269) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
270    let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
271    super::optional::zip_inside_tick(me_as_optional, other)
272}
273
274impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
275where
276    L: Location<'a>,
277{
278    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
279        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
280        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
281        let flow_state = location.flow_state().clone();
282        Singleton {
283            location,
284            flow_state,
285            ir_node: RefCell::new(ir_node),
286            _phantom: PhantomData,
287        }
288    }
289
290    pub(crate) fn collection_kind() -> CollectionKind {
291        CollectionKind::Singleton {
292            bound: B::bound_kind(),
293            element_type: stageleft::quote_type::<T>().into(),
294        }
295    }
296
297    /// Returns the [`Location`] where this singleton is being materialized.
298    pub fn location(&self) -> &L {
299        &self.location
300    }
301
302    /// Creates a lightweight reference handle to this singleton that can be captured
303    /// inside `q!()` closures. The handle resolves to `&T` at runtime.
304    ///
305    /// The singleton must be bounded, otherwise reading it would be non-deterministic.
306    ///
307    /// ```rust
308    /// # #[cfg(feature = "deploy")] {
309    /// # use hydro_lang::prelude::*;
310    /// # use futures::StreamExt;
311    /// # tokio_test::block_on(async {
312    /// # let mut deployment = hydro_deploy::Deployment::new();
313    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
314    /// # let process = builder.process::<()>();
315    /// # let external = builder.external::<()>();
316    /// let my_count = process
317    ///     .source_iter(q!(0..5i32))
318    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
319    /// let count_ref = my_count.by_ref();
320    /// let out_port = process
321    ///     .source_iter(q!(1..=3i32))
322    ///     .map(q!(|x| x + *count_ref))
323    ///     .send_bincode_external(&external);
324    /// # let nodes = builder
325    /// #     .with_default_optimize()
326    /// #     .with_process(&process, deployment.Localhost())
327    /// #     .with_external(&external, deployment.Localhost())
328    /// #     .deploy(&mut deployment);
329    /// # deployment.deploy().await.unwrap();
330    /// # let mut out_recv = nodes.connect(out_port).await;
331    /// # deployment.start().await.unwrap();
332    /// # let mut results = Vec::new();
333    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
334    /// # results.sort();
335    /// // fold(0..5) = 10, so results are 11, 12, 13
336    /// # assert_eq!(results, vec![11, 12, 13]);
337    /// # });
338    /// # }
339    /// ```
340    pub fn by_ref(&self) -> crate::singleton_ref::SingletonRef<'a, '_, T, L>
341    where
342        B: IsBounded,
343    {
344        crate::singleton_ref::SingletonRef::new(&self.ir_node)
345    }
346
347    /// Returns a mutable reference handle to this singleton that can be captured inside `q!()`
348    /// closures. The handle resolves to `&mut T` at runtime.
349    ///
350    /// Mutable references are ordered via access groups in the generated DFIR code, ensuring
351    /// exclusive access at each point in the execution order.
352    ///
353    /// ```rust
354    /// # #[cfg(feature = "deploy")] {
355    /// # use hydro_lang::prelude::*;
356    /// # use futures::StreamExt;
357    /// # tokio_test::block_on(async {
358    /// # let mut deployment = hydro_deploy::Deployment::new();
359    /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
360    /// # let process = builder.process::<()>();
361    /// # let external = builder.external::<()>();
362    /// let my_count = process
363    ///     .source_iter(q!(0..5i32))
364    ///     .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
365    /// let count_mut = my_count.by_mut();
366    /// let out_port = process
367    ///     .source_iter(q!(1..=3i32))
368    ///     .map(q!(|x| {
369    ///         *count_mut += x;
370    ///         *count_mut
371    ///     }))
372    ///     .send_bincode_external(&external);
373    /// # let nodes = builder
374    /// #     .with_default_optimize()
375    /// #     .with_process(&process, deployment.Localhost())
376    /// #     .with_external(&external, deployment.Localhost())
377    /// #     .deploy(&mut deployment);
378    /// # deployment.deploy().await.unwrap();
379    /// # let mut out_recv = nodes.connect(out_port).await;
380    /// # deployment.start().await.unwrap();
381    /// # let mut results = Vec::new();
382    /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
383    /// # results.sort();
384    /// // fold(0..5) = 10, then each map adds x: results are 11, 13, 16
385    /// # assert_eq!(results, vec![11, 13, 16]);
386    /// # });
387    /// # }
388    /// ```
389    pub fn by_mut(&self) -> crate::singleton_ref::SingletonMut<'a, '_, T, L>
390    where
391        B: IsBounded,
392    {
393        crate::singleton_ref::SingletonMut::new(&self.ir_node)
394    }
395
396    /// Weakens the consistency of this live collection to not guarantee any consistency across
397    /// cluster members (if this collection is on a cluster).
398    pub fn weaken_consistency(self) -> Singleton<T, L::DropConsistency, B>
399    where
400        L: Location<'a>,
401    {
402        if L::consistency()
403            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
404        {
405            // already no consistency
406            Singleton::new(
407                self.location.drop_consistency(),
408                self.ir_node.replace(HydroNode::Placeholder),
409            )
410        } else {
411            Singleton::new(
412                self.location.drop_consistency(),
413                HydroNode::Cast {
414                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
415                    metadata:
416                        self.location
417                            .clone()
418                            .drop_consistency()
419                            .new_node_metadata(
420                                Singleton::<T, L::DropConsistency, B>::collection_kind(),
421                            ),
422                },
423            )
424        }
425    }
426
427    /// Casts this live collection to have the consistency guarantees specified in the given
428    /// location type parameter. The developer must ensure that the strengthened consistency
429    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
430    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
431        self,
432        _proof: impl crate::properties::ConsistencyProof,
433    ) -> Singleton<T, L2, B>
434    where
435        L: Location<'a>,
436    {
437        if L::consistency() == L2::consistency() {
438            Singleton::new(
439                self.location.with_consistency_of(),
440                self.ir_node.replace(HydroNode::Placeholder),
441            )
442        } else {
443            Singleton::new(
444                self.location.with_consistency_of(),
445                HydroNode::AssertIsConsistent {
446                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
447                    trusted: false,
448                    metadata: self
449                        .location
450                        .clone()
451                        .with_consistency_of::<L2>()
452                        .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
453                },
454            )
455        }
456    }
457
458    /// Drops the monotonicity property of the [`Singleton`].
459    pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
460        if B::bound_kind() == B::UnderlyingBound::bound_kind() {
461            Singleton::new(
462                self.location.clone(),
463                self.ir_node.replace(HydroNode::Placeholder),
464            )
465        } else {
466            Singleton::new(
467                self.location.clone(),
468                HydroNode::Cast {
469                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
470                    metadata:
471                        self.location.new_node_metadata(
472                            Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
473                        ),
474                },
475            )
476        }
477    }
478
479    /// Transforms the singleton value by applying a function `f` to it,
480    /// continuously as the input is updated.
481    ///
482    /// # Example
483    /// ```rust
484    /// # #[cfg(feature = "deploy")] {
485    /// # use hydro_lang::prelude::*;
486    /// # use futures::StreamExt;
487    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
488    /// let tick = process.tick();
489    /// let singleton = tick.singleton(q!(5));
490    /// singleton.map(q!(|v| v * 2)).all_ticks()
491    /// # }, |mut stream| async move {
492    /// // 10
493    /// # assert_eq!(stream.next().await.unwrap(), 10);
494    /// # }));
495    /// # }
496    /// ```
497    pub fn map<U, F, OP, B2: SingletonBound>(
498        self,
499        f: impl IntoQuotedMut<'a, F, L, SingletonMapFuncAlgebra<OP>>,
500    ) -> Singleton<U, L, B2>
501    where
502        F: Fn(T) -> U + 'a,
503        B: ApplyOrderPreservingSingleton<OP, B2>,
504    {
505        let (f, proof) = f.splice_fn1_ctx_props(&self.location);
506        proof.register_proof(&f);
507        let f = f.into();
508        Singleton::new(
509            self.location.clone(),
510            HydroNode::Map {
511                f,
512                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
513                metadata: self
514                    .location
515                    .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
516            },
517        )
518    }
519
520    /// Transforms the singleton value by applying a function `f` to it and then flattening
521    /// the result into a stream, preserving the order of elements.
522    ///
523    /// The function `f` is applied to the singleton value to produce an iterator, and all items
524    /// from that iterator are emitted in the output stream in deterministic order.
525    ///
526    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
527    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
528    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
529    ///
530    /// # Example
531    /// ```rust
532    /// # #[cfg(feature = "deploy")] {
533    /// # use hydro_lang::prelude::*;
534    /// # use futures::StreamExt;
535    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
536    /// let tick = process.tick();
537    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
538    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
539    /// # }, |mut stream| async move {
540    /// // 1, 2, 3
541    /// # for w in vec![1, 2, 3] {
542    /// #     assert_eq!(stream.next().await.unwrap(), w);
543    /// # }
544    /// # }));
545    /// # }
546    /// ```
547    pub fn flat_map_ordered<U, I, F>(
548        self,
549        f: impl IntoQuotedMut<'a, F, L>,
550    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
551    where
552        B: IsBounded,
553        I: IntoIterator<Item = U>,
554        F: Fn(T) -> I + 'a,
555    {
556        self.into_stream().flat_map_ordered(f)
557    }
558
559    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
560    /// for the output type `I` to produce items in any order.
561    ///
562    /// The function `f` is applied to the singleton value to produce an iterator, and all items
563    /// from that iterator are emitted in the output stream in non-deterministic order.
564    ///
565    /// # Example
566    /// ```rust
567    /// # #[cfg(feature = "deploy")] {
568    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
569    /// # use futures::StreamExt;
570    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
571    /// let tick = process.tick();
572    /// let singleton = tick.singleton(q!(
573    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
574    /// ));
575    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
576    /// # }, |mut stream| async move {
577    /// // 1, 2, 3, but in no particular order
578    /// # let mut results = Vec::new();
579    /// # for _ in 0..3 {
580    /// #     results.push(stream.next().await.unwrap());
581    /// # }
582    /// # results.sort();
583    /// # assert_eq!(results, vec![1, 2, 3]);
584    /// # }));
585    /// # }
586    /// ```
587    pub fn flat_map_unordered<U, I, F>(
588        self,
589        f: impl IntoQuotedMut<'a, F, L>,
590    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
591    where
592        B: IsBounded,
593        I: IntoIterator<Item = U>,
594        F: Fn(T) -> I + 'a,
595    {
596        self.into_stream().flat_map_unordered(f)
597    }
598
599    /// Flattens the singleton value into a stream, preserving the order of elements.
600    ///
601    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
602    /// are emitted in the output stream in deterministic order.
603    ///
604    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
605    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
606    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
607    ///
608    /// # Example
609    /// ```rust
610    /// # #[cfg(feature = "deploy")] {
611    /// # use hydro_lang::prelude::*;
612    /// # use futures::StreamExt;
613    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
614    /// let tick = process.tick();
615    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
616    /// singleton.flatten_ordered().all_ticks()
617    /// # }, |mut stream| async move {
618    /// // 1, 2, 3
619    /// # for w in vec![1, 2, 3] {
620    /// #     assert_eq!(stream.next().await.unwrap(), w);
621    /// # }
622    /// # }));
623    /// # }
624    /// ```
625    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
626    where
627        B: IsBounded,
628        T: IntoIterator<Item = U>,
629    {
630        self.flat_map_ordered(q!(|x| x))
631    }
632
633    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
634    /// for the element type `T` to produce items in any order.
635    ///
636    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
637    /// are emitted in the output stream in non-deterministic order.
638    ///
639    /// # Example
640    /// ```rust
641    /// # #[cfg(feature = "deploy")] {
642    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
643    /// # use futures::StreamExt;
644    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
645    /// let tick = process.tick();
646    /// let singleton = tick.singleton(q!(
647    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
648    /// ));
649    /// singleton.flatten_unordered().all_ticks()
650    /// # }, |mut stream| async move {
651    /// // 1, 2, 3, but in no particular order
652    /// # let mut results = Vec::new();
653    /// # for _ in 0..3 {
654    /// #     results.push(stream.next().await.unwrap());
655    /// # }
656    /// # results.sort();
657    /// # assert_eq!(results, vec![1, 2, 3]);
658    /// # }));
659    /// # }
660    /// ```
661    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
662    where
663        B: IsBounded,
664        T: IntoIterator<Item = U>,
665    {
666        self.flat_map_unordered(q!(|x| x))
667    }
668
669    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
670    ///
671    /// If the predicate returns `true`, the output optional contains the same value.
672    /// If the predicate returns `false`, the output optional is empty.
673    ///
674    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
675    /// not modify or take ownership of the value. If you need to modify the value while filtering
676    /// use [`Singleton::filter_map`] instead.
677    ///
678    /// # Example
679    /// ```rust
680    /// # #[cfg(feature = "deploy")] {
681    /// # use hydro_lang::prelude::*;
682    /// # use futures::StreamExt;
683    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
684    /// let tick = process.tick();
685    /// let singleton = tick.singleton(q!(5));
686    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
687    /// # }, |mut stream| async move {
688    /// // 5
689    /// # assert_eq!(stream.next().await.unwrap(), 5);
690    /// # }));
691    /// # }
692    /// ```
693    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
694    where
695        F: Fn(&T) -> bool + 'a,
696    {
697        let f = f.splice_fn1_borrow_ctx(&self.location).into();
698        Optional::new(
699            self.location.clone(),
700            HydroNode::Filter {
701                f,
702                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
703                metadata: self
704                    .location
705                    .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
706            },
707        )
708    }
709
710    /// An operator that both filters and maps. It yields the value only if the supplied
711    /// closure `f` returns `Some(value)`.
712    ///
713    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
714    /// If the closure returns `None`, the output optional is empty.
715    ///
716    /// # Example
717    /// ```rust
718    /// # #[cfg(feature = "deploy")] {
719    /// # use hydro_lang::prelude::*;
720    /// # use futures::StreamExt;
721    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
722    /// let tick = process.tick();
723    /// let singleton = tick.singleton(q!("42"));
724    /// singleton
725    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
726    ///     .all_ticks()
727    /// # }, |mut stream| async move {
728    /// // 42
729    /// # assert_eq!(stream.next().await.unwrap(), 42);
730    /// # }));
731    /// # }
732    /// ```
733    pub fn filter_map<U, F>(
734        self,
735        f: impl IntoQuotedMut<'a, F, L>,
736    ) -> Optional<U, L, B::UnderlyingBound>
737    where
738        F: Fn(T) -> Option<U> + 'a,
739    {
740        let f = f.splice_fn1_ctx(&self.location).into();
741        Optional::new(
742            self.location.clone(),
743            HydroNode::FilterMap {
744                f,
745                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
746                metadata: self
747                    .location
748                    .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
749            },
750        )
751    }
752
753    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
754    ///
755    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
756    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
757    /// non-null. This is useful for combining several pieces of state together.
758    ///
759    /// # Example
760    /// ```rust
761    /// # #[cfg(feature = "deploy")] {
762    /// # use hydro_lang::prelude::*;
763    /// # use futures::StreamExt;
764    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
765    /// let tick = process.tick();
766    /// let numbers = process
767    ///   .source_iter(q!(vec![123, 456]))
768    ///   .batch(&tick, nondet!(/** test */));
769    /// let count = numbers.clone().count(); // Singleton
770    /// let max = numbers.max(); // Optional
771    /// count.zip(max).all_ticks()
772    /// # }, |mut stream| async move {
773    /// // [(2, 456)]
774    /// # for w in vec![(2, 456)] {
775    /// #     assert_eq!(stream.next().await.unwrap(), w);
776    /// # }
777    /// # }));
778    /// # }
779    /// ```
780    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
781    where
782        Self: ZipResult<'a, O, Location = L>,
783        B: IsBounded,
784    {
785        check_matching_location(&self.location, &Self::other_location(&other));
786
787        if L::is_top_level()
788            && let Some(tick) = self.location.try_tick()
789        {
790            let self_location = self.location().clone();
791            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
792            let out = zip_inside_tick(
793                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
794                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
795                    other_location.clone(),
796                    HydroNode::Cast {
797                        inner: Box::new(Self::other_ir_node(other)),
798                        metadata: other_location.new_node_metadata(Optional::<
799                            <Self as ZipResult<'a, O>>::OtherType,
800                            Tick<L>,
801                            Bounded,
802                        >::collection_kind(
803                        )),
804                    },
805                )
806                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
807            )
808            .latest();
809
810            Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
811        } else {
812            Self::make(
813                self.location.clone(),
814                HydroNode::CrossSingleton {
815                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
816                    right: Box::new(Self::other_ir_node(other)),
817                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
818                        bound: B::BOUND_KIND,
819                        element_type: stageleft::quote_type::<
820                            <Self as ZipResult<'a, O>>::ElementType,
821                        >()
822                        .into(),
823                    }),
824                },
825            )
826        }
827    }
828
829    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
830    /// boolean signal is `true`, otherwise the output is null.
831    ///
832    /// # Example
833    /// ```rust
834    /// # #[cfg(feature = "deploy")] {
835    /// # use hydro_lang::prelude::*;
836    /// # use futures::StreamExt;
837    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
838    /// let tick = process.tick();
839    /// // ticks are lazy by default, forces the second tick to run
840    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
841    ///
842    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
843    /// let batch_first_tick = process
844    ///   .source_iter(q!(vec![1]))
845    ///   .batch(&tick, nondet!(/** test */));
846    /// let batch_second_tick = process
847    ///   .source_iter(q!(vec![1, 2, 3]))
848    ///   .batch(&tick, nondet!(/** test */))
849    ///   .defer_tick();
850    /// batch_first_tick.chain(batch_second_tick).count()
851    ///   .filter_if(signal)
852    ///   .all_ticks()
853    /// # }, |mut stream| async move {
854    /// // [1]
855    /// # for w in vec![1] {
856    /// #     assert_eq!(stream.next().await.unwrap(), w);
857    /// # }
858    /// # }));
859    /// # }
860    /// ```
861    pub fn filter_if(
862        self,
863        signal: Singleton<bool, L, B>,
864    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
865    where
866        B: IsBounded,
867    {
868        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
869    }
870
871    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
872    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
873    ///
874    /// Useful for conditionally processing, such as only emitting a singleton's value outside
875    /// a tick if some other condition is satisfied.
876    ///
877    /// # Example
878    /// ```rust
879    /// # #[cfg(feature = "deploy")] {
880    /// # use hydro_lang::prelude::*;
881    /// # use futures::StreamExt;
882    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
883    /// let tick = process.tick();
884    /// // ticks are lazy by default, forces the second tick to run
885    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
886    ///
887    /// let batch_first_tick = process
888    ///   .source_iter(q!(vec![1]))
889    ///   .batch(&tick, nondet!(/** test */));
890    /// let batch_second_tick = process
891    ///   .source_iter(q!(vec![1, 2, 3]))
892    ///   .batch(&tick, nondet!(/** test */))
893    ///   .defer_tick(); // appears on the second tick
894    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
895    /// batch_first_tick.chain(batch_second_tick).count()
896    ///   .filter_if_some(some_on_first_tick)
897    ///   .all_ticks()
898    /// # }, |mut stream| async move {
899    /// // [1]
900    /// # for w in vec![1] {
901    /// #     assert_eq!(stream.next().await.unwrap(), w);
902    /// # }
903    /// # }));
904    /// # }
905    /// ```
906    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
907    pub fn filter_if_some<U>(
908        self,
909        signal: Optional<U, L, B>,
910    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
911    where
912        B: IsBounded,
913    {
914        self.filter_if(signal.is_some())
915    }
916
917    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
918    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
919    ///
920    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
921    /// the condition.
922    ///
923    /// # Example
924    /// ```rust
925    /// # #[cfg(feature = "deploy")] {
926    /// # use hydro_lang::prelude::*;
927    /// # use futures::StreamExt;
928    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
929    /// let tick = process.tick();
930    /// // ticks are lazy by default, forces the second tick to run
931    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
932    ///
933    /// let batch_first_tick = process
934    ///   .source_iter(q!(vec![1]))
935    ///   .batch(&tick, nondet!(/** test */));
936    /// let batch_second_tick = process
937    ///   .source_iter(q!(vec![1, 2, 3]))
938    ///   .batch(&tick, nondet!(/** test */))
939    ///   .defer_tick(); // appears on the second tick
940    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
941    /// batch_first_tick.chain(batch_second_tick).count()
942    ///   .filter_if_none(some_on_first_tick)
943    ///   .all_ticks()
944    /// # }, |mut stream| async move {
945    /// // [3]
946    /// # for w in vec![3] {
947    /// #     assert_eq!(stream.next().await.unwrap(), w);
948    /// # }
949    /// # }));
950    /// # }
951    /// ```
952    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
953    pub fn filter_if_none<U>(
954        self,
955        other: Optional<U, L, B>,
956    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
957    where
958        B: IsBounded,
959    {
960        self.filter_if(other.is_none())
961    }
962
963    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
964    ///
965    /// # Example
966    /// ```rust
967    /// # #[cfg(feature = "deploy")] {
968    /// # use hydro_lang::prelude::*;
969    /// # use futures::StreamExt;
970    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
971    /// let tick = process.tick();
972    /// let a = tick.singleton(q!(5));
973    /// let b = tick.singleton(q!(5));
974    /// a.equals(b).all_ticks()
975    /// # }, |mut stream| async move {
976    /// // [true]
977    /// # assert_eq!(stream.next().await.unwrap(), true);
978    /// # }));
979    /// # }
980    /// ```
981    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
982    where
983        T: PartialEq,
984        B: IsBounded,
985    {
986        self.zip(other).map(q!(|(a, b)| a == b))
987    }
988
989    /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
990    /// greater than or equal to the provided threshold. The event will have the value of the
991    /// given threshold.
992    ///
993    /// This requires the incoming singleton to be monotonic, because otherwise the detection of
994    /// the threshold would be non-deterministic.
995    ///
996    /// # Example
997    /// ```rust
998    /// # #[cfg(feature = "deploy")] {
999    /// # use hydro_lang::prelude::*;
1000    /// # use futures::StreamExt;
1001    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1002    /// let a = // singleton 1 ~> 5 ~> 10
1003    /// # process.singleton(q!(5));
1004    /// let b = process.singleton(q!(4));
1005    /// a.threshold_greater_or_equal(b)
1006    /// # }, |mut stream| async move {
1007    /// // [4]
1008    /// # assert_eq!(stream.next().await.unwrap(), 4);
1009    /// # }));
1010    /// # }
1011    /// ```
1012    pub fn threshold_greater_or_equal<B2: IsBounded>(
1013        self,
1014        threshold: Singleton<T, L, B2>,
1015    ) -> Stream<T, L, B::UnderlyingBound>
1016    where
1017        T: Clone + PartialOrd,
1018        B: IsMonotonic,
1019    {
1020        let threshold = threshold.make_bounded();
1021        let self_location = self.location().clone();
1022        match self.try_make_bounded() {
1023            Ok(bounded) => {
1024                let uncasted = threshold
1025                    .zip(bounded)
1026                    .into_stream()
1027                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
1028
1029                Stream::new(
1030                    uncasted.location.clone(),
1031                    uncasted.ir_node.replace(HydroNode::Placeholder),
1032                )
1033            }
1034            Err(me) => {
1035                let uncasted = sliced! {
1036                    let me = use(me, nondet!(/** thresholds are deterministic */));
1037                    let mut remaining_threshold = use::state(|l| {
1038                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
1039                        as_option
1040                    });
1041
1042                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
1043                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
1044                    passed.map(q!(|(t, _)| t))
1045                };
1046
1047                Stream::new(
1048                    self_location,
1049                    uncasted.ir_node.replace(HydroNode::Placeholder),
1050                )
1051            }
1052        }
1053    }
1054
1055    /// An operator which allows you to "name" a `HydroNode`.
1056    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1057    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
1058        {
1059            let mut node = self.ir_node.borrow_mut();
1060            let metadata = node.metadata_mut();
1061            metadata.tag = Some(name.to_owned());
1062        }
1063        self
1064    }
1065}
1066
1067impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
1068    type Output = Singleton<bool, L, B::UnderlyingBound>;
1069
1070    fn not(self) -> Self::Output {
1071        self.map(q!(|b| !b))
1072    }
1073}
1074
1075impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1076where
1077    L: Location<'a>,
1078{
1079    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1080    /// the inner `Option`.
1081    ///
1082    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1083    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1084    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1085    ///
1086    /// # Example
1087    /// ```rust
1088    /// # #[cfg(feature = "deploy")] {
1089    /// # use hydro_lang::prelude::*;
1090    /// # use futures::StreamExt;
1091    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1092    /// let tick = process.tick();
1093    /// let singleton = tick.singleton(q!(Some(42)));
1094    /// singleton.into_optional().all_ticks()
1095    /// # }, |mut stream| async move {
1096    /// // 42
1097    /// # assert_eq!(stream.next().await.unwrap(), 42);
1098    /// # }));
1099    /// # }
1100    /// ```
1101    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1102        self.filter_map(q!(|v| v))
1103    }
1104}
1105
1106impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1107where
1108    L: Location<'a>,
1109{
1110    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1111    ///
1112    /// # Example
1113    /// ```rust
1114    /// # #[cfg(feature = "deploy")] {
1115    /// # use hydro_lang::prelude::*;
1116    /// # use futures::StreamExt;
1117    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1118    /// let tick = process.tick();
1119    /// // ticks are lazy by default, forces the second tick to run
1120    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1121    ///
1122    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1123    /// let b = tick.singleton(q!(true)); // true, true
1124    /// a.and(b).all_ticks()
1125    /// # }, |mut stream| async move {
1126    /// // [true, false]
1127    /// # for w in vec![true, false] {
1128    /// #     assert_eq!(stream.next().await.unwrap(), w);
1129    /// # }
1130    /// # }));
1131    /// # }
1132    /// ```
1133    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1134    where
1135        B: IsBounded,
1136    {
1137        self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1138    }
1139
1140    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1141    ///
1142    /// # Example
1143    /// ```rust
1144    /// # #[cfg(feature = "deploy")] {
1145    /// # use hydro_lang::prelude::*;
1146    /// # use futures::StreamExt;
1147    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148    /// let tick = process.tick();
1149    /// // ticks are lazy by default, forces the second tick to run
1150    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1151    ///
1152    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1153    /// let b = tick.singleton(q!(false)); // false, false
1154    /// a.or(b).all_ticks()
1155    /// # }, |mut stream| async move {
1156    /// // [true, false]
1157    /// # for w in vec![true, false] {
1158    /// #     assert_eq!(stream.next().await.unwrap(), w);
1159    /// # }
1160    /// # }));
1161    /// # }
1162    /// ```
1163    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1164    where
1165        B: IsBounded,
1166    {
1167        self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1168    }
1169}
1170
1171impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1172where
1173    L: Location<'a>,
1174{
1175    /// Returns a singleton value corresponding to the latest snapshot of the singleton
1176    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1177    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1178    /// all snapshots of this singleton into the atomic-associated tick will observe the
1179    /// same value each tick.
1180    ///
1181    /// # Non-Determinism
1182    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1183    /// the output singleton has a non-deterministic value since the snapshot can be at an
1184    /// arbitrary point in time.
1185    pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1186        self,
1187        tick: &Tick<L2>,
1188        _nondet: NonDet,
1189    ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1190        Singleton::new(
1191            tick.drop_consistency(),
1192            HydroNode::Batch {
1193                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1194                metadata: tick
1195                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1196            },
1197        )
1198    }
1199
1200    /// Returns this singleton back into a top-level, asynchronous execution context where updates
1201    /// to the value will be asynchronously propagated.
1202    pub fn end_atomic(self) -> Singleton<T, L, B> {
1203        Singleton::new(
1204            self.location.tick.l.clone(),
1205            HydroNode::EndAtomic {
1206                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1207                metadata: self
1208                    .location
1209                    .tick
1210                    .l
1211                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1212            },
1213        )
1214    }
1215}
1216
1217impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1218where
1219    L: Location<'a>,
1220{
1221    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1222    /// will observe the same version of the value and will be executed synchronously before any
1223    /// outputs are yielded (in [`Optional::end_atomic`]).
1224    ///
1225    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1226    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1227    /// a different version).
1228    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1229        let id = self.location.flow_state().borrow_mut().next_clock_id();
1230        let out_location = Atomic {
1231            tick: Tick {
1232                id,
1233                l: self.location.clone(),
1234            },
1235        };
1236        Singleton::new(
1237            out_location.clone(),
1238            HydroNode::BeginAtomic {
1239                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1240                metadata: out_location
1241                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1242            },
1243        )
1244    }
1245
1246    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1247    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1248    /// relevant data that contributed to the snapshot at tick `t`.
1249    ///
1250    /// # Non-Determinism
1251    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1252    /// the output singleton has a non-deterministic value since the snapshot can be at an
1253    /// arbitrary point in time.
1254    pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1255        self,
1256        tick: &Tick<L2>,
1257        _nondet: NonDet,
1258    ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1259        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1260        Singleton::new(
1261            tick.drop_consistency(),
1262            HydroNode::Batch {
1263                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1264                metadata: tick
1265                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1266            },
1267        )
1268    }
1269
1270    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1271    /// with order corresponding to increasing prefixes of data contributing to the singleton.
1272    ///
1273    /// # Non-Determinism
1274    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1275    /// to non-deterministic batching and arrival of inputs, the output stream is
1276    /// non-deterministic.
1277    pub fn sample_eager(
1278        self,
1279        nondet: NonDet,
1280    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1281        sliced! {
1282            let snapshot = use(self, nondet);
1283            snapshot.into_stream()
1284        }
1285        .weaken_retries()
1286    }
1287
1288    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1289    /// value taken at various points in time. Because the input singleton may be
1290    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1291    /// represent the value of the singleton given some prefix of the streams leading up to
1292    /// it.
1293    ///
1294    /// # Non-Determinism
1295    /// The output stream is non-deterministic in which elements are sampled, since this
1296    /// is controlled by a clock.
1297    pub fn sample_every(
1298        self,
1299        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1300        nondet: NonDet,
1301    ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1302    where
1303        L: TopLevel<'a>,
1304    {
1305        let samples = self.location.source_interval(interval);
1306        sliced! {
1307            let snapshot = use(self, nondet);
1308            let sample_batch = use(samples, nondet);
1309
1310            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1311        }
1312        .weaken_retries()
1313    }
1314
1315    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1316    /// implies that `B == Bounded`.
1317    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1318    where
1319        B: IsBounded,
1320    {
1321        Singleton::new(
1322            self.location.clone(),
1323            self.ir_node.replace(HydroNode::Placeholder),
1324        )
1325    }
1326
1327    #[expect(clippy::result_large_err, reason = "internal use only")]
1328    fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1329        if B::UnderlyingBound::BOUNDED {
1330            Ok(Singleton::new(
1331                self.location.clone(),
1332                self.ir_node.replace(HydroNode::Placeholder),
1333            ))
1334        } else {
1335            Err(self)
1336        }
1337    }
1338
1339    /// Clones this bounded singleton into a tick, returning a singleton that has the
1340    /// same value as the outer singleton. Because the outer singleton is bounded, this
1341    /// is deterministic because there is only a single immutable version.
1342    pub fn clone_into_tick<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1343        self,
1344        tick: &Tick<L2>,
1345    ) -> Singleton<T, Tick<L2>, Bounded>
1346    where
1347        B: IsBounded,
1348        T: Clone,
1349    {
1350        // TODO(shadaj): avoid printing simulator logs for this snapshot
1351        let inner = self.snapshot(
1352            tick,
1353            nondet!(/** bounded top-level singleton so deterministic */),
1354        );
1355        Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1356    }
1357
1358    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1359    ///
1360    /// # Example
1361    /// ```rust
1362    /// # #[cfg(feature = "deploy")] {
1363    /// # use hydro_lang::prelude::*;
1364    /// # use futures::StreamExt;
1365    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1366    /// let tick = process.tick();
1367    /// let batch_input = process
1368    ///   .source_iter(q!(vec![123, 456]))
1369    ///   .batch(&tick, nondet!(/** test */));
1370    /// batch_input.clone().chain(
1371    ///   batch_input.count().into_stream()
1372    /// ).all_ticks()
1373    /// # }, |mut stream| async move {
1374    /// // [123, 456, 2]
1375    /// # for w in vec![123, 456, 2] {
1376    /// #     assert_eq!(stream.next().await.unwrap(), w);
1377    /// # }
1378    /// # }));
1379    /// # }
1380    /// ```
1381    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1382    where
1383        B: IsBounded,
1384    {
1385        Stream::new(
1386            self.location.clone(),
1387            HydroNode::Cast {
1388                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1389                metadata: self.location.new_node_metadata(Stream::<
1390                    T,
1391                    Tick<L>,
1392                    Bounded,
1393                    TotalOrder,
1394                    ExactlyOnce,
1395                >::collection_kind()),
1396            },
1397        )
1398    }
1399
1400    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1401    /// producing a singleton of the resolved output.
1402    ///
1403    /// This is useful when the singleton contains an async computation that must
1404    /// be awaited before further processing. The future is polled to completion
1405    /// before the output value is emitted.
1406    ///
1407    /// # Example
1408    /// ```rust
1409    /// # #[cfg(feature = "deploy")] {
1410    /// # use hydro_lang::prelude::*;
1411    /// # use futures::StreamExt;
1412    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1413    /// let tick = process.tick();
1414    /// let singleton = tick.singleton(q!(5));
1415    /// singleton
1416    ///     .map(q!(|v| async move { v * 2 }))
1417    ///     .resolve_future_blocking()
1418    ///     .all_ticks()
1419    /// # }, |mut stream| async move {
1420    /// // 10
1421    /// # assert_eq!(stream.next().await.unwrap(), 10);
1422    /// # }));
1423    /// # }
1424    /// ```
1425    pub fn resolve_future_blocking(
1426        self,
1427    ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1428    where
1429        T: Future,
1430        B: IsBounded,
1431    {
1432        Singleton::new(
1433            self.location.clone(),
1434            HydroNode::ResolveFuturesBlocking {
1435                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1436                metadata: self
1437                    .location
1438                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1439            },
1440        )
1441    }
1442}
1443
1444impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1445where
1446    L: Location<'a>,
1447{
1448    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1449    /// which will stream the value computed in _each_ tick as a separate stream element.
1450    ///
1451    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1452    /// producing one element in the output for each tick. This is useful for batched computations,
1453    /// where the results from each tick must be combined together.
1454    ///
1455    /// # Example
1456    /// ```rust
1457    /// # #[cfg(feature = "deploy")] {
1458    /// # use hydro_lang::prelude::*;
1459    /// # use futures::StreamExt;
1460    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1461    /// let tick = process.tick();
1462    /// # // ticks are lazy by default, forces the second tick to run
1463    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1464    /// # let batch_first_tick = process
1465    /// #   .source_iter(q!(vec![1]))
1466    /// #   .batch(&tick, nondet!(/** test */));
1467    /// # let batch_second_tick = process
1468    /// #   .source_iter(q!(vec![1, 2, 3]))
1469    /// #   .batch(&tick, nondet!(/** test */))
1470    /// #   .defer_tick(); // appears on the second tick
1471    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1472    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1473    ///     .count()
1474    ///     .all_ticks()
1475    /// # }, |mut stream| async move {
1476    /// // [1, 3]
1477    /// # for w in vec![1, 3] {
1478    /// #     assert_eq!(stream.next().await.unwrap(), w);
1479    /// # }
1480    /// # }));
1481    /// # }
1482    /// ```
1483    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1484        self.into_stream().all_ticks()
1485    }
1486
1487    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1488    /// which will stream the value computed in _each_ tick as a separate stream element.
1489    ///
1490    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1491    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1492    /// singleton's [`Tick`] context.
1493    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1494        self.into_stream().all_ticks_atomic()
1495    }
1496
1497    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1498    /// be asynchronously updated with the latest value of the singleton inside the tick.
1499    ///
1500    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1501    /// tick that tracks the inner value. This is useful for getting the value as of the
1502    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1503    ///
1504    /// # Example
1505    /// ```rust
1506    /// # #[cfg(feature = "deploy")] {
1507    /// # use hydro_lang::prelude::*;
1508    /// # use futures::StreamExt;
1509    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1510    /// let tick = process.tick();
1511    /// # // ticks are lazy by default, forces the second tick to run
1512    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1513    /// # let batch_first_tick = process
1514    /// #   .source_iter(q!(vec![1]))
1515    /// #   .batch(&tick, nondet!(/** test */));
1516    /// # let batch_second_tick = process
1517    /// #   .source_iter(q!(vec![1, 2, 3]))
1518    /// #   .batch(&tick, nondet!(/** test */))
1519    /// #   .defer_tick(); // appears on the second tick
1520    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1521    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1522    ///     .count()
1523    ///     .latest()
1524    /// # .sample_eager(nondet!(/** test */))
1525    /// # }, |mut stream| async move {
1526    /// // asynchronously changes from 1 ~> 3
1527    /// # for w in vec![1, 3] {
1528    /// #     assert_eq!(stream.next().await.unwrap(), w);
1529    /// # }
1530    /// # }));
1531    /// # }
1532    /// ```
1533    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1534        Singleton::new(
1535            self.location.outer().clone(),
1536            HydroNode::YieldConcat {
1537                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1538                metadata: self
1539                    .location
1540                    .outer()
1541                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1542            },
1543        )
1544    }
1545
1546    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1547    /// be updated with the latest value of the singleton inside the tick.
1548    ///
1549    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1550    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1551    /// singleton's [`Tick`] context.
1552    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1553        let out_location = Atomic {
1554            tick: self.location.clone(),
1555        };
1556        Singleton::new(
1557            out_location.clone(),
1558            HydroNode::YieldConcat {
1559                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1560                metadata: out_location
1561                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1562            },
1563        )
1564    }
1565}
1566
1567#[doc(hidden)]
1568/// Helper trait that determines the output collection type for [`Singleton::zip`].
1569///
1570/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1571/// [`Singleton`].
1572#[sealed::sealed]
1573pub trait ZipResult<'a, Other> {
1574    /// The output collection type.
1575    type Out;
1576    /// The type of the tupled output value.
1577    type ElementType;
1578    /// The type of the other collection's value.
1579    type OtherType;
1580    /// The location where the tupled result will be materialized.
1581    type Location: Location<'a>;
1582
1583    /// The location of the second input to the `zip`.
1584    fn other_location(other: &Other) -> Self::Location;
1585    /// The IR node of the second input to the `zip`.
1586    fn other_ir_node(other: Other) -> HydroNode;
1587
1588    /// Constructs the output live collection given an IR node containing the zip result.
1589    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1590}
1591
1592#[sealed::sealed]
1593impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1594where
1595    L: Location<'a>,
1596{
1597    type Out = Singleton<(T, U), L, B>;
1598    type ElementType = (T, U);
1599    type OtherType = U;
1600    type Location = L;
1601
1602    fn other_location(other: &Singleton<U, L, B>) -> L {
1603        other.location.clone()
1604    }
1605
1606    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1607        other.ir_node.replace(HydroNode::Placeholder)
1608    }
1609
1610    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1611        Singleton::new(
1612            location.clone(),
1613            HydroNode::Cast {
1614                inner: Box::new(ir_node),
1615                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1616            },
1617        )
1618    }
1619}
1620
1621#[sealed::sealed]
1622impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1623    for Singleton<T, L, B>
1624where
1625    L: Location<'a>,
1626{
1627    type Out = Optional<(T, U), L, B::UnderlyingBound>;
1628    type ElementType = (T, U);
1629    type OtherType = U;
1630    type Location = L;
1631
1632    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1633        other.location.clone()
1634    }
1635
1636    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1637        other.ir_node.replace(HydroNode::Placeholder)
1638    }
1639
1640    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1641        Optional::new(location, ir_node)
1642    }
1643}
1644
1645#[cfg(test)]
1646mod tests {
1647    #[cfg(feature = "deploy")]
1648    use futures::{SinkExt, StreamExt};
1649    #[cfg(feature = "deploy")]
1650    use hydro_deploy::Deployment;
1651    #[cfg(any(feature = "deploy", feature = "sim"))]
1652    use stageleft::q;
1653
1654    #[cfg(any(feature = "deploy", feature = "sim"))]
1655    use crate::compile::builder::FlowBuilder;
1656    #[cfg(feature = "deploy")]
1657    use crate::live_collections::stream::ExactlyOnce;
1658    #[cfg(any(feature = "deploy", feature = "sim"))]
1659    use crate::location::Location;
1660    #[cfg(any(feature = "deploy", feature = "sim"))]
1661    use crate::nondet::nondet;
1662
1663    #[cfg(feature = "deploy")]
1664    #[tokio::test]
1665    async fn tick_cycle_cardinality() {
1666        let mut deployment = Deployment::new();
1667
1668        let mut flow = FlowBuilder::new();
1669        let node = flow.process::<()>();
1670        let external = flow.external::<()>();
1671
1672        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1673
1674        let node_tick = node.tick();
1675        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1676        let counts = singleton
1677            .clone()
1678            .into_stream()
1679            .count()
1680            .filter_if(
1681                input
1682                    .batch(&node_tick, nondet!(/** testing */))
1683                    .first()
1684                    .is_some(),
1685            )
1686            .all_ticks()
1687            .send_bincode_external(&external);
1688        complete_cycle.complete_next_tick(singleton);
1689
1690        let nodes = flow
1691            .with_process(&node, deployment.Localhost())
1692            .with_external(&external, deployment.Localhost())
1693            .deploy(&mut deployment);
1694
1695        deployment.deploy().await.unwrap();
1696
1697        let mut tick_trigger = nodes.connect(input_send).await;
1698        let mut external_out = nodes.connect(counts).await;
1699
1700        deployment.start().await.unwrap();
1701
1702        tick_trigger.send(()).await.unwrap();
1703
1704        assert_eq!(external_out.next().await.unwrap(), 1);
1705
1706        tick_trigger.send(()).await.unwrap();
1707
1708        assert_eq!(external_out.next().await.unwrap(), 1);
1709    }
1710
1711    #[cfg(feature = "sim")]
1712    #[test]
1713    #[should_panic]
1714    fn sim_fold_intermediate_states() {
1715        let mut flow = FlowBuilder::new();
1716        let node = flow.process::<()>();
1717
1718        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1719        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1720
1721        let tick = node.tick();
1722        let batch = folded.snapshot(&tick, nondet!(/** test */));
1723        let out_recv = batch.all_ticks().sim_output();
1724
1725        flow.sim().exhaustive(async || {
1726            assert_eq!(out_recv.next().await.unwrap(), 10);
1727        });
1728    }
1729
1730    #[cfg(feature = "sim")]
1731    #[test]
1732    fn sim_fold_intermediate_state_count() {
1733        let mut flow = FlowBuilder::new();
1734        let node = flow.process::<()>();
1735
1736        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1737        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1738
1739        let tick = node.tick();
1740        let batch = folded.snapshot(&tick, nondet!(/** test */));
1741        let out_recv = batch.all_ticks().sim_output();
1742
1743        let instance_count = flow.sim().exhaustive(async || {
1744            let out = out_recv.collect::<Vec<_>>().await;
1745            assert_eq!(out.last(), Some(&10));
1746        });
1747
1748        assert_eq!(
1749            instance_count,
1750            16 // 2^4 possible subsets of intermediates (including initial state)
1751        )
1752    }
1753
1754    #[cfg(feature = "sim")]
1755    #[test]
1756    fn sim_fold_no_repeat_initial() {
1757        // check that we don't repeat the initial state of the fold in autonomous decisions
1758
1759        let mut flow = FlowBuilder::new();
1760        let node = flow.process::<()>();
1761
1762        let (in_port, input) = node.sim_input();
1763        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1764
1765        let tick = node.tick();
1766        let batch = folded.snapshot(&tick, nondet!(/** test */));
1767        let out_recv = batch.all_ticks().sim_output();
1768
1769        flow.sim().exhaustive(async || {
1770            assert_eq!(out_recv.next().await.unwrap(), 0);
1771
1772            in_port.send(123);
1773
1774            assert_eq!(out_recv.next().await.unwrap(), 123);
1775        });
1776    }
1777
1778    #[cfg(feature = "sim")]
1779    #[test]
1780    #[should_panic]
1781    fn sim_fold_repeats_snapshots() {
1782        // when the tick is driven by a snapshot AND something else, the snapshot can
1783        // "stutter" and repeat the same state multiple times
1784
1785        let mut flow = FlowBuilder::new();
1786        let node = flow.process::<()>();
1787
1788        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1789        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1790
1791        let tick = node.tick();
1792        let batch = source
1793            .batch(&tick, nondet!(/** test */))
1794            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1795        let out_recv = batch.all_ticks().sim_output();
1796
1797        flow.sim().exhaustive(async || {
1798            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1799            {
1800                panic!("repeated snapshot");
1801            }
1802        });
1803    }
1804
1805    #[cfg(feature = "sim")]
1806    #[test]
1807    fn sim_fold_repeats_snapshots_count() {
1808        // check the number of instances
1809        let mut flow = FlowBuilder::new();
1810        let node = flow.process::<()>();
1811
1812        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1813        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1814
1815        let tick = node.tick();
1816        let batch = source
1817            .batch(&tick, nondet!(/** test */))
1818            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1819        let out_recv = batch.all_ticks().sim_output();
1820
1821        let count = flow.sim().exhaustive(async || {
1822            let _ = out_recv.collect::<Vec<_>>().await;
1823        });
1824
1825        assert_eq!(count, 52);
1826        // don't have a combinatorial explanation for this number yet, but checked via logs
1827    }
1828
1829    #[cfg(feature = "sim")]
1830    #[test]
1831    fn sim_top_level_singleton_exhaustive() {
1832        // ensures that top-level singletons have only one snapshot
1833        let mut flow = FlowBuilder::new();
1834        let node = flow.process::<()>();
1835
1836        let singleton = node.singleton(q!(1));
1837        let tick = node.tick();
1838        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1839        let out_recv = batch.all_ticks().sim_output();
1840
1841        let count = flow.sim().exhaustive(async || {
1842            let _ = out_recv.collect::<Vec<_>>().await;
1843        });
1844
1845        assert_eq!(count, 1);
1846    }
1847
1848    #[cfg(feature = "sim")]
1849    #[test]
1850    fn sim_top_level_singleton_join_count() {
1851        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1852        // exploration
1853
1854        let mut flow = FlowBuilder::new();
1855        let node = flow.process::<()>();
1856
1857        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1858        let tick = node.tick();
1859        let batch = source_iter
1860            .batch(&tick, nondet!(/** test */))
1861            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1862        let out_recv = batch.all_ticks().sim_output();
1863
1864        let instance_count = flow.sim().exhaustive(async || {
1865            let _ = out_recv.collect::<Vec<_>>().await;
1866        });
1867
1868        assert_eq!(
1869            instance_count,
1870            16 // 2^4 ways to split up (including a possibly empty first batch)
1871        )
1872    }
1873
1874    #[cfg(feature = "sim")]
1875    #[test]
1876    fn top_level_singleton_into_stream_no_replay() {
1877        let mut flow = FlowBuilder::new();
1878        let node = flow.process::<()>();
1879
1880        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1881        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1882
1883        let out_recv = folded.into_stream().sim_output();
1884
1885        flow.sim().exhaustive(async || {
1886            out_recv.assert_yields_only([10]).await;
1887        });
1888    }
1889
1890    #[cfg(feature = "sim")]
1891    #[test]
1892    fn inside_tick_singleton_zip() {
1893        use crate::live_collections::Stream;
1894        use crate::live_collections::sliced::sliced;
1895
1896        let mut flow = FlowBuilder::new();
1897        let node = flow.process::<()>();
1898
1899        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1900        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1901
1902        let out_recv = sliced! {
1903            let v = use(folded, nondet!(/** test */));
1904            v.clone().zip(v).into_stream()
1905        }
1906        .sim_output();
1907
1908        let count = flow.sim().exhaustive(async || {
1909            let out = out_recv.collect::<Vec<_>>().await;
1910            assert_eq!(out.last(), Some(&(3, 3)));
1911        });
1912
1913        assert_eq!(count, 4);
1914    }
1915
1916    /// Reproducer for simulator hang when using cross_singleton on a top-level
1917    /// unbounded stream (not inside sliced!). The exhaustive simulator hangs
1918    /// after the first iteration.
1919    #[cfg(feature = "sim")]
1920    #[test]
1921    fn sim_cross_singleton_top_level_unbounded_hang() {
1922        let mut flow = FlowBuilder::new();
1923        let node = flow.process::<()>();
1924
1925        let (cmd_port, input) = node.sim_input::<String, _, _>();
1926
1927        let top_level_singleton = node.singleton(q!(123));
1928
1929        // cross_singleton on a top-level stream - bug trigger
1930        let crossed = input.cross_singleton(top_level_singleton);
1931
1932        // Output directly
1933        let resp_port = crossed.sim_output();
1934
1935        let count = flow.sim().exhaustive(async || {
1936            cmd_port.send("abc".to_owned());
1937
1938            let responses: Vec<_> = resp_port.collect().await;
1939            assert!(!responses.is_empty());
1940        });
1941
1942        assert_eq!(count, 1);
1943    }
1944
1945    #[cfg(feature = "sim")]
1946    #[test]
1947    fn sim_top_level_singleton_state_count() {
1948        let mut flow = FlowBuilder::new();
1949        let process = flow.process::<()>();
1950
1951        let (cmd_port, input) = process.sim_input();
1952        {
1953            // increases exhaustive inputs from 1 to 2 before we optimized `From`
1954            use super::Singleton;
1955            use crate::live_collections::boundedness::Unbounded;
1956            let _singleton: Singleton<_, _, Unbounded> = process.singleton(q!(false)).into();
1957        }
1958        let tick = process.tick();
1959        let batched_unbatched = input.batch(&tick, nondet!(/** */)).all_ticks();
1960        let resp_port = batched_unbatched.sim_output();
1961
1962        let count = flow.sim().exhaustive(async || {
1963            cmd_port.send(());
1964            let _responses: Vec<_> = resp_port.collect().await;
1965        });
1966
1967        assert_eq!(count, 1);
1968    }
1969}