Skip to main content

hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37    AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38    manual_proof,
39};
40
41pub mod networking;
42
/// Streaming elements of type `V` grouped by a key of type `K`.
///
/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
/// order of keys is non-deterministic but the order *within* each group may be deterministic.
///
/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
///
/// Type Parameters:
/// - `K`: the type of the key for each group
/// - `V`: the type of the elements inside each group
/// - `Loc`: the [`Location`] where the keyed stream is materialized
/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
/// - `Order`: tracks whether the elements within each group have deterministic order
///   ([`TotalOrder`]) or not ([`NoOrder`])
/// - `Retry`: tracks whether the elements within each group have deterministic cardinality
///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
pub struct KeyedStream<
    K,
    V,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location this keyed stream is attached to (returned by `location()`).
    pub(crate) location: Loc,
    /// The IR node backing this collection. Operators take it out by swapping in
    /// `HydroNode::Placeholder`, which is why it lives behind a `RefCell`.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state handle; the `Drop` impl uses it to register a `HydroRoot::Null` for a node
    /// that was never consumed.
    pub(crate) flow_state: FlowState,

    // Zero-sized marker that ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77    fn drop(&mut self) {
78        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81                input: Box::new(ir_node),
82                op_metadata: HydroIrOpMetadata::new(),
83            });
84        }
85    }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89    for KeyedStream<K, V, L, Unbounded, O, R>
90where
91    L: Location<'a>,
92{
93    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94        let new_meta = stream
95            .location
96            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98        KeyedStream {
99            location: stream.location.clone(),
100            flow_state: stream.flow_state.clone(),
101            ir_node: RefCell::new(HydroNode::Cast {
102                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103                metadata: new_meta,
104            }),
105            _phantom: PhantomData,
106        }
107    }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111    for KeyedStream<K, V, L, B, NoOrder, R>
112where
113    L: Location<'a>,
114{
115    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116        stream.weaken_ordering()
117    }
118}
119
impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Forwards to the `defer_tick` function resolved on `KeyedStream` itself.
    // NOTE(review): presumably this is an inherent method defined elsewhere in this file;
    // it is not visible in this section, so confirm it exists for these type parameters.
    fn defer_tick(self) -> Self {
        KeyedStream::defer_tick(self)
    }
}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132    L: Location<'a>,
133{
134    type Location = Tick<L>;
135
136    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137        KeyedStream {
138            flow_state: location.flow_state().clone(),
139            location: location.clone(),
140            ir_node: RefCell::new(HydroNode::CycleSource {
141                cycle_id,
142                metadata: location.new_node_metadata(
143                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144                ),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154    L: Location<'a>,
155{
156    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162
163        self.location
164            .flow_state()
165            .borrow_mut()
166            .push_root(HydroRoot::CycleSink {
167                cycle_id,
168                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169                op_metadata: HydroIrOpMetadata::new(),
170            });
171    }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175    for KeyedStream<K, V, L, B, O, R>
176where
177    L: Location<'a> + NoTick,
178{
179    type Location = L;
180
181    fn create_source(cycle_id: CycleId, location: L) -> Self {
182        KeyedStream {
183            flow_state: location.flow_state().clone(),
184            location: location.clone(),
185            ir_node: RefCell::new(HydroNode::CycleSource {
186                cycle_id,
187                metadata: location
188                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189            }),
190            _phantom: PhantomData,
191        }
192    }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196    for KeyedStream<K, V, L, B, O, R>
197where
198    L: Location<'a> + NoTick,
199{
200    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201        assert_eq!(
202            Location::id(&self.location),
203            expected_location,
204            "locations do not match"
205        );
206        self.location
207            .flow_state()
208            .borrow_mut()
209            .push_root(HydroRoot::CycleSink {
210                cycle_id,
211                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214    }
215}
216
217impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
218    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
219{
220    fn clone(&self) -> Self {
221        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
222            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
223            *self.ir_node.borrow_mut() = HydroNode::Tee {
224                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
225                metadata: self.location.new_node_metadata(Self::collection_kind()),
226            };
227        }
228
229        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
230            KeyedStream {
231                location: self.location.clone(),
232                flow_state: self.flow_state.clone(),
233                ir_node: HydroNode::Tee {
234                    inner: SharedNode(inner.0.clone()),
235                    metadata: metadata.clone(),
236                }
237                .into(),
238                _phantom: PhantomData,
239            }
240        } else {
241            unreachable!()
242        }
243    }
244}
245
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// `T` is the type of the emitted elements.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, and do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260    KeyedStream<K, V, L, B, O, R>
261{
262    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266        let flow_state = location.flow_state().clone();
267        KeyedStream {
268            location,
269            flow_state,
270            ir_node: RefCell::new(ir_node),
271            _phantom: PhantomData,
272        }
273    }
274
275    /// Returns the [`CollectionKind`] corresponding to this type.
276    pub fn collection_kind() -> CollectionKind {
277        CollectionKind::KeyedStream {
278            bound: B::BOUND_KIND,
279            value_order: O::ORDERING_KIND,
280            value_retry: R::RETRIES_KIND,
281            key_type: stageleft::quote_type::<K>().into(),
282            value_type: stageleft::quote_type::<V>().into(),
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed stream is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290
291    /// Turns this [`KeyedStream`] into a [`Stream`] preserving ordering, under the invariant
292    /// assumption that there is at most one key. If this invariant is broken, the program
293    /// may exhibit undefined behavior, so uses must be carefully vetted.
294    pub(crate) fn cast_at_most_one_key(self) -> Stream<(K, V), L, B, O, R> {
295        Stream::new(
296            self.location.clone(),
297            HydroNode::Cast {
298                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
299                metadata: self
300                    .location
301                    .new_node_metadata(Stream::<(K, V), L, B, O, R>::collection_kind()),
302            },
303        )
304    }
305
306    /// Turns this [`KeyedStream`] into a [`KeyedSingleton`], under the invariant assumption that
307    /// there is at most one entry per key. If this invariant is broken, the program may exhibit
308    /// undefined behavior, so uses must be carefully vetted.
309    pub(crate) fn cast_at_most_one_entry_per_key(
310        self,
311    ) -> KeyedSingleton<K, V, L, B::WithBoundedValue> {
312        KeyedSingleton::new(
313            self.location.clone(),
314            HydroNode::Cast {
315                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
316                metadata: self.location.new_node_metadata(KeyedSingleton::<
317                    K,
318                    V,
319                    L,
320                    B::WithBoundedValue,
321                >::collection_kind()),
322            },
323        )
324    }
325
326    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> KeyedStream<K, V, L, B, O2, R> {
327        if O::ORDERING_KIND == O2::ORDERING_KIND {
328            KeyedStream::new(
329                self.location.clone(),
330                self.ir_node.replace(HydroNode::Placeholder),
331            )
332        } else {
333            panic!(
334                "Runtime ordering {:?} did not match requested cast {:?}.",
335                O::ORDERING_KIND,
336                O2::ORDERING_KIND
337            )
338        }
339    }
340
341    /// Explicitly "casts" the keyed stream to a type with a different ordering
342    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
343    /// by the type-system.
344    ///
345    /// # Non-Determinism
346    /// This function is used as an escape hatch, and any mistakes in the
347    /// provided ordering guarantee will propagate into the guarantees
348    /// for the rest of the program.
349    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
350        if O::ORDERING_KIND == O2::ORDERING_KIND {
351            KeyedStream::new(
352                self.location.clone(),
353                self.ir_node.replace(HydroNode::Placeholder),
354            )
355        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
356            // We can always weaken the ordering guarantee
357            KeyedStream::new(
358                self.location.clone(),
359                HydroNode::Cast {
360                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
361                    metadata: self
362                        .location
363                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
364                },
365            )
366        } else {
367            KeyedStream::new(
368                self.location.clone(),
369                HydroNode::ObserveNonDet {
370                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
371                    trusted: false,
372                    metadata: self
373                        .location
374                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
375                },
376            )
377        }
378    }
379
380    fn assume_ordering_trusted<O2: Ordering>(
381        self,
382        _nondet: NonDet,
383    ) -> KeyedStream<K, V, L, B, O2, R> {
384        if O::ORDERING_KIND == O2::ORDERING_KIND {
385            KeyedStream::new(
386                self.location.clone(),
387                self.ir_node.replace(HydroNode::Placeholder),
388            )
389        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
390            // We can always weaken the ordering guarantee
391            KeyedStream::new(
392                self.location.clone(),
393                HydroNode::Cast {
394                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
395                    metadata: self
396                        .location
397                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
398                },
399            )
400        } else {
401            KeyedStream::new(
402                self.location.clone(),
403                HydroNode::ObserveNonDet {
404                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
405                    trusted: true,
406                    metadata: self
407                        .location
408                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
409                },
410            )
411        }
412    }
413
414    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
415    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
416    /// which is always safe because that is the weakest possible guarantee.
417    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
418        self.weaken_ordering::<NoOrder>()
419    }
420
421    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
422    /// enforcing that `O2` is weaker than the input ordering guarantee.
423    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
424        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
425        self.assume_ordering::<O2>(nondet)
426    }
427
428    /// Explicitly "casts" the keyed stream to a type with a different retries
429    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
430    /// be proven by the type-system.
431    ///
432    /// # Non-Determinism
433    /// This function is used as an escape hatch, and any mistakes in the
434    /// provided retries guarantee will propagate into the guarantees
435    /// for the rest of the program.
436    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
437        if R::RETRIES_KIND == R2::RETRIES_KIND {
438            KeyedStream::new(
439                self.location.clone(),
440                self.ir_node.replace(HydroNode::Placeholder),
441            )
442        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
443            // We can always weaken the retries guarantee
444            KeyedStream::new(
445                self.location.clone(),
446                HydroNode::Cast {
447                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
448                    metadata: self
449                        .location
450                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
451                },
452            )
453        } else {
454            KeyedStream::new(
455                self.location.clone(),
456                HydroNode::ObserveNonDet {
457                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
458                    trusted: false,
459                    metadata: self
460                        .location
461                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
462                },
463            )
464        }
465    }
466
467    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
468    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
469    /// which is always safe because that is the weakest possible guarantee.
470    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
471        self.weaken_retries::<AtLeastOnce>()
472    }
473
474    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
475    /// enforcing that `R2` is weaker than the input retries guarantee.
476    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
477        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
478        self.assume_retries::<R2>(nondet)
479    }
480
481    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
482    /// implies that `O == TotalOrder`.
483    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
484    where
485        O: IsOrdered,
486    {
487        self.assume_ordering(nondet!(/** no-op */))
488    }
489
490    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
491    /// implies that `R == ExactlyOnce`.
492    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
493    where
494        R: IsExactlyOnce,
495    {
496        self.assume_retries(nondet!(/** no-op */))
497    }
498
499    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
500    /// implies that `B == Bounded`.
501    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
502    where
503        B: IsBounded,
504    {
505        self.weaken_boundedness()
506    }
507
508    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
509    /// which implies that `B == Bounded`.
510    pub fn weaken_boundedness<B2: Boundedness>(self) -> KeyedStream<K, V, L, B2, O, R> {
511        if B::BOUNDED == B2::BOUNDED {
512            KeyedStream::new(
513                self.location.clone(),
514                self.ir_node.replace(HydroNode::Placeholder),
515            )
516        } else {
517            // We can always weaken the boundedness
518            KeyedStream::new(
519                self.location.clone(),
520                HydroNode::Cast {
521                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
522                    metadata: self
523                        .location
524                        .new_node_metadata(KeyedStream::<K, V, L, B2, O, R>::collection_kind()),
525                },
526            )
527        }
528    }
529
530    /// Flattens the keyed stream into an unordered stream of key-value pairs.
531    ///
532    /// # Example
533    /// ```rust
534    /// # #[cfg(feature = "deploy")] {
535    /// # use hydro_lang::prelude::*;
536    /// # use futures::StreamExt;
537    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
538    /// process
539    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
540    ///     .into_keyed()
541    ///     .entries()
542    /// # }, |mut stream| async move {
543    /// // (1, 2), (1, 3), (2, 4) in any order
544    /// # let mut results = Vec::new();
545    /// # for _ in 0..3 {
546    /// #     results.push(stream.next().await.unwrap());
547    /// # }
548    /// # results.sort();
549    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
550    /// # }));
551    /// # }
552    /// ```
553    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
554        Stream::new(
555            self.location.clone(),
556            HydroNode::Cast {
557                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
558                metadata: self
559                    .location
560                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
561            },
562        )
563    }
564
565    /// Flattens the keyed stream into a totally ordered stream of key-value pairs,
566    /// preserving the order of values within each key group but non-deterministically
567    /// interleaving across keys.
568    ///
569    /// Requires the keyed stream to be totally ordered within each group (`O: IsOrdered`).
570    ///
571    /// # Non-Determinism
572    /// The interleaving of entries across different keys is non-deterministic.
573    /// Within each key, the original order is preserved.
574    pub fn entries_partially_ordered(self, _nondet: NonDet) -> Stream<(K, V), L, B, TotalOrder, R>
575    where
576        O: IsOrdered,
577    {
578        Stream::new(
579            self.location.clone(),
580            HydroNode::ObserveNonDet {
581                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
582                trusted: false,
583                metadata: self
584                    .location
585                    .new_node_metadata(Stream::<(K, V), L, B, TotalOrder, R>::collection_kind()),
586            },
587        )
588    }
589
590    /// Flattens the keyed stream into an unordered stream of only the values.
591    ///
592    /// # Example
593    /// ```rust
594    /// # #[cfg(feature = "deploy")] {
595    /// # use hydro_lang::prelude::*;
596    /// # use futures::StreamExt;
597    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
598    /// process
599    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
600    ///     .into_keyed()
601    ///     .values()
602    /// # }, |mut stream| async move {
603    /// // 2, 3, 4 in any order
604    /// # let mut results = Vec::new();
605    /// # for _ in 0..3 {
606    /// #     results.push(stream.next().await.unwrap());
607    /// # }
608    /// # results.sort();
609    /// # assert_eq!(results, vec![2, 3, 4]);
610    /// # }));
611    /// # }
612    /// ```
613    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
614        self.entries().map(q!(|(_, v)| v))
615    }
616
617    /// Flattens the keyed stream into an unordered stream of just the keys.
618    ///
619    /// # Example
620    /// ```rust
621    /// # #[cfg(feature = "deploy")] {
622    /// # use hydro_lang::prelude::*;
623    /// # use futures::StreamExt;
624    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625    /// # process
626    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
627    /// #     .into_keyed()
628    /// #     .keys()
629    /// # }, |mut stream| async move {
630    /// // 1, 2 in any order
631    /// # let mut results = Vec::new();
632    /// # for _ in 0..2 {
633    /// #     results.push(stream.next().await.unwrap());
634    /// # }
635    /// # results.sort();
636    /// # assert_eq!(results, vec![1, 2]);
637    /// # }));
638    /// # }
639    /// ```
640    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
641    where
642        K: Eq + Hash,
643    {
644        self.entries().map(q!(|(k, _)| k)).unique()
645    }
646
647    /// Transforms each value by invoking `f` on each element, with keys staying the same
648    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
649    ///
650    /// If you do not want to modify the stream and instead only want to view
651    /// each item use [`KeyedStream::inspect`] instead.
652    ///
653    /// # Example
654    /// ```rust
655    /// # #[cfg(feature = "deploy")] {
656    /// # use hydro_lang::prelude::*;
657    /// # use futures::StreamExt;
658    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
659    /// process
660    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
661    ///     .into_keyed()
662    ///     .map(q!(|v| v + 1))
663    /// #   .entries()
664    /// # }, |mut stream| async move {
665    /// // { 1: [3, 4], 2: [5] }
666    /// # let mut results = Vec::new();
667    /// # for _ in 0..3 {
668    /// #     results.push(stream.next().await.unwrap());
669    /// # }
670    /// # results.sort();
671    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
672    /// # }));
673    /// # }
674    /// ```
675    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
676    where
677        F: Fn(V) -> U + 'a,
678    {
679        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
680        let map_f = q!({
681            let orig = f;
682            move |(k, v)| (k, orig(v))
683        })
684        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
685        .into();
686
687        KeyedStream::new(
688            self.location.clone(),
689            HydroNode::Map {
690                f: map_f,
691                singleton_refs: Vec::new(),
692                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
693                metadata: self
694                    .location
695                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
696            },
697        )
698    }
699
700    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
701    /// re-grouped even they are tuples; instead they will be grouped under the original key.
702    ///
703    /// If you do not want to modify the stream and instead only want to view
704    /// each item use [`KeyedStream::inspect_with_key`] instead.
705    ///
706    /// # Example
707    /// ```rust
708    /// # #[cfg(feature = "deploy")] {
709    /// # use hydro_lang::prelude::*;
710    /// # use futures::StreamExt;
711    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
712    /// process
713    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
714    ///     .into_keyed()
715    ///     .map_with_key(q!(|(k, v)| k + v))
716    /// #   .entries()
717    /// # }, |mut stream| async move {
718    /// // { 1: [3, 4], 2: [6] }
719    /// # let mut results = Vec::new();
720    /// # for _ in 0..3 {
721    /// #     results.push(stream.next().await.unwrap());
722    /// # }
723    /// # results.sort();
724    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
725    /// # }));
726    /// # }
727    /// ```
728    pub fn map_with_key<U, F>(
729        self,
730        f: impl IntoQuotedMut<'a, F, L> + Copy,
731    ) -> KeyedStream<K, U, L, B, O, R>
732    where
733        F: Fn((K, V)) -> U + 'a,
734        K: Clone,
735    {
736        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
737        let map_f = q!({
738            let orig = f;
739            move |(k, v)| {
740                let out = orig((Clone::clone(&k), v));
741                (k, out)
742            }
743        })
744        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
745        .into();
746
747        KeyedStream::new(
748            self.location.clone(),
749            HydroNode::Map {
750                f: map_f,
751                singleton_refs: Vec::new(),
752                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
753                metadata: self
754                    .location
755                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
756            },
757        )
758    }
759
760    /// Prepends a new value to the key of each element in the stream, producing a new
761    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
762    /// occurs and the elements in each group preserve their original order.
763    ///
764    /// # Example
765    /// ```rust
766    /// # #[cfg(feature = "deploy")] {
767    /// # use hydro_lang::prelude::*;
768    /// # use futures::StreamExt;
769    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770    /// process
771    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
772    ///     .into_keyed()
773    ///     .prefix_key(q!(|&(k, _)| k % 2))
774    /// #   .entries()
775    /// # }, |mut stream| async move {
776    /// // { (1, 1): [2, 3], (0, 2): [4] }
777    /// # let mut results = Vec::new();
778    /// # for _ in 0..3 {
779    /// #     results.push(stream.next().await.unwrap());
780    /// # }
781    /// # results.sort();
782    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
783    /// # }));
784    /// # }
785    /// ```
786    pub fn prefix_key<K2, F>(
787        self,
788        f: impl IntoQuotedMut<'a, F, L> + Copy,
789    ) -> KeyedStream<(K2, K), V, L, B, O, R>
790    where
791        F: Fn(&(K, V)) -> K2 + 'a,
792    {
793        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
794        let map_f = q!({
795            let orig = f;
796            move |kv| {
797                let out = orig(&kv);
798                ((out, kv.0), kv.1)
799            }
800        })
801        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
802        .into();
803
804        KeyedStream::new(
805            self.location.clone(),
806            HydroNode::Map {
807                f: map_f,
808                singleton_refs: Vec::new(),
809                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
810                metadata: self
811                    .location
812                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
813            },
814        )
815    }
816
817    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
818    /// `f`, preserving the order of the elements within the group.
819    ///
820    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
821    /// not modify or take ownership of the values. If you need to modify the values while filtering
822    /// use [`KeyedStream::filter_map`] instead.
823    ///
824    /// # Example
825    /// ```rust
826    /// # #[cfg(feature = "deploy")] {
827    /// # use hydro_lang::prelude::*;
828    /// # use futures::StreamExt;
829    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
830    /// process
831    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
832    ///     .into_keyed()
833    ///     .filter(q!(|&x| x > 2))
834    /// #   .entries()
835    /// # }, |mut stream| async move {
836    /// // { 1: [3], 2: [4] }
837    /// # let mut results = Vec::new();
838    /// # for _ in 0..2 {
839    /// #     results.push(stream.next().await.unwrap());
840    /// # }
841    /// # results.sort();
842    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
843    /// # }));
844    /// # }
845    /// ```
846    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
847    where
848        F: Fn(&V) -> bool + 'a,
849    {
850        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
851        let filter_f = q!({
852            let orig = f;
853            move |t: &(_, _)| orig(&t.1)
854        })
855        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
856        .into();
857
858        KeyedStream::new(
859            self.location.clone(),
860            HydroNode::Filter {
861                f: filter_f,
862                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
863                metadata: self.location.new_node_metadata(Self::collection_kind()),
864            },
865        )
866    }
867
868    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
869    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
870    ///
871    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
872    /// not modify or take ownership of the values. If you need to modify the values while filtering
873    /// use [`KeyedStream::filter_map_with_key`] instead.
874    ///
875    /// # Example
876    /// ```rust
877    /// # #[cfg(feature = "deploy")] {
878    /// # use hydro_lang::prelude::*;
879    /// # use futures::StreamExt;
880    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
881    /// process
882    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
883    ///     .into_keyed()
884    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
885    /// #   .entries()
886    /// # }, |mut stream| async move {
887    /// // { 1: [3], 2: [4] }
888    /// # let mut results = Vec::new();
889    /// # for _ in 0..2 {
890    /// #     results.push(stream.next().await.unwrap());
891    /// # }
892    /// # results.sort();
893    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
894    /// # }));
895    /// # }
896    /// ```
897    pub fn filter_with_key<F>(
898        self,
899        f: impl IntoQuotedMut<'a, F, L> + Copy,
900    ) -> KeyedStream<K, V, L, B, O, R>
901    where
902        F: Fn(&(K, V)) -> bool + 'a,
903    {
904        let filter_f = f
905            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
906            .into();
907
908        KeyedStream::new(
909            self.location.clone(),
910            HydroNode::Filter {
911                f: filter_f,
912                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
913                metadata: self.location.new_node_metadata(Self::collection_kind()),
914            },
915        )
916    }
917
918    /// An operator that both filters and maps each value, with keys staying the same.
919    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
920    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
921    ///
922    /// # Example
923    /// ```rust
924    /// # #[cfg(feature = "deploy")] {
925    /// # use hydro_lang::prelude::*;
926    /// # use futures::StreamExt;
927    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
928    /// process
929    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
930    ///     .into_keyed()
931    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
932    /// #   .entries()
933    /// # }, |mut stream| async move {
934    /// // { 1: [2], 2: [4] }
935    /// # let mut results = Vec::new();
936    /// # for _ in 0..2 {
937    /// #     results.push(stream.next().await.unwrap());
938    /// # }
939    /// # results.sort();
940    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
941    /// # }));
942    /// # }
943    /// ```
944    pub fn filter_map<U, F>(
945        self,
946        f: impl IntoQuotedMut<'a, F, L> + Copy,
947    ) -> KeyedStream<K, U, L, B, O, R>
948    where
949        F: Fn(V) -> Option<U> + 'a,
950    {
951        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
952        let filter_map_f = q!({
953            let orig = f;
954            move |(k, v)| orig(v).map(|o| (k, o))
955        })
956        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
957        .into();
958
959        KeyedStream::new(
960            self.location.clone(),
961            HydroNode::FilterMap {
962                f: filter_map_f,
963                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
964                metadata: self
965                    .location
966                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
967            },
968        )
969    }
970
971    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
972    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
973    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
974    ///
975    /// # Example
976    /// ```rust
977    /// # #[cfg(feature = "deploy")] {
978    /// # use hydro_lang::prelude::*;
979    /// # use futures::StreamExt;
980    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
981    /// process
982    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
983    ///     .into_keyed()
984    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
985    /// #   .entries()
986    /// # }, |mut stream| async move {
987    /// // { 2: [2] }
988    /// # let mut results = Vec::new();
989    /// # for _ in 0..1 {
990    /// #     results.push(stream.next().await.unwrap());
991    /// # }
992    /// # results.sort();
993    /// # assert_eq!(results, vec![(2, 2)]);
994    /// # }));
995    /// # }
996    /// ```
997    pub fn filter_map_with_key<U, F>(
998        self,
999        f: impl IntoQuotedMut<'a, F, L> + Copy,
1000    ) -> KeyedStream<K, U, L, B, O, R>
1001    where
1002        F: Fn((K, V)) -> Option<U> + 'a,
1003        K: Clone,
1004    {
1005        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1006        let filter_map_f = q!({
1007            let orig = f;
1008            move |(k, v)| {
1009                let out = orig((Clone::clone(&k), v));
1010                out.map(|o| (k, o))
1011            }
1012        })
1013        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1014        .into();
1015
1016        KeyedStream::new(
1017            self.location.clone(),
1018            HydroNode::FilterMap {
1019                f: filter_map_f,
1020                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1021                metadata: self
1022                    .location
1023                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1024            },
1025        )
1026    }
1027
1028    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
1029    /// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
1030    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
1031    ///
1032    /// # Example
1033    /// ```rust
1034    /// # #[cfg(feature = "deploy")] {
1035    /// # use hydro_lang::prelude::*;
1036    /// # use futures::StreamExt;
1037    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1038    /// let tick = process.tick();
1039    /// let batch = process
1040    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
1041    ///   .into_keyed()
1042    ///   .batch(&tick, nondet!(/** test */));
1043    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
1044    /// batch.cross_singleton(count).all_ticks().entries()
1045    /// # }, |mut stream| async move {
1046    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
1047    /// # let mut results = Vec::new();
1048    /// # for _ in 0..3 {
1049    /// #     results.push(stream.next().await.unwrap());
1050    /// # }
1051    /// # results.sort();
1052    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
1053    /// # }));
1054    /// # }
1055    /// ```
1056    pub fn cross_singleton<O2>(
1057        self,
1058        other: impl Into<Optional<O2, L, Bounded>>,
1059    ) -> KeyedStream<K, (V, O2), L, B, O, R>
1060    where
1061        O2: Clone,
1062    {
1063        let other: Optional<O2, L, Bounded> = other.into();
1064        check_matching_location(&self.location, &other.location);
1065
1066        Stream::new(
1067            self.location.clone(),
1068            HydroNode::CrossSingleton {
1069                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1070                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1071                metadata: self
1072                    .location
1073                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
1074            },
1075        )
1076        .map(q!(|((k, v), o2)| (k, (v, o2))))
1077        .into_keyed()
1078    }
1079
1080    /// For each value `v` in each group, transform `v` using `f` and then treat the
1081    /// result as an [`Iterator`] to produce values one by one within the same group.
1082    /// The implementation for [`Iterator`] for the output type `I` must produce items
1083    /// in a **deterministic** order.
1084    ///
1085    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
1086    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
1087    ///
1088    /// # Example
1089    /// ```rust
1090    /// # #[cfg(feature = "deploy")] {
1091    /// # use hydro_lang::prelude::*;
1092    /// # use futures::StreamExt;
1093    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1094    /// process
1095    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1096    ///     .into_keyed()
1097    ///     .flat_map_ordered(q!(|x| x))
1098    /// #   .entries()
1099    /// # }, |mut stream| async move {
1100    /// // { 1: [2, 3, 4], 2: [5, 6] }
1101    /// # let mut results = Vec::new();
1102    /// # for _ in 0..5 {
1103    /// #     results.push(stream.next().await.unwrap());
1104    /// # }
1105    /// # results.sort();
1106    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1107    /// # }));
1108    /// # }
1109    /// ```
1110    pub fn flat_map_ordered<U, I, F>(
1111        self,
1112        f: impl IntoQuotedMut<'a, F, L> + Copy,
1113    ) -> KeyedStream<K, U, L, B, O, R>
1114    where
1115        I: IntoIterator<Item = U>,
1116        F: Fn(V) -> I + 'a,
1117        K: Clone,
1118    {
1119        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1120        let flat_map_f = q!({
1121            let orig = f;
1122            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1123        })
1124        .splice_fn1_ctx::<(K, V), _>(&self.location)
1125        .into();
1126
1127        KeyedStream::new(
1128            self.location.clone(),
1129            HydroNode::FlatMap {
1130                f: flat_map_f,
1131                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1132                metadata: self
1133                    .location
1134                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1135            },
1136        )
1137    }
1138
1139    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1140    /// for the output type `I` to produce items in any order.
1141    ///
1142    /// # Example
1143    /// ```rust
1144    /// # #[cfg(feature = "deploy")] {
1145    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1146    /// # use futures::StreamExt;
1147    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1148    /// process
1149    ///     .source_iter(q!(vec![
1150    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1151    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1152    ///     ]))
1153    ///     .into_keyed()
1154    ///     .flat_map_unordered(q!(|x| x))
1155    /// #   .entries()
1156    /// # }, |mut stream| async move {
1157    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1158    /// # let mut results = Vec::new();
1159    /// # for _ in 0..4 {
1160    /// #     results.push(stream.next().await.unwrap());
1161    /// # }
1162    /// # results.sort();
1163    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1164    /// # }));
1165    /// # }
1166    /// ```
1167    pub fn flat_map_unordered<U, I, F>(
1168        self,
1169        f: impl IntoQuotedMut<'a, F, L> + Copy,
1170    ) -> KeyedStream<K, U, L, B, NoOrder, R>
1171    where
1172        I: IntoIterator<Item = U>,
1173        F: Fn(V) -> I + 'a,
1174        K: Clone,
1175    {
1176        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1177        let flat_map_f = q!({
1178            let orig = f;
1179            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1180        })
1181        .splice_fn1_ctx::<(K, V), _>(&self.location)
1182        .into();
1183
1184        KeyedStream::new(
1185            self.location.clone(),
1186            HydroNode::FlatMap {
1187                f: flat_map_f,
1188                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1189                metadata: self
1190                    .location
1191                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1192            },
1193        )
1194    }
1195
1196    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1197    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1198    /// items in a **deterministic** order.
1199    ///
1200    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1201    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1202    ///
1203    /// # Example
1204    /// ```rust
1205    /// # #[cfg(feature = "deploy")] {
1206    /// # use hydro_lang::prelude::*;
1207    /// # use futures::StreamExt;
1208    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1209    /// process
1210    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1211    ///     .into_keyed()
1212    ///     .flatten_ordered()
1213    /// #   .entries()
1214    /// # }, |mut stream| async move {
1215    /// // { 1: [2, 3, 4], 2: [5, 6] }
1216    /// # let mut results = Vec::new();
1217    /// # for _ in 0..5 {
1218    /// #     results.push(stream.next().await.unwrap());
1219    /// # }
1220    /// # results.sort();
1221    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1222    /// # }));
1223    /// # }
1224    /// ```
1225    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1226    where
1227        V: IntoIterator<Item = U>,
1228        K: Clone,
1229    {
1230        self.flat_map_ordered(q!(|d| d))
1231    }
1232
1233    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1234    /// for the value type `V` to produce items in any order.
1235    ///
1236    /// # Example
1237    /// ```rust
1238    /// # #[cfg(feature = "deploy")] {
1239    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1240    /// # use futures::StreamExt;
1241    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1242    /// process
1243    ///     .source_iter(q!(vec![
1244    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1245    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1246    ///     ]))
1247    ///     .into_keyed()
1248    ///     .flatten_unordered()
1249    /// #   .entries()
1250    /// # }, |mut stream| async move {
1251    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1252    /// # let mut results = Vec::new();
1253    /// # for _ in 0..4 {
1254    /// #     results.push(stream.next().await.unwrap());
1255    /// # }
1256    /// # results.sort();
1257    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1258    /// # }));
1259    /// # }
1260    /// ```
1261    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1262    where
1263        V: IntoIterator<Item = U>,
1264        K: Clone,
1265    {
1266        self.flat_map_unordered(q!(|d| d))
1267    }
1268
1269    /// An operator which allows you to "inspect" each element of a stream without
1270    /// modifying it. The closure `f` is called on a reference to each value. This is
1271    /// mainly useful for debugging, and should not be used to generate side-effects.
1272    ///
1273    /// # Example
1274    /// ```rust
1275    /// # #[cfg(feature = "deploy")] {
1276    /// # use hydro_lang::prelude::*;
1277    /// # use futures::StreamExt;
1278    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1279    /// process
1280    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1281    ///     .into_keyed()
1282    ///     .inspect(q!(|v| println!("{}", v)))
1283    /// #   .entries()
1284    /// # }, |mut stream| async move {
1285    /// # let mut results = Vec::new();
1286    /// # for _ in 0..3 {
1287    /// #     results.push(stream.next().await.unwrap());
1288    /// # }
1289    /// # results.sort();
1290    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1291    /// # }));
1292    /// # }
1293    /// ```
1294    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1295    where
1296        F: Fn(&V) + 'a,
1297    {
1298        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1299        let inspect_f = q!({
1300            let orig = f;
1301            move |t: &(_, _)| orig(&t.1)
1302        })
1303        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1304        .into();
1305
1306        KeyedStream::new(
1307            self.location.clone(),
1308            HydroNode::Inspect {
1309                f: inspect_f,
1310                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1311                metadata: self.location.new_node_metadata(Self::collection_kind()),
1312            },
1313        )
1314    }
1315
1316    /// An operator which allows you to "inspect" each element of a stream without
1317    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1318    /// mainly useful for debugging, and should not be used to generate side-effects.
1319    ///
1320    /// # Example
1321    /// ```rust
1322    /// # #[cfg(feature = "deploy")] {
1323    /// # use hydro_lang::prelude::*;
1324    /// # use futures::StreamExt;
1325    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1326    /// process
1327    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1328    ///     .into_keyed()
1329    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1330    /// #   .entries()
1331    /// # }, |mut stream| async move {
1332    /// # let mut results = Vec::new();
1333    /// # for _ in 0..3 {
1334    /// #     results.push(stream.next().await.unwrap());
1335    /// # }
1336    /// # results.sort();
1337    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1338    /// # }));
1339    /// # }
1340    /// ```
1341    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1342    where
1343        F: Fn(&(K, V)) + 'a,
1344    {
1345        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1346
1347        KeyedStream::new(
1348            self.location.clone(),
1349            HydroNode::Inspect {
1350                f: inspect_f,
1351                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1352                metadata: self.location.new_node_metadata(Self::collection_kind()),
1353            },
1354        )
1355    }
1356
1357    /// An operator which allows you to "name" a `HydroNode`.
1358    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1359    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1360        {
1361            let mut node = self.ir_node.borrow_mut();
1362            let metadata = node.metadata_mut();
1363            metadata.tag = Some(name.to_owned());
1364        }
1365        self
1366    }
1367
1368    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1369    ///
1370    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1371    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1372    /// early by returning `None`.
1373    ///
1374    /// The function takes a mutable reference to the accumulator and the current element, and returns
1375    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1376    /// If the function returns `None`, only that key's group is terminated; its remaining elements are dropped.
1377    ///
1378    /// # Example
1379    /// ```rust
1380    /// # #[cfg(feature = "deploy")] {
1381    /// # use hydro_lang::prelude::*;
1382    /// # use futures::StreamExt;
1383    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1384    /// process
1385    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1386    ///     .into_keyed()
1387    ///     .scan(
1388    ///         q!(|| 0),
1389    ///         q!(|acc, x| {
1390    ///             *acc += x;
1391    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1392    ///         }),
1393    ///     )
1394    /// #   .entries()
1395    /// # }, |mut stream| async move {
1396    /// // Output: { 0: [1], 1: [3, 7] }
1397    /// # let mut results = Vec::new();
1398    /// # for _ in 0..3 {
1399    /// #     results.push(stream.next().await.unwrap());
1400    /// # }
1401    /// # results.sort();
1402    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1403    /// # }));
1404    /// # }
1405    /// ```
1406    pub fn scan<A, U, I, F>(
1407        self,
1408        init: impl IntoQuotedMut<'a, I, L> + Copy,
1409        f: impl IntoQuotedMut<'a, F, L> + Copy,
1410    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1411    where
1412        O: IsOrdered,
1413        R: IsExactlyOnce,
1414        K: Clone + Eq + Hash,
1415        I: Fn() -> A + 'a,
1416        F: Fn(&mut A, V) -> Option<U> + 'a,
1417    {
1418        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1419        self.make_totally_ordered().make_exactly_once().generator(
1420            init,
1421            q!({
1422                let orig = f;
1423                move |state, v| {
1424                    if let Some(out) = orig(state, v) {
1425                        Generate::Yield(out)
1426                    } else {
1427                        Generate::Break
1428                    }
1429                }
1430            }),
1431        )
1432    }
1433
1434    /// Iteratively processes the elements in each group using a state machine that can yield
1435    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1436    /// syntax in Rust, without requiring special syntax.
1437    ///
1438    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1439    /// state for each group. The second argument defines the processing logic, taking in a
1440    /// mutable reference to the group's state and the value to be processed. It emits a
1441    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1442    /// should be processed.
1443    ///
1444    /// # Example
1445    /// ```rust
1446    /// # #[cfg(feature = "deploy")] {
1447    /// # use hydro_lang::prelude::*;
1448    /// # use futures::StreamExt;
1449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1450    /// process
1451    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1452    ///     .into_keyed()
1453    ///     .generator(
1454    ///         q!(|| 0),
1455    ///         q!(|acc, x| {
1456    ///             *acc += x;
1457    ///             if *acc > 100 {
1458    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1459    ///                     "done!".to_owned()
1460    ///                 )
1461    ///             } else if *acc % 2 == 0 {
1462    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1463    ///                     "even".to_owned()
1464    ///                 )
1465    ///             } else {
1466    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1467    ///             }
1468    ///         }),
1469    ///     )
1470    /// #   .entries()
1471    /// # }, |mut stream| async move {
1472    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1473    /// # let mut results = Vec::new();
1474    /// # for _ in 0..3 {
1475    /// #     results.push(stream.next().await.unwrap());
1476    /// # }
1477    /// # results.sort();
1478    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1479    /// # }));
1480    /// # }
1481    /// ```
1482    pub fn generator<A, U, I, F>(
1483        self,
1484        init: impl IntoQuotedMut<'a, I, L> + Copy,
1485        f: impl IntoQuotedMut<'a, F, L> + Copy,
1486    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1487    where
1488        O: IsOrdered,
1489        R: IsExactlyOnce,
1490        K: Clone + Eq + Hash,
1491        I: Fn() -> A + 'a,
1492        F: Fn(&mut A, V) -> Generate<U> + 'a,
1493    {
1494        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1495        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1496
1497        let this = self.make_totally_ordered().make_exactly_once();
1498
1499        let scan_init = q!(|| HashMap::new())
1500            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
1501            .into();
1502        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1503            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1504            if let Some(existing_state_value) = existing_state {
1505                match f(existing_state_value, v) {
1506                    Generate::Yield(out) => Some(Some((k, out))),
1507                    Generate::Return(out) => {
1508                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1509                        Some(Some((k, out)))
1510                    }
1511                    Generate::Break => {
1512                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1513                        Some(None)
1514                    }
1515                    Generate::Continue => Some(None),
1516                }
1517            } else {
1518                Some(None)
1519            }
1520        })
1521        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
1522        .into();
1523
1524        let scan_node = HydroNode::Scan {
1525            init: scan_init,
1526            acc: scan_f,
1527            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1528            metadata: this.location.new_node_metadata(Stream::<
1529                Option<(K, U)>,
1530                L,
1531                B,
1532                TotalOrder,
1533                ExactlyOnce,
1534            >::collection_kind()),
1535        };
1536
1537        let flatten_f = q!(|d| d)
1538            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
1539            .into();
1540        let flatten_node = HydroNode::FlatMap {
1541            f: flatten_f,
1542            input: Box::new(scan_node),
1543            metadata: this.location.new_node_metadata(KeyedStream::<
1544                K,
1545                U,
1546                L,
1547                B,
1548                TotalOrder,
1549                ExactlyOnce,
1550            >::collection_kind()),
1551        };
1552
1553        KeyedStream::new(this.location.clone(), flatten_node)
1554    }
1555
1556    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1557    /// in-order across the values in each group. But the aggregation function returns a boolean,
1558    /// which when true indicates that the aggregated result is complete and can be released to
1559    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1560    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1561    /// normal stream elements.
1562    ///
1563    /// # Example
1564    /// ```rust
1565    /// # #[cfg(feature = "deploy")] {
1566    /// # use hydro_lang::prelude::*;
1567    /// # use futures::StreamExt;
1568    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1569    /// process
1570    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1571    ///     .into_keyed()
1572    ///     .fold_early_stop(
1573    ///         q!(|| 0),
1574    ///         q!(|acc, x| {
1575    ///             *acc += x;
1576    ///             x % 2 == 0
1577    ///         }),
1578    ///     )
1579    /// #   .entries()
1580    /// # }, |mut stream| async move {
1581    /// // Output: { 0: 2, 1: 9 }
1582    /// # let mut results = Vec::new();
1583    /// # for _ in 0..2 {
1584    /// #     results.push(stream.next().await.unwrap());
1585    /// # }
1586    /// # results.sort();
1587    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1588    /// # }));
1589    /// # }
1590    /// ```
1591    pub fn fold_early_stop<A, I, F>(
1592        self,
1593        init: impl IntoQuotedMut<'a, I, L> + Copy,
1594        f: impl IntoQuotedMut<'a, F, L> + Copy,
1595    ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
1596    where
1597        O: IsOrdered,
1598        R: IsExactlyOnce,
1599        K: Clone + Eq + Hash,
1600        I: Fn() -> A + 'a,
1601        F: Fn(&mut A, V) -> bool + 'a,
1602    {
1603        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1604        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1605        let out_without_bound_cast = self.generator(
1606            q!(move || Some(init())),
1607            q!(move |key_state, v| {
1608                if let Some(key_state_value) = key_state.as_mut() {
1609                    if f(key_state_value, v) {
1610                        Generate::Return(key_state.take().unwrap())
1611                    } else {
1612                        Generate::Continue
1613                    }
1614                } else {
1615                    unreachable!()
1616                }
1617            }),
1618        );
1619
1620        // SAFETY: The generator will only ever return at most one value per key, since once it
1621        // returns a value for a key it will never process any more values for that key.
1622        out_without_bound_cast.cast_at_most_one_entry_per_key()
1623    }
1624
1625    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1626    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1627    /// otherwise the first element would be non-deterministic.
1628    ///
1629    /// # Example
1630    /// ```rust
1631    /// # #[cfg(feature = "deploy")] {
1632    /// # use hydro_lang::prelude::*;
1633    /// # use futures::StreamExt;
1634    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1635    /// process
1636    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1637    ///     .into_keyed()
1638    ///     .first()
1639    /// #   .entries()
1640    /// # }, |mut stream| async move {
1641    /// // Output: { 0: 2, 1: 3 }
1642    /// # let mut results = Vec::new();
1643    /// # for _ in 0..2 {
1644    /// #     results.push(stream.next().await.unwrap());
1645    /// # }
1646    /// # results.sort();
1647    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1648    /// # }));
1649    /// # }
1650    /// ```
1651    pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1652    where
1653        O: IsOrdered,
1654        R: IsExactlyOnce,
1655        K: Clone + Eq + Hash,
1656    {
1657        self.fold_early_stop(
1658            q!(|| None),
1659            q!(|acc, v| {
1660                *acc = Some(v);
1661                true
1662            }),
1663        )
1664        .map(q!(|v| v.unwrap()))
1665    }
1666
1667    /// Returns a keyed stream containing at most the first `n` values per key,
1668    /// preserving the original order within each group. Similar to SQL `LIMIT`
1669    /// applied per group.
1670    ///
1671    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1672    /// retries, since the result depends on the order and cardinality of elements
1673    /// within each group.
1674    ///
1675    /// # Example
1676    /// ```rust
1677    /// # #[cfg(feature = "deploy")] {
1678    /// # use hydro_lang::prelude::*;
1679    /// # use futures::StreamExt;
1680    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1681    /// process
1682    ///     .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
1683    ///     .into_keyed()
1684    ///     .limit(q!(2))
1685    /// #   .entries()
1686    /// # }, |mut stream| async move {
1687    /// // { 1: [10, 20], 2: [40, 50] }
1688    /// # let mut results = Vec::new();
1689    /// # for _ in 0..4 {
1690    /// #     results.push(stream.next().await.unwrap());
1691    /// # }
1692    /// # results.sort();
1693    /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
1694    /// # }));
1695    /// # }
1696    /// ```
1697    pub fn limit(
1698        self,
1699        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1700    ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1701    where
1702        O: IsOrdered,
1703        R: IsExactlyOnce,
1704        K: Clone + Eq + Hash,
1705    {
1706        self.generator(
1707            q!(|| 0usize),
1708            q!(move |count, item| {
1709                if *count == n {
1710                    Generate::Break
1711                } else {
1712                    *count += 1;
1713                    if *count == n {
1714                        Generate::Return(item)
1715                    } else {
1716                        Generate::Yield(item)
1717                    }
1718                }
1719            }),
1720        )
1721    }
1722
1723    /// Assigns a zero-based index to each value within each key group, emitting
1724    /// `(K, (index, V))` tuples with per-key sequential indices.
1725    ///
1726    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1727    /// This is a streaming operator that processes elements as they arrive.
1728    ///
1729    /// # Example
1730    /// ```rust
1731    /// # #[cfg(feature = "deploy")] {
1732    /// # use hydro_lang::prelude::*;
1733    /// # use futures::StreamExt;
1734    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1735    /// process
1736    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1737    ///     .into_keyed()
1738    ///     .enumerate()
1739    /// # .entries()
1740    /// # }, |mut stream| async move {
1741    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1742    /// # let mut results = Vec::new();
1743    /// # for _ in 0..3 {
1744    /// #     results.push(stream.next().await.unwrap());
1745    /// # }
1746    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1747    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1748    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1749    /// # assert_eq!(key2, vec![(0, 20)]);
1750    /// # }));
1751    /// # }
1752    /// ```
1753    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1754    where
1755        O: IsOrdered,
1756        R: IsExactlyOnce,
1757        K: Eq + Hash + Clone,
1758    {
1759        self.scan(
1760            q!(|| 0),
1761            q!(|acc, next| {
1762                let curr = *acc;
1763                *acc += 1;
1764                Some((curr, next))
1765            }),
1766        )
1767    }
1768
1769    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1770    ///
1771    /// # Example
1772    /// ```rust
1773    /// # #[cfg(feature = "deploy")] {
1774    /// # use hydro_lang::prelude::*;
1775    /// # use futures::StreamExt;
1776    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1777    /// let tick = process.tick();
1778    /// let numbers = process
1779    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1780    ///     .into_keyed();
1781    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1782    /// batch
1783    ///     .value_counts()
1784    ///     .entries()
1785    ///     .all_ticks()
1786    /// # }, |mut stream| async move {
1787    /// // (1, 3), (2, 2)
1788    /// # let mut results = Vec::new();
1789    /// # for _ in 0..2 {
1790    /// #     results.push(stream.next().await.unwrap());
1791    /// # }
1792    /// # results.sort();
1793    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1794    /// # }));
1795    /// # }
1796    /// ```
1797    pub fn value_counts(
1798        self,
1799    ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
1800    where
1801        R: IsExactlyOnce,
1802        K: Eq + Hash,
1803    {
1804        self.make_exactly_once()
1805            .assume_ordering_trusted(
1806                nondet!(/** ordering within each group affects neither result nor intermediates */),
1807            )
1808            .fold(
1809                q!(|| 0),
1810                q!(
1811                    |acc, _| *acc += 1,
1812                    monotone = manual_proof!(/** += 1 is monotonic */)
1813                ),
1814            )
1815    }
1816
1817    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1818    /// group via the `comb` closure.
1819    ///
1820    /// Depending on the input stream guarantees, the closure may need to be commutative
1821    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1822    ///
1823    /// If the input and output value types are the same and do not require initialization then use
1824    /// [`KeyedStream::reduce`].
1825    ///
1826    /// # Example
1827    /// ```rust
1828    /// # #[cfg(feature = "deploy")] {
1829    /// # use hydro_lang::prelude::*;
1830    /// # use futures::StreamExt;
1831    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1832    /// let tick = process.tick();
1833    /// let numbers = process
1834    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1835    ///     .into_keyed();
1836    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1837    /// batch
1838    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1839    ///     .entries()
1840    ///     .all_ticks()
1841    /// # }, |mut stream| async move {
1842    /// // (1, false), (2, true)
1843    /// # let mut results = Vec::new();
1844    /// # for _ in 0..2 {
1845    /// #     results.push(stream.next().await.unwrap());
1846    /// # }
1847    /// # results.sort();
1848    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1849    /// # }));
1850    /// # }
1851    /// ```
1852    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
1853        self,
1854        init: impl IntoQuotedMut<'a, I, L>,
1855        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1856    ) -> KeyedSingleton<K, A, L, B2>
1857    where
1858        K: Eq + Hash,
1859        C: ValidCommutativityFor<O>,
1860        Idemp: ValidIdempotenceFor<R>,
1861        B: ApplyMonotoneKeyedStream<M, B2>,
1862    {
1863        let init = init.splice_fn0_ctx(&self.location).into();
1864        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1865        proof.register_proof(&comb);
1866
1867        let retried = self
1868            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */));
1869
1870        KeyedSingleton::new(
1871            retried.location.clone(),
1872            HydroNode::FoldKeyed {
1873                init,
1874                acc: comb.into(),
1875                input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1876                metadata: retried
1877                    .location
1878                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
1879            },
1880        )
1881    }
1882
1883    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1884    /// group via the `comb` closure.
1885    ///
1886    /// Depending on the input stream guarantees, the closure may need to be commutative
1887    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1888    ///
1889    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1890    ///
1891    /// # Example
1892    /// ```rust
1893    /// # #[cfg(feature = "deploy")] {
1894    /// # use hydro_lang::prelude::*;
1895    /// # use futures::StreamExt;
1896    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1897    /// let tick = process.tick();
1898    /// let numbers = process
1899    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1900    ///     .into_keyed();
1901    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1902    /// batch
1903    ///     .reduce(q!(|acc, x| *acc |= x))
1904    ///     .entries()
1905    ///     .all_ticks()
1906    /// # }, |mut stream| async move {
1907    /// // (1, false), (2, true)
1908    /// # let mut results = Vec::new();
1909    /// # for _ in 0..2 {
1910    /// #     results.push(stream.next().await.unwrap());
1911    /// # }
1912    /// # results.sort();
1913    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1914    /// # }));
1915    /// # }
1916    /// ```
1917    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1918        self,
1919        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1920    ) -> KeyedSingleton<K, V, L, B>
1921    where
1922        K: Eq + Hash,
1923        C: ValidCommutativityFor<O>,
1924        Idemp: ValidIdempotenceFor<R>,
1925    {
1926        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1927        proof.register_proof(&f);
1928
1929        let ordered = self
1930            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1931            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1932
1933        KeyedSingleton::new(
1934            ordered.location.clone(),
1935            HydroNode::ReduceKeyed {
1936                f: f.into(),
1937                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1938                metadata: ordered
1939                    .location
1940                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1941            },
1942        )
1943    }
1944
1945    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1946    /// are automatically deleted.
1947    ///
1948    /// Depending on the input stream guarantees, the closure may need to be commutative
1949    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1950    ///
1951    /// # Example
1952    /// ```rust
1953    /// # #[cfg(feature = "deploy")] {
1954    /// # use hydro_lang::prelude::*;
1955    /// # use futures::StreamExt;
1956    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1957    /// let tick = process.tick();
1958    /// let watermark = tick.singleton(q!(2));
1959    /// let numbers = process
1960    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1961    ///     .into_keyed();
1962    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1963    /// batch
1964    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1965    ///     .entries()
1966    ///     .all_ticks()
1967    /// # }, |mut stream| async move {
1968    /// // (2, true)
1969    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1970    /// # }));
1971    /// # }
1972    /// ```
1973    pub fn reduce_watermark<O2, F, C, Idemp>(
1974        self,
1975        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1976        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1977    ) -> KeyedSingleton<K, V, L, B>
1978    where
1979        K: Eq + Hash,
1980        O2: Clone,
1981        F: Fn(&mut V, V) + 'a,
1982        C: ValidCommutativityFor<O>,
1983        Idemp: ValidIdempotenceFor<R>,
1984    {
1985        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1986        check_matching_location(&self.location.root(), other.location.outer());
1987        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1988        proof.register_proof(&f);
1989
1990        let ordered = self
1991            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1992            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1993
1994        KeyedSingleton::new(
1995            ordered.location.clone(),
1996            HydroNode::ReduceKeyedWatermark {
1997                f: f.into(),
1998                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1999                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2000                metadata: ordered
2001                    .location
2002                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2003            },
2004        )
2005    }
2006
2007    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2008    /// whose keys are not in the bounded stream.
2009    ///
2010    /// # Example
2011    /// ```rust
2012    /// # #[cfg(feature = "deploy")] {
2013    /// # use hydro_lang::prelude::*;
2014    /// # use futures::StreamExt;
2015    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2016    /// let tick = process.tick();
2017    /// let keyed_stream = process
2018    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2019    ///     .batch(&tick, nondet!(/** test */))
2020    ///     .into_keyed();
2021    /// let keys_to_remove = process
2022    ///     .source_iter(q!(vec![1, 2]))
2023    ///     .batch(&tick, nondet!(/** test */));
2024    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2025    /// #   .entries()
2026    /// # }, |mut stream| async move {
2027    /// // { 3: ['c'], 4: ['d'] }
2028    /// # let mut results = Vec::new();
2029    /// # for _ in 0..2 {
2030    /// #     results.push(stream.next().await.unwrap());
2031    /// # }
2032    /// # results.sort();
2033    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
2034    /// # }));
2035    /// # }
2036    /// ```
2037    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2038        self,
2039        other: Stream<K, L, Bounded, O2, R2>,
2040    ) -> Self
2041    where
2042        K: Eq + Hash,
2043    {
2044        check_matching_location(&self.location, &other.location);
2045
2046        KeyedStream::new(
2047            self.location.clone(),
2048            HydroNode::AntiJoin {
2049                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2050                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2051                metadata: self.location.new_node_metadata(Self::collection_kind()),
2052            },
2053        )
2054    }
2055
2056    /// Emit a keyed stream containing keys shared between two keyed streams,
2057    /// where each value in the output keyed stream is a tuple of
2058    /// (self's value, other's value).
2059    /// If there are multiple values for the same key, this performs a cross product
2060    /// for each matching key.
2061    ///
2062    /// # Example
2063    /// ```rust
2064    /// # #[cfg(feature = "deploy")] {
2065    /// # use hydro_lang::prelude::*;
2066    /// # use futures::StreamExt;
2067    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2068    /// let tick = process.tick();
2069    /// let keyed_data = process
2070    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2071    ///     .into_keyed()
2072    ///     .batch(&tick, nondet!(/** test */));
2073    /// let other_data = process
2074    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
2075    ///     .into_keyed()
2076    ///     .batch(&tick, nondet!(/** test */));
2077    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
2078    /// # }, |mut stream| async move {
2079    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
2080    /// # let mut results = vec![];
2081    /// # for _ in 0..4 {
2082    /// #     results.push(stream.next().await.unwrap());
2083    /// # }
2084    /// # results.sort();
2085    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2086    /// # }));
2087    /// # }
2088    /// ```
2089    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2090    pub fn join_keyed_stream<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2091        self,
2092        other: KeyedStream<K, V2, L, B2, O2, R2>,
2093    ) -> KeyedStream<
2094        K,
2095        (V, V2),
2096        L,
2097        B,
2098        B2::PreserveOrderIfBounded<NoOrder>,
2099        <R as MinRetries<R2>>::Min,
2100    >
2101    where
2102        K: Eq + Hash + Clone,
2103        R: MinRetries<R2>,
2104        V: Clone,
2105        V2: Clone,
2106    {
2107        self.entries().join(other.entries()).into_keyed()
2108    }
2109
2110    /// Deduplicates values within each key group, emitting each unique value per key
2111    /// exactly once.
2112    ///
2113    /// # Example
2114    /// ```rust
2115    /// # #[cfg(feature = "deploy")] {
2116    /// # use hydro_lang::prelude::*;
2117    /// # use futures::StreamExt;
2118    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2119    /// process
2120    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2121    ///     .into_keyed()
2122    ///     .unique()
2123    /// # .entries()
2124    /// # }, |mut stream| async move {
2125    /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2126    /// # let mut results = Vec::new();
2127    /// # for _ in 0..4 {
2128    /// #     results.push(stream.next().await.unwrap());
2129    /// # }
2130    /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2131    /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2132    /// # key1.sort();
2133    /// # key2.sort();
2134    /// # assert_eq!(key1, vec![10, 20]);
2135    /// # assert_eq!(key2, vec![20, 30]);
2136    /// # }));
2137    /// # }
2138    /// ```
2139    pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2140    where
2141        K: Eq + Hash + Clone,
2142        V: Eq + Hash + Clone,
2143    {
2144        self.entries().unique().into_keyed()
2145    }
2146
2147    /// Sorts the values within each key group in ascending order.
2148    ///
2149    /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2150    /// each group. This operator will block until all elements in the input stream
2151    /// are available, so it requires the input stream to be [`Bounded`].
2152    ///
2153    /// # Example
2154    /// ```rust
2155    /// # #[cfg(feature = "deploy")] {
2156    /// # use hydro_lang::prelude::*;
2157    /// # use futures::StreamExt;
2158    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2159    /// let tick = process.tick();
2160    /// let numbers = process
2161    ///     .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2162    ///     .into_keyed();
2163    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2164    /// batch.sort().all_ticks()
2165    /// # .entries()
2166    /// # }, |mut stream| async move {
2167    /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2168    /// # let mut results = Vec::new();
2169    /// # for _ in 0..4 {
2170    /// #     results.push(stream.next().await.unwrap());
2171    /// # }
2172    /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2173    /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2174    /// # assert_eq!(key1_vals, vec![1, 3]);
2175    /// # assert_eq!(key2_vals, vec![1, 2]);
2176    /// # }));
2177    /// # }
2178    /// ```
2179    pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2180    where
2181        B: IsBounded,
2182        K: Ord,
2183        V: Ord,
2184    {
2185        self.entries().sort().into_keyed()
2186    }
2187
2188    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2189    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2190    /// is only present in one of the inputs, its values are passed through as-is). The output has
2191    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2192    ///
2193    /// Currently, both input streams must be [`Bounded`]. This operator will block
2194    /// on the first stream until all its elements are available. In a future version,
2195    /// we will relax the requirement on the `other` stream.
2196    ///
2197    /// # Example
2198    /// ```rust
2199    /// # #[cfg(feature = "deploy")] {
2200    /// # use hydro_lang::prelude::*;
2201    /// # use futures::StreamExt;
2202    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2203    /// let tick = process.tick();
2204    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2205    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2206    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2207    /// # .entries()
2208    /// # }, |mut stream| async move {
2209    /// // { 0: [2, 1], 1: [4, 3] }
2210    /// # let mut results = Vec::new();
2211    /// # for _ in 0..4 {
2212    /// #     results.push(stream.next().await.unwrap());
2213    /// # }
2214    /// # results.sort();
2215    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2216    /// # }));
2217    /// # }
2218    /// ```
2219    pub fn chain<O2: Ordering, R2: Retries>(
2220        self,
2221        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2222    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2223    where
2224        B: IsBounded,
2225        O: MinOrder<O2>,
2226        R: MinRetries<R2>,
2227    {
2228        let this = self.make_bounded();
2229        check_matching_location(&this.location, &other.location);
2230
2231        KeyedStream::new(
2232            this.location.clone(),
2233            HydroNode::Chain {
2234                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2235                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2236                metadata: this.location.new_node_metadata(KeyedStream::<
2237                    K,
2238                    V,
2239                    L,
2240                    Bounded,
2241                    <O as MinOrder<O2>>::Min,
2242                    <R as MinRetries<R2>>::Min,
2243                >::collection_kind()),
2244            },
2245        )
2246    }
2247
2248    /// Emit a keyed stream containing keys shared between the keyed stream and the
2249    /// keyed singleton, where each value in the output keyed stream is a tuple of
2250    /// (the keyed stream's value, the keyed singleton's value).
2251    ///
2252    /// # Example
2253    /// ```rust
2254    /// # #[cfg(feature = "deploy")] {
2255    /// # use hydro_lang::prelude::*;
2256    /// # use futures::StreamExt;
2257    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2258    /// let tick = process.tick();
2259    /// let keyed_data = process
2260    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2261    ///     .into_keyed()
2262    ///     .batch(&tick, nondet!(/** test */));
2263    /// let singleton_data = process
2264    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2265    ///     .into_keyed()
2266    ///     .batch(&tick, nondet!(/** test */))
2267    ///     .first();
2268    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2269    /// # }, |mut stream| async move {
2270    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2271    /// # let mut results = vec![];
2272    /// # for _ in 0..3 {
2273    /// #     results.push(stream.next().await.unwrap());
2274    /// # }
2275    /// # results.sort();
2276    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2277    /// # }));
2278    /// # }
2279    /// ```
2280    pub fn join_keyed_singleton<V2: Clone, B2: IsBounded>(
2281        self,
2282        other: KeyedSingleton<K, V2, L, B2>,
2283    ) -> KeyedStream<K, (V, V2), L, B, O, R>
2284    where
2285        K: Eq + Hash + Clone,
2286        V: Clone,
2287    {
2288        let ir_node = if B2::BOUNDED {
2289            HydroNode::JoinHalf {
2290                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2291                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2292                metadata: self
2293                    .location
2294                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2295            }
2296        } else {
2297            HydroNode::Join {
2298                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2299                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2300                metadata: self
2301                    .location
2302                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2303            }
2304        };
2305
2306        KeyedStream::new(self.location.clone(), ir_node)
2307    }
2308
2309    /// Gets the values associated with a specific key from the keyed stream.
2310    /// Returns an empty stream if the key is `None` or there are no associated values.
2311    ///
2312    /// # Example
2313    /// ```rust
2314    /// # #[cfg(feature = "deploy")] {
2315    /// # use hydro_lang::prelude::*;
2316    /// # use futures::StreamExt;
2317    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2318    /// let tick = process.tick();
2319    /// let keyed_data = process
2320    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2321    ///     .into_keyed()
2322    ///     .batch(&tick, nondet!(/** test */));
2323    /// let key = tick.singleton(q!(1));
2324    /// keyed_data.get(key).all_ticks()
2325    /// # }, |mut stream| async move {
2326    /// // 10, 11
2327    /// # let mut results = vec![];
2328    /// # for _ in 0..2 {
2329    /// #     results.push(stream.next().await.unwrap());
2330    /// # }
2331    /// # results.sort();
2332    /// # assert_eq!(results, vec![10, 11]);
2333    /// # }));
2334    /// # }
2335    /// ```
2336    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, B, O, R>
2337    where
2338        K: Eq + Hash + Clone,
2339        V: Clone,
2340    {
2341        let joined =
2342            self.join_keyed_singleton(key.into().map(q!(|k| (k, ()))).into_keyed_singleton());
2343
2344        if O::ORDERING_KIND == StreamOrder::TotalOrder {
2345            joined
2346                .use_ordering_type::<TotalOrder>()
2347                .cast_at_most_one_key()
2348                .map(q!(|(_, (v, _))| v))
2349                .weaken_ordering()
2350        } else {
2351            joined.values().map(q!(|(v, _)| v)).use_ordering_type()
2352        }
2353    }
2354
2355    /// For each value in `self`, find the matching key in `lookup`.
2356    /// The output is a keyed stream with the key from `self`, and a value
2357    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2358    /// If the key is not present in `lookup`, the option will be [`None`].
2359    ///
2360    /// # Example
2361    /// ```rust
2362    /// # #[cfg(feature = "deploy")] {
2363    /// # use hydro_lang::prelude::*;
2364    /// # use futures::StreamExt;
2365    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2366    /// # let tick = process.tick();
2367    /// let requests = // { 1: [10, 11], 2: 20 }
2368    /// # process
2369    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2370    /// #     .into_keyed()
2371    /// #     .batch(&tick, nondet!(/** test */));
2372    /// let other_data = // { 10: 100, 11: 110 }
2373    /// # process
2374    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2375    /// #     .into_keyed()
2376    /// #     .batch(&tick, nondet!(/** test */))
2377    /// #     .first();
2378    /// requests.lookup_keyed_singleton(other_data)
2379    /// # .entries().all_ticks()
2380    /// # }, |mut stream| async move {
2381    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2382    /// # let mut results = vec![];
2383    /// # for _ in 0..3 {
2384    /// #     results.push(stream.next().await.unwrap());
2385    /// # }
2386    /// # results.sort();
2387    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2388    /// # }));
2389    /// # }
2390    /// ```
2391    pub fn lookup_keyed_singleton<V2>(
2392        self,
2393        lookup: KeyedSingleton<V, V2, L, Bounded>,
2394    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2395    where
2396        B: IsBounded,
2397        K: Eq + Hash + Clone,
2398        V: Eq + Hash + Clone,
2399        V2: Clone,
2400    {
2401        self.lookup_keyed_stream(
2402            lookup
2403                .into_keyed_stream()
2404                .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2405        )
2406    }
2407
2408    /// For each value in `self`, find the matching key in `lookup`.
2409    /// The output is a keyed stream with the key from `self`, and a value
2410    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2411    /// If the key is not present in `lookup`, the option will be [`None`].
2412    ///
2413    /// # Example
2414    /// ```rust
2415    /// # #[cfg(feature = "deploy")] {
2416    /// # use hydro_lang::prelude::*;
2417    /// # use futures::StreamExt;
2418    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2419    /// # let tick = process.tick();
2420    /// let requests = // { 1: [10, 11], 2: 20 }
2421    /// # process
2422    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2423    /// #     .into_keyed()
2424    /// #     .batch(&tick, nondet!(/** test */));
2425    /// let other_data = // { 10: [100, 101], 11: 110 }
2426    /// # process
2427    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2428    /// #     .into_keyed()
2429    /// #     .batch(&tick, nondet!(/** test */));
2430    /// requests.lookup_keyed_stream(other_data)
2431    /// # .entries().all_ticks()
2432    /// # }, |mut stream| async move {
2433    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2434    /// # let mut results = vec![];
2435    /// # for _ in 0..4 {
2436    /// #     results.push(stream.next().await.unwrap());
2437    /// # }
2438    /// # results.sort();
2439    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2440    /// # }));
2441    /// # }
2442    /// ```
2443    #[expect(clippy::type_complexity, reason = "retries propagation")]
2444    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2445        self,
2446        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2447    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2448    where
2449        B: IsBounded,
2450        K: Eq + Hash + Clone,
2451        V: Eq + Hash + Clone,
2452        V2: Clone,
2453        R: MinRetries<R2>,
2454    {
2455        let inverted = self
2456            .make_bounded()
2457            .entries()
2458            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2459            .into_keyed();
2460        let found = inverted
2461            .clone()
2462            .join_keyed_stream(lookup.clone())
2463            .entries()
2464            .map(q!(|(lookup_value, (key, value))| (
2465                key,
2466                (lookup_value, Some(value))
2467            )))
2468            .into_keyed();
2469        let not_found = inverted
2470            .filter_key_not_in(lookup.keys())
2471            .entries()
2472            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2473            .into_keyed();
2474
2475        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2476    }
2477
2478    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2479    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2480    ///
2481    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2482    /// processed before an acknowledgement is emitted.
2483    pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2484        let id = self.location.flow_state().borrow_mut().next_clock_id();
2485        let out_location = Atomic {
2486            tick: Tick {
2487                id,
2488                l: self.location.clone(),
2489            },
2490        };
2491        KeyedStream::new(
2492            out_location.clone(),
2493            HydroNode::BeginAtomic {
2494                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2495                metadata: out_location
2496                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2497            },
2498        )
2499    }
2500
2501    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2502    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2503    /// the order of the input.
2504    ///
2505    /// # Non-Determinism
2506    /// The batch boundaries are non-deterministic and may change across executions.
2507    pub fn batch(
2508        self,
2509        tick: &Tick<L>,
2510        nondet: NonDet,
2511    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2512        let _ = nondet;
2513        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2514        KeyedStream::new(
2515            tick.clone(),
2516            HydroNode::Batch {
2517                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2518                metadata: tick.new_node_metadata(
2519                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2520                ),
2521            },
2522        )
2523    }
2524}
2525
2526impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2527    KeyedStream<(K1, K2), V, L, B, O, R>
2528{
2529    /// Produces a new keyed stream by dropping the first element of the compound key.
2530    ///
2531    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2532    /// of the values under the new keys. The values across groups with the same new key
2533    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2534    ///
2535    /// # Example
2536    /// ```rust
2537    /// # #[cfg(feature = "deploy")] {
2538    /// # use hydro_lang::prelude::*;
2539    /// # use futures::StreamExt;
2540    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2541    /// process
2542    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2543    ///     .into_keyed()
2544    ///     .drop_key_prefix()
2545    /// #   .entries()
2546    /// # }, |mut stream| async move {
2547    /// // { 10: [2, 3], 20: [4] }
2548    /// # let mut results = Vec::new();
2549    /// # for _ in 0..3 {
2550    /// #     results.push(stream.next().await.unwrap());
2551    /// # }
2552    /// # results.sort();
2553    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2554    /// # }));
2555    /// # }
2556    /// ```
2557    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2558        self.entries()
2559            .map(q!(|((_k1, k2), v)| (k2, v)))
2560            .into_keyed()
2561    }
2562}
2563
2564impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2565    KeyedStream<K, V, L, Unbounded, O, R>
2566{
2567    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2568    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2569    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2570    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2571    ///
2572    /// Currently, both input streams must be [`Unbounded`].
2573    ///
2574    /// # Example
2575    /// ```rust
2576    /// # #[cfg(feature = "deploy")] {
2577    /// # use hydro_lang::prelude::*;
2578    /// # use futures::StreamExt;
2579    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2580    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2581    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2582    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2583    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2584    /// numbers1.merge_unordered(numbers2)
2585    /// #   .entries()
2586    /// # }, |mut stream| async move {
2587    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2588    /// # let mut results = Vec::new();
2589    /// # for _ in 0..4 {
2590    /// #     results.push(stream.next().await.unwrap());
2591    /// # }
2592    /// # results.sort();
2593    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2594    /// # }));
2595    /// # }
2596    /// ```
2597    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2598        self,
2599        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2600    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2601    where
2602        R: MinRetries<R2>,
2603    {
2604        KeyedStream::new(
2605            self.location.clone(),
2606            HydroNode::Chain {
2607                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2608                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2609                metadata: self.location.new_node_metadata(KeyedStream::<
2610                    K,
2611                    V,
2612                    L,
2613                    Unbounded,
2614                    NoOrder,
2615                    <R as MinRetries<R2>>::Min,
2616                >::collection_kind()),
2617            },
2618        )
2619    }
2620
2621    /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2622    #[deprecated(note = "use `merge_unordered` instead")]
2623    pub fn interleave<O2: Ordering, R2: Retries>(
2624        self,
2625        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2626    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2627    where
2628        R: MinRetries<R2>,
2629    {
2630        self.merge_unordered(other)
2631    }
2632}
2633
2634impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2635where
2636    L: Location<'a> + NoTick,
2637{
2638    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2639    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2640    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2641    /// used to create the atomic section.
2642    ///
2643    /// # Non-Determinism
2644    /// The batch boundaries are non-deterministic and may change across executions.
2645    pub fn batch_atomic(
2646        self,
2647        tick: &Tick<L>,
2648        nondet: NonDet,
2649    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2650        let _ = nondet;
2651        KeyedStream::new(
2652            tick.clone(),
2653            HydroNode::Batch {
2654                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2655                metadata: tick.new_node_metadata(
2656                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2657                ),
2658            },
2659        )
2660    }
2661
2662    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2663    /// See [`KeyedStream::atomic`] for more details.
2664    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2665        KeyedStream::new(
2666            self.location.tick.l.clone(),
2667            HydroNode::EndAtomic {
2668                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2669                metadata: self
2670                    .location
2671                    .tick
2672                    .l
2673                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2674            },
2675        )
2676    }
2677}
2678
2679impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2680where
2681    L: Location<'a>,
2682{
2683    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2684    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2685    /// each key.
2686    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2687        KeyedStream::new(
2688            self.location.outer().clone(),
2689            HydroNode::YieldConcat {
2690                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2691                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2692                    K,
2693                    V,
2694                    L,
2695                    Unbounded,
2696                    O,
2697                    R,
2698                >::collection_kind(
2699                )),
2700            },
2701        )
2702    }
2703
2704    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2705    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2706    /// each key.
2707    ///
2708    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2709    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2710    /// stream's [`Tick`] context.
2711    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2712        let out_location = Atomic {
2713            tick: self.location.clone(),
2714        };
2715
2716        KeyedStream::new(
2717            out_location.clone(),
2718            HydroNode::YieldConcat {
2719                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2720                metadata: out_location.new_node_metadata(KeyedStream::<
2721                    K,
2722                    V,
2723                    Atomic<L>,
2724                    Unbounded,
2725                    O,
2726                    R,
2727                >::collection_kind()),
2728            },
2729        )
2730    }
2731
2732    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2733    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2734    ///
2735    /// This API is particularly useful for stateful computation on batches of data, such as
2736    /// maintaining an accumulated state that is up to date with the current batch.
2737    ///
2738    /// # Example
2739    /// ```rust
2740    /// # #[cfg(feature = "deploy")] {
2741    /// # use hydro_lang::prelude::*;
2742    /// # use futures::StreamExt;
2743    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2744    /// let tick = process.tick();
2745    /// # // ticks are lazy by default, forces the second tick to run
2746    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2747    /// # let batch_first_tick = process
2748    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2749    /// #   .into_keyed()
2750    /// #   .batch(&tick, nondet!(/** test */));
2751    /// # let batch_second_tick = process
2752    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2753    /// #   .into_keyed()
2754    /// #   .batch(&tick, nondet!(/** test */))
2755    /// #   .defer_tick(); // appears on the second tick
2756    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2757    ///
2758    /// input.batch(&tick, nondet!(/** test */))
2759    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2760    ///         *sum += new;
2761    ///     }))).entries().all_ticks()
2762    /// # }, |mut stream| async move {
2763    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2764    /// # let mut results = Vec::new();
2765    /// # for _ in 0..4 {
2766    /// #     results.push(stream.next().await.unwrap());
2767    /// # }
2768    /// # results.sort();
2769    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2770    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2771    /// # results.clear();
2772    /// # for _ in 0..4 {
2773    /// #     results.push(stream.next().await.unwrap());
2774    /// # }
2775    /// # results.sort();
2776    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2777    /// # }));
2778    /// # }
2779    /// ```
2780    pub fn across_ticks<Out: BatchAtomic>(
2781        self,
2782        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2783    ) -> Out::Batched {
2784        thunk(self.all_ticks_atomic()).batched_atomic()
2785    }
2786
2787    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2788    /// tick `T` always has the entries of `self` at tick `T - 1`.
2789    ///
2790    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2791    ///
2792    /// This operator enables stateful iterative processing with ticks, by sending data from one
2793    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2794    ///
2795    /// # Example
2796    /// ```rust
2797    /// # #[cfg(feature = "deploy")] {
2798    /// # use hydro_lang::prelude::*;
2799    /// # use futures::StreamExt;
2800    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2801    /// let tick = process.tick();
2802    /// # // ticks are lazy by default, forces the second tick to run
2803    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2804    /// # let batch_first_tick = process
2805    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2806    /// #   .batch(&tick, nondet!(/** test */))
2807    /// #   .into_keyed();
2808    /// # let batch_second_tick = process
2809    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2810    /// #   .batch(&tick, nondet!(/** test */))
2811    /// #   .defer_tick()
2812    /// #   .into_keyed(); // appears on the second tick
2813    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2814    /// # batch_first_tick.chain(batch_second_tick);
2815    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2816    ///     changes_across_ticks // from the current tick
2817    /// )
2818    /// # .entries().all_ticks()
2819    /// # }, |mut stream| async move {
2820    /// // First tick: { 1: [2, 3] }
2821    /// # let mut results = Vec::new();
2822    /// # for _ in 0..2 {
2823    /// #     results.push(stream.next().await.unwrap());
2824    /// # }
2825    /// # results.sort();
2826    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2827    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2828    /// # results.clear();
2829    /// # for _ in 0..4 {
2830    /// #     results.push(stream.next().await.unwrap());
2831    /// # }
2832    /// # results.sort();
2833    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2834    /// // Third tick: { 1: [4], 2: [5] }
2835    /// # results.clear();
2836    /// # for _ in 0..2 {
2837    /// #     results.push(stream.next().await.unwrap());
2838    /// # }
2839    /// # results.sort();
2840    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2841    /// # }));
2842    /// # }
2843    /// ```
2844    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2845        KeyedStream::new(
2846            self.location.clone(),
2847            HydroNode::DeferTick {
2848                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2849                metadata: self.location.new_node_metadata(KeyedStream::<
2850                    K,
2851                    V,
2852                    Tick<L>,
2853                    Bounded,
2854                    O,
2855                    R,
2856                >::collection_kind()),
2857            },
2858        )
2859    }
2860}
2861
2862#[cfg(test)]
2863mod tests {
2864    #[cfg(feature = "deploy")]
2865    use futures::{SinkExt, StreamExt};
2866    #[cfg(feature = "deploy")]
2867    use hydro_deploy::Deployment;
2868    #[cfg(any(feature = "deploy", feature = "sim"))]
2869    use stageleft::q;
2870
2871    #[cfg(any(feature = "deploy", feature = "sim"))]
2872    use crate::compile::builder::FlowBuilder;
2873    #[cfg(feature = "deploy")]
2874    use crate::live_collections::stream::ExactlyOnce;
2875    #[cfg(feature = "sim")]
2876    use crate::live_collections::stream::{NoOrder, TotalOrder};
2877    #[cfg(any(feature = "deploy", feature = "sim"))]
2878    use crate::location::Location;
2879    #[cfg(feature = "sim")]
2880    use crate::networking::TCP;
2881    #[cfg(any(feature = "deploy", feature = "sim"))]
2882    use crate::nondet::nondet;
2883    #[cfg(feature = "deploy")]
2884    use crate::properties::manual_proof;
2885
2886    #[cfg(feature = "deploy")]
2887    #[tokio::test]
2888    async fn get_unbounded_keyed_stream_bounded_singleton() {
2889        let mut deployment = Deployment::new();
2890
2891        let mut flow = FlowBuilder::new();
2892        let node = flow.process::<()>();
2893        let external = flow.external::<()>();
2894
2895        let (input_send, input_stream) =
2896            node.source_external_bincode::<_, (i32, i32), _, ExactlyOnce>(&external);
2897
2898        let key = node.singleton(q!(1));
2899
2900        let out = input_stream
2901            .into_keyed()
2902            .get(key)
2903            .send_bincode_external(&external);
2904
2905        let nodes = flow
2906            .with_process(&node, deployment.Localhost())
2907            .with_external(&external, deployment.Localhost())
2908            .deploy(&mut deployment);
2909
2910        deployment.deploy().await.unwrap();
2911
2912        let mut input_send = nodes.connect(input_send).await;
2913        let mut out = nodes.connect(out).await;
2914
2915        deployment.start().await.unwrap();
2916
2917        // First batch
2918        input_send.send((1, 10)).await.unwrap();
2919        input_send.send((2, 20)).await.unwrap();
2920        assert_eq!(out.next().await.unwrap(), 10);
2921
2922        // Second batch
2923        input_send.send((1, 11)).await.unwrap();
2924        input_send.send((2, 21)).await.unwrap();
2925        assert_eq!(out.next().await.unwrap(), 11);
2926    }
2927
2928    #[cfg(feature = "deploy")]
2929    #[tokio::test]
2930    async fn reduce_watermark_filter() {
2931        let mut deployment = Deployment::new();
2932
2933        let mut flow = FlowBuilder::new();
2934        let node = flow.process::<()>();
2935        let external = flow.external::<()>();
2936
2937        let node_tick = node.tick();
2938        let watermark = node_tick.singleton(q!(2));
2939
2940        let sum = node
2941            .source_stream(q!(tokio_stream::iter([
2942                (0, 100),
2943                (1, 101),
2944                (2, 102),
2945                (2, 102)
2946            ])))
2947            .into_keyed()
2948            .reduce_watermark(
2949                watermark,
2950                q!(|acc, v| {
2951                    *acc += v;
2952                }),
2953            )
2954            .snapshot(&node_tick, nondet!(/** test */))
2955            .entries()
2956            .all_ticks()
2957            .send_bincode_external(&external);
2958
2959        let nodes = flow
2960            .with_process(&node, deployment.Localhost())
2961            .with_external(&external, deployment.Localhost())
2962            .deploy(&mut deployment);
2963
2964        deployment.deploy().await.unwrap();
2965
2966        let mut out = nodes.connect(sum).await;
2967
2968        deployment.start().await.unwrap();
2969
2970        assert_eq!(out.next().await.unwrap(), (2, 204));
2971    }
2972
2973    #[cfg(feature = "deploy")]
2974    #[tokio::test]
2975    async fn reduce_watermark_bounded() {
2976        let mut deployment = Deployment::new();
2977
2978        let mut flow = FlowBuilder::new();
2979        let node = flow.process::<()>();
2980        let external = flow.external::<()>();
2981
2982        let node_tick = node.tick();
2983        let watermark = node_tick.singleton(q!(2));
2984
2985        let sum = node
2986            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
2987            .into_keyed()
2988            .reduce_watermark(
2989                watermark,
2990                q!(|acc, v| {
2991                    *acc += v;
2992                }),
2993            )
2994            .entries()
2995            .send_bincode_external(&external);
2996
2997        let nodes = flow
2998            .with_process(&node, deployment.Localhost())
2999            .with_external(&external, deployment.Localhost())
3000            .deploy(&mut deployment);
3001
3002        deployment.deploy().await.unwrap();
3003
3004        let mut out = nodes.connect(sum).await;
3005
3006        deployment.start().await.unwrap();
3007
3008        assert_eq!(out.next().await.unwrap(), (2, 204));
3009    }
3010
3011    #[cfg(feature = "deploy")]
3012    #[tokio::test]
3013    async fn reduce_watermark_garbage_collect() {
3014        let mut deployment = Deployment::new();
3015
3016        let mut flow = FlowBuilder::new();
3017        let node = flow.process::<()>();
3018        let external = flow.external::<()>();
3019        let (tick_send, tick_trigger) =
3020            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
3021
3022        let node_tick = node.tick();
3023        let (watermark_complete_cycle, watermark) =
3024            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
3025        let next_watermark = watermark.clone().map(q!(|v| v + 1));
3026        watermark_complete_cycle.complete_next_tick(next_watermark);
3027
3028        let tick_triggered_input = node_tick
3029            .singleton(q!((3, 103)))
3030            .into_stream()
3031            .filter_if(
3032                tick_trigger
3033                    .clone()
3034                    .batch(&node_tick, nondet!(/** test */))
3035                    .first()
3036                    .is_some(),
3037            )
3038            .all_ticks();
3039
3040        let sum = node
3041            .source_stream(q!(tokio_stream::iter([
3042                (0, 100),
3043                (1, 101),
3044                (2, 102),
3045                (2, 102)
3046            ])))
3047            .merge_unordered(tick_triggered_input)
3048            .into_keyed()
3049            .reduce_watermark(
3050                watermark,
3051                q!(
3052                    |acc, v| {
3053                        *acc += v;
3054                    },
3055                    commutative = manual_proof!(/** integer addition is commutative */)
3056                ),
3057            )
3058            .snapshot(&node_tick, nondet!(/** test */))
3059            .entries()
3060            .all_ticks()
3061            .send_bincode_external(&external);
3062
3063        let nodes = flow
3064            .with_default_optimize()
3065            .with_process(&node, deployment.Localhost())
3066            .with_external(&external, deployment.Localhost())
3067            .deploy(&mut deployment);
3068
3069        deployment.deploy().await.unwrap();
3070
3071        let mut tick_send = nodes.connect(tick_send).await;
3072        let mut out_recv = nodes.connect(sum).await;
3073
3074        deployment.start().await.unwrap();
3075
3076        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
3077
3078        tick_send.send(()).await.unwrap();
3079
3080        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
3081    }
3082
3083    #[cfg(feature = "sim")]
3084    #[test]
3085    #[should_panic]
3086    fn sim_batch_nondet_size() {
3087        let mut flow = FlowBuilder::new();
3088        let node = flow.process::<()>();
3089
3090        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3091
3092        let tick = node.tick();
3093        let out_recv = input
3094            .batch(&tick, nondet!(/** test */))
3095            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
3096            .entries()
3097            .all_ticks()
3098            .sim_output();
3099
3100        flow.sim().exhaustive(async || {
3101            out_recv
3102                .assert_yields_only_unordered([(1, vec![1, 2])])
3103                .await;
3104        });
3105    }
3106
3107    #[cfg(feature = "sim")]
3108    #[test]
3109    fn sim_batch_preserves_group_order() {
3110        let mut flow = FlowBuilder::new();
3111        let node = flow.process::<()>();
3112
3113        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3114
3115        let tick = node.tick();
3116        let out_recv = input
3117            .batch(&tick, nondet!(/** test */))
3118            .all_ticks()
3119            .fold_early_stop(
3120                q!(|| 0),
3121                q!(|acc, v| {
3122                    *acc = std::cmp::max(v, *acc);
3123                    *acc >= 2
3124                }),
3125            )
3126            .entries()
3127            .sim_output();
3128
3129        let instances = flow.sim().exhaustive(async || {
3130            out_recv
3131                .assert_yields_only_unordered([(1, 2), (2, 3)])
3132                .await;
3133        });
3134
3135        assert_eq!(instances, 8);
3136        // - three cases: all three in a separate tick (pick where (2, 3) is)
3137        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
3138        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
3139        // - one case: all three together
3140    }
3141
/// Batches a keyed source whose ordering has been weakened to `NoOrder` and releases
/// every tick as flat entries.
///
/// Every explored execution must yield all three input pairs (in any order), and the
/// number of explored executions is pinned to 13.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_unordered_shuffles() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let unordered_source = process
        .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
        .into_keyed()
        .weaken_ordering::<NoOrder>();

    let tick = process.tick();
    let batched = unordered_source.batch(&tick, nondet!(/** test */));
    let output = batched.all_ticks().entries().sim_output();

    let explored = builder.sim().exhaustive(async || {
        output
            .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
            .await;
    });

    // Expected executions (6 + 2 + 4 + 1 = 13):
    // - 6 (3 * 2): all three in separate ticks (pick where (2, 3) is), times the order
    //   of (1, 1) and (1, 2)
    // - 2: (1, 1) and (1, 2) together, (2, 3) before or after; the order of (1, 1) and
    //   (1, 2) doesn't matter because the batch is still unordered
    // - 4 (2 * 2): (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, times
    //   the order of (1, 1) and (1, 2)
    // - 1: all three together; again the order of (1, 1) and (1, 2) doesn't matter
    //   because the batch is still unordered
    assert_eq!(explored, 13);
}
3172
/// Expected to panic: an unordered keyed input is batched, treated as totally ordered via
/// `assume_ordering`, and then reduced to the first value per key. The assertion that
/// only `(1, 1)` and `(2, 1)` are ever produced is meant to be violated in at least one
/// explored execution.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_observe_order_batched() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let output = unordered_input
        .into_keyed()
        .batch(&tick, nondet!(/** test */))
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .all_ticks()
        .first()
        .entries()
        .sim_output();

    builder.sim().exhaustive(async || {
        sender.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
        // This assertion fails because of the `assume_ordering` above.
        output
            .assert_yields_only_unordered([(1, 1), (2, 1)])
            .await;
    });
}
3198
/// Pins the number of executions explored when an unordered keyed input is batched,
/// treated as totally ordered via `assume_ordering`, and released every tick as flat
/// entries. The output itself is drained and discarded; only the count is checked.
#[cfg(feature = "sim")]
#[test]
fn sim_observe_order_batched_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let output = unordered_input
        .into_keyed()
        .batch(&tick, nondet!(/** test */))
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .all_ticks()
        .entries()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
        let _ = output.collect_sorted::<Vec<_>>().await;
    });

    // Too complicated to enumerate here, but less than the stream equivalent.
    assert_eq!(explored, 104);
}
3222
/// Applies `assume_ordering` to an unordered keyed input outside of any tick, then
/// collects values per key until two have been gathered.
///
/// Each explored execution must produce one entry for each of the two keys, and the
/// number of explored executions is pinned to 24.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering() {
    use std::collections::HashMap;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let output = unordered_input
        .into_keyed()
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .fold_early_stop(
            q!(|| Vec::new()),
            q!(|collected, value| {
                collected.push(value);
                collected.len() >= 2
            }),
        )
        .entries()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
        let entries = output.collect_sorted::<Vec<_>>().await;
        let by_key: HashMap<_, _> = entries.into_iter().collect();
        // Each key accumulates its own values, so there is exactly one entry per key.
        assert_eq!(by_key.len(), 2);
    });

    assert_eq!(explored, 24);
}
3259
/// Feeds the output of a top-level `assume_ordering` back into its own input (via a
/// round trip through a second process) and checks that a cycled-back value can be
/// observed ahead of an input value that was already sent.
///
/// At least one explored execution must show key 1 collecting `[0, 1]`, and the number
/// of explored executions is pinned to 6.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back() {
    use std::collections::HashMap;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (sender, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        process.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
    let ordered = unordered_input
        .into_keyed()
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Increment each value, keep the odd results, and route them back through `relay`.
    let feedback = ordered
        .clone()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .entries()
        .send(&relay, TCP.fail_stop().bincode())
        .send(&process, TCP.fail_stop().bincode())
        .into_keyed();
    complete_cycle_back.complete(feedback);

    let output = ordered
        .fold_early_stop(
            q!(|| Vec::new()),
            q!(|collected, value| {
                collected.push(value);
                collected.len() >= 2
            }),
        )
        .entries()
        .sim_output();

    let mut saw = false;
    let explored = builder.sim().exhaustive(async || {
        // Send (1, 0) and (1, 2). 0 + 1 = 1 is odd, so it cycles back.
        // We want to see [0, 1]: the cycled-back value interleaved with the input.
        sender.send_many_unordered([(1, 0), (1, 2)]);
        let entries = output.collect_sorted::<Vec<_>>().await;
        let by_key: HashMap<_, _> = entries.into_iter().collect();

        // Look for an execution where key 1 gets 0, then 1 (cycled back from 0 + 1).
        if matches!(by_key.get(&1), Some(values) if *values == vec![0, 1]) {
            saw = true;
        }
    });

    assert!(
        saw,
        "did not see an instance with key 1 having [0, 1] in order"
    );
    assert_eq!(explored, 6);
}
3324
/// Demonstrates why releasing one entry at a time matters: when the observed order of
/// one key cycles back into a *different* key, the cycled-back entry must be able to
/// interleave with items already pending for that other key.
///
/// At least one explored execution must show key 2 starting with the cycled-back `100`,
/// and the number of explored executions is pinned to 60.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cross_key_cycle() {
    use std::collections::HashMap;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (sender, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        process.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
    let ordered = unordered_input
        .into_keyed()
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Cycle back: when (1, 10) is seen, emit (2, 100) into key 2.
    let feedback = ordered
        .clone()
        .filter(q!(|v| *v == 10))
        .map(q!(|_| 100))
        .entries()
        .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
        .send(&relay, TCP.fail_stop().bincode())
        .send(&process, TCP.fail_stop().bincode())
        .into_keyed();
    complete_cycle_back.complete(feedback);

    let output = ordered
        .fold_early_stop(
            q!(|| Vec::new()),
            q!(|collected, value| {
                collected.push(value);
                collected.len() >= 2
            }),
        )
        .entries()
        .sim_output();

    // We want to see an execution where:
    // - (1, 10) is released first,
    // - which causes (2, 100) to be cycled back,
    // - and (2, 100) is released BEFORE (2, 20), which was already pending.
    let mut saw_cross_key_interleave = false;
    let explored = builder.sim().exhaustive(async || {
        // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2.
        sender.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
        let entries = output.collect_sorted::<Vec<_>>().await;
        let by_key: HashMap<_, _> = entries.into_iter().collect();

        // Cross-key interleaving shows up as key 2 holding [100, 20] or [100, 21]:
        // the cycled-back 100 ahead of an item that was already pending.
        let interleaved = by_key
            .get(&2)
            .is_some_and(|values| values.len() >= 2 && values[0] == 100);
        if interleaved {
            saw_cross_key_interleave = true;
        }
    });

    assert!(
        saw_cross_key_interleave,
        "did not see an instance where cycled-back 100 was released before pending items for key 2"
    );
    assert_eq!(explored, 60);
}
3401
/// Variant of the cycle-back test in which the feedback path is additionally batched
/// into a tick and released with `all_ticks` before being routed back to the input.
///
/// At least one explored execution must show key 1 collecting `[0, 1]`, and the number
/// of explored executions is pinned to 58.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back_tick() {
    use std::collections::HashMap;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (sender, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        process.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
    let ordered = unordered_input
        .into_keyed()
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Batch per tick, then increment each value, keep the odd results, and route them
    // back through `relay`.
    let feedback = ordered
        .clone()
        .batch(&process.tick(), nondet!(/** test */))
        .all_ticks()
        .map(q!(|v| v + 1))
        .filter(q!(|v| v % 2 == 1))
        .entries()
        .send(&relay, TCP.fail_stop().bincode())
        .send(&process, TCP.fail_stop().bincode())
        .into_keyed();
    complete_cycle_back.complete(feedback);

    let output = ordered
        .fold_early_stop(
            q!(|| Vec::new()),
            q!(|collected, value| {
                collected.push(value);
                collected.len() >= 2
            }),
        )
        .entries()
        .sim_output();

    let mut saw = false;
    let explored = builder.sim().exhaustive(async || {
        sender.send_many_unordered([(1, 0), (1, 2)]);
        let entries = output.collect_sorted::<Vec<_>>().await;
        let by_key: HashMap<_, _> = entries.into_iter().collect();

        if matches!(by_key.get(&1), Some(values) if *values == vec![0, 1]) {
            saw = true;
        }
    });

    assert!(
        saw,
        "did not see an instance with key 1 having [0, 1] in order"
    );
    assert_eq!(explored, 58);
}
3465
/// Pins the number of executions explored when a totally ordered keyed input is batched
/// into a tick, flattened with `entries_partially_ordered`, and released every tick.
/// The collected output is discarded; only the count (12) is checked.
#[cfg(feature = "sim")]
#[test]
fn sim_entries_partially_ordered_bounded() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, ordered_input) = process.sim_input::<_, TotalOrder, _>();

    let tick = process.tick();
    let output = ordered_input
        .into_keyed()
        .batch(&tick, nondet!(/** test */))
        .entries_partially_ordered(nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send((1, 'a'));
        sender.send((1, 'b'));
        sender.send((2, 'c'));
        let _: Vec<(i32, char)> = output.collect().await;
    });

    assert_eq!(explored, 12);
}
3490
/// Pins the number of executions explored when a totally ordered keyed input is
/// flattened with `entries_partially_ordered` outside of any tick. The collected output
/// is discarded; only the count (3) is checked.
#[cfg(feature = "sim")]
#[test]
fn sim_entries_partially_ordered_top_level() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, ordered_input) = process.sim_input::<_, TotalOrder, _>();

    let keyed = ordered_input.into_keyed();
    let output = keyed
        .entries_partially_ordered(nondet!(/** test */))
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        sender.send((1, 'a'));
        sender.send((1, 'b'));
        sender.send((2, 'c'));
        let _: Vec<(i32, char)> = output.collect().await;
    });

    assert_eq!(explored, 3);
}
3513
/// Feeds the flattened output of `entries_partially_ordered` back into the keyed input
/// (via a round trip through a second process) and checks that a cycled-back entry can
/// be observed ahead of an input entry that was already sent.
///
/// At least one explored execution must show `(1, 1)` before `(1, 2)`, and the number of
/// explored executions is pinned to 78.
#[cfg(feature = "sim")]
#[test]
fn sim_entries_partially_ordered_cycle_back() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (sender, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let (complete_cycle_back, cycle_back) =
        process.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
    let ordered = unordered_input
        .into_keyed()
        .merge_unordered(cycle_back)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    let flat = ordered
        .clone()
        .entries_partially_ordered(nondet!(/** test */));

    // Increment each value, keep the odd results, and route them back through `relay`.
    let feedback = flat
        .clone()
        .map(q!(|(k, v): (i32, i32)| (k, v + 1)))
        .filter(q!(|(_, v)| *v % 2 == 1))
        .send(&relay, TCP.fail_stop().bincode())
        .send(&process, TCP.fail_stop().bincode())
        .into_keyed();
    complete_cycle_back.complete(feedback);

    let output = flat.sim_output();

    let mut saw = false;
    let explored = builder.sim().exhaustive(async || {
        // Send (1, 0) and (1, 2). 0 + 1 = 1 is odd, so it cycles back as (1, 1).
        // We want to see (1, 1) before (1, 2): the cycled-back value beats the pending one.
        sender.send_many_unordered([(1, 0), (1, 2)]);
        let results: Vec<(i32, i32)> = output.collect().await;

        let cycled_at = results.iter().position(|entry| *entry == (1, 1));
        let pending_at = results.iter().position(|entry| *entry == (1, 2));
        if cycled_at
            .zip(pending_at)
            .is_some_and(|(cycled, pending)| cycled < pending)
        {
            saw = true;
        }
    });

    assert!(saw, "did not see an instance with (1, 1) before (1, 2)");
    assert_eq!(explored, 78);
}
3564}