hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::Atomic;
25use crate::location::{Location, Tick, TopLevel, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28 ApplyMonotoneStream, ApplyOrderPreservingSingleton, Proved, SingletonMapFuncAlgebra,
29};
30
/// A marker trait indicating which components of a [`Singleton`] may change.
///
/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
/// includes an additional variant [`Monotonic`], which means that the value will only grow.
pub trait SingletonBound {
    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
    ///
    /// [`Bounded`] and [`Unbounded`] erase to themselves, while [`Monotonic`] erases to
    /// [`Unbounded`].
    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;

    /// The [`SingletonBound`] of a [`Singleton`] that is produced from a [`Stream`] with
    /// [`Self`] boundedness.
    ///
    /// It always erases to the same [`SingletonBound::UnderlyingBound`] as `Self`.
    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;

    /// Returns the [`SingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> SingletonBoundKind;
}
45
46impl SingletonBound for Unbounded {
47 type UnderlyingBound = Unbounded;
48
49 type StreamToMonotone = Monotonic;
50
51 fn bound_kind() -> SingletonBoundKind {
52 SingletonBoundKind::Unbounded
53 }
54}
55
56impl SingletonBound for Bounded {
57 type UnderlyingBound = Bounded;
58
59 type StreamToMonotone = Bounded;
60
61 fn bound_kind() -> SingletonBoundKind {
62 SingletonBoundKind::Bounded
63 }
64}
65
/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
///
/// As a [`SingletonBound`], this erases to [`Unbounded`].
pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70 type UnderlyingBound = Unbounded;
71
72 type StreamToMonotone = Monotonic;
73
74 fn bound_kind() -> SingletonBoundKind {
75 SingletonBoundKind::Monotonic
76 }
77}
78
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait for [`SingletonBound`]s that provide at least the [`Monotonic`] guarantee.
///
/// It is implemented for [`Monotonic`] itself and for every bound that satisfies
/// [`IsBounded`]. The trait is sealed, so no other implementations can be added.
pub trait IsMonotonic: SingletonBound {}
87
// `Monotonic` satisfies the monotonicity requirement by definition.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsMonotonic for Monotonic {}
91
// Every bound that satisfies `IsBounded` is also accepted wherever `IsMonotonic` is required.
#[sealed]
#[diagnostic::do_not_recommend]
impl<B: IsBounded> IsMonotonic for B {}
95
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time. A [`Monotonic`] singleton
/// also changes over time, but its value will only grow.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: a [`SingletonBound`] tracking whether the value is [`Bounded`] (fixed),
///   [`Monotonic`] (only grows), or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: SingletonBound> {
    /// The location where this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node for this singleton. Operators that consume the singleton swap it for
    /// [`HydroNode::Placeholder`].
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state taken from the location at construction; the `Drop` impl pushes a
    /// [`HydroRoot::Null`] root into it when the IR node was never consumed.
    pub(crate) flow_state: FlowState,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119 fn drop(&mut self) {
120 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123 input: Box::new(ir_node),
124 op_metadata: HydroIrOpMetadata::new(),
125 });
126 }
127 }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132 T: Clone,
133 L: Location<'a>,
134{
135 fn from(value: Singleton<T, L, Bounded>) -> Self {
136 let location = value.location().clone();
137 Singleton::new(
138 location.clone(),
139 HydroNode::UnboundSingleton {
140 inner: Box::new(value.ir_node.replace(HydroNode::Placeholder)),
141 metadata: location
142 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
143 },
144 )
145 }
146}
147
148impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
149where
150 L: Location<'a>,
151{
152 type Location = Tick<L>;
153
154 fn location(&self) -> &Self::Location {
155 self.location()
156 }
157
158 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
159 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
160 location.clone(),
161 HydroNode::DeferTick {
162 input: Box::new(HydroNode::CycleSource {
163 cycle_id,
164 metadata: location.new_node_metadata(Self::collection_kind()),
165 }),
166 metadata: location
167 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
168 },
169 );
170
171 from_previous_tick.unwrap_or(initial)
172 }
173}
174
175impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
176where
177 L: Location<'a>,
178{
179 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
180 assert_eq!(
181 Location::id(&self.location),
182 expected_location,
183 "locations do not match"
184 );
185 self.location
186 .flow_state()
187 .borrow_mut()
188 .push_root(HydroRoot::CycleSink {
189 cycle_id,
190 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
191 op_metadata: HydroIrOpMetadata::new(),
192 });
193 }
194}
195
196impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
197where
198 L: Location<'a>,
199{
200 type Location = L;
201
202 fn create_source(cycle_id: CycleId, location: L) -> Self {
203 Singleton::new(
204 location.clone(),
205 HydroNode::CycleSource {
206 cycle_id,
207 metadata: location.new_node_metadata(Self::collection_kind()),
208 },
209 )
210 }
211}
212
213impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
214where
215 L: Location<'a>,
216{
217 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
218 assert_eq!(
219 Location::id(&self.location),
220 expected_location,
221 "locations do not match"
222 );
223 self.location
224 .flow_state()
225 .borrow_mut()
226 .push_root(HydroRoot::CycleSink {
227 cycle_id,
228 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
229 op_metadata: HydroIrOpMetadata::new(),
230 });
231 }
232}
233
234impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
235where
236 T: Clone,
237 L: Location<'a>,
238{
239 fn clone(&self) -> Self {
240 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
241 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
242 *self.ir_node.borrow_mut() = HydroNode::Tee {
243 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
244 metadata: self.location.new_node_metadata(Self::collection_kind()),
245 };
246 }
247
248 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
249 Singleton {
250 location: self.location.clone(),
251 flow_state: self.flow_state.clone(),
252 ir_node: HydroNode::Tee {
253 inner: SharedNode(inner.0.clone()),
254 metadata: metadata.clone(),
255 }
256 .into(),
257 _phantom: PhantomData,
258 }
259 } else {
260 unreachable!()
261 }
262 }
263}
264
/// Zips a singleton with an optional inside a tick by first converting the singleton into
/// an [`Optional`] and then delegating to `super::optional::zip_inside_tick`.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L, B, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B::UnderlyingBound>,
) -> Optional<(T, O), Tick<L>, B::UnderlyingBound>
where
    L: Location<'a>,
    B: SingletonBound,
{
    let left: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
    super::optional::zip_inside_tick(left, other)
}
273
274impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
275where
276 L: Location<'a>,
277{
278 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
279 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
280 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
281 let flow_state = location.flow_state().clone();
282 Singleton {
283 location,
284 flow_state,
285 ir_node: RefCell::new(ir_node),
286 _phantom: PhantomData,
287 }
288 }
289
290 pub(crate) fn collection_kind() -> CollectionKind {
291 CollectionKind::Singleton {
292 bound: B::bound_kind(),
293 element_type: stageleft::quote_type::<T>().into(),
294 }
295 }
296
297 /// Returns the [`Location`] where this singleton is being materialized.
298 pub fn location(&self) -> &L {
299 &self.location
300 }
301
302 /// Creates a lightweight reference handle to this singleton that can be captured
303 /// inside `q!()` closures. The handle resolves to `&T` at runtime.
304 ///
305 /// The singleton must be bounded, otherwise reading it would be non-deterministic.
306 ///
307 /// ```rust
308 /// # #[cfg(feature = "deploy")] {
309 /// # use hydro_lang::prelude::*;
310 /// # use futures::StreamExt;
311 /// # tokio_test::block_on(async {
312 /// # let mut deployment = hydro_deploy::Deployment::new();
313 /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
314 /// # let process = builder.process::<()>();
315 /// # let external = builder.external::<()>();
316 /// let my_count = process
317 /// .source_iter(q!(0..5i32))
318 /// .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
319 /// let count_ref = my_count.by_ref();
320 /// let out_port = process
321 /// .source_iter(q!(1..=3i32))
322 /// .map(q!(|x| x + *count_ref))
323 /// .send_bincode_external(&external);
324 /// # let nodes = builder
325 /// # .with_default_optimize()
326 /// # .with_process(&process, deployment.Localhost())
327 /// # .with_external(&external, deployment.Localhost())
328 /// # .deploy(&mut deployment);
329 /// # deployment.deploy().await.unwrap();
330 /// # let mut out_recv = nodes.connect(out_port).await;
331 /// # deployment.start().await.unwrap();
332 /// # let mut results = Vec::new();
333 /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
334 /// # results.sort();
335 /// // fold(0..5) = 10, so results are 11, 12, 13
336 /// # assert_eq!(results, vec![11, 12, 13]);
337 /// # });
338 /// # }
339 /// ```
340 pub fn by_ref(&self) -> crate::singleton_ref::SingletonRef<'a, '_, T, L>
341 where
342 B: IsBounded,
343 {
344 crate::singleton_ref::SingletonRef::new(&self.ir_node)
345 }
346
347 /// Returns a mutable reference handle to this singleton that can be captured inside `q!()`
348 /// closures. The handle resolves to `&mut T` at runtime.
349 ///
350 /// Mutable references are ordered via access groups in the generated DFIR code, ensuring
351 /// exclusive access at each point in the execution order.
352 ///
353 /// ```rust
354 /// # #[cfg(feature = "deploy")] {
355 /// # use hydro_lang::prelude::*;
356 /// # use futures::StreamExt;
357 /// # tokio_test::block_on(async {
358 /// # let mut deployment = hydro_deploy::Deployment::new();
359 /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
360 /// # let process = builder.process::<()>();
361 /// # let external = builder.external::<()>();
362 /// let my_count = process
363 /// .source_iter(q!(0..5i32))
364 /// .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
365 /// let count_mut = my_count.by_mut();
366 /// let out_port = process
367 /// .source_iter(q!(1..=3i32))
368 /// .map(q!(|x| {
369 /// *count_mut += x;
370 /// *count_mut
371 /// }))
372 /// .send_bincode_external(&external);
373 /// # let nodes = builder
374 /// # .with_default_optimize()
375 /// # .with_process(&process, deployment.Localhost())
376 /// # .with_external(&external, deployment.Localhost())
377 /// # .deploy(&mut deployment);
378 /// # deployment.deploy().await.unwrap();
379 /// # let mut out_recv = nodes.connect(out_port).await;
380 /// # deployment.start().await.unwrap();
381 /// # let mut results = Vec::new();
382 /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
383 /// # results.sort();
384 /// // fold(0..5) = 10, then each map adds x: results are 11, 13, 16
385 /// # assert_eq!(results, vec![11, 13, 16]);
386 /// # });
387 /// # }
388 /// ```
389 pub fn by_mut(&self) -> crate::singleton_ref::SingletonMut<'a, '_, T, L>
390 where
391 B: IsBounded,
392 {
393 crate::singleton_ref::SingletonMut::new(&self.ir_node)
394 }
395
396 /// Weakens the consistency of this live collection to not guarantee any consistency across
397 /// cluster members (if this collection is on a cluster).
398 pub fn weaken_consistency(self) -> Singleton<T, L::DropConsistency, B>
399 where
400 L: Location<'a>,
401 {
402 if L::consistency()
403 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
404 {
405 // already no consistency
406 Singleton::new(
407 self.location.drop_consistency(),
408 self.ir_node.replace(HydroNode::Placeholder),
409 )
410 } else {
411 Singleton::new(
412 self.location.drop_consistency(),
413 HydroNode::Cast {
414 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
415 metadata:
416 self.location
417 .clone()
418 .drop_consistency()
419 .new_node_metadata(
420 Singleton::<T, L::DropConsistency, B>::collection_kind(),
421 ),
422 },
423 )
424 }
425 }
426
427 /// Casts this live collection to have the consistency guarantees specified in the given
428 /// location type parameter. The developer must ensure that the strengthened consistency
429 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
430 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
431 self,
432 _proof: impl crate::properties::ConsistencyProof,
433 ) -> Singleton<T, L2, B>
434 where
435 L: Location<'a>,
436 {
437 if L::consistency() == L2::consistency() {
438 Singleton::new(
439 self.location.with_consistency_of(),
440 self.ir_node.replace(HydroNode::Placeholder),
441 )
442 } else {
443 Singleton::new(
444 self.location.with_consistency_of(),
445 HydroNode::AssertIsConsistent {
446 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
447 trusted: false,
448 metadata: self
449 .location
450 .clone()
451 .with_consistency_of::<L2>()
452 .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
453 },
454 )
455 }
456 }
457
458 /// Drops the monotonicity property of the [`Singleton`].
459 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
460 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
461 Singleton::new(
462 self.location.clone(),
463 self.ir_node.replace(HydroNode::Placeholder),
464 )
465 } else {
466 Singleton::new(
467 self.location.clone(),
468 HydroNode::Cast {
469 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
470 metadata:
471 self.location.new_node_metadata(
472 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
473 ),
474 },
475 )
476 }
477 }
478
479 /// Transforms the singleton value by applying a function `f` to it,
480 /// continuously as the input is updated.
481 ///
482 /// # Example
483 /// ```rust
484 /// # #[cfg(feature = "deploy")] {
485 /// # use hydro_lang::prelude::*;
486 /// # use futures::StreamExt;
487 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
488 /// let tick = process.tick();
489 /// let singleton = tick.singleton(q!(5));
490 /// singleton.map(q!(|v| v * 2)).all_ticks()
491 /// # }, |mut stream| async move {
492 /// // 10
493 /// # assert_eq!(stream.next().await.unwrap(), 10);
494 /// # }));
495 /// # }
496 /// ```
497 pub fn map<U, F, OP, B2: SingletonBound>(
498 self,
499 f: impl IntoQuotedMut<'a, F, L, SingletonMapFuncAlgebra<OP>>,
500 ) -> Singleton<U, L, B2>
501 where
502 F: Fn(T) -> U + 'a,
503 B: ApplyOrderPreservingSingleton<OP, B2>,
504 {
505 let (f, proof) = f.splice_fn1_ctx_props(&self.location);
506 proof.register_proof(&f);
507 let f = f.into();
508 Singleton::new(
509 self.location.clone(),
510 HydroNode::Map {
511 f,
512 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
513 metadata: self
514 .location
515 .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
516 },
517 )
518 }
519
520 /// Transforms the singleton value by applying a function `f` to it and then flattening
521 /// the result into a stream, preserving the order of elements.
522 ///
523 /// The function `f` is applied to the singleton value to produce an iterator, and all items
524 /// from that iterator are emitted in the output stream in deterministic order.
525 ///
526 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
527 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
528 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
529 ///
530 /// # Example
531 /// ```rust
532 /// # #[cfg(feature = "deploy")] {
533 /// # use hydro_lang::prelude::*;
534 /// # use futures::StreamExt;
535 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
536 /// let tick = process.tick();
537 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
538 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
539 /// # }, |mut stream| async move {
540 /// // 1, 2, 3
541 /// # for w in vec![1, 2, 3] {
542 /// # assert_eq!(stream.next().await.unwrap(), w);
543 /// # }
544 /// # }));
545 /// # }
546 /// ```
547 pub fn flat_map_ordered<U, I, F>(
548 self,
549 f: impl IntoQuotedMut<'a, F, L>,
550 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
551 where
552 B: IsBounded,
553 I: IntoIterator<Item = U>,
554 F: Fn(T) -> I + 'a,
555 {
556 self.into_stream().flat_map_ordered(f)
557 }
558
559 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
560 /// for the output type `I` to produce items in any order.
561 ///
562 /// The function `f` is applied to the singleton value to produce an iterator, and all items
563 /// from that iterator are emitted in the output stream in non-deterministic order.
564 ///
565 /// # Example
566 /// ```rust
567 /// # #[cfg(feature = "deploy")] {
568 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
569 /// # use futures::StreamExt;
570 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
571 /// let tick = process.tick();
572 /// let singleton = tick.singleton(q!(
573 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
574 /// ));
575 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
576 /// # }, |mut stream| async move {
577 /// // 1, 2, 3, but in no particular order
578 /// # let mut results = Vec::new();
579 /// # for _ in 0..3 {
580 /// # results.push(stream.next().await.unwrap());
581 /// # }
582 /// # results.sort();
583 /// # assert_eq!(results, vec![1, 2, 3]);
584 /// # }));
585 /// # }
586 /// ```
587 pub fn flat_map_unordered<U, I, F>(
588 self,
589 f: impl IntoQuotedMut<'a, F, L>,
590 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
591 where
592 B: IsBounded,
593 I: IntoIterator<Item = U>,
594 F: Fn(T) -> I + 'a,
595 {
596 self.into_stream().flat_map_unordered(f)
597 }
598
599 /// Flattens the singleton value into a stream, preserving the order of elements.
600 ///
601 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
602 /// are emitted in the output stream in deterministic order.
603 ///
604 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
605 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
606 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
607 ///
608 /// # Example
609 /// ```rust
610 /// # #[cfg(feature = "deploy")] {
611 /// # use hydro_lang::prelude::*;
612 /// # use futures::StreamExt;
613 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
614 /// let tick = process.tick();
615 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
616 /// singleton.flatten_ordered().all_ticks()
617 /// # }, |mut stream| async move {
618 /// // 1, 2, 3
619 /// # for w in vec![1, 2, 3] {
620 /// # assert_eq!(stream.next().await.unwrap(), w);
621 /// # }
622 /// # }));
623 /// # }
624 /// ```
625 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
626 where
627 B: IsBounded,
628 T: IntoIterator<Item = U>,
629 {
630 self.flat_map_ordered(q!(|x| x))
631 }
632
633 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
634 /// for the element type `T` to produce items in any order.
635 ///
636 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
637 /// are emitted in the output stream in non-deterministic order.
638 ///
639 /// # Example
640 /// ```rust
641 /// # #[cfg(feature = "deploy")] {
642 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
643 /// # use futures::StreamExt;
644 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
645 /// let tick = process.tick();
646 /// let singleton = tick.singleton(q!(
647 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
648 /// ));
649 /// singleton.flatten_unordered().all_ticks()
650 /// # }, |mut stream| async move {
651 /// // 1, 2, 3, but in no particular order
652 /// # let mut results = Vec::new();
653 /// # for _ in 0..3 {
654 /// # results.push(stream.next().await.unwrap());
655 /// # }
656 /// # results.sort();
657 /// # assert_eq!(results, vec![1, 2, 3]);
658 /// # }));
659 /// # }
660 /// ```
661 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
662 where
663 B: IsBounded,
664 T: IntoIterator<Item = U>,
665 {
666 self.flat_map_unordered(q!(|x| x))
667 }
668
669 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
670 ///
671 /// If the predicate returns `true`, the output optional contains the same value.
672 /// If the predicate returns `false`, the output optional is empty.
673 ///
674 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
675 /// not modify or take ownership of the value. If you need to modify the value while filtering
676 /// use [`Singleton::filter_map`] instead.
677 ///
678 /// # Example
679 /// ```rust
680 /// # #[cfg(feature = "deploy")] {
681 /// # use hydro_lang::prelude::*;
682 /// # use futures::StreamExt;
683 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
684 /// let tick = process.tick();
685 /// let singleton = tick.singleton(q!(5));
686 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
687 /// # }, |mut stream| async move {
688 /// // 5
689 /// # assert_eq!(stream.next().await.unwrap(), 5);
690 /// # }));
691 /// # }
692 /// ```
693 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
694 where
695 F: Fn(&T) -> bool + 'a,
696 {
697 let f = f.splice_fn1_borrow_ctx(&self.location).into();
698 Optional::new(
699 self.location.clone(),
700 HydroNode::Filter {
701 f,
702 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
703 metadata: self
704 .location
705 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
706 },
707 )
708 }
709
710 /// An operator that both filters and maps. It yields the value only if the supplied
711 /// closure `f` returns `Some(value)`.
712 ///
713 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
714 /// If the closure returns `None`, the output optional is empty.
715 ///
716 /// # Example
717 /// ```rust
718 /// # #[cfg(feature = "deploy")] {
719 /// # use hydro_lang::prelude::*;
720 /// # use futures::StreamExt;
721 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
722 /// let tick = process.tick();
723 /// let singleton = tick.singleton(q!("42"));
724 /// singleton
725 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
726 /// .all_ticks()
727 /// # }, |mut stream| async move {
728 /// // 42
729 /// # assert_eq!(stream.next().await.unwrap(), 42);
730 /// # }));
731 /// # }
732 /// ```
733 pub fn filter_map<U, F>(
734 self,
735 f: impl IntoQuotedMut<'a, F, L>,
736 ) -> Optional<U, L, B::UnderlyingBound>
737 where
738 F: Fn(T) -> Option<U> + 'a,
739 {
740 let f = f.splice_fn1_ctx(&self.location).into();
741 Optional::new(
742 self.location.clone(),
743 HydroNode::FilterMap {
744 f,
745 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
746 metadata: self
747 .location
748 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
749 },
750 )
751 }
752
753 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
754 ///
755 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
756 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
757 /// non-null. This is useful for combining several pieces of state together.
758 ///
759 /// # Example
760 /// ```rust
761 /// # #[cfg(feature = "deploy")] {
762 /// # use hydro_lang::prelude::*;
763 /// # use futures::StreamExt;
764 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
765 /// let tick = process.tick();
766 /// let numbers = process
767 /// .source_iter(q!(vec![123, 456]))
768 /// .batch(&tick, nondet!(/** test */));
769 /// let count = numbers.clone().count(); // Singleton
770 /// let max = numbers.max(); // Optional
771 /// count.zip(max).all_ticks()
772 /// # }, |mut stream| async move {
773 /// // [(2, 456)]
774 /// # for w in vec![(2, 456)] {
775 /// # assert_eq!(stream.next().await.unwrap(), w);
776 /// # }
777 /// # }));
778 /// # }
779 /// ```
780 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
781 where
782 Self: ZipResult<'a, O, Location = L>,
783 B: IsBounded,
784 {
785 check_matching_location(&self.location, &Self::other_location(&other));
786
787 if L::is_top_level()
788 && let Some(tick) = self.location.try_tick()
789 {
790 let self_location = self.location().clone();
791 let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
792 let out = zip_inside_tick(
793 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
794 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
795 other_location.clone(),
796 HydroNode::Cast {
797 inner: Box::new(Self::other_ir_node(other)),
798 metadata: other_location.new_node_metadata(Optional::<
799 <Self as ZipResult<'a, O>>::OtherType,
800 Tick<L>,
801 Bounded,
802 >::collection_kind(
803 )),
804 },
805 )
806 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
807 )
808 .latest();
809
810 Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
811 } else {
812 Self::make(
813 self.location.clone(),
814 HydroNode::CrossSingleton {
815 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
816 right: Box::new(Self::other_ir_node(other)),
817 metadata: self.location.new_node_metadata(CollectionKind::Optional {
818 bound: B::BOUND_KIND,
819 element_type: stageleft::quote_type::<
820 <Self as ZipResult<'a, O>>::ElementType,
821 >()
822 .into(),
823 }),
824 },
825 )
826 }
827 }
828
829 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
830 /// boolean signal is `true`, otherwise the output is null.
831 ///
832 /// # Example
833 /// ```rust
834 /// # #[cfg(feature = "deploy")] {
835 /// # use hydro_lang::prelude::*;
836 /// # use futures::StreamExt;
837 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
838 /// let tick = process.tick();
839 /// // ticks are lazy by default, forces the second tick to run
840 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
841 ///
842 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
843 /// let batch_first_tick = process
844 /// .source_iter(q!(vec![1]))
845 /// .batch(&tick, nondet!(/** test */));
846 /// let batch_second_tick = process
847 /// .source_iter(q!(vec![1, 2, 3]))
848 /// .batch(&tick, nondet!(/** test */))
849 /// .defer_tick();
850 /// batch_first_tick.chain(batch_second_tick).count()
851 /// .filter_if(signal)
852 /// .all_ticks()
853 /// # }, |mut stream| async move {
854 /// // [1]
855 /// # for w in vec![1] {
856 /// # assert_eq!(stream.next().await.unwrap(), w);
857 /// # }
858 /// # }));
859 /// # }
860 /// ```
861 pub fn filter_if(
862 self,
863 signal: Singleton<bool, L, B>,
864 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
865 where
866 B: IsBounded,
867 {
868 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
869 }
870
871 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
872 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
873 ///
874 /// Useful for conditionally processing, such as only emitting a singleton's value outside
875 /// a tick if some other condition is satisfied.
876 ///
877 /// # Example
878 /// ```rust
879 /// # #[cfg(feature = "deploy")] {
880 /// # use hydro_lang::prelude::*;
881 /// # use futures::StreamExt;
882 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
883 /// let tick = process.tick();
884 /// // ticks are lazy by default, forces the second tick to run
885 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
886 ///
887 /// let batch_first_tick = process
888 /// .source_iter(q!(vec![1]))
889 /// .batch(&tick, nondet!(/** test */));
890 /// let batch_second_tick = process
891 /// .source_iter(q!(vec![1, 2, 3]))
892 /// .batch(&tick, nondet!(/** test */))
893 /// .defer_tick(); // appears on the second tick
894 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
895 /// batch_first_tick.chain(batch_second_tick).count()
896 /// .filter_if_some(some_on_first_tick)
897 /// .all_ticks()
898 /// # }, |mut stream| async move {
899 /// // [1]
900 /// # for w in vec![1] {
901 /// # assert_eq!(stream.next().await.unwrap(), w);
902 /// # }
903 /// # }));
904 /// # }
905 /// ```
906 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
907 pub fn filter_if_some<U>(
908 self,
909 signal: Optional<U, L, B>,
910 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
911 where
912 B: IsBounded,
913 {
914 self.filter_if(signal.is_some())
915 }
916
917 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
918 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
919 ///
920 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
921 /// the condition.
922 ///
923 /// # Example
924 /// ```rust
925 /// # #[cfg(feature = "deploy")] {
926 /// # use hydro_lang::prelude::*;
927 /// # use futures::StreamExt;
928 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
929 /// let tick = process.tick();
930 /// // ticks are lazy by default, forces the second tick to run
931 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
932 ///
933 /// let batch_first_tick = process
934 /// .source_iter(q!(vec![1]))
935 /// .batch(&tick, nondet!(/** test */));
936 /// let batch_second_tick = process
937 /// .source_iter(q!(vec![1, 2, 3]))
938 /// .batch(&tick, nondet!(/** test */))
939 /// .defer_tick(); // appears on the second tick
940 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
941 /// batch_first_tick.chain(batch_second_tick).count()
942 /// .filter_if_none(some_on_first_tick)
943 /// .all_ticks()
944 /// # }, |mut stream| async move {
945 /// // [3]
946 /// # for w in vec![3] {
947 /// # assert_eq!(stream.next().await.unwrap(), w);
948 /// # }
949 /// # }));
950 /// # }
951 /// ```
952 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
953 pub fn filter_if_none<U>(
954 self,
955 other: Optional<U, L, B>,
956 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
957 where
958 B: IsBounded,
959 {
960 self.filter_if(other.is_none())
961 }
962
963 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
964 ///
965 /// # Example
966 /// ```rust
967 /// # #[cfg(feature = "deploy")] {
968 /// # use hydro_lang::prelude::*;
969 /// # use futures::StreamExt;
970 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
971 /// let tick = process.tick();
972 /// let a = tick.singleton(q!(5));
973 /// let b = tick.singleton(q!(5));
974 /// a.equals(b).all_ticks()
975 /// # }, |mut stream| async move {
976 /// // [true]
977 /// # assert_eq!(stream.next().await.unwrap(), true);
978 /// # }));
979 /// # }
980 /// ```
981 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
982 where
983 T: PartialEq,
984 B: IsBounded,
985 {
986 self.zip(other).map(q!(|(a, b)| a == b))
987 }
988
989 /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
990 /// greater than or equal to the provided threshold. The event will have the value of the
991 /// given threshold.
992 ///
993 /// This requires the incoming singleton to be monotonic, because otherwise the detection of
994 /// the threshold would be non-deterministic.
995 ///
996 /// # Example
997 /// ```rust
998 /// # #[cfg(feature = "deploy")] {
999 /// # use hydro_lang::prelude::*;
1000 /// # use futures::StreamExt;
1001 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1002 /// let a = // singleton 1 ~> 5 ~> 10
1003 /// # process.singleton(q!(5));
1004 /// let b = process.singleton(q!(4));
1005 /// a.threshold_greater_or_equal(b)
1006 /// # }, |mut stream| async move {
1007 /// // [4]
1008 /// # assert_eq!(stream.next().await.unwrap(), 4);
1009 /// # }));
1010 /// # }
1011 /// ```
1012 pub fn threshold_greater_or_equal<B2: IsBounded>(
1013 self,
1014 threshold: Singleton<T, L, B2>,
1015 ) -> Stream<T, L, B::UnderlyingBound>
1016 where
1017 T: Clone + PartialOrd,
1018 B: IsMonotonic,
1019 {
1020 let threshold = threshold.make_bounded();
1021 let self_location = self.location().clone();
1022 match self.try_make_bounded() {
1023 Ok(bounded) => {
1024 let uncasted = threshold
1025 .zip(bounded)
1026 .into_stream()
1027 .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
1028
1029 Stream::new(
1030 uncasted.location.clone(),
1031 uncasted.ir_node.replace(HydroNode::Placeholder),
1032 )
1033 }
1034 Err(me) => {
1035 let uncasted = sliced! {
1036 let me = use(me, nondet!(/** thresholds are deterministic */));
1037 let mut remaining_threshold = use::state(|l| {
1038 let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
1039 as_option
1040 });
1041
1042 let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
1043 remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
1044 passed.map(q!(|(t, _)| t))
1045 };
1046
1047 Stream::new(
1048 self_location,
1049 uncasted.ir_node.replace(HydroNode::Placeholder),
1050 )
1051 }
1052 }
1053 }
1054
1055 /// An operator which allows you to "name" a `HydroNode`.
1056 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1057 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
1058 {
1059 let mut node = self.ir_node.borrow_mut();
1060 let metadata = node.metadata_mut();
1061 metadata.tag = Some(name.to_owned());
1062 }
1063 self
1064 }
1065}
1066
1067impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
1068 type Output = Singleton<bool, L, B::UnderlyingBound>;
1069
1070 fn not(self) -> Self::Output {
1071 self.map(q!(|b| !b))
1072 }
1073}
1074
1075impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1076where
1077 L: Location<'a>,
1078{
1079 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1080 /// the inner `Option`.
1081 ///
1082 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1083 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1084 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1085 ///
1086 /// # Example
1087 /// ```rust
1088 /// # #[cfg(feature = "deploy")] {
1089 /// # use hydro_lang::prelude::*;
1090 /// # use futures::StreamExt;
1091 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1092 /// let tick = process.tick();
1093 /// let singleton = tick.singleton(q!(Some(42)));
1094 /// singleton.into_optional().all_ticks()
1095 /// # }, |mut stream| async move {
1096 /// // 42
1097 /// # assert_eq!(stream.next().await.unwrap(), 42);
1098 /// # }));
1099 /// # }
1100 /// ```
1101 pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1102 self.filter_map(q!(|v| v))
1103 }
1104}
1105
1106impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1107where
1108 L: Location<'a>,
1109{
1110 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1111 ///
1112 /// # Example
1113 /// ```rust
1114 /// # #[cfg(feature = "deploy")] {
1115 /// # use hydro_lang::prelude::*;
1116 /// # use futures::StreamExt;
1117 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1118 /// let tick = process.tick();
1119 /// // ticks are lazy by default, forces the second tick to run
1120 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1121 ///
1122 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1123 /// let b = tick.singleton(q!(true)); // true, true
1124 /// a.and(b).all_ticks()
1125 /// # }, |mut stream| async move {
1126 /// // [true, false]
1127 /// # for w in vec![true, false] {
1128 /// # assert_eq!(stream.next().await.unwrap(), w);
1129 /// # }
1130 /// # }));
1131 /// # }
1132 /// ```
1133 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1134 where
1135 B: IsBounded,
1136 {
1137 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1138 }
1139
1140 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1141 ///
1142 /// # Example
1143 /// ```rust
1144 /// # #[cfg(feature = "deploy")] {
1145 /// # use hydro_lang::prelude::*;
1146 /// # use futures::StreamExt;
1147 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1148 /// let tick = process.tick();
1149 /// // ticks are lazy by default, forces the second tick to run
1150 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1151 ///
1152 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1153 /// let b = tick.singleton(q!(false)); // false, false
1154 /// a.or(b).all_ticks()
1155 /// # }, |mut stream| async move {
1156 /// // [true, false]
1157 /// # for w in vec![true, false] {
1158 /// # assert_eq!(stream.next().await.unwrap(), w);
1159 /// # }
1160 /// # }));
1161 /// # }
1162 /// ```
1163 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1164 where
1165 B: IsBounded,
1166 {
1167 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1168 }
1169}
1170
1171impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1172where
1173 L: Location<'a>,
1174{
1175 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1176 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1177 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1178 /// all snapshots of this singleton into the atomic-associated tick will observe the
1179 /// same value each tick.
1180 ///
1181 /// # Non-Determinism
1182 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1183 /// the output singleton has a non-deterministic value since the snapshot can be at an
1184 /// arbitrary point in time.
1185 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1186 self,
1187 tick: &Tick<L2>,
1188 _nondet: NonDet,
1189 ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1190 Singleton::new(
1191 tick.drop_consistency(),
1192 HydroNode::Batch {
1193 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1194 metadata: tick
1195 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1196 },
1197 )
1198 }
1199
1200 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1201 /// to the value will be asynchronously propagated.
1202 pub fn end_atomic(self) -> Singleton<T, L, B> {
1203 Singleton::new(
1204 self.location.tick.l.clone(),
1205 HydroNode::EndAtomic {
1206 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1207 metadata: self
1208 .location
1209 .tick
1210 .l
1211 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1212 },
1213 )
1214 }
1215}
1216
1217impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1218where
1219 L: Location<'a>,
1220{
1221 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1222 /// will observe the same version of the value and will be executed synchronously before any
1223 /// outputs are yielded (in [`Optional::end_atomic`]).
1224 ///
1225 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1226 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1227 /// a different version).
1228 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1229 let id = self.location.flow_state().borrow_mut().next_clock_id();
1230 let out_location = Atomic {
1231 tick: Tick {
1232 id,
1233 l: self.location.clone(),
1234 },
1235 };
1236 Singleton::new(
1237 out_location.clone(),
1238 HydroNode::BeginAtomic {
1239 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1240 metadata: out_location
1241 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1242 },
1243 )
1244 }
1245
1246 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1247 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1248 /// relevant data that contributed to the snapshot at tick `t`.
1249 ///
1250 /// # Non-Determinism
1251 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1252 /// the output singleton has a non-deterministic value since the snapshot can be at an
1253 /// arbitrary point in time.
1254 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1255 self,
1256 tick: &Tick<L2>,
1257 _nondet: NonDet,
1258 ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1259 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1260 Singleton::new(
1261 tick.drop_consistency(),
1262 HydroNode::Batch {
1263 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1264 metadata: tick
1265 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1266 },
1267 )
1268 }
1269
1270 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1271 /// with order corresponding to increasing prefixes of data contributing to the singleton.
1272 ///
1273 /// # Non-Determinism
1274 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1275 /// to non-deterministic batching and arrival of inputs, the output stream is
1276 /// non-deterministic.
1277 pub fn sample_eager(
1278 self,
1279 nondet: NonDet,
1280 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1281 sliced! {
1282 let snapshot = use(self, nondet);
1283 snapshot.into_stream()
1284 }
1285 .weaken_retries()
1286 }
1287
1288 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1289 /// value taken at various points in time. Because the input singleton may be
1290 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1291 /// represent the value of the singleton given some prefix of the streams leading up to
1292 /// it.
1293 ///
1294 /// # Non-Determinism
1295 /// The output stream is non-deterministic in which elements are sampled, since this
1296 /// is controlled by a clock.
1297 pub fn sample_every(
1298 self,
1299 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1300 nondet: NonDet,
1301 ) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
1302 where
1303 L: TopLevel<'a>,
1304 {
1305 let samples = self.location.source_interval(interval);
1306 sliced! {
1307 let snapshot = use(self, nondet);
1308 let sample_batch = use(samples, nondet);
1309
1310 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1311 }
1312 .weaken_retries()
1313 }
1314
1315 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1316 /// implies that `B == Bounded`.
1317 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1318 where
1319 B: IsBounded,
1320 {
1321 Singleton::new(
1322 self.location.clone(),
1323 self.ir_node.replace(HydroNode::Placeholder),
1324 )
1325 }
1326
1327 #[expect(clippy::result_large_err, reason = "internal use only")]
1328 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1329 if B::UnderlyingBound::BOUNDED {
1330 Ok(Singleton::new(
1331 self.location.clone(),
1332 self.ir_node.replace(HydroNode::Placeholder),
1333 ))
1334 } else {
1335 Err(self)
1336 }
1337 }
1338
1339 /// Clones this bounded singleton into a tick, returning a singleton that has the
1340 /// same value as the outer singleton. Because the outer singleton is bounded, this
1341 /// is deterministic because there is only a single immutable version.
1342 pub fn clone_into_tick<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1343 self,
1344 tick: &Tick<L2>,
1345 ) -> Singleton<T, Tick<L2>, Bounded>
1346 where
1347 B: IsBounded,
1348 T: Clone,
1349 {
1350 // TODO(shadaj): avoid printing simulator logs for this snapshot
1351 let inner = self.snapshot(
1352 tick,
1353 nondet!(/** bounded top-level singleton so deterministic */),
1354 );
1355 Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1356 }
1357
1358 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1359 ///
1360 /// # Example
1361 /// ```rust
1362 /// # #[cfg(feature = "deploy")] {
1363 /// # use hydro_lang::prelude::*;
1364 /// # use futures::StreamExt;
1365 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1366 /// let tick = process.tick();
1367 /// let batch_input = process
1368 /// .source_iter(q!(vec![123, 456]))
1369 /// .batch(&tick, nondet!(/** test */));
1370 /// batch_input.clone().chain(
1371 /// batch_input.count().into_stream()
1372 /// ).all_ticks()
1373 /// # }, |mut stream| async move {
1374 /// // [123, 456, 2]
1375 /// # for w in vec![123, 456, 2] {
1376 /// # assert_eq!(stream.next().await.unwrap(), w);
1377 /// # }
1378 /// # }));
1379 /// # }
1380 /// ```
1381 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1382 where
1383 B: IsBounded,
1384 {
1385 Stream::new(
1386 self.location.clone(),
1387 HydroNode::Cast {
1388 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1389 metadata: self.location.new_node_metadata(Stream::<
1390 T,
1391 Tick<L>,
1392 Bounded,
1393 TotalOrder,
1394 ExactlyOnce,
1395 >::collection_kind()),
1396 },
1397 )
1398 }
1399
1400 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1401 /// producing a singleton of the resolved output.
1402 ///
1403 /// This is useful when the singleton contains an async computation that must
1404 /// be awaited before further processing. The future is polled to completion
1405 /// before the output value is emitted.
1406 ///
1407 /// # Example
1408 /// ```rust
1409 /// # #[cfg(feature = "deploy")] {
1410 /// # use hydro_lang::prelude::*;
1411 /// # use futures::StreamExt;
1412 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1413 /// let tick = process.tick();
1414 /// let singleton = tick.singleton(q!(5));
1415 /// singleton
1416 /// .map(q!(|v| async move { v * 2 }))
1417 /// .resolve_future_blocking()
1418 /// .all_ticks()
1419 /// # }, |mut stream| async move {
1420 /// // 10
1421 /// # assert_eq!(stream.next().await.unwrap(), 10);
1422 /// # }));
1423 /// # }
1424 /// ```
1425 pub fn resolve_future_blocking(
1426 self,
1427 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1428 where
1429 T: Future,
1430 B: IsBounded,
1431 {
1432 Singleton::new(
1433 self.location.clone(),
1434 HydroNode::ResolveFuturesBlocking {
1435 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1436 metadata: self
1437 .location
1438 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1439 },
1440 )
1441 }
1442}
1443
1444impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1445where
1446 L: Location<'a>,
1447{
1448 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1449 /// which will stream the value computed in _each_ tick as a separate stream element.
1450 ///
1451 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1452 /// producing one element in the output for each tick. This is useful for batched computations,
1453 /// where the results from each tick must be combined together.
1454 ///
1455 /// # Example
1456 /// ```rust
1457 /// # #[cfg(feature = "deploy")] {
1458 /// # use hydro_lang::prelude::*;
1459 /// # use futures::StreamExt;
1460 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1461 /// let tick = process.tick();
1462 /// # // ticks are lazy by default, forces the second tick to run
1463 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1464 /// # let batch_first_tick = process
1465 /// # .source_iter(q!(vec![1]))
1466 /// # .batch(&tick, nondet!(/** test */));
1467 /// # let batch_second_tick = process
1468 /// # .source_iter(q!(vec![1, 2, 3]))
1469 /// # .batch(&tick, nondet!(/** test */))
1470 /// # .defer_tick(); // appears on the second tick
1471 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1472 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1473 /// .count()
1474 /// .all_ticks()
1475 /// # }, |mut stream| async move {
1476 /// // [1, 3]
1477 /// # for w in vec![1, 3] {
1478 /// # assert_eq!(stream.next().await.unwrap(), w);
1479 /// # }
1480 /// # }));
1481 /// # }
1482 /// ```
1483 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1484 self.into_stream().all_ticks()
1485 }
1486
1487 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1488 /// which will stream the value computed in _each_ tick as a separate stream element.
1489 ///
1490 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1491 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1492 /// singleton's [`Tick`] context.
1493 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1494 self.into_stream().all_ticks_atomic()
1495 }
1496
1497 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1498 /// be asynchronously updated with the latest value of the singleton inside the tick.
1499 ///
1500 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1501 /// tick that tracks the inner value. This is useful for getting the value as of the
1502 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1503 ///
1504 /// # Example
1505 /// ```rust
1506 /// # #[cfg(feature = "deploy")] {
1507 /// # use hydro_lang::prelude::*;
1508 /// # use futures::StreamExt;
1509 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1510 /// let tick = process.tick();
1511 /// # // ticks are lazy by default, forces the second tick to run
1512 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1513 /// # let batch_first_tick = process
1514 /// # .source_iter(q!(vec![1]))
1515 /// # .batch(&tick, nondet!(/** test */));
1516 /// # let batch_second_tick = process
1517 /// # .source_iter(q!(vec![1, 2, 3]))
1518 /// # .batch(&tick, nondet!(/** test */))
1519 /// # .defer_tick(); // appears on the second tick
1520 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1521 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1522 /// .count()
1523 /// .latest()
1524 /// # .sample_eager(nondet!(/** test */))
1525 /// # }, |mut stream| async move {
1526 /// // asynchronously changes from 1 ~> 3
1527 /// # for w in vec![1, 3] {
1528 /// # assert_eq!(stream.next().await.unwrap(), w);
1529 /// # }
1530 /// # }));
1531 /// # }
1532 /// ```
1533 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1534 Singleton::new(
1535 self.location.outer().clone(),
1536 HydroNode::YieldConcat {
1537 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1538 metadata: self
1539 .location
1540 .outer()
1541 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1542 },
1543 )
1544 }
1545
1546 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1547 /// be updated with the latest value of the singleton inside the tick.
1548 ///
1549 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1550 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1551 /// singleton's [`Tick`] context.
1552 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1553 let out_location = Atomic {
1554 tick: self.location.clone(),
1555 };
1556 Singleton::new(
1557 out_location.clone(),
1558 HydroNode::YieldConcat {
1559 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1560 metadata: out_location
1561 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1562 },
1563 )
1564 }
1565}
1566
1567#[doc(hidden)]
1568/// Helper trait that determines the output collection type for [`Singleton::zip`].
1569///
1570/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1571/// [`Singleton`].
1572#[sealed::sealed]
1573pub trait ZipResult<'a, Other> {
1574 /// The output collection type.
1575 type Out;
1576 /// The type of the tupled output value.
1577 type ElementType;
1578 /// The type of the other collection's value.
1579 type OtherType;
1580 /// The location where the tupled result will be materialized.
1581 type Location: Location<'a>;
1582
1583 /// The location of the second input to the `zip`.
1584 fn other_location(other: &Other) -> Self::Location;
1585 /// The IR node of the second input to the `zip`.
1586 fn other_ir_node(other: Other) -> HydroNode;
1587
1588 /// Constructs the output live collection given an IR node containing the zip result.
1589 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1590}
1591
1592#[sealed::sealed]
1593impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1594where
1595 L: Location<'a>,
1596{
1597 type Out = Singleton<(T, U), L, B>;
1598 type ElementType = (T, U);
1599 type OtherType = U;
1600 type Location = L;
1601
1602 fn other_location(other: &Singleton<U, L, B>) -> L {
1603 other.location.clone()
1604 }
1605
1606 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1607 other.ir_node.replace(HydroNode::Placeholder)
1608 }
1609
1610 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1611 Singleton::new(
1612 location.clone(),
1613 HydroNode::Cast {
1614 inner: Box::new(ir_node),
1615 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1616 },
1617 )
1618 }
1619}
1620
1621#[sealed::sealed]
1622impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1623 for Singleton<T, L, B>
1624where
1625 L: Location<'a>,
1626{
1627 type Out = Optional<(T, U), L, B::UnderlyingBound>;
1628 type ElementType = (T, U);
1629 type OtherType = U;
1630 type Location = L;
1631
1632 fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1633 other.location.clone()
1634 }
1635
1636 fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1637 other.ir_node.replace(HydroNode::Placeholder)
1638 }
1639
1640 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1641 Optional::new(location, ir_node)
1642 }
1643}
1644
1645#[cfg(test)]
1646mod tests {
1647 #[cfg(feature = "deploy")]
1648 use futures::{SinkExt, StreamExt};
1649 #[cfg(feature = "deploy")]
1650 use hydro_deploy::Deployment;
1651 #[cfg(any(feature = "deploy", feature = "sim"))]
1652 use stageleft::q;
1653
1654 #[cfg(any(feature = "deploy", feature = "sim"))]
1655 use crate::compile::builder::FlowBuilder;
1656 #[cfg(feature = "deploy")]
1657 use crate::live_collections::stream::ExactlyOnce;
1658 #[cfg(any(feature = "deploy", feature = "sim"))]
1659 use crate::location::Location;
1660 #[cfg(any(feature = "deploy", feature = "sim"))]
1661 use crate::nondet::nondet;
1662
1663 #[cfg(feature = "deploy")]
1664 #[tokio::test]
1665 async fn tick_cycle_cardinality() {
1666 let mut deployment = Deployment::new();
1667
1668 let mut flow = FlowBuilder::new();
1669 let node = flow.process::<()>();
1670 let external = flow.external::<()>();
1671
1672 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1673
1674 let node_tick = node.tick();
1675 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1676 let counts = singleton
1677 .clone()
1678 .into_stream()
1679 .count()
1680 .filter_if(
1681 input
1682 .batch(&node_tick, nondet!(/** testing */))
1683 .first()
1684 .is_some(),
1685 )
1686 .all_ticks()
1687 .send_bincode_external(&external);
1688 complete_cycle.complete_next_tick(singleton);
1689
1690 let nodes = flow
1691 .with_process(&node, deployment.Localhost())
1692 .with_external(&external, deployment.Localhost())
1693 .deploy(&mut deployment);
1694
1695 deployment.deploy().await.unwrap();
1696
1697 let mut tick_trigger = nodes.connect(input_send).await;
1698 let mut external_out = nodes.connect(counts).await;
1699
1700 deployment.start().await.unwrap();
1701
1702 tick_trigger.send(()).await.unwrap();
1703
1704 assert_eq!(external_out.next().await.unwrap(), 1);
1705
1706 tick_trigger.send(()).await.unwrap();
1707
1708 assert_eq!(external_out.next().await.unwrap(), 1);
1709 }
1710
1711 #[cfg(feature = "sim")]
1712 #[test]
1713 #[should_panic]
1714 fn sim_fold_intermediate_states() {
1715 let mut flow = FlowBuilder::new();
1716 let node = flow.process::<()>();
1717
1718 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1719 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1720
1721 let tick = node.tick();
1722 let batch = folded.snapshot(&tick, nondet!(/** test */));
1723 let out_recv = batch.all_ticks().sim_output();
1724
1725 flow.sim().exhaustive(async || {
1726 assert_eq!(out_recv.next().await.unwrap(), 10);
1727 });
1728 }
1729
1730 #[cfg(feature = "sim")]
1731 #[test]
1732 fn sim_fold_intermediate_state_count() {
1733 let mut flow = FlowBuilder::new();
1734 let node = flow.process::<()>();
1735
1736 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1737 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1738
1739 let tick = node.tick();
1740 let batch = folded.snapshot(&tick, nondet!(/** test */));
1741 let out_recv = batch.all_ticks().sim_output();
1742
1743 let instance_count = flow.sim().exhaustive(async || {
1744 let out = out_recv.collect::<Vec<_>>().await;
1745 assert_eq!(out.last(), Some(&10));
1746 });
1747
1748 assert_eq!(
1749 instance_count,
1750 16 // 2^4 possible subsets of intermediates (including initial state)
1751 )
1752 }
1753
1754 #[cfg(feature = "sim")]
1755 #[test]
1756 fn sim_fold_no_repeat_initial() {
1757 // check that we don't repeat the initial state of the fold in autonomous decisions
1758
1759 let mut flow = FlowBuilder::new();
1760 let node = flow.process::<()>();
1761
1762 let (in_port, input) = node.sim_input();
1763 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1764
1765 let tick = node.tick();
1766 let batch = folded.snapshot(&tick, nondet!(/** test */));
1767 let out_recv = batch.all_ticks().sim_output();
1768
1769 flow.sim().exhaustive(async || {
1770 assert_eq!(out_recv.next().await.unwrap(), 0);
1771
1772 in_port.send(123);
1773
1774 assert_eq!(out_recv.next().await.unwrap(), 123);
1775 });
1776 }
1777
1778 #[cfg(feature = "sim")]
1779 #[test]
1780 #[should_panic]
1781 fn sim_fold_repeats_snapshots() {
1782 // when the tick is driven by a snapshot AND something else, the snapshot can
1783 // "stutter" and repeat the same state multiple times
1784
1785 let mut flow = FlowBuilder::new();
1786 let node = flow.process::<()>();
1787
1788 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1789 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1790
1791 let tick = node.tick();
1792 let batch = source
1793 .batch(&tick, nondet!(/** test */))
1794 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1795 let out_recv = batch.all_ticks().sim_output();
1796
1797 flow.sim().exhaustive(async || {
1798 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1799 {
1800 panic!("repeated snapshot");
1801 }
1802 });
1803 }
1804
1805 #[cfg(feature = "sim")]
1806 #[test]
1807 fn sim_fold_repeats_snapshots_count() {
1808 // check the number of instances
1809 let mut flow = FlowBuilder::new();
1810 let node = flow.process::<()>();
1811
1812 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1813 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1814
1815 let tick = node.tick();
1816 let batch = source
1817 .batch(&tick, nondet!(/** test */))
1818 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1819 let out_recv = batch.all_ticks().sim_output();
1820
1821 let count = flow.sim().exhaustive(async || {
1822 let _ = out_recv.collect::<Vec<_>>().await;
1823 });
1824
1825 assert_eq!(count, 52);
1826 // don't have a combinatorial explanation for this number yet, but checked via logs
1827 }
1828
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_exhaustive() {
    // A top-level singleton should yield exactly one snapshot, so the
    // exhaustive search is expected to report a single instance.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let constant = node.singleton(q!(1));
    let tick = node.tick();
    let snapshot = constant.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshot.all_ticks().sim_output();

    let explored = flow.sim().exhaustive(async || {
        // Drain the output; only the instance count is asserted.
        let _: Vec<_> = out_recv.collect().await;
    });

    assert_eq!(explored, 1);
}
1847
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_join_count() {
    // When a tick consumes a static snapshot together with a stream batch,
    // only the batch requires state-space exploration.

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let items = node.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = node.tick();
    // Same construction order as the chained form: batch, then the singleton
    // cloned into the tick, then the cross product.
    let batched = items.batch(&tick, nondet!(/** test */));
    let constant_in_tick = node.singleton(q!(123)).clone_into_tick(&tick);
    let paired = batched.cross_singleton(constant_in_tick);
    let out_recv = paired.all_ticks().sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        let _: Vec<_> = out_recv.collect().await;
    });

    // 2^4 ways to split up the elements (including a possibly empty first batch).
    assert_eq!(instance_count, 16);
}
1873
#[cfg(feature = "sim")]
#[test]
fn top_level_singleton_into_stream_no_replay() {
    // Turning a top-level fold result into a stream must yield only the final
    // value (1 + 2 + 3 + 4 = 10) in every explored execution.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let items = node.source_iter(q!(vec![1, 2, 3, 4]));
    let total = items.fold(q!(|| 0), q!(|a, b| *a += b));

    let total_stream = total.into_stream();
    let out_recv = total_stream.sim_output();

    flow.sim().exhaustive(async || {
        out_recv.assert_yields_only([10]).await;
    });
}
1889
#[cfg(feature = "sim")]
#[test]
fn inside_tick_singleton_zip() {
    use crate::live_collections::Stream;
    use crate::live_collections::sliced::sliced;

    // Zips a sliced fold value with itself; the last emitted pair must hold
    // the final sum (1 + 2 = 3) on both sides.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    // The annotation selects the target type of the `into()` conversion.
    let items: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
    let running_sum = items.fold(q!(|| 0), q!(|a, b| *a += b));

    let zipped = sliced! {
        let current = use(running_sum, nondet!(/** test */));
        current.clone().zip(current).into_stream()
    };
    let out_recv = zipped.sim_output();

    let explored = flow.sim().exhaustive(async || {
        let pairs: Vec<_> = out_recv.collect().await;
        assert_eq!(pairs.last(), Some(&(3, 3)));
    });

    assert_eq!(explored, 4);
}
1915
/// Regression reproducer: `cross_singleton` applied to a top-level unbounded
/// stream (outside of `sliced!`) used to make the exhaustive simulator hang
/// after its first iteration.
#[cfg(feature = "sim")]
#[test]
fn sim_cross_singleton_top_level_unbounded_hang() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (cmd_port, commands) = node.sim_input::<String, _, _>();

    let constant = node.singleton(q!(123));

    // Crossing the top-level stream with the singleton is what triggered the bug.
    let paired = commands.cross_singleton(constant);

    // The crossed stream is exposed directly, with no further operators.
    let resp_port = paired.sim_output();

    let explored = flow.sim().exhaustive(async || {
        cmd_port.send(String::from("abc"));

        let received = resp_port.collect::<Vec<_>>().await;
        assert!(!received.is_empty());
    });

    assert_eq!(explored, 1);
}
1944
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_state_count() {
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (cmd_port, commands) = process.sim_input();
    {
        // Before `From` was optimized, this otherwise unused conversion raised
        // the number of exhaustive inputs from 1 to 2.
        use super::Singleton;
        use crate::live_collections::boundedness::Unbounded;
        let _unbounded: Singleton<_, _, Unbounded> = process.singleton(q!(false)).into();
    }
    let tick = process.tick();
    let batched = commands.batch(&tick, nondet!(/** */));
    let resp_port = batched.all_ticks().sim_output();

    let explored = flow.sim().exhaustive(async || {
        cmd_port.send(());
        // Drain the output; only the instance count is asserted.
        let _received: Vec<_> = resp_port.collect().await;
    });

    assert_eq!(explored, 1);
}
1969}