hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16 ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28 AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37 AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38 manual_proof,
39};
40
41pub mod networking;
42
43/// Streaming elements of type `V` grouped by a key of type `K`.
44///
45/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
46/// order of keys is non-deterministic but the order *within* each group may be deterministic.
47///
48/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
49/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
50/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
51///
52/// Type Parameters:
53/// - `K`: the type of the key for each group
54/// - `V`: the type of the elements inside each group
55/// - `Loc`: the [`Location`] where the keyed stream is materialized
56/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
57/// - `Order`: tracks whether the elements within each group have deterministic order
58/// ([`TotalOrder`]) or not ([`NoOrder`])
59/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
60/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
61pub struct KeyedStream<
62 K,
63 V,
64 Loc,
65 Bound: Boundedness = Unbounded,
66 Order: Ordering = TotalOrder,
67 Retry: Retries = ExactlyOnce,
68> {
69 pub(crate) location: Loc,
70 pub(crate) ir_node: RefCell<HydroNode>,
71 pub(crate) flow_state: FlowState,
72
73 _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
74}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77 fn drop(&mut self) {
78 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81 input: Box::new(ir_node),
82 op_metadata: HydroIrOpMetadata::new(),
83 });
84 }
85 }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89 for KeyedStream<K, V, L, Unbounded, O, R>
90where
91 L: Location<'a>,
92{
93 fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94 let new_meta = stream
95 .location
96 .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98 KeyedStream {
99 location: stream.location.clone(),
100 flow_state: stream.flow_state.clone(),
101 ir_node: RefCell::new(HydroNode::Cast {
102 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103 metadata: new_meta,
104 }),
105 _phantom: PhantomData,
106 }
107 }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111 for KeyedStream<K, V, L, B, NoOrder, R>
112where
113 L: Location<'a>,
114{
115 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116 stream.weaken_ordering()
117 }
118}
119
120impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
121where
122 L: Location<'a>,
123{
124 fn defer_tick(self) -> Self {
125 KeyedStream::defer_tick(self)
126 }
127}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132 L: Location<'a>,
133{
134 type Location = Tick<L>;
135
136 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137 KeyedStream {
138 flow_state: location.flow_state().clone(),
139 location: location.clone(),
140 ir_node: RefCell::new(HydroNode::CycleSource {
141 cycle_id,
142 metadata: location.new_node_metadata(
143 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144 ),
145 }),
146 _phantom: PhantomData,
147 }
148 }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152 for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154 L: Location<'a>,
155{
156 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157 assert_eq!(
158 Location::id(&self.location),
159 expected_location,
160 "locations do not match"
161 );
162
163 self.location
164 .flow_state()
165 .borrow_mut()
166 .push_root(HydroRoot::CycleSink {
167 cycle_id,
168 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169 op_metadata: HydroIrOpMetadata::new(),
170 });
171 }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175 for KeyedStream<K, V, L, B, O, R>
176where
177 L: Location<'a> + NoTick,
178{
179 type Location = L;
180
181 fn create_source(cycle_id: CycleId, location: L) -> Self {
182 KeyedStream {
183 flow_state: location.flow_state().clone(),
184 location: location.clone(),
185 ir_node: RefCell::new(HydroNode::CycleSource {
186 cycle_id,
187 metadata: location
188 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189 }),
190 _phantom: PhantomData,
191 }
192 }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196 for KeyedStream<K, V, L, B, O, R>
197where
198 L: Location<'a> + NoTick,
199{
200 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201 assert_eq!(
202 Location::id(&self.location),
203 expected_location,
204 "locations do not match"
205 );
206 self.location
207 .flow_state()
208 .borrow_mut()
209 .push_root(HydroRoot::CycleSink {
210 cycle_id,
211 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212 op_metadata: HydroIrOpMetadata::new(),
213 });
214 }
215}
216
217impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
218 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
219{
220 fn clone(&self) -> Self {
221 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
222 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
223 *self.ir_node.borrow_mut() = HydroNode::Tee {
224 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
225 metadata: self.location.new_node_metadata(Self::collection_kind()),
226 };
227 }
228
229 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
230 KeyedStream {
231 location: self.location.clone(),
232 flow_state: self.flow_state.clone(),
233 ir_node: HydroNode::Tee {
234 inner: SharedNode(inner.0.clone()),
235 metadata: metadata.clone(),
236 }
237 .into(),
238 _phantom: PhantomData,
239 }
240 } else {
241 unreachable!()
242 }
243 }
244}
245
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// The standard `Debug`, `Clone`, `PartialEq` and `Eq` traits are derived so that generator
/// outputs can be logged, duplicated, and compared (for example in tests). Each derive only
/// applies when `T` implements the corresponding trait.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260 KeyedStream<K, V, L, B, O, R>
261{
262 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266 let flow_state = location.flow_state().clone();
267 KeyedStream {
268 location,
269 flow_state,
270 ir_node: RefCell::new(ir_node),
271 _phantom: PhantomData,
272 }
273 }
274
275 /// Returns the [`CollectionKind`] corresponding to this type.
276 pub fn collection_kind() -> CollectionKind {
277 CollectionKind::KeyedStream {
278 bound: B::BOUND_KIND,
279 value_order: O::ORDERING_KIND,
280 value_retry: R::RETRIES_KIND,
281 key_type: stageleft::quote_type::<K>().into(),
282 value_type: stageleft::quote_type::<V>().into(),
283 }
284 }
285
286 /// Returns the [`Location`] where this keyed stream is being materialized.
287 pub fn location(&self) -> &L {
288 &self.location
289 }
290
291 /// Turns this [`KeyedStream`] into a [`Stream`] preserving ordering, under the invariant
292 /// assumption that there is at most one key. If this invariant is broken, the program
293 /// may exhibit undefined behavior, so uses must be carefully vetted.
294 pub(crate) fn cast_at_most_one_key(self) -> Stream<(K, V), L, B, O, R> {
295 Stream::new(
296 self.location.clone(),
297 HydroNode::Cast {
298 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
299 metadata: self
300 .location
301 .new_node_metadata(Stream::<(K, V), L, B, O, R>::collection_kind()),
302 },
303 )
304 }
305
306 /// Turns this [`KeyedStream`] into a [`KeyedSingleton`], under the invariant assumption that
307 /// there is at most one entry per key. If this invariant is broken, the program may exhibit
308 /// undefined behavior, so uses must be carefully vetted.
309 pub(crate) fn cast_at_most_one_entry_per_key(
310 self,
311 ) -> KeyedSingleton<K, V, L, B::WithBoundedValue> {
312 KeyedSingleton::new(
313 self.location.clone(),
314 HydroNode::Cast {
315 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
316 metadata: self.location.new_node_metadata(KeyedSingleton::<
317 K,
318 V,
319 L,
320 B::WithBoundedValue,
321 >::collection_kind()),
322 },
323 )
324 }
325
326 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> KeyedStream<K, V, L, B, O2, R> {
327 if O::ORDERING_KIND == O2::ORDERING_KIND {
328 KeyedStream::new(
329 self.location.clone(),
330 self.ir_node.replace(HydroNode::Placeholder),
331 )
332 } else {
333 panic!(
334 "Runtime ordering {:?} did not match requested cast {:?}.",
335 O::ORDERING_KIND,
336 O2::ORDERING_KIND
337 )
338 }
339 }
340
341 /// Explicitly "casts" the keyed stream to a type with a different ordering
342 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
343 /// by the type-system.
344 ///
345 /// # Non-Determinism
346 /// This function is used as an escape hatch, and any mistakes in the
347 /// provided ordering guarantee will propagate into the guarantees
348 /// for the rest of the program.
349 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
350 if O::ORDERING_KIND == O2::ORDERING_KIND {
351 KeyedStream::new(
352 self.location.clone(),
353 self.ir_node.replace(HydroNode::Placeholder),
354 )
355 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
356 // We can always weaken the ordering guarantee
357 KeyedStream::new(
358 self.location.clone(),
359 HydroNode::Cast {
360 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
361 metadata: self
362 .location
363 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
364 },
365 )
366 } else {
367 KeyedStream::new(
368 self.location.clone(),
369 HydroNode::ObserveNonDet {
370 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
371 trusted: false,
372 metadata: self
373 .location
374 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
375 },
376 )
377 }
378 }
379
380 fn assume_ordering_trusted<O2: Ordering>(
381 self,
382 _nondet: NonDet,
383 ) -> KeyedStream<K, V, L, B, O2, R> {
384 if O::ORDERING_KIND == O2::ORDERING_KIND {
385 KeyedStream::new(
386 self.location.clone(),
387 self.ir_node.replace(HydroNode::Placeholder),
388 )
389 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
390 // We can always weaken the ordering guarantee
391 KeyedStream::new(
392 self.location.clone(),
393 HydroNode::Cast {
394 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
395 metadata: self
396 .location
397 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
398 },
399 )
400 } else {
401 KeyedStream::new(
402 self.location.clone(),
403 HydroNode::ObserveNonDet {
404 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
405 trusted: true,
406 metadata: self
407 .location
408 .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
409 },
410 )
411 }
412 }
413
414 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
415 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
416 /// which is always safe because that is the weakest possible guarantee.
417 pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
418 self.weaken_ordering::<NoOrder>()
419 }
420
421 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
422 /// enforcing that `O2` is weaker than the input ordering guarantee.
423 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
424 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
425 self.assume_ordering::<O2>(nondet)
426 }
427
428 /// Explicitly "casts" the keyed stream to a type with a different retries
429 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
430 /// be proven by the type-system.
431 ///
432 /// # Non-Determinism
433 /// This function is used as an escape hatch, and any mistakes in the
434 /// provided retries guarantee will propagate into the guarantees
435 /// for the rest of the program.
436 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
437 if R::RETRIES_KIND == R2::RETRIES_KIND {
438 KeyedStream::new(
439 self.location.clone(),
440 self.ir_node.replace(HydroNode::Placeholder),
441 )
442 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
443 // We can always weaken the retries guarantee
444 KeyedStream::new(
445 self.location.clone(),
446 HydroNode::Cast {
447 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
448 metadata: self
449 .location
450 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
451 },
452 )
453 } else {
454 KeyedStream::new(
455 self.location.clone(),
456 HydroNode::ObserveNonDet {
457 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
458 trusted: false,
459 metadata: self
460 .location
461 .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
462 },
463 )
464 }
465 }
466
467 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
468 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
469 /// which is always safe because that is the weakest possible guarantee.
470 pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
471 self.weaken_retries::<AtLeastOnce>()
472 }
473
474 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
475 /// enforcing that `R2` is weaker than the input retries guarantee.
476 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
477 let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
478 self.assume_retries::<R2>(nondet)
479 }
480
481 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
482 /// implies that `O == TotalOrder`.
483 pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
484 where
485 O: IsOrdered,
486 {
487 self.assume_ordering(nondet!(/** no-op */))
488 }
489
490 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
491 /// implies that `R == ExactlyOnce`.
492 pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
493 where
494 R: IsExactlyOnce,
495 {
496 self.assume_retries(nondet!(/** no-op */))
497 }
498
499 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
500 /// implies that `B == Bounded`.
501 pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
502 where
503 B: IsBounded,
504 {
505 self.weaken_boundedness()
506 }
507
508 /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
509 /// which implies that `B == Bounded`.
510 pub fn weaken_boundedness<B2: Boundedness>(self) -> KeyedStream<K, V, L, B2, O, R> {
511 if B::BOUNDED == B2::BOUNDED {
512 KeyedStream::new(
513 self.location.clone(),
514 self.ir_node.replace(HydroNode::Placeholder),
515 )
516 } else {
517 // We can always weaken the boundedness
518 KeyedStream::new(
519 self.location.clone(),
520 HydroNode::Cast {
521 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
522 metadata: self
523 .location
524 .new_node_metadata(KeyedStream::<K, V, L, B2, O, R>::collection_kind()),
525 },
526 )
527 }
528 }
529
530 /// Flattens the keyed stream into an unordered stream of key-value pairs.
531 ///
532 /// # Example
533 /// ```rust
534 /// # #[cfg(feature = "deploy")] {
535 /// # use hydro_lang::prelude::*;
536 /// # use futures::StreamExt;
537 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
538 /// process
539 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
540 /// .into_keyed()
541 /// .entries()
542 /// # }, |mut stream| async move {
543 /// // (1, 2), (1, 3), (2, 4) in any order
544 /// # let mut results = Vec::new();
545 /// # for _ in 0..3 {
546 /// # results.push(stream.next().await.unwrap());
547 /// # }
548 /// # results.sort();
549 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
550 /// # }));
551 /// # }
552 /// ```
553 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
554 Stream::new(
555 self.location.clone(),
556 HydroNode::Cast {
557 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
558 metadata: self
559 .location
560 .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
561 },
562 )
563 }
564
565 /// Flattens the keyed stream into a totally ordered stream of key-value pairs,
566 /// preserving the order of values within each key group but non-deterministically
567 /// interleaving across keys.
568 ///
569 /// Requires the keyed stream to be totally ordered within each group (`O: IsOrdered`).
570 ///
571 /// # Non-Determinism
572 /// The interleaving of entries across different keys is non-deterministic.
573 /// Within each key, the original order is preserved.
574 pub fn entries_partially_ordered(self, _nondet: NonDet) -> Stream<(K, V), L, B, TotalOrder, R>
575 where
576 O: IsOrdered,
577 {
578 Stream::new(
579 self.location.clone(),
580 HydroNode::ObserveNonDet {
581 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
582 trusted: false,
583 metadata: self
584 .location
585 .new_node_metadata(Stream::<(K, V), L, B, TotalOrder, R>::collection_kind()),
586 },
587 )
588 }
589
590 /// Flattens the keyed stream into an unordered stream of only the values.
591 ///
592 /// # Example
593 /// ```rust
594 /// # #[cfg(feature = "deploy")] {
595 /// # use hydro_lang::prelude::*;
596 /// # use futures::StreamExt;
597 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
598 /// process
599 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
600 /// .into_keyed()
601 /// .values()
602 /// # }, |mut stream| async move {
603 /// // 2, 3, 4 in any order
604 /// # let mut results = Vec::new();
605 /// # for _ in 0..3 {
606 /// # results.push(stream.next().await.unwrap());
607 /// # }
608 /// # results.sort();
609 /// # assert_eq!(results, vec![2, 3, 4]);
610 /// # }));
611 /// # }
612 /// ```
613 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
614 self.entries().map(q!(|(_, v)| v))
615 }
616
617 /// Flattens the keyed stream into an unordered stream of just the keys.
618 ///
619 /// # Example
620 /// ```rust
621 /// # #[cfg(feature = "deploy")] {
622 /// # use hydro_lang::prelude::*;
623 /// # use futures::StreamExt;
624 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
625 /// # process
626 /// # .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
627 /// # .into_keyed()
628 /// # .keys()
629 /// # }, |mut stream| async move {
630 /// // 1, 2 in any order
631 /// # let mut results = Vec::new();
632 /// # for _ in 0..2 {
633 /// # results.push(stream.next().await.unwrap());
634 /// # }
635 /// # results.sort();
636 /// # assert_eq!(results, vec![1, 2]);
637 /// # }));
638 /// # }
639 /// ```
640 pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
641 where
642 K: Eq + Hash,
643 {
644 self.entries().map(q!(|(k, _)| k)).unique()
645 }
646
647 /// Transforms each value by invoking `f` on each element, with keys staying the same
648 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
649 ///
650 /// If you do not want to modify the stream and instead only want to view
651 /// each item use [`KeyedStream::inspect`] instead.
652 ///
653 /// # Example
654 /// ```rust
655 /// # #[cfg(feature = "deploy")] {
656 /// # use hydro_lang::prelude::*;
657 /// # use futures::StreamExt;
658 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
659 /// process
660 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
661 /// .into_keyed()
662 /// .map(q!(|v| v + 1))
663 /// # .entries()
664 /// # }, |mut stream| async move {
665 /// // { 1: [3, 4], 2: [5] }
666 /// # let mut results = Vec::new();
667 /// # for _ in 0..3 {
668 /// # results.push(stream.next().await.unwrap());
669 /// # }
670 /// # results.sort();
671 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
672 /// # }));
673 /// # }
674 /// ```
675 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
676 where
677 F: Fn(V) -> U + 'a,
678 {
679 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
680 let map_f = q!({
681 let orig = f;
682 move |(k, v)| (k, orig(v))
683 })
684 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
685 .into();
686
687 KeyedStream::new(
688 self.location.clone(),
689 HydroNode::Map {
690 f: map_f,
691 singleton_refs: Vec::new(),
692 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
693 metadata: self
694 .location
695 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
696 },
697 )
698 }
699
700 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
701 /// re-grouped even they are tuples; instead they will be grouped under the original key.
702 ///
703 /// If you do not want to modify the stream and instead only want to view
704 /// each item use [`KeyedStream::inspect_with_key`] instead.
705 ///
706 /// # Example
707 /// ```rust
708 /// # #[cfg(feature = "deploy")] {
709 /// # use hydro_lang::prelude::*;
710 /// # use futures::StreamExt;
711 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
712 /// process
713 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
714 /// .into_keyed()
715 /// .map_with_key(q!(|(k, v)| k + v))
716 /// # .entries()
717 /// # }, |mut stream| async move {
718 /// // { 1: [3, 4], 2: [6] }
719 /// # let mut results = Vec::new();
720 /// # for _ in 0..3 {
721 /// # results.push(stream.next().await.unwrap());
722 /// # }
723 /// # results.sort();
724 /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
725 /// # }));
726 /// # }
727 /// ```
728 pub fn map_with_key<U, F>(
729 self,
730 f: impl IntoQuotedMut<'a, F, L> + Copy,
731 ) -> KeyedStream<K, U, L, B, O, R>
732 where
733 F: Fn((K, V)) -> U + 'a,
734 K: Clone,
735 {
736 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
737 let map_f = q!({
738 let orig = f;
739 move |(k, v)| {
740 let out = orig((Clone::clone(&k), v));
741 (k, out)
742 }
743 })
744 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
745 .into();
746
747 KeyedStream::new(
748 self.location.clone(),
749 HydroNode::Map {
750 f: map_f,
751 singleton_refs: Vec::new(),
752 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
753 metadata: self
754 .location
755 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
756 },
757 )
758 }
759
760 /// Prepends a new value to the key of each element in the stream, producing a new
761 /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
762 /// occurs and the elements in each group preserve their original order.
763 ///
764 /// # Example
765 /// ```rust
766 /// # #[cfg(feature = "deploy")] {
767 /// # use hydro_lang::prelude::*;
768 /// # use futures::StreamExt;
769 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770 /// process
771 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
772 /// .into_keyed()
773 /// .prefix_key(q!(|&(k, _)| k % 2))
774 /// # .entries()
775 /// # }, |mut stream| async move {
776 /// // { (1, 1): [2, 3], (0, 2): [4] }
777 /// # let mut results = Vec::new();
778 /// # for _ in 0..3 {
779 /// # results.push(stream.next().await.unwrap());
780 /// # }
781 /// # results.sort();
782 /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
783 /// # }));
784 /// # }
785 /// ```
786 pub fn prefix_key<K2, F>(
787 self,
788 f: impl IntoQuotedMut<'a, F, L> + Copy,
789 ) -> KeyedStream<(K2, K), V, L, B, O, R>
790 where
791 F: Fn(&(K, V)) -> K2 + 'a,
792 {
793 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
794 let map_f = q!({
795 let orig = f;
796 move |kv| {
797 let out = orig(&kv);
798 ((out, kv.0), kv.1)
799 }
800 })
801 .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
802 .into();
803
804 KeyedStream::new(
805 self.location.clone(),
806 HydroNode::Map {
807 f: map_f,
808 singleton_refs: Vec::new(),
809 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
810 metadata: self
811 .location
812 .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
813 },
814 )
815 }
816
817 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
818 /// `f`, preserving the order of the elements within the group.
819 ///
820 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
821 /// not modify or take ownership of the values. If you need to modify the values while filtering
822 /// use [`KeyedStream::filter_map`] instead.
823 ///
824 /// # Example
825 /// ```rust
826 /// # #[cfg(feature = "deploy")] {
827 /// # use hydro_lang::prelude::*;
828 /// # use futures::StreamExt;
829 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
830 /// process
831 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
832 /// .into_keyed()
833 /// .filter(q!(|&x| x > 2))
834 /// # .entries()
835 /// # }, |mut stream| async move {
836 /// // { 1: [3], 2: [4] }
837 /// # let mut results = Vec::new();
838 /// # for _ in 0..2 {
839 /// # results.push(stream.next().await.unwrap());
840 /// # }
841 /// # results.sort();
842 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
843 /// # }));
844 /// # }
845 /// ```
846 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
847 where
848 F: Fn(&V) -> bool + 'a,
849 {
850 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
851 let filter_f = q!({
852 let orig = f;
853 move |t: &(_, _)| orig(&t.1)
854 })
855 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
856 .into();
857
858 KeyedStream::new(
859 self.location.clone(),
860 HydroNode::Filter {
861 f: filter_f,
862 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
863 metadata: self.location.new_node_metadata(Self::collection_kind()),
864 },
865 )
866 }
867
868 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
869 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
870 ///
871 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
872 /// not modify or take ownership of the values. If you need to modify the values while filtering
873 /// use [`KeyedStream::filter_map_with_key`] instead.
874 ///
875 /// # Example
876 /// ```rust
877 /// # #[cfg(feature = "deploy")] {
878 /// # use hydro_lang::prelude::*;
879 /// # use futures::StreamExt;
880 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
881 /// process
882 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
883 /// .into_keyed()
884 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
885 /// # .entries()
886 /// # }, |mut stream| async move {
887 /// // { 1: [3], 2: [4] }
888 /// # let mut results = Vec::new();
889 /// # for _ in 0..2 {
890 /// # results.push(stream.next().await.unwrap());
891 /// # }
892 /// # results.sort();
893 /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
894 /// # }));
895 /// # }
896 /// ```
897 pub fn filter_with_key<F>(
898 self,
899 f: impl IntoQuotedMut<'a, F, L> + Copy,
900 ) -> KeyedStream<K, V, L, B, O, R>
901 where
902 F: Fn(&(K, V)) -> bool + 'a,
903 {
904 let filter_f = f
905 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
906 .into();
907
908 KeyedStream::new(
909 self.location.clone(),
910 HydroNode::Filter {
911 f: filter_f,
912 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
913 metadata: self.location.new_node_metadata(Self::collection_kind()),
914 },
915 )
916 }
917
918 /// An operator that both filters and maps each value, with keys staying the same.
919 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
920 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
921 ///
922 /// # Example
923 /// ```rust
924 /// # #[cfg(feature = "deploy")] {
925 /// # use hydro_lang::prelude::*;
926 /// # use futures::StreamExt;
927 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
928 /// process
929 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
930 /// .into_keyed()
931 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
932 /// # .entries()
933 /// # }, |mut stream| async move {
934 /// // { 1: [2], 2: [4] }
935 /// # let mut results = Vec::new();
936 /// # for _ in 0..2 {
937 /// # results.push(stream.next().await.unwrap());
938 /// # }
939 /// # results.sort();
940 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
941 /// # }));
942 /// # }
943 /// ```
944 pub fn filter_map<U, F>(
945 self,
946 f: impl IntoQuotedMut<'a, F, L> + Copy,
947 ) -> KeyedStream<K, U, L, B, O, R>
948 where
949 F: Fn(V) -> Option<U> + 'a,
950 {
951 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
952 let filter_map_f = q!({
953 let orig = f;
954 move |(k, v)| orig(v).map(|o| (k, o))
955 })
956 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
957 .into();
958
959 KeyedStream::new(
960 self.location.clone(),
961 HydroNode::FilterMap {
962 f: filter_map_f,
963 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
964 metadata: self
965 .location
966 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
967 },
968 )
969 }
970
971 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
/// re-grouped even if they are tuples; instead they will be grouped under the original key.
973 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
974 ///
975 /// # Example
976 /// ```rust
977 /// # #[cfg(feature = "deploy")] {
978 /// # use hydro_lang::prelude::*;
979 /// # use futures::StreamExt;
980 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
981 /// process
982 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
983 /// .into_keyed()
984 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
985 /// # .entries()
986 /// # }, |mut stream| async move {
987 /// // { 2: [2] }
988 /// # let mut results = Vec::new();
989 /// # for _ in 0..1 {
990 /// # results.push(stream.next().await.unwrap());
991 /// # }
992 /// # results.sort();
993 /// # assert_eq!(results, vec![(2, 2)]);
994 /// # }));
995 /// # }
996 /// ```
997 pub fn filter_map_with_key<U, F>(
998 self,
999 f: impl IntoQuotedMut<'a, F, L> + Copy,
1000 ) -> KeyedStream<K, U, L, B, O, R>
1001 where
1002 F: Fn((K, V)) -> Option<U> + 'a,
1003 K: Clone,
1004 {
1005 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1006 let filter_map_f = q!({
1007 let orig = f;
1008 move |(k, v)| {
1009 let out = orig((Clone::clone(&k), v));
1010 out.map(|o| (k, o))
1011 }
1012 })
1013 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1014 .into();
1015
1016 KeyedStream::new(
1017 self.location.clone(),
1018 HydroNode::FilterMap {
1019 f: filter_map_f,
1020 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1021 metadata: self
1022 .location
1023 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1024 },
1025 )
1026 }
1027
1028 /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
/// where `x` is the value of `other`, a bounded [`super::singleton::Singleton`] or
1030 /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
1031 ///
1032 /// # Example
1033 /// ```rust
1034 /// # #[cfg(feature = "deploy")] {
1035 /// # use hydro_lang::prelude::*;
1036 /// # use futures::StreamExt;
1037 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1038 /// let tick = process.tick();
1039 /// let batch = process
1040 /// .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
1041 /// .into_keyed()
1042 /// .batch(&tick, nondet!(/** test */));
1043 /// let count = batch.clone().entries().count(); // `count()` returns a singleton
1044 /// batch.cross_singleton(count).all_ticks().entries()
1045 /// # }, |mut stream| async move {
1046 /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
1047 /// # let mut results = Vec::new();
1048 /// # for _ in 0..3 {
1049 /// # results.push(stream.next().await.unwrap());
1050 /// # }
1051 /// # results.sort();
1052 /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
1053 /// # }));
1054 /// # }
1055 /// ```
1056 pub fn cross_singleton<O2>(
1057 self,
1058 other: impl Into<Optional<O2, L, Bounded>>,
1059 ) -> KeyedStream<K, (V, O2), L, B, O, R>
1060 where
1061 O2: Clone,
1062 {
1063 let other: Optional<O2, L, Bounded> = other.into();
1064 check_matching_location(&self.location, &other.location);
1065
1066 Stream::new(
1067 self.location.clone(),
1068 HydroNode::CrossSingleton {
1069 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1070 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1071 metadata: self
1072 .location
1073 .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
1074 },
1075 )
1076 .map(q!(|((k, v), o2)| (k, (v, o2))))
1077 .into_keyed()
1078 }
1079
1080 /// For each value `v` in each group, transform `v` using `f` and then treat the
1081 /// result as an [`Iterator`] to produce values one by one within the same group.
1082 /// The implementation for [`Iterator`] for the output type `I` must produce items
1083 /// in a **deterministic** order.
1084 ///
1085 /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
1086 /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
1087 ///
1088 /// # Example
1089 /// ```rust
1090 /// # #[cfg(feature = "deploy")] {
1091 /// # use hydro_lang::prelude::*;
1092 /// # use futures::StreamExt;
1093 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1094 /// process
1095 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1096 /// .into_keyed()
1097 /// .flat_map_ordered(q!(|x| x))
1098 /// # .entries()
1099 /// # }, |mut stream| async move {
1100 /// // { 1: [2, 3, 4], 2: [5, 6] }
1101 /// # let mut results = Vec::new();
1102 /// # for _ in 0..5 {
1103 /// # results.push(stream.next().await.unwrap());
1104 /// # }
1105 /// # results.sort();
1106 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1107 /// # }));
1108 /// # }
1109 /// ```
1110 pub fn flat_map_ordered<U, I, F>(
1111 self,
1112 f: impl IntoQuotedMut<'a, F, L> + Copy,
1113 ) -> KeyedStream<K, U, L, B, O, R>
1114 where
1115 I: IntoIterator<Item = U>,
1116 F: Fn(V) -> I + 'a,
1117 K: Clone,
1118 {
1119 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1120 let flat_map_f = q!({
1121 let orig = f;
1122 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1123 })
1124 .splice_fn1_ctx::<(K, V), _>(&self.location)
1125 .into();
1126
1127 KeyedStream::new(
1128 self.location.clone(),
1129 HydroNode::FlatMap {
1130 f: flat_map_f,
1131 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1132 metadata: self
1133 .location
1134 .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1135 },
1136 )
1137 }
1138
1139 /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1140 /// for the output type `I` to produce items in any order.
1141 ///
1142 /// # Example
1143 /// ```rust
1144 /// # #[cfg(feature = "deploy")] {
1145 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1146 /// # use futures::StreamExt;
1147 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1148 /// process
1149 /// .source_iter(q!(vec![
1150 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1151 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1152 /// ]))
1153 /// .into_keyed()
1154 /// .flat_map_unordered(q!(|x| x))
1155 /// # .entries()
1156 /// # }, |mut stream| async move {
1157 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1158 /// # let mut results = Vec::new();
1159 /// # for _ in 0..4 {
1160 /// # results.push(stream.next().await.unwrap());
1161 /// # }
1162 /// # results.sort();
1163 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1164 /// # }));
1165 /// # }
1166 /// ```
1167 pub fn flat_map_unordered<U, I, F>(
1168 self,
1169 f: impl IntoQuotedMut<'a, F, L> + Copy,
1170 ) -> KeyedStream<K, U, L, B, NoOrder, R>
1171 where
1172 I: IntoIterator<Item = U>,
1173 F: Fn(V) -> I + 'a,
1174 K: Clone,
1175 {
1176 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1177 let flat_map_f = q!({
1178 let orig = f;
1179 move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1180 })
1181 .splice_fn1_ctx::<(K, V), _>(&self.location)
1182 .into();
1183
1184 KeyedStream::new(
1185 self.location.clone(),
1186 HydroNode::FlatMap {
1187 f: flat_map_f,
1188 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1189 metadata: self
1190 .location
1191 .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1192 },
1193 )
1194 }
1195
1196 /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1197 /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1198 /// items in a **deterministic** order.
1199 ///
1200 /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1201 /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1202 ///
1203 /// # Example
1204 /// ```rust
1205 /// # #[cfg(feature = "deploy")] {
1206 /// # use hydro_lang::prelude::*;
1207 /// # use futures::StreamExt;
1208 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1209 /// process
1210 /// .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1211 /// .into_keyed()
1212 /// .flatten_ordered()
1213 /// # .entries()
1214 /// # }, |mut stream| async move {
1215 /// // { 1: [2, 3, 4], 2: [5, 6] }
1216 /// # let mut results = Vec::new();
1217 /// # for _ in 0..5 {
1218 /// # results.push(stream.next().await.unwrap());
1219 /// # }
1220 /// # results.sort();
1221 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1222 /// # }));
1223 /// # }
1224 /// ```
pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
where
    V: IntoIterator<Item = U>,
    K: Clone,
{
    // Flattening is a flat-map with the identity closure: each value is its own iterator.
    self.flat_map_ordered(q!(|d| d))
}
1232
1233 /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1234 /// for the value type `V` to produce items in any order.
1235 ///
1236 /// # Example
1237 /// ```rust
1238 /// # #[cfg(feature = "deploy")] {
1239 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1240 /// # use futures::StreamExt;
1241 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1242 /// process
1243 /// .source_iter(q!(vec![
1244 /// (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1245 /// (2, std::collections::HashSet::from_iter(vec![4, 5]))
1246 /// ]))
1247 /// .into_keyed()
1248 /// .flatten_unordered()
1249 /// # .entries()
1250 /// # }, |mut stream| async move {
1251 /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1252 /// # let mut results = Vec::new();
1253 /// # for _ in 0..4 {
1254 /// # results.push(stream.next().await.unwrap());
1255 /// # }
1256 /// # results.sort();
1257 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1258 /// # }));
1259 /// # }
1260 /// ```
pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
where
    V: IntoIterator<Item = U>,
    K: Clone,
{
    // Flattening is a flat-map with the identity closure; the unordered variant yields a
    // `NoOrder` keyed stream.
    self.flat_map_unordered(q!(|d| d))
}
1268
1269 /// An operator which allows you to "inspect" each element of a stream without
1270 /// modifying it. The closure `f` is called on a reference to each value. This is
1271 /// mainly useful for debugging, and should not be used to generate side-effects.
1272 ///
1273 /// # Example
1274 /// ```rust
1275 /// # #[cfg(feature = "deploy")] {
1276 /// # use hydro_lang::prelude::*;
1277 /// # use futures::StreamExt;
1278 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1279 /// process
1280 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1281 /// .into_keyed()
1282 /// .inspect(q!(|v| println!("{}", v)))
1283 /// # .entries()
1284 /// # }, |mut stream| async move {
1285 /// # let mut results = Vec::new();
1286 /// # for _ in 0..3 {
1287 /// # results.push(stream.next().await.unwrap());
1288 /// # }
1289 /// # results.sort();
1290 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1291 /// # }));
1292 /// # }
1293 /// ```
1294 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1295 where
1296 F: Fn(&V) + 'a,
1297 {
1298 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1299 let inspect_f = q!({
1300 let orig = f;
1301 move |t: &(_, _)| orig(&t.1)
1302 })
1303 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1304 .into();
1305
1306 KeyedStream::new(
1307 self.location.clone(),
1308 HydroNode::Inspect {
1309 f: inspect_f,
1310 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1311 metadata: self.location.new_node_metadata(Self::collection_kind()),
1312 },
1313 )
1314 }
1315
1316 /// An operator which allows you to "inspect" each element of a stream without
1317 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1318 /// mainly useful for debugging, and should not be used to generate side-effects.
1319 ///
1320 /// # Example
1321 /// ```rust
1322 /// # #[cfg(feature = "deploy")] {
1323 /// # use hydro_lang::prelude::*;
1324 /// # use futures::StreamExt;
1325 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1326 /// process
1327 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1328 /// .into_keyed()
1329 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1330 /// # .entries()
1331 /// # }, |mut stream| async move {
1332 /// # let mut results = Vec::new();
1333 /// # for _ in 0..3 {
1334 /// # results.push(stream.next().await.unwrap());
1335 /// # }
1336 /// # results.sort();
1337 /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1338 /// # }));
1339 /// # }
1340 /// ```
1341 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1342 where
1343 F: Fn(&(K, V)) + 'a,
1344 {
1345 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1346
1347 KeyedStream::new(
1348 self.location.clone(),
1349 HydroNode::Inspect {
1350 f: inspect_f,
1351 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1352 metadata: self.location.new_node_metadata(Self::collection_kind()),
1353 },
1354 )
1355 }
1356
1357 /// An operator which allows you to "name" a `HydroNode`.
1358 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1359 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1360 {
1361 let mut node = self.ir_node.borrow_mut();
1362 let metadata = node.metadata_mut();
1363 metadata.tag = Some(name.to_owned());
1364 }
1365 self
1366 }
1367
1368 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1369 ///
1370 /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1371 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1372 /// early by returning `None`.
1373 ///
1374 /// The function takes a mutable reference to the accumulator and the current element, and returns
1375 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
/// If the function returns `None`, the group for that key is terminated and none of its later elements are processed.
1377 ///
1378 /// # Example
1379 /// ```rust
1380 /// # #[cfg(feature = "deploy")] {
1381 /// # use hydro_lang::prelude::*;
1382 /// # use futures::StreamExt;
1383 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1384 /// process
1385 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1386 /// .into_keyed()
1387 /// .scan(
1388 /// q!(|| 0),
1389 /// q!(|acc, x| {
1390 /// *acc += x;
1391 /// if *acc % 2 == 0 { None } else { Some(*acc) }
1392 /// }),
1393 /// )
1394 /// # .entries()
1395 /// # }, |mut stream| async move {
1396 /// // Output: { 0: [1], 1: [3, 7] }
1397 /// # let mut results = Vec::new();
1398 /// # for _ in 0..3 {
1399 /// # results.push(stream.next().await.unwrap());
1400 /// # }
1401 /// # results.sort();
1402 /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1403 /// # }));
1404 /// # }
1405 /// ```
1406 pub fn scan<A, U, I, F>(
1407 self,
1408 init: impl IntoQuotedMut<'a, I, L> + Copy,
1409 f: impl IntoQuotedMut<'a, F, L> + Copy,
1410 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1411 where
1412 O: IsOrdered,
1413 R: IsExactlyOnce,
1414 K: Clone + Eq + Hash,
1415 I: Fn() -> A + 'a,
1416 F: Fn(&mut A, V) -> Option<U> + 'a,
1417 {
1418 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1419 self.make_totally_ordered().make_exactly_once().generator(
1420 init,
1421 q!({
1422 let orig = f;
1423 move |state, v| {
1424 if let Some(out) = orig(state, v) {
1425 Generate::Yield(out)
1426 } else {
1427 Generate::Break
1428 }
1429 }
1430 }),
1431 )
1432 }
1433
1434 /// Iteratively processes the elements in each group using a state machine that can yield
1435 /// elements as it processes its inputs. This is designed to mirror the unstable generator
1436 /// syntax in Rust, without requiring special syntax.
1437 ///
1438 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1439 /// state for each group. The second argument defines the processing logic, taking in a
1440 /// mutable reference to the group's state and the value to be processed. It emits a
1441 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1442 /// should be processed.
1443 ///
1444 /// # Example
1445 /// ```rust
1446 /// # #[cfg(feature = "deploy")] {
1447 /// # use hydro_lang::prelude::*;
1448 /// # use futures::StreamExt;
1449 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1450 /// process
1451 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1452 /// .into_keyed()
1453 /// .generator(
1454 /// q!(|| 0),
1455 /// q!(|acc, x| {
1456 /// *acc += x;
1457 /// if *acc > 100 {
1458 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
1459 /// "done!".to_owned()
1460 /// )
1461 /// } else if *acc % 2 == 0 {
1462 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
1463 /// "even".to_owned()
1464 /// )
1465 /// } else {
1466 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
1467 /// }
1468 /// }),
1469 /// )
1470 /// # .entries()
1471 /// # }, |mut stream| async move {
1472 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1473 /// # let mut results = Vec::new();
1474 /// # for _ in 0..3 {
1475 /// # results.push(stream.next().await.unwrap());
1476 /// # }
1477 /// # results.sort();
1478 /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1479 /// # }));
1480 /// # }
1481 /// ```
1482 pub fn generator<A, U, I, F>(
1483 self,
1484 init: impl IntoQuotedMut<'a, I, L> + Copy,
1485 f: impl IntoQuotedMut<'a, F, L> + Copy,
1486 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1487 where
1488 O: IsOrdered,
1489 R: IsExactlyOnce,
1490 K: Clone + Eq + Hash,
1491 I: Fn() -> A + 'a,
1492 F: Fn(&mut A, V) -> Generate<U> + 'a,
1493 {
1494 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1495 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1496
1497 let this = self.make_totally_ordered().make_exactly_once();
1498
1499 let scan_init = q!(|| HashMap::new())
1500 .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
1501 .into();
1502 let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1503 let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1504 if let Some(existing_state_value) = existing_state {
1505 match f(existing_state_value, v) {
1506 Generate::Yield(out) => Some(Some((k, out))),
1507 Generate::Return(out) => {
1508 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1509 Some(Some((k, out)))
1510 }
1511 Generate::Break => {
1512 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1513 Some(None)
1514 }
1515 Generate::Continue => Some(None),
1516 }
1517 } else {
1518 Some(None)
1519 }
1520 })
1521 .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
1522 .into();
1523
1524 let scan_node = HydroNode::Scan {
1525 init: scan_init,
1526 acc: scan_f,
1527 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1528 metadata: this.location.new_node_metadata(Stream::<
1529 Option<(K, U)>,
1530 L,
1531 B,
1532 TotalOrder,
1533 ExactlyOnce,
1534 >::collection_kind()),
1535 };
1536
1537 let flatten_f = q!(|d| d)
1538 .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
1539 .into();
1540 let flatten_node = HydroNode::FlatMap {
1541 f: flatten_f,
1542 input: Box::new(scan_node),
1543 metadata: this.location.new_node_metadata(KeyedStream::<
1544 K,
1545 U,
1546 L,
1547 B,
1548 TotalOrder,
1549 ExactlyOnce,
1550 >::collection_kind()),
1551 };
1552
1553 KeyedStream::new(this.location.clone(), flatten_node)
1554 }
1555
1556 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1557 /// in-order across the values in each group. But the aggregation function returns a boolean,
1558 /// which when true indicates that the aggregated result is complete and can be released to
1559 /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1560 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1561 /// normal stream elements.
1562 ///
1563 /// # Example
1564 /// ```rust
1565 /// # #[cfg(feature = "deploy")] {
1566 /// # use hydro_lang::prelude::*;
1567 /// # use futures::StreamExt;
1568 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1569 /// process
1570 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1571 /// .into_keyed()
1572 /// .fold_early_stop(
1573 /// q!(|| 0),
1574 /// q!(|acc, x| {
1575 /// *acc += x;
1576 /// x % 2 == 0
1577 /// }),
1578 /// )
1579 /// # .entries()
1580 /// # }, |mut stream| async move {
1581 /// // Output: { 0: 2, 1: 9 }
1582 /// # let mut results = Vec::new();
1583 /// # for _ in 0..2 {
1584 /// # results.push(stream.next().await.unwrap());
1585 /// # }
1586 /// # results.sort();
1587 /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1588 /// # }));
1589 /// # }
1590 /// ```
1591 pub fn fold_early_stop<A, I, F>(
1592 self,
1593 init: impl IntoQuotedMut<'a, I, L> + Copy,
1594 f: impl IntoQuotedMut<'a, F, L> + Copy,
1595 ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
1596 where
1597 O: IsOrdered,
1598 R: IsExactlyOnce,
1599 K: Clone + Eq + Hash,
1600 I: Fn() -> A + 'a,
1601 F: Fn(&mut A, V) -> bool + 'a,
1602 {
1603 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1604 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1605 let out_without_bound_cast = self.generator(
1606 q!(move || Some(init())),
1607 q!(move |key_state, v| {
1608 if let Some(key_state_value) = key_state.as_mut() {
1609 if f(key_state_value, v) {
1610 Generate::Return(key_state.take().unwrap())
1611 } else {
1612 Generate::Continue
1613 }
1614 } else {
1615 unreachable!()
1616 }
1617 }),
1618 );
1619
1620 // SAFETY: The generator will only ever return at most one value per key, since once it
1621 // returns a value for a key it will never process any more values for that key.
1622 out_without_bound_cast.cast_at_most_one_entry_per_key()
1623 }
1624
1625 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1626 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1627 /// otherwise the first element would be non-deterministic.
1628 ///
1629 /// # Example
1630 /// ```rust
1631 /// # #[cfg(feature = "deploy")] {
1632 /// # use hydro_lang::prelude::*;
1633 /// # use futures::StreamExt;
1634 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1635 /// process
1636 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1637 /// .into_keyed()
1638 /// .first()
1639 /// # .entries()
1640 /// # }, |mut stream| async move {
1641 /// // Output: { 0: 2, 1: 3 }
1642 /// # let mut results = Vec::new();
1643 /// # for _ in 0..2 {
1644 /// # results.push(stream.next().await.unwrap());
1645 /// # }
1646 /// # results.sort();
1647 /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1648 /// # }));
1649 /// # }
1650 /// ```
1651 pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1652 where
1653 O: IsOrdered,
1654 R: IsExactlyOnce,
1655 K: Clone + Eq + Hash,
1656 {
1657 self.fold_early_stop(
1658 q!(|| None),
1659 q!(|acc, v| {
1660 *acc = Some(v);
1661 true
1662 }),
1663 )
1664 .map(q!(|v| v.unwrap()))
1665 }
1666
1667 /// Returns a keyed stream containing at most the first `n` values per key,
1668 /// preserving the original order within each group. Similar to SQL `LIMIT`
1669 /// applied per group.
1670 ///
1671 /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1672 /// retries, since the result depends on the order and cardinality of elements
1673 /// within each group.
1674 ///
1675 /// # Example
1676 /// ```rust
1677 /// # #[cfg(feature = "deploy")] {
1678 /// # use hydro_lang::prelude::*;
1679 /// # use futures::StreamExt;
1680 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1681 /// process
1682 /// .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
1683 /// .into_keyed()
1684 /// .limit(q!(2))
1685 /// # .entries()
1686 /// # }, |mut stream| async move {
1687 /// // { 1: [10, 20], 2: [40, 50] }
1688 /// # let mut results = Vec::new();
1689 /// # for _ in 0..4 {
1690 /// # results.push(stream.next().await.unwrap());
1691 /// # }
1692 /// # results.sort();
1693 /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
1694 /// # }));
1695 /// # }
1696 /// ```
pub fn limit(
    self,
    n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
where
    O: IsOrdered,
    R: IsExactlyOnce,
    K: Clone + Eq + Hash,
{
    // The per-key generator state counts how many values have been emitted for that key.
    self.generator(
        q!(|| 0usize),
        q!(move |count, item| {
            if *count == n {
                // Only reachable on a key's first value when `n == 0`: emit nothing and
                // end the group.
                Generate::Break
            } else {
                *count += 1;
                if *count == n {
                    // This is the `n`-th value: emit it and end the group in the same step.
                    Generate::Return(item)
                } else {
                    Generate::Yield(item)
                }
            }
        }),
    )
}
1722
1723 /// Assigns a zero-based index to each value within each key group, emitting
1724 /// `(K, (index, V))` tuples with per-key sequential indices.
1725 ///
1726 /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1727 /// This is a streaming operator that processes elements as they arrive.
1728 ///
1729 /// # Example
1730 /// ```rust
1731 /// # #[cfg(feature = "deploy")] {
1732 /// # use hydro_lang::prelude::*;
1733 /// # use futures::StreamExt;
1734 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1735 /// process
1736 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1737 /// .into_keyed()
1738 /// .enumerate()
1739 /// # .entries()
1740 /// # }, |mut stream| async move {
1741 /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1742 /// # let mut results = Vec::new();
1743 /// # for _ in 0..3 {
1744 /// # results.push(stream.next().await.unwrap());
1745 /// # }
1746 /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1747 /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1748 /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1749 /// # assert_eq!(key2, vec![(0, 20)]);
1750 /// # }));
1751 /// # }
1752 /// ```
pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
where
    O: IsOrdered,
    R: IsExactlyOnce,
    K: Eq + Hash + Clone,
{
    // The per-key scan state is the index to hand out next, starting from zero.
    self.scan(
        q!(|| 0),
        q!(|acc, next| {
            // Emit the current index, then advance it for the following value of this key.
            let curr = *acc;
            *acc += 1;
            // Always `Some`, so the scan never terminates a group early.
            Some((curr, next))
        }),
    )
}
1768
1769 /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1770 ///
1771 /// # Example
1772 /// ```rust
1773 /// # #[cfg(feature = "deploy")] {
1774 /// # use hydro_lang::prelude::*;
1775 /// # use futures::StreamExt;
1776 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1777 /// let tick = process.tick();
1778 /// let numbers = process
1779 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1780 /// .into_keyed();
1781 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1782 /// batch
1783 /// .value_counts()
1784 /// .entries()
1785 /// .all_ticks()
1786 /// # }, |mut stream| async move {
1787 /// // (1, 3), (2, 2)
1788 /// # let mut results = Vec::new();
1789 /// # for _ in 0..2 {
1790 /// # results.push(stream.next().await.unwrap());
1791 /// # }
1792 /// # results.sort();
1793 /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1794 /// # }));
1795 /// # }
1796 /// ```
1797 pub fn value_counts(
1798 self,
1799 ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
1800 where
1801 R: IsExactlyOnce,
1802 K: Eq + Hash,
1803 {
1804 self.make_exactly_once()
1805 .assume_ordering_trusted(
1806 nondet!(/** ordering within each group affects neither result nor intermediates */),
1807 )
1808 .fold(
1809 q!(|| 0),
1810 q!(
1811 |acc, _| *acc += 1,
1812 monotone = manual_proof!(/** += 1 is monotonic */)
1813 ),
1814 )
1815 }
1816
1817 /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1818 /// group via the `comb` closure.
1819 ///
1820 /// Depending on the input stream guarantees, the closure may need to be commutative
1821 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1822 ///
1823 /// If the input and output value types are the same and do not require initialization then use
1824 /// [`KeyedStream::reduce`].
1825 ///
1826 /// # Example
1827 /// ```rust
1828 /// # #[cfg(feature = "deploy")] {
1829 /// # use hydro_lang::prelude::*;
1830 /// # use futures::StreamExt;
1831 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1832 /// let tick = process.tick();
1833 /// let numbers = process
1834 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1835 /// .into_keyed();
1836 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1837 /// batch
1838 /// .fold(q!(|| false), q!(|acc, x| *acc |= x))
1839 /// .entries()
1840 /// .all_ticks()
1841 /// # }, |mut stream| async move {
1842 /// // (1, false), (2, true)
1843 /// # let mut results = Vec::new();
1844 /// # for _ in 0..2 {
1845 /// # results.push(stream.next().await.unwrap());
1846 /// # }
1847 /// # results.sort();
1848 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1849 /// # }));
1850 /// # }
1851 /// ```
1852 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
1853 self,
1854 init: impl IntoQuotedMut<'a, I, L>,
1855 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1856 ) -> KeyedSingleton<K, A, L, B2>
1857 where
1858 K: Eq + Hash,
1859 C: ValidCommutativityFor<O>,
1860 Idemp: ValidIdempotenceFor<R>,
1861 B: ApplyMonotoneKeyedStream<M, B2>,
1862 {
1863 let init = init.splice_fn0_ctx(&self.location).into();
1864 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1865 proof.register_proof(&comb);
1866
1867 let retried = self
1868 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */));
1869
1870 KeyedSingleton::new(
1871 retried.location.clone(),
1872 HydroNode::FoldKeyed {
1873 init,
1874 acc: comb.into(),
1875 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1876 metadata: retried
1877 .location
1878 .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
1879 },
1880 )
1881 }
1882
1883 /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1884 /// group via the `comb` closure.
1885 ///
1886 /// Depending on the input stream guarantees, the closure may need to be commutative
1887 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1888 ///
1889 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1890 ///
1891 /// # Example
1892 /// ```rust
1893 /// # #[cfg(feature = "deploy")] {
1894 /// # use hydro_lang::prelude::*;
1895 /// # use futures::StreamExt;
1896 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1897 /// let tick = process.tick();
1898 /// let numbers = process
1899 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1900 /// .into_keyed();
1901 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1902 /// batch
1903 /// .reduce(q!(|acc, x| *acc |= x))
1904 /// .entries()
1905 /// .all_ticks()
1906 /// # }, |mut stream| async move {
1907 /// // (1, false), (2, true)
1908 /// # let mut results = Vec::new();
1909 /// # for _ in 0..2 {
1910 /// # results.push(stream.next().await.unwrap());
1911 /// # }
1912 /// # results.sort();
1913 /// # assert_eq!(results, vec![(1, false), (2, true)]);
1914 /// # }));
1915 /// # }
1916 /// ```
1917 pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
1918 self,
1919 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1920 ) -> KeyedSingleton<K, V, L, B>
1921 where
1922 K: Eq + Hash,
1923 C: ValidCommutativityFor<O>,
1924 Idemp: ValidIdempotenceFor<R>,
1925 {
1926 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1927 proof.register_proof(&f);
1928
1929 let ordered = self
1930 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1931 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1932
1933 KeyedSingleton::new(
1934 ordered.location.clone(),
1935 HydroNode::ReduceKeyed {
1936 f: f.into(),
1937 input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1938 metadata: ordered
1939 .location
1940 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1941 },
1942 )
1943 }
1944
1945 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
1946 /// are automatically deleted.
1947 ///
1948 /// Depending on the input stream guarantees, the closure may need to be commutative
1949 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1950 ///
1951 /// # Example
1952 /// ```rust
1953 /// # #[cfg(feature = "deploy")] {
1954 /// # use hydro_lang::prelude::*;
1955 /// # use futures::StreamExt;
1956 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1957 /// let tick = process.tick();
1958 /// let watermark = tick.singleton(q!(2));
1959 /// let numbers = process
1960 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1961 /// .into_keyed();
1962 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1963 /// batch
1964 /// .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
1965 /// .entries()
1966 /// .all_ticks()
1967 /// # }, |mut stream| async move {
1968 /// // (2, true)
1969 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1970 /// # }));
1971 /// # }
1972 /// ```
1973 pub fn reduce_watermark<O2, F, C, Idemp>(
1974 self,
1975 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1976 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1977 ) -> KeyedSingleton<K, V, L, B>
1978 where
1979 K: Eq + Hash,
1980 O2: Clone,
1981 F: Fn(&mut V, V) + 'a,
1982 C: ValidCommutativityFor<O>,
1983 Idemp: ValidIdempotenceFor<R>,
1984 {
1985 let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
1986 check_matching_location(&self.location.root(), other.location.outer());
1987 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1988 proof.register_proof(&f);
1989
1990 let ordered = self
1991 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1992 .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1993
1994 KeyedSingleton::new(
1995 ordered.location.clone(),
1996 HydroNode::ReduceKeyedWatermark {
1997 f: f.into(),
1998 input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1999 watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2000 metadata: ordered
2001 .location
2002 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2003 },
2004 )
2005 }
2006
2007 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2008 /// whose keys are not in the bounded stream.
2009 ///
2010 /// # Example
2011 /// ```rust
2012 /// # #[cfg(feature = "deploy")] {
2013 /// # use hydro_lang::prelude::*;
2014 /// # use futures::StreamExt;
2015 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2016 /// let tick = process.tick();
2017 /// let keyed_stream = process
2018 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2019 /// .batch(&tick, nondet!(/** test */))
2020 /// .into_keyed();
2021 /// let keys_to_remove = process
2022 /// .source_iter(q!(vec![1, 2]))
2023 /// .batch(&tick, nondet!(/** test */));
2024 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2025 /// # .entries()
2026 /// # }, |mut stream| async move {
2027 /// // { 3: ['c'], 4: ['d'] }
2028 /// # let mut results = Vec::new();
2029 /// # for _ in 0..2 {
2030 /// # results.push(stream.next().await.unwrap());
2031 /// # }
2032 /// # results.sort();
2033 /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
2034 /// # }));
2035 /// # }
2036 /// ```
2037 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2038 self,
2039 other: Stream<K, L, Bounded, O2, R2>,
2040 ) -> Self
2041 where
2042 K: Eq + Hash,
2043 {
2044 check_matching_location(&self.location, &other.location);
2045
2046 KeyedStream::new(
2047 self.location.clone(),
2048 HydroNode::AntiJoin {
2049 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2050 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2051 metadata: self.location.new_node_metadata(Self::collection_kind()),
2052 },
2053 )
2054 }
2055
2056 /// Emit a keyed stream containing keys shared between two keyed streams,
2057 /// where each value in the output keyed stream is a tuple of
2058 /// (self's value, other's value).
2059 /// If there are multiple values for the same key, this performs a cross product
2060 /// for each matching key.
2061 ///
2062 /// # Example
2063 /// ```rust
2064 /// # #[cfg(feature = "deploy")] {
2065 /// # use hydro_lang::prelude::*;
2066 /// # use futures::StreamExt;
2067 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2068 /// let tick = process.tick();
2069 /// let keyed_data = process
2070 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2071 /// .into_keyed()
2072 /// .batch(&tick, nondet!(/** test */));
2073 /// let other_data = process
2074 /// .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
2075 /// .into_keyed()
2076 /// .batch(&tick, nondet!(/** test */));
2077 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
2078 /// # }, |mut stream| async move {
2079 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
2080 /// # let mut results = vec![];
2081 /// # for _ in 0..4 {
2082 /// # results.push(stream.next().await.unwrap());
2083 /// # }
2084 /// # results.sort();
2085 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2086 /// # }));
2087 /// # }
2088 /// ```
2089 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2090 pub fn join_keyed_stream<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2091 self,
2092 other: KeyedStream<K, V2, L, B2, O2, R2>,
2093 ) -> KeyedStream<
2094 K,
2095 (V, V2),
2096 L,
2097 B,
2098 B2::PreserveOrderIfBounded<NoOrder>,
2099 <R as MinRetries<R2>>::Min,
2100 >
2101 where
2102 K: Eq + Hash + Clone,
2103 R: MinRetries<R2>,
2104 V: Clone,
2105 V2: Clone,
2106 {
2107 self.entries().join(other.entries()).into_keyed()
2108 }
2109
2110 /// Deduplicates values within each key group, emitting each unique value per key
2111 /// exactly once.
2112 ///
2113 /// # Example
2114 /// ```rust
2115 /// # #[cfg(feature = "deploy")] {
2116 /// # use hydro_lang::prelude::*;
2117 /// # use futures::StreamExt;
2118 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2119 /// process
2120 /// .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2121 /// .into_keyed()
2122 /// .unique()
2123 /// # .entries()
2124 /// # }, |mut stream| async move {
2125 /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2126 /// # let mut results = Vec::new();
2127 /// # for _ in 0..4 {
2128 /// # results.push(stream.next().await.unwrap());
2129 /// # }
2130 /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2131 /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2132 /// # key1.sort();
2133 /// # key2.sort();
2134 /// # assert_eq!(key1, vec![10, 20]);
2135 /// # assert_eq!(key2, vec![20, 30]);
2136 /// # }));
2137 /// # }
2138 /// ```
2139 pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2140 where
2141 K: Eq + Hash + Clone,
2142 V: Eq + Hash + Clone,
2143 {
2144 self.entries().unique().into_keyed()
2145 }
2146
2147 /// Sorts the values within each key group in ascending order.
2148 ///
2149 /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2150 /// each group. This operator will block until all elements in the input stream
2151 /// are available, so it requires the input stream to be [`Bounded`].
2152 ///
2153 /// # Example
2154 /// ```rust
2155 /// # #[cfg(feature = "deploy")] {
2156 /// # use hydro_lang::prelude::*;
2157 /// # use futures::StreamExt;
2158 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2159 /// let tick = process.tick();
2160 /// let numbers = process
2161 /// .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2162 /// .into_keyed();
2163 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2164 /// batch.sort().all_ticks()
2165 /// # .entries()
2166 /// # }, |mut stream| async move {
2167 /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2168 /// # let mut results = Vec::new();
2169 /// # for _ in 0..4 {
2170 /// # results.push(stream.next().await.unwrap());
2171 /// # }
2172 /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2173 /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2174 /// # assert_eq!(key1_vals, vec![1, 3]);
2175 /// # assert_eq!(key2_vals, vec![1, 2]);
2176 /// # }));
2177 /// # }
2178 /// ```
2179 pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2180 where
2181 B: IsBounded,
2182 K: Ord,
2183 V: Ord,
2184 {
2185 self.entries().sort().into_keyed()
2186 }
2187
2188 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2189 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2190 /// is only present in one of the inputs, its values are passed through as-is). The output has
2191 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2192 ///
2193 /// Currently, both input streams must be [`Bounded`]. This operator will block
2194 /// on the first stream until all its elements are available. In a future version,
2195 /// we will relax the requirement on the `other` stream.
2196 ///
2197 /// # Example
2198 /// ```rust
2199 /// # #[cfg(feature = "deploy")] {
2200 /// # use hydro_lang::prelude::*;
2201 /// # use futures::StreamExt;
2202 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2203 /// let tick = process.tick();
2204 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2205 /// let batch = numbers.batch(&tick, nondet!(/** test */));
2206 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2207 /// # .entries()
2208 /// # }, |mut stream| async move {
2209 /// // { 0: [2, 1], 1: [4, 3] }
2210 /// # let mut results = Vec::new();
2211 /// # for _ in 0..4 {
2212 /// # results.push(stream.next().await.unwrap());
2213 /// # }
2214 /// # results.sort();
2215 /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2216 /// # }));
2217 /// # }
2218 /// ```
2219 pub fn chain<O2: Ordering, R2: Retries>(
2220 self,
2221 other: KeyedStream<K, V, L, Bounded, O2, R2>,
2222 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2223 where
2224 B: IsBounded,
2225 O: MinOrder<O2>,
2226 R: MinRetries<R2>,
2227 {
2228 let this = self.make_bounded();
2229 check_matching_location(&this.location, &other.location);
2230
2231 KeyedStream::new(
2232 this.location.clone(),
2233 HydroNode::Chain {
2234 first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2235 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2236 metadata: this.location.new_node_metadata(KeyedStream::<
2237 K,
2238 V,
2239 L,
2240 Bounded,
2241 <O as MinOrder<O2>>::Min,
2242 <R as MinRetries<R2>>::Min,
2243 >::collection_kind()),
2244 },
2245 )
2246 }
2247
2248 /// Emit a keyed stream containing keys shared between the keyed stream and the
2249 /// keyed singleton, where each value in the output keyed stream is a tuple of
2250 /// (the keyed stream's value, the keyed singleton's value).
2251 ///
2252 /// # Example
2253 /// ```rust
2254 /// # #[cfg(feature = "deploy")] {
2255 /// # use hydro_lang::prelude::*;
2256 /// # use futures::StreamExt;
2257 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2258 /// let tick = process.tick();
2259 /// let keyed_data = process
2260 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2261 /// .into_keyed()
2262 /// .batch(&tick, nondet!(/** test */));
2263 /// let singleton_data = process
2264 /// .source_iter(q!(vec![(1, 100), (2, 200)]))
2265 /// .into_keyed()
2266 /// .batch(&tick, nondet!(/** test */))
2267 /// .first();
2268 /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2269 /// # }, |mut stream| async move {
2270 /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2271 /// # let mut results = vec![];
2272 /// # for _ in 0..3 {
2273 /// # results.push(stream.next().await.unwrap());
2274 /// # }
2275 /// # results.sort();
2276 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2277 /// # }));
2278 /// # }
2279 /// ```
2280 pub fn join_keyed_singleton<V2: Clone, B2: IsBounded>(
2281 self,
2282 other: KeyedSingleton<K, V2, L, B2>,
2283 ) -> KeyedStream<K, (V, V2), L, B, O, R>
2284 where
2285 K: Eq + Hash + Clone,
2286 V: Clone,
2287 {
2288 let ir_node = if B2::BOUNDED {
2289 HydroNode::JoinHalf {
2290 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2291 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2292 metadata: self
2293 .location
2294 .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2295 }
2296 } else {
2297 HydroNode::Join {
2298 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2299 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2300 metadata: self
2301 .location
2302 .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2303 }
2304 };
2305
2306 KeyedStream::new(self.location.clone(), ir_node)
2307 }
2308
2309 /// Gets the values associated with a specific key from the keyed stream.
2310 /// Returns an empty stream if the key is `None` or there are no associated values.
2311 ///
2312 /// # Example
2313 /// ```rust
2314 /// # #[cfg(feature = "deploy")] {
2315 /// # use hydro_lang::prelude::*;
2316 /// # use futures::StreamExt;
2317 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2318 /// let tick = process.tick();
2319 /// let keyed_data = process
2320 /// .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2321 /// .into_keyed()
2322 /// .batch(&tick, nondet!(/** test */));
2323 /// let key = tick.singleton(q!(1));
2324 /// keyed_data.get(key).all_ticks()
2325 /// # }, |mut stream| async move {
2326 /// // 10, 11
2327 /// # let mut results = vec![];
2328 /// # for _ in 0..2 {
2329 /// # results.push(stream.next().await.unwrap());
2330 /// # }
2331 /// # results.sort();
2332 /// # assert_eq!(results, vec![10, 11]);
2333 /// # }));
2334 /// # }
2335 /// ```
2336 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, B, O, R>
2337 where
2338 K: Eq + Hash + Clone,
2339 V: Clone,
2340 {
2341 let joined =
2342 self.join_keyed_singleton(key.into().map(q!(|k| (k, ()))).into_keyed_singleton());
2343
2344 if O::ORDERING_KIND == StreamOrder::TotalOrder {
2345 joined
2346 .use_ordering_type::<TotalOrder>()
2347 .cast_at_most_one_key()
2348 .map(q!(|(_, (v, _))| v))
2349 .weaken_ordering()
2350 } else {
2351 joined.values().map(q!(|(v, _)| v)).use_ordering_type()
2352 }
2353 }
2354
2355 /// For each value in `self`, find the matching key in `lookup`.
2356 /// The output is a keyed stream with the key from `self`, and a value
2357 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2358 /// If the key is not present in `lookup`, the option will be [`None`].
2359 ///
2360 /// # Example
2361 /// ```rust
2362 /// # #[cfg(feature = "deploy")] {
2363 /// # use hydro_lang::prelude::*;
2364 /// # use futures::StreamExt;
2365 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2366 /// # let tick = process.tick();
2367 /// let requests = // { 1: [10, 11], 2: 20 }
2368 /// # process
2369 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2370 /// # .into_keyed()
2371 /// # .batch(&tick, nondet!(/** test */));
2372 /// let other_data = // { 10: 100, 11: 110 }
2373 /// # process
2374 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
2375 /// # .into_keyed()
2376 /// # .batch(&tick, nondet!(/** test */))
2377 /// # .first();
2378 /// requests.lookup_keyed_singleton(other_data)
2379 /// # .entries().all_ticks()
2380 /// # }, |mut stream| async move {
2381 /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2382 /// # let mut results = vec![];
2383 /// # for _ in 0..3 {
2384 /// # results.push(stream.next().await.unwrap());
2385 /// # }
2386 /// # results.sort();
2387 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2388 /// # }));
2389 /// # }
2390 /// ```
2391 pub fn lookup_keyed_singleton<V2>(
2392 self,
2393 lookup: KeyedSingleton<V, V2, L, Bounded>,
2394 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2395 where
2396 B: IsBounded,
2397 K: Eq + Hash + Clone,
2398 V: Eq + Hash + Clone,
2399 V2: Clone,
2400 {
2401 self.lookup_keyed_stream(
2402 lookup
2403 .into_keyed_stream()
2404 .assume_retries::<R>(nondet!(/** Retries are irrelevant for keyed singletons */)),
2405 )
2406 }
2407
2408 /// For each value in `self`, find the matching key in `lookup`.
2409 /// The output is a keyed stream with the key from `self`, and a value
2410 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2411 /// If the key is not present in `lookup`, the option will be [`None`].
2412 ///
2413 /// # Example
2414 /// ```rust
2415 /// # #[cfg(feature = "deploy")] {
2416 /// # use hydro_lang::prelude::*;
2417 /// # use futures::StreamExt;
2418 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2419 /// # let tick = process.tick();
2420 /// let requests = // { 1: [10, 11], 2: 20 }
2421 /// # process
2422 /// # .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2423 /// # .into_keyed()
2424 /// # .batch(&tick, nondet!(/** test */));
2425 /// let other_data = // { 10: [100, 101], 11: 110 }
2426 /// # process
2427 /// # .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2428 /// # .into_keyed()
2429 /// # .batch(&tick, nondet!(/** test */));
2430 /// requests.lookup_keyed_stream(other_data)
2431 /// # .entries().all_ticks()
2432 /// # }, |mut stream| async move {
2433 /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2434 /// # let mut results = vec![];
2435 /// # for _ in 0..4 {
2436 /// # results.push(stream.next().await.unwrap());
2437 /// # }
2438 /// # results.sort();
2439 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2440 /// # }));
2441 /// # }
2442 /// ```
2443 #[expect(clippy::type_complexity, reason = "retries propagation")]
2444 pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2445 self,
2446 lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2447 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2448 where
2449 B: IsBounded,
2450 K: Eq + Hash + Clone,
2451 V: Eq + Hash + Clone,
2452 V2: Clone,
2453 R: MinRetries<R2>,
2454 {
2455 let inverted = self
2456 .make_bounded()
2457 .entries()
2458 .map(q!(|(key, lookup_value)| (lookup_value, key)))
2459 .into_keyed();
2460 let found = inverted
2461 .clone()
2462 .join_keyed_stream(lookup.clone())
2463 .entries()
2464 .map(q!(|(lookup_value, (key, value))| (
2465 key,
2466 (lookup_value, Some(value))
2467 )))
2468 .into_keyed();
2469 let not_found = inverted
2470 .filter_key_not_in(lookup.keys())
2471 .entries()
2472 .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2473 .into_keyed();
2474
2475 found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2476 }
2477
2478 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2479 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2480 ///
2481 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2482 /// processed before an acknowledgement is emitted.
2483 pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2484 let id = self.location.flow_state().borrow_mut().next_clock_id();
2485 let out_location = Atomic {
2486 tick: Tick {
2487 id,
2488 l: self.location.clone(),
2489 },
2490 };
2491 KeyedStream::new(
2492 out_location.clone(),
2493 HydroNode::BeginAtomic {
2494 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2495 metadata: out_location
2496 .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2497 },
2498 )
2499 }
2500
2501 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2502 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2503 /// the order of the input.
2504 ///
2505 /// # Non-Determinism
2506 /// The batch boundaries are non-deterministic and may change across executions.
2507 pub fn batch(
2508 self,
2509 tick: &Tick<L>,
2510 nondet: NonDet,
2511 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2512 let _ = nondet;
2513 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2514 KeyedStream::new(
2515 tick.clone(),
2516 HydroNode::Batch {
2517 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2518 metadata: tick.new_node_metadata(
2519 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2520 ),
2521 },
2522 )
2523 }
2524}
2525
2526impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2527 KeyedStream<(K1, K2), V, L, B, O, R>
2528{
2529 /// Produces a new keyed stream by dropping the first element of the compound key.
2530 ///
2531 /// Because multiple keys may share the same suffix, this operation results in re-grouping
2532 /// of the values under the new keys. The values across groups with the same new key
2533 /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2534 ///
2535 /// # Example
2536 /// ```rust
2537 /// # #[cfg(feature = "deploy")] {
2538 /// # use hydro_lang::prelude::*;
2539 /// # use futures::StreamExt;
2540 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2541 /// process
2542 /// .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2543 /// .into_keyed()
2544 /// .drop_key_prefix()
2545 /// # .entries()
2546 /// # }, |mut stream| async move {
2547 /// // { 10: [2, 3], 20: [4] }
2548 /// # let mut results = Vec::new();
2549 /// # for _ in 0..3 {
2550 /// # results.push(stream.next().await.unwrap());
2551 /// # }
2552 /// # results.sort();
2553 /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2554 /// # }));
2555 /// # }
2556 /// ```
2557 pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2558 self.entries()
2559 .map(q!(|((_k1, k2), v)| (k2, v)))
2560 .into_keyed()
2561 }
2562}
2563
2564impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries>
2565 KeyedStream<K, V, L, Unbounded, O, R>
2566{
2567 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2568 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2569 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2570 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2571 ///
2572 /// Currently, both input streams must be [`Unbounded`].
2573 ///
2574 /// # Example
2575 /// ```rust
2576 /// # #[cfg(feature = "deploy")] {
2577 /// # use hydro_lang::prelude::*;
2578 /// # use futures::StreamExt;
2579 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2580 /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2581 /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2582 /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2583 /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2584 /// numbers1.merge_unordered(numbers2)
2585 /// # .entries()
2586 /// # }, |mut stream| async move {
2587 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2588 /// # let mut results = Vec::new();
2589 /// # for _ in 0..4 {
2590 /// # results.push(stream.next().await.unwrap());
2591 /// # }
2592 /// # results.sort();
2593 /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2594 /// # }));
2595 /// # }
2596 /// ```
2597 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2598 self,
2599 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2600 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2601 where
2602 R: MinRetries<R2>,
2603 {
2604 KeyedStream::new(
2605 self.location.clone(),
2606 HydroNode::Chain {
2607 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2608 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2609 metadata: self.location.new_node_metadata(KeyedStream::<
2610 K,
2611 V,
2612 L,
2613 Unbounded,
2614 NoOrder,
2615 <R as MinRetries<R2>>::Min,
2616 >::collection_kind()),
2617 },
2618 )
2619 }
2620
2621 /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2622 #[deprecated(note = "use `merge_unordered` instead")]
2623 pub fn interleave<O2: Ordering, R2: Retries>(
2624 self,
2625 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2626 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2627 where
2628 R: MinRetries<R2>,
2629 {
2630 self.merge_unordered(other)
2631 }
2632}
2633
2634impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2635where
2636 L: Location<'a> + NoTick,
2637{
2638 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2639 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2640 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2641 /// used to create the atomic section.
2642 ///
2643 /// # Non-Determinism
2644 /// The batch boundaries are non-deterministic and may change across executions.
2645 pub fn batch_atomic(
2646 self,
2647 tick: &Tick<L>,
2648 nondet: NonDet,
2649 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2650 let _ = nondet;
2651 KeyedStream::new(
2652 tick.clone(),
2653 HydroNode::Batch {
2654 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2655 metadata: tick.new_node_metadata(
2656 KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2657 ),
2658 },
2659 )
2660 }
2661
2662 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2663 /// See [`KeyedStream::atomic`] for more details.
2664 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2665 KeyedStream::new(
2666 self.location.tick.l.clone(),
2667 HydroNode::EndAtomic {
2668 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2669 metadata: self
2670 .location
2671 .tick
2672 .l
2673 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2674 },
2675 )
2676 }
2677}
2678
2679impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2680where
2681 L: Location<'a>,
2682{
2683 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2684 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2685 /// each key.
2686 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2687 KeyedStream::new(
2688 self.location.outer().clone(),
2689 HydroNode::YieldConcat {
2690 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2691 metadata: self.location.outer().new_node_metadata(KeyedStream::<
2692 K,
2693 V,
2694 L,
2695 Unbounded,
2696 O,
2697 R,
2698 >::collection_kind(
2699 )),
2700 },
2701 )
2702 }
2703
2704 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2705 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2706 /// each key.
2707 ///
2708 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2709 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2710 /// stream's [`Tick`] context.
2711 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2712 let out_location = Atomic {
2713 tick: self.location.clone(),
2714 };
2715
2716 KeyedStream::new(
2717 out_location.clone(),
2718 HydroNode::YieldConcat {
2719 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2720 metadata: out_location.new_node_metadata(KeyedStream::<
2721 K,
2722 V,
2723 Atomic<L>,
2724 Unbounded,
2725 O,
2726 R,
2727 >::collection_kind()),
2728 },
2729 )
2730 }
2731
2732 /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
/// such as `fold` retain their memory for each key across ticks rather than resetting across batches of each key.
2734 ///
2735 /// This API is particularly useful for stateful computation on batches of data, such as
2736 /// maintaining an accumulated state that is up to date with the current batch.
2737 ///
2738 /// # Example
2739 /// ```rust
2740 /// # #[cfg(feature = "deploy")] {
2741 /// # use hydro_lang::prelude::*;
2742 /// # use futures::StreamExt;
2743 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2744 /// let tick = process.tick();
2745 /// # // ticks are lazy by default, forces the second tick to run
2746 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2747 /// # let batch_first_tick = process
2748 /// # .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2749 /// # .into_keyed()
2750 /// # .batch(&tick, nondet!(/** test */));
2751 /// # let batch_second_tick = process
2752 /// # .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2753 /// # .into_keyed()
2754 /// # .batch(&tick, nondet!(/** test */))
2755 /// # .defer_tick(); // appears on the second tick
2756 /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2757 ///
2758 /// input.batch(&tick, nondet!(/** test */))
2759 /// .across_ticks(|s| s.reduce(q!(|sum, new| {
2760 /// *sum += new;
2761 /// }))).entries().all_ticks()
2762 /// # }, |mut stream| async move {
2763 /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2764 /// # let mut results = Vec::new();
2765 /// # for _ in 0..4 {
2766 /// # results.push(stream.next().await.unwrap());
2767 /// # }
2768 /// # results.sort();
2769 /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2770 /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2771 /// # results.clear();
2772 /// # for _ in 0..4 {
2773 /// # results.push(stream.next().await.unwrap());
2774 /// # }
2775 /// # results.sort();
2776 /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2777 /// # }));
2778 /// # }
2779 /// ```
2780 pub fn across_ticks<Out: BatchAtomic>(
2781 self,
2782 thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2783 ) -> Out::Batched {
2784 thunk(self.all_ticks_atomic()).batched_atomic()
2785 }
2786
2787 /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2788 /// tick `T` always has the entries of `self` at tick `T - 1`.
2789 ///
2790 /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2791 ///
2792 /// This operator enables stateful iterative processing with ticks, by sending data from one
2793 /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2794 ///
2795 /// # Example
2796 /// ```rust
2797 /// # #[cfg(feature = "deploy")] {
2798 /// # use hydro_lang::prelude::*;
2799 /// # use futures::StreamExt;
2800 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2801 /// let tick = process.tick();
2802 /// # // ticks are lazy by default, forces the second tick to run
2803 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2804 /// # let batch_first_tick = process
2805 /// # .source_iter(q!(vec![(1, 2), (1, 3)]))
2806 /// # .batch(&tick, nondet!(/** test */))
2807 /// # .into_keyed();
2808 /// # let batch_second_tick = process
2809 /// # .source_iter(q!(vec![(1, 4), (2, 5)]))
2810 /// # .batch(&tick, nondet!(/** test */))
2811 /// # .defer_tick()
2812 /// # .into_keyed(); // appears on the second tick
2813 /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2814 /// # batch_first_tick.chain(batch_second_tick);
2815 /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2816 /// changes_across_ticks // from the current tick
2817 /// )
2818 /// # .entries().all_ticks()
2819 /// # }, |mut stream| async move {
2820 /// // First tick: { 1: [2, 3] }
2821 /// # let mut results = Vec::new();
2822 /// # for _ in 0..2 {
2823 /// # results.push(stream.next().await.unwrap());
2824 /// # }
2825 /// # results.sort();
2826 /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2827 /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2828 /// # results.clear();
2829 /// # for _ in 0..4 {
2830 /// # results.push(stream.next().await.unwrap());
2831 /// # }
2832 /// # results.sort();
2833 /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2834 /// // Third tick: { 1: [4], 2: [5] }
2835 /// # results.clear();
2836 /// # for _ in 0..2 {
2837 /// # results.push(stream.next().await.unwrap());
2838 /// # }
2839 /// # results.sort();
2840 /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2841 /// # }));
2842 /// # }
2843 /// ```
2844 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2845 KeyedStream::new(
2846 self.location.clone(),
2847 HydroNode::DeferTick {
2848 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2849 metadata: self.location.new_node_metadata(KeyedStream::<
2850 K,
2851 V,
2852 Tick<L>,
2853 Bounded,
2854 O,
2855 R,
2856 >::collection_kind()),
2857 },
2858 )
2859 }
2860}
2861
2862#[cfg(test)]
2863mod tests {
2864 #[cfg(feature = "deploy")]
2865 use futures::{SinkExt, StreamExt};
2866 #[cfg(feature = "deploy")]
2867 use hydro_deploy::Deployment;
2868 #[cfg(any(feature = "deploy", feature = "sim"))]
2869 use stageleft::q;
2870
2871 #[cfg(any(feature = "deploy", feature = "sim"))]
2872 use crate::compile::builder::FlowBuilder;
2873 #[cfg(feature = "deploy")]
2874 use crate::live_collections::stream::ExactlyOnce;
2875 #[cfg(feature = "sim")]
2876 use crate::live_collections::stream::{NoOrder, TotalOrder};
2877 #[cfg(any(feature = "deploy", feature = "sim"))]
2878 use crate::location::Location;
2879 #[cfg(feature = "sim")]
2880 use crate::networking::TCP;
2881 #[cfg(any(feature = "deploy", feature = "sim"))]
2882 use crate::nondet::nondet;
2883 #[cfg(feature = "deploy")]
2884 use crate::properties::manual_proof;
2885
/// Deploys a flow that looks up key `1` (taken from a singleton) in a keyed stream fed from an
/// external source, and checks that only the values sent under that key are emitted.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn get_unbounded_keyed_stream_bounded_singleton() {
    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let (send_port, pairs) =
        process.source_external_bincode::<_, (i32, i32), _, ExactlyOnce>(&client);

    // The key whose values we expect to receive.
    let wanted_key = process.singleton(q!(1));

    let recv_port = pairs
        .into_keyed()
        .get(wanted_key)
        .send_bincode_external(&client);

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut sink = nodes.connect(send_port).await;
    let mut results = nodes.connect(recv_port).await;

    deployment.start().await.unwrap();

    // Two rounds: each sends one value under key 1 and one under key 2, and
    // expects only the key-1 value to come back.
    for (hit, miss) in [(10, 20), (11, 21)] {
        sink.send((1, hit)).await.unwrap();
        sink.send((2, miss)).await.unwrap();
        assert_eq!(results.next().await.unwrap(), hit);
    }
}
2927
/// Runs `reduce_watermark` with a constant watermark of `2` over a stream source and expects
/// the first emitted snapshot entry to be key `2` with the sum of its two values.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn reduce_watermark_filter() {
    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let tick = process.tick();
    let watermark = tick.singleton(q!(2));

    let sums_port = process
        .source_stream(q!(tokio_stream::iter([
            (0, 100),
            (1, 101),
            (2, 102),
            (2, 102)
        ])))
        .into_keyed()
        .reduce_watermark(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        )
        .snapshot(&tick, nondet!(/** test */))
        .entries()
        .all_ticks()
        .send_bincode_external(&client);

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut results = nodes.connect(sums_port).await;

    deployment.start().await.unwrap();

    // 102 + 102 for key 2.
    assert_eq!(results.next().await.unwrap(), (2, 204));
}
2972
/// Same scenario as the stream-sourced watermark test, but the input comes from `source_iter`
/// and the reduced entries are sent out directly, without taking a snapshot.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn reduce_watermark_bounded() {
    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let tick = process.tick();
    let watermark = tick.singleton(q!(2));

    let sums_port = process
        .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
        .into_keyed()
        .reduce_watermark(
            watermark,
            q!(|acc, v| {
                *acc += v;
            }),
        )
        .entries()
        .send_bincode_external(&client);

    let nodes = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut results = nodes.connect(sums_port).await;

    deployment.start().await.unwrap();

    // 102 + 102 for key 2.
    assert_eq!(results.next().await.unwrap(), (2, 204));
}
3010
/// Exercises `reduce_watermark` with a watermark that starts at `2` and is fed back incremented
/// by one on the next tick, while an externally triggered `(3, 103)` entry is merged into the
/// input. Expects `(2, 204)` first and, after sending a trigger, `(3, 103)`.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn reduce_watermark_garbage_collect() {
    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();
    let (trigger_port, trigger) =
        process.source_external_bincode::<_, _, _, ExactlyOnce>(&client);

    let tick = process.tick();
    // Tick-level cycle: initial watermark 2, next tick receives `watermark + 1`.
    let (watermark_cycle, watermark) = tick.cycle_with_initial(tick.singleton(q!(2)));
    let bumped_watermark = watermark.clone().map(q!(|v| v + 1));
    watermark_cycle.complete_next_tick(bumped_watermark);

    // `(3, 103)` passes the filter only in ticks whose trigger batch is non-empty.
    let triggered_entry = tick
        .singleton(q!((3, 103)))
        .into_stream()
        .filter_if(
            trigger
                .clone()
                .batch(&tick, nondet!(/** test */))
                .first()
                .is_some(),
        )
        .all_ticks();

    let sums_port = process
        .source_stream(q!(tokio_stream::iter([
            (0, 100),
            (1, 101),
            (2, 102),
            (2, 102)
        ])))
        .merge_unordered(triggered_entry)
        .into_keyed()
        .reduce_watermark(
            watermark,
            q!(
                |acc, v| {
                    *acc += v;
                },
                commutative = manual_proof!(/** integer addition is commutative */)
            ),
        )
        .snapshot(&tick, nondet!(/** test */))
        .entries()
        .all_ticks()
        .send_bincode_external(&client);

    let nodes = builder
        .with_default_optimize()
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut trigger_sink = nodes.connect(trigger_port).await;
    let mut results = nodes.connect(sums_port).await;

    deployment.start().await.unwrap();

    assert_eq!(results.next().await.unwrap(), (2, 204));

    // Fire the trigger so that `(3, 103)` enters the reduction.
    trigger_sink.send(()).await.unwrap();

    assert_eq!(results.next().await.unwrap(), (3, 103));
}
3082
/// Expected to panic: folds each batch of a keyed input into a per-key `Vec` and asserts that the
/// only output is `(1, vec![1, 2])`, an assertion that does not hold across the exhaustively
/// explored instances.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_batch_nondet_size() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let keyed_input = process
        .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
        .into_keyed();

    let tick = process.tick();
    let output = keyed_input
        .batch(&tick, nondet!(/** test */))
        .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
        .entries()
        .all_ticks()
        .sim_output();

    builder.sim().exhaustive(async || {
        output
            .assert_yields_only_unordered([(1, vec![1, 2])])
            .await;
    });
}
3106
/// Batches a keyed input, re-emits it with `all_ticks`, and applies a per-key early-stopping
/// max fold. Checks both the resulting entries and the number of explored simulation instances.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_preserves_group_order() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let keyed_input = process
        .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
        .into_keyed();

    let tick = process.tick();
    let output = keyed_input
        .batch(&tick, nondet!(/** test */))
        .all_ticks()
        .fold_early_stop(
            q!(|| 0),
            q!(|acc, v| {
                *acc = std::cmp::max(v, *acc);
                *acc >= 2
            }),
        )
        .entries()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        output.assert_yields_only_unordered([(1, 2), (2, 3)]).await;
    });

    // Breakdown of the 8 instances:
    // - three cases: all three in a separate tick (pick where (2, 3) is)
    // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
    // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
    // - one case: all three together
    assert_eq!(explored, 8);
}
3141
/// Batches a keyed input whose ordering has been weakened to `NoOrder` and checks that all
/// entries are still delivered, along with the number of explored simulation instances.
#[cfg(feature = "sim")]
#[test]
fn sim_batch_unordered_shuffles() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let unordered_input = process
        .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
        .into_keyed()
        .weaken_ordering::<NoOrder>();

    let tick = process.tick();
    let output = unordered_input
        .batch(&tick, nondet!(/** test */))
        .all_ticks()
        .entries()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        output
            .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
            .await;
    });

    // Breakdown of the 13 instances:
    // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
    // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
    // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
    assert_eq!(explored, 13);
}
3172
/// Expected to panic: an unordered input is batched, its order is observed through
/// `assume_ordering::<TotalOrder>`, and the first value per key is asserted to be a fixed one.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_observe_order_batched() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let batched = unordered_input
        .into_keyed()
        .batch(&tick, nondet!(/** test */));
    let output = batched
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .all_ticks()
        .first()
        .entries()
        .sim_output();

    builder.sim().exhaustive(async || {
        in_port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
        output
            .assert_yields_only_unordered([(1, 1), (2, 1)])
            .await; // fails with assume_ordering
    });
}
3198
/// Counts the simulation instances explored when the order of a batched, unordered keyed input
/// is observed through `assume_ordering::<TotalOrder>`.
#[cfg(feature = "sim")]
#[test]
fn sim_observe_order_batched_count() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let tick = process.tick();
    let batched = unordered_input
        .into_keyed()
        .batch(&tick, nondet!(/** test */));
    let output = batched
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .all_ticks()
        .entries()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        in_port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
        // Only the number of explored instances matters; the values are discarded.
        let _ = output.collect_sorted::<Vec<_>>().await;
    });

    // too complicated to enumerate here, but less than stream equivalent
    assert_eq!(explored, 104);
}
3222
/// Applies `assume_ordering::<TotalOrder>` to an unordered keyed input outside of any tick,
/// folds up to two values per key, and checks the entry count and explored instance count.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering() {
    use std::collections::HashMap;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let output = unordered_input
        .into_keyed()
        .assume_ordering::<TotalOrder>(nondet!(/** test */))
        .fold_early_stop(
            q!(|| Vec::new()),
            q!(|acc, v| {
                acc.push(v);
                acc.len() >= 2
            }),
        )
        .entries()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        in_port.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
        let per_key: HashMap<_, _> = output
            .collect_sorted::<Vec<_>>()
            .await
            .into_iter()
            .collect();
        // Each key accumulates its values; we get one entry per key
        assert_eq!(per_key.len(), 2);
    });

    assert_eq!(explored, 24);
}
3259
/// Feeds the order-observed keyed stream back into itself (through a round trip over a second
/// process) and checks that some explored instance interleaves the cycled-back value with the
/// original inputs for key `1`.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back() {
    use std::collections::HashMap;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (in_port, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let (feedback_handle, feedback) =
        process.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
    let ordered = unordered_input
        .into_keyed()
        .merge_unordered(feedback)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));
    // Increment every value and feed the odd results back via the relay process.
    feedback_handle.complete(
        ordered
            .clone()
            .map(q!(|v| v + 1))
            .filter(q!(|v| v % 2 == 1))
            .entries()
            .send(&relay, TCP.fail_stop().bincode())
            .send(&process, TCP.fail_stop().bincode())
            .into_keyed(),
    );

    let output = ordered
        .fold_early_stop(
            q!(|| Vec::new()),
            q!(|acc, v| {
                acc.push(v);
                acc.len() >= 2
            }),
        )
        .entries()
        .sim_output();

    let mut saw = false;
    let explored = builder.sim().exhaustive(async || {
        // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
        // We want to see [0, 1] - the cycled back value interleaved
        in_port.send_many_unordered([(1, 0), (1, 2)]);
        let per_key: HashMap<_, _> = output
            .collect_sorted::<Vec<_>>()
            .await
            .into_iter()
            .collect();

        // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
        if per_key.get(&1).is_some_and(|values| *values == vec![0, 1]) {
            saw = true;
        }
    });

    assert!(
        saw,
        "did not see an instance with key 1 having [0, 1] in order"
    );
    assert_eq!(explored, 6);
}
3324
/// Checks that an entry cycled back from one key into a different key can be released ahead of
/// entries that were already pending for that other key.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cross_key_cycle() {
    use std::collections::HashMap;

    // This test demonstrates why releasing one entry at a time is important:
    // When one key's observed order cycles back into a different key, we need
    // to be able to interleave the cycled-back entry with pending items for
    // that other key.
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (in_port, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let (feedback_handle, feedback) =
        process.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
    let ordered = unordered_input
        .into_keyed()
        .merge_unordered(feedback)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    // Cycle back: when we see (1, 10), emit (2, 100) to key 2
    feedback_handle.complete(
        ordered
            .clone()
            .filter(q!(|v| *v == 10))
            .map(q!(|_| 100))
            .entries()
            .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
            .send(&relay, TCP.fail_stop().bincode())
            .send(&process, TCP.fail_stop().bincode())
            .into_keyed(),
    );

    let output = ordered
        .fold_early_stop(
            q!(|| Vec::new()),
            q!(|acc, v| {
                acc.push(v);
                acc.len() >= 2
            }),
        )
        .entries()
        .sim_output();

    // We want to see an instance where:
    // - (1, 10) is released first
    // - This causes (2, 100) to be cycled back
    // - (2, 100) is released BEFORE (2, 20) which was already pending
    let mut saw_cross_key_interleave = false;
    let explored = builder.sim().exhaustive(async || {
        // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
        in_port.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
        let per_key: HashMap<_, _> = output
            .collect_sorted::<Vec<_>>()
            .await
            .into_iter()
            .collect();

        // Check if we see the cross-key interleaving:
        // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
        if per_key
            .get(&2)
            .is_some_and(|values| values.len() >= 2 && values[0] == 100)
        {
            saw_cross_key_interleave = true;
        }
    });

    assert!(
        saw_cross_key_interleave,
        "did not see an instance where cycled-back 100 was released before pending items for key 2"
    );
    assert_eq!(explored, 60);
}
3401
/// Variant of the cycle-back test in which the feedback path additionally passes through a
/// `batch` / `all_ticks` pair before being incremented, filtered, and sent back.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_assume_ordering_cycle_back_tick() {
    use std::collections::HashMap;

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (in_port, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let (feedback_handle, feedback) =
        process.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
    let ordered = unordered_input
        .into_keyed()
        .merge_unordered(feedback)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));
    // The feedback path goes through a tick before making its round trip via the relay.
    feedback_handle.complete(
        ordered
            .clone()
            .batch(&process.tick(), nondet!(/** test */))
            .all_ticks()
            .map(q!(|v| v + 1))
            .filter(q!(|v| v % 2 == 1))
            .entries()
            .send(&relay, TCP.fail_stop().bincode())
            .send(&process, TCP.fail_stop().bincode())
            .into_keyed(),
    );

    let output = ordered
        .fold_early_stop(
            q!(|| Vec::new()),
            q!(|acc, v| {
                acc.push(v);
                acc.len() >= 2
            }),
        )
        .entries()
        .sim_output();

    let mut saw = false;
    let explored = builder.sim().exhaustive(async || {
        in_port.send_many_unordered([(1, 0), (1, 2)]);
        let per_key: HashMap<_, _> = output
            .collect_sorted::<Vec<_>>()
            .await
            .into_iter()
            .collect();

        // Look for an instance where key 1 observes 0 followed by the cycled-back 1.
        if per_key.get(&1).is_some_and(|values| *values == vec![0, 1]) {
            saw = true;
        }
    });

    assert!(
        saw,
        "did not see an instance with key 1 having [0, 1] in order"
    );
    assert_eq!(explored, 58);
}
3465
/// Counts the simulation instances explored when `entries_partially_ordered` is applied to a
/// batched (tick-level) keyed stream built from a totally ordered input.
#[cfg(feature = "sim")]
#[test]
fn sim_entries_partially_ordered_bounded() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, ordered_input) = process.sim_input::<_, TotalOrder, _>();

    let tick = process.tick();
    let batched = ordered_input
        .into_keyed()
        .batch(&tick, nondet!(/** test */));
    let output = batched
        .entries_partially_ordered(nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        in_port.send((1, 'a'));
        in_port.send((1, 'b'));
        in_port.send((2, 'c'));
        // Drain the output; only the instance count is asserted.
        let _: Vec<(i32, char)> = output.collect().await;
    });

    assert_eq!(explored, 12);
}
3490
/// Counts the simulation instances explored when `entries_partially_ordered` is applied
/// directly to a keyed stream built from a totally ordered input, without any batching.
#[cfg(feature = "sim")]
#[test]
fn sim_entries_partially_ordered_top_level() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (in_port, ordered_input) = process.sim_input::<_, TotalOrder, _>();

    let output = ordered_input
        .into_keyed()
        .entries_partially_ordered(nondet!(/** test */))
        .sim_output();

    let explored = builder.sim().exhaustive(async || {
        in_port.send((1, 'a'));
        in_port.send((1, 'b'));
        in_port.send((2, 'c'));
        // Drain the output; only the instance count is asserted.
        let _: Vec<(i32, char)> = output.collect().await;
    });

    assert_eq!(explored, 3);
}
3513
/// Feeds the flattened output of `entries_partially_ordered` back into the keyed input (via a
/// round trip over a second process) and checks that some explored instance places the
/// cycled-back entry ahead of a still-pending original entry.
#[cfg(feature = "sim")]
#[test]
fn sim_entries_partially_ordered_cycle_back() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let relay = builder.process::<()>();

    let (in_port, unordered_input) = process.sim_input::<_, NoOrder, _>();

    let (feedback_handle, feedback) =
        process.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
    let ordered = unordered_input
        .into_keyed()
        .merge_unordered(feedback)
        .assume_ordering::<TotalOrder>(nondet!(/** test */));

    let flat = ordered
        .clone()
        .entries_partially_ordered(nondet!(/** test */));

    // Increment every value and feed the odd results back via the relay process.
    feedback_handle.complete(
        flat.clone()
            .map(q!(|(k, v): (i32, i32)| (k, v + 1)))
            .filter(q!(|(_, v)| *v % 2 == 1))
            .send(&relay, TCP.fail_stop().bincode())
            .send(&process, TCP.fail_stop().bincode())
            .into_keyed(),
    );

    let output = flat.sim_output();

    let mut saw = false;
    let explored = builder.sim().exhaustive(async || {
        // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back as (1, 1).
        // We want to see (1, 1) before (1, 2) - the cycled back value beats the pending one
        in_port.send_many_unordered([(1, 0), (1, 2)]);
        let results: Vec<(i32, i32)> = output.collect().await;

        let cycled_at = results.iter().position(|entry| *entry == (1, 1));
        let pending_at = results.iter().position(|entry| *entry == (1, 2));
        if matches!((cycled_at, pending_at), (Some(c), Some(p)) if c < p) {
            saw = true;
        }
    });

    assert!(saw, "did not see an instance with (1, 1) before (1, 2)");
    assert_eq!(explored, 78);
}
3564}