hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26
27/// A *nullable* Rust value that can asynchronously change over time.
28///
29/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
30/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
31/// asynchronously change over time, including becoming present of uninhabited.
32///
33/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
34/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
35///
36/// Type Parameters:
37/// - `Type`: the type of the value in this optional (when it is not null)
38/// - `Loc`: the [`Location`] where the optional is materialized
39/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
40pub struct Optional<Type, Loc, Bound: Boundedness> {
41 pub(crate) location: Loc,
42 pub(crate) ir_node: RefCell<HydroNode>,
43 pub(crate) flow_state: FlowState,
44
45 _phantom: PhantomData<(Type, Loc, Bound)>,
46}
47
48impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
49 fn drop(&mut self) {
50 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
51 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
52 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
53 input: Box::new(ir_node),
54 op_metadata: HydroIrOpMetadata::new(),
55 });
56 }
57 }
58}
59
60impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
61where
62 T: Clone,
63 L: Location<'a> + NoTick,
64{
65 fn from(value: Optional<T, L, Bounded>) -> Self {
66 let tick = value.location().tick();
67 value.clone_into_tick(&tick).latest()
68 }
69}
70
71impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
72where
73 L: Location<'a>,
74{
75 fn defer_tick(self) -> Self {
76 Optional::defer_tick(self)
77 }
78}
79
80impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
81where
82 L: Location<'a>,
83{
84 type Location = Tick<L>;
85
86 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
87 Optional::new(
88 location.clone(),
89 HydroNode::CycleSource {
90 cycle_id,
91 metadata: location.new_node_metadata(Self::collection_kind()),
92 },
93 )
94 }
95}
96
97impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
98where
99 L: Location<'a>,
100{
101 type Location = Tick<L>;
102
103 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
104 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
105 location.clone(),
106 HydroNode::DeferTick {
107 input: Box::new(HydroNode::CycleSource {
108 cycle_id,
109 metadata: location.new_node_metadata(Self::collection_kind()),
110 }),
111 metadata: location
112 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
113 },
114 );
115
116 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
117 }
118}
119
120impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
121where
122 L: Location<'a>,
123{
124 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
125 assert_eq!(
126 Location::id(&self.location),
127 expected_location,
128 "locations do not match"
129 );
130 self.location
131 .flow_state()
132 .borrow_mut()
133 .push_root(HydroRoot::CycleSink {
134 cycle_id,
135 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
136 op_metadata: HydroIrOpMetadata::new(),
137 });
138 }
139}
140
141impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
142where
143 L: Location<'a>,
144{
145 type Location = Tick<L>;
146
147 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
148 Optional::new(
149 location.clone(),
150 HydroNode::CycleSource {
151 cycle_id,
152 metadata: location.new_node_metadata(Self::collection_kind()),
153 },
154 )
155 }
156}
157
158impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
159where
160 L: Location<'a>,
161{
162 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
163 assert_eq!(
164 Location::id(&self.location),
165 expected_location,
166 "locations do not match"
167 );
168 self.location
169 .flow_state()
170 .borrow_mut()
171 .push_root(HydroRoot::CycleSink {
172 cycle_id,
173 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
174 op_metadata: HydroIrOpMetadata::new(),
175 });
176 }
177}
178
179impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
180where
181 L: Location<'a> + NoTick,
182{
183 type Location = L;
184
185 fn create_source(cycle_id: CycleId, location: L) -> Self {
186 Optional::new(
187 location.clone(),
188 HydroNode::CycleSource {
189 cycle_id,
190 metadata: location.new_node_metadata(Self::collection_kind()),
191 },
192 )
193 }
194}
195
196impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
197where
198 L: Location<'a> + NoTick,
199{
200 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201 assert_eq!(
202 Location::id(&self.location),
203 expected_location,
204 "locations do not match"
205 );
206 self.location
207 .flow_state()
208 .borrow_mut()
209 .push_root(HydroRoot::CycleSink {
210 cycle_id,
211 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212 op_metadata: HydroIrOpMetadata::new(),
213 });
214 }
215}
216
217impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
218where
219 L: Location<'a>,
220{
221 fn from(singleton: Singleton<T, L, B>) -> Self {
222 Optional::new(
223 singleton.location.clone(),
224 HydroNode::Cast {
225 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
226 metadata: singleton
227 .location
228 .new_node_metadata(Self::collection_kind()),
229 },
230 )
231 }
232}
233
234#[cfg(stageleft_runtime)]
235pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
236 me: Optional<T, L, B>,
237 other: Optional<O, L, B>,
238) -> Optional<(T, O), L, B> {
239 check_matching_location(&me.location, &other.location);
240
241 Optional::new(
242 me.location.clone(),
243 HydroNode::CrossSingleton {
244 left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
245 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
246 metadata: me
247 .location
248 .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
249 },
250 )
251}
252
253#[cfg(stageleft_runtime)]
254fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
255 me: Optional<T, L, B>,
256 other: Optional<T, L, B>,
257) -> Optional<T, L, B> {
258 check_matching_location(&me.location, &other.location);
259
260 Optional::new(
261 me.location.clone(),
262 HydroNode::ChainFirst {
263 first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
264 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
265 metadata: me
266 .location
267 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
268 },
269 )
270}
271
272impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
273where
274 T: Clone,
275 L: Location<'a>,
276{
277 fn clone(&self) -> Self {
278 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
279 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
280 *self.ir_node.borrow_mut() = HydroNode::Tee {
281 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
282 metadata: self.location.new_node_metadata(Self::collection_kind()),
283 };
284 }
285
286 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
287 Optional {
288 location: self.location.clone(),
289 flow_state: self.flow_state.clone(),
290 ir_node: HydroNode::Tee {
291 inner: SharedNode(inner.0.clone()),
292 metadata: metadata.clone(),
293 }
294 .into(),
295 _phantom: PhantomData,
296 }
297 } else {
298 unreachable!()
299 }
300 }
301}
302
303impl<'a, T, L, B: Boundedness> Optional<T, L, B>
304where
305 L: Location<'a>,
306{
307 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
308 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
309 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
310 let flow_state = location.flow_state().clone();
311 Optional {
312 location,
313 flow_state,
314 ir_node: RefCell::new(ir_node),
315 _phantom: PhantomData,
316 }
317 }
318
319 pub(crate) fn collection_kind() -> CollectionKind {
320 CollectionKind::Optional {
321 bound: B::BOUND_KIND,
322 element_type: stageleft::quote_type::<T>().into(),
323 }
324 }
325
326 /// Returns the [`Location`] where this optional is being materialized.
327 pub fn location(&self) -> &L {
328 &self.location
329 }
330
331 /// Transforms the optional value by applying a function `f` to it,
332 /// continuously as the input is updated.
333 ///
334 /// Whenever the optional is empty, the output optional is also empty.
335 ///
336 /// # Example
337 /// ```rust
338 /// # #[cfg(feature = "deploy")] {
339 /// # use hydro_lang::prelude::*;
340 /// # use futures::StreamExt;
341 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
342 /// let tick = process.tick();
343 /// let optional = tick.optional_first_tick(q!(1));
344 /// optional.map(q!(|v| v + 1)).all_ticks()
345 /// # }, |mut stream| async move {
346 /// // 2
347 /// # assert_eq!(stream.next().await.unwrap(), 2);
348 /// # }));
349 /// # }
350 /// ```
351 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
352 where
353 F: Fn(T) -> U + 'a,
354 {
355 let f = f.splice_fn1_ctx(&self.location).into();
356 Optional::new(
357 self.location.clone(),
358 HydroNode::Map {
359 f,
360 singleton_refs: Vec::new(),
361 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
362 metadata: self
363 .location
364 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
365 },
366 )
367 }
368
369 /// Transforms the optional value by applying a function `f` to it and then flattening
370 /// the result into a stream, preserving the order of elements.
371 ///
372 /// If the optional is empty, the output stream is also empty. If the optional contains
373 /// a value, `f` is applied to produce an iterator, and all items from that iterator
374 /// are emitted in the output stream in deterministic order.
375 ///
376 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
377 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
378 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
379 ///
380 /// # Example
381 /// ```rust
382 /// # #[cfg(feature = "deploy")] {
383 /// # use hydro_lang::prelude::*;
384 /// # use futures::StreamExt;
385 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
386 /// let tick = process.tick();
387 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
388 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
389 /// # }, |mut stream| async move {
390 /// // 1, 2, 3
391 /// # for w in vec![1, 2, 3] {
392 /// # assert_eq!(stream.next().await.unwrap(), w);
393 /// # }
394 /// # }));
395 /// # }
396 /// ```
397 pub fn flat_map_ordered<U, I, F>(
398 self,
399 f: impl IntoQuotedMut<'a, F, L>,
400 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
401 where
402 B: IsBounded,
403 I: IntoIterator<Item = U>,
404 F: Fn(T) -> I + 'a,
405 {
406 self.into_stream().flat_map_ordered(f)
407 }
408
409 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
410 /// for the output type `I` to produce items in any order.
411 ///
412 /// If the optional is empty, the output stream is also empty. If the optional contains
413 /// a value, `f` is applied to produce an iterator, and all items from that iterator
414 /// are emitted in the output stream in non-deterministic order.
415 ///
416 /// # Example
417 /// ```rust
418 /// # #[cfg(feature = "deploy")] {
419 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
420 /// # use futures::StreamExt;
421 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
422 /// let tick = process.tick();
423 /// let optional = tick.optional_first_tick(q!(
424 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
425 /// ));
426 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
427 /// # }, |mut stream| async move {
428 /// // 1, 2, 3, but in no particular order
429 /// # let mut results = Vec::new();
430 /// # for _ in 0..3 {
431 /// # results.push(stream.next().await.unwrap());
432 /// # }
433 /// # results.sort();
434 /// # assert_eq!(results, vec![1, 2, 3]);
435 /// # }));
436 /// # }
437 /// ```
438 pub fn flat_map_unordered<U, I, F>(
439 self,
440 f: impl IntoQuotedMut<'a, F, L>,
441 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
442 where
443 B: IsBounded,
444 I: IntoIterator<Item = U>,
445 F: Fn(T) -> I + 'a,
446 {
447 self.into_stream().flat_map_unordered(f)
448 }
449
450 /// Flattens the optional value into a stream, preserving the order of elements.
451 ///
452 /// If the optional is empty, the output stream is also empty. If the optional contains
453 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
454 /// in the output stream in deterministic order.
455 ///
456 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
457 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
458 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
459 ///
460 /// # Example
461 /// ```rust
462 /// # #[cfg(feature = "deploy")] {
463 /// # use hydro_lang::prelude::*;
464 /// # use futures::StreamExt;
465 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
466 /// let tick = process.tick();
467 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
468 /// optional.flatten_ordered().all_ticks()
469 /// # }, |mut stream| async move {
470 /// // 1, 2, 3
471 /// # for w in vec![1, 2, 3] {
472 /// # assert_eq!(stream.next().await.unwrap(), w);
473 /// # }
474 /// # }));
475 /// # }
476 /// ```
477 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
478 where
479 B: IsBounded,
480 T: IntoIterator<Item = U>,
481 {
482 self.flat_map_ordered(q!(|v| v))
483 }
484
485 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
486 /// for the element type `T` to produce items in any order.
487 ///
488 /// If the optional is empty, the output stream is also empty. If the optional contains
489 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
490 /// in the output stream in non-deterministic order.
491 ///
492 /// # Example
493 /// ```rust
494 /// # #[cfg(feature = "deploy")] {
495 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
496 /// # use futures::StreamExt;
497 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
498 /// let tick = process.tick();
499 /// let optional = tick.optional_first_tick(q!(
500 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
501 /// ));
502 /// optional.flatten_unordered().all_ticks()
503 /// # }, |mut stream| async move {
504 /// // 1, 2, 3, but in no particular order
505 /// # let mut results = Vec::new();
506 /// # for _ in 0..3 {
507 /// # results.push(stream.next().await.unwrap());
508 /// # }
509 /// # results.sort();
510 /// # assert_eq!(results, vec![1, 2, 3]);
511 /// # }));
512 /// # }
513 /// ```
514 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
515 where
516 B: IsBounded,
517 T: IntoIterator<Item = U>,
518 {
519 self.flat_map_unordered(q!(|v| v))
520 }
521
522 /// Creates an optional containing only the value if it satisfies a predicate `f`.
523 ///
524 /// If the optional is empty, the output optional is also empty. If the optional contains
525 /// a value and the predicate returns `true`, the output optional contains the same value.
526 /// If the predicate returns `false`, the output optional is empty.
527 ///
528 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
529 /// not modify or take ownership of the value. If you need to modify the value while filtering
530 /// use [`Optional::filter_map`] instead.
531 ///
532 /// # Example
533 /// ```rust
534 /// # #[cfg(feature = "deploy")] {
535 /// # use hydro_lang::prelude::*;
536 /// # use futures::StreamExt;
537 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
538 /// let tick = process.tick();
539 /// let optional = tick.optional_first_tick(q!(5));
540 /// optional.filter(q!(|&x| x > 3)).all_ticks()
541 /// # }, |mut stream| async move {
542 /// // 5
543 /// # assert_eq!(stream.next().await.unwrap(), 5);
544 /// # }));
545 /// # }
546 /// ```
547 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
548 where
549 F: Fn(&T) -> bool + 'a,
550 {
551 let f = f.splice_fn1_borrow_ctx(&self.location).into();
552 Optional::new(
553 self.location.clone(),
554 HydroNode::Filter {
555 f,
556 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
557 metadata: self.location.new_node_metadata(Self::collection_kind()),
558 },
559 )
560 }
561
562 /// An operator that both filters and maps. It yields only the value if the supplied
563 /// closure `f` returns `Some(value)`.
564 ///
565 /// If the optional is empty, the output optional is also empty. If the optional contains
566 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
567 /// If the closure returns `None`, the output optional is empty.
568 ///
569 /// # Example
570 /// ```rust
571 /// # #[cfg(feature = "deploy")] {
572 /// # use hydro_lang::prelude::*;
573 /// # use futures::StreamExt;
574 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
575 /// let tick = process.tick();
576 /// let optional = tick.optional_first_tick(q!("42"));
577 /// optional
578 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
579 /// .all_ticks()
580 /// # }, |mut stream| async move {
581 /// // 42
582 /// # assert_eq!(stream.next().await.unwrap(), 42);
583 /// # }));
584 /// # }
585 /// ```
586 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
587 where
588 F: Fn(T) -> Option<U> + 'a,
589 {
590 let f = f.splice_fn1_ctx(&self.location).into();
591 Optional::new(
592 self.location.clone(),
593 HydroNode::FilterMap {
594 f,
595 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
596 metadata: self
597 .location
598 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
599 },
600 )
601 }
602
603 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
604 ///
605 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
606 /// non-null. This is useful for combining several pieces of state together.
607 ///
608 /// # Example
609 /// ```rust
610 /// # #[cfg(feature = "deploy")] {
611 /// # use hydro_lang::prelude::*;
612 /// # use futures::StreamExt;
613 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
614 /// let tick = process.tick();
615 /// let numbers = process
616 /// .source_iter(q!(vec![123, 456, 789]))
617 /// .batch(&tick, nondet!(/** test */));
618 /// let min = numbers.clone().min(); // Optional
619 /// let max = numbers.max(); // Optional
620 /// min.zip(max).all_ticks()
621 /// # }, |mut stream| async move {
622 /// // [(123, 789)]
623 /// # for w in vec![(123, 789)] {
624 /// # assert_eq!(stream.next().await.unwrap(), w);
625 /// # }
626 /// # }));
627 /// # }
628 /// ```
629 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
630 where
631 B: IsBounded,
632 {
633 let other: Optional<O, L, B> = other.into();
634 check_matching_location(&self.location, &other.location);
635
636 if L::is_top_level()
637 && let Some(tick) = self.location.try_tick()
638 {
639 let out = zip_inside_tick(
640 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
641 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
642 )
643 .latest();
644
645 Optional::new(
646 out.location.clone(),
647 out.ir_node.replace(HydroNode::Placeholder),
648 )
649 } else {
650 zip_inside_tick(self, other)
651 }
652 }
653
654 /// Passes through `self` when it has a value, otherwise passes through `other`.
655 ///
656 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
657 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
658 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
659 ///
660 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
661 /// of the inputs change (including to/from null states).
662 ///
663 /// # Example
664 /// ```rust
665 /// # #[cfg(feature = "deploy")] {
666 /// # use hydro_lang::prelude::*;
667 /// # use futures::StreamExt;
668 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669 /// let tick = process.tick();
670 /// // ticks are lazy by default, forces the second tick to run
671 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672 ///
673 /// let some_first_tick = tick.optional_first_tick(q!(123));
674 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
675 /// some_first_tick.or(some_second_tick).all_ticks()
676 /// # }, |mut stream| async move {
677 /// // [123 /* first tick */, 456 /* second tick */]
678 /// # for w in vec![123, 456] {
679 /// # assert_eq!(stream.next().await.unwrap(), w);
680 /// # }
681 /// # }));
682 /// # }
683 /// ```
684 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
685 check_matching_location(&self.location, &other.location);
686
687 if L::is_top_level()
688 && !B::BOUNDED // only if unbounded we need to use a tick
689 && let Some(tick) = self.location.try_tick()
690 {
691 let out = or_inside_tick(
692 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
694 )
695 .latest();
696
697 Optional::new(
698 out.location.clone(),
699 out.ir_node.replace(HydroNode::Placeholder),
700 )
701 } else {
702 Optional::new(
703 self.location.clone(),
704 HydroNode::ChainFirst {
705 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
706 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
707 metadata: self.location.new_node_metadata(Self::collection_kind()),
708 },
709 )
710 }
711 }
712
713 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
714 ///
715 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
716 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
717 ///
718 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
719 /// of the inputs change (including to/from null states).
720 ///
721 /// # Example
722 /// ```rust
723 /// # #[cfg(feature = "deploy")] {
724 /// # use hydro_lang::prelude::*;
725 /// # use futures::StreamExt;
726 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727 /// let tick = process.tick();
728 /// // ticks are lazy by default, forces the later ticks to run
729 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
730 ///
731 /// let some_first_tick = tick.optional_first_tick(q!(123));
732 /// some_first_tick
733 /// .unwrap_or(tick.singleton(q!(456)))
734 /// .all_ticks()
735 /// # }, |mut stream| async move {
736 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
737 /// # for w in vec![123, 456, 456, 456] {
738 /// # assert_eq!(stream.next().await.unwrap(), w);
739 /// # }
740 /// # }));
741 /// # }
742 /// ```
743 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
744 let res_option = self.or(other.into());
745 Singleton::new(
746 res_option.location.clone(),
747 HydroNode::Cast {
748 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
749 metadata: res_option
750 .location
751 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
752 },
753 )
754 }
755
756 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
757 ///
758 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
759 /// [`Optional`] when the default value of the type is a suitable fallback.
760 ///
761 /// # Example
762 /// ```rust
763 /// # #[cfg(feature = "deploy")] {
764 /// # use hydro_lang::prelude::*;
765 /// # use futures::StreamExt;
766 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
767 /// let tick = process.tick();
768 /// // ticks are lazy by default, forces the later ticks to run
769 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
770 ///
771 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
772 /// some_first_tick.unwrap_or_default().all_ticks()
773 /// # }, |mut stream| async move {
774 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
775 /// # for w in vec![123, 0, 0, 0] {
776 /// # assert_eq!(stream.next().await.unwrap(), w);
777 /// # }
778 /// # }));
779 /// # }
780 /// ```
781 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
782 where
783 T: Default + Clone,
784 {
785 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
786 }
787
788 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
789 ///
790 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
791 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
792 /// so that Hydro can skip any computation on null values.
793 ///
794 /// # Example
795 /// ```rust
796 /// # #[cfg(feature = "deploy")] {
797 /// # use hydro_lang::prelude::*;
798 /// # use futures::StreamExt;
799 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
800 /// let tick = process.tick();
801 /// // ticks are lazy by default, forces the later ticks to run
802 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
803 ///
804 /// let some_first_tick = tick.optional_first_tick(q!(123));
805 /// some_first_tick.into_singleton().all_ticks()
806 /// # }, |mut stream| async move {
807 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
808 /// # for w in vec![Some(123), None, None, None] {
809 /// # assert_eq!(stream.next().await.unwrap(), w);
810 /// # }
811 /// # }));
812 /// # }
813 /// ```
814 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
815 where
816 T: Clone,
817 {
818 let none: syn::Expr = parse_quote!(::std::option::Option::None);
819
820 let none_singleton = Singleton::new(
821 self.location.clone(),
822 HydroNode::SingletonSource {
823 value: none.into(),
824 first_tick_only: false,
825 metadata: self
826 .location
827 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
828 },
829 );
830
831 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
832 }
833
834 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
835 ///
836 /// # Example
837 /// ```rust
838 /// # #[cfg(feature = "deploy")] {
839 /// # use hydro_lang::prelude::*;
840 /// # use futures::StreamExt;
841 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
842 /// let tick = process.tick();
843 /// // ticks are lazy by default, forces the second tick to run
844 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
845 ///
846 /// let some_first_tick = tick.optional_first_tick(q!(42));
847 /// some_first_tick.is_some().all_ticks()
848 /// # }, |mut stream| async move {
849 /// // [true /* first tick */, false /* second tick */, ...]
850 /// # for w in vec![true, false] {
851 /// # assert_eq!(stream.next().await.unwrap(), w);
852 /// # }
853 /// # }));
854 /// # }
855 /// ```
856 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
857 pub fn is_some(self) -> Singleton<bool, L, B> {
858 self.map(q!(|_| ()))
859 .into_singleton()
860 .map(q!(|o| o.is_some()))
861 }
862
863 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
864 ///
865 /// # Example
866 /// ```rust
867 /// # #[cfg(feature = "deploy")] {
868 /// # use hydro_lang::prelude::*;
869 /// # use futures::StreamExt;
870 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
871 /// let tick = process.tick();
872 /// // ticks are lazy by default, forces the second tick to run
873 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
874 ///
875 /// let some_first_tick = tick.optional_first_tick(q!(42));
876 /// some_first_tick.is_none().all_ticks()
877 /// # }, |mut stream| async move {
878 /// // [false /* first tick */, true /* second tick */, ...]
879 /// # for w in vec![false, true] {
880 /// # assert_eq!(stream.next().await.unwrap(), w);
881 /// # }
882 /// # }));
883 /// # }
884 /// ```
885 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
886 pub fn is_none(self) -> Singleton<bool, L, B> {
887 self.map(q!(|_| ()))
888 .into_singleton()
889 .map(q!(|o| o.is_none()))
890 }
891
892 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
893 /// values are equal, `false` otherwise (including when either is null).
894 ///
895 /// # Example
896 /// ```rust
897 /// # #[cfg(feature = "deploy")] {
898 /// # use hydro_lang::prelude::*;
899 /// # use futures::StreamExt;
900 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901 /// let tick = process.tick();
902 /// // ticks are lazy by default, forces the second tick to run
903 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
904 ///
905 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
906 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
907 /// a.is_some_and_equals(b).all_ticks()
908 /// # }, |mut stream| async move {
909 /// // [true, false]
910 /// # for w in vec![true, false] {
911 /// # assert_eq!(stream.next().await.unwrap(), w);
912 /// # }
913 /// # }));
914 /// # }
915 /// ```
916 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
917 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
918 where
919 T: PartialEq + Clone,
920 B: IsBounded,
921 {
922 self.into_singleton()
923 .zip(other.into_singleton())
924 .map(q!(|(a, b)| a.is_some() && a == b))
925 }
926
927 /// An operator which allows you to "name" a `HydroNode`.
928 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
929 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
930 {
931 let mut node = self.ir_node.borrow_mut();
932 let metadata = node.metadata_mut();
933 metadata.tag = Some(name.to_owned());
934 }
935 self
936 }
937
938 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
939 /// implies that `B == Bounded`.
940 pub fn make_bounded(self) -> Optional<T, L, Bounded>
941 where
942 B: IsBounded,
943 {
944 Optional::new(
945 self.location.clone(),
946 self.ir_node.replace(HydroNode::Placeholder),
947 )
948 }
949
950 /// Clones this bounded optional into a tick, returning a optional that has the
951 /// same value as the outer optional. Because the outer optional is bounded, this
952 /// is deterministic because there is only a single immutable version.
953 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
954 where
955 B: IsBounded,
956 T: Clone,
957 {
958 // TODO(shadaj): avoid printing simulator logs for this snapshot
959 self.snapshot(
960 tick,
961 nondet!(/** bounded top-level optional so deterministic */),
962 )
963 }
964
965 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
966 /// non-null. Otherwise, the stream is empty.
967 ///
968 /// # Example
969 /// ```rust
970 /// # #[cfg(feature = "deploy")] {
971 /// # use hydro_lang::prelude::*;
972 /// # use futures::StreamExt;
973 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
974 /// # let tick = process.tick();
975 /// # // ticks are lazy by default, forces the second tick to run
976 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
977 /// # let batch_first_tick = process
978 /// # .source_iter(q!(vec![]))
979 /// # .batch(&tick, nondet!(/** test */));
980 /// # let batch_second_tick = process
981 /// # .source_iter(q!(vec![123, 456]))
982 /// # .batch(&tick, nondet!(/** test */))
983 /// # .defer_tick(); // appears on the second tick
984 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
985 /// input_batch // first tick: [], second tick: [123, 456]
986 /// .clone()
987 /// .max()
988 /// .into_stream()
989 /// .chain(input_batch)
990 /// .all_ticks()
991 /// # }, |mut stream| async move {
992 /// // [456, 123, 456]
993 /// # for w in vec![456, 123, 456] {
994 /// # assert_eq!(stream.next().await.unwrap(), w);
995 /// # }
996 /// # }));
997 /// # }
998 /// ```
999 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1000 where
1001 B: IsBounded,
1002 {
1003 Stream::new(
1004 self.location.clone(),
1005 HydroNode::Cast {
1006 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1007 metadata: self.location.new_node_metadata(Stream::<
1008 T,
1009 Tick<L>,
1010 Bounded,
1011 TotalOrder,
1012 ExactlyOnce,
1013 >::collection_kind()),
1014 },
1015 )
1016 }
1017
1018 /// Filters this optional, passing through the value if the boolean signal is `true`,
1019 /// otherwise the output is null.
1020 ///
1021 /// # Example
1022 /// ```rust
1023 /// # #[cfg(feature = "deploy")] {
1024 /// # use hydro_lang::prelude::*;
1025 /// # use futures::StreamExt;
1026 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1027 /// let tick = process.tick();
1028 /// // ticks are lazy by default, forces the second tick to run
1029 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1030 ///
1031 /// let some_first_tick = tick.optional_first_tick(q!(()));
1032 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1033 /// let batch_first_tick = process
1034 /// .source_iter(q!(vec![456]))
1035 /// .batch(&tick, nondet!(/** test */));
1036 /// let batch_second_tick = process
1037 /// .source_iter(q!(vec![789]))
1038 /// .batch(&tick, nondet!(/** test */))
1039 /// .defer_tick();
1040 /// batch_first_tick.chain(batch_second_tick).first()
1041 /// .filter_if(signal)
1042 /// .unwrap_or(tick.singleton(q!(0)))
1043 /// .all_ticks()
1044 /// # }, |mut stream| async move {
1045 /// // [456, 0]
1046 /// # for w in vec![456, 0] {
1047 /// # assert_eq!(stream.next().await.unwrap(), w);
1048 /// # }
1049 /// # }));
1050 /// # }
1051 /// ```
1052 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1053 where
1054 B: IsBounded,
1055 {
1056 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1057 }
1058
1059 /// Filters this optional, passing through the optional value if it is non-null **and** the
1060 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1061 ///
1062 /// Useful for conditionally processing, such as only emitting an optional's value outside
1063 /// a tick if some other condition is satisfied.
1064 ///
1065 /// # Example
1066 /// ```rust
1067 /// # #[cfg(feature = "deploy")] {
1068 /// # use hydro_lang::prelude::*;
1069 /// # use futures::StreamExt;
1070 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1071 /// let tick = process.tick();
1072 /// // ticks are lazy by default, forces the second tick to run
1073 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1074 ///
1075 /// let batch_first_tick = process
1076 /// .source_iter(q!(vec![]))
1077 /// .batch(&tick, nondet!(/** test */));
1078 /// let batch_second_tick = process
1079 /// .source_iter(q!(vec![456]))
1080 /// .batch(&tick, nondet!(/** test */))
1081 /// .defer_tick(); // appears on the second tick
1082 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1083 /// batch_first_tick.chain(batch_second_tick).first()
1084 /// .filter_if_some(some_on_first_tick)
1085 /// .unwrap_or(tick.singleton(q!(789)))
1086 /// .all_ticks()
1087 /// # }, |mut stream| async move {
1088 /// // [789, 789]
1089 /// # for w in vec![789, 789] {
1090 /// # assert_eq!(stream.next().await.unwrap(), w);
1091 /// # }
1092 /// # }));
1093 /// # }
1094 /// ```
1095 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1096 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1097 where
1098 B: IsBounded,
1099 {
1100 self.filter_if(signal.is_some())
1101 }
1102
1103 /// Filters this optional, passing through the optional value if it is non-null **and** the
1104 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1105 ///
1106 /// Useful for conditionally processing, such as only emitting an optional's value outside
1107 /// a tick if some other condition is satisfied.
1108 ///
1109 /// # Example
1110 /// ```rust
1111 /// # #[cfg(feature = "deploy")] {
1112 /// # use hydro_lang::prelude::*;
1113 /// # use futures::StreamExt;
1114 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1115 /// let tick = process.tick();
1116 /// // ticks are lazy by default, forces the second tick to run
1117 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1118 ///
1119 /// let batch_first_tick = process
1120 /// .source_iter(q!(vec![]))
1121 /// .batch(&tick, nondet!(/** test */));
1122 /// let batch_second_tick = process
1123 /// .source_iter(q!(vec![456]))
1124 /// .batch(&tick, nondet!(/** test */))
1125 /// .defer_tick(); // appears on the second tick
1126 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1127 /// batch_first_tick.chain(batch_second_tick).first()
1128 /// .filter_if_none(some_on_first_tick)
1129 /// .unwrap_or(tick.singleton(q!(789)))
1130 /// .all_ticks()
1131 /// # }, |mut stream| async move {
1132 /// // [789, 789]
1133 /// # for w in vec![789, 456] {
1134 /// # assert_eq!(stream.next().await.unwrap(), w);
1135 /// # }
1136 /// # }));
1137 /// # }
1138 /// ```
1139 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1140 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1141 where
1142 B: IsBounded,
1143 {
1144 self.filter_if(other.is_none())
1145 }
1146
1147 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1148 ///
1149 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1150 /// having a value, such as only releasing a piece of state if the node is the leader.
1151 ///
1152 /// # Example
1153 /// ```rust
1154 /// # #[cfg(feature = "deploy")] {
1155 /// # use hydro_lang::prelude::*;
1156 /// # use futures::StreamExt;
1157 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1158 /// let tick = process.tick();
1159 /// // ticks are lazy by default, forces the second tick to run
1160 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1161 ///
1162 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1163 /// some_on_first_tick
1164 /// .if_some_then(tick.singleton(q!(456)))
1165 /// .unwrap_or(tick.singleton(q!(123)))
1166 /// # .all_ticks()
1167 /// # }, |mut stream| async move {
1168 /// // 456 (first tick) ~> 123 (second tick onwards)
1169 /// # for w in vec![456, 123, 123] {
1170 /// # assert_eq!(stream.next().await.unwrap(), w);
1171 /// # }
1172 /// # }));
1173 /// # }
1174 /// ```
1175 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1176 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1177 where
1178 B: IsBounded,
1179 {
1180 value.filter_if(self.is_some())
1181 }
1182}
1183
1184impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1185where
1186 L: Location<'a>,
1187{
1188 /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1189 /// key-value pair of this [`Optional`].
1190 ///
1191 /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1192 /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1193 /// the entry will be updated and appear / disappear according to the state of the
1194 /// [`Optional`].
1195 pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1196 KeyedSingleton::new(
1197 self.location.clone(),
1198 HydroNode::Cast {
1199 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1200 metadata: self
1201 .location
1202 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1203 },
1204 )
1205 }
1206}
1207
1208impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1209where
1210 L: Location<'a> + NoTick,
1211{
1212 /// Returns an optional value corresponding to the latest snapshot of the optional
1213 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1214 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1215 /// all snapshots of this optional into the atomic-associated tick will observe the
1216 /// same value each tick.
1217 ///
1218 /// # Non-Determinism
1219 /// Because this picks a snapshot of a optional whose value is continuously changing,
1220 /// the output optional has a non-deterministic value since the snapshot can be at an
1221 /// arbitrary point in time.
1222 pub fn snapshot_atomic(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1223 Optional::new(
1224 tick.clone(),
1225 HydroNode::Batch {
1226 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1227 metadata: tick
1228 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1229 },
1230 )
1231 }
1232
1233 /// Returns this optional back into a top-level, asynchronous execution context where updates
1234 /// to the value will be asynchronously propagated.
1235 pub fn end_atomic(self) -> Optional<T, L, B> {
1236 Optional::new(
1237 self.location.tick.l.clone(),
1238 HydroNode::EndAtomic {
1239 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1240 metadata: self
1241 .location
1242 .tick
1243 .l
1244 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1245 },
1246 )
1247 }
1248}
1249
1250impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1251where
1252 L: Location<'a>,
1253{
1254 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1255 /// will observe the same version of the value and will be executed synchronously before any
1256 /// outputs are yielded (in [`Optional::end_atomic`]).
1257 ///
1258 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1259 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1260 /// a different version).
1261 pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1262 let id = self.location.flow_state().borrow_mut().next_clock_id();
1263 let out_location = Atomic {
1264 tick: Tick {
1265 id,
1266 l: self.location.clone(),
1267 },
1268 };
1269 Optional::new(
1270 out_location.clone(),
1271 HydroNode::BeginAtomic {
1272 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1273 metadata: out_location
1274 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1275 },
1276 )
1277 }
1278
1279 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1280 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1281 /// relevant data that contributed to the snapshot at tick `t`.
1282 ///
1283 /// # Non-Determinism
1284 /// Because this picks a snapshot of a optional whose value is continuously changing,
1285 /// the output optional has a non-deterministic value since the snapshot can be at an
1286 /// arbitrary point in time.
1287 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1288 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1289 Optional::new(
1290 tick.clone(),
1291 HydroNode::Batch {
1292 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1293 metadata: tick
1294 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1295 },
1296 )
1297 }
1298
1299 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1300 /// with order corresponding to increasing prefixes of data contributing to the optional.
1301 ///
1302 /// # Non-Determinism
1303 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1304 /// to non-deterministic batching and arrival of inputs, the output stream is
1305 /// non-deterministic.
1306 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1307 where
1308 L: NoTick,
1309 {
1310 let tick = self.location.tick();
1311 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1312 }
1313
1314 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1315 /// value taken at various points in time. Because the input optional may be
1316 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1317 /// represent the value of the optional given some prefix of the streams leading up to
1318 /// it.
1319 ///
1320 /// # Non-Determinism
1321 /// The output stream is non-deterministic in which elements are sampled, since this
1322 /// is controlled by a clock.
1323 pub fn sample_every(
1324 self,
1325 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1326 nondet: NonDet,
1327 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1328 where
1329 L: NoTick + NoAtomic,
1330 {
1331 let samples = self.location.source_interval(interval);
1332 let tick = self.location.tick();
1333
1334 self.snapshot(&tick, nondet)
1335 .filter_if(samples.batch(&tick, nondet).first().is_some())
1336 .all_ticks()
1337 .weaken_retries()
1338 }
1339}
1340
1341impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1342where
1343 L: Location<'a>,
1344{
1345 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1346 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1347 /// null values).
1348 ///
1349 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1350 /// producing one element in the output for each (non-null) tick. This is useful for batched
1351 /// computations, where the results from each tick must be combined together.
1352 ///
1353 /// # Example
1354 /// ```rust
1355 /// # #[cfg(feature = "deploy")] {
1356 /// # use hydro_lang::prelude::*;
1357 /// # use futures::StreamExt;
1358 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1359 /// # let tick = process.tick();
1360 /// # // ticks are lazy by default, forces the second tick to run
1361 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1362 /// # let batch_first_tick = process
1363 /// # .source_iter(q!(vec![]))
1364 /// # .batch(&tick, nondet!(/** test */));
1365 /// # let batch_second_tick = process
1366 /// # .source_iter(q!(vec![1, 2, 3]))
1367 /// # .batch(&tick, nondet!(/** test */))
1368 /// # .defer_tick(); // appears on the second tick
1369 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1370 /// input_batch // first tick: [], second tick: [1, 2, 3]
1371 /// .max()
1372 /// .all_ticks()
1373 /// # }, |mut stream| async move {
1374 /// // [3]
1375 /// # for w in vec![3] {
1376 /// # assert_eq!(stream.next().await.unwrap(), w);
1377 /// # }
1378 /// # }));
1379 /// # }
1380 /// ```
1381 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1382 self.into_stream().all_ticks()
1383 }
1384
1385 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1386 /// which will stream the value computed in _each_ tick as a separate stream element.
1387 ///
1388 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1389 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1390 /// optional's [`Tick`] context.
1391 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1392 self.into_stream().all_ticks_atomic()
1393 }
1394
1395 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1396 /// be asynchronously updated with the latest value of the optional inside the tick, including
1397 /// whether the optional is null or not.
1398 ///
1399 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1400 /// tick that tracks the inner value. This is useful for getting the value as of the
1401 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1402 ///
1403 /// # Example
1404 /// ```rust
1405 /// # #[cfg(feature = "deploy")] {
1406 /// # use hydro_lang::prelude::*;
1407 /// # use futures::StreamExt;
1408 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1409 /// # let tick = process.tick();
1410 /// # // ticks are lazy by default, forces the second tick to run
1411 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1412 /// # let batch_first_tick = process
1413 /// # .source_iter(q!(vec![]))
1414 /// # .batch(&tick, nondet!(/** test */));
1415 /// # let batch_second_tick = process
1416 /// # .source_iter(q!(vec![1, 2, 3]))
1417 /// # .batch(&tick, nondet!(/** test */))
1418 /// # .defer_tick(); // appears on the second tick
1419 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1420 /// input_batch // first tick: [], second tick: [1, 2, 3]
1421 /// .max()
1422 /// .latest()
1423 /// # .into_singleton()
1424 /// # .sample_eager(nondet!(/** test */))
1425 /// # }, |mut stream| async move {
1426 /// // asynchronously changes from None ~> 3
1427 /// # for w in vec![None, Some(3)] {
1428 /// # assert_eq!(stream.next().await.unwrap(), w);
1429 /// # }
1430 /// # }));
1431 /// # }
1432 /// ```
1433 pub fn latest(self) -> Optional<T, L, Unbounded> {
1434 Optional::new(
1435 self.location.outer().clone(),
1436 HydroNode::YieldConcat {
1437 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1438 metadata: self
1439 .location
1440 .outer()
1441 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1442 },
1443 )
1444 }
1445
1446 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1447 /// be updated with the latest value of the optional inside the tick.
1448 ///
1449 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1450 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1451 /// optional's [`Tick`] context.
1452 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1453 let out_location = Atomic {
1454 tick: self.location.clone(),
1455 };
1456
1457 Optional::new(
1458 out_location.clone(),
1459 HydroNode::YieldConcat {
1460 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1461 metadata: out_location
1462 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1463 },
1464 )
1465 }
1466
1467 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1468 /// always has the state of `self` at tick `T - 1`.
1469 ///
1470 /// At tick `0`, the output optional is null, since there is no previous tick.
1471 ///
1472 /// This operator enables stateful iterative processing with ticks, by sending data from one
1473 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1474 ///
1475 /// # Example
1476 /// ```rust
1477 /// # #[cfg(feature = "deploy")] {
1478 /// # use hydro_lang::prelude::*;
1479 /// # use futures::StreamExt;
1480 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1481 /// let tick = process.tick();
1482 /// // ticks are lazy by default, forces the second tick to run
1483 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1484 ///
1485 /// let batch_first_tick = process
1486 /// .source_iter(q!(vec![1, 2]))
1487 /// .batch(&tick, nondet!(/** test */));
1488 /// let batch_second_tick = process
1489 /// .source_iter(q!(vec![3, 4]))
1490 /// .batch(&tick, nondet!(/** test */))
1491 /// .defer_tick(); // appears on the second tick
1492 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1493 /// .reduce(q!(|state, v| *state += v));
1494 ///
1495 /// current_tick_sum.clone().into_singleton().zip(
1496 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1497 /// ).all_ticks()
1498 /// # }, |mut stream| async move {
1499 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1500 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1501 /// # assert_eq!(stream.next().await.unwrap(), w);
1502 /// # }
1503 /// # }));
1504 /// # }
1505 /// ```
1506 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1507 Optional::new(
1508 self.location.clone(),
1509 HydroNode::DeferTick {
1510 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1511 metadata: self.location.new_node_metadata(Self::collection_kind()),
1512 },
1513 )
1514 }
1515}
1516
1517#[cfg(test)]
1518mod tests {
1519 #[cfg(feature = "deploy")]
1520 use futures::StreamExt;
1521 #[cfg(feature = "deploy")]
1522 use hydro_deploy::Deployment;
1523 #[cfg(any(feature = "deploy", feature = "sim"))]
1524 use stageleft::q;
1525
1526 #[cfg(feature = "deploy")]
1527 use super::Optional;
1528 #[cfg(any(feature = "deploy", feature = "sim"))]
1529 use crate::compile::builder::FlowBuilder;
1530 #[cfg(any(feature = "deploy", feature = "sim"))]
1531 use crate::location::Location;
1532 #[cfg(feature = "deploy")]
1533 use crate::nondet::nondet;
1534
1535 #[cfg(feature = "deploy")]
1536 #[tokio::test]
1537 async fn optional_or_cardinality() {
1538 let mut deployment = Deployment::new();
1539
1540 let mut flow = FlowBuilder::new();
1541 let node = flow.process::<()>();
1542 let external = flow.external::<()>();
1543
1544 let node_tick = node.tick();
1545 let tick_singleton = node_tick.singleton(q!(123));
1546 let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1547 let counts = tick_optional_inhabited
1548 .clone()
1549 .or(tick_optional_inhabited)
1550 .into_stream()
1551 .count()
1552 .all_ticks()
1553 .send_bincode_external(&external);
1554
1555 let nodes = flow
1556 .with_process(&node, deployment.Localhost())
1557 .with_external(&external, deployment.Localhost())
1558 .deploy(&mut deployment);
1559
1560 deployment.deploy().await.unwrap();
1561
1562 let mut external_out = nodes.connect(counts).await;
1563
1564 deployment.start().await.unwrap();
1565
1566 assert_eq!(external_out.next().await.unwrap(), 1);
1567 }
1568
1569 #[cfg(feature = "deploy")]
1570 #[tokio::test]
1571 async fn into_singleton_top_level_none_cardinality() {
1572 let mut deployment = Deployment::new();
1573
1574 let mut flow = FlowBuilder::new();
1575 let node = flow.process::<()>();
1576 let external = flow.external::<()>();
1577
1578 let node_tick = node.tick();
1579 let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1580 let into_singleton = top_level_none.into_singleton();
1581
1582 let tick_driver = node.spin();
1583
1584 let counts = into_singleton
1585 .snapshot(&node_tick, nondet!(/** test */))
1586 .into_stream()
1587 .count()
1588 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1589 .map(q!(|(c, _)| c))
1590 .all_ticks()
1591 .send_bincode_external(&external);
1592
1593 let nodes = flow
1594 .with_process(&node, deployment.Localhost())
1595 .with_external(&external, deployment.Localhost())
1596 .deploy(&mut deployment);
1597
1598 deployment.deploy().await.unwrap();
1599
1600 let mut external_out = nodes.connect(counts).await;
1601
1602 deployment.start().await.unwrap();
1603
1604 assert_eq!(external_out.next().await.unwrap(), 1);
1605 assert_eq!(external_out.next().await.unwrap(), 1);
1606 assert_eq!(external_out.next().await.unwrap(), 1);
1607 }
1608
1609 #[cfg(feature = "deploy")]
1610 #[tokio::test]
1611 async fn into_singleton_unbounded_top_level_none_cardinality() {
1612 let mut deployment = Deployment::new();
1613
1614 let mut flow = FlowBuilder::new();
1615 let node = flow.process::<()>();
1616 let external = flow.external::<()>();
1617
1618 let node_tick = node.tick();
1619 let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1620 let into_singleton = top_level_none.into_singleton();
1621
1622 let tick_driver = node.spin();
1623
1624 let counts = into_singleton
1625 .snapshot(&node_tick, nondet!(/** test */))
1626 .into_stream()
1627 .count()
1628 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1629 .map(q!(|(c, _)| c))
1630 .all_ticks()
1631 .send_bincode_external(&external);
1632
1633 let nodes = flow
1634 .with_process(&node, deployment.Localhost())
1635 .with_external(&external, deployment.Localhost())
1636 .deploy(&mut deployment);
1637
1638 deployment.deploy().await.unwrap();
1639
1640 let mut external_out = nodes.connect(counts).await;
1641
1642 deployment.start().await.unwrap();
1643
1644 assert_eq!(external_out.next().await.unwrap(), 1);
1645 assert_eq!(external_out.next().await.unwrap(), 1);
1646 assert_eq!(external_out.next().await.unwrap(), 1);
1647 }
1648
1649 #[cfg(feature = "sim")]
1650 #[test]
1651 fn top_level_optional_some_into_stream_no_replay() {
1652 let mut flow = FlowBuilder::new();
1653 let node = flow.process::<()>();
1654
1655 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1656 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1657 let filtered_some = folded.filter(q!(|_| true));
1658
1659 let out_recv = filtered_some.into_stream().sim_output();
1660
1661 flow.sim().exhaustive(async || {
1662 out_recv.assert_yields_only([10]).await;
1663 });
1664 }
1665
1666 #[cfg(feature = "sim")]
1667 #[test]
1668 fn top_level_optional_none_into_stream_no_replay() {
1669 let mut flow = FlowBuilder::new();
1670 let node = flow.process::<()>();
1671
1672 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1673 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1674 let filtered_none = folded.filter(q!(|_| false));
1675
1676 let out_recv = filtered_none.into_stream().sim_output();
1677
1678 flow.sim().exhaustive(async || {
1679 out_recv.assert_yields_only([] as [i32; 0]).await;
1680 });
1681 }
1682}