// Source file: hydro_lang/live_collections/keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41    type UnderlyingBound: Boundedness;
42    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43    type ValueBound: Boundedness;
44
45    /// The type of the keyed singleton if the value for each key is immutable.
46    type WithBoundedValue: KeyedSingletonBound<
47            UnderlyingBound = Self::UnderlyingBound,
48            ValueBound = Bounded,
49            EraseMonotonic = Self::WithBoundedValue,
50        >;
51
52    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
53    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
54
55    /// The type of the keyed singleton if the value for each key is no longer monotonic.
56    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
57
58    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
59    fn bound_kind() -> KeyedSingletonBoundKind;
60}
61
62impl KeyedSingletonBound for Unbounded {
63    type UnderlyingBound = Unbounded;
64    type ValueBound = Unbounded;
65    type WithBoundedValue = BoundedValue;
66    type KeyedStreamToMonotone = MonotonicValue;
67    type EraseMonotonic = Unbounded;
68
69    fn bound_kind() -> KeyedSingletonBoundKind {
70        KeyedSingletonBoundKind::Unbounded
71    }
72}
73
74impl KeyedSingletonBound for Bounded {
75    type UnderlyingBound = Bounded;
76    type ValueBound = Bounded;
77    type WithBoundedValue = Bounded;
78    type KeyedStreamToMonotone = Bounded;
79    type EraseMonotonic = Bounded;
80
81    fn bound_kind() -> KeyedSingletonBoundKind {
82        KeyedSingletonBoundKind::Bounded
83    }
84}
85
/// A [`KeyedSingleton`]-specific boundedness marker.
///
/// Once a key appears, its value is bounded and will never change, but new entries may
/// still appear asynchronously.
pub struct BoundedValue;
89
90impl KeyedSingletonBound for BoundedValue {
91    type UnderlyingBound = Unbounded;
92    type ValueBound = Bounded;
93    type WithBoundedValue = BoundedValue;
94    type KeyedStreamToMonotone = BoundedValue;
95    type EraseMonotonic = BoundedValue;
96
97    fn bound_kind() -> KeyedSingletonBoundKind {
98        KeyedSingletonBoundKind::BoundedValue
99    }
100}
101
/// A [`KeyedSingleton`]-specific boundedness marker.
///
/// Once a key appears it will never be removed, and the value associated with it will
/// only increase monotonically.
pub struct MonotonicValue;
105
106impl KeyedSingletonBound for MonotonicValue {
107    type UnderlyingBound = Unbounded;
108    type ValueBound = Unbounded;
109    type WithBoundedValue = BoundedValue;
110    type KeyedStreamToMonotone = MonotonicValue;
111    type EraseMonotonic = Unbounded;
112
113    fn bound_kind() -> KeyedSingletonBoundKind {
114        KeyedSingletonBoundKind::MonotonicValue
115    }
116}
117
118/// Mapping from keys of type `K` to values of type `V`.
119///
120/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
121/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
122/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
123/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
124/// keys cannot be removed and the value for each key is immutable.
125///
126/// Type Parameters:
127/// - `K`: the type of the key for each entry
128/// - `V`: the type of the value for each entry
129/// - `Loc`: the [`Location`] where the keyed singleton is materialized
130/// - `Bound`: tracks whether the entries are:
131///     - [`Bounded`] (local and finite)
132///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
133///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
134pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
135    pub(crate) location: Loc,
136    pub(crate) ir_node: RefCell<HydroNode>,
137    pub(crate) flow_state: FlowState,
138
139    _phantom: PhantomData<(K, V, Loc, Bound)>,
140}
141
142impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
143    fn drop(&mut self) {
144        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
145        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
146            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
147                input: Box::new(ir_node),
148                op_metadata: HydroIrOpMetadata::new(),
149            });
150        }
151    }
152}
153
154impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
155    for KeyedSingleton<K, V, Loc, Bound>
156{
157    fn clone(&self) -> Self {
158        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
159            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
160            *self.ir_node.borrow_mut() = HydroNode::Tee {
161                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
162                metadata: self.location.new_node_metadata(Self::collection_kind()),
163            };
164        }
165
166        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
167            KeyedSingleton {
168                location: self.location.clone(),
169                flow_state: self.flow_state.clone(),
170                ir_node: HydroNode::Tee {
171                    inner: SharedNode(inner.0.clone()),
172                    metadata: metadata.clone(),
173                }
174                .into(),
175                _phantom: PhantomData,
176            }
177        } else {
178            unreachable!()
179        }
180    }
181}
182
183impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
184    for KeyedSingleton<K, V, L, B>
185where
186    L: Location<'a> + NoTick,
187{
188    type Location = L;
189
190    fn create_source(cycle_id: CycleId, location: L) -> Self {
191        KeyedSingleton {
192            flow_state: location.flow_state().clone(),
193            location: location.clone(),
194            ir_node: RefCell::new(HydroNode::CycleSource {
195                cycle_id,
196                metadata: location.new_node_metadata(Self::collection_kind()),
197            }),
198            _phantom: PhantomData,
199        }
200    }
201}
202
203impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
204where
205    L: Location<'a>,
206{
207    type Location = Tick<L>;
208
209    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
210        KeyedSingleton::new(
211            location.clone(),
212            HydroNode::CycleSource {
213                cycle_id,
214                metadata: location.new_node_metadata(Self::collection_kind()),
215            },
216        )
217    }
218}
219
220impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
221where
222    L: Location<'a>,
223{
224    fn defer_tick(self) -> Self {
225        KeyedSingleton::defer_tick(self)
226    }
227}
228
229impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
230    for KeyedSingleton<K, V, L, B>
231where
232    L: Location<'a> + NoTick,
233{
234    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
235        assert_eq!(
236            Location::id(&self.location),
237            expected_location,
238            "locations do not match"
239        );
240        self.location
241            .flow_state()
242            .borrow_mut()
243            .push_root(HydroRoot::CycleSink {
244                cycle_id,
245                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
246                op_metadata: HydroIrOpMetadata::new(),
247            });
248    }
249}
250
251impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
252where
253    L: Location<'a>,
254{
255    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
256        assert_eq!(
257            Location::id(&self.location),
258            expected_location,
259            "locations do not match"
260        );
261        self.location
262            .flow_state()
263            .borrow_mut()
264            .push_root(HydroRoot::CycleSink {
265                cycle_id,
266                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
267                op_metadata: HydroIrOpMetadata::new(),
268            });
269    }
270}
271
272impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
273    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276
277        let flow_state = location.flow_state().clone();
278        KeyedSingleton {
279            location,
280            flow_state,
281            ir_node: RefCell::new(ir_node),
282            _phantom: PhantomData,
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed singleton is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290}
291
/// Counts the entries of a bounded keyed singleton.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded>
where
    L: Location<'a>,
{
    let entries = me.entries();
    entries.count()
}
298
/// Folds the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    let ordered_entries = me.entries().assume_ordering_trusted(nondet!(
        /// There is only one element associated with each key. The closure technically
        /// isn't commutative in the case where both passed entries have the same key
        /// but different values.
        ///
        /// In the future, we may want to have an `assume!(...)` statement in the UDF that
        /// the key is never already present in the map.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
322
323impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
324    pub(crate) fn collection_kind() -> CollectionKind {
325        CollectionKind::KeyedSingleton {
326            bound: B::bound_kind(),
327            key_type: stageleft::quote_type::<K>().into(),
328            value_type: stageleft::quote_type::<V>().into(),
329        }
330    }
331
332    /// Transforms each value by invoking `f` on each element, with keys staying the same
333    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
334    ///
335    /// If you do not want to modify the stream and instead only want to view
336    /// each item use [`KeyedSingleton::inspect`] instead.
337    ///
338    /// # Example
339    /// ```rust
340    /// # #[cfg(feature = "deploy")] {
341    /// # use hydro_lang::prelude::*;
342    /// # use futures::StreamExt;
343    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
344    /// let keyed_singleton = // { 1: 2, 2: 4 }
345    /// # process
346    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
347    /// #     .into_keyed()
348    /// #     .first();
349    /// keyed_singleton.map(q!(|v| v + 1))
350    /// #   .entries()
351    /// # }, |mut stream| async move {
352    /// // { 1: 3, 2: 5 }
353    /// # let mut results = Vec::new();
354    /// # for _ in 0..2 {
355    /// #     results.push(stream.next().await.unwrap());
356    /// # }
357    /// # results.sort();
358    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
359    /// # }));
360    /// # }
361    /// ```
362    pub fn map<U, F>(
363        self,
364        f: impl IntoQuotedMut<'a, F, L> + Copy,
365    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
366    where
367        F: Fn(V) -> U + 'a,
368    {
369        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
370        let map_f = q!({
371            let orig = f;
372            move |(k, v)| (k, orig(v))
373        })
374        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
375        .into();
376
377        KeyedSingleton::new(
378            self.location.clone(),
379            HydroNode::Map {
380                f: map_f,
381                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
382                metadata: self.location.new_node_metadata(KeyedSingleton::<
383                    K,
384                    U,
385                    L,
386                    B::EraseMonotonic,
387                >::collection_kind()),
388            },
389        )
390    }
391
392    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
393    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
394    ///
395    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
396    /// the new value `U`. The key remains unchanged in the output.
397    ///
398    /// # Example
399    /// ```rust
400    /// # #[cfg(feature = "deploy")] {
401    /// # use hydro_lang::prelude::*;
402    /// # use futures::StreamExt;
403    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
404    /// let keyed_singleton = // { 1: 2, 2: 4 }
405    /// # process
406    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
407    /// #     .into_keyed()
408    /// #     .first();
409    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
410    /// #   .entries()
411    /// # }, |mut stream| async move {
412    /// // { 1: 3, 2: 6 }
413    /// # let mut results = Vec::new();
414    /// # for _ in 0..2 {
415    /// #     results.push(stream.next().await.unwrap());
416    /// # }
417    /// # results.sort();
418    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
419    /// # }));
420    /// # }
421    /// ```
422    pub fn map_with_key<U, F>(
423        self,
424        f: impl IntoQuotedMut<'a, F, L> + Copy,
425    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
426    where
427        F: Fn((K, V)) -> U + 'a,
428        K: Clone,
429    {
430        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
431        let map_f = q!({
432            let orig = f;
433            move |(k, v)| {
434                let out = orig((Clone::clone(&k), v));
435                (k, out)
436            }
437        })
438        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
439        .into();
440
441        KeyedSingleton::new(
442            self.location.clone(),
443            HydroNode::Map {
444                f: map_f,
445                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
446                metadata: self.location.new_node_metadata(KeyedSingleton::<
447                    K,
448                    U,
449                    L,
450                    B::EraseMonotonic,
451                >::collection_kind()),
452            },
453        )
454    }
455
456    /// Gets the number of keys in the keyed singleton.
457    ///
458    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
459    /// since keys may be added / removed over time. When the set of keys changes, the count will
460    /// be asynchronously updated.
461    ///
462    /// # Example
463    /// ```rust
464    /// # #[cfg(feature = "deploy")] {
465    /// # use hydro_lang::prelude::*;
466    /// # use futures::StreamExt;
467    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
468    /// # let tick = process.tick();
469    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
470    /// # process
471    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
472    /// #     .into_keyed()
473    /// #     .batch(&tick, nondet!(/** test */))
474    /// #     .first();
475    /// keyed_singleton.key_count()
476    /// # .all_ticks()
477    /// # }, |mut stream| async move {
478    /// // 3
479    /// # assert_eq!(stream.next().await.unwrap(), 3);
480    /// # }));
481    /// # }
482    /// ```
483    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
484        if B::ValueBound::BOUNDED {
485            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
486                location: self.location.clone(),
487                flow_state: self.flow_state.clone(),
488                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
489                _phantom: PhantomData,
490            };
491
492            me.entries().count().ignore_monotonic()
493        } else if L::is_top_level()
494            && let Some(tick) = self.location.try_tick()
495            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
496        {
497            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
498                self.location.clone(),
499                self.ir_node.replace(HydroNode::Placeholder),
500            );
501
502            let out =
503                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
504                    .latest();
505            Singleton::new(
506                out.location.clone(),
507                out.ir_node.replace(HydroNode::Placeholder),
508            )
509        } else {
510            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
511        }
512    }
513
514    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
515    ///
516    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
517    /// asynchronously as well.
518    ///
519    /// # Example
520    /// ```rust
521    /// # #[cfg(feature = "deploy")] {
522    /// # use hydro_lang::prelude::*;
523    /// # use futures::StreamExt;
524    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
525    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
526    /// # process
527    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
528    /// #     .into_keyed()
529    /// #     .batch(&process.tick(), nondet!(/** test */))
530    /// #     .first();
531    /// keyed_singleton.into_singleton()
532    /// # .all_ticks()
533    /// # }, |mut stream| async move {
534    /// // { 1: "a", 2: "b", 3: "c" }
535    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
536    /// # }));
537    /// # }
538    /// ```
539    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
540    where
541        K: Eq + Hash,
542    {
543        if B::ValueBound::BOUNDED {
544            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
545                location: self.location.clone(),
546                flow_state: self.flow_state.clone(),
547                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
548                _phantom: PhantomData,
549            };
550
551            me.entries()
552                .assume_ordering_trusted(nondet!(
553                    /// There is only one element associated with each key. The closure technically
554                    /// isn't commutative in the case where both passed entries have the same key
555                    /// but different values.
556                    ///
557                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
558                    /// the key is never already present in the map.
559                ))
560                .fold(
561                    q!(|| HashMap::new()),
562                    q!(|map, (k, v)| {
563                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
564                        map.insert(k, v);
565                    }),
566                )
567        } else if L::is_top_level()
568            && let Some(tick) = self.location.try_tick()
569            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
570        {
571            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
572                self.location.clone(),
573                self.ir_node.replace(HydroNode::Placeholder),
574            );
575
576            let out = into_singleton_inside_tick(
577                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
578            )
579            .latest();
580            Singleton::new(
581                out.location.clone(),
582                out.ir_node.replace(HydroNode::Placeholder),
583            )
584        } else {
585            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
586        }
587    }
588
589    /// An operator which allows you to "name" a `HydroNode`.
590    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
591    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
592        {
593            let mut node = self.ir_node.borrow_mut();
594            let metadata = node.metadata_mut();
595            metadata.tag = Some(name.to_owned());
596        }
597        self
598    }
599
600    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
601    /// implies that `B == Bounded`.
602    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
603    where
604        B: IsBounded,
605    {
606        KeyedSingleton::new(
607            self.location.clone(),
608            self.ir_node.replace(HydroNode::Placeholder),
609        )
610    }
611
612    /// Gets the value associated with a specific key from the keyed singleton.
613    /// Returns `None` if the key is `None` or there is no associated value.
614    ///
615    /// # Example
616    /// ```rust
617    /// # #[cfg(feature = "deploy")] {
618    /// # use hydro_lang::prelude::*;
619    /// # use futures::StreamExt;
620    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
621    /// let tick = process.tick();
622    /// let keyed_data = process
623    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
624    ///     .into_keyed()
625    ///     .batch(&tick, nondet!(/** test */))
626    ///     .first();
627    /// let key = tick.singleton(q!(1));
628    /// keyed_data.get(key).all_ticks()
629    /// # }, |mut stream| async move {
630    /// // 2
631    /// # assert_eq!(stream.next().await.unwrap(), 2);
632    /// # }));
633    /// # }
634    /// ```
635    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
636    where
637        B: IsBounded,
638        K: Hash + Eq + Clone,
639        V: Clone,
640    {
641        self.make_bounded()
642            .into_keyed_stream()
643            .get(key)
644            .cast_at_most_one_element()
645    }
646
647    /// Emit a keyed stream containing keys shared between the keyed singleton and the
648    /// keyed stream, where each value in the output keyed stream is a tuple of
649    /// (the keyed singleton's value, the keyed stream's value).
650    ///
651    /// # Example
652    /// ```rust
653    /// # #[cfg(feature = "deploy")] {
654    /// # use hydro_lang::prelude::*;
655    /// # use futures::StreamExt;
656    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
657    /// let tick = process.tick();
658    /// let keyed_data = process
659    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
660    ///     .into_keyed()
661    ///     .batch(&tick, nondet!(/** test */))
662    ///     .first();
663    /// let other_data = process
664    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
665    ///     .into_keyed()
666    ///     .batch(&tick, nondet!(/** test */));
667    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
668    /// # }, |mut stream| async move {
669    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
670    /// # let mut results = vec![];
671    /// # for _ in 0..3 {
672    /// #     results.push(stream.next().await.unwrap());
673    /// # }
674    /// # results.sort();
675    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
676    /// # }));
677    /// # }
678    /// ```
679    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
680        self,
681        other: KeyedStream<K, V2, L, B2, O2, R2>,
682    ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
683    where
684        B: IsBounded,
685        K: Eq + Hash + Clone,
686        V: Clone,
687        V2: Clone,
688    {
689        // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
690        // always produces deterministic order per key (nested loop join), this could just use
691        // `join_keyed_stream` without constructing IRs manually
692        KeyedStream::new(
693            self.location.clone(),
694            HydroNode::Join {
695                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
696                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
697                metadata: self
698                    .location
699                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
700            },
701        )
702    }
703
704    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
705    /// where each value in the output keyed singleton is a tuple of
706    /// (self.value, other.value).
707    ///
708    /// # Example
709    /// ```rust
710    /// # #[cfg(feature = "deploy")] {
711    /// # use hydro_lang::prelude::*;
712    /// # use futures::StreamExt;
713    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714    /// # let tick = process.tick();
715    /// let requests = // { 1: 10, 2: 20, 3: 30 }
716    /// # process
717    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
718    /// #     .into_keyed()
719    /// #     .batch(&tick, nondet!(/** test */))
720    /// #     .first();
721    /// let other = // { 1: 100, 2: 200, 4: 400 }
722    /// # process
723    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
724    /// #     .into_keyed()
725    /// #     .batch(&tick, nondet!(/** test */))
726    /// #     .first();
727    /// requests.join_keyed_singleton(other)
728    /// # .entries().all_ticks()
729    /// # }, |mut stream| async move {
730    /// // { 1: (10, 100), 2: (20, 200) }
731    /// # let mut results = vec![];
732    /// # for _ in 0..2 {
733    /// #     results.push(stream.next().await.unwrap());
734    /// # }
735    /// # results.sort();
736    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
737    /// # }));
738    /// # }
739    /// ```
740    pub fn join_keyed_singleton<V2: Clone>(
741        self,
742        other: KeyedSingleton<K, V2, L, Bounded>,
743    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
744    where
745        B: IsBounded,
746        K: Eq + Hash + Clone,
747        V: Clone,
748    {
749        let result_stream = self
750            .make_bounded()
751            .entries()
752            .join(other.entries())
753            .into_keyed();
754
755        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
756        result_stream.cast_at_most_one_entry_per_key()
757    }
758
759    /// For each value in `self`, find the matching key in `lookup`.
760    /// The output is a keyed singleton with the key from `self`, and a value
761    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
762    /// If the key is not present in `lookup`, the option will be [`None`].
763    ///
764    /// # Example
765    /// ```rust
766    /// # #[cfg(feature = "deploy")] {
767    /// # use hydro_lang::prelude::*;
768    /// # use futures::StreamExt;
769    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
770    /// # let tick = process.tick();
771    /// let requests = // { 1: 10, 2: 20 }
772    /// # process
773    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
774    /// #     .into_keyed()
775    /// #     .batch(&tick, nondet!(/** test */))
776    /// #     .first();
777    /// let other_data = // { 10: 100, 11: 110 }
778    /// # process
779    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
780    /// #     .into_keyed()
781    /// #     .batch(&tick, nondet!(/** test */))
782    /// #     .first();
783    /// requests.lookup_keyed_singleton(other_data)
784    /// # .entries().all_ticks()
785    /// # }, |mut stream| async move {
786    /// // { 1: (10, Some(100)), 2: (20, None) }
787    /// # let mut results = vec![];
788    /// # for _ in 0..2 {
789    /// #     results.push(stream.next().await.unwrap());
790    /// # }
791    /// # results.sort();
792    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
793    /// # }));
794    /// # }
795    /// ```
796    pub fn lookup_keyed_singleton<V2>(
797        self,
798        lookup: KeyedSingleton<V, V2, L, Bounded>,
799    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
800    where
801        B: IsBounded,
802        K: Eq + Hash + Clone,
803        V: Eq + Hash + Clone,
804        V2: Clone,
805    {
806        let result_stream = self
807            .make_bounded()
808            .into_keyed_stream()
809            .lookup_keyed_stream(lookup.into_keyed_stream());
810
811        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
812        result_stream.cast_at_most_one_entry_per_key()
813    }
814
815    /// For each value in `self`, find the matching key in `lookup`.
816    /// The output is a keyed stream with the key from `self`, and a value
817    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
818    /// If the key is not present in `lookup`, the option will be [`None`].
819    ///
820    /// # Example
821    /// ```rust
822    /// # #[cfg(feature = "deploy")] {
823    /// # use hydro_lang::prelude::*;
824    /// # use futures::StreamExt;
825    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
826    /// # let tick = process.tick();
827    /// let requests = // { 1: 10, 2: 20 }
828    /// # process
829    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
830    /// #     .into_keyed()
831    /// #     .batch(&tick, nondet!(/** test */))
832    /// #     .first();
    /// let other_data = // { 10: [100, 110] }
834    /// # process
835    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
836    /// #     .into_keyed()
837    /// #     .batch(&tick, nondet!(/** test */));
838    /// requests.lookup_keyed_stream(other_data)
839    /// # .entries().all_ticks()
840    /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: [(20, None)] }
842    /// # let mut results = vec![];
843    /// # for _ in 0..3 {
844    /// #     results.push(stream.next().await.unwrap());
845    /// # }
846    /// # results.sort();
847    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
848    /// # }));
849    /// # }
850    /// ```
851    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
852        self,
853        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
854    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
855    where
856        B: IsBounded,
857        K: Eq + Hash + Clone,
858        V: Eq + Hash + Clone,
859        V2: Clone,
860    {
861        self.make_bounded()
862            .entries()
863            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
864            .into_keyed()
865            .lookup_keyed_stream(lookup)
866    }
867}
868
869impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
870    KeyedSingleton<K, V, L, B>
871{
872    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
873    ///
874    /// The value for each key must be bounded, otherwise the resulting stream elements would be
875    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
876    /// into the output.
877    ///
878    /// # Example
879    /// ```rust
880    /// # #[cfg(feature = "deploy")] {
881    /// # use hydro_lang::prelude::*;
882    /// # use futures::StreamExt;
883    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
884    /// let keyed_singleton = // { 1: 2, 2: 4 }
885    /// # process
886    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
887    /// #     .into_keyed()
888    /// #     .first();
889    /// keyed_singleton.entries()
890    /// # }, |mut stream| async move {
891    /// // (1, 2), (2, 4) in any order
892    /// # let mut results = Vec::new();
893    /// # for _ in 0..2 {
894    /// #     results.push(stream.next().await.unwrap());
895    /// # }
896    /// # results.sort();
897    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
898    /// # }));
899    /// # }
900    /// ```
901    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
902        self.into_keyed_stream().entries()
903    }
904
905    /// Flattens the keyed singleton into an unordered stream of just the values.
906    ///
907    /// The value for each key must be bounded, otherwise the resulting stream elements would be
908    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
909    /// into the output.
910    ///
911    /// # Example
912    /// ```rust
913    /// # #[cfg(feature = "deploy")] {
914    /// # use hydro_lang::prelude::*;
915    /// # use futures::StreamExt;
916    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
917    /// let keyed_singleton = // { 1: 2, 2: 4 }
918    /// # process
919    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
920    /// #     .into_keyed()
921    /// #     .first();
922    /// keyed_singleton.values()
923    /// # }, |mut stream| async move {
924    /// // 2, 4 in any order
925    /// # let mut results = Vec::new();
926    /// # for _ in 0..2 {
927    /// #     results.push(stream.next().await.unwrap());
928    /// # }
929    /// # results.sort();
930    /// # assert_eq!(results, vec![2, 4]);
931    /// # }));
932    /// # }
933    /// ```
934    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
935        let map_f = q!(|(_, v)| v)
936            .splice_fn1_ctx::<(K, V), V>(&self.location)
937            .into();
938
939        Stream::new(
940            self.location.clone(),
941            HydroNode::Map {
942                f: map_f,
943                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
944                metadata: self.location.new_node_metadata(Stream::<
945                    V,
946                    L,
947                    B::UnderlyingBound,
948                    NoOrder,
949                    ExactlyOnce,
950                >::collection_kind()),
951            },
952        )
953    }
954
955    /// Flattens the keyed singleton into an unordered stream of just the keys.
956    ///
957    /// The value for each key must be bounded, otherwise the removal of keys would result in
958    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
959    /// into the output.
960    ///
961    /// # Example
962    /// ```rust
963    /// # #[cfg(feature = "deploy")] {
964    /// # use hydro_lang::prelude::*;
965    /// # use futures::StreamExt;
966    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
967    /// let keyed_singleton = // { 1: 2, 2: 4 }
968    /// # process
969    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
970    /// #     .into_keyed()
971    /// #     .first();
972    /// keyed_singleton.keys()
973    /// # }, |mut stream| async move {
974    /// // 1, 2 in any order
975    /// # let mut results = Vec::new();
976    /// # for _ in 0..2 {
977    /// #     results.push(stream.next().await.unwrap());
978    /// # }
979    /// # results.sort();
980    /// # assert_eq!(results, vec![1, 2]);
981    /// # }));
982    /// # }
983    /// ```
984    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
985        self.entries().map(q!(|(k, _)| k))
986    }
987
988    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
989    /// entries whose keys are not in the provided stream.
990    ///
991    /// # Example
992    /// ```rust
993    /// # #[cfg(feature = "deploy")] {
994    /// # use hydro_lang::prelude::*;
995    /// # use futures::StreamExt;
996    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
997    /// let tick = process.tick();
998    /// let keyed_singleton = // { 1: 2, 2: 4 }
999    /// # process
1000    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1001    /// #     .into_keyed()
1002    /// #     .first()
1003    /// #     .batch(&tick, nondet!(/** test */));
1004    /// let keys_to_remove = process
1005    ///     .source_iter(q!(vec![1]))
1006    ///     .batch(&tick, nondet!(/** test */));
1007    /// keyed_singleton.filter_key_not_in(keys_to_remove)
1008    /// #   .entries().all_ticks()
1009    /// # }, |mut stream| async move {
1010    /// // { 2: 4 }
1011    /// # for w in vec![(2, 4)] {
1012    /// #     assert_eq!(stream.next().await.unwrap(), w);
1013    /// # }
1014    /// # }));
1015    /// # }
1016    /// ```
1017    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1018        self,
1019        other: Stream<K, L, Bounded, O2, R2>,
1020    ) -> Self
1021    where
1022        K: Hash + Eq,
1023    {
1024        check_matching_location(&self.location, &other.location);
1025
1026        KeyedSingleton::new(
1027            self.location.clone(),
1028            HydroNode::AntiJoin {
1029                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1030                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1031                metadata: self.location.new_node_metadata(Self::collection_kind()),
1032            },
1033        )
1034    }
1035
1036    /// An operator which allows you to "inspect" each value of a keyed singleton without
1037    /// modifying it. The closure `f` is called on a reference to each value. This is
1038    /// mainly useful for debugging, and should not be used to generate side-effects.
1039    ///
1040    /// # Example
1041    /// ```rust
1042    /// # #[cfg(feature = "deploy")] {
1043    /// # use hydro_lang::prelude::*;
1044    /// # use futures::StreamExt;
1045    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1046    /// let keyed_singleton = // { 1: 2, 2: 4 }
1047    /// # process
1048    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1049    /// #     .into_keyed()
1050    /// #     .first();
1051    /// keyed_singleton
1052    ///     .inspect(q!(|v| println!("{}", v)))
1053    /// #   .entries()
1054    /// # }, |mut stream| async move {
1055    /// // { 1: 2, 2: 4 }
1056    /// # for w in vec![(1, 2), (2, 4)] {
1057    /// #     assert_eq!(stream.next().await.unwrap(), w);
1058    /// # }
1059    /// # }));
1060    /// # }
1061    /// ```
1062    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1063    where
1064        F: Fn(&V) + 'a,
1065    {
1066        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1067        let inspect_f = q!({
1068            let orig = f;
1069            move |t: &(_, _)| orig(&t.1)
1070        })
1071        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1072        .into();
1073
1074        KeyedSingleton::new(
1075            self.location.clone(),
1076            HydroNode::Inspect {
1077                f: inspect_f,
1078                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1079                metadata: self.location.new_node_metadata(Self::collection_kind()),
1080            },
1081        )
1082    }
1083
1084    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1085    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1086    /// mainly useful for debugging, and should not be used to generate side-effects.
1087    ///
1088    /// # Example
1089    /// ```rust
1090    /// # #[cfg(feature = "deploy")] {
1091    /// # use hydro_lang::prelude::*;
1092    /// # use futures::StreamExt;
1093    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1094    /// let keyed_singleton = // { 1: 2, 2: 4 }
1095    /// # process
1096    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1097    /// #     .into_keyed()
1098    /// #     .first();
1099    /// keyed_singleton
1100    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1101    /// #   .entries()
1102    /// # }, |mut stream| async move {
1103    /// // { 1: 2, 2: 4 }
1104    /// # for w in vec![(1, 2), (2, 4)] {
1105    /// #     assert_eq!(stream.next().await.unwrap(), w);
1106    /// # }
1107    /// # }));
1108    /// # }
1109    /// ```
1110    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1111    where
1112        F: Fn(&(K, V)) + 'a,
1113    {
1114        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1115
1116        KeyedSingleton::new(
1117            self.location.clone(),
1118            HydroNode::Inspect {
1119                f: inspect_f,
1120                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1121                metadata: self.location.new_node_metadata(Self::collection_kind()),
1122            },
1123        )
1124    }
1125
1126    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1127    ///
1128    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1129    /// asynchronously updated if a new key is added that is higher than the previous max key.
1130    ///
1131    /// # Example
1132    /// ```rust
1133    /// # #[cfg(feature = "deploy")] {
1134    /// # use hydro_lang::prelude::*;
1135    /// # use futures::StreamExt;
1136    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1137    /// let tick = process.tick();
1138    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1139    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1140    /// #     .into_keyed()
1141    /// #     .first();
1142    /// keyed_singleton.get_max_key()
1143    /// # .sample_eager(nondet!(/** test */))
1144    /// # }, |mut stream| async move {
1145    /// // (2, 456)
1146    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1147    /// # }));
1148    /// # }
1149    /// ```
1150    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1151    where
1152        K: Ord,
1153    {
1154        self.entries()
1155            .assume_ordering_trusted(nondet!(
1156                /// There is only one element associated with each key, and the keys are totallly
1157                /// ordered so we will produce a deterministic value. The closure technically
1158                /// isn't commutative in the case where both passed entries have the same key
1159                /// but different values.
1160                ///
1161                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1162                /// the two inputs do not have the same key.
1163            ))
1164            .reduce(q!(
1165                move |curr, new| {
1166                    if new.0 > curr.0 {
1167                        *curr = new;
1168                    }
1169                },
1170                idempotent = manual_proof!(/** repeated elements are ignored */)
1171            ))
1172    }
1173
1174    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1175    /// element, the value.
1176    ///
1177    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1178    ///
1179    /// # Example
1180    /// ```rust
1181    /// # #[cfg(feature = "deploy")] {
1182    /// # use hydro_lang::prelude::*;
1183    /// # use futures::StreamExt;
1184    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1185    /// let keyed_singleton = // { 1: 2, 2: 4 }
1186    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1187    /// #     .into_keyed()
1188    /// #     .first();
1189    /// keyed_singleton
1190    ///     .clone()
1191    ///     .into_keyed_stream()
1192    ///     .merge_unordered(
1193    ///         keyed_singleton.into_keyed_stream()
1194    ///     )
1195    /// #   .entries()
1196    /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
1198    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1199    /// #     assert_eq!(stream.next().await.unwrap(), w);
1200    /// # }
1201    /// # }));
1202    /// # }
1203    /// ```
1204    pub fn into_keyed_stream(
1205        self,
1206    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1207        KeyedStream::new(
1208            self.location.clone(),
1209            HydroNode::Cast {
1210                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1211                metadata: self.location.new_node_metadata(KeyedStream::<
1212                    K,
1213                    V,
1214                    L,
1215                    B::UnderlyingBound,
1216                    TotalOrder,
1217                    ExactlyOnce,
1218                >::collection_kind()),
1219            },
1220        )
1221    }
1222}
1223
1224impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1225where
1226    L: Location<'a>,
1227{
1228    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1229    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1230    ///
1231    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1232    /// processed before an acknowledgement is emitted.
1233    pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1234        let id = self.location.flow_state().borrow_mut().next_clock_id();
1235        let out_location = Atomic {
1236            tick: Tick {
1237                id,
1238                l: self.location.clone(),
1239            },
1240        };
1241        KeyedSingleton::new(
1242            out_location.clone(),
1243            HydroNode::BeginAtomic {
1244                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1245                metadata: out_location
1246                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1247            },
1248        )
1249    }
1250}
1251
1252impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1253where
1254    L: Location<'a> + NoTick,
1255{
1256    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1257    /// See [`KeyedSingleton::atomic`] for more details.
1258    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1259        KeyedSingleton::new(
1260            self.location.tick.l.clone(),
1261            HydroNode::EndAtomic {
1262                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1263                metadata: self
1264                    .location
1265                    .tick
1266                    .l
1267                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1268            },
1269        )
1270    }
1271}
1272
1273impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1274    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1275    /// tick `T` always has the entries of `self` at tick `T - 1`.
1276    ///
1277    /// At tick `0`, the output has no entries, since there is no previous tick.
1278    ///
1279    /// This operator enables stateful iterative processing with ticks, by sending data from one
1280    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1281    ///
1282    /// # Example
1283    /// ```rust
1284    /// # #[cfg(feature = "deploy")] {
1285    /// # use hydro_lang::prelude::*;
1286    /// # use futures::StreamExt;
1287    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1288    /// let tick = process.tick();
1289    /// # // ticks are lazy by default, forces the second tick to run
1290    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1291    /// # let batch_first_tick = process
1292    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1293    /// #   .batch(&tick, nondet!(/** test */))
1294    /// #   .into_keyed();
1295    /// # let batch_second_tick = process
1296    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1297    /// #   .batch(&tick, nondet!(/** test */))
1298    /// #   .into_keyed()
1299    /// #   .defer_tick(); // appears on the second tick
1300    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1301    /// # batch_first_tick.chain(batch_second_tick).first();
1302    /// input_batch.clone().filter_key_not_in(
1303    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1304    /// )
1305    /// # .entries().all_ticks()
1306    /// # }, |mut stream| async move {
1307    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1308    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1309    /// #     assert_eq!(stream.next().await.unwrap(), w);
1310    /// # }
1311    /// # }));
1312    /// # }
1313    /// ```
1314    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1315        KeyedSingleton::new(
1316            self.location.clone(),
1317            HydroNode::DeferTick {
1318                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1319                metadata: self
1320                    .location
1321                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1322            },
1323        )
1324    }
1325}
1326
1327impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1328where
1329    L: Location<'a>,
1330{
1331    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1332    /// point in time.
1333    ///
1334    /// # Non-Determinism
1335    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1336    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1337    pub fn snapshot(
1338        self,
1339        tick: &Tick<L>,
1340        _nondet: NonDet,
1341    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1342        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1343        KeyedSingleton::new(
1344            tick.clone(),
1345            HydroNode::Batch {
1346                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1347                metadata: tick
1348                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1349            },
1350        )
1351    }
1352}
1353
1354impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1355where
1356    L: Location<'a> + NoTick,
1357{
1358    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1359    /// state of the keyed singleton being atomically processed.
1360    ///
1361    /// # Non-Determinism
1362    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1363    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1364    pub fn snapshot_atomic(
1365        self,
1366        tick: &Tick<L>,
1367        _nondet: NonDet,
1368    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1369        KeyedSingleton::new(
1370            tick.clone(),
1371            HydroNode::Batch {
1372                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1373                metadata: tick
1374                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1375            },
1376        )
1377    }
1378}
1379
1380impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1381where
1382    L: Location<'a>,
1383{
1384    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1385    ///
1386    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1387    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1388    /// is filtered out.
1389    ///
1390    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while
    /// filtering, use [`KeyedSingleton::filter_map`] instead.
1393    ///
1394    /// # Example
1395    /// ```rust
1396    /// # #[cfg(feature = "deploy")] {
1397    /// # use hydro_lang::prelude::*;
1398    /// # use futures::StreamExt;
1399    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1400    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1401    /// # process
1402    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1403    /// #     .into_keyed()
1404    /// #     .first();
1405    /// keyed_singleton.filter(q!(|&v| v > 1))
1406    /// #   .entries()
1407    /// # }, |mut stream| async move {
1408    /// // { 1: 2, 2: 4 }
1409    /// # let mut results = Vec::new();
1410    /// # for _ in 0..2 {
1411    /// #     results.push(stream.next().await.unwrap());
1412    /// # }
1413    /// # results.sort();
1414    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1415    /// # }));
1416    /// # }
1417    /// ```
1418    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1419    where
1420        F: Fn(&V) -> bool + 'a,
1421    {
1422        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1423        let filter_f = q!({
1424            let orig = f;
1425            move |t: &(_, _)| orig(&t.1)
1426        })
1427        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1428        .into();
1429
1430        KeyedSingleton::new(
1431            self.location.clone(),
1432            HydroNode::Filter {
1433                f: filter_f,
1434                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1435                metadata: self
1436                    .location
1437                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1438            },
1439        )
1440    }
1441
1442    /// An operator that both filters and maps values. It yields only the key-value pairs where
1443    /// the supplied closure `f` returns `Some(value)`.
1444    ///
1445    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1446    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1447    /// If it returns `None`, the key-value pair is filtered out.
1448    ///
1449    /// # Example
1450    /// ```rust
1451    /// # #[cfg(feature = "deploy")] {
1452    /// # use hydro_lang::prelude::*;
1453    /// # use futures::StreamExt;
1454    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1455    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1456    /// # process
1457    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1458    /// #     .into_keyed()
1459    /// #     .first();
1460    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1461    /// #   .entries()
1462    /// # }, |mut stream| async move {
1463    /// // { 1: 42, 3: 100 }
1464    /// # let mut results = Vec::new();
1465    /// # for _ in 0..2 {
1466    /// #     results.push(stream.next().await.unwrap());
1467    /// # }
1468    /// # results.sort();
1469    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1470    /// # }));
1471    /// # }
1472    /// ```
1473    pub fn filter_map<F, U>(
1474        self,
1475        f: impl IntoQuotedMut<'a, F, L> + Copy,
1476    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1477    where
1478        F: Fn(V) -> Option<U> + 'a,
1479    {
1480        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1481        let filter_map_f = q!({
1482            let orig = f;
1483            move |(k, v)| orig(v).map(|o| (k, o))
1484        })
1485        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1486        .into();
1487
1488        KeyedSingleton::new(
1489            self.location.clone(),
1490            HydroNode::FilterMap {
1491                f: filter_map_f,
1492                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1493                metadata: self.location.new_node_metadata(KeyedSingleton::<
1494                    K,
1495                    U,
1496                    L,
1497                    B::EraseMonotonic,
1498                >::collection_kind()),
1499            },
1500        )
1501    }
1502
1503    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1504    /// arrived since the previous batch was released.
1505    ///
    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`]; instead, you may want to use
    /// [`KeyedSingleton::into_keyed_stream`] and then yield with [`KeyedStream::all_ticks`].
1508    ///
1509    /// # Non-Determinism
1510    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1511    /// has a non-deterministic set of key-value pairs.
1512    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1513    where
1514        L: NoTick,
1515    {
1516        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1517        KeyedSingleton::new(
1518            tick.clone(),
1519            HydroNode::Batch {
1520                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1521                metadata: tick
1522                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1523            },
1524        )
1525    }
1526}
1527
1528impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1529where
1530    L: Location<'a> + NoTick,
1531{
1532    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1533    /// atomically processed.
1534    ///
1535    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1536    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1537    ///
1538    /// # Non-Determinism
1539    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1540    /// has a non-deterministic set of key-value pairs.
1541    pub fn batch_atomic(
1542        self,
1543        tick: &Tick<L>,
1544        nondet: NonDet,
1545    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1546        let _ = nondet;
1547        KeyedSingleton::new(
1548            tick.clone(),
1549            HydroNode::Batch {
1550                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1551                metadata: tick
1552                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1553            },
1554        )
1555    }
1556}
1557
1558#[cfg(test)]
1559mod tests {
1560    #[cfg(feature = "deploy")]
1561    use futures::{SinkExt, StreamExt};
1562    #[cfg(feature = "deploy")]
1563    use hydro_deploy::Deployment;
1564    #[cfg(any(feature = "deploy", feature = "sim"))]
1565    use stageleft::q;
1566
1567    #[cfg(any(feature = "deploy", feature = "sim"))]
1568    use crate::compile::builder::FlowBuilder;
1569    #[cfg(any(feature = "deploy", feature = "sim"))]
1570    use crate::location::Location;
1571    #[cfg(any(feature = "deploy", feature = "sim"))]
1572    use crate::nondet::nondet;
1573
    /// Checks that `key_count()` on a keyed singleton built with `first()` starts at zero and
    /// grows by one each time an entry with a previously unseen key is sent in.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        // Dataflow under test: external input -> keyed -> first value per key -> key count,
        // eagerly sampled and sent back out to the external process.
        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Both external endpoints are connected before the deployment is started.
        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // No entries have been sent yet, so the first sampled count is zero.
        assert_eq!(external_out.next().await.unwrap(), 0);

        // Each entry below uses a new key, so the count increases by one each time.
        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);
    }
1611
1612    #[cfg(feature = "deploy")]
1613    #[tokio::test]
1614    async fn key_count_unbounded_value() {
1615        let mut deployment = Deployment::new();
1616
1617        let mut flow = FlowBuilder::new();
1618        let node = flow.process::<()>();
1619        let external = flow.external::<()>();
1620
1621        let (input_port, input) = node.source_external_bincode(&external);
1622        let out = input
1623            .into_keyed()
1624            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1625            .key_count()
1626            .sample_eager(nondet!(/** test */))
1627            .send_bincode_external(&external);
1628
1629        let nodes = flow
1630            .with_process(&node, deployment.Localhost())
1631            .with_external(&external, deployment.Localhost())
1632            .deploy(&mut deployment);
1633
1634        deployment.deploy().await.unwrap();
1635
1636        let mut external_in = nodes.connect(input_port).await;
1637        let mut external_out = nodes.connect(out).await;
1638
1639        deployment.start().await.unwrap();
1640
1641        assert_eq!(external_out.next().await.unwrap(), 0);
1642
1643        external_in.send((1, 1)).await.unwrap();
1644        assert_eq!(external_out.next().await.unwrap(), 1);
1645
1646        external_in.send((1, 2)).await.unwrap();
1647        assert_eq!(external_out.next().await.unwrap(), 1);
1648
1649        external_in.send((2, 2)).await.unwrap();
1650        assert_eq!(external_out.next().await.unwrap(), 2);
1651
1652        external_in.send((1, 1)).await.unwrap();
1653        assert_eq!(external_out.next().await.unwrap(), 2);
1654
1655        external_in.send((3, 1)).await.unwrap();
1656        assert_eq!(external_out.next().await.unwrap(), 3);
1657    }
1658
1659    #[cfg(feature = "deploy")]
1660    #[tokio::test]
1661    async fn into_singleton_bounded_value() {
1662        let mut deployment = Deployment::new();
1663
1664        let mut flow = FlowBuilder::new();
1665        let node = flow.process::<()>();
1666        let external = flow.external::<()>();
1667
1668        let (input_port, input) = node.source_external_bincode(&external);
1669        let out = input
1670            .into_keyed()
1671            .first()
1672            .into_singleton()
1673            .sample_eager(nondet!(/** test */))
1674            .send_bincode_external(&external);
1675
1676        let nodes = flow
1677            .with_process(&node, deployment.Localhost())
1678            .with_external(&external, deployment.Localhost())
1679            .deploy(&mut deployment);
1680
1681        deployment.deploy().await.unwrap();
1682
1683        let mut external_in = nodes.connect(input_port).await;
1684        let mut external_out = nodes.connect(out).await;
1685
1686        deployment.start().await.unwrap();
1687
1688        assert_eq!(
1689            external_out.next().await.unwrap(),
1690            std::collections::HashMap::new()
1691        );
1692
1693        external_in.send((1, 1)).await.unwrap();
1694        assert_eq!(
1695            external_out.next().await.unwrap(),
1696            vec![(1, 1)].into_iter().collect()
1697        );
1698
1699        external_in.send((2, 2)).await.unwrap();
1700        assert_eq!(
1701            external_out.next().await.unwrap(),
1702            vec![(1, 1), (2, 2)].into_iter().collect()
1703        );
1704    }
1705
1706    #[cfg(feature = "deploy")]
1707    #[tokio::test]
1708    async fn into_singleton_unbounded_value() {
1709        let mut deployment = Deployment::new();
1710
1711        let mut flow = FlowBuilder::new();
1712        let node = flow.process::<()>();
1713        let external = flow.external::<()>();
1714
1715        let (input_port, input) = node.source_external_bincode(&external);
1716        let out = input
1717            .into_keyed()
1718            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1719            .into_singleton()
1720            .sample_eager(nondet!(/** test */))
1721            .send_bincode_external(&external);
1722
1723        let nodes = flow
1724            .with_process(&node, deployment.Localhost())
1725            .with_external(&external, deployment.Localhost())
1726            .deploy(&mut deployment);
1727
1728        deployment.deploy().await.unwrap();
1729
1730        let mut external_in = nodes.connect(input_port).await;
1731        let mut external_out = nodes.connect(out).await;
1732
1733        deployment.start().await.unwrap();
1734
1735        assert_eq!(
1736            external_out.next().await.unwrap(),
1737            std::collections::HashMap::new()
1738        );
1739
1740        external_in.send((1, 1)).await.unwrap();
1741        assert_eq!(
1742            external_out.next().await.unwrap(),
1743            vec![(1, 1)].into_iter().collect()
1744        );
1745
1746        external_in.send((1, 2)).await.unwrap();
1747        assert_eq!(
1748            external_out.next().await.unwrap(),
1749            vec![(1, 2)].into_iter().collect()
1750        );
1751
1752        external_in.send((2, 2)).await.unwrap();
1753        assert_eq!(
1754            external_out.next().await.unwrap(),
1755            vec![(1, 2), (2, 1)].into_iter().collect()
1756        );
1757
1758        external_in.send((1, 1)).await.unwrap();
1759        assert_eq!(
1760            external_out.next().await.unwrap(),
1761            vec![(1, 3), (2, 1)].into_iter().collect()
1762        );
1763
1764        external_in.send((3, 1)).await.unwrap();
1765        assert_eq!(
1766            external_out.next().await.unwrap(),
1767            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1768        );
1769    }
1770
1771    #[cfg(feature = "sim")]
1772    #[test]
1773    fn sim_unbounded_singleton_snapshot() {
1774        let mut flow = FlowBuilder::new();
1775        let node = flow.process::<()>();
1776
1777        let (input_port, input) = node.sim_input();
1778        let output = input
1779            .into_keyed()
1780            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1781            .snapshot(&node.tick(), nondet!(/** test */))
1782            .entries()
1783            .all_ticks()
1784            .sim_output();
1785
1786        let count = flow.sim().exhaustive(async || {
1787            input_port.send((1, 123));
1788            input_port.send((1, 456));
1789            input_port.send((2, 123));
1790
1791            let all = output.collect_sorted::<Vec<_>>().await;
1792            assert_eq!(all.last().unwrap(), &(2, 1));
1793        });
1794
1795        assert_eq!(count, 8);
1796    }
1797
1798    #[cfg(feature = "deploy")]
1799    #[tokio::test]
1800    async fn join_keyed_stream() {
1801        let mut deployment = Deployment::new();
1802
1803        let mut flow = FlowBuilder::new();
1804        let node = flow.process::<()>();
1805        let external = flow.external::<()>();
1806
1807        let tick = node.tick();
1808        let keyed_data = node
1809            .source_iter(q!(vec![(1, 10), (2, 20)]))
1810            .into_keyed()
1811            .batch(&tick, nondet!(/** test */))
1812            .first();
1813        let requests = node
1814            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1815            .into_keyed()
1816            .batch(&tick, nondet!(/** test */));
1817
1818        let out = keyed_data
1819            .join_keyed_stream(requests)
1820            .entries()
1821            .all_ticks()
1822            .send_bincode_external(&external);
1823
1824        let nodes = flow
1825            .with_process(&node, deployment.Localhost())
1826            .with_external(&external, deployment.Localhost())
1827            .deploy(&mut deployment);
1828
1829        deployment.deploy().await.unwrap();
1830
1831        let mut external_out = nodes.connect(out).await;
1832
1833        deployment.start().await.unwrap();
1834
1835        let mut results = vec![];
1836        for _ in 0..2 {
1837            results.push(external_out.next().await.unwrap());
1838        }
1839        results.sort();
1840
1841        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1842    }
1843}