hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
36 ValidIdempotenceFor, ValidMutCommutativityFor, ValidMutIdempotenceFor,
37};
38
39pub mod networking;
40
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The [`MinOrder`] supertraits encode how orderings combine: the minimum of an ordering with
/// itself or with [`TotalOrder`] is the ordering itself, while the minimum with [`NoOrder`] is
/// always [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}

/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
pub enum TotalOrder {}

// Ties the marker to its IR-level tag, `StreamOrder::TotalOrder`.
#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}

/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
pub enum NoOrder {}

// Ties the marker to its IR-level tag, `StreamOrder::NoOrder`.
#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
71
72/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
73/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
74/// have `Self` guarantees instead.
75#[sealed::sealed]
76pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
77#[sealed::sealed]
78impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
79
80/// Helper trait for determining the weakest of two orderings.
81#[sealed::sealed]
82pub trait MinOrder<Other: ?Sized> {
83 /// The weaker of the two orderings.
84 type Min: Ordering;
85}
86
87#[sealed::sealed]
88impl<O: Ordering> MinOrder<O> for TotalOrder {
89 type Min = O;
90}
91
92#[sealed::sealed]
93impl<O: Ordering> MinOrder<O> for NoOrder {
94 type Min = NoOrder;
95}
96
/// A trait implemented by valid retry markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The [`MinRetries`] supertraits encode how retry guarantees combine: the minimum of a
/// guarantee with itself or with [`ExactlyOnce`] is the guarantee itself, while the minimum
/// with [`AtLeastOnce`] is always [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}

/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
pub enum ExactlyOnce {}

// Ties the marker to its IR-level tag, `StreamRetry::ExactlyOnce`.
#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
pub enum AtLeastOnce {}

// Ties the marker to its IR-level tag, `StreamRetry::AtLeastOnce`.
#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
125
126/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
127/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
128/// have `Self` guarantees instead.
129#[sealed::sealed]
130pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
131#[sealed::sealed]
132impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
133
134/// Helper trait for determining the weakest of two retry guarantees.
135#[sealed::sealed]
136pub trait MinRetries<Other: ?Sized> {
137 /// The weaker of the two retry guarantees.
138 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
139}
140
141#[sealed::sealed]
142impl<R: Retries> MinRetries<R> for ExactlyOnce {
143 type Min = R;
144}
145
146#[sealed::sealed]
147impl<R: Retries> MinRetries<R> for AtLeastOnce {
148 type Min = AtLeastOnce;
149}
150
151#[sealed::sealed]
152#[diagnostic::on_unimplemented(
153 message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
154 label = "required here",
155 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
156)]
157/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
158pub trait IsOrdered: Ordering {}
159
160#[sealed::sealed]
161#[diagnostic::do_not_recommend]
162impl IsOrdered for TotalOrder {}
163
164#[sealed::sealed]
165#[diagnostic::on_unimplemented(
166 message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
167 label = "required here",
168 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
169)]
170/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
171pub trait IsExactlyOnce: Retries {}
172
173#[sealed::sealed]
174#[diagnostic::do_not_recommend]
175impl IsExactlyOnce for ExactlyOnce {}
176
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retry` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
///   (default is [`Unbounded`])
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retry`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location at which this stream is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this stream. Consuming operators move it out with
    /// `replace(HydroNode::Placeholder)`, leaving a placeholder behind.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Handle to the flow-builder state. The `Drop` impl uses it to push a `HydroRoot::Null`
    /// root for a node that was never consumed.
    pub(crate) flow_state: FlowState,

    // Carries the type-level parameters that have no runtime representation.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
209
210impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
211 fn drop(&mut self) {
212 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
213 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
214 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
215 input: Box::new(ir_node),
216 op_metadata: HydroIrOpMetadata::new(),
217 });
218 }
219 }
220}
221
222impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
223 for Stream<T, L, Unbounded, O, R>
224where
225 L: Location<'a>,
226{
227 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
228 let new_meta = stream
229 .location
230 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
231
232 Stream {
233 location: stream.location.clone(),
234 flow_state: stream.flow_state.clone(),
235 ir_node: RefCell::new(HydroNode::Cast {
236 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
237 metadata: new_meta,
238 }),
239 _phantom: PhantomData,
240 }
241 }
242}
243
244impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
245 for Stream<T, L, B, NoOrder, R>
246where
247 L: Location<'a>,
248{
249 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
250 stream.weaken_ordering()
251 }
252}
253
254impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
255 for Stream<T, L, B, O, AtLeastOnce>
256where
257 L: Location<'a>,
258{
259 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
260 stream.weaken_retries()
261 }
262}
263
264impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
265where
266 L: Location<'a>,
267{
268 fn defer_tick(self) -> Self {
269 Stream::defer_tick(self)
270 }
271}
272
273impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
274 for Stream<T, Tick<L>, Bounded, O, R>
275where
276 L: Location<'a>,
277{
278 type Location = Tick<L>;
279
280 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
281 Stream::new(
282 location.clone(),
283 HydroNode::CycleSource {
284 cycle_id,
285 metadata: location.new_node_metadata(Self::collection_kind()),
286 },
287 )
288 }
289}
290
291impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
292 for Stream<T, Tick<L>, Bounded, O, R>
293where
294 L: Location<'a>,
295{
296 type Location = Tick<L>;
297
298 fn location(&self) -> &Self::Location {
299 self.location()
300 }
301
302 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
303 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
304 location.clone(),
305 HydroNode::DeferTick {
306 input: Box::new(HydroNode::CycleSource {
307 cycle_id,
308 metadata: location.new_node_metadata(Self::collection_kind()),
309 }),
310 metadata: location.new_node_metadata(Self::collection_kind()),
311 },
312 );
313
314 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
315 }
316}
317
318impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
319 for Stream<T, Tick<L>, Bounded, O, R>
320where
321 L: Location<'a>,
322{
323 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
324 assert_eq!(
325 Location::id(&self.location),
326 expected_location,
327 "locations do not match"
328 );
329 self.location
330 .flow_state()
331 .borrow_mut()
332 .push_root(HydroRoot::CycleSink {
333 cycle_id,
334 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
335 op_metadata: HydroIrOpMetadata::new(),
336 });
337 }
338}
339
340impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
341 for Stream<T, L, B, O, R>
342where
343 L: Location<'a>,
344{
345 type Location = L;
346
347 fn create_source(cycle_id: CycleId, location: L) -> Self {
348 Stream::new(
349 location.clone(),
350 HydroNode::CycleSource {
351 cycle_id,
352 metadata: location.new_node_metadata(Self::collection_kind()),
353 },
354 )
355 }
356}
357
358impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
359 for Stream<T, L, B, O, R>
360where
361 L: Location<'a>,
362{
363 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
364 assert_eq!(
365 Location::id(&self.location),
366 expected_location,
367 "locations do not match"
368 );
369 self.location
370 .flow_state()
371 .borrow_mut()
372 .push_root(HydroRoot::CycleSink {
373 cycle_id,
374 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
375 op_metadata: HydroIrOpMetadata::new(),
376 });
377 }
378}
379
380impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
381where
382 T: Clone,
383 L: Location<'a>,
384{
385 fn clone(&self) -> Self {
386 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
387 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
388 *self.ir_node.borrow_mut() = HydroNode::Tee {
389 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
390 metadata: self.location.new_node_metadata(Self::collection_kind()),
391 };
392 }
393
394 let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
395 unreachable!()
396 };
397 Stream {
398 location: self.location.clone(),
399 flow_state: self.flow_state.clone(),
400 ir_node: HydroNode::Tee {
401 inner: SharedNode(inner.0.clone()),
402 metadata: metadata.clone(),
403 }
404 .into(),
405 _phantom: PhantomData,
406 }
407 }
408}
409
410impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
411where
412 L: Location<'a>,
413{
414 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
415 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
416 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
417
418 let flow_state = location.flow_state().clone();
419 Stream {
420 location,
421 flow_state,
422 ir_node: RefCell::new(ir_node),
423 _phantom: PhantomData,
424 }
425 }
426
427 /// Returns the [`Location`] where this stream is being materialized.
428 pub fn location(&self) -> &L {
429 &self.location
430 }
431
432 /// Weakens the consistency of this live collection to not guarantee any consistency across
433 /// cluster members (if this collection is on a cluster).
434 pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
435 where
436 L: Location<'a>,
437 {
438 if L::consistency()
439 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
440 {
441 // already no consistency
442 Stream::new(
443 self.location.drop_consistency(),
444 self.ir_node.replace(HydroNode::Placeholder),
445 )
446 } else {
447 Stream::new(
448 self.location.drop_consistency(),
449 HydroNode::Cast {
450 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
451 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
452 T,
453 L::DropConsistency,
454 B,
455 O,
456 R,
457 >::collection_kind(
458 )),
459 },
460 )
461 }
462 }
463
464 /// Casts this live collection to have the consistency guarantees specified in the given
465 /// location type parameter. The developer must ensure that the strengthened consistency
466 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
467 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
468 self,
469 _proof: impl crate::properties::ConsistencyProof,
470 ) -> Stream<T, L2, B, O, R>
471 where
472 L: Location<'a>,
473 {
474 if L::consistency() == L2::consistency() {
475 Stream::new(
476 self.location.with_consistency_of(),
477 self.ir_node.replace(HydroNode::Placeholder),
478 )
479 } else {
480 Stream::new(
481 self.location.with_consistency_of(),
482 HydroNode::AssertIsConsistent {
483 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
484 trusted: false,
485 metadata: self
486 .location
487 .clone()
488 .with_consistency_of::<L2>()
489 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
490 },
491 )
492 }
493 }
494
495 pub(crate) fn assert_has_consistency_of_trusted<
496 L2: Location<'a, DropConsistency = L::DropConsistency>,
497 >(
498 self,
499 _proof: impl crate::properties::ConsistencyProof,
500 ) -> Stream<T, L2, B, O, R>
501 where
502 L: Location<'a>,
503 {
504 if L::consistency() == L2::consistency() {
505 Stream::new(
506 self.location.with_consistency_of(),
507 self.ir_node.replace(HydroNode::Placeholder),
508 )
509 } else {
510 Stream::new(
511 self.location.with_consistency_of(),
512 HydroNode::AssertIsConsistent {
513 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
514 trusted: true,
515 metadata: self
516 .location
517 .clone()
518 .with_consistency_of::<L2>()
519 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
520 },
521 )
522 }
523 }
524
525 pub(crate) fn collection_kind() -> CollectionKind {
526 CollectionKind::Stream {
527 bound: B::BOUND_KIND,
528 order: O::ORDERING_KIND,
529 retry: R::RETRIES_KIND,
530 element_type: quote_type::<T>().into(),
531 }
532 }
533
534 /// Produces a stream based on invoking `f` on each element.
535 /// If you do not want to modify the stream and instead only want to view
536 /// each item use [`Stream::inspect`] instead.
537 ///
538 /// # Example
539 /// ```rust
540 /// # #[cfg(feature = "deploy")] {
541 /// # use hydro_lang::prelude::*;
542 /// # use futures::StreamExt;
543 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
544 /// let words = process.source_iter(q!(vec!["hello", "world"]));
545 /// words.map(q!(|x| x.to_uppercase()))
546 /// # }, |mut stream| async move {
547 /// # for w in vec!["HELLO", "WORLD"] {
548 /// # assert_eq!(stream.next().await.unwrap(), w);
549 /// # }
550 /// # }));
551 /// # }
552 /// ```
553 pub fn map<U, F, C, I, const WAS_MUT: bool>(
554 self,
555 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
556 ) -> Stream<U, L, B, O, R>
557 where
558 F: FnMut(T) -> U + 'a,
559 C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
560 I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
561 {
562 let f = crate::singleton_ref::with_singleton_capture(|| {
563 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
564 proof.register_proof(&expr);
565 expr.into()
566 });
567 Stream::new(
568 self.location.clone(),
569 HydroNode::Map {
570 f,
571 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
572 metadata: self
573 .location
574 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
575 },
576 )
577 }
578
579 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
580 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
581 /// for the output type `U` must produce items in a **deterministic** order.
582 ///
583 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
584 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
585 ///
586 /// # Example
587 /// ```rust
588 /// # #[cfg(feature = "deploy")] {
589 /// # use hydro_lang::prelude::*;
590 /// # use futures::StreamExt;
591 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
592 /// process
593 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
594 /// .flat_map_ordered(q!(|x| x))
595 /// # }, |mut stream| async move {
596 /// // 1, 2, 3, 4
597 /// # for w in (1..5) {
598 /// # assert_eq!(stream.next().await.unwrap(), w);
599 /// # }
600 /// # }));
601 /// # }
602 /// ```
603 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
604 where
605 I: IntoIterator<Item = U>,
606 F: Fn(T) -> I + 'a,
607 {
608 let f = crate::singleton_ref::with_singleton_capture(|| {
609 f.splice_fn1_ctx(&self.location).into()
610 });
611 Stream::new(
612 self.location.clone(),
613 HydroNode::FlatMap {
614 f,
615 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
616 metadata: self
617 .location
618 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
619 },
620 )
621 }
622
623 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
624 /// for the output type `U` to produce items in any order.
625 ///
626 /// # Example
627 /// ```rust
628 /// # #[cfg(feature = "deploy")] {
629 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
630 /// # use futures::StreamExt;
631 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
632 /// process
633 /// .source_iter(q!(vec![
634 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
635 /// std::collections::HashSet::from_iter(vec![3, 4]),
636 /// ]))
637 /// .flat_map_unordered(q!(|x| x))
638 /// # }, |mut stream| async move {
639 /// // 1, 2, 3, 4, but in no particular order
640 /// # let mut results = Vec::new();
641 /// # for w in (1..5) {
642 /// # results.push(stream.next().await.unwrap());
643 /// # }
644 /// # results.sort();
645 /// # assert_eq!(results, vec![1, 2, 3, 4]);
646 /// # }));
647 /// # }
648 /// ```
649 pub fn flat_map_unordered<U, I, F>(
650 self,
651 f: impl IntoQuotedMut<'a, F, L>,
652 ) -> Stream<U, L, B, NoOrder, R>
653 where
654 I: IntoIterator<Item = U>,
655 F: Fn(T) -> I + 'a,
656 {
657 let f = crate::singleton_ref::with_singleton_capture(|| {
658 f.splice_fn1_ctx(&self.location).into()
659 });
660 Stream::new(
661 self.location.clone(),
662 HydroNode::FlatMap {
663 f,
664 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
665 metadata: self
666 .location
667 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
668 },
669 )
670 }
671
672 /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
673 /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
674 ///
675 /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
676 /// not deterministic, use [`Stream::flatten_unordered`] instead.
677 ///
678 /// ```rust
679 /// # #[cfg(feature = "deploy")] {
680 /// # use hydro_lang::prelude::*;
681 /// # use futures::StreamExt;
682 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
683 /// process
684 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
685 /// .flatten_ordered()
686 /// # }, |mut stream| async move {
687 /// // 1, 2, 3, 4
688 /// # for w in (1..5) {
689 /// # assert_eq!(stream.next().await.unwrap(), w);
690 /// # }
691 /// # }));
692 /// # }
693 /// ```
694 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
695 where
696 T: IntoIterator<Item = U>,
697 {
698 self.flat_map_ordered(q!(|d| d))
699 }
700
701 /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
702 /// for the element type `T` to produce items in any order.
703 ///
704 /// # Example
705 /// ```rust
706 /// # #[cfg(feature = "deploy")] {
707 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
708 /// # use futures::StreamExt;
709 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
710 /// process
711 /// .source_iter(q!(vec![
712 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
713 /// std::collections::HashSet::from_iter(vec![3, 4]),
714 /// ]))
715 /// .flatten_unordered()
716 /// # }, |mut stream| async move {
717 /// // 1, 2, 3, 4, but in no particular order
718 /// # let mut results = Vec::new();
719 /// # for w in (1..5) {
720 /// # results.push(stream.next().await.unwrap());
721 /// # }
722 /// # results.sort();
723 /// # assert_eq!(results, vec![1, 2, 3, 4]);
724 /// # }));
725 /// # }
726 /// ```
727 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
728 where
729 T: IntoIterator<Item = U>,
730 {
731 self.flat_map_unordered(q!(|d| d))
732 }
733
734 /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
735 /// then emit the elements of that stream one by one. When the inner stream yields
736 /// `Pending`, this operator yields as well.
737 pub fn flat_map_stream_blocking<U, S, F>(
738 self,
739 f: impl IntoQuotedMut<'a, F, L>,
740 ) -> Stream<U, L, B, O, R>
741 where
742 S: futures::Stream<Item = U>,
743 F: Fn(T) -> S + 'a,
744 {
745 let f = f.splice_fn1_ctx(&self.location).into();
746 Stream::new(
747 self.location.clone(),
748 HydroNode::FlatMapStreamBlocking {
749 f,
750 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
751 metadata: self
752 .location
753 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
754 },
755 )
756 }
757
758 /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
759 /// emit its elements one by one. When the inner stream yields `Pending`, this operator
760 /// yields as well.
761 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
762 where
763 T: futures::Stream<Item = U>,
764 {
765 self.flat_map_stream_blocking(q!(|d| d))
766 }
767
768 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
769 /// `f`, preserving the order of the elements.
770 ///
771 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
772 /// not modify or take ownership of the values. If you need to modify the values while filtering
773 /// use [`Stream::filter_map`] instead.
774 ///
775 /// # Example
776 /// ```rust
777 /// # #[cfg(feature = "deploy")] {
778 /// # use hydro_lang::prelude::*;
779 /// # use futures::StreamExt;
780 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
781 /// process
782 /// .source_iter(q!(vec![1, 2, 3, 4]))
783 /// .filter(q!(|&x| x > 2))
784 /// # }, |mut stream| async move {
785 /// // 3, 4
786 /// # for w in (3..5) {
787 /// # assert_eq!(stream.next().await.unwrap(), w);
788 /// # }
789 /// # }));
790 /// # }
791 /// ```
792 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
793 where
794 F: Fn(&T) -> bool + 'a,
795 {
796 let f = crate::singleton_ref::with_singleton_capture(|| {
797 f.splice_fn1_borrow_ctx(&self.location).into()
798 });
799 Stream::new(
800 self.location.clone(),
801 HydroNode::Filter {
802 f,
803 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
804 metadata: self.location.new_node_metadata(Self::collection_kind()),
805 },
806 )
807 }
808
809 /// Splits the stream into two streams based on a predicate, without cloning elements.
810 ///
811 /// Elements for which `f` returns `true` are sent to the first output stream,
812 /// and elements for which `f` returns `false` are sent to the second output stream.
813 ///
814 /// Unlike using `filter` twice, this only evaluates the predicate once per element
815 /// and does not require `T: Clone`.
816 ///
817 /// The closure `f` receives a reference `&T` rather than an owned value `T` because
818 /// the predicate is only used for routing; the element itself is moved to the
819 /// appropriate output stream.
820 ///
821 /// # Example
822 /// ```rust
823 /// # #[cfg(feature = "deploy")] {
824 /// # use hydro_lang::prelude::*;
825 /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
826 /// # use futures::StreamExt;
827 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
828 /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
829 /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
830 /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
831 /// evens.map(q!(|x| (x, true)))
832 /// .merge_unordered(odds.map(q!(|x| (x, false))))
833 /// # }, |mut stream| async move {
834 /// # let mut results = Vec::new();
835 /// # for _ in 0..6 {
836 /// # results.push(stream.next().await.unwrap());
837 /// # }
838 /// # results.sort();
839 /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
840 /// # }));
841 /// # }
842 /// ```
843 #[expect(
844 clippy::type_complexity,
845 reason = "return type mirrors the input stream type"
846 )]
847 pub fn partition<F>(
848 self,
849 f: impl IntoQuotedMut<'a, F, L>,
850 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
851 where
852 F: Fn(&T) -> bool + 'a,
853 {
854 let f = crate::singleton_ref::with_singleton_capture(|| {
855 f.splice_fn1_borrow_ctx(&self.location).into()
856 });
857 let shared = SharedNode(Rc::new(RefCell::new(
858 self.ir_node.replace(HydroNode::Placeholder),
859 )));
860
861 let true_stream = Stream::new(
862 self.location.clone(),
863 HydroNode::Partition {
864 inner: SharedNode(shared.0.clone()),
865 f: f.clone(),
866 is_true: true,
867 metadata: self.location.new_node_metadata(Self::collection_kind()),
868 },
869 );
870
871 let false_stream = Stream::new(
872 self.location.clone(),
873 HydroNode::Partition {
874 inner: SharedNode(shared.0),
875 f,
876 is_true: false,
877 metadata: self.location.new_node_metadata(Self::collection_kind()),
878 },
879 );
880
881 (true_stream, false_stream)
882 }
883
884 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
885 ///
886 /// # Example
887 /// ```rust
888 /// # #[cfg(feature = "deploy")] {
889 /// # use hydro_lang::prelude::*;
890 /// # use futures::StreamExt;
891 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
892 /// process
893 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
894 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
895 /// # }, |mut stream| async move {
896 /// // 1, 2
897 /// # for w in (1..3) {
898 /// # assert_eq!(stream.next().await.unwrap(), w);
899 /// # }
900 /// # }));
901 /// # }
902 /// ```
903 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
904 where
905 F: Fn(T) -> Option<U> + 'a,
906 {
907 let f = crate::singleton_ref::with_singleton_capture(|| {
908 f.splice_fn1_ctx(&self.location).into()
909 });
910 Stream::new(
911 self.location.clone(),
912 HydroNode::FilterMap {
913 f,
914 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
915 metadata: self
916 .location
917 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
918 },
919 )
920 }
921
922 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
923 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
924 /// If `other` is an empty [`Optional`], no values will be produced.
925 ///
926 /// # Example
927 /// ```rust
928 /// # #[cfg(feature = "deploy")] {
929 /// # use hydro_lang::prelude::*;
930 /// # use futures::StreamExt;
931 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
932 /// let tick = process.tick();
933 /// let batch = process
934 /// .source_iter(q!(vec![1, 2, 3, 4]))
935 /// .batch(&tick, nondet!(/** test */));
936 /// let count = batch.clone().count(); // `count()` returns a singleton
937 /// batch.cross_singleton(count).all_ticks()
938 /// # }, |mut stream| async move {
939 /// // (1, 4), (2, 4), (3, 4), (4, 4)
940 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
941 /// # assert_eq!(stream.next().await.unwrap(), w);
942 /// # }
943 /// # }));
944 /// # }
945 /// ```
946 pub fn cross_singleton<O2>(
947 self,
948 other: impl Into<Optional<O2, L, Bounded>>,
949 ) -> Stream<(T, O2), L, B, O, R>
950 where
951 O2: Clone,
952 {
953 let other: Optional<O2, L, Bounded> = other.into();
954 check_matching_location(&self.location, &other.location);
955
956 Stream::new(
957 self.location.clone(),
958 HydroNode::CrossSingleton {
959 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
960 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
961 metadata: self
962 .location
963 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
964 },
965 )
966 }
967
968 /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
969 ///
970 /// # Example
971 /// ```rust
972 /// # #[cfg(feature = "deploy")] {
973 /// # use hydro_lang::prelude::*;
974 /// # use futures::StreamExt;
975 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
976 /// let tick = process.tick();
977 /// // ticks are lazy by default, forces the second tick to run
978 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
979 ///
980 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
981 /// let batch_first_tick = process
982 /// .source_iter(q!(vec![1, 2, 3, 4]))
983 /// .batch(&tick, nondet!(/** test */));
984 /// let batch_second_tick = process
985 /// .source_iter(q!(vec![5, 6, 7, 8]))
986 /// .batch(&tick, nondet!(/** test */))
987 /// .defer_tick();
988 /// batch_first_tick.chain(batch_second_tick)
989 /// .filter_if(signal)
990 /// .all_ticks()
991 /// # }, |mut stream| async move {
992 /// // [1, 2, 3, 4]
993 /// # for w in vec![1, 2, 3, 4] {
994 /// # assert_eq!(stream.next().await.unwrap(), w);
995 /// # }
996 /// # }));
997 /// # }
998 /// ```
999 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1000 self.cross_singleton(signal.filter(q!(|b| *b)))
1001 .map(q!(|(d, _)| d))
1002 }
1003
1004 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
1005 ///
1006 /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
1007 /// leader of a cluster.
1008 ///
1009 /// # Example
1010 /// ```rust
1011 /// # #[cfg(feature = "deploy")] {
1012 /// # use hydro_lang::prelude::*;
1013 /// # use futures::StreamExt;
1014 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1015 /// let tick = process.tick();
1016 /// // ticks are lazy by default, forces the second tick to run
1017 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1018 ///
1019 /// let batch_first_tick = process
1020 /// .source_iter(q!(vec![1, 2, 3, 4]))
1021 /// .batch(&tick, nondet!(/** test */));
1022 /// let batch_second_tick = process
1023 /// .source_iter(q!(vec![5, 6, 7, 8]))
1024 /// .batch(&tick, nondet!(/** test */))
1025 /// .defer_tick(); // appears on the second tick
1026 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1027 /// batch_first_tick.chain(batch_second_tick)
1028 /// .filter_if_some(some_on_first_tick)
1029 /// .all_ticks()
1030 /// # }, |mut stream| async move {
1031 /// // [1, 2, 3, 4]
1032 /// # for w in vec![1, 2, 3, 4] {
1033 /// # assert_eq!(stream.next().await.unwrap(), w);
1034 /// # }
1035 /// # }));
1036 /// # }
1037 /// ```
1038 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1039 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1040 self.filter_if(signal.is_some())
1041 }
1042
1043 /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1044 ///
1045 /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1046 /// some local state.
1047 ///
1048 /// # Example
1049 /// ```rust
1050 /// # #[cfg(feature = "deploy")] {
1051 /// # use hydro_lang::prelude::*;
1052 /// # use futures::StreamExt;
1053 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1054 /// let tick = process.tick();
1055 /// // ticks are lazy by default, forces the second tick to run
1056 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1057 ///
1058 /// let batch_first_tick = process
1059 /// .source_iter(q!(vec![1, 2, 3, 4]))
1060 /// .batch(&tick, nondet!(/** test */));
1061 /// let batch_second_tick = process
1062 /// .source_iter(q!(vec![5, 6, 7, 8]))
1063 /// .batch(&tick, nondet!(/** test */))
1064 /// .defer_tick(); // appears on the second tick
1065 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1066 /// batch_first_tick.chain(batch_second_tick)
1067 /// .filter_if_none(some_on_first_tick)
1068 /// .all_ticks()
1069 /// # }, |mut stream| async move {
1070 /// // [5, 6, 7, 8]
1071 /// # for w in vec![5, 6, 7, 8] {
1072 /// # assert_eq!(stream.next().await.unwrap(), w);
1073 /// # }
1074 /// # }));
1075 /// # }
1076 /// ```
1077 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1078 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1079 self.filter_if(other.is_none())
1080 }
1081
1082 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1083 /// returning all tupled pairs.
1084 ///
1085 /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1086 /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1087 /// symmetric hash join is used and ordering is [`NoOrder`].
1088 ///
1089 /// # Example
1090 /// ```rust
1091 /// # #[cfg(feature = "deploy")] {
1092 /// # use hydro_lang::prelude::*;
1093 /// # use std::collections::HashSet;
1094 /// # use futures::StreamExt;
1095 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1096 /// let tick = process.tick();
1097 /// let stream1 = process.source_iter(q!(vec![1, 2]));
1098 /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1099 /// stream1.cross_product(stream2)
1100 /// # }, |mut stream| async move {
1101 /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1102 /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1103 /// # stream.map(|i| assert!(expected.contains(&i)));
1104 /// # }));
1105 /// # }
1106 #[expect(
1107 clippy::type_complexity,
1108 reason = "MinRetries projection in return type"
1109 )]
1110 pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1111 self,
1112 other: Stream<T2, L, B2, O2, R2>,
1113 ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1114 where
1115 T: Clone,
1116 T2: Clone,
1117 R: MinRetries<R2>,
1118 {
1119 self.map(q!(|v| ((), v)))
1120 .join(other.map(q!(|v| ((), v))))
1121 .map(q!(|((), (v1, v2))| (v1, v2)))
1122 }
1123
1124 /// Takes one stream as input and filters out any duplicate occurrences. The output
1125 /// contains all unique values from the input.
1126 ///
1127 /// # Example
1128 /// ```rust
1129 /// # #[cfg(feature = "deploy")] {
1130 /// # use hydro_lang::prelude::*;
1131 /// # use futures::StreamExt;
1132 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1133 /// let tick = process.tick();
1134 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1135 /// # }, |mut stream| async move {
1136 /// # for w in vec![1, 2, 3, 4] {
1137 /// # assert_eq!(stream.next().await.unwrap(), w);
1138 /// # }
1139 /// # }));
1140 /// # }
1141 /// ```
1142 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1143 where
1144 T: Eq + Hash,
1145 {
1146 Stream::new(
1147 self.location.clone(),
1148 HydroNode::Unique {
1149 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1150 metadata: self
1151 .location
1152 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1153 },
1154 )
1155 }
1156
1157 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1158 ///
1159 /// The `other` stream must be [`Bounded`], since this function will wait until
1160 /// all its elements are available before producing any output.
1161 /// # Example
1162 /// ```rust
1163 /// # #[cfg(feature = "deploy")] {
1164 /// # use hydro_lang::prelude::*;
1165 /// # use futures::StreamExt;
1166 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1167 /// let tick = process.tick();
1168 /// let stream = process
1169 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1170 /// .batch(&tick, nondet!(/** test */));
1171 /// let batch = process
1172 /// .source_iter(q!(vec![1, 2]))
1173 /// .batch(&tick, nondet!(/** test */));
1174 /// stream.filter_not_in(batch).all_ticks()
1175 /// # }, |mut stream| async move {
1176 /// # for w in vec![3, 4] {
1177 /// # assert_eq!(stream.next().await.unwrap(), w);
1178 /// # }
1179 /// # }));
1180 /// # }
1181 /// ```
1182 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1183 where
1184 T: Eq + Hash,
1185 B2: IsBounded,
1186 {
1187 check_matching_location(&self.location, &other.location);
1188
1189 Stream::new(
1190 self.location.clone(),
1191 HydroNode::Difference {
1192 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1193 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1194 metadata: self
1195 .location
1196 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1197 },
1198 )
1199 }
1200
1201 /// An operator which allows you to "inspect" each element of a stream without
1202 /// modifying it. The closure `f` is called on a reference to each item. This is
1203 /// mainly useful for debugging, and should not be used to generate side-effects.
1204 ///
1205 /// # Example
1206 /// ```rust
1207 /// # #[cfg(feature = "deploy")] {
1208 /// # use hydro_lang::prelude::*;
1209 /// # use futures::StreamExt;
1210 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1211 /// let nums = process.source_iter(q!(vec![1, 2]));
1212 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1213 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1214 /// # }, |mut stream| async move {
1215 /// # for w in vec![1, 2] {
1216 /// # assert_eq!(stream.next().await.unwrap(), w);
1217 /// # }
1218 /// # }));
1219 /// # }
1220 /// ```
1221 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L::DropConsistency>) -> Self
1222 where
1223 F: Fn(&T) + 'a,
1224 {
1225 let f = crate::singleton_ref::with_singleton_capture(|| {
1226 f.splice_fn1_borrow_ctx(&self.location.drop_consistency())
1227 .into()
1228 });
1229
1230 Stream::new(
1231 self.location.clone(),
1232 HydroNode::Inspect {
1233 f,
1234 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1235 metadata: self.location.new_node_metadata(Self::collection_kind()),
1236 },
1237 )
1238 }
1239
1240 /// Executes the provided closure for every element in this stream.
1241 ///
1242 /// Because the closure may have side effects, the stream must have deterministic order
1243 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1244 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1245 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1246 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1247 where
1248 O: IsOrdered,
1249 R: IsExactlyOnce,
1250 {
1251 let f = f.splice_fn1_ctx(&self.location).into();
1252 self.location
1253 .flow_state()
1254 .borrow_mut()
1255 .push_root(HydroRoot::ForEach {
1256 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1257 f,
1258 op_metadata: HydroIrOpMetadata::new(),
1259 });
1260 }
1261
1262 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1263 /// TCP socket to some other server. You should _not_ use this API for interacting with
1264 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1265 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1266 /// interaction with asynchronous sinks.
1267 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1268 where
1269 O: IsOrdered,
1270 R: IsExactlyOnce,
1271 S: 'a + futures::Sink<T> + Unpin,
1272 {
1273 self.location
1274 .flow_state()
1275 .borrow_mut()
1276 .push_root(HydroRoot::DestSink {
1277 sink: sink.splice_typed_ctx(&self.location).into(),
1278 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1279 op_metadata: HydroIrOpMetadata::new(),
1280 });
1281 }
1282
1283 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1284 ///
1285 /// # Example
1286 /// ```rust
1287 /// # #[cfg(feature = "deploy")] {
1288 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1289 /// # use futures::StreamExt;
1290 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1291 /// let tick = process.tick();
1292 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1293 /// numbers.enumerate()
1294 /// # }, |mut stream| async move {
1295 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1296 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1297 /// # assert_eq!(stream.next().await.unwrap(), w);
1298 /// # }
1299 /// # }));
1300 /// # }
1301 /// ```
1302 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1303 where
1304 O: IsOrdered,
1305 R: IsExactlyOnce,
1306 {
1307 Stream::new(
1308 self.location.clone(),
1309 HydroNode::Enumerate {
1310 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1311 metadata: self.location.new_node_metadata(Stream::<
1312 (usize, T),
1313 L,
1314 B,
1315 TotalOrder,
1316 ExactlyOnce,
1317 >::collection_kind()),
1318 },
1319 )
1320 }
1321
1322 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1323 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1324 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1325 ///
1326 /// Depending on the input stream guarantees, the closure may need to be commutative
1327 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1328 ///
1329 /// # Example
1330 /// ```rust
1331 /// # #[cfg(feature = "deploy")] {
1332 /// # use hydro_lang::prelude::*;
1333 /// # use futures::StreamExt;
1334 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1335 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1336 /// words
1337 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1338 /// .into_stream()
1339 /// # }, |mut stream| async move {
1340 /// // "HELLOWORLD"
1341 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1342 /// # }));
1343 /// # }
1344 /// ```
1345 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1346 self,
1347 init: impl IntoQuotedMut<'a, I, L>,
1348 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1349 ) -> Singleton<A, L, B2>
1350 where
1351 I: Fn() -> A + 'a,
1352 F: 'a + Fn(&mut A, T),
1353 C: ValidCommutativityFor<O>,
1354 Idemp: ValidIdempotenceFor<R>,
1355 B: ApplyMonotoneStream<M, B2>,
1356 {
1357 let init = init.splice_fn0_ctx(&self.location).into();
1358 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1359 proof.register_proof(&comb);
1360
1361 // Only assume_retries (for idempotence), not assume_ordering.
1362 // The fold hook in the simulator handles ordering non-determinism directly.
1363 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1364 let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1365
1366 let core = HydroNode::Fold {
1367 init,
1368 acc: comb.into(),
1369 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1370 metadata: retried
1371 .location
1372 .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1373 // we do not guarantee consistency at this point because if the algebraic properties
1374 // do not hold in practice, replica consistency may fail to be maintained, so we
1375 // would like the simulator to assert consistency; in the future, this will be dynamic
1376 // based on the proof mechanism
1377 };
1378
1379 Singleton::new(retried.location.clone(), core)
1380 .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1381 }
1382
1383 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1384 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1385 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1386 /// reference, so that it can be modified in place.
1387 ///
1388 /// Depending on the input stream guarantees, the closure may need to be commutative
1389 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1390 ///
1391 /// # Example
1392 /// ```rust
1393 /// # #[cfg(feature = "deploy")] {
1394 /// # use hydro_lang::prelude::*;
1395 /// # use futures::StreamExt;
1396 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1397 /// let bools = process.source_iter(q!(vec![false, true, false]));
1398 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1399 /// # }, |mut stream| async move {
1400 /// // true
1401 /// # assert_eq!(stream.next().await.unwrap(), true);
1402 /// # }));
1403 /// # }
1404 /// ```
1405 pub fn reduce<F, C, Idemp>(
1406 self,
1407 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1408 ) -> Optional<T, L, B>
1409 where
1410 F: Fn(&mut T, T) + 'a,
1411 C: ValidCommutativityFor<O>,
1412 Idemp: ValidIdempotenceFor<R>,
1413 {
1414 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1415 proof.register_proof(&f);
1416
1417 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1418 let ordered_etc: Stream<T, L::DropConsistency, B> =
1419 self.assume_retries(nondet).assume_ordering(nondet);
1420
1421 let core = HydroNode::Reduce {
1422 f: f.into(),
1423 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1424 metadata: ordered_etc
1425 .location
1426 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1427 };
1428
1429 Optional::new(ordered_etc.location.clone(), core)
1430 .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1431 }
1432
1433 /// Computes the maximum element in the stream as an [`Optional`], which
1434 /// will be empty until the first element in the input arrives.
1435 ///
1436 /// # Example
1437 /// ```rust
1438 /// # #[cfg(feature = "deploy")] {
1439 /// # use hydro_lang::prelude::*;
1440 /// # use futures::StreamExt;
1441 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1442 /// let tick = process.tick();
1443 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1444 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1445 /// batch.max().all_ticks()
1446 /// # }, |mut stream| async move {
1447 /// // 4
1448 /// # assert_eq!(stream.next().await.unwrap(), 4);
1449 /// # }));
1450 /// # }
1451 /// ```
1452 pub fn max(self) -> Optional<T, L, B>
1453 where
1454 T: Ord,
1455 {
1456 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1457 .assume_ordering_trusted_bounded::<TotalOrder>(
1458 nondet!(/** max is commutative, but order affects intermediates */),
1459 )
1460 .reduce(q!(|curr, new| {
1461 if new > *curr {
1462 *curr = new;
1463 }
1464 }))
1465 }
1466
1467 /// Computes the minimum element in the stream as an [`Optional`], which
1468 /// will be empty until the first element in the input arrives.
1469 ///
1470 /// # Example
1471 /// ```rust
1472 /// # #[cfg(feature = "deploy")] {
1473 /// # use hydro_lang::prelude::*;
1474 /// # use futures::StreamExt;
1475 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1476 /// let tick = process.tick();
1477 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1478 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1479 /// batch.min().all_ticks()
1480 /// # }, |mut stream| async move {
1481 /// // 1
1482 /// # assert_eq!(stream.next().await.unwrap(), 1);
1483 /// # }));
1484 /// # }
1485 /// ```
1486 pub fn min(self) -> Optional<T, L, B>
1487 where
1488 T: Ord,
1489 {
1490 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1491 .assume_ordering_trusted_bounded::<TotalOrder>(
1492 nondet!(/** max is commutative, but order affects intermediates */),
1493 )
1494 .reduce(q!(|curr, new| {
1495 if new < *curr {
1496 *curr = new;
1497 }
1498 }))
1499 }
1500
1501 /// Computes the first element in the stream as an [`Optional`], which
1502 /// will be empty until the first element in the input arrives.
1503 ///
1504 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1505 /// re-ordering of elements may cause the first element to change.
1506 ///
1507 /// # Example
1508 /// ```rust
1509 /// # #[cfg(feature = "deploy")] {
1510 /// # use hydro_lang::prelude::*;
1511 /// # use futures::StreamExt;
1512 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1513 /// let tick = process.tick();
1514 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1515 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1516 /// batch.first().all_ticks()
1517 /// # }, |mut stream| async move {
1518 /// // 1
1519 /// # assert_eq!(stream.next().await.unwrap(), 1);
1520 /// # }));
1521 /// # }
1522 /// ```
1523 pub fn first(self) -> Optional<T, L, B>
1524 where
1525 O: IsOrdered,
1526 {
1527 self.make_totally_ordered()
1528 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1529 .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1530 .reduce(q!(|_, _| {}))
1531 }
1532
1533 /// Computes the last element in the stream as an [`Optional`], which
1534 /// will be empty until an element in the input arrives.
1535 ///
1536 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1537 /// re-ordering of elements may cause the last element to change.
1538 ///
1539 /// # Example
1540 /// ```rust
1541 /// # #[cfg(feature = "deploy")] {
1542 /// # use hydro_lang::prelude::*;
1543 /// # use futures::StreamExt;
1544 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1545 /// let tick = process.tick();
1546 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1547 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1548 /// batch.last().all_ticks()
1549 /// # }, |mut stream| async move {
1550 /// // 4
1551 /// # assert_eq!(stream.next().await.unwrap(), 4);
1552 /// # }));
1553 /// # }
1554 /// ```
1555 pub fn last(self) -> Optional<T, L, B>
1556 where
1557 O: IsOrdered,
1558 {
1559 self.make_totally_ordered()
1560 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1561 .reduce(q!(|curr, new| *curr = new))
1562 }
1563
1564 /// Returns a stream containing at most the first `n` elements of the input stream,
1565 /// preserving the original order. Similar to `LIMIT` in SQL.
1566 ///
1567 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1568 /// retries, since the result depends on the order and cardinality of elements.
1569 ///
1570 /// # Example
1571 /// ```rust
1572 /// # #[cfg(feature = "deploy")] {
1573 /// # use hydro_lang::prelude::*;
1574 /// # use futures::StreamExt;
1575 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1576 /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1577 /// numbers.limit(q!(3))
1578 /// # }, |mut stream| async move {
1579 /// // 10, 20, 30
1580 /// # for w in vec![10, 20, 30] {
1581 /// # assert_eq!(stream.next().await.unwrap(), w);
1582 /// # }
1583 /// # }));
1584 /// # }
1585 /// ```
1586 pub fn limit(
1587 self,
1588 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1589 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1590 where
1591 O: IsOrdered,
1592 R: IsExactlyOnce,
1593 {
1594 self.generator(
1595 q!(|| 0usize),
1596 q!(move |count, item| {
1597 if *count == n {
1598 Generate::Break
1599 } else {
1600 *count += 1;
1601 if *count == n {
1602 Generate::Return(item)
1603 } else {
1604 Generate::Yield(item)
1605 }
1606 }
1607 }),
1608 )
1609 }
1610
1611 /// Collects all the elements of this stream into a single [`Vec`] element.
1612 ///
1613 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1614 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1615 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1616 /// the vector at an arbitrary point in time.
1617 ///
1618 /// # Example
1619 /// ```rust
1620 /// # #[cfg(feature = "deploy")] {
1621 /// # use hydro_lang::prelude::*;
1622 /// # use futures::StreamExt;
1623 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1624 /// let tick = process.tick();
1625 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1626 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1627 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1628 /// # }, |mut stream| async move {
1629 /// // [ vec![1, 2, 3, 4] ]
1630 /// # for w in vec![vec![1, 2, 3, 4]] {
1631 /// # assert_eq!(stream.next().await.unwrap(), w);
1632 /// # }
1633 /// # }));
1634 /// # }
1635 /// ```
1636 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1637 where
1638 O: IsOrdered,
1639 R: IsExactlyOnce,
1640 {
1641 self.make_totally_ordered().make_exactly_once().fold(
1642 q!(|| vec![]),
1643 q!(|acc, v| {
1644 acc.push(v);
1645 }),
1646 )
1647 }
1648
1649 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1650 /// and emitting each intermediate result.
1651 ///
1652 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1653 /// containing all intermediate accumulated values. The scan operation can also terminate early
1654 /// by returning `None`.
1655 ///
1656 /// The function takes a mutable reference to the accumulator and the current element, and returns
1657 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1658 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1659 ///
1660 /// # Examples
1661 ///
1662 /// Basic usage - running sum:
1663 /// ```rust
1664 /// # #[cfg(feature = "deploy")] {
1665 /// # use hydro_lang::prelude::*;
1666 /// # use futures::StreamExt;
1667 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1668 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1669 /// q!(|| 0),
1670 /// q!(|acc, x| {
1671 /// *acc += x;
1672 /// Some(*acc)
1673 /// }),
1674 /// )
1675 /// # }, |mut stream| async move {
1676 /// // Output: 1, 3, 6, 10
1677 /// # for w in vec![1, 3, 6, 10] {
1678 /// # assert_eq!(stream.next().await.unwrap(), w);
1679 /// # }
1680 /// # }));
1681 /// # }
1682 /// ```
1683 ///
1684 /// Early termination example:
1685 /// ```rust
1686 /// # #[cfg(feature = "deploy")] {
1687 /// # use hydro_lang::prelude::*;
1688 /// # use futures::StreamExt;
1689 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1690 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1691 /// q!(|| 1),
1692 /// q!(|state, x| {
1693 /// *state = *state * x;
1694 /// if *state > 6 {
1695 /// None // Terminate the stream
1696 /// } else {
1697 /// Some(-*state)
1698 /// }
1699 /// }),
1700 /// )
1701 /// # }, |mut stream| async move {
1702 /// // Output: -1, -2, -6
1703 /// # for w in vec![-1, -2, -6] {
1704 /// # assert_eq!(stream.next().await.unwrap(), w);
1705 /// # }
1706 /// # }));
1707 /// # }
1708 /// ```
1709 pub fn scan<A, U, I, F>(
1710 self,
1711 init: impl IntoQuotedMut<'a, I, L>,
1712 f: impl IntoQuotedMut<'a, F, L>,
1713 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1714 where
1715 O: IsOrdered,
1716 R: IsExactlyOnce,
1717 I: Fn() -> A + 'a,
1718 F: Fn(&mut A, T) -> Option<U> + 'a,
1719 {
1720 let init = init.splice_fn0_ctx(&self.location).into();
1721 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1722
1723 Stream::new(
1724 self.location.clone(),
1725 HydroNode::Scan {
1726 init,
1727 acc: f,
1728 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1729 metadata: self.location.new_node_metadata(
1730 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1731 ),
1732 },
1733 )
1734 }
1735
1736 /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1737 /// stream, maintaining an internal state (accumulator) and emitting the values returned
1738 /// by the function.
1739 ///
1740 /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1741 /// future. The future is polled to completion. If it resolves to `Some`, the value is
1742 /// emitted. If it resolves to `None`, the item is filtered out.
1743 ///
1744 /// # Examples
1745 ///
1746 /// ```rust
1747 /// # #[cfg(feature = "deploy")] {
1748 /// # use hydro_lang::prelude::*;
1749 /// # use futures::StreamExt;
1750 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1751 /// process
1752 /// .source_iter(q!(vec![1, 2, 3, 4]))
1753 /// .scan_async_blocking(
1754 /// q!(|| 0),
1755 /// q!(|acc, x| {
1756 /// *acc += x;
1757 /// let val = *acc;
1758 /// async move { Some(val) }
1759 /// }),
1760 /// )
1761 /// # }, |mut stream| async move {
1762 /// // Output: 1, 3, 6, 10
1763 /// # for w in vec![1, 3, 6, 10] {
1764 /// # assert_eq!(stream.next().await.unwrap(), w);
1765 /// # }
1766 /// # }));
1767 /// # }
1768 /// ```
1769 pub fn scan_async_blocking<A, U, I, F, Fut>(
1770 self,
1771 init: impl IntoQuotedMut<'a, I, L>,
1772 f: impl IntoQuotedMut<'a, F, L>,
1773 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1774 where
1775 O: IsOrdered,
1776 R: IsExactlyOnce,
1777 I: Fn() -> A + 'a,
1778 F: Fn(&mut A, T) -> Fut + 'a,
1779 Fut: Future<Output = Option<U>> + 'a,
1780 {
1781 let init = init.splice_fn0_ctx(&self.location).into();
1782 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1783
1784 Stream::new(
1785 self.location.clone(),
1786 HydroNode::ScanAsyncBlocking {
1787 init,
1788 acc: f,
1789 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1790 metadata: self.location.new_node_metadata(
1791 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1792 ),
1793 },
1794 )
1795 }
1796
1797 /// Iteratively processes the elements of the stream using a state machine that can yield
1798 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1799 /// syntax in Rust, without requiring special syntax.
1800 ///
1801 /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1802 /// state. The second argument defines the processing logic, taking in a mutable reference
1803 /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1804 /// variants define what is emitted and whether further inputs should be processed.
1805 ///
1806 /// # Example
1807 /// ```rust
1808 /// # #[cfg(feature = "deploy")] {
1809 /// # use hydro_lang::prelude::*;
1810 /// # use futures::StreamExt;
1811 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1812 /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1813 /// q!(|| 0),
1814 /// q!(|acc, x| {
1815 /// *acc += x;
1816 /// if *acc > 100 {
1817 /// hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1818 /// } else if *acc % 2 == 0 {
1819 /// hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1820 /// } else {
1821 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1822 /// }
1823 /// }),
1824 /// )
1825 /// # }, |mut stream| async move {
1826 /// // Output: "even", "done!"
1827 /// # let mut results = Vec::new();
1828 /// # for _ in 0..2 {
1829 /// # results.push(stream.next().await.unwrap());
1830 /// # }
1831 /// # results.sort();
1832 /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1833 /// # }));
1834 /// # }
1835 /// ```
1836 pub fn generator<A, U, I, F>(
1837 self,
1838 init: impl IntoQuotedMut<'a, I, L> + Copy,
1839 f: impl IntoQuotedMut<'a, F, L> + Copy,
1840 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1841 where
1842 O: IsOrdered,
1843 R: IsExactlyOnce,
1844 I: Fn() -> A + 'a,
1845 F: Fn(&mut A, T) -> Generate<U> + 'a,
1846 {
1847 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1848 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1849
1850 let this = self.make_totally_ordered().make_exactly_once();
1851
1852 // State is Option<Option<A>>:
1853 // None = not yet initialized
1854 // Some(Some(a)) = active with state a
1855 // Some(None) = terminated
1856 let scan_init = q!(|| None)
1857 .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1858 .into();
1859 let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1860 if state.is_none() {
1861 *state = Some(Some(init()));
1862 }
1863 match state {
1864 Some(Some(state_value)) => match f(state_value, v) {
1865 Generate::Yield(out) => Some(Some(out)),
1866 Generate::Return(out) => {
1867 *state = Some(None);
1868 Some(Some(out))
1869 }
1870 // Unlike KeyedStream, we can terminate the scan directly on
1871 // Break/Return because there is only one state (no other keys
1872 // that still need processing).
1873 Generate::Break => None,
1874 Generate::Continue => Some(None),
1875 },
1876 // State is Some(None) after Return; terminate the scan.
1877 _ => None,
1878 }
1879 })
1880 .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1881 .into();
1882
1883 let scan_node = HydroNode::Scan {
1884 init: scan_init,
1885 acc: scan_f,
1886 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1887 metadata: this.location.new_node_metadata(Stream::<
1888 Option<U>,
1889 L,
1890 B,
1891 TotalOrder,
1892 ExactlyOnce,
1893 >::collection_kind()),
1894 };
1895
1896 let flatten_f = q!(|d| d)
1897 .splice_fn1_ctx::<Option<U>, _>(&this.location)
1898 .into();
1899 let flatten_node = HydroNode::FlatMap {
1900 f: flatten_f,
1901 input: Box::new(scan_node),
1902 metadata: this
1903 .location
1904 .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1905 };
1906
1907 Stream::new(this.location.clone(), flatten_node)
1908 }
1909
1910 /// Given a time interval, returns a stream corresponding to samples taken from the
1911 /// stream roughly at that interval. The output will have elements in the same order
1912 /// as the input, but with arbitrary elements skipped between samples. There is also
1913 /// no guarantee on the exact timing of the samples.
1914 ///
1915 /// # Non-Determinism
1916 /// The output stream is non-deterministic in which elements are sampled, since this
1917 /// is controlled by a clock.
1918 pub fn sample_every(
1919 self,
1920 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1921 nondet: NonDet,
1922 ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1923 where
1924 L: TopLevel<'a>,
1925 {
1926 let samples = self.location.source_interval(interval);
1927
1928 let tick = self.location.tick();
1929 self.batch(&tick, nondet)
1930 .filter_if(samples.batch(&tick, nondet).first().is_some())
1931 .all_ticks()
1932 .weaken_retries()
1933 }
1934
1935 /// Given a timeout duration, returns an [`Optional`] which will have a value if the
1936 /// stream has not emitted a value since that duration.
1937 ///
1938 /// # Non-Determinism
1939 /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1940 /// samples take place, timeouts may be non-deterministically generated or missed,
1941 /// and the notification of the timeout may be delayed as well. There is also no
1942 /// guarantee on how long the [`Optional`] will have a value after the timeout is
1943 /// detected based on when the next sample is taken.
1944 pub fn timeout(
1945 self,
1946 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
1947 nondet: NonDet,
1948 ) -> Optional<(), L::DropConsistency, Unbounded>
1949 where
1950 L: TopLevel<'a>,
1951 {
1952 let tick = self.location.tick();
1953
1954 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1955 q!(|| None),
1956 q!(
1957 |latest, _| {
1958 *latest = Some(Instant::now());
1959 },
1960 commutative = manual_proof!(/** TODO */)
1961 ),
1962 );
1963
1964 latest_received
1965 .snapshot(&tick, nondet)
1966 .filter_map(q!(move |latest_received| {
1967 if let Some(latest_received) = latest_received {
1968 if Instant::now().duration_since(latest_received) > duration {
1969 Some(())
1970 } else {
1971 None
1972 }
1973 } else {
1974 Some(())
1975 }
1976 }))
1977 .latest()
1978 }
1979
1980 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1981 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1982 ///
1983 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1984 /// processed before an acknowledgement is emitted.
1985 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1986 let id = self.location.flow_state().borrow_mut().next_clock_id();
1987 let out_location = Atomic {
1988 tick: Tick {
1989 id,
1990 l: self.location.clone(),
1991 },
1992 };
1993 Stream::new(
1994 out_location.clone(),
1995 HydroNode::BeginAtomic {
1996 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1997 metadata: out_location
1998 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1999 },
2000 )
2001 }
2002
2003 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2004 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2005 /// the order of the input. The output stream will execute in the [`Tick`] that was
2006 /// used to create the atomic section.
2007 ///
2008 /// # Non-Determinism
2009 /// The batch boundaries are non-deterministic and may change across executions.
2010 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2011 self,
2012 tick: &Tick<L2>,
2013 _nondet: NonDet,
2014 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2015 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2016 Stream::new(
2017 tick.drop_consistency(),
2018 HydroNode::Batch {
2019 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2020 metadata: tick
2021 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2022 },
2023 )
2024 }
2025
2026 /// An operator which allows you to "name" a `HydroNode`.
2027 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2028 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2029 {
2030 let mut node = self.ir_node.borrow_mut();
2031 let metadata = node.metadata_mut();
2032 metadata.tag = Some(name.to_owned());
2033 }
2034 self
2035 }
2036
2037 /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2038 /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2039 /// so uses must be carefully vetted.
2040 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2041 where
2042 B: IsBounded,
2043 {
2044 Optional::new(
2045 self.location.clone(),
2046 HydroNode::Cast {
2047 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2048 metadata: self
2049 .location
2050 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2051 },
2052 )
2053 }
2054
2055 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2056 if O::ORDERING_KIND == O2::ORDERING_KIND {
2057 Stream::new(
2058 self.location.clone(),
2059 self.ir_node.replace(HydroNode::Placeholder),
2060 )
2061 } else {
2062 panic!(
2063 "Runtime ordering {:?} did not match requested cast {:?}.",
2064 O::ORDERING_KIND,
2065 O2::ORDERING_KIND
2066 )
2067 }
2068 }
2069
2070 /// Explicitly "casts" the stream to a type with a different ordering
2071 /// guarantee. Useful in unsafe code where the ordering cannot be proven
2072 /// by the type-system.
2073 ///
2074 /// # Non-Determinism
2075 /// This function is used as an escape hatch, and any mistakes in the
2076 /// provided ordering guarantee will propagate into the guarantees
2077 /// for the rest of the program.
2078 pub fn assume_ordering<O2: Ordering>(
2079 self,
2080 _nondet: NonDet,
2081 ) -> Stream<T, L::DropConsistency, B, O2, R> {
2082 if O::ORDERING_KIND == O2::ORDERING_KIND {
2083 self.use_ordering_type().weaken_consistency()
2084 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2085 // We can always weaken the ordering guarantee
2086 let target_location = self.location().drop_consistency();
2087 Stream::new(
2088 target_location.clone(),
2089 HydroNode::Cast {
2090 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2091 metadata: target_location
2092 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2093 },
2094 )
2095 } else {
2096 let target_location = self.location().drop_consistency();
2097 Stream::new(
2098 target_location.clone(),
2099 HydroNode::ObserveNonDet {
2100 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2101 trusted: false,
2102 metadata: target_location
2103 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2104 },
2105 )
2106 }
2107 }
2108
2109 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2110 // intermediate states will not be revealed
2111 fn assume_ordering_trusted_bounded<O2: Ordering>(
2112 self,
2113 nondet: NonDet,
2114 ) -> Stream<T, L, B, O2, R> {
2115 if B::BOUNDED {
2116 self.assume_ordering_trusted(nondet)
2117 } else {
2118 let self_location = self.location.clone();
2119 let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2120 Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2121 }
2122 }
2123
2124 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2125 // is not observable
2126 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2127 self,
2128 _nondet: NonDet,
2129 ) -> Stream<T, L, B, O2, R> {
2130 if O::ORDERING_KIND == O2::ORDERING_KIND {
2131 self.use_ordering_type()
2132 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2133 // We can always weaken the ordering guarantee
2134 Stream::new(
2135 self.location.clone(),
2136 HydroNode::Cast {
2137 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2138 metadata: self
2139 .location
2140 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2141 },
2142 )
2143 } else {
2144 Stream::new(
2145 self.location.clone(),
2146 HydroNode::ObserveNonDet {
2147 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2148 trusted: true,
2149 metadata: self
2150 .location
2151 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2152 },
2153 )
2154 }
2155 }
2156
2157 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2158 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2159 /// which is always safe because that is the weakest possible guarantee.
2160 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2161 self.weaken_ordering::<NoOrder>()
2162 }
2163
2164 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2165 /// enforcing that `O2` is weaker than the input ordering guarantee.
2166 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2167 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2168 self.assume_ordering_trusted::<O2>(nondet)
2169 }
2170
2171 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2172 /// implies that `O == TotalOrder`.
2173 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2174 where
2175 O: IsOrdered,
2176 {
2177 self.assume_ordering_trusted(nondet!(/** no-op */))
2178 }
2179
2180 /// Explicitly "casts" the stream to a type with a different retries
2181 /// guarantee. Useful in unsafe code where the lack of retries cannot
2182 /// be proven by the type-system.
2183 ///
2184 /// # Non-Determinism
2185 /// This function is used as an escape hatch, and any mistakes in the
2186 /// provided retries guarantee will propagate into the guarantees
2187 /// for the rest of the program.
2188 pub fn assume_retries<R2: Retries>(
2189 self,
2190 _nondet: NonDet,
2191 ) -> Stream<T, L::DropConsistency, B, O, R2> {
2192 if R::RETRIES_KIND == R2::RETRIES_KIND {
2193 Stream::new(
2194 self.location.drop_consistency(),
2195 self.ir_node.replace(HydroNode::Placeholder),
2196 )
2197 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2198 // We can always weaken the retries guarantee
2199 let target_location = self.location.drop_consistency();
2200 Stream::new(
2201 target_location.clone(),
2202 HydroNode::Cast {
2203 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2204 metadata: target_location
2205 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2206 },
2207 )
2208 } else {
2209 let target_location = self.location.drop_consistency();
2210 Stream::new(
2211 target_location.clone(),
2212 HydroNode::ObserveNonDet {
2213 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2214 trusted: false,
2215 metadata: target_location
2216 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2217 },
2218 )
2219 }
2220 }
2221
2222 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2223 // is not observable
2224 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2225 if R::RETRIES_KIND == R2::RETRIES_KIND {
2226 Stream::new(
2227 self.location.clone(),
2228 self.ir_node.replace(HydroNode::Placeholder),
2229 )
2230 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2231 // We can always weaken the retries guarantee
2232 Stream::new(
2233 self.location.clone(),
2234 HydroNode::Cast {
2235 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2236 metadata: self
2237 .location
2238 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2239 },
2240 )
2241 } else {
2242 Stream::new(
2243 self.location.clone(),
2244 HydroNode::ObserveNonDet {
2245 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2246 trusted: true,
2247 metadata: self
2248 .location
2249 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2250 },
2251 )
2252 }
2253 }
2254
2255 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2256 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2257 /// which is always safe because that is the weakest possible guarantee.
2258 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2259 self.weaken_retries::<AtLeastOnce>()
2260 }
2261
2262 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2263 /// enforcing that `R2` is weaker than the input retries guarantee.
2264 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2265 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2266 self.assume_retries_trusted::<R2>(nondet)
2267 }
2268
2269 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2270 /// implies that `R == ExactlyOnce`.
2271 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2272 where
2273 R: IsExactlyOnce,
2274 {
2275 self.assume_retries_trusted(nondet!(/** no-op */))
2276 }
2277
2278 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2279 /// implies that `B == Bounded`.
2280 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2281 where
2282 B: IsBounded,
2283 {
2284 self.weaken_boundedness()
2285 }
2286
2287 /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2288 /// which implies that `B == Bounded`.
2289 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2290 if B::BOUNDED == B2::BOUNDED {
2291 Stream::new(
2292 self.location.clone(),
2293 self.ir_node.replace(HydroNode::Placeholder),
2294 )
2295 } else {
2296 // We can always weaken the boundedness
2297 Stream::new(
2298 self.location.clone(),
2299 HydroNode::Cast {
2300 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2301 metadata: self
2302 .location
2303 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2304 },
2305 )
2306 }
2307 }
2308}
2309
// Methods available only on streams whose elements are shared references (`&T`).
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clones each element of a stream of references, producing a stream of owned `T`
    /// values with the same boundedness, ordering, and retry guarantees. Equivalent to
    /// `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        self.map(q!(|d| d.clone()))
    }
}
2338
2339impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2340where
2341 L: Location<'a>,
2342{
2343 /// Computes the number of elements in the stream as a [`Singleton`].
2344 ///
2345 /// # Example
2346 /// ```rust
2347 /// # #[cfg(feature = "deploy")] {
2348 /// # use hydro_lang::prelude::*;
2349 /// # use futures::StreamExt;
2350 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2351 /// let tick = process.tick();
2352 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2353 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2354 /// batch.count().all_ticks()
2355 /// # }, |mut stream| async move {
2356 /// // 4
2357 /// # assert_eq!(stream.next().await.unwrap(), 4);
2358 /// # }));
2359 /// # }
2360 /// ```
2361 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2362 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2363 /// Order does not affect eventual count, and also does not affect intermediate states.
2364 ))
2365 .fold(
2366 q!(|| 0usize),
2367 q!(
2368 |count, _| *count += 1,
2369 monotone = manual_proof!(/** += 1 is monotone */)
2370 ),
2371 )
2372 }
2373}
2374
2375impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2376 /// Produces a new stream that merges the elements of the two input streams.
2377 /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2378 ///
2379 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2380 /// [`Bounded`], you can use [`Stream::chain`] instead.
2381 ///
2382 /// # Example
2383 /// ```rust
2384 /// # #[cfg(feature = "deploy")] {
2385 /// # use hydro_lang::prelude::*;
2386 /// # use futures::StreamExt;
2387 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2388 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2389 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2390 /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2391 /// # }, |mut stream| async move {
2392 /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2393 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2394 /// # assert_eq!(stream.next().await.unwrap(), w);
2395 /// # }
2396 /// # }));
2397 /// # }
2398 /// ```
2399 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2400 self,
2401 other: Stream<T, L, Unbounded, O2, R2>,
2402 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2403 where
2404 R: MinRetries<R2>,
2405 {
2406 Stream::new(
2407 self.location.clone(),
2408 HydroNode::Chain {
2409 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2410 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2411 metadata: self.location.new_node_metadata(Stream::<
2412 T,
2413 L,
2414 Unbounded,
2415 NoOrder,
2416 <R as MinRetries<R2>>::Min,
2417 >::collection_kind()),
2418 },
2419 )
2420 }
2421
2422 /// Deprecated: use [`Stream::merge_unordered`] instead.
2423 #[deprecated(note = "use `merge_unordered` instead")]
2424 pub fn interleave<O2: Ordering, R2: Retries>(
2425 self,
2426 other: Stream<T, L, Unbounded, O2, R2>,
2427 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2428 where
2429 R: MinRetries<R2>,
2430 {
2431 self.merge_unordered(other)
2432 }
2433}
2434
2435impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2436 /// Produces a new stream that combines the elements of the two input streams,
2437 /// preserving the relative order of elements within each input.
2438 ///
2439 /// # Non-Determinism
2440 /// The order in which elements *across* the two streams will be interleaved is
2441 /// non-deterministic, so the order of elements will vary across runs. If the output
2442 /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2443 /// but emits an unordered stream. For deterministic first-then-second ordering on
2444 /// bounded streams, use [`Stream::chain`].
2445 ///
2446 /// # Example
2447 /// ```rust
2448 /// # #[cfg(feature = "deploy")] {
2449 /// # use hydro_lang::prelude::*;
2450 /// # use futures::StreamExt;
2451 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2452 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2453 /// # process.source_iter(q!(vec![1, 3])).into();
2454 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2455 /// # }, |mut stream| async move {
2456 /// // 1, 3 and 2, 4 in some order, preserving the original local order
2457 /// # for w in vec![1, 3, 2, 4] {
2458 /// # assert_eq!(stream.next().await.unwrap(), w);
2459 /// # }
2460 /// # }));
2461 /// # }
2462 /// ```
2463 pub fn merge_ordered<R2: Retries>(
2464 self,
2465 other: Stream<T, L, B, TotalOrder, R2>,
2466 _nondet: NonDet,
2467 ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2468 where
2469 R: MinRetries<R2>,
2470 {
2471 let target_location = self.location().drop_consistency();
2472 Stream::new(
2473 target_location.clone(),
2474 HydroNode::MergeOrdered {
2475 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2476 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2477 metadata: target_location.new_node_metadata(Stream::<
2478 T,
2479 L::DropConsistency,
2480 B,
2481 TotalOrder,
2482 <R as MinRetries<R2>>::Min,
2483 >::collection_kind()),
2484 },
2485 )
2486 }
2487}
2488
2489impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2490where
2491 L: Location<'a>,
2492{
2493 /// Produces a new stream that emits the input elements in sorted order.
2494 ///
2495 /// The input stream can have any ordering guarantee, but the output stream
2496 /// will have a [`TotalOrder`] guarantee. This operator will block until all
2497 /// elements in the input stream are available, so it requires the input stream
2498 /// to be [`Bounded`].
2499 ///
2500 /// # Example
2501 /// ```rust
2502 /// # #[cfg(feature = "deploy")] {
2503 /// # use hydro_lang::prelude::*;
2504 /// # use futures::StreamExt;
2505 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2506 /// let tick = process.tick();
2507 /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2508 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2509 /// batch.sort().all_ticks()
2510 /// # }, |mut stream| async move {
2511 /// // 1, 2, 3, 4
2512 /// # for w in (1..5) {
2513 /// # assert_eq!(stream.next().await.unwrap(), w);
2514 /// # }
2515 /// # }));
2516 /// # }
2517 /// ```
2518 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2519 where
2520 B: IsBounded,
2521 T: Ord,
2522 {
2523 let this = self.make_bounded();
2524 Stream::new(
2525 this.location.clone(),
2526 HydroNode::Sort {
2527 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2528 metadata: this
2529 .location
2530 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2531 },
2532 )
2533 }
2534
2535 /// Produces a new stream that first emits the elements of the `self` stream,
2536 /// and then emits the elements of the `other` stream. The output stream has
2537 /// a [`TotalOrder`] guarantee if and only if both input streams have a
2538 /// [`TotalOrder`] guarantee.
2539 ///
2540 /// Currently, both input streams must be [`Bounded`]. This operator will block
2541 /// on the first stream until all its elements are available. In a future version,
2542 /// we will relax the requirement on the `other` stream.
2543 ///
2544 /// # Example
2545 /// ```rust
2546 /// # #[cfg(feature = "deploy")] {
2547 /// # use hydro_lang::prelude::*;
2548 /// # use futures::StreamExt;
2549 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2550 /// let tick = process.tick();
2551 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2552 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2553 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2554 /// # }, |mut stream| async move {
2555 /// // 2, 3, 4, 5, 1, 2, 3, 4
2556 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2557 /// # assert_eq!(stream.next().await.unwrap(), w);
2558 /// # }
2559 /// # }));
2560 /// # }
2561 /// ```
2562 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2563 self,
2564 other: Stream<T, L, B2, O2, R2>,
2565 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2566 where
2567 B: IsBounded,
2568 O: MinOrder<O2>,
2569 R: MinRetries<R2>,
2570 {
2571 check_matching_location(&self.location, &other.location);
2572
2573 Stream::new(
2574 self.location.clone(),
2575 HydroNode::Chain {
2576 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2577 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2578 metadata: self.location.new_node_metadata(Stream::<
2579 T,
2580 L,
2581 B2,
2582 <O as MinOrder<O2>>::Min,
2583 <R as MinRetries<R2>>::Min,
2584 >::collection_kind()),
2585 },
2586 )
2587 }
2588
2589 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2590 /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2591 /// because this is compiled into a nested loop.
2592 #[expect(
2593 clippy::type_complexity,
2594 reason = "MinRetries projection in return type"
2595 )]
2596 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2597 self,
2598 other: Stream<T2, L, Bounded, O2, R2>,
2599 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2600 where
2601 B: IsBounded,
2602 T: Clone,
2603 T2: Clone,
2604 R: MinRetries<R2>,
2605 {
2606 let this = self.make_bounded();
2607 check_matching_location(&this.location, &other.location);
2608
2609 Stream::new(
2610 this.location.clone(),
2611 HydroNode::CrossProduct {
2612 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2613 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2614 metadata: this.location.new_node_metadata(Stream::<
2615 (T, T2),
2616 L,
2617 Bounded,
2618 <O2 as MinOrder<O>>::Min,
2619 <R as MinRetries<R2>>::Min,
2620 >::collection_kind()),
2621 },
2622 )
2623 }
2624
2625 /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2626 /// `self` used as the values for *each* key.
2627 ///
2628 /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2629 /// values. For example, it can be used to send the same set of elements to several cluster
2630 /// members, if the membership information is available as a [`KeyedSingleton`].
2631 ///
2632 /// # Example
2633 /// ```rust
2634 /// # #[cfg(feature = "deploy")] {
2635 /// # use hydro_lang::prelude::*;
2636 /// # use futures::StreamExt;
2637 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2638 /// # let tick = process.tick();
2639 /// let keyed_singleton = // { 1: (), 2: () }
2640 /// # process
2641 /// # .source_iter(q!(vec![(1, ()), (2, ())]))
2642 /// # .into_keyed()
2643 /// # .batch(&tick, nondet!(/** test */))
2644 /// # .first();
2645 /// let stream = // [ "a", "b" ]
2646 /// # process
2647 /// # .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2648 /// # .batch(&tick, nondet!(/** test */));
2649 /// stream.repeat_with_keys(keyed_singleton)
2650 /// # .entries().all_ticks()
2651 /// # }, |mut stream| async move {
2652 /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2653 /// # let mut results = Vec::new();
2654 /// # for _ in 0..4 {
2655 /// # results.push(stream.next().await.unwrap());
2656 /// # }
2657 /// # results.sort();
2658 /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2659 /// # }));
2660 /// # }
2661 /// ```
2662 pub fn repeat_with_keys<K, V2>(
2663 self,
2664 keys: KeyedSingleton<K, V2, L, Bounded>,
2665 ) -> KeyedStream<K, T, L, Bounded, O, R>
2666 where
2667 B: IsBounded,
2668 K: Clone,
2669 T: Clone,
2670 {
2671 keys.keys()
2672 .assume_ordering_trusted::<TotalOrder>(
2673 nondet!(/** keyed stream does not depend on ordering of keys */),
2674 )
2675 .cross_product_nested_loop(self.make_bounded())
2676 .into_keyed()
2677 }
2678
2679 /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2680 /// execution until all results are available. The output order is based on when futures
2681 /// complete, and may be different than the input order.
2682 ///
2683 /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2684 /// while futures are pending, this variant blocks until the futures resolve.
2685 ///
2686 /// # Example
2687 /// ```rust
2688 /// # #[cfg(feature = "deploy")] {
2689 /// # use std::collections::HashSet;
2690 /// # use futures::StreamExt;
2691 /// # use hydro_lang::prelude::*;
2692 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2693 /// process
2694 /// .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2695 /// .map(q!(|x| async move {
2696 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2697 /// x
2698 /// }))
2699 /// .resolve_futures_blocking()
2700 /// # },
2701 /// # |mut stream| async move {
2702 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2703 /// # let mut output = HashSet::new();
2704 /// # for _ in 1..10 {
2705 /// # output.insert(stream.next().await.unwrap());
2706 /// # }
2707 /// # assert_eq!(
2708 /// # output,
2709 /// # HashSet::<i32>::from_iter(1..10)
2710 /// # );
2711 /// # },
2712 /// # ));
2713 /// # }
2714 /// ```
2715 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2716 where
2717 T: Future,
2718 {
2719 Stream::new(
2720 self.location.clone(),
2721 HydroNode::ResolveFuturesBlocking {
2722 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2723 metadata: self
2724 .location
2725 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2726 },
2727 )
2728 }
2729
2730 /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2731 ///
2732 /// # Example
2733 /// ```rust
2734 /// # #[cfg(feature = "deploy")] {
2735 /// # use hydro_lang::prelude::*;
2736 /// # use futures::StreamExt;
2737 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2738 /// let tick = process.tick();
2739 /// let empty: Stream<i32, _, Bounded> = process
2740 /// .source_iter(q!(Vec::<i32>::new()))
2741 /// .batch(&tick, nondet!(/** test */));
2742 /// empty.is_empty().all_ticks()
2743 /// # }, |mut stream| async move {
2744 /// // true
2745 /// # assert_eq!(stream.next().await.unwrap(), true);
2746 /// # }));
2747 /// # }
2748 /// ```
2749 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2750 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2751 where
2752 B: IsBounded,
2753 {
2754 self.make_bounded()
2755 .assume_ordering_trusted::<TotalOrder>(
2756 nondet!(/** is_empty intermediates unaffected by order */),
2757 )
2758 .first()
2759 .is_none()
2760 }
2761}
2762
2763impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2764where
2765 L: Location<'a>,
2766{
2767 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2768 /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2769 /// by equi-joining the two streams on the key attribute `K`.
2770 ///
2771 /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2772 /// and streams the left side through, preserving the left side's ordering. When both
2773 /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2774 ///
2775 /// # Example
2776 /// ```rust
2777 /// # #[cfg(feature = "deploy")] {
2778 /// # use hydro_lang::prelude::*;
2779 /// # use std::collections::HashSet;
2780 /// # use futures::StreamExt;
2781 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2782 /// let tick = process.tick();
2783 /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2784 /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2785 /// stream1.join(stream2)
2786 /// # }, |mut stream| async move {
2787 /// // (1, ('a', 'x')), (2, ('b', 'y'))
2788 /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2789 /// # stream.map(|i| assert!(expected.contains(&i)));
2790 /// # }));
2791 /// # }
2792 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2793 self,
2794 n: Stream<(K, V2), L, B2, O2, R2>,
2795 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2796 where
2797 K: Eq + Hash + Clone,
2798 R: MinRetries<R2>,
2799 V1: Clone,
2800 V2: Clone,
2801 {
2802 check_matching_location(&self.location, &n.location);
2803
2804 let ir_node = if B2::BOUNDED {
2805 HydroNode::JoinHalf {
2806 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2807 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2808 metadata: self.location.new_node_metadata(Stream::<
2809 (K, (V1, V2)),
2810 L,
2811 B,
2812 B2::PreserveOrderIfBounded<O>,
2813 <R as MinRetries<R2>>::Min,
2814 >::collection_kind()),
2815 }
2816 } else {
2817 HydroNode::Join {
2818 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2819 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2820 metadata: self.location.new_node_metadata(Stream::<
2821 (K, (V1, V2)),
2822 L,
2823 B,
2824 B2::PreserveOrderIfBounded<O>,
2825 <R as MinRetries<R2>>::Min,
2826 >::collection_kind()),
2827 }
2828 };
2829
2830 Stream::new(self.location.clone(), ir_node)
2831 }
2832
2833 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2834 /// computes the anti-join of the items in the input -- i.e. returns
2835 /// unique items in the first input that do not have a matching key
2836 /// in the second input.
2837 ///
2838 /// # Example
2839 /// ```rust
2840 /// # #[cfg(feature = "deploy")] {
2841 /// # use hydro_lang::prelude::*;
2842 /// # use futures::StreamExt;
2843 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2844 /// let tick = process.tick();
2845 /// let stream = process
2846 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2847 /// .batch(&tick, nondet!(/** test */));
2848 /// let batch = process
2849 /// .source_iter(q!(vec![1, 2]))
2850 /// .batch(&tick, nondet!(/** test */));
2851 /// stream.anti_join(batch).all_ticks()
2852 /// # }, |mut stream| async move {
2853 /// # for w in vec![(3, 'c'), (4, 'd')] {
2854 /// # assert_eq!(stream.next().await.unwrap(), w);
2855 /// # }
2856 /// # }));
2857 /// # }
2858 pub fn anti_join<O2: Ordering, R2: Retries>(
2859 self,
2860 n: Stream<K, L, Bounded, O2, R2>,
2861 ) -> Stream<(K, V1), L, B, O, R>
2862 where
2863 K: Eq + Hash,
2864 {
2865 check_matching_location(&self.location, &n.location);
2866
2867 Stream::new(
2868 self.location.clone(),
2869 HydroNode::AntiJoin {
2870 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2871 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2872 metadata: self
2873 .location
2874 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2875 },
2876 )
2877 }
2878}
2879
2880impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2881 Stream<(K, V), L, B, O, R>
2882{
2883 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2884 /// is used as the key and the second element is added to the entries associated with that key.
2885 ///
2886 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2887 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2888 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2889 /// total ordering _within_ each group but no ordering _across_ groups.
2890 ///
2891 /// # Example
2892 /// ```rust
2893 /// # #[cfg(feature = "deploy")] {
2894 /// # use hydro_lang::prelude::*;
2895 /// # use futures::StreamExt;
2896 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2897 /// process
2898 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2899 /// .into_keyed()
2900 /// # .entries()
2901 /// # }, |mut stream| async move {
2902 /// // { 1: [2, 3], 2: [4] }
2903 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2904 /// # assert_eq!(stream.next().await.unwrap(), w);
2905 /// # }
2906 /// # }));
2907 /// # }
2908 /// ```
2909 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2910 KeyedStream::new(
2911 self.location.clone(),
2912 HydroNode::Cast {
2913 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2914 metadata: self
2915 .location
2916 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2917 },
2918 )
2919 }
2920}
2921
2922impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2923where
2924 K: Eq + Hash,
2925 L: Location<'a>,
2926{
2927 /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2928 /// # Example
2929 /// ```rust
2930 /// # #[cfg(feature = "deploy")] {
2931 /// # use hydro_lang::prelude::*;
2932 /// # use futures::StreamExt;
2933 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2934 /// let tick = process.tick();
2935 /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2936 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2937 /// batch.keys().all_ticks()
2938 /// # }, |mut stream| async move {
2939 /// // 1, 2
2940 /// # assert_eq!(stream.next().await.unwrap(), 1);
2941 /// # assert_eq!(stream.next().await.unwrap(), 2);
2942 /// # }));
2943 /// # }
2944 /// ```
2945 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2946 self.into_keyed()
2947 .fold(
2948 q!(|| ()),
2949 q!(
2950 |_, _| {},
2951 commutative = manual_proof!(/** values are ignored */),
2952 idempotent = manual_proof!(/** values are ignored */)
2953 ),
2954 )
2955 .keys()
2956 }
2957}
2958
2959impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2960where
2961 L: Location<'a>,
2962{
2963 /// Returns a stream corresponding to the latest batch of elements being atomically
2964 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2965 /// the order of the input.
2966 ///
2967 /// # Non-Determinism
2968 /// The batch boundaries are non-deterministic and may change across executions.
2969 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2970 self,
2971 tick: &Tick<L2>,
2972 _nondet: NonDet,
2973 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2974 Stream::new(
2975 tick.drop_consistency(),
2976 HydroNode::Batch {
2977 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2978 metadata: tick
2979 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2980 },
2981 )
2982 }
2983
2984 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2985 /// See [`Stream::atomic`] for more details.
2986 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2987 Stream::new(
2988 self.location.tick.l.clone(),
2989 HydroNode::EndAtomic {
2990 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2991 metadata: self
2992 .location
2993 .tick
2994 .l
2995 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2996 },
2997 )
2998 }
2999}
3000
3001impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3002where
3003 L: TopLevel<'a>,
3004 F: Future<Output = T>,
3005{
3006 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3007 /// Future outputs are produced as available, regardless of input arrival order.
3008 ///
3009 /// # Example
3010 /// ```rust
3011 /// # #[cfg(feature = "deploy")] {
3012 /// # use std::collections::HashSet;
3013 /// # use futures::StreamExt;
3014 /// # use hydro_lang::prelude::*;
3015 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3016 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3017 /// .map(q!(|x| async move {
3018 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3019 /// x
3020 /// }))
3021 /// .resolve_futures()
3022 /// # },
3023 /// # |mut stream| async move {
3024 /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3025 /// # let mut output = HashSet::new();
3026 /// # for _ in 1..10 {
3027 /// # output.insert(stream.next().await.unwrap());
3028 /// # }
3029 /// # assert_eq!(
3030 /// # output,
3031 /// # HashSet::<i32>::from_iter(1..10)
3032 /// # );
3033 /// # },
3034 /// # ));
3035 /// # }
3036 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3037 Stream::new(
3038 self.location.clone(),
3039 HydroNode::ResolveFutures {
3040 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3041 metadata: self
3042 .location
3043 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3044 },
3045 )
3046 }
3047
3048 /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3049 /// Future outputs are produced in the same order as the input stream.
3050 ///
3051 /// # Example
3052 /// ```rust
3053 /// # #[cfg(feature = "deploy")] {
3054 /// # use std::collections::HashSet;
3055 /// # use futures::StreamExt;
3056 /// # use hydro_lang::prelude::*;
3057 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3058 /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3059 /// .map(q!(|x| async move {
3060 /// tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3061 /// x
3062 /// }))
3063 /// .resolve_futures_ordered()
3064 /// # },
3065 /// # |mut stream| async move {
3066 /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3067 /// # let mut output = Vec::new();
3068 /// # for _ in 1..10 {
3069 /// # output.push(stream.next().await.unwrap());
3070 /// # }
3071 /// # assert_eq!(
3072 /// # output,
3073 /// # vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3074 /// # );
3075 /// # },
3076 /// # ));
3077 /// # }
3078 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3079 Stream::new(
3080 self.location.clone(),
3081 HydroNode::ResolveFuturesOrdered {
3082 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3083 metadata: self
3084 .location
3085 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3086 },
3087 )
3088 }
3089}
3090
3091impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3092where
3093 L: Location<'a>,
3094{
3095 /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3096 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3097 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3098 Stream::new(
3099 self.location.outer().clone(),
3100 HydroNode::YieldConcat {
3101 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3102 metadata: self
3103 .location
3104 .outer()
3105 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3106 },
3107 )
3108 }
3109
3110 /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3111 /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3112 ///
3113 /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3114 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3115 /// stream's [`Tick`] context.
3116 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3117 let out_location = Atomic {
3118 tick: self.location.clone(),
3119 };
3120
3121 Stream::new(
3122 out_location.clone(),
3123 HydroNode::YieldConcat {
3124 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3125 metadata: out_location
3126 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3127 },
3128 )
3129 }
3130
3131 /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3132 /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3133 /// input.
3134 ///
3135 /// This API is particularly useful for stateful computation on batches of data, such as
3136 /// maintaining an accumulated state that is up to date with the current batch.
3137 ///
3138 /// # Example
3139 /// ```rust
3140 /// # #[cfg(feature = "deploy")] {
3141 /// # use hydro_lang::prelude::*;
3142 /// # use futures::StreamExt;
3143 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3144 /// let tick = process.tick();
3145 /// # // ticks are lazy by default, forces the second tick to run
3146 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3147 /// # let batch_first_tick = process
3148 /// # .source_iter(q!(vec![1, 2, 3, 4]))
3149 /// # .batch(&tick, nondet!(/** test */));
3150 /// # let batch_second_tick = process
3151 /// # .source_iter(q!(vec![5, 6, 7]))
3152 /// # .batch(&tick, nondet!(/** test */))
3153 /// # .defer_tick(); // appears on the second tick
3154 /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3155 /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3156 ///
3157 /// input.batch(&tick, nondet!(/** test */))
3158 /// .across_ticks(|s| s.count()).all_ticks()
3159 /// # }, |mut stream| async move {
3160 /// // [4, 7]
3161 /// assert_eq!(stream.next().await.unwrap(), 4);
3162 /// assert_eq!(stream.next().await.unwrap(), 7);
3163 /// # }));
3164 /// # }
3165 /// ```
3166 pub fn across_ticks<Out: BatchAtomic<'a>>(
3167 self,
3168 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3169 ) -> Out::Batched {
3170 thunk(self.all_ticks_atomic()).batched_atomic()
3171 }
3172
3173 /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3174 /// always has the elements of `self` at tick `T - 1`.
3175 ///
3176 /// At tick `0`, the output stream is empty, since there is no previous tick.
3177 ///
3178 /// This operator enables stateful iterative processing with ticks, by sending data from one
3179 /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3180 ///
3181 /// # Example
3182 /// ```rust
3183 /// # #[cfg(feature = "deploy")] {
3184 /// # use hydro_lang::prelude::*;
3185 /// # use futures::StreamExt;
3186 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3187 /// let tick = process.tick();
3188 /// // ticks are lazy by default, forces the second tick to run
3189 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3190 ///
3191 /// let batch_first_tick = process
3192 /// .source_iter(q!(vec![1, 2, 3, 4]))
3193 /// .batch(&tick, nondet!(/** test */));
3194 /// let batch_second_tick = process
3195 /// .source_iter(q!(vec![0, 3, 4, 5, 6]))
3196 /// .batch(&tick, nondet!(/** test */))
3197 /// .defer_tick(); // appears on the second tick
3198 /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3199 ///
3200 /// changes_across_ticks.clone().filter_not_in(
3201 /// changes_across_ticks.defer_tick() // the elements from the previous tick
3202 /// ).all_ticks()
3203 /// # }, |mut stream| async move {
3204 /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3205 /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3206 /// # assert_eq!(stream.next().await.unwrap(), w);
3207 /// # }
3208 /// # }));
3209 /// # }
3210 /// ```
3211 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3212 Stream::new(
3213 self.location.clone(),
3214 HydroNode::DeferTick {
3215 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3216 metadata: self
3217 .location
3218 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3219 },
3220 )
3221 }
3222}
3223
3224#[cfg(test)]
3225mod tests {
3226 #[cfg(feature = "deploy")]
3227 use futures::{SinkExt, StreamExt};
3228 #[cfg(feature = "deploy")]
3229 use hydro_deploy::Deployment;
3230 #[cfg(feature = "deploy")]
3231 use serde::{Deserialize, Serialize};
3232 #[cfg(any(feature = "deploy", feature = "sim"))]
3233 use stageleft::q;
3234
3235 #[cfg(any(feature = "deploy", feature = "sim"))]
3236 use crate::compile::builder::FlowBuilder;
3237 #[cfg(feature = "deploy")]
3238 use crate::live_collections::sliced::sliced;
3239 #[cfg(feature = "deploy")]
3240 use crate::live_collections::stream::ExactlyOnce;
3241 #[cfg(feature = "sim")]
3242 use crate::live_collections::stream::NoOrder;
3243 #[cfg(any(feature = "deploy", feature = "sim"))]
3244 use crate::live_collections::stream::TotalOrder;
3245 #[cfg(any(feature = "deploy", feature = "sim"))]
3246 use crate::location::Location;
3247 #[cfg(feature = "sim")]
3248 use crate::networking::TCP;
3249 #[cfg(any(feature = "deploy", feature = "sim"))]
3250 use crate::nondet::nondet;
3251
3252 mod backtrace_chained_ops;
3253
3254 #[cfg(feature = "deploy")]
3255 struct P1 {}
3256 #[cfg(feature = "deploy")]
3257 struct P2 {}
3258
3259 #[cfg(feature = "deploy")]
3260 #[derive(Serialize, Deserialize, Debug)]
3261 struct SendOverNetwork {
3262 n: u32,
3263 }
3264
3265 #[cfg(feature = "deploy")]
3266 #[tokio::test]
3267 async fn first_ten_distributed() {
3268 use crate::networking::TCP;
3269
3270 let mut deployment = Deployment::new();
3271
3272 let mut flow = FlowBuilder::new();
3273 let first_node = flow.process::<P1>();
3274 let second_node = flow.process::<P2>();
3275 let external = flow.external::<P2>();
3276
3277 let numbers = first_node.source_iter(q!(0..10));
3278 let out_port = numbers
3279 .map(q!(|n| SendOverNetwork { n }))
3280 .send(&second_node, TCP.fail_stop().bincode())
3281 .send_bincode_external(&external);
3282
3283 let nodes = flow
3284 .with_process(&first_node, deployment.Localhost())
3285 .with_process(&second_node, deployment.Localhost())
3286 .with_external(&external, deployment.Localhost())
3287 .deploy(&mut deployment);
3288
3289 deployment.deploy().await.unwrap();
3290
3291 let mut external_out = nodes.connect(out_port).await;
3292
3293 deployment.start().await.unwrap();
3294
3295 for i in 0..10 {
3296 assert_eq!(external_out.next().await.unwrap().n, i);
3297 }
3298 }
3299
3300 #[cfg(feature = "deploy")]
3301 #[tokio::test]
3302 async fn first_cardinality() {
3303 let mut deployment = Deployment::new();
3304
3305 let mut flow = FlowBuilder::new();
3306 let node = flow.process::<()>();
3307 let external = flow.external::<()>();
3308
3309 let node_tick = node.tick();
3310 let count = node_tick
3311 .singleton(q!([1, 2, 3]))
3312 .into_stream()
3313 .flatten_ordered()
3314 .first()
3315 .into_stream()
3316 .count()
3317 .all_ticks()
3318 .send_bincode_external(&external);
3319
3320 let nodes = flow
3321 .with_process(&node, deployment.Localhost())
3322 .with_external(&external, deployment.Localhost())
3323 .deploy(&mut deployment);
3324
3325 deployment.deploy().await.unwrap();
3326
3327 let mut external_out = nodes.connect(count).await;
3328
3329 deployment.start().await.unwrap();
3330
3331 assert_eq!(external_out.next().await.unwrap(), 1);
3332 }
3333
3334 #[cfg(feature = "deploy")]
3335 #[tokio::test]
3336 async fn unbounded_reduce_remembers_state() {
3337 let mut deployment = Deployment::new();
3338
3339 let mut flow = FlowBuilder::new();
3340 let node = flow.process::<()>();
3341 let external = flow.external::<()>();
3342
3343 let (input_port, input) = node.source_external_bincode(&external);
3344 let out = input
3345 .reduce(q!(|acc, v| *acc += v))
3346 .sample_eager(nondet!(/** test */))
3347 .send_bincode_external(&external);
3348
3349 let nodes = flow
3350 .with_process(&node, deployment.Localhost())
3351 .with_external(&external, deployment.Localhost())
3352 .deploy(&mut deployment);
3353
3354 deployment.deploy().await.unwrap();
3355
3356 let mut external_in = nodes.connect(input_port).await;
3357 let mut external_out = nodes.connect(out).await;
3358
3359 deployment.start().await.unwrap();
3360
3361 external_in.send(1).await.unwrap();
3362 assert_eq!(external_out.next().await.unwrap(), 1);
3363
3364 external_in.send(2).await.unwrap();
3365 assert_eq!(external_out.next().await.unwrap(), 3);
3366 }
3367
3368 #[cfg(feature = "deploy")]
3369 #[tokio::test]
3370 async fn top_level_bounded_cross_singleton() {
3371 let mut deployment = Deployment::new();
3372
3373 let mut flow = FlowBuilder::new();
3374 let node = flow.process::<()>();
3375 let external = flow.external::<()>();
3376
3377 let (input_port, input) =
3378 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3379
3380 let out = input
3381 .cross_singleton(
3382 node.source_iter(q!(vec![1, 2, 3]))
3383 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3384 )
3385 .send_bincode_external(&external);
3386
3387 let nodes = flow
3388 .with_process(&node, deployment.Localhost())
3389 .with_external(&external, deployment.Localhost())
3390 .deploy(&mut deployment);
3391
3392 deployment.deploy().await.unwrap();
3393
3394 let mut external_in = nodes.connect(input_port).await;
3395 let mut external_out = nodes.connect(out).await;
3396
3397 deployment.start().await.unwrap();
3398
3399 external_in.send(1).await.unwrap();
3400 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3401
3402 external_in.send(2).await.unwrap();
3403 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3404 }
3405
3406 #[cfg(feature = "deploy")]
3407 #[tokio::test]
3408 async fn top_level_bounded_reduce_cardinality() {
3409 let mut deployment = Deployment::new();
3410
3411 let mut flow = FlowBuilder::new();
3412 let node = flow.process::<()>();
3413 let external = flow.external::<()>();
3414
3415 let (input_port, input) =
3416 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3417
3418 let out = sliced! {
3419 let input = use(input, nondet!(/** test */));
3420 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3421 input.cross_singleton(v.into_stream().count())
3422 }
3423 .send_bincode_external(&external);
3424
3425 let nodes = flow
3426 .with_process(&node, deployment.Localhost())
3427 .with_external(&external, deployment.Localhost())
3428 .deploy(&mut deployment);
3429
3430 deployment.deploy().await.unwrap();
3431
3432 let mut external_in = nodes.connect(input_port).await;
3433 let mut external_out = nodes.connect(out).await;
3434
3435 deployment.start().await.unwrap();
3436
3437 external_in.send(1).await.unwrap();
3438 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3439
3440 external_in.send(2).await.unwrap();
3441 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3442 }
3443
3444 #[cfg(feature = "deploy")]
3445 #[tokio::test]
3446 async fn top_level_bounded_into_singleton_cardinality() {
3447 let mut deployment = Deployment::new();
3448
3449 let mut flow = FlowBuilder::new();
3450 let node = flow.process::<()>();
3451 let external = flow.external::<()>();
3452
3453 let (input_port, input) =
3454 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3455
3456 let out = sliced! {
3457 let input = use(input, nondet!(/** test */));
3458 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3459 input.cross_singleton(v.into_stream().count())
3460 }
3461 .send_bincode_external(&external);
3462
3463 let nodes = flow
3464 .with_process(&node, deployment.Localhost())
3465 .with_external(&external, deployment.Localhost())
3466 .deploy(&mut deployment);
3467
3468 deployment.deploy().await.unwrap();
3469
3470 let mut external_in = nodes.connect(input_port).await;
3471 let mut external_out = nodes.connect(out).await;
3472
3473 deployment.start().await.unwrap();
3474
3475 external_in.send(1).await.unwrap();
3476 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3477
3478 external_in.send(2).await.unwrap();
3479 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3480 }
3481
3482 #[cfg(feature = "deploy")]
3483 #[tokio::test]
3484 async fn atomic_fold_replays_each_tick() {
3485 let mut deployment = Deployment::new();
3486
3487 let mut flow = FlowBuilder::new();
3488 let node = flow.process::<()>();
3489 let external = flow.external::<()>();
3490
3491 let (input_port, input) =
3492 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3493 let tick = node.tick();
3494
3495 let out = input
3496 .batch(&tick, nondet!(/** test */))
3497 .cross_singleton(
3498 node.source_iter(q!(vec![1, 2, 3]))
3499 .atomic()
3500 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3501 .snapshot_atomic(&tick, nondet!(/** test */)),
3502 )
3503 .all_ticks()
3504 .send_bincode_external(&external);
3505
3506 let nodes = flow
3507 .with_process(&node, deployment.Localhost())
3508 .with_external(&external, deployment.Localhost())
3509 .deploy(&mut deployment);
3510
3511 deployment.deploy().await.unwrap();
3512
3513 let mut external_in = nodes.connect(input_port).await;
3514 let mut external_out = nodes.connect(out).await;
3515
3516 deployment.start().await.unwrap();
3517
3518 external_in.send(1).await.unwrap();
3519 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3520
3521 external_in.send(2).await.unwrap();
3522 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3523 }
3524
3525 #[cfg(feature = "deploy")]
3526 #[tokio::test]
3527 async fn unbounded_scan_remembers_state() {
3528 let mut deployment = Deployment::new();
3529
3530 let mut flow = FlowBuilder::new();
3531 let node = flow.process::<()>();
3532 let external = flow.external::<()>();
3533
3534 let (input_port, input) = node.source_external_bincode(&external);
3535 let out = input
3536 .scan(
3537 q!(|| 0),
3538 q!(|acc, v| {
3539 *acc += v;
3540 Some(*acc)
3541 }),
3542 )
3543 .send_bincode_external(&external);
3544
3545 let nodes = flow
3546 .with_process(&node, deployment.Localhost())
3547 .with_external(&external, deployment.Localhost())
3548 .deploy(&mut deployment);
3549
3550 deployment.deploy().await.unwrap();
3551
3552 let mut external_in = nodes.connect(input_port).await;
3553 let mut external_out = nodes.connect(out).await;
3554
3555 deployment.start().await.unwrap();
3556
3557 external_in.send(1).await.unwrap();
3558 assert_eq!(external_out.next().await.unwrap(), 1);
3559
3560 external_in.send(2).await.unwrap();
3561 assert_eq!(external_out.next().await.unwrap(), 3);
3562 }
3563
3564 #[cfg(feature = "deploy")]
3565 #[tokio::test]
3566 async fn unbounded_enumerate_remembers_state() {
3567 let mut deployment = Deployment::new();
3568
3569 let mut flow = FlowBuilder::new();
3570 let node = flow.process::<()>();
3571 let external = flow.external::<()>();
3572
3573 let (input_port, input) = node.source_external_bincode(&external);
3574 let out = input.enumerate().send_bincode_external(&external);
3575
3576 let nodes = flow
3577 .with_process(&node, deployment.Localhost())
3578 .with_external(&external, deployment.Localhost())
3579 .deploy(&mut deployment);
3580
3581 deployment.deploy().await.unwrap();
3582
3583 let mut external_in = nodes.connect(input_port).await;
3584 let mut external_out = nodes.connect(out).await;
3585
3586 deployment.start().await.unwrap();
3587
3588 external_in.send(1).await.unwrap();
3589 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3590
3591 external_in.send(2).await.unwrap();
3592 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3593 }
3594
3595 #[cfg(feature = "deploy")]
3596 #[tokio::test]
3597 async fn unbounded_unique_remembers_state() {
3598 let mut deployment = Deployment::new();
3599
3600 let mut flow = FlowBuilder::new();
3601 let node = flow.process::<()>();
3602 let external = flow.external::<()>();
3603
3604 let (input_port, input) =
3605 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3606 let out = input.unique().send_bincode_external(&external);
3607
3608 let nodes = flow
3609 .with_process(&node, deployment.Localhost())
3610 .with_external(&external, deployment.Localhost())
3611 .deploy(&mut deployment);
3612
3613 deployment.deploy().await.unwrap();
3614
3615 let mut external_in = nodes.connect(input_port).await;
3616 let mut external_out = nodes.connect(out).await;
3617
3618 deployment.start().await.unwrap();
3619
3620 external_in.send(1).await.unwrap();
3621 assert_eq!(external_out.next().await.unwrap(), 1);
3622
3623 external_in.send(2).await.unwrap();
3624 assert_eq!(external_out.next().await.unwrap(), 2);
3625
3626 external_in.send(1).await.unwrap();
3627 external_in.send(3).await.unwrap();
3628 assert_eq!(external_out.next().await.unwrap(), 3);
3629 }
3630
3631 #[cfg(feature = "sim")]
3632 #[test]
3633 #[should_panic]
3634 fn sim_batch_nondet_size() {
3635 let mut flow = FlowBuilder::new();
3636 let node = flow.process::<()>();
3637
3638 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3639
3640 let tick = node.tick();
3641 let out_recv = input
3642 .batch(&tick, nondet!(/** test */))
3643 .count()
3644 .all_ticks()
3645 .sim_output();
3646
3647 flow.sim().exhaustive(async || {
3648 in_send.send(());
3649 in_send.send(());
3650 in_send.send(());
3651
3652 assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3653 });
3654 }
3655
3656 #[cfg(feature = "sim")]
3657 #[test]
3658 fn sim_batch_preserves_order() {
3659 let mut flow = FlowBuilder::new();
3660 let node = flow.process::<()>();
3661
3662 let (in_send, input) = node.sim_input();
3663
3664 let tick = node.tick();
3665 let out_recv = input
3666 .batch(&tick, nondet!(/** test */))
3667 .all_ticks()
3668 .sim_output();
3669
3670 flow.sim().exhaustive(async || {
3671 in_send.send(1);
3672 in_send.send(2);
3673 in_send.send(3);
3674
3675 out_recv.assert_yields_only([1, 2, 3]).await;
3676 });
3677 }
3678
3679 #[cfg(feature = "sim")]
3680 #[test]
3681 #[should_panic]
3682 fn sim_batch_unordered_shuffles() {
3683 let mut flow = FlowBuilder::new();
3684 let node = flow.process::<()>();
3685
3686 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3687
3688 let tick = node.tick();
3689 let batch = input.batch(&tick, nondet!(/** test */));
3690 let out_recv = batch
3691 .clone()
3692 .min()
3693 .zip(batch.max())
3694 .all_ticks()
3695 .sim_output();
3696
3697 flow.sim().exhaustive(async || {
3698 in_send.send_many_unordered([1, 2, 3]);
3699
3700 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3701 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3702 }
3703 });
3704 }
3705
3706 #[cfg(feature = "sim")]
3707 #[test]
3708 fn sim_batch_unordered_shuffles_count() {
3709 let mut flow = FlowBuilder::new();
3710 let node = flow.process::<()>();
3711
3712 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3713
3714 let tick = node.tick();
3715 let batch = input.batch(&tick, nondet!(/** test */));
3716 let out_recv = batch.all_ticks().sim_output();
3717
3718 let instance_count = flow.sim().exhaustive(async || {
3719 in_send.send_many_unordered([1, 2, 3, 4]);
3720 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3721 });
3722
3723 assert_eq!(
3724 instance_count,
3725 75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3726 )
3727 }
3728
3729 #[cfg(feature = "sim")]
3730 #[test]
3731 #[should_panic]
3732 fn sim_observe_order_batched() {
3733 let mut flow = FlowBuilder::new();
3734 let node = flow.process::<()>();
3735
3736 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3737
3738 let tick = node.tick();
3739 let batch = input.batch(&tick, nondet!(/** test */));
3740 let out_recv = batch
3741 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3742 .all_ticks()
3743 .sim_output();
3744
3745 flow.sim().exhaustive(async || {
3746 in_send.send_many_unordered([1, 2, 3, 4]);
3747 out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3748 });
3749 }
3750
3751 #[cfg(feature = "sim")]
3752 #[test]
3753 fn sim_observe_order_batched_count() {
3754 let mut flow = FlowBuilder::new();
3755 let node = flow.process::<()>();
3756
3757 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3758
3759 let tick = node.tick();
3760 let batch = input.batch(&tick, nondet!(/** test */));
3761 let out_recv = batch
3762 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3763 .all_ticks()
3764 .sim_output();
3765
3766 let instance_count = flow.sim().exhaustive(async || {
3767 in_send.send_many_unordered([1, 2, 3, 4]);
3768 let _ = out_recv.collect::<Vec<_>>().await;
3769 });
3770
3771 assert_eq!(
3772 instance_count,
3773 192 // 4! * 2^{4 - 1}
3774 )
3775 }
3776
3777 #[cfg(feature = "sim")]
3778 #[test]
3779 fn sim_unordered_count_instance_count() {
3780 let mut flow = FlowBuilder::new();
3781 let node = flow.process::<()>();
3782
3783 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3784
3785 let tick = node.tick();
3786 let out_recv = input
3787 .count()
3788 .snapshot(&tick, nondet!(/** test */))
3789 .all_ticks()
3790 .sim_output();
3791
3792 let instance_count = flow.sim().exhaustive(async || {
3793 in_send.send_many_unordered([1, 2, 3, 4]);
3794 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3795 });
3796
3797 assert_eq!(
3798 instance_count,
3799 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3800 )
3801 }
3802
3803 #[cfg(feature = "sim")]
3804 #[test]
3805 fn sim_top_level_assume_ordering() {
3806 let mut flow = FlowBuilder::new();
3807 let node = flow.process::<()>();
3808
3809 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3810
3811 let out_recv = input
3812 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3813 .sim_output();
3814
3815 let instance_count = flow.sim().exhaustive(async || {
3816 in_send.send_many_unordered([1, 2, 3]);
3817 let mut out = out_recv.collect::<Vec<_>>().await;
3818 out.sort();
3819 assert_eq!(out, vec![1, 2, 3]);
3820 });
3821
3822 assert_eq!(instance_count, 6)
3823 }
3824
3825 #[cfg(feature = "sim")]
3826 #[test]
3827 fn sim_top_level_assume_ordering_cycle_back() {
3828 let mut flow = FlowBuilder::new();
3829 let node = flow.process::<()>();
3830 let node2 = flow.process::<()>();
3831
3832 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3833
3834 let (complete_cycle_back, cycle_back) =
3835 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3836 let ordered = input
3837 .merge_unordered(cycle_back)
3838 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3839 complete_cycle_back.complete(
3840 ordered
3841 .clone()
3842 .map(q!(|v| v + 1))
3843 .filter(q!(|v| v % 2 == 1))
3844 .send(&node2, TCP.fail_stop().bincode())
3845 .send(&node, TCP.fail_stop().bincode()),
3846 );
3847
3848 let out_recv = ordered.sim_output();
3849
3850 let mut saw = false;
3851 let instance_count = flow.sim().exhaustive(async || {
3852 in_send.send_many_unordered([0, 2]);
3853 let out = out_recv.collect::<Vec<_>>().await;
3854
3855 if out.starts_with(&[0, 1, 2]) {
3856 saw = true;
3857 }
3858 });
3859
3860 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3861 assert_eq!(instance_count, 6);
3862 }
3863
3864 #[cfg(feature = "sim")]
3865 #[test]
3866 fn sim_top_level_assume_ordering_cycle_back_tick() {
3867 let mut flow = FlowBuilder::new();
3868 let node = flow.process::<()>();
3869 let node2 = flow.process::<()>();
3870
3871 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3872
3873 let (complete_cycle_back, cycle_back) =
3874 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3875 let ordered = input
3876 .merge_unordered(cycle_back)
3877 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3878 complete_cycle_back.complete(
3879 ordered
3880 .clone()
3881 .batch(&node.tick(), nondet!(/** test */))
3882 .all_ticks()
3883 .map(q!(|v| v + 1))
3884 .filter(q!(|v| v % 2 == 1))
3885 .send(&node2, TCP.fail_stop().bincode())
3886 .send(&node, TCP.fail_stop().bincode()),
3887 );
3888
3889 let out_recv = ordered.sim_output();
3890
3891 let mut saw = false;
3892 let instance_count = flow.sim().exhaustive(async || {
3893 in_send.send_many_unordered([0, 2]);
3894 let out = out_recv.collect::<Vec<_>>().await;
3895
3896 if out.starts_with(&[0, 1, 2]) {
3897 saw = true;
3898 }
3899 });
3900
3901 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3902 assert_eq!(instance_count, 58);
3903 }
3904
3905 #[cfg(feature = "sim")]
3906 #[test]
3907 fn sim_top_level_assume_ordering_multiple() {
3908 let mut flow = FlowBuilder::new();
3909 let node = flow.process::<()>();
3910 let node2 = flow.process::<()>();
3911
3912 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3913 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3914
3915 let (complete_cycle_back, cycle_back) =
3916 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3917 let input1_ordered = input
3918 .clone()
3919 .merge_unordered(cycle_back)
3920 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3921 let foo = input1_ordered
3922 .clone()
3923 .map(q!(|v| v + 3))
3924 .weaken_ordering::<NoOrder>()
3925 .merge_unordered(input2)
3926 .assume_ordering::<TotalOrder>(nondet!(/** test */));
3927
3928 complete_cycle_back.complete(
3929 foo.filter(q!(|v| *v == 3))
3930 .send(&node2, TCP.fail_stop().bincode())
3931 .send(&node, TCP.fail_stop().bincode()),
3932 );
3933
3934 let out_recv = input1_ordered.sim_output();
3935
3936 let mut saw = false;
3937 let instance_count = flow.sim().exhaustive(async || {
3938 in_send.send_many_unordered([0, 1]);
3939 let out = out_recv.collect::<Vec<_>>().await;
3940
3941 if out.starts_with(&[0, 3, 1]) {
3942 saw = true;
3943 }
3944 });
3945
3946 assert!(saw, "did not see an instance with 0, 3, 1 in order");
3947 assert_eq!(instance_count, 24);
3948 }
3949
3950 #[cfg(feature = "sim")]
3951 #[test]
3952 fn sim_atomic_assume_ordering_cycle_back() {
3953 let mut flow = FlowBuilder::new();
3954 let node = flow.process::<()>();
3955 let node2 = flow.process::<()>();
3956
3957 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3958
3959 let (complete_cycle_back, cycle_back) =
3960 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3961 let ordered = input
3962 .merge_unordered(cycle_back)
3963 .atomic()
3964 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3965 .end_atomic();
3966 complete_cycle_back.complete(
3967 ordered
3968 .clone()
3969 .map(q!(|v| v + 1))
3970 .filter(q!(|v| v % 2 == 1))
3971 .send(&node2, TCP.fail_stop().bincode())
3972 .send(&node, TCP.fail_stop().bincode()),
3973 );
3974
3975 let out_recv = ordered.sim_output();
3976
3977 let instance_count = flow.sim().exhaustive(async || {
3978 in_send.send_many_unordered([0, 2]);
3979 let out = out_recv.collect::<Vec<_>>().await;
3980 assert_eq!(out.len(), 4);
3981 });
3982 assert_eq!(instance_count, 22);
3983 }
3984
3985 #[cfg(feature = "deploy")]
3986 #[tokio::test]
3987 async fn partition_evens_odds() {
3988 let mut deployment = Deployment::new();
3989
3990 let mut flow = FlowBuilder::new();
3991 let node = flow.process::<()>();
3992 let external = flow.external::<()>();
3993
3994 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3995 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3996 let evens_port = evens.send_bincode_external(&external);
3997 let odds_port = odds.send_bincode_external(&external);
3998
3999 let nodes = flow
4000 .with_process(&node, deployment.Localhost())
4001 .with_external(&external, deployment.Localhost())
4002 .deploy(&mut deployment);
4003
4004 deployment.deploy().await.unwrap();
4005
4006 let mut evens_out = nodes.connect(evens_port).await;
4007 let mut odds_out = nodes.connect(odds_port).await;
4008
4009 deployment.start().await.unwrap();
4010
4011 let mut even_results = Vec::new();
4012 for _ in 0..3 {
4013 even_results.push(evens_out.next().await.unwrap());
4014 }
4015 even_results.sort();
4016 assert_eq!(even_results, vec![2, 4, 6]);
4017
4018 let mut odd_results = Vec::new();
4019 for _ in 0..3 {
4020 odd_results.push(odds_out.next().await.unwrap());
4021 }
4022 odd_results.sort();
4023 assert_eq!(odd_results, vec![1, 3, 5]);
4024 }
4025
4026 #[cfg(feature = "deploy")]
4027 #[tokio::test]
4028 async fn unconsumed_inspect_still_runs() {
4029 use crate::deploy::DeployCrateWrapper;
4030
4031 let mut deployment = Deployment::new();
4032
4033 let mut flow = FlowBuilder::new();
4034 let node = flow.process::<()>();
4035
4036 // The return value of .inspect() is intentionally dropped.
4037 // Before the Null-root fix, this would silently do nothing.
4038 node.source_iter(q!(0..5))
4039 .inspect(q!(|x| println!("inspect: {}", x)));
4040
4041 let nodes = flow
4042 .with_process(&node, deployment.Localhost())
4043 .deploy(&mut deployment);
4044
4045 deployment.deploy().await.unwrap();
4046
4047 let mut stdout = nodes.get_process(&node).stdout();
4048
4049 deployment.start().await.unwrap();
4050
4051 let mut lines = Vec::new();
4052 for _ in 0..5 {
4053 lines.push(stdout.recv().await.unwrap());
4054 }
4055 lines.sort();
4056 assert_eq!(
4057 lines,
4058 vec![
4059 "inspect: 0",
4060 "inspect: 1",
4061 "inspect: 2",
4062 "inspect: 3",
4063 "inspect: 4",
4064 ]
4065 );
4066 }
4067
4068 #[cfg(feature = "sim")]
4069 #[test]
4070 fn sim_limit() {
4071 let mut flow = FlowBuilder::new();
4072 let node = flow.process::<()>();
4073
4074 let (in_send, input) = node.sim_input();
4075
4076 let out_recv = input.limit(q!(3)).sim_output();
4077
4078 flow.sim().exhaustive(async || {
4079 in_send.send(1);
4080 in_send.send(2);
4081 in_send.send(3);
4082 in_send.send(4);
4083 in_send.send(5);
4084
4085 out_recv.assert_yields_only([1, 2, 3]).await;
4086 });
4087 }
4088
4089 #[cfg(feature = "sim")]
4090 #[test]
4091 fn sim_limit_zero() {
4092 let mut flow = FlowBuilder::new();
4093 let node = flow.process::<()>();
4094
4095 let (in_send, input) = node.sim_input();
4096
4097 let out_recv = input.limit(q!(0)).sim_output();
4098
4099 flow.sim().exhaustive(async || {
4100 in_send.send(1);
4101 in_send.send(2);
4102
4103 out_recv.assert_yields_only::<i32, _>([]).await;
4104 });
4105 }
4106
4107 #[cfg(feature = "sim")]
4108 #[test]
4109 fn sim_merge_ordered() {
4110 let mut flow = FlowBuilder::new();
4111 let node = flow.process::<()>();
4112
4113 let (in_send, input) = node.sim_input();
4114 let (in_send2, input2) = node.sim_input();
4115
4116 let out_recv = input
4117 .merge_ordered(input2, nondet!(/** test */))
4118 .sim_output();
4119
4120 let mut saw_out_of_order = false;
4121 let instances = flow.sim().exhaustive(async || {
4122 in_send.send(1);
4123 in_send.send(2);
4124 in_send2.send(3);
4125 in_send2.send(4);
4126
4127 let out = out_recv.collect::<Vec<_>>().await;
4128
4129 if out == [1, 3, 2, 4] {
4130 saw_out_of_order = true;
4131 }
4132
4133 // Assert ordering preservation: elements from each input must
4134 // appear in their original relative order.
4135 let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4136 let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4137 assert_eq!(
4138 first_elements,
4139 vec![1, 2],
4140 "first input order violated: {:?}",
4141 out
4142 );
4143 assert_eq!(
4144 second_elements,
4145 vec![3, 4],
4146 "second input order violated: {:?}",
4147 out
4148 );
4149
4150 first_elements.append(&mut second_elements);
4151 first_elements.sort();
4152 assert_eq!(first_elements, vec![1, 2, 3, 4]);
4153 });
4154
4155 assert!(saw_out_of_order);
4156 assert_eq!(instances, 6);
4157 }
4158
4159 /// Tests that merge_ordered passes through elements when only one input
4160 /// has data.
4161 #[cfg(feature = "sim")]
4162 #[test]
4163 fn sim_merge_ordered_one_empty() {
4164 let mut flow = FlowBuilder::new();
4165 let node = flow.process::<()>();
4166
4167 let (in_send, input) = node.sim_input();
4168 let (_in_send2, input2) = node.sim_input();
4169
4170 let out_recv = input
4171 .merge_ordered(input2, nondet!(/** test */))
4172 .sim_output();
4173
4174 let instances = flow.sim().exhaustive(async || {
4175 in_send.send(1);
4176 in_send.send(2);
4177
4178 let out = out_recv.collect::<Vec<_>>().await;
4179 assert_eq!(out, vec![1, 2]);
4180 });
4181
4182 // Only one possible interleaving when one input is empty
4183 assert_eq!(instances, 1);
4184 }
4185
4186 /// Tests that merge_ordered correctly handles feedback cycles.
4187 /// An element output from merge_ordered is filtered and cycled back to
4188 /// one of its inputs. The one-at-a-time release must allow the cycled-back
4189 /// element to arrive and potentially be emitted before elements still
4190 /// waiting on the other input.
4191 #[cfg(feature = "sim")]
4192 #[test]
4193 fn sim_merge_ordered_cycle_back() {
4194 let mut flow = FlowBuilder::new();
4195 let node = flow.process::<()>();
4196
4197 let (in_send, input) = node.sim_input();
4198
4199 // Create a forward ref for the cycle back
4200 let (complete_cycle_back, cycle_back) =
4201 node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4202
4203 // merge_ordered: input (external) with cycle_back
4204 let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4205
4206 // Cycle back: elements equal to 1 get mapped to 10 and fed back
4207 complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4208
4209 let out_recv = merged.sim_output();
4210
4211 // Send 1 and 2. Element 1 should cycle back as 10.
4212 // Valid orderings must have 1 before 10 (since 10 depends on 1).
4213 let mut saw_cycle_before_second = false;
4214 flow.sim().exhaustive(async || {
4215 in_send.send(1);
4216 in_send.send(2);
4217
4218 let out = out_recv.collect::<Vec<_>>().await;
4219
4220 // 10 must always come after 1 (causal dependency)
4221 let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4222 let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4223 assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4224
4225 // Check if we see [1, 10, 2] — the cycled element beats the second input
4226 if out == [1, 10, 2] {
4227 saw_cycle_before_second = true;
4228 }
4229
4230 let mut sorted = out;
4231 sorted.sort();
4232 assert_eq!(sorted, vec![1, 2, 10]);
4233 });
4234
4235 assert!(
4236 saw_cycle_before_second,
4237 "never saw the cycled element arrive before the second input element"
4238 );
4239 }
4240
4241 /// Tests that merge_ordered correctly interleaves when one input has a
4242 /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4243 /// element 2 should be able to appear after b's elements.
4244 #[cfg(feature = "sim")]
4245 #[test]
4246 fn sim_merge_ordered_delayed() {
4247 let mut flow = FlowBuilder::new();
4248 let node = flow.process::<()>();
4249
4250 let (in_send, input) = node.sim_input();
4251 let (in_send2, input2) = node.sim_input();
4252
4253 let out_recv = input
4254 .merge_ordered(input2, nondet!(/** test */))
4255 .sim_output();
4256
4257 let mut saw_delayed_interleaving = false;
4258 flow.sim().exhaustive(async || {
4259 // Send 1 from a, and 3, 4 from b
4260 in_send.send(1);
4261 in_send2.send(3);
4262 in_send2.send(4);
4263
4264 // Collect what's available so far
4265 let first_batch = out_recv.collect::<Vec<_>>().await;
4266
4267 // Now send the delayed element 2 from a
4268 in_send.send(2);
4269 let second_batch = out_recv.collect::<Vec<_>>().await;
4270
4271 let mut all: Vec<_> = first_batch
4272 .iter()
4273 .chain(second_batch.iter())
4274 .copied()
4275 .collect();
4276
4277 // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4278 if all == [1, 3, 4, 2] {
4279 saw_delayed_interleaving = true;
4280 }
4281
4282 all.sort();
4283 assert_eq!(all, vec![1, 2, 3, 4]);
4284 });
4285
4286 assert!(saw_delayed_interleaving);
4287 }
4288
4289 /// Deploy test: merge_ordered with a delayed element on one input.
4290 /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4291 /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4292 /// both inputs are pulled and the delayed element arrives later.
4293 #[cfg(feature = "deploy")]
4294 #[tokio::test]
4295 async fn deploy_merge_ordered_delayed() {
4296 let mut deployment = Deployment::new();
4297
4298 let mut flow = FlowBuilder::new();
4299 let node = flow.process::<()>();
4300 let external = flow.external::<()>();
4301
4302 let (input_a_port, input_a) = node.source_external_bincode(&external);
4303 let (input_b_port, input_b) = node.source_external_bincode(&external);
4304
4305 let out = input_a
4306 .assume_ordering(nondet!(/** test */))
4307 .merge_ordered(
4308 input_b.assume_ordering(nondet!(/** test */)),
4309 nondet!(/** test */),
4310 )
4311 .send_bincode_external(&external);
4312
4313 let nodes = flow
4314 .with_process(&node, deployment.Localhost())
4315 .with_external(&external, deployment.Localhost())
4316 .deploy(&mut deployment);
4317
4318 deployment.deploy().await.unwrap();
4319
4320 let mut ext_a = nodes.connect(input_a_port).await;
4321 let mut ext_b = nodes.connect(input_b_port).await;
4322 let mut ext_out = nodes.connect(out).await;
4323
4324 deployment.start().await.unwrap();
4325
4326 // Send a=1, b=3, b=4
4327 ext_a.send(1).await.unwrap();
4328 ext_b.send(3).await.unwrap();
4329 ext_b.send(4).await.unwrap();
4330
4331 // Collect the first 3 elements
4332 let mut received = Vec::new();
4333 for _ in 0..3 {
4334 received.push(ext_out.next().await.unwrap());
4335 }
4336
4337 // Now send the delayed a=2
4338 ext_a.send(2).await.unwrap();
4339 received.push(ext_out.next().await.unwrap());
4340
4341 // All elements should be present
4342 received.sort();
4343 assert_eq!(received, vec![1, 2, 3, 4]);
4344 }
4345
4346 #[cfg(feature = "deploy")]
4347 #[tokio::test]
4348 async fn monotone_fold_threshold() {
4349 use crate::properties::manual_proof;
4350
4351 let mut deployment = Deployment::new();
4352
4353 let mut flow = FlowBuilder::new();
4354 let node = flow.process::<()>();
4355 let external = flow.external::<()>();
4356
4357 let in_unbounded: super::Stream<_, _> =
4358 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4359 let sum = in_unbounded.fold(
4360 q!(|| 0),
4361 q!(
4362 |sum, v| {
4363 *sum += v;
4364 },
4365 monotone = manual_proof!(/** test */)
4366 ),
4367 );
4368
4369 let threshold_out = sum
4370 .threshold_greater_or_equal(node.singleton(q!(7)))
4371 .send_bincode_external(&external);
4372
4373 let nodes = flow
4374 .with_process(&node, deployment.Localhost())
4375 .with_external(&external, deployment.Localhost())
4376 .deploy(&mut deployment);
4377
4378 deployment.deploy().await.unwrap();
4379
4380 let mut threshold_out = nodes.connect(threshold_out).await;
4381
4382 deployment.start().await.unwrap();
4383
4384 assert_eq!(threshold_out.next().await.unwrap(), 7);
4385 }
4386
4387 #[cfg(feature = "deploy")]
4388 #[tokio::test]
4389 async fn monotone_count_threshold() {
4390 let mut deployment = Deployment::new();
4391
4392 let mut flow = FlowBuilder::new();
4393 let node = flow.process::<()>();
4394 let external = flow.external::<()>();
4395
4396 let in_unbounded: super::Stream<_, _> =
4397 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4398 let sum = in_unbounded.count();
4399
4400 let threshold_out = sum
4401 .threshold_greater_or_equal(node.singleton(q!(3)))
4402 .send_bincode_external(&external);
4403
4404 let nodes = flow
4405 .with_process(&node, deployment.Localhost())
4406 .with_external(&external, deployment.Localhost())
4407 .deploy(&mut deployment);
4408
4409 deployment.deploy().await.unwrap();
4410
4411 let mut threshold_out = nodes.connect(threshold_out).await;
4412
4413 deployment.start().await.unwrap();
4414
4415 assert_eq!(threshold_out.next().await.unwrap(), 3);
4416 }
4417
4418 #[cfg(feature = "deploy")]
4419 #[tokio::test]
4420 async fn monotone_map_order_preserving_threshold() {
4421 use crate::properties::manual_proof;
4422
4423 let mut deployment = Deployment::new();
4424
4425 let mut flow = FlowBuilder::new();
4426 let node = flow.process::<()>();
4427 let external = flow.external::<()>();
4428
4429 let in_unbounded: super::Stream<_, _> =
4430 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4431 let sum = in_unbounded.fold(
4432 q!(|| 0),
4433 q!(
4434 |sum, v| {
4435 *sum += v;
4436 },
4437 monotone = manual_proof!(/** test */)
4438 ),
4439 );
4440
4441 // map with order_preserving should preserve monotonicity
4442 let doubled = sum.map(q!(
4443 |v| v * 2,
4444 order_preserving = manual_proof!(/** doubling preserves order */)
4445 ));
4446
4447 let threshold_out = doubled
4448 .threshold_greater_or_equal(node.singleton(q!(14)))
4449 .send_bincode_external(&external);
4450
4451 let nodes = flow
4452 .with_process(&node, deployment.Localhost())
4453 .with_external(&external, deployment.Localhost())
4454 .deploy(&mut deployment);
4455
4456 deployment.deploy().await.unwrap();
4457
4458 let mut threshold_out = nodes.connect(threshold_out).await;
4459
4460 deployment.start().await.unwrap();
4461
4462 assert_eq!(threshold_out.next().await.unwrap(), 14);
4463 }
4464
4465 // === Compile-time type tests for join/cross_product ordering ===
4466
4467 #[cfg(any(feature = "deploy", feature = "sim"))]
4468 mod join_ordering_type_tests {
4469 use crate::live_collections::boundedness::{Bounded, Unbounded};
4470 use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4471 use crate::location::{Location, Process};
4472
4473 #[expect(dead_code, reason = "compile-time type test")]
4474 fn join_unbounded_with_bounded_preserves_order<'a>(
4475 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4476 right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4477 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4478 left.join(right)
4479 }
4480
4481 #[expect(dead_code, reason = "compile-time type test")]
4482 fn join_unbounded_with_unbounded_is_no_order<'a>(
4483 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4484 right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4485 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4486 left.join(right)
4487 }
4488
4489 #[expect(dead_code, reason = "compile-time type test")]
4490 fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4491 left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4492 right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4493 ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4494 left.join(right)
4495 }
4496
4497 #[expect(dead_code, reason = "compile-time type test")]
4498 fn join_unbounded_noorder_with_bounded<'a>(
4499 left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4500 right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4501 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4502 left.join(right)
4503 }
4504
4505 // === Compile-time type tests for cross_product ordering ===
4506
4507 #[expect(dead_code, reason = "compile-time type test")]
4508 fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4509 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4510 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4511 ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4512 left.cross_product(right)
4513 }
4514
4515 #[expect(dead_code, reason = "compile-time type test")]
4516 fn cross_product_bounded_with_bounded_preserves_order<'a>(
4517 left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4518 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4519 ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4520 left.cross_product(right)
4521 }
4522
4523 #[expect(dead_code, reason = "compile-time type test")]
4524 fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4525 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4526 right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4527 ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4528 left.cross_product(right)
4529 }
4530 } // mod join_ordering_type_tests
4531
4532 // === Runtime correctness tests for bounded join/cross_product ===
4533
4534 #[cfg(feature = "sim")]
4535 #[test]
4536 fn cross_product_mixed_boundedness_correctness() {
4537 use stageleft::q;
4538
4539 use crate::compile::builder::FlowBuilder;
4540 use crate::nondet::nondet;
4541
4542 let mut flow = FlowBuilder::new();
4543 let process = flow.process::<()>();
4544 let tick = process.tick();
4545
4546 let left = process.source_iter(q!(vec![1, 2]));
4547 let right = process
4548 .source_iter(q!(vec!['a', 'b']))
4549 .batch(&tick, nondet!(/** test */))
4550 .all_ticks();
4551
4552 let out = left.cross_product(right).sim_output();
4553
4554 flow.sim().exhaustive(async || {
4555 out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4556 .await;
4557 });
4558 }
4559
4560 #[cfg(feature = "sim")]
4561 #[test]
4562 fn join_mixed_boundedness_correctness() {
4563 use stageleft::q;
4564
4565 use crate::compile::builder::FlowBuilder;
4566 use crate::nondet::nondet;
4567
4568 let mut flow = FlowBuilder::new();
4569 let process = flow.process::<()>();
4570 let tick = process.tick();
4571
4572 let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4573 let right = process
4574 .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4575 .batch(&tick, nondet!(/** test */))
4576 .all_ticks();
4577
4578 let out = left.join(right).sim_output();
4579
4580 flow.sim().exhaustive(async || {
4581 out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4582 .await;
4583 });
4584 }
4585
4586 #[cfg(feature = "sim")]
4587 #[test]
4588 fn sim_merge_unordered_independent_atomics() {
4589 let mut flow = FlowBuilder::new();
4590 let node = flow.process::<()>();
4591
4592 let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4593 let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4594
4595 let out = input1
4596 .atomic()
4597 .merge_unordered(input2.atomic())
4598 .end_atomic()
4599 .sim_output();
4600
4601 flow.sim().exhaustive(async || {
4602 in1_send.send(1);
4603 in2_send.send(2);
4604
4605 out.assert_yields_only_unordered(vec![1, 2]).await;
4606 });
4607 }
4608}