Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26
27/// A *nullable* Rust value that can asynchronously change over time.
28///
29/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
30/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
31/// asynchronously change over time, including becoming present of uninhabited.
32///
33/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
34/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
35///
36/// Type Parameters:
37/// - `Type`: the type of the value in this optional (when it is not null)
38/// - `Loc`: the [`Location`] where the optional is materialized
39/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
40pub struct Optional<Type, Loc, Bound: Boundedness> {
41    pub(crate) location: Loc,
42    pub(crate) ir_node: RefCell<HydroNode>,
43    pub(crate) flow_state: FlowState,
44
45    _phantom: PhantomData<(Type, Loc, Bound)>,
46}
47
48impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
49    fn drop(&mut self) {
50        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
51        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
52            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
53                input: Box::new(ir_node),
54                op_metadata: HydroIrOpMetadata::new(),
55            });
56        }
57    }
58}
59
60impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
61where
62    T: Clone,
63    L: Location<'a> + NoTick,
64{
65    fn from(value: Optional<T, L, Bounded>) -> Self {
66        let tick = value.location().tick();
67        value.clone_into_tick(&tick).latest()
68    }
69}
70
71impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
72where
73    L: Location<'a>,
74{
75    fn defer_tick(self) -> Self {
76        Optional::defer_tick(self)
77    }
78}
79
80impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
81where
82    L: Location<'a>,
83{
84    type Location = Tick<L>;
85
86    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
87        Optional::new(
88            location.clone(),
89            HydroNode::CycleSource {
90                cycle_id,
91                metadata: location.new_node_metadata(Self::collection_kind()),
92            },
93        )
94    }
95}
96
97impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
98where
99    L: Location<'a>,
100{
101    type Location = Tick<L>;
102
103    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
104        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
105            location.clone(),
106            HydroNode::DeferTick {
107                input: Box::new(HydroNode::CycleSource {
108                    cycle_id,
109                    metadata: location.new_node_metadata(Self::collection_kind()),
110                }),
111                metadata: location
112                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
113            },
114        );
115
116        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
117    }
118}
119
120impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
121where
122    L: Location<'a>,
123{
124    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
125        assert_eq!(
126            Location::id(&self.location),
127            expected_location,
128            "locations do not match"
129        );
130        self.location
131            .flow_state()
132            .borrow_mut()
133            .push_root(HydroRoot::CycleSink {
134                cycle_id,
135                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
136                op_metadata: HydroIrOpMetadata::new(),
137            });
138    }
139}
140
141impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
142where
143    L: Location<'a>,
144{
145    type Location = Tick<L>;
146
147    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
148        Optional::new(
149            location.clone(),
150            HydroNode::CycleSource {
151                cycle_id,
152                metadata: location.new_node_metadata(Self::collection_kind()),
153            },
154        )
155    }
156}
157
158impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
159where
160    L: Location<'a>,
161{
162    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
163        assert_eq!(
164            Location::id(&self.location),
165            expected_location,
166            "locations do not match"
167        );
168        self.location
169            .flow_state()
170            .borrow_mut()
171            .push_root(HydroRoot::CycleSink {
172                cycle_id,
173                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
174                op_metadata: HydroIrOpMetadata::new(),
175            });
176    }
177}
178
179impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
180where
181    L: Location<'a> + NoTick,
182{
183    type Location = L;
184
185    fn create_source(cycle_id: CycleId, location: L) -> Self {
186        Optional::new(
187            location.clone(),
188            HydroNode::CycleSource {
189                cycle_id,
190                metadata: location.new_node_metadata(Self::collection_kind()),
191            },
192        )
193    }
194}
195
196impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
197where
198    L: Location<'a> + NoTick,
199{
200    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201        assert_eq!(
202            Location::id(&self.location),
203            expected_location,
204            "locations do not match"
205        );
206        self.location
207            .flow_state()
208            .borrow_mut()
209            .push_root(HydroRoot::CycleSink {
210                cycle_id,
211                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214    }
215}
216
217impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
218where
219    L: Location<'a>,
220{
221    fn from(singleton: Singleton<T, L, B>) -> Self {
222        Optional::new(
223            singleton.location.clone(),
224            HydroNode::Cast {
225                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
226                metadata: singleton
227                    .location
228                    .new_node_metadata(Self::collection_kind()),
229            },
230        )
231    }
232}
233
// Pairs the values of two optionals at the same location by joining their IR nodes
// with a `HydroNode::CrossSingleton`.
#[cfg(stageleft_runtime)]
pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    // Both inputs give up their nodes; the new node owns them as its two sides.
    let left = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let right = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        me.location.clone(),
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
252
// Combines two optionals at the same location with a `HydroNode::ChainFirst`, with `me`
// as the `first` input and `other` as the `second`.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    // Both inputs give up their nodes; the new node owns them.
    let first = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let second = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        me.location.clone(),
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
271
272impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
273where
274    T: Clone,
275    L: Location<'a>,
276{
277    fn clone(&self) -> Self {
278        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
279            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
280            *self.ir_node.borrow_mut() = HydroNode::Tee {
281                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
282                metadata: self.location.new_node_metadata(Self::collection_kind()),
283            };
284        }
285
286        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
287            Optional {
288                location: self.location.clone(),
289                flow_state: self.flow_state.clone(),
290                ir_node: HydroNode::Tee {
291                    inner: SharedNode(inner.0.clone()),
292                    metadata: metadata.clone(),
293                }
294                .into(),
295                _phantom: PhantomData,
296            }
297        } else {
298            unreachable!()
299        }
300    }
301}
302
303impl<'a, T, L, B: Boundedness> Optional<T, L, B>
304where
305    L: Location<'a>,
306{
307    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
308        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
309        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
310        let flow_state = location.flow_state().clone();
311        Optional {
312            location,
313            flow_state,
314            ir_node: RefCell::new(ir_node),
315            _phantom: PhantomData,
316        }
317    }
318
319    pub(crate) fn collection_kind() -> CollectionKind {
320        CollectionKind::Optional {
321            bound: B::BOUND_KIND,
322            element_type: stageleft::quote_type::<T>().into(),
323        }
324    }
325
326    /// Returns the [`Location`] where this optional is being materialized.
327    pub fn location(&self) -> &L {
328        &self.location
329    }
330
331    /// Transforms the optional value by applying a function `f` to it,
332    /// continuously as the input is updated.
333    ///
334    /// Whenever the optional is empty, the output optional is also empty.
335    ///
336    /// # Example
337    /// ```rust
338    /// # #[cfg(feature = "deploy")] {
339    /// # use hydro_lang::prelude::*;
340    /// # use futures::StreamExt;
341    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
342    /// let tick = process.tick();
343    /// let optional = tick.optional_first_tick(q!(1));
344    /// optional.map(q!(|v| v + 1)).all_ticks()
345    /// # }, |mut stream| async move {
346    /// // 2
347    /// # assert_eq!(stream.next().await.unwrap(), 2);
348    /// # }));
349    /// # }
350    /// ```
351    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
352    where
353        F: Fn(T) -> U + 'a,
354    {
355        let f = f.splice_fn1_ctx(&self.location).into();
356        Optional::new(
357            self.location.clone(),
358            HydroNode::Map {
359                f,
360                singleton_refs: Vec::new(),
361                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
362                metadata: self
363                    .location
364                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
365            },
366        )
367    }
368
369    /// Transforms the optional value by applying a function `f` to it and then flattening
370    /// the result into a stream, preserving the order of elements.
371    ///
372    /// If the optional is empty, the output stream is also empty. If the optional contains
373    /// a value, `f` is applied to produce an iterator, and all items from that iterator
374    /// are emitted in the output stream in deterministic order.
375    ///
376    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
377    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
378    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
379    ///
380    /// # Example
381    /// ```rust
382    /// # #[cfg(feature = "deploy")] {
383    /// # use hydro_lang::prelude::*;
384    /// # use futures::StreamExt;
385    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
386    /// let tick = process.tick();
387    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
388    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
389    /// # }, |mut stream| async move {
390    /// // 1, 2, 3
391    /// # for w in vec![1, 2, 3] {
392    /// #     assert_eq!(stream.next().await.unwrap(), w);
393    /// # }
394    /// # }));
395    /// # }
396    /// ```
397    pub fn flat_map_ordered<U, I, F>(
398        self,
399        f: impl IntoQuotedMut<'a, F, L>,
400    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
401    where
402        B: IsBounded,
403        I: IntoIterator<Item = U>,
404        F: Fn(T) -> I + 'a,
405    {
406        self.into_stream().flat_map_ordered(f)
407    }
408
409    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
410    /// for the output type `I` to produce items in any order.
411    ///
412    /// If the optional is empty, the output stream is also empty. If the optional contains
413    /// a value, `f` is applied to produce an iterator, and all items from that iterator
414    /// are emitted in the output stream in non-deterministic order.
415    ///
416    /// # Example
417    /// ```rust
418    /// # #[cfg(feature = "deploy")] {
419    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
420    /// # use futures::StreamExt;
421    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
422    /// let tick = process.tick();
423    /// let optional = tick.optional_first_tick(q!(
424    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
425    /// ));
426    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
427    /// # }, |mut stream| async move {
428    /// // 1, 2, 3, but in no particular order
429    /// # let mut results = Vec::new();
430    /// # for _ in 0..3 {
431    /// #     results.push(stream.next().await.unwrap());
432    /// # }
433    /// # results.sort();
434    /// # assert_eq!(results, vec![1, 2, 3]);
435    /// # }));
436    /// # }
437    /// ```
438    pub fn flat_map_unordered<U, I, F>(
439        self,
440        f: impl IntoQuotedMut<'a, F, L>,
441    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
442    where
443        B: IsBounded,
444        I: IntoIterator<Item = U>,
445        F: Fn(T) -> I + 'a,
446    {
447        self.into_stream().flat_map_unordered(f)
448    }
449
450    /// Flattens the optional value into a stream, preserving the order of elements.
451    ///
452    /// If the optional is empty, the output stream is also empty. If the optional contains
453    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
454    /// in the output stream in deterministic order.
455    ///
456    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
457    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
458    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
459    ///
460    /// # Example
461    /// ```rust
462    /// # #[cfg(feature = "deploy")] {
463    /// # use hydro_lang::prelude::*;
464    /// # use futures::StreamExt;
465    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
466    /// let tick = process.tick();
467    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
468    /// optional.flatten_ordered().all_ticks()
469    /// # }, |mut stream| async move {
470    /// // 1, 2, 3
471    /// # for w in vec![1, 2, 3] {
472    /// #     assert_eq!(stream.next().await.unwrap(), w);
473    /// # }
474    /// # }));
475    /// # }
476    /// ```
477    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
478    where
479        B: IsBounded,
480        T: IntoIterator<Item = U>,
481    {
482        self.flat_map_ordered(q!(|v| v))
483    }
484
485    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
486    /// for the element type `T` to produce items in any order.
487    ///
488    /// If the optional is empty, the output stream is also empty. If the optional contains
489    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
490    /// in the output stream in non-deterministic order.
491    ///
492    /// # Example
493    /// ```rust
494    /// # #[cfg(feature = "deploy")] {
495    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
496    /// # use futures::StreamExt;
497    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
498    /// let tick = process.tick();
499    /// let optional = tick.optional_first_tick(q!(
500    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
501    /// ));
502    /// optional.flatten_unordered().all_ticks()
503    /// # }, |mut stream| async move {
504    /// // 1, 2, 3, but in no particular order
505    /// # let mut results = Vec::new();
506    /// # for _ in 0..3 {
507    /// #     results.push(stream.next().await.unwrap());
508    /// # }
509    /// # results.sort();
510    /// # assert_eq!(results, vec![1, 2, 3]);
511    /// # }));
512    /// # }
513    /// ```
514    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
515    where
516        B: IsBounded,
517        T: IntoIterator<Item = U>,
518    {
519        self.flat_map_unordered(q!(|v| v))
520    }
521
522    /// Creates an optional containing only the value if it satisfies a predicate `f`.
523    ///
524    /// If the optional is empty, the output optional is also empty. If the optional contains
525    /// a value and the predicate returns `true`, the output optional contains the same value.
526    /// If the predicate returns `false`, the output optional is empty.
527    ///
528    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
529    /// not modify or take ownership of the value. If you need to modify the value while filtering
530    /// use [`Optional::filter_map`] instead.
531    ///
532    /// # Example
533    /// ```rust
534    /// # #[cfg(feature = "deploy")] {
535    /// # use hydro_lang::prelude::*;
536    /// # use futures::StreamExt;
537    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
538    /// let tick = process.tick();
539    /// let optional = tick.optional_first_tick(q!(5));
540    /// optional.filter(q!(|&x| x > 3)).all_ticks()
541    /// # }, |mut stream| async move {
542    /// // 5
543    /// # assert_eq!(stream.next().await.unwrap(), 5);
544    /// # }));
545    /// # }
546    /// ```
547    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
548    where
549        F: Fn(&T) -> bool + 'a,
550    {
551        let f = f.splice_fn1_borrow_ctx(&self.location).into();
552        Optional::new(
553            self.location.clone(),
554            HydroNode::Filter {
555                f,
556                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
557                metadata: self.location.new_node_metadata(Self::collection_kind()),
558            },
559        )
560    }
561
562    /// An operator that both filters and maps. It yields only the value if the supplied
563    /// closure `f` returns `Some(value)`.
564    ///
565    /// If the optional is empty, the output optional is also empty. If the optional contains
566    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
567    /// If the closure returns `None`, the output optional is empty.
568    ///
569    /// # Example
570    /// ```rust
571    /// # #[cfg(feature = "deploy")] {
572    /// # use hydro_lang::prelude::*;
573    /// # use futures::StreamExt;
574    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
575    /// let tick = process.tick();
576    /// let optional = tick.optional_first_tick(q!("42"));
577    /// optional
578    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
579    ///     .all_ticks()
580    /// # }, |mut stream| async move {
581    /// // 42
582    /// # assert_eq!(stream.next().await.unwrap(), 42);
583    /// # }));
584    /// # }
585    /// ```
586    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
587    where
588        F: Fn(T) -> Option<U> + 'a,
589    {
590        let f = f.splice_fn1_ctx(&self.location).into();
591        Optional::new(
592            self.location.clone(),
593            HydroNode::FilterMap {
594                f,
595                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
596                metadata: self
597                    .location
598                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
599            },
600        )
601    }
602
603    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
604    ///
605    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
606    /// non-null. This is useful for combining several pieces of state together.
607    ///
608    /// # Example
609    /// ```rust
610    /// # #[cfg(feature = "deploy")] {
611    /// # use hydro_lang::prelude::*;
612    /// # use futures::StreamExt;
613    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
614    /// let tick = process.tick();
615    /// let numbers = process
616    ///   .source_iter(q!(vec![123, 456, 789]))
617    ///   .batch(&tick, nondet!(/** test */));
618    /// let min = numbers.clone().min(); // Optional
619    /// let max = numbers.max(); // Optional
620    /// min.zip(max).all_ticks()
621    /// # }, |mut stream| async move {
622    /// // [(123, 789)]
623    /// # for w in vec![(123, 789)] {
624    /// #     assert_eq!(stream.next().await.unwrap(), w);
625    /// # }
626    /// # }));
627    /// # }
628    /// ```
629    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
630    where
631        B: IsBounded,
632    {
633        let other: Optional<O, L, B> = other.into();
634        check_matching_location(&self.location, &other.location);
635
636        if L::is_top_level()
637            && let Some(tick) = self.location.try_tick()
638        {
639            let out = zip_inside_tick(
640                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
641                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
642            )
643            .latest();
644
645            Optional::new(
646                out.location.clone(),
647                out.ir_node.replace(HydroNode::Placeholder),
648            )
649        } else {
650            zip_inside_tick(self, other)
651        }
652    }
653
654    /// Passes through `self` when it has a value, otherwise passes through `other`.
655    ///
656    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
657    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
658    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
659    ///
660    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
661    /// of the inputs change (including to/from null states).
662    ///
663    /// # Example
664    /// ```rust
665    /// # #[cfg(feature = "deploy")] {
666    /// # use hydro_lang::prelude::*;
667    /// # use futures::StreamExt;
668    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
669    /// let tick = process.tick();
670    /// // ticks are lazy by default, forces the second tick to run
671    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
672    ///
673    /// let some_first_tick = tick.optional_first_tick(q!(123));
674    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
675    /// some_first_tick.or(some_second_tick).all_ticks()
676    /// # }, |mut stream| async move {
677    /// // [123 /* first tick */, 456 /* second tick */]
678    /// # for w in vec![123, 456] {
679    /// #     assert_eq!(stream.next().await.unwrap(), w);
680    /// # }
681    /// # }));
682    /// # }
683    /// ```
684    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
685        check_matching_location(&self.location, &other.location);
686
687        if L::is_top_level()
688            && !B::BOUNDED // only if unbounded we need to use a tick
689            && let Some(tick) = self.location.try_tick()
690        {
691            let out = or_inside_tick(
692                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
694            )
695            .latest();
696
697            Optional::new(
698                out.location.clone(),
699                out.ir_node.replace(HydroNode::Placeholder),
700            )
701        } else {
702            Optional::new(
703                self.location.clone(),
704                HydroNode::ChainFirst {
705                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
706                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
707                    metadata: self.location.new_node_metadata(Self::collection_kind()),
708                },
709            )
710        }
711    }
712
713    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
714    ///
715    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
716    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
717    ///
718    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
719    /// of the inputs change (including to/from null states).
720    ///
721    /// # Example
722    /// ```rust
723    /// # #[cfg(feature = "deploy")] {
724    /// # use hydro_lang::prelude::*;
725    /// # use futures::StreamExt;
726    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727    /// let tick = process.tick();
728    /// // ticks are lazy by default, forces the later ticks to run
729    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
730    ///
731    /// let some_first_tick = tick.optional_first_tick(q!(123));
732    /// some_first_tick
733    ///     .unwrap_or(tick.singleton(q!(456)))
734    ///     .all_ticks()
735    /// # }, |mut stream| async move {
736    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
737    /// # for w in vec![123, 456, 456, 456] {
738    /// #     assert_eq!(stream.next().await.unwrap(), w);
739    /// # }
740    /// # }));
741    /// # }
742    /// ```
743    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
744        let res_option = self.or(other.into());
745        Singleton::new(
746            res_option.location.clone(),
747            HydroNode::Cast {
748                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
749                metadata: res_option
750                    .location
751                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
752            },
753        )
754    }
755
756    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
757    ///
758    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
759    /// [`Optional`] when the default value of the type is a suitable fallback.
760    ///
761    /// # Example
762    /// ```rust
763    /// # #[cfg(feature = "deploy")] {
764    /// # use hydro_lang::prelude::*;
765    /// # use futures::StreamExt;
766    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
767    /// let tick = process.tick();
768    /// // ticks are lazy by default, forces the later ticks to run
769    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
770    ///
771    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
772    /// some_first_tick.unwrap_or_default().all_ticks()
773    /// # }, |mut stream| async move {
774    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
775    /// # for w in vec![123, 0, 0, 0] {
776    /// #     assert_eq!(stream.next().await.unwrap(), w);
777    /// # }
778    /// # }));
779    /// # }
780    /// ```
781    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
782    where
783        T: Default + Clone,
784    {
785        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
786    }
787
788    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
789    ///
790    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
791    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
792    /// so that Hydro can skip any computation on null values.
793    ///
794    /// # Example
795    /// ```rust
796    /// # #[cfg(feature = "deploy")] {
797    /// # use hydro_lang::prelude::*;
798    /// # use futures::StreamExt;
799    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
800    /// let tick = process.tick();
801    /// // ticks are lazy by default, forces the later ticks to run
802    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
803    ///
804    /// let some_first_tick = tick.optional_first_tick(q!(123));
805    /// some_first_tick.into_singleton().all_ticks()
806    /// # }, |mut stream| async move {
807    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
808    /// # for w in vec![Some(123), None, None, None] {
809    /// #     assert_eq!(stream.next().await.unwrap(), w);
810    /// # }
811    /// # }));
812    /// # }
813    /// ```
814    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
815    where
816        T: Clone,
817    {
818        let none: syn::Expr = parse_quote!(::std::option::Option::None);
819
820        let none_singleton = Singleton::new(
821            self.location.clone(),
822            HydroNode::SingletonSource {
823                value: none.into(),
824                first_tick_only: false,
825                metadata: self
826                    .location
827                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
828            },
829        );
830
831        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
832    }
833
834    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
835    ///
836    /// # Example
837    /// ```rust
838    /// # #[cfg(feature = "deploy")] {
839    /// # use hydro_lang::prelude::*;
840    /// # use futures::StreamExt;
841    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
842    /// let tick = process.tick();
843    /// // ticks are lazy by default, forces the second tick to run
844    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
845    ///
846    /// let some_first_tick = tick.optional_first_tick(q!(42));
847    /// some_first_tick.is_some().all_ticks()
848    /// # }, |mut stream| async move {
849    /// // [true /* first tick */, false /* second tick */, ...]
850    /// # for w in vec![true, false] {
851    /// #     assert_eq!(stream.next().await.unwrap(), w);
852    /// # }
853    /// # }));
854    /// # }
855    /// ```
856    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
857    pub fn is_some(self) -> Singleton<bool, L, B> {
858        self.map(q!(|_| ()))
859            .into_singleton()
860            .map(q!(|o| o.is_some()))
861    }
862
863    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
864    ///
865    /// # Example
866    /// ```rust
867    /// # #[cfg(feature = "deploy")] {
868    /// # use hydro_lang::prelude::*;
869    /// # use futures::StreamExt;
870    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
871    /// let tick = process.tick();
872    /// // ticks are lazy by default, forces the second tick to run
873    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
874    ///
875    /// let some_first_tick = tick.optional_first_tick(q!(42));
876    /// some_first_tick.is_none().all_ticks()
877    /// # }, |mut stream| async move {
878    /// // [false /* first tick */, true /* second tick */, ...]
879    /// # for w in vec![false, true] {
880    /// #     assert_eq!(stream.next().await.unwrap(), w);
881    /// # }
882    /// # }));
883    /// # }
884    /// ```
885    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
886    pub fn is_none(self) -> Singleton<bool, L, B> {
887        self.map(q!(|_| ()))
888            .into_singleton()
889            .map(q!(|o| o.is_none()))
890    }
891
892    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
893    /// values are equal, `false` otherwise (including when either is null).
894    ///
895    /// # Example
896    /// ```rust
897    /// # #[cfg(feature = "deploy")] {
898    /// # use hydro_lang::prelude::*;
899    /// # use futures::StreamExt;
900    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901    /// let tick = process.tick();
902    /// // ticks are lazy by default, forces the second tick to run
903    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
904    ///
905    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
906    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
907    /// a.is_some_and_equals(b).all_ticks()
908    /// # }, |mut stream| async move {
909    /// // [true, false]
910    /// # for w in vec![true, false] {
911    /// #     assert_eq!(stream.next().await.unwrap(), w);
912    /// # }
913    /// # }));
914    /// # }
915    /// ```
916    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
917    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
918    where
919        T: PartialEq + Clone,
920        B: IsBounded,
921    {
922        self.into_singleton()
923            .zip(other.into_singleton())
924            .map(q!(|(a, b)| a.is_some() && a == b))
925    }
926
927    /// An operator which allows you to "name" a `HydroNode`.
928    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
929    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
930        {
931            let mut node = self.ir_node.borrow_mut();
932            let metadata = node.metadata_mut();
933            metadata.tag = Some(name.to_owned());
934        }
935        self
936    }
937
938    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
939    /// implies that `B == Bounded`.
940    pub fn make_bounded(self) -> Optional<T, L, Bounded>
941    where
942        B: IsBounded,
943    {
944        Optional::new(
945            self.location.clone(),
946            self.ir_node.replace(HydroNode::Placeholder),
947        )
948    }
949
    /// Clones this bounded optional into a tick, returning an optional that has the
951    /// same value as the outer optional. Because the outer optional is bounded, this
952    /// is deterministic because there is only a single immutable version.
953    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
954    where
955        B: IsBounded,
956        T: Clone,
957    {
958        // TODO(shadaj): avoid printing simulator logs for this snapshot
959        self.snapshot(
960            tick,
961            nondet!(/** bounded top-level optional so deterministic */),
962        )
963    }
964
965    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
966    /// non-null. Otherwise, the stream is empty.
967    ///
968    /// # Example
969    /// ```rust
970    /// # #[cfg(feature = "deploy")] {
971    /// # use hydro_lang::prelude::*;
972    /// # use futures::StreamExt;
973    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
974    /// # let tick = process.tick();
975    /// # // ticks are lazy by default, forces the second tick to run
976    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
977    /// # let batch_first_tick = process
978    /// #   .source_iter(q!(vec![]))
979    /// #   .batch(&tick, nondet!(/** test */));
980    /// # let batch_second_tick = process
981    /// #   .source_iter(q!(vec![123, 456]))
982    /// #   .batch(&tick, nondet!(/** test */))
983    /// #   .defer_tick(); // appears on the second tick
984    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
985    /// input_batch // first tick: [], second tick: [123, 456]
986    ///     .clone()
987    ///     .max()
988    ///     .into_stream()
989    ///     .chain(input_batch)
990    ///     .all_ticks()
991    /// # }, |mut stream| async move {
992    /// // [456, 123, 456]
993    /// # for w in vec![456, 123, 456] {
994    /// #     assert_eq!(stream.next().await.unwrap(), w);
995    /// # }
996    /// # }));
997    /// # }
998    /// ```
999    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1000    where
1001        B: IsBounded,
1002    {
1003        Stream::new(
1004            self.location.clone(),
1005            HydroNode::Cast {
1006                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1007                metadata: self.location.new_node_metadata(Stream::<
1008                    T,
1009                    Tick<L>,
1010                    Bounded,
1011                    TotalOrder,
1012                    ExactlyOnce,
1013                >::collection_kind()),
1014            },
1015        )
1016    }
1017
1018    /// Filters this optional, passing through the value if the boolean signal is `true`,
1019    /// otherwise the output is null.
1020    ///
1021    /// # Example
1022    /// ```rust
1023    /// # #[cfg(feature = "deploy")] {
1024    /// # use hydro_lang::prelude::*;
1025    /// # use futures::StreamExt;
1026    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1027    /// let tick = process.tick();
1028    /// // ticks are lazy by default, forces the second tick to run
1029    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1030    ///
1031    /// let some_first_tick = tick.optional_first_tick(q!(()));
1032    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1033    /// let batch_first_tick = process
1034    ///   .source_iter(q!(vec![456]))
1035    ///   .batch(&tick, nondet!(/** test */));
1036    /// let batch_second_tick = process
1037    ///   .source_iter(q!(vec![789]))
1038    ///   .batch(&tick, nondet!(/** test */))
1039    ///   .defer_tick();
1040    /// batch_first_tick.chain(batch_second_tick).first()
1041    ///   .filter_if(signal)
1042    ///   .unwrap_or(tick.singleton(q!(0)))
1043    ///   .all_ticks()
1044    /// # }, |mut stream| async move {
1045    /// // [456, 0]
1046    /// # for w in vec![456, 0] {
1047    /// #     assert_eq!(stream.next().await.unwrap(), w);
1048    /// # }
1049    /// # }));
1050    /// # }
1051    /// ```
1052    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1053    where
1054        B: IsBounded,
1055    {
1056        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1057    }
1058
1059    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
1061    ///
1062    /// Useful for conditionally processing, such as only emitting an optional's value outside
1063    /// a tick if some other condition is satisfied.
1064    ///
1065    /// # Example
1066    /// ```rust
1067    /// # #[cfg(feature = "deploy")] {
1068    /// # use hydro_lang::prelude::*;
1069    /// # use futures::StreamExt;
1070    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1071    /// let tick = process.tick();
1072    /// // ticks are lazy by default, forces the second tick to run
1073    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1074    ///
1075    /// let batch_first_tick = process
1076    ///   .source_iter(q!(vec![]))
1077    ///   .batch(&tick, nondet!(/** test */));
1078    /// let batch_second_tick = process
1079    ///   .source_iter(q!(vec![456]))
1080    ///   .batch(&tick, nondet!(/** test */))
1081    ///   .defer_tick(); // appears on the second tick
1082    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1083    /// batch_first_tick.chain(batch_second_tick).first()
1084    ///   .filter_if_some(some_on_first_tick)
1085    ///   .unwrap_or(tick.singleton(q!(789)))
1086    ///   .all_ticks()
1087    /// # }, |mut stream| async move {
1088    /// // [789, 789]
1089    /// # for w in vec![789, 789] {
1090    /// #     assert_eq!(stream.next().await.unwrap(), w);
1091    /// # }
1092    /// # }));
1093    /// # }
1094    /// ```
1095    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1096    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1097    where
1098        B: IsBounded,
1099    {
1100        self.filter_if(signal.is_some())
1101    }
1102
1103    /// Filters this optional, passing through the optional value if it is non-null **and** the
    /// argument (a [`Bounded`] [`Optional`]) is _null_, otherwise the output is null.
1105    ///
1106    /// Useful for conditionally processing, such as only emitting an optional's value outside
1107    /// a tick if some other condition is satisfied.
1108    ///
1109    /// # Example
1110    /// ```rust
1111    /// # #[cfg(feature = "deploy")] {
1112    /// # use hydro_lang::prelude::*;
1113    /// # use futures::StreamExt;
1114    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1115    /// let tick = process.tick();
1116    /// // ticks are lazy by default, forces the second tick to run
1117    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1118    ///
1119    /// let batch_first_tick = process
1120    ///   .source_iter(q!(vec![]))
1121    ///   .batch(&tick, nondet!(/** test */));
1122    /// let batch_second_tick = process
1123    ///   .source_iter(q!(vec![456]))
1124    ///   .batch(&tick, nondet!(/** test */))
1125    ///   .defer_tick(); // appears on the second tick
1126    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1127    /// batch_first_tick.chain(batch_second_tick).first()
1128    ///   .filter_if_none(some_on_first_tick)
1129    ///   .unwrap_or(tick.singleton(q!(789)))
1130    ///   .all_ticks()
1131    /// # }, |mut stream| async move {
    /// // [789, 456]
1133    /// # for w in vec![789, 456] {
1134    /// #     assert_eq!(stream.next().await.unwrap(), w);
1135    /// # }
1136    /// # }));
1137    /// # }
1138    /// ```
1139    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1140    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1141    where
1142        B: IsBounded,
1143    {
1144        self.filter_if(other.is_none())
1145    }
1146
    /// If `self` is null, emits a null optional, but if it is non-null, emits `value`.
1148    ///
1149    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1150    /// having a value, such as only releasing a piece of state if the node is the leader.
1151    ///
1152    /// # Example
1153    /// ```rust
1154    /// # #[cfg(feature = "deploy")] {
1155    /// # use hydro_lang::prelude::*;
1156    /// # use futures::StreamExt;
1157    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1158    /// let tick = process.tick();
1159    /// // ticks are lazy by default, forces the second tick to run
1160    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1161    ///
1162    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1163    /// some_on_first_tick
1164    ///     .if_some_then(tick.singleton(q!(456)))
1165    ///     .unwrap_or(tick.singleton(q!(123)))
1166    /// # .all_ticks()
1167    /// # }, |mut stream| async move {
1168    /// // 456 (first tick) ~> 123 (second tick onwards)
1169    /// # for w in vec![456, 123, 123] {
1170    /// #     assert_eq!(stream.next().await.unwrap(), w);
1171    /// # }
1172    /// # }));
1173    /// # }
1174    /// ```
1175    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1176    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1177    where
1178        B: IsBounded,
1179    {
1180        value.filter_if(self.is_some())
1181    }
1182}
1183
1184impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1185where
1186    L: Location<'a>,
1187{
1188    /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1189    /// key-value pair of this [`Optional`].
1190    ///
1191    /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1192    /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1193    /// the entry will be updated and appear / disappear according to the state of the
1194    /// [`Optional`].
1195    pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1196        KeyedSingleton::new(
1197            self.location.clone(),
1198            HydroNode::Cast {
1199                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1200                metadata: self
1201                    .location
1202                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1203            },
1204        )
1205    }
1206}
1207
1208impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1209where
1210    L: Location<'a> + NoTick,
1211{
1212    /// Returns an optional value corresponding to the latest snapshot of the optional
1213    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1214    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1215    /// all snapshots of this optional into the atomic-associated tick will observe the
1216    /// same value each tick.
1217    ///
1218    /// # Non-Determinism
1219    /// Because this picks a snapshot of a optional whose value is continuously changing,
1220    /// the output optional has a non-deterministic value since the snapshot can be at an
1221    /// arbitrary point in time.
1222    pub fn snapshot_atomic(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1223        Optional::new(
1224            tick.clone(),
1225            HydroNode::Batch {
1226                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1227                metadata: tick
1228                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1229            },
1230        )
1231    }
1232
1233    /// Returns this optional back into a top-level, asynchronous execution context where updates
1234    /// to the value will be asynchronously propagated.
1235    pub fn end_atomic(self) -> Optional<T, L, B> {
1236        Optional::new(
1237            self.location.tick.l.clone(),
1238            HydroNode::EndAtomic {
1239                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1240                metadata: self
1241                    .location
1242                    .tick
1243                    .l
1244                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1245            },
1246        )
1247    }
1248}
1249
1250impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1251where
1252    L: Location<'a>,
1253{
1254    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1255    /// will observe the same version of the value and will be executed synchronously before any
1256    /// outputs are yielded (in [`Optional::end_atomic`]).
1257    ///
1258    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1259    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1260    /// a different version).
1261    pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1262        let id = self.location.flow_state().borrow_mut().next_clock_id();
1263        let out_location = Atomic {
1264            tick: Tick {
1265                id,
1266                l: self.location.clone(),
1267            },
1268        };
1269        Optional::new(
1270            out_location.clone(),
1271            HydroNode::BeginAtomic {
1272                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1273                metadata: out_location
1274                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1275            },
1276        )
1277    }
1278
1279    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1280    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1281    /// relevant data that contributed to the snapshot at tick `t`.
1282    ///
1283    /// # Non-Determinism
1284    /// Because this picks a snapshot of a optional whose value is continuously changing,
1285    /// the output optional has a non-deterministic value since the snapshot can be at an
1286    /// arbitrary point in time.
1287    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1288        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1289        Optional::new(
1290            tick.clone(),
1291            HydroNode::Batch {
1292                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1293                metadata: tick
1294                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1295            },
1296        )
1297    }
1298
1299    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1300    /// with order corresponding to increasing prefixes of data contributing to the optional.
1301    ///
1302    /// # Non-Determinism
1303    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1304    /// to non-deterministic batching and arrival of inputs, the output stream is
1305    /// non-deterministic.
1306    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1307    where
1308        L: NoTick,
1309    {
1310        let tick = self.location.tick();
1311        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1312    }
1313
1314    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1315    /// value taken at various points in time. Because the input optional may be
1316    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1317    /// represent the value of the optional given some prefix of the streams leading up to
1318    /// it.
1319    ///
1320    /// # Non-Determinism
1321    /// The output stream is non-deterministic in which elements are sampled, since this
1322    /// is controlled by a clock.
1323    pub fn sample_every(
1324        self,
1325        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1326        nondet: NonDet,
1327    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1328    where
1329        L: NoTick + NoAtomic,
1330    {
1331        let samples = self.location.source_interval(interval);
1332        let tick = self.location.tick();
1333
1334        self.snapshot(&tick, nondet)
1335            .filter_if(samples.batch(&tick, nondet).first().is_some())
1336            .all_ticks()
1337            .weaken_retries()
1338    }
1339}
1340
1341impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1342where
1343    L: Location<'a>,
1344{
1345    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1346    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1347    /// null values).
1348    ///
1349    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1350    /// producing one element in the output for each (non-null) tick. This is useful for batched
1351    /// computations, where the results from each tick must be combined together.
1352    ///
1353    /// # Example
1354    /// ```rust
1355    /// # #[cfg(feature = "deploy")] {
1356    /// # use hydro_lang::prelude::*;
1357    /// # use futures::StreamExt;
1358    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1359    /// # let tick = process.tick();
1360    /// # // ticks are lazy by default, forces the second tick to run
1361    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1362    /// # let batch_first_tick = process
1363    /// #   .source_iter(q!(vec![]))
1364    /// #   .batch(&tick, nondet!(/** test */));
1365    /// # let batch_second_tick = process
1366    /// #   .source_iter(q!(vec![1, 2, 3]))
1367    /// #   .batch(&tick, nondet!(/** test */))
1368    /// #   .defer_tick(); // appears on the second tick
1369    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1370    /// input_batch // first tick: [], second tick: [1, 2, 3]
1371    ///     .max()
1372    ///     .all_ticks()
1373    /// # }, |mut stream| async move {
1374    /// // [3]
1375    /// # for w in vec![3] {
1376    /// #     assert_eq!(stream.next().await.unwrap(), w);
1377    /// # }
1378    /// # }));
1379    /// # }
1380    /// ```
1381    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1382        self.into_stream().all_ticks()
1383    }
1384
1385    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1386    /// which will stream the value computed in _each_ tick as a separate stream element.
1387    ///
1388    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1389    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1390    /// optional's [`Tick`] context.
1391    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1392        self.into_stream().all_ticks_atomic()
1393    }
1394
1395    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1396    /// be asynchronously updated with the latest value of the optional inside the tick, including
1397    /// whether the optional is null or not.
1398    ///
1399    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1400    /// tick that tracks the inner value. This is useful for getting the value as of the
1401    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1402    ///
1403    /// # Example
1404    /// ```rust
1405    /// # #[cfg(feature = "deploy")] {
1406    /// # use hydro_lang::prelude::*;
1407    /// # use futures::StreamExt;
1408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1409    /// # let tick = process.tick();
1410    /// # // ticks are lazy by default, forces the second tick to run
1411    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1412    /// # let batch_first_tick = process
1413    /// #   .source_iter(q!(vec![]))
1414    /// #   .batch(&tick, nondet!(/** test */));
1415    /// # let batch_second_tick = process
1416    /// #   .source_iter(q!(vec![1, 2, 3]))
1417    /// #   .batch(&tick, nondet!(/** test */))
1418    /// #   .defer_tick(); // appears on the second tick
1419    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1420    /// input_batch // first tick: [], second tick: [1, 2, 3]
1421    ///     .max()
1422    ///     .latest()
1423    /// # .into_singleton()
1424    /// # .sample_eager(nondet!(/** test */))
1425    /// # }, |mut stream| async move {
1426    /// // asynchronously changes from None ~> 3
1427    /// # for w in vec![None, Some(3)] {
1428    /// #     assert_eq!(stream.next().await.unwrap(), w);
1429    /// # }
1430    /// # }));
1431    /// # }
1432    /// ```
1433    pub fn latest(self) -> Optional<T, L, Unbounded> {
1434        Optional::new(
1435            self.location.outer().clone(),
1436            HydroNode::YieldConcat {
1437                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1438                metadata: self
1439                    .location
1440                    .outer()
1441                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1442            },
1443        )
1444    }
1445
1446    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1447    /// be updated with the latest value of the optional inside the tick.
1448    ///
1449    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1450    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1451    /// optional's [`Tick`] context.
1452    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1453        let out_location = Atomic {
1454            tick: self.location.clone(),
1455        };
1456
1457        Optional::new(
1458            out_location.clone(),
1459            HydroNode::YieldConcat {
1460                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1461                metadata: out_location
1462                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1463            },
1464        )
1465    }
1466
1467    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1468    /// always has the state of `self` at tick `T - 1`.
1469    ///
1470    /// At tick `0`, the output optional is null, since there is no previous tick.
1471    ///
1472    /// This operator enables stateful iterative processing with ticks, by sending data from one
1473    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1474    ///
1475    /// # Example
1476    /// ```rust
1477    /// # #[cfg(feature = "deploy")] {
1478    /// # use hydro_lang::prelude::*;
1479    /// # use futures::StreamExt;
1480    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1481    /// let tick = process.tick();
1482    /// // ticks are lazy by default, forces the second tick to run
1483    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1484    ///
1485    /// let batch_first_tick = process
1486    ///   .source_iter(q!(vec![1, 2]))
1487    ///   .batch(&tick, nondet!(/** test */));
1488    /// let batch_second_tick = process
1489    ///   .source_iter(q!(vec![3, 4]))
1490    ///   .batch(&tick, nondet!(/** test */))
1491    ///   .defer_tick(); // appears on the second tick
1492    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1493    ///   .reduce(q!(|state, v| *state += v));
1494    ///
1495    /// current_tick_sum.clone().into_singleton().zip(
1496    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1497    /// ).all_ticks()
1498    /// # }, |mut stream| async move {
1499    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1500    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1501    /// #     assert_eq!(stream.next().await.unwrap(), w);
1502    /// # }
1503    /// # }));
1504    /// # }
1505    /// ```
1506    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1507        Optional::new(
1508            self.location.clone(),
1509            HydroNode::DeferTick {
1510                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1511                metadata: self.location.new_node_metadata(Self::collection_kind()),
1512            },
1513        )
1514    }
1515}
1516
#[cfg(test)]
mod tests {
    // Imports are gated on the same features as the tests that use them, so a
    // build without `deploy`/`sim` does not pull in unused names.
    #[cfg(feature = "deploy")]
    use futures::StreamExt;
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(feature = "deploy")]
    use super::Optional;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(feature = "deploy")]
    use crate::nondet::nondet;

    /// Combining an inhabited tick optional with a clone of itself via `or`
    /// must not duplicate the element: the first count received is `1`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();
        let inhabited: Optional<_, _, _> = tick.singleton(q!(123)).into();
        // The clone is the receiver of `or`; the original is the fallback.
        let primary = inhabited.clone();
        let element_count = primary.or(inhabited).into_stream().count();
        let counts = element_count
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        let first_count = external_out.next().await.unwrap();
        assert_eq!(first_count, 1);
    }

    /// A top-level singleton filtered down to nothing and converted with
    /// `into_singleton` must still contain exactly one value when snapshotted
    /// in a tick: the first three counts received are all `1`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();
        let as_singleton = process
            .singleton(q!(123))
            .filter(q!(|_| false))
            .into_singleton();

        let tick_driver = process.spin();

        let snapshot_len = as_singleton
            .snapshot(&tick, nondet!(/** test */))
            .into_stream()
            .count();
        // NOTE(review): zipping with the spin batch's count presumably keeps
        // ticks firing so multiple outputs are produced; only `c` is kept.
        let driver_len = tick_driver.batch(&tick, nondet!(/** test */)).count();
        let counts = snapshot_len
            .zip(driver_len)
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        for _ in 0..3 {
            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }

    /// Same check as `into_singleton_top_level_none_cardinality`, but the empty
    /// optional is derived from a tick singleton passed through `latest()`
    /// before filtering: the first three counts received are all `1`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();
        let as_singleton = tick
            .singleton(q!(123))
            .latest()
            .filter(q!(|_| false))
            .into_singleton();

        let tick_driver = process.spin();

        let snapshot_len = as_singleton
            .snapshot(&tick, nondet!(/** test */))
            .into_stream()
            .count();
        // NOTE(review): zipping with the spin batch's count presumably keeps
        // ticks firing so multiple outputs are produced; only `c` is kept.
        let driver_len = tick_driver.batch(&tick, nondet!(/** test */)).count();
        let counts = snapshot_len
            .zip(driver_len)
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        for _ in 0..3 {
            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }

    /// An inhabited top-level optional (the fold of `1..=4`, kept by an
    /// always-true filter) turned into a stream yields `10` and nothing else.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_some_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let sum = process
            .source_iter(q!(vec![1, 2, 3, 4]))
            .fold(q!(|| 0), q!(|a, b| *a += b));
        let kept = sum.filter(q!(|_| true));

        let out_recv = kept.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            out_recv.assert_yields_only([10]).await;
        });
    }

    /// An empty top-level optional (the fold of `1..=4`, dropped by an
    /// always-false filter) turned into a stream yields no elements at all.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_none_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let sum = process
            .source_iter(q!(vec![1, 2, 3, 4]))
            .fold(q!(|| 0), q!(|a, b| *a += b));
        let dropped = sum.filter(q!(|_| false));

        let out_recv = dropped.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            let nothing: [i32; 0] = [];
            out_recv.assert_yields_only(nothing).await;
        });
    }
}