hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28 ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
31/// A marker trait indicating which components of a [`Singleton`] may change.
32///
33/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
34/// includes an additional variant [`Monotonic`], which means that the value will only grow.
35pub trait SingletonBound {
36 /// The [`Boundedness`] that this [`Singleton`] would be erased to.
37 type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
38
39 /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
40 type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
41
42 /// Returns the [`SingletonBoundKind`] corresponding to this type.
43 fn bound_kind() -> SingletonBoundKind;
44}
45
46impl SingletonBound for Unbounded {
47 type UnderlyingBound = Unbounded;
48
49 type StreamToMonotone = Monotonic;
50
51 fn bound_kind() -> SingletonBoundKind {
52 SingletonBoundKind::Unbounded
53 }
54}
55
56impl SingletonBound for Bounded {
57 type UnderlyingBound = Bounded;
58
59 type StreamToMonotone = Bounded;
60
61 fn bound_kind() -> SingletonBoundKind {
62 SingletonBoundKind::Bounded
63 }
64}
65
66/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
67pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70 type UnderlyingBound = Unbounded;
71
72 type StreamToMonotone = Monotonic;
73
74 fn bound_kind() -> SingletonBoundKind {
75 SingletonBoundKind::Monotonic
76 }
77}
78
79#[sealed]
80#[diagnostic::on_unimplemented(
81 message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
82 label = "required here",
83 note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
84)]
85/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
86pub trait IsMonotonic: SingletonBound {}
87
88#[sealed]
89#[diagnostic::do_not_recommend]
90impl IsMonotonic for Monotonic {}
91
92#[sealed]
93#[diagnostic::do_not_recommend]
94impl<B: IsBounded> IsMonotonic for B {}
95
96/// A single Rust value that can asynchronously change over time.
97///
98/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
99/// [`Unbounded`], the value will asynchronously change over time.
100///
101/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
102/// a single number that will asynchronously change as events are processed. Singletons also appear
103/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
104/// such as getting the length of a batch of requests.
105///
106/// Type Parameters:
107/// - `Type`: the type of the value in this singleton
108/// - `Loc`: the [`Location`] where the singleton is materialized
109/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
110pub struct Singleton<Type, Loc, Bound: SingletonBound> {
111 pub(crate) location: Loc,
112 pub(crate) ir_node: RefCell<HydroNode>,
113 pub(crate) flow_state: FlowState,
114
115 _phantom: PhantomData<(Type, Loc, Bound)>,
116}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119 fn drop(&mut self) {
120 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123 input: Box::new(ir_node),
124 op_metadata: HydroIrOpMetadata::new(),
125 });
126 }
127 }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132 T: Clone,
133 L: Location<'a> + NoTick,
134{
135 fn from(value: Singleton<T, L, Bounded>) -> Self {
136 let tick = value.location().tick();
137 value.clone_into_tick(&tick).latest()
138 }
139}
140
141impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
142where
143 L: Location<'a>,
144{
145 type Location = Tick<L>;
146
147 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
148 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
149 location.clone(),
150 HydroNode::DeferTick {
151 input: Box::new(HydroNode::CycleSource {
152 cycle_id,
153 metadata: location.new_node_metadata(Self::collection_kind()),
154 }),
155 metadata: location
156 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
157 },
158 );
159
160 from_previous_tick.unwrap_or(initial)
161 }
162}
163
164impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
165where
166 L: Location<'a>,
167{
168 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
169 assert_eq!(
170 Location::id(&self.location),
171 expected_location,
172 "locations do not match"
173 );
174 self.location
175 .flow_state()
176 .borrow_mut()
177 .push_root(HydroRoot::CycleSink {
178 cycle_id,
179 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
180 op_metadata: HydroIrOpMetadata::new(),
181 });
182 }
183}
184
185impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
186where
187 L: Location<'a>,
188{
189 type Location = Tick<L>;
190
191 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
192 Singleton::new(
193 location.clone(),
194 HydroNode::CycleSource {
195 cycle_id,
196 metadata: location.new_node_metadata(Self::collection_kind()),
197 },
198 )
199 }
200}
201
202impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
203where
204 L: Location<'a>,
205{
206 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
207 assert_eq!(
208 Location::id(&self.location),
209 expected_location,
210 "locations do not match"
211 );
212 self.location
213 .flow_state()
214 .borrow_mut()
215 .push_root(HydroRoot::CycleSink {
216 cycle_id,
217 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
218 op_metadata: HydroIrOpMetadata::new(),
219 });
220 }
221}
222
223impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
224where
225 L: Location<'a> + NoTick,
226{
227 type Location = L;
228
229 fn create_source(cycle_id: CycleId, location: L) -> Self {
230 Singleton::new(
231 location.clone(),
232 HydroNode::CycleSource {
233 cycle_id,
234 metadata: location.new_node_metadata(Self::collection_kind()),
235 },
236 )
237 }
238}
239
240impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
241where
242 L: Location<'a> + NoTick,
243{
244 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
245 assert_eq!(
246 Location::id(&self.location),
247 expected_location,
248 "locations do not match"
249 );
250 self.location
251 .flow_state()
252 .borrow_mut()
253 .push_root(HydroRoot::CycleSink {
254 cycle_id,
255 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
256 op_metadata: HydroIrOpMetadata::new(),
257 });
258 }
259}
260
261impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
262where
263 T: Clone,
264 L: Location<'a>,
265{
266 fn clone(&self) -> Self {
267 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
268 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
269 *self.ir_node.borrow_mut() = HydroNode::Tee {
270 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
271 metadata: self.location.new_node_metadata(Self::collection_kind()),
272 };
273 }
274
275 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
276 Singleton {
277 location: self.location.clone(),
278 flow_state: self.flow_state.clone(),
279 ir_node: HydroNode::Tee {
280 inner: SharedNode(inner.0.clone()),
281 metadata: metadata.clone(),
282 }
283 .into(),
284 _phantom: PhantomData,
285 }
286 } else {
287 unreachable!()
288 }
289 }
290}
291
292#[cfg(stageleft_runtime)]
293fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
294 me: Singleton<T, Tick<L>, B>,
295 other: Optional<O, Tick<L>, B::UnderlyingBound>,
296) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
297 let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
298 super::optional::zip_inside_tick(me_as_optional, other)
299}
300
301impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
302where
303 L: Location<'a>,
304{
305 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308 let flow_state = location.flow_state().clone();
309 Singleton {
310 location,
311 flow_state,
312 ir_node: RefCell::new(ir_node),
313 _phantom: PhantomData,
314 }
315 }
316
317 pub(crate) fn collection_kind() -> CollectionKind {
318 CollectionKind::Singleton {
319 bound: B::bound_kind(),
320 element_type: stageleft::quote_type::<T>().into(),
321 }
322 }
323
324 /// Returns the [`Location`] where this singleton is being materialized.
325 pub fn location(&self) -> &L {
326 &self.location
327 }
328
329 /// Creates a lightweight reference handle to this singleton that can be captured
330 /// inside `q!()` closures. The handle resolves to `&T` at runtime.
331 ///
332 /// The singleton must be bounded, otherwise reading it would be non-deterministic.
333 ///
334 /// ```rust
335 /// # #[cfg(feature = "deploy")] {
336 /// # use hydro_lang::prelude::*;
337 /// # use futures::StreamExt;
338 /// # tokio_test::block_on(async {
339 /// # let mut deployment = hydro_deploy::Deployment::new();
340 /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
341 /// # let process = builder.process::<()>();
342 /// # let external = builder.external::<()>();
343 /// let my_count = process
344 /// .source_iter(q!(0..5i32))
345 /// .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
346 /// let count_ref = my_count.by_ref();
347 /// let out_port = process
348 /// .source_iter(q!(1..=3i32))
349 /// .map(q!(|x| x + *count_ref))
350 /// .send_bincode_external(&external);
351 /// # let nodes = builder
352 /// # .with_default_optimize()
353 /// # .with_process(&process, deployment.Localhost())
354 /// # .with_external(&external, deployment.Localhost())
355 /// # .deploy(&mut deployment);
356 /// # deployment.deploy().await.unwrap();
357 /// # let mut out_recv = nodes.connect(out_port).await;
358 /// # deployment.start().await.unwrap();
359 /// # let mut results = Vec::new();
360 /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
361 /// # results.sort();
362 /// // fold(0..5) = 10, so results are 11, 12, 13
363 /// # assert_eq!(results, vec![11, 12, 13]);
364 /// # });
365 /// # }
366 /// ```
367 pub fn by_ref(&self) -> crate::singleton_ref::SingletonRef<'a, T, L>
368 where
369 B: IsBounded,
370 {
371 // Wrap in HydroNode::Singleton for materialization + identity tracking.
372 // If already a Singleton node, reuse it. If a Tee (from clone), wrap inner.
373 if !matches!(&*self.ir_node.borrow(), HydroNode::Singleton { .. }) {
374 let orig = self.ir_node.replace(HydroNode::Placeholder);
375 *self.ir_node.borrow_mut() = HydroNode::Singleton {
376 inner: SharedNode(Rc::new(RefCell::new(orig))),
377 metadata: self.location.new_node_metadata(Self::collection_kind()),
378 };
379 }
380 let borrow = self.ir_node.borrow();
381 let HydroNode::Singleton { inner, .. } = &*borrow else {
382 unreachable!()
383 };
384 crate::singleton_ref::SingletonRef::new(Rc::clone(&inner.0))
385 }
386
387 /// Drops the monotonicity property of the [`Singleton`].
388 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
389 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
390 Singleton::new(
391 self.location.clone(),
392 self.ir_node.replace(HydroNode::Placeholder),
393 )
394 } else {
395 Singleton::new(
396 self.location.clone(),
397 HydroNode::Cast {
398 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
399 metadata:
400 self.location.new_node_metadata(
401 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
402 ),
403 },
404 )
405 }
406 }
407
408 /// Transforms the singleton value by applying a function `f` to it,
409 /// continuously as the input is updated.
410 ///
411 /// # Example
412 /// ```rust
413 /// # #[cfg(feature = "deploy")] {
414 /// # use hydro_lang::prelude::*;
415 /// # use futures::StreamExt;
416 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
417 /// let tick = process.tick();
418 /// let singleton = tick.singleton(q!(5));
419 /// singleton.map(q!(|v| v * 2)).all_ticks()
420 /// # }, |mut stream| async move {
421 /// // 10
422 /// # assert_eq!(stream.next().await.unwrap(), 10);
423 /// # }));
424 /// # }
425 /// ```
426 pub fn map<U, F, OP, B2: SingletonBound>(
427 self,
428 f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
429 ) -> Singleton<U, L, B2>
430 where
431 F: Fn(T) -> U + 'a,
432 B: ApplyOrderPreservingSingleton<OP, B2>,
433 {
434 let (f, proof) = f.splice_fn1_ctx_props(&self.location);
435 proof.register_proof(&f);
436 let f = f.into();
437 Singleton::new(
438 self.location.clone(),
439 HydroNode::Map {
440 f,
441 singleton_refs: Vec::new(),
442 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
443 metadata: self
444 .location
445 .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
446 },
447 )
448 }
449
450 /// Transforms the singleton value by applying a function `f` to it and then flattening
451 /// the result into a stream, preserving the order of elements.
452 ///
453 /// The function `f` is applied to the singleton value to produce an iterator, and all items
454 /// from that iterator are emitted in the output stream in deterministic order.
455 ///
456 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
457 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
458 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
459 ///
460 /// # Example
461 /// ```rust
462 /// # #[cfg(feature = "deploy")] {
463 /// # use hydro_lang::prelude::*;
464 /// # use futures::StreamExt;
465 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
466 /// let tick = process.tick();
467 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
468 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
469 /// # }, |mut stream| async move {
470 /// // 1, 2, 3
471 /// # for w in vec![1, 2, 3] {
472 /// # assert_eq!(stream.next().await.unwrap(), w);
473 /// # }
474 /// # }));
475 /// # }
476 /// ```
477 pub fn flat_map_ordered<U, I, F>(
478 self,
479 f: impl IntoQuotedMut<'a, F, L>,
480 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
481 where
482 B: IsBounded,
483 I: IntoIterator<Item = U>,
484 F: Fn(T) -> I + 'a,
485 {
486 self.into_stream().flat_map_ordered(f)
487 }
488
489 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
490 /// for the output type `I` to produce items in any order.
491 ///
492 /// The function `f` is applied to the singleton value to produce an iterator, and all items
493 /// from that iterator are emitted in the output stream in non-deterministic order.
494 ///
495 /// # Example
496 /// ```rust
497 /// # #[cfg(feature = "deploy")] {
498 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
499 /// # use futures::StreamExt;
500 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
501 /// let tick = process.tick();
502 /// let singleton = tick.singleton(q!(
503 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
504 /// ));
505 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
506 /// # }, |mut stream| async move {
507 /// // 1, 2, 3, but in no particular order
508 /// # let mut results = Vec::new();
509 /// # for _ in 0..3 {
510 /// # results.push(stream.next().await.unwrap());
511 /// # }
512 /// # results.sort();
513 /// # assert_eq!(results, vec![1, 2, 3]);
514 /// # }));
515 /// # }
516 /// ```
517 pub fn flat_map_unordered<U, I, F>(
518 self,
519 f: impl IntoQuotedMut<'a, F, L>,
520 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
521 where
522 B: IsBounded,
523 I: IntoIterator<Item = U>,
524 F: Fn(T) -> I + 'a,
525 {
526 self.into_stream().flat_map_unordered(f)
527 }
528
529 /// Flattens the singleton value into a stream, preserving the order of elements.
530 ///
531 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
532 /// are emitted in the output stream in deterministic order.
533 ///
534 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
535 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
536 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
537 ///
538 /// # Example
539 /// ```rust
540 /// # #[cfg(feature = "deploy")] {
541 /// # use hydro_lang::prelude::*;
542 /// # use futures::StreamExt;
543 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
544 /// let tick = process.tick();
545 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
546 /// singleton.flatten_ordered().all_ticks()
547 /// # }, |mut stream| async move {
548 /// // 1, 2, 3
549 /// # for w in vec![1, 2, 3] {
550 /// # assert_eq!(stream.next().await.unwrap(), w);
551 /// # }
552 /// # }));
553 /// # }
554 /// ```
555 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
556 where
557 B: IsBounded,
558 T: IntoIterator<Item = U>,
559 {
560 self.flat_map_ordered(q!(|x| x))
561 }
562
563 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
564 /// for the element type `T` to produce items in any order.
565 ///
566 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
567 /// are emitted in the output stream in non-deterministic order.
568 ///
569 /// # Example
570 /// ```rust
571 /// # #[cfg(feature = "deploy")] {
572 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
573 /// # use futures::StreamExt;
574 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
575 /// let tick = process.tick();
576 /// let singleton = tick.singleton(q!(
577 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
578 /// ));
579 /// singleton.flatten_unordered().all_ticks()
580 /// # }, |mut stream| async move {
581 /// // 1, 2, 3, but in no particular order
582 /// # let mut results = Vec::new();
583 /// # for _ in 0..3 {
584 /// # results.push(stream.next().await.unwrap());
585 /// # }
586 /// # results.sort();
587 /// # assert_eq!(results, vec![1, 2, 3]);
588 /// # }));
589 /// # }
590 /// ```
591 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
592 where
593 B: IsBounded,
594 T: IntoIterator<Item = U>,
595 {
596 self.flat_map_unordered(q!(|x| x))
597 }
598
599 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
600 ///
601 /// If the predicate returns `true`, the output optional contains the same value.
602 /// If the predicate returns `false`, the output optional is empty.
603 ///
604 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
605 /// not modify or take ownership of the value. If you need to modify the value while filtering
606 /// use [`Singleton::filter_map`] instead.
607 ///
608 /// # Example
609 /// ```rust
610 /// # #[cfg(feature = "deploy")] {
611 /// # use hydro_lang::prelude::*;
612 /// # use futures::StreamExt;
613 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
614 /// let tick = process.tick();
615 /// let singleton = tick.singleton(q!(5));
616 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
617 /// # }, |mut stream| async move {
618 /// // 5
619 /// # assert_eq!(stream.next().await.unwrap(), 5);
620 /// # }));
621 /// # }
622 /// ```
623 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
624 where
625 F: Fn(&T) -> bool + 'a,
626 {
627 let f = f.splice_fn1_borrow_ctx(&self.location).into();
628 Optional::new(
629 self.location.clone(),
630 HydroNode::Filter {
631 f,
632 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
633 metadata: self
634 .location
635 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
636 },
637 )
638 }
639
640 /// An operator that both filters and maps. It yields the value only if the supplied
641 /// closure `f` returns `Some(value)`.
642 ///
643 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
644 /// If the closure returns `None`, the output optional is empty.
645 ///
646 /// # Example
647 /// ```rust
648 /// # #[cfg(feature = "deploy")] {
649 /// # use hydro_lang::prelude::*;
650 /// # use futures::StreamExt;
651 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
652 /// let tick = process.tick();
653 /// let singleton = tick.singleton(q!("42"));
654 /// singleton
655 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
656 /// .all_ticks()
657 /// # }, |mut stream| async move {
658 /// // 42
659 /// # assert_eq!(stream.next().await.unwrap(), 42);
660 /// # }));
661 /// # }
662 /// ```
663 pub fn filter_map<U, F>(
664 self,
665 f: impl IntoQuotedMut<'a, F, L>,
666 ) -> Optional<U, L, B::UnderlyingBound>
667 where
668 F: Fn(T) -> Option<U> + 'a,
669 {
670 let f = f.splice_fn1_ctx(&self.location).into();
671 Optional::new(
672 self.location.clone(),
673 HydroNode::FilterMap {
674 f,
675 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
676 metadata: self
677 .location
678 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
679 },
680 )
681 }
682
683 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
684 ///
685 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
686 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
687 /// non-null. This is useful for combining several pieces of state together.
688 ///
689 /// # Example
690 /// ```rust
691 /// # #[cfg(feature = "deploy")] {
692 /// # use hydro_lang::prelude::*;
693 /// # use futures::StreamExt;
694 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
695 /// let tick = process.tick();
696 /// let numbers = process
697 /// .source_iter(q!(vec![123, 456]))
698 /// .batch(&tick, nondet!(/** test */));
699 /// let count = numbers.clone().count(); // Singleton
700 /// let max = numbers.max(); // Optional
701 /// count.zip(max).all_ticks()
702 /// # }, |mut stream| async move {
703 /// // [(2, 456)]
704 /// # for w in vec![(2, 456)] {
705 /// # assert_eq!(stream.next().await.unwrap(), w);
706 /// # }
707 /// # }));
708 /// # }
709 /// ```
710 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
711 where
712 Self: ZipResult<'a, O, Location = L>,
713 B: IsBounded,
714 {
715 check_matching_location(&self.location, &Self::other_location(&other));
716
717 if L::is_top_level()
718 && let Some(tick) = self.location.try_tick()
719 {
720 let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
721 let out = zip_inside_tick(
722 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
723 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
724 other_location.clone(),
725 HydroNode::Cast {
726 inner: Box::new(Self::other_ir_node(other)),
727 metadata: other_location.new_node_metadata(Optional::<
728 <Self as ZipResult<'a, O>>::OtherType,
729 Tick<L>,
730 Bounded,
731 >::collection_kind(
732 )),
733 },
734 )
735 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
736 )
737 .latest();
738
739 Self::make(
740 out.location.clone(),
741 out.ir_node.replace(HydroNode::Placeholder),
742 )
743 } else {
744 Self::make(
745 self.location.clone(),
746 HydroNode::CrossSingleton {
747 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
748 right: Box::new(Self::other_ir_node(other)),
749 metadata: self.location.new_node_metadata(CollectionKind::Optional {
750 bound: B::BOUND_KIND,
751 element_type: stageleft::quote_type::<
752 <Self as ZipResult<'a, O>>::ElementType,
753 >()
754 .into(),
755 }),
756 },
757 )
758 }
759 }
760
761 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
762 /// boolean signal is `true`, otherwise the output is null.
763 ///
764 /// # Example
765 /// ```rust
766 /// # #[cfg(feature = "deploy")] {
767 /// # use hydro_lang::prelude::*;
768 /// # use futures::StreamExt;
769 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770 /// let tick = process.tick();
771 /// // ticks are lazy by default, forces the second tick to run
772 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
773 ///
774 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
775 /// let batch_first_tick = process
776 /// .source_iter(q!(vec![1]))
777 /// .batch(&tick, nondet!(/** test */));
778 /// let batch_second_tick = process
779 /// .source_iter(q!(vec![1, 2, 3]))
780 /// .batch(&tick, nondet!(/** test */))
781 /// .defer_tick();
782 /// batch_first_tick.chain(batch_second_tick).count()
783 /// .filter_if(signal)
784 /// .all_ticks()
785 /// # }, |mut stream| async move {
786 /// // [1]
787 /// # for w in vec![1] {
788 /// # assert_eq!(stream.next().await.unwrap(), w);
789 /// # }
790 /// # }));
791 /// # }
792 /// ```
793 pub fn filter_if(
794 self,
795 signal: Singleton<bool, L, B>,
796 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
797 where
798 B: IsBounded,
799 {
800 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
801 }
802
803 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
804 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
805 ///
806 /// Useful for conditionally processing, such as only emitting a singleton's value outside
807 /// a tick if some other condition is satisfied.
808 ///
809 /// # Example
810 /// ```rust
811 /// # #[cfg(feature = "deploy")] {
812 /// # use hydro_lang::prelude::*;
813 /// # use futures::StreamExt;
814 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
815 /// let tick = process.tick();
816 /// // ticks are lazy by default, forces the second tick to run
817 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
818 ///
819 /// let batch_first_tick = process
820 /// .source_iter(q!(vec![1]))
821 /// .batch(&tick, nondet!(/** test */));
822 /// let batch_second_tick = process
823 /// .source_iter(q!(vec![1, 2, 3]))
824 /// .batch(&tick, nondet!(/** test */))
825 /// .defer_tick(); // appears on the second tick
826 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
827 /// batch_first_tick.chain(batch_second_tick).count()
828 /// .filter_if_some(some_on_first_tick)
829 /// .all_ticks()
830 /// # }, |mut stream| async move {
831 /// // [1]
832 /// # for w in vec![1] {
833 /// # assert_eq!(stream.next().await.unwrap(), w);
834 /// # }
835 /// # }));
836 /// # }
837 /// ```
838 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
839 pub fn filter_if_some<U>(
840 self,
841 signal: Optional<U, L, B>,
842 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
843 where
844 B: IsBounded,
845 {
846 self.filter_if(signal.is_some())
847 }
848
849 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
850 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
851 ///
852 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
853 /// the condition.
854 ///
855 /// # Example
856 /// ```rust
857 /// # #[cfg(feature = "deploy")] {
858 /// # use hydro_lang::prelude::*;
859 /// # use futures::StreamExt;
860 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
861 /// let tick = process.tick();
862 /// // ticks are lazy by default, forces the second tick to run
863 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
864 ///
865 /// let batch_first_tick = process
866 /// .source_iter(q!(vec![1]))
867 /// .batch(&tick, nondet!(/** test */));
868 /// let batch_second_tick = process
869 /// .source_iter(q!(vec![1, 2, 3]))
870 /// .batch(&tick, nondet!(/** test */))
871 /// .defer_tick(); // appears on the second tick
872 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
873 /// batch_first_tick.chain(batch_second_tick).count()
874 /// .filter_if_none(some_on_first_tick)
875 /// .all_ticks()
876 /// # }, |mut stream| async move {
877 /// // [3]
878 /// # for w in vec![3] {
879 /// # assert_eq!(stream.next().await.unwrap(), w);
880 /// # }
881 /// # }));
882 /// # }
883 /// ```
884 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
885 pub fn filter_if_none<U>(
886 self,
887 other: Optional<U, L, B>,
888 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
889 where
890 B: IsBounded,
891 {
892 self.filter_if(other.is_none())
893 }
894
895 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
896 ///
897 /// # Example
898 /// ```rust
899 /// # #[cfg(feature = "deploy")] {
900 /// # use hydro_lang::prelude::*;
901 /// # use futures::StreamExt;
902 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
903 /// let tick = process.tick();
904 /// let a = tick.singleton(q!(5));
905 /// let b = tick.singleton(q!(5));
906 /// a.equals(b).all_ticks()
907 /// # }, |mut stream| async move {
908 /// // [true]
909 /// # assert_eq!(stream.next().await.unwrap(), true);
910 /// # }));
911 /// # }
912 /// ```
913 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
914 where
915 T: PartialEq,
916 B: IsBounded,
917 {
918 self.zip(other).map(q!(|(a, b)| a == b))
919 }
920
921 /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
922 /// greater than or equal to the provided threshold. The event will have the value of the
923 /// given threshold.
924 ///
925 /// This requires the incoming singleton to be monotonic, because otherwise the detection of
926 /// the threshold would be non-deterministic.
927 ///
928 /// # Example
929 /// ```rust
930 /// # #[cfg(feature = "deploy")] {
931 /// # use hydro_lang::prelude::*;
932 /// # use futures::StreamExt;
933 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
934 /// let a = // singleton 1 ~> 5 ~> 10
935 /// # process.singleton(q!(5));
936 /// let b = process.singleton(q!(4));
937 /// a.threshold_greater_or_equal(b)
938 /// # }, |mut stream| async move {
939 /// // [4]
940 /// # assert_eq!(stream.next().await.unwrap(), 4);
941 /// # }));
942 /// # }
943 /// ```
944 pub fn threshold_greater_or_equal<B2: IsBounded>(
945 self,
946 threshold: Singleton<T, L, B2>,
947 ) -> Stream<T, L, B::UnderlyingBound>
948 where
949 T: Clone + PartialOrd,
950 B: IsMonotonic,
951 {
952 let threshold = threshold.make_bounded();
953 match self.try_make_bounded() {
954 Ok(bounded) => {
955 let uncasted = threshold
956 .zip(bounded)
957 .into_stream()
958 .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
959
960 Stream::new(
961 uncasted.location.clone(),
962 uncasted.ir_node.replace(HydroNode::Placeholder),
963 )
964 }
965 Err(me) => {
966 let uncasted = sliced! {
967 let me = use(me, nondet!(/** thresholds are deterministic */));
968 let mut remaining_threshold = use::state(|l| {
969 let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
970 as_option
971 });
972
973 let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
974 remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
975 passed.map(q!(|(t, _)| t))
976 };
977
978 Stream::new(
979 uncasted.location.clone(),
980 uncasted.ir_node.replace(HydroNode::Placeholder),
981 )
982 }
983 }
984 }
985
986 /// An operator which allows you to "name" a `HydroNode`.
987 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
988 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
989 {
990 let mut node = self.ir_node.borrow_mut();
991 let metadata = node.metadata_mut();
992 metadata.tag = Some(name.to_owned());
993 }
994 self
995 }
996}
997
998impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
999 type Output = Singleton<bool, L, B::UnderlyingBound>;
1000
1001 fn not(self) -> Self::Output {
1002 self.map(q!(|b| !b))
1003 }
1004}
1005
1006impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
1007where
1008 L: Location<'a>,
1009{
1010 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
1011 /// the inner `Option`.
1012 ///
1013 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
1014 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
1015 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
1016 ///
1017 /// # Example
1018 /// ```rust
1019 /// # #[cfg(feature = "deploy")] {
1020 /// # use hydro_lang::prelude::*;
1021 /// # use futures::StreamExt;
1022 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1023 /// let tick = process.tick();
1024 /// let singleton = tick.singleton(q!(Some(42)));
1025 /// singleton.into_optional().all_ticks()
1026 /// # }, |mut stream| async move {
1027 /// // 42
1028 /// # assert_eq!(stream.next().await.unwrap(), 42);
1029 /// # }));
1030 /// # }
1031 /// ```
1032 pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
1033 self.filter_map(q!(|v| v))
1034 }
1035}
1036
1037impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1038where
1039 L: Location<'a>,
1040{
1041 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1042 ///
1043 /// # Example
1044 /// ```rust
1045 /// # #[cfg(feature = "deploy")] {
1046 /// # use hydro_lang::prelude::*;
1047 /// # use futures::StreamExt;
1048 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1049 /// let tick = process.tick();
1050 /// // ticks are lazy by default, forces the second tick to run
1051 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1052 ///
1053 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1054 /// let b = tick.singleton(q!(true)); // true, true
1055 /// a.and(b).all_ticks()
1056 /// # }, |mut stream| async move {
1057 /// // [true, false]
1058 /// # for w in vec![true, false] {
1059 /// # assert_eq!(stream.next().await.unwrap(), w);
1060 /// # }
1061 /// # }));
1062 /// # }
1063 /// ```
1064 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1065 where
1066 B: IsBounded,
1067 {
1068 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1069 }
1070
1071 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1072 ///
1073 /// # Example
1074 /// ```rust
1075 /// # #[cfg(feature = "deploy")] {
1076 /// # use hydro_lang::prelude::*;
1077 /// # use futures::StreamExt;
1078 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1079 /// let tick = process.tick();
1080 /// // ticks are lazy by default, forces the second tick to run
1081 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1082 ///
1083 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1084 /// let b = tick.singleton(q!(false)); // false, false
1085 /// a.or(b).all_ticks()
1086 /// # }, |mut stream| async move {
1087 /// // [true, false]
1088 /// # for w in vec![true, false] {
1089 /// # assert_eq!(stream.next().await.unwrap(), w);
1090 /// # }
1091 /// # }));
1092 /// # }
1093 /// ```
1094 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1095 where
1096 B: IsBounded,
1097 {
1098 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1099 }
1100}
1101
1102impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1103where
1104 L: Location<'a> + NoTick,
1105{
1106 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1107 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1108 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1109 /// all snapshots of this singleton into the atomic-associated tick will observe the
1110 /// same value each tick.
1111 ///
1112 /// # Non-Determinism
1113 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1114 /// the output singleton has a non-deterministic value since the snapshot can be at an
1115 /// arbitrary point in time.
1116 pub fn snapshot_atomic(
1117 self,
1118 tick: &Tick<L>,
1119 _nondet: NonDet,
1120 ) -> Singleton<T, Tick<L>, Bounded> {
1121 Singleton::new(
1122 tick.clone(),
1123 HydroNode::Batch {
1124 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1125 metadata: tick
1126 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1127 },
1128 )
1129 }
1130
1131 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1132 /// to the value will be asynchronously propagated.
1133 pub fn end_atomic(self) -> Singleton<T, L, B> {
1134 Singleton::new(
1135 self.location.tick.l.clone(),
1136 HydroNode::EndAtomic {
1137 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1138 metadata: self
1139 .location
1140 .tick
1141 .l
1142 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1143 },
1144 )
1145 }
1146}
1147
1148impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1149where
1150 L: Location<'a>,
1151{
1152 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1153 /// will observe the same version of the value and will be executed synchronously before any
1154 /// outputs are yielded (in [`Optional::end_atomic`]).
1155 ///
1156 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1157 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1158 /// a different version).
1159 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1160 let id = self.location.flow_state().borrow_mut().next_clock_id();
1161 let out_location = Atomic {
1162 tick: Tick {
1163 id,
1164 l: self.location.clone(),
1165 },
1166 };
1167 Singleton::new(
1168 out_location.clone(),
1169 HydroNode::BeginAtomic {
1170 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1171 metadata: out_location
1172 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1173 },
1174 )
1175 }
1176
1177 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1178 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1179 /// relevant data that contributed to the snapshot at tick `t`.
1180 ///
1181 /// # Non-Determinism
1182 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1183 /// the output singleton has a non-deterministic value since the snapshot can be at an
1184 /// arbitrary point in time.
1185 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
1186 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1187 Singleton::new(
1188 tick.clone(),
1189 HydroNode::Batch {
1190 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1191 metadata: tick
1192 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1193 },
1194 )
1195 }
1196
1197 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1198 /// with order corresponding to increasing prefixes of data contributing to the singleton.
1199 ///
1200 /// # Non-Determinism
1201 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1202 /// to non-deterministic batching and arrival of inputs, the output stream is
1203 /// non-deterministic.
1204 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1205 where
1206 L: NoTick,
1207 {
1208 sliced! {
1209 let snapshot = use(self, nondet);
1210 snapshot.into_stream()
1211 }
1212 .weaken_retries()
1213 }
1214
1215 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1216 /// value taken at various points in time. Because the input singleton may be
1217 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1218 /// represent the value of the singleton given some prefix of the streams leading up to
1219 /// it.
1220 ///
1221 /// # Non-Determinism
1222 /// The output stream is non-deterministic in which elements are sampled, since this
1223 /// is controlled by a clock.
1224 pub fn sample_every(
1225 self,
1226 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1227 nondet: NonDet,
1228 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1229 where
1230 L: NoTick + NoAtomic,
1231 {
1232 let samples = self.location.source_interval(interval);
1233 sliced! {
1234 let snapshot = use(self, nondet);
1235 let sample_batch = use(samples, nondet);
1236
1237 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1238 }
1239 .weaken_retries()
1240 }
1241
1242 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1243 /// implies that `B == Bounded`.
1244 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1245 where
1246 B: IsBounded,
1247 {
1248 Singleton::new(
1249 self.location.clone(),
1250 self.ir_node.replace(HydroNode::Placeholder),
1251 )
1252 }
1253
1254 #[expect(clippy::result_large_err, reason = "internal use only")]
1255 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1256 if B::UnderlyingBound::BOUNDED {
1257 Ok(Singleton::new(
1258 self.location.clone(),
1259 self.ir_node.replace(HydroNode::Placeholder),
1260 ))
1261 } else {
1262 Err(self)
1263 }
1264 }
1265
1266 /// Clones this bounded singleton into a tick, returning a singleton that has the
1267 /// same value as the outer singleton. Because the outer singleton is bounded, this
1268 /// is deterministic because there is only a single immutable version.
1269 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1270 where
1271 B: IsBounded,
1272 T: Clone,
1273 {
1274 // TODO(shadaj): avoid printing simulator logs for this snapshot
1275 self.snapshot(
1276 tick,
1277 nondet!(/** bounded top-level singleton so deterministic */),
1278 )
1279 }
1280
1281 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1282 ///
1283 /// # Example
1284 /// ```rust
1285 /// # #[cfg(feature = "deploy")] {
1286 /// # use hydro_lang::prelude::*;
1287 /// # use futures::StreamExt;
1288 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1289 /// let tick = process.tick();
1290 /// let batch_input = process
1291 /// .source_iter(q!(vec![123, 456]))
1292 /// .batch(&tick, nondet!(/** test */));
1293 /// batch_input.clone().chain(
1294 /// batch_input.count().into_stream()
1295 /// ).all_ticks()
1296 /// # }, |mut stream| async move {
1297 /// // [123, 456, 2]
1298 /// # for w in vec![123, 456, 2] {
1299 /// # assert_eq!(stream.next().await.unwrap(), w);
1300 /// # }
1301 /// # }));
1302 /// # }
1303 /// ```
1304 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1305 where
1306 B: IsBounded,
1307 {
1308 Stream::new(
1309 self.location.clone(),
1310 HydroNode::Cast {
1311 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1312 metadata: self.location.new_node_metadata(Stream::<
1313 T,
1314 Tick<L>,
1315 Bounded,
1316 TotalOrder,
1317 ExactlyOnce,
1318 >::collection_kind()),
1319 },
1320 )
1321 }
1322
1323 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1324 /// producing a singleton of the resolved output.
1325 ///
1326 /// This is useful when the singleton contains an async computation that must
1327 /// be awaited before further processing. The future is polled to completion
1328 /// before the output value is emitted.
1329 ///
1330 /// # Example
1331 /// ```rust
1332 /// # #[cfg(feature = "deploy")] {
1333 /// # use hydro_lang::prelude::*;
1334 /// # use futures::StreamExt;
1335 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1336 /// let tick = process.tick();
1337 /// let singleton = tick.singleton(q!(5));
1338 /// singleton
1339 /// .map(q!(|v| async move { v * 2 }))
1340 /// .resolve_future_blocking()
1341 /// .all_ticks()
1342 /// # }, |mut stream| async move {
1343 /// // 10
1344 /// # assert_eq!(stream.next().await.unwrap(), 10);
1345 /// # }));
1346 /// # }
1347 /// ```
1348 pub fn resolve_future_blocking(
1349 self,
1350 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1351 where
1352 T: Future,
1353 B: IsBounded,
1354 {
1355 Singleton::new(
1356 self.location.clone(),
1357 HydroNode::ResolveFuturesBlocking {
1358 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1359 metadata: self
1360 .location
1361 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1362 },
1363 )
1364 }
1365}
1366
1367impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1368where
1369 L: Location<'a>,
1370{
1371 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1372 /// which will stream the value computed in _each_ tick as a separate stream element.
1373 ///
1374 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1375 /// producing one element in the output for each tick. This is useful for batched computations,
1376 /// where the results from each tick must be combined together.
1377 ///
1378 /// # Example
1379 /// ```rust
1380 /// # #[cfg(feature = "deploy")] {
1381 /// # use hydro_lang::prelude::*;
1382 /// # use futures::StreamExt;
1383 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1384 /// let tick = process.tick();
1385 /// # // ticks are lazy by default, forces the second tick to run
1386 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1387 /// # let batch_first_tick = process
1388 /// # .source_iter(q!(vec![1]))
1389 /// # .batch(&tick, nondet!(/** test */));
1390 /// # let batch_second_tick = process
1391 /// # .source_iter(q!(vec![1, 2, 3]))
1392 /// # .batch(&tick, nondet!(/** test */))
1393 /// # .defer_tick(); // appears on the second tick
1394 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1395 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1396 /// .count()
1397 /// .all_ticks()
1398 /// # }, |mut stream| async move {
1399 /// // [1, 3]
1400 /// # for w in vec![1, 3] {
1401 /// # assert_eq!(stream.next().await.unwrap(), w);
1402 /// # }
1403 /// # }));
1404 /// # }
1405 /// ```
1406 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1407 self.into_stream().all_ticks()
1408 }
1409
1410 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1411 /// which will stream the value computed in _each_ tick as a separate stream element.
1412 ///
1413 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1414 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1415 /// singleton's [`Tick`] context.
1416 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1417 self.into_stream().all_ticks_atomic()
1418 }
1419
1420 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1421 /// be asynchronously updated with the latest value of the singleton inside the tick.
1422 ///
1423 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1424 /// tick that tracks the inner value. This is useful for getting the value as of the
1425 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1426 ///
1427 /// # Example
1428 /// ```rust
1429 /// # #[cfg(feature = "deploy")] {
1430 /// # use hydro_lang::prelude::*;
1431 /// # use futures::StreamExt;
1432 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1433 /// let tick = process.tick();
1434 /// # // ticks are lazy by default, forces the second tick to run
1435 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1436 /// # let batch_first_tick = process
1437 /// # .source_iter(q!(vec![1]))
1438 /// # .batch(&tick, nondet!(/** test */));
1439 /// # let batch_second_tick = process
1440 /// # .source_iter(q!(vec![1, 2, 3]))
1441 /// # .batch(&tick, nondet!(/** test */))
1442 /// # .defer_tick(); // appears on the second tick
1443 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1444 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1445 /// .count()
1446 /// .latest()
1447 /// # .sample_eager(nondet!(/** test */))
1448 /// # }, |mut stream| async move {
1449 /// // asynchronously changes from 1 ~> 3
1450 /// # for w in vec![1, 3] {
1451 /// # assert_eq!(stream.next().await.unwrap(), w);
1452 /// # }
1453 /// # }));
1454 /// # }
1455 /// ```
1456 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1457 Singleton::new(
1458 self.location.outer().clone(),
1459 HydroNode::YieldConcat {
1460 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1461 metadata: self
1462 .location
1463 .outer()
1464 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1465 },
1466 )
1467 }
1468
1469 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1470 /// be updated with the latest value of the singleton inside the tick.
1471 ///
1472 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1473 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1474 /// singleton's [`Tick`] context.
1475 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1476 let out_location = Atomic {
1477 tick: self.location.clone(),
1478 };
1479 Singleton::new(
1480 out_location.clone(),
1481 HydroNode::YieldConcat {
1482 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1483 metadata: out_location
1484 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1485 },
1486 )
1487 }
1488}
1489
1490#[doc(hidden)]
1491/// Helper trait that determines the output collection type for [`Singleton::zip`].
1492///
1493/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1494/// [`Singleton`].
1495#[sealed::sealed]
1496pub trait ZipResult<'a, Other> {
1497 /// The output collection type.
1498 type Out;
1499 /// The type of the tupled output value.
1500 type ElementType;
1501 /// The type of the other collection's value.
1502 type OtherType;
1503 /// The location where the tupled result will be materialized.
1504 type Location: Location<'a>;
1505
1506 /// The location of the second input to the `zip`.
1507 fn other_location(other: &Other) -> Self::Location;
1508 /// The IR node of the second input to the `zip`.
1509 fn other_ir_node(other: Other) -> HydroNode;
1510
1511 /// Constructs the output live collection given an IR node containing the zip result.
1512 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1513}
1514
1515#[sealed::sealed]
1516impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1517where
1518 L: Location<'a>,
1519{
1520 type Out = Singleton<(T, U), L, B>;
1521 type ElementType = (T, U);
1522 type OtherType = U;
1523 type Location = L;
1524
1525 fn other_location(other: &Singleton<U, L, B>) -> L {
1526 other.location.clone()
1527 }
1528
1529 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1530 other.ir_node.replace(HydroNode::Placeholder)
1531 }
1532
1533 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1534 Singleton::new(
1535 location.clone(),
1536 HydroNode::Cast {
1537 inner: Box::new(ir_node),
1538 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1539 },
1540 )
1541 }
1542}
1543
1544#[sealed::sealed]
1545impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1546 for Singleton<T, L, B>
1547where
1548 L: Location<'a>,
1549{
1550 type Out = Optional<(T, U), L, B::UnderlyingBound>;
1551 type ElementType = (T, U);
1552 type OtherType = U;
1553 type Location = L;
1554
1555 fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1556 other.location.clone()
1557 }
1558
1559 fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1560 other.ir_node.replace(HydroNode::Placeholder)
1561 }
1562
1563 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1564 Optional::new(location, ir_node)
1565 }
1566}
1567
1568#[cfg(test)]
1569mod tests {
1570 #[cfg(feature = "deploy")]
1571 use futures::{SinkExt, StreamExt};
1572 #[cfg(feature = "deploy")]
1573 use hydro_deploy::Deployment;
1574 #[cfg(any(feature = "deploy", feature = "sim"))]
1575 use stageleft::q;
1576
1577 #[cfg(any(feature = "deploy", feature = "sim"))]
1578 use crate::compile::builder::FlowBuilder;
1579 #[cfg(feature = "deploy")]
1580 use crate::live_collections::stream::ExactlyOnce;
1581 #[cfg(any(feature = "deploy", feature = "sim"))]
1582 use crate::location::Location;
1583 #[cfg(any(feature = "deploy", feature = "sim"))]
1584 use crate::nondet::nondet;
1585
1586 #[cfg(feature = "deploy")]
1587 #[tokio::test]
1588 async fn tick_cycle_cardinality() {
1589 let mut deployment = Deployment::new();
1590
1591 let mut flow = FlowBuilder::new();
1592 let node = flow.process::<()>();
1593 let external = flow.external::<()>();
1594
1595 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1596
1597 let node_tick = node.tick();
1598 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1599 let counts = singleton
1600 .clone()
1601 .into_stream()
1602 .count()
1603 .filter_if(
1604 input
1605 .batch(&node_tick, nondet!(/** testing */))
1606 .first()
1607 .is_some(),
1608 )
1609 .all_ticks()
1610 .send_bincode_external(&external);
1611 complete_cycle.complete_next_tick(singleton);
1612
1613 let nodes = flow
1614 .with_process(&node, deployment.Localhost())
1615 .with_external(&external, deployment.Localhost())
1616 .deploy(&mut deployment);
1617
1618 deployment.deploy().await.unwrap();
1619
1620 let mut tick_trigger = nodes.connect(input_send).await;
1621 let mut external_out = nodes.connect(counts).await;
1622
1623 deployment.start().await.unwrap();
1624
1625 tick_trigger.send(()).await.unwrap();
1626
1627 assert_eq!(external_out.next().await.unwrap(), 1);
1628
1629 tick_trigger.send(()).await.unwrap();
1630
1631 assert_eq!(external_out.next().await.unwrap(), 1);
1632 }
1633
1634 #[cfg(feature = "sim")]
1635 #[test]
1636 #[should_panic]
1637 fn sim_fold_intermediate_states() {
1638 let mut flow = FlowBuilder::new();
1639 let node = flow.process::<()>();
1640
1641 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1642 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1643
1644 let tick = node.tick();
1645 let batch = folded.snapshot(&tick, nondet!(/** test */));
1646 let out_recv = batch.all_ticks().sim_output();
1647
1648 flow.sim().exhaustive(async || {
1649 assert_eq!(out_recv.next().await.unwrap(), 10);
1650 });
1651 }
1652
1653 #[cfg(feature = "sim")]
1654 #[test]
1655 fn sim_fold_intermediate_state_count() {
1656 let mut flow = FlowBuilder::new();
1657 let node = flow.process::<()>();
1658
1659 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1660 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1661
1662 let tick = node.tick();
1663 let batch = folded.snapshot(&tick, nondet!(/** test */));
1664 let out_recv = batch.all_ticks().sim_output();
1665
1666 let instance_count = flow.sim().exhaustive(async || {
1667 let out = out_recv.collect::<Vec<_>>().await;
1668 assert_eq!(out.last(), Some(&10));
1669 });
1670
1671 assert_eq!(
1672 instance_count,
1673 16 // 2^4 possible subsets of intermediates (including initial state)
1674 )
1675 }
1676
1677 #[cfg(feature = "sim")]
1678 #[test]
1679 fn sim_fold_no_repeat_initial() {
1680 // check that we don't repeat the initial state of the fold in autonomous decisions
1681
1682 let mut flow = FlowBuilder::new();
1683 let node = flow.process::<()>();
1684
1685 let (in_port, input) = node.sim_input();
1686 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1687
1688 let tick = node.tick();
1689 let batch = folded.snapshot(&tick, nondet!(/** test */));
1690 let out_recv = batch.all_ticks().sim_output();
1691
1692 flow.sim().exhaustive(async || {
1693 assert_eq!(out_recv.next().await.unwrap(), 0);
1694
1695 in_port.send(123);
1696
1697 assert_eq!(out_recv.next().await.unwrap(), 123);
1698 });
1699 }
1700
1701 #[cfg(feature = "sim")]
1702 #[test]
1703 #[should_panic]
1704 fn sim_fold_repeats_snapshots() {
1705 // when the tick is driven by a snapshot AND something else, the snapshot can
1706 // "stutter" and repeat the same state multiple times
1707
1708 let mut flow = FlowBuilder::new();
1709 let node = flow.process::<()>();
1710
1711 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1712 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1713
1714 let tick = node.tick();
1715 let batch = source
1716 .batch(&tick, nondet!(/** test */))
1717 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1718 let out_recv = batch.all_ticks().sim_output();
1719
1720 flow.sim().exhaustive(async || {
1721 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1722 {
1723 panic!("repeated snapshot");
1724 }
1725 });
1726 }
1727
1728 #[cfg(feature = "sim")]
1729 #[test]
1730 fn sim_fold_repeats_snapshots_count() {
1731 // check the number of instances
1732 let mut flow = FlowBuilder::new();
1733 let node = flow.process::<()>();
1734
1735 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1736 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1737
1738 let tick = node.tick();
1739 let batch = source
1740 .batch(&tick, nondet!(/** test */))
1741 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1742 let out_recv = batch.all_ticks().sim_output();
1743
1744 let count = flow.sim().exhaustive(async || {
1745 let _ = out_recv.collect::<Vec<_>>().await;
1746 });
1747
1748 assert_eq!(count, 52);
1749 // don't have a combinatorial explanation for this number yet, but checked via logs
1750 }
1751
1752 #[cfg(feature = "sim")]
1753 #[test]
1754 fn sim_top_level_singleton_exhaustive() {
1755 // ensures that top-level singletons have only one snapshot
1756 let mut flow = FlowBuilder::new();
1757 let node = flow.process::<()>();
1758
1759 let singleton = node.singleton(q!(1));
1760 let tick = node.tick();
1761 let batch = singleton.snapshot(&tick, nondet!(/** test */));
1762 let out_recv = batch.all_ticks().sim_output();
1763
1764 let count = flow.sim().exhaustive(async || {
1765 let _ = out_recv.collect::<Vec<_>>().await;
1766 });
1767
1768 assert_eq!(count, 1);
1769 }
1770
1771 #[cfg(feature = "sim")]
1772 #[test]
1773 fn sim_top_level_singleton_join_count() {
1774 // if a tick consumes a static snapshot and a stream batch, only the batch require space
1775 // exploration
1776
1777 let mut flow = FlowBuilder::new();
1778 let node = flow.process::<()>();
1779
1780 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1781 let tick = node.tick();
1782 let batch = source_iter
1783 .batch(&tick, nondet!(/** test */))
1784 .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1785 let out_recv = batch.all_ticks().sim_output();
1786
1787 let instance_count = flow.sim().exhaustive(async || {
1788 let _ = out_recv.collect::<Vec<_>>().await;
1789 });
1790
1791 assert_eq!(
1792 instance_count,
1793 16 // 2^4 ways to split up (including a possibly empty first batch)
1794 )
1795 }
1796
1797 #[cfg(feature = "sim")]
1798 #[test]
1799 fn top_level_singleton_into_stream_no_replay() {
1800 let mut flow = FlowBuilder::new();
1801 let node = flow.process::<()>();
1802
1803 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1804 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1805
1806 let out_recv = folded.into_stream().sim_output();
1807
1808 flow.sim().exhaustive(async || {
1809 out_recv.assert_yields_only([10]).await;
1810 });
1811 }
1812
1813 #[cfg(feature = "sim")]
1814 #[test]
1815 fn inside_tick_singleton_zip() {
1816 use crate::live_collections::Stream;
1817 use crate::live_collections::sliced::sliced;
1818
1819 let mut flow = FlowBuilder::new();
1820 let node = flow.process::<()>();
1821
1822 let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1823 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1824
1825 let out_recv = sliced! {
1826 let v = use(folded, nondet!(/** test */));
1827 v.clone().zip(v).into_stream()
1828 }
1829 .sim_output();
1830
1831 let count = flow.sim().exhaustive(async || {
1832 let out = out_recv.collect::<Vec<_>>().await;
1833 assert_eq!(out.last(), Some(&(3, 3)));
1834 });
1835
1836 assert_eq!(count, 4);
1837 }
1838}