Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33    membership.fold(
34        q!(|| false),
35        q!(|present, event| {
36            match event {
37                MembershipEvent::Joined => *present = true,
38                MembershipEvent::Left => *present = false,
39            }
40        }),
41    )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45    let root = get_this_crate();
46
47    if is_demux {
48        parse_quote! {
49            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50                |(id, data)| {
51                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52                }
53            )
54        }
55    } else {
56        parse_quote! {
57            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58                |data| {
59                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
60                }
61            )
62        }
63    }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67    serialize_bincode_with_type(is_demux, &quote_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71    let root = get_this_crate();
72    if let Some(c_type) = tagged {
73        parse_quote! {
74            |res| {
75                let (id, b) = res.unwrap();
76                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77            }
78        }
79    } else {
80        parse_quote! {
81            |res| {
82                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83            }
84        }
85    }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89    deserialize_bincode_with_type(tagged, &quote_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95    /// using [`bincode`] to serialize/deserialize messages.
96    ///
97    /// The returned stream captures the elements received at the destination, where values will
98    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
100    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
101    /// dropped no further messages will be sent.
102    ///
103    /// # Example
104    /// ```rust
105    /// # #[cfg(feature = "deploy")] {
106    /// # use hydro_lang::prelude::*;
107    /// # use futures::StreamExt;
108    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109    /// let p1 = flow.process::<()>();
110    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111    /// let p2 = flow.process::<()>();
112    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113    /// // 1, 2, 3
114    /// # on_p2.send_bincode(&p_out)
115    /// # }, |mut stream| async move {
116    /// # for w in 1..=3 {
117    /// #     assert_eq!(stream.next().await, Some(w));
118    /// # }
119    /// # }));
120    /// # }
121    /// ```
122    pub fn send_bincode<L2>(
123        self,
124        other: &Process<'a, L2>,
125    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126    where
127        T: Serialize + DeserializeOwned,
128    {
129        self.send(other, TCP.fail_stop().bincode())
130    }
131
132    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133    /// using the configuration in `via` to set up the message transport.
134    ///
135    /// The returned stream captures the elements received at the destination, where values will
136    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
138    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
139    /// dropped no further messages will be sent.
140    ///
141    /// # Example
142    /// ```rust
143    /// # #[cfg(feature = "deploy")] {
144    /// # use hydro_lang::prelude::*;
145    /// # use futures::StreamExt;
146    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147    /// let p1 = flow.process::<()>();
148    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149    /// let p2 = flow.process::<()>();
150    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151    /// // 1, 2, 3
152    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153    /// # }, |mut stream| async move {
154    /// # for w in 1..=3 {
155    /// #     assert_eq!(stream.next().await, Some(w));
156    /// # }
157    /// # }));
158    /// # }
159    /// ```
160    pub fn send<L2, N: NetworkFor<T>>(
161        self,
162        to: &Process<'a, L2>,
163        via: N,
164    ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
165    where
166        T: Serialize + DeserializeOwned,
167        O: MinOrder<N::OrderingGuarantee>,
168    {
169        let serialize_pipeline = Some(N::serialize_thunk(false));
170        let deserialize_pipeline = Some(N::deserialize_thunk(None));
171
172        let name = via.name();
173        if to.multiversioned() && name.is_none() {
174            panic!(
175                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
176            );
177        }
178
179        Stream::new(
180            to.clone(),
181            HydroNode::Network {
182                name: name.map(ToOwned::to_owned),
183                networking_info: N::networking_info(),
184                serialize_fn: serialize_pipeline.map(|e| e.into()),
185                instantiate_fn: DebugInstantiate::Building,
186                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
187                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
188                metadata: to.new_node_metadata(Stream::<
189                    T,
190                    Process<'a, L2>,
191                    Unbounded,
192                    <O as MinOrder<N::OrderingGuarantee>>::Min,
193                    R,
194                >::collection_kind()),
195            },
196        )
197    }
198
199    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
200    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
201    /// using [`bincode`] to serialize/deserialize messages.
202    ///
203    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
204    /// membership information. This is a common pattern in distributed systems for broadcasting data to
205    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
206    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
207    /// each element to all cluster members.
208    ///
209    /// # Non-Determinism
210    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
211    /// to the current cluster members _at that point in time_. Depending on when we are notified of
212    /// membership changes, we will broadcast each element to different members.
213    ///
214    /// # Example
215    /// ```rust
216    /// # #[cfg(feature = "deploy")] {
217    /// # use hydro_lang::prelude::*;
218    /// # use futures::StreamExt;
219    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
220    /// let p1 = flow.process::<()>();
221    /// let workers: Cluster<()> = flow.cluster::<()>();
222    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
223    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
224    /// # on_worker.send_bincode(&p2).entries()
225    /// // if there are 4 members in the cluster, each receives one element
226    /// // - MemberId::<()>(0): [123]
227    /// // - MemberId::<()>(1): [123]
228    /// // - MemberId::<()>(2): [123]
229    /// // - MemberId::<()>(3): [123]
230    /// # }, |mut stream| async move {
231    /// # let mut results = Vec::new();
232    /// # for w in 0..4 {
233    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
234    /// # }
235    /// # results.sort();
236    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
237    /// # }));
238    /// # }
239    /// ```
240    pub fn broadcast_bincode<L2: 'a>(
241        self,
242        other: &Cluster<'a, L2>,
243        nondet_membership: NonDet,
244    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
245    where
246        T: Clone + Serialize + DeserializeOwned,
247    {
248        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
249    }
250
251    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
252    /// using the configuration in `via` to set up the message transport.
253    ///
254    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
255    /// membership information. This is a common pattern in distributed systems for broadcasting data to
256    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
257    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
258    /// each element to all cluster members.
259    ///
260    /// # Non-Determinism
261    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
262    /// to the current cluster members _at that point in time_. Depending on when we are notified of
263    /// membership changes, we will broadcast each element to different members.
264    ///
265    /// # Example
266    /// ```rust
267    /// # #[cfg(feature = "deploy")] {
268    /// # use hydro_lang::prelude::*;
269    /// # use futures::StreamExt;
270    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
271    /// let p1 = flow.process::<()>();
272    /// let workers: Cluster<()> = flow.cluster::<()>();
273    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
274    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
275    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
276    /// // if there are 4 members in the cluster, each receives one element
277    /// // - MemberId::<()>(0): [123]
278    /// // - MemberId::<()>(1): [123]
279    /// // - MemberId::<()>(2): [123]
280    /// // - MemberId::<()>(3): [123]
281    /// # }, |mut stream| async move {
282    /// # let mut results = Vec::new();
283    /// # for w in 0..4 {
284    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
285    /// # }
286    /// # results.sort();
287    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
288    /// # }));
289    /// # }
290    /// ```
291    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
292        self,
293        to: &Cluster<'a, L2>,
294        via: N,
295        nondet_membership: NonDet,
296    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
297    where
298        T: Clone + Serialize + DeserializeOwned,
299        O: MinOrder<N::OrderingGuarantee>,
300    {
301        let ids = track_membership(self.location.source_cluster_members(to));
302        sliced! {
303            let members_snapshot = use(ids, nondet_membership);
304            let elements = use(self, nondet_membership);
305
306            let current_members = members_snapshot.filter(q!(|b| *b));
307            elements.repeat_with_keys(current_members)
308        }
309        .demux(to, via)
310    }
311
312    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
313    /// serialization. The external process can receive these elements by establishing a TCP
314    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
315    ///
316    /// # Example
317    /// ```rust
318    /// # #[cfg(feature = "deploy")] {
319    /// # use hydro_lang::prelude::*;
320    /// # use futures::StreamExt;
321    /// # tokio_test::block_on(async move {
322    /// let mut flow = FlowBuilder::new();
323    /// let process = flow.process::<()>();
324    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
325    /// let external = flow.external::<()>();
326    /// let external_handle = numbers.send_bincode_external(&external);
327    ///
328    /// let mut deployment = hydro_deploy::Deployment::new();
329    /// let nodes = flow
330    ///     .with_process(&process, deployment.Localhost())
331    ///     .with_external(&external, deployment.Localhost())
332    ///     .deploy(&mut deployment);
333    ///
334    /// deployment.deploy().await.unwrap();
335    /// // establish the TCP connection
336    /// let mut external_recv_stream = nodes.connect(external_handle).await;
337    /// deployment.start().await.unwrap();
338    ///
339    /// for w in 1..=3 {
340    ///     assert_eq!(external_recv_stream.next().await, Some(w));
341    /// }
342    /// # });
343    /// # }
344    /// ```
345    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
346    where
347        T: Serialize + DeserializeOwned,
348    {
349        let serialize_pipeline = Some(serialize_bincode::<T>(false));
350
351        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
352
353        let external_port_id = flow_state_borrow.next_external_port();
354
355        flow_state_borrow.push_root(HydroRoot::SendExternal {
356            to_external_key: other.key,
357            to_port_id: external_port_id,
358            to_many: false,
359            unpaired: true,
360            serialize_fn: serialize_pipeline.map(|e| e.into()),
361            instantiate_fn: DebugInstantiate::Building,
362            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
363            op_metadata: HydroIrOpMetadata::new(),
364        });
365
366        ExternalBincodeStream {
367            process_key: other.key,
368            port_id: external_port_id,
369            _phantom: PhantomData,
370        }
371    }
372
373    #[cfg(feature = "sim")]
374    /// Sets up a simulation output port for this stream, allowing test code to receive elements
375    /// sent to this stream during simulation.
376    pub fn sim_output(self) -> SimReceiver<T, O, R>
377    where
378        T: Serialize + DeserializeOwned,
379    {
380        let external_location: External<'a, ()> = External {
381            key: LocationKey::FIRST,
382            flow_state: self.location.flow_state().clone(),
383            _phantom: PhantomData,
384        };
385
386        let external = self.send_bincode_external(&external_location);
387
388        SimReceiver(external.port_id, PhantomData)
389    }
390}
391
392impl<'a, T, L: Location<'a> + NoTick, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
393    /// Creates an external output for embedded deployment mode.
394    ///
395    /// The `name` parameter specifies the name of the field in the generated
396    /// `EmbeddedOutputs` struct that will receive elements from this stream.
397    /// The generated function will accept an `EmbeddedOutputs` struct with an
398    /// `impl FnMut(T)` field with this name.
399    pub fn embedded_output(self, name: impl Into<String>) {
400        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
401
402        self.location
403            .flow_state()
404            .borrow_mut()
405            .push_root(HydroRoot::EmbeddedOutput {
406                ident,
407                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408                op_metadata: HydroIrOpMetadata::new(),
409            });
410    }
411}
412
413impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
414    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
415{
416    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
417    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
418    /// using [`bincode`] to serialize/deserialize messages.
419    ///
420    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
421    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
422    /// this API allows precise targeting of specific cluster members rather than broadcasting to
423    /// all members.
424    ///
425    /// # Example
426    /// ```rust
427    /// # #[cfg(feature = "deploy")] {
428    /// # use hydro_lang::prelude::*;
429    /// # use futures::StreamExt;
430    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
431    /// let p1 = flow.process::<()>();
432    /// let workers: Cluster<()> = flow.cluster::<()>();
433    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
434    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
435    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
436    ///     .demux_bincode(&workers);
437    /// # on_worker.send_bincode(&p2).entries()
438    /// // if there are 4 members in the cluster, each receives one element
439    /// // - MemberId::<()>(0): [0]
440    /// // - MemberId::<()>(1): [1]
441    /// // - MemberId::<()>(2): [2]
442    /// // - MemberId::<()>(3): [3]
443    /// # }, |mut stream| async move {
444    /// # let mut results = Vec::new();
445    /// # for w in 0..4 {
446    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
447    /// # }
448    /// # results.sort();
449    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
450    /// # }));
451    /// # }
452    /// ```
453    pub fn demux_bincode(
454        self,
455        other: &Cluster<'a, L2>,
456    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
457    where
458        T: Serialize + DeserializeOwned,
459    {
460        self.demux(other, TCP.fail_stop().bincode())
461    }
462
463    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
464    /// using the configuration in `via` to set up the message transport.
465    ///
466    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
467    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
468    /// this API allows precise targeting of specific cluster members rather than broadcasting to
469    /// all members.
470    ///
471    /// # Example
472    /// ```rust
473    /// # #[cfg(feature = "deploy")] {
474    /// # use hydro_lang::prelude::*;
475    /// # use futures::StreamExt;
476    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
477    /// let p1 = flow.process::<()>();
478    /// let workers: Cluster<()> = flow.cluster::<()>();
479    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
480    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
481    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
482    ///     .demux(&workers, TCP.fail_stop().bincode());
483    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
484    /// // if there are 4 members in the cluster, each receives one element
485    /// // - MemberId::<()>(0): [0]
486    /// // - MemberId::<()>(1): [1]
487    /// // - MemberId::<()>(2): [2]
488    /// // - MemberId::<()>(3): [3]
489    /// # }, |mut stream| async move {
490    /// # let mut results = Vec::new();
491    /// # for w in 0..4 {
492    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
493    /// # }
494    /// # results.sort();
495    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
496    /// # }));
497    /// # }
498    /// ```
499    pub fn demux<N: NetworkFor<T>>(
500        self,
501        to: &Cluster<'a, L2>,
502        via: N,
503    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
504    where
505        T: Serialize + DeserializeOwned,
506        O: MinOrder<N::OrderingGuarantee>,
507    {
508        self.into_keyed().demux(to, via)
509    }
510}
511
512impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
513    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
514    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
515    /// [`bincode`] to serialize/deserialize messages.
516    ///
517    /// This provides load balancing by evenly distributing work across cluster members. The
518    /// distribution is deterministic based on element order - the first element goes to member 0,
519    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
520    ///
521    /// # Non-Determinism
522    /// The set of cluster members may asynchronously change over time. Each element is distributed
523    /// based on the current cluster membership _at that point in time_. Depending on when cluster
524    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
525    /// membership is stable, the order of members in the round-robin pattern may change across runs.
526    ///
527    /// # Ordering Requirements
528    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
529    /// order of messages and retries affects the round-robin pattern.
530    ///
531    /// # Example
532    /// ```rust
533    /// # #[cfg(feature = "deploy")] {
534    /// # use hydro_lang::prelude::*;
535    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
536    /// # use futures::StreamExt;
537    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
538    /// let p1 = flow.process::<()>();
539    /// let workers: Cluster<()> = flow.cluster::<()>();
540    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
541    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
542    /// on_worker.send_bincode(&p2)
543    /// # .first().values() // we use first to assert that each member gets one element
544    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
545    /// // - MemberId::<()>(?): [1]
546    /// // - MemberId::<()>(?): [2]
547    /// // - MemberId::<()>(?): [3]
548    /// // - MemberId::<()>(?): [4]
549    /// # }, |mut stream| async move {
550    /// # let mut results = Vec::new();
551    /// # for w in 0..4 {
552    /// #     results.push(stream.next().await.unwrap());
553    /// # }
554    /// # results.sort();
555    /// # assert_eq!(results, vec![1, 2, 3, 4]);
556    /// # }));
557    /// # }
558    /// ```
559    pub fn round_robin_bincode<L2: 'a>(
560        self,
561        other: &Cluster<'a, L2>,
562        nondet_membership: NonDet,
563    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
564    where
565        T: Serialize + DeserializeOwned,
566    {
567        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
568    }
569
570    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
571    /// the configuration in `via` to set up the message transport.
572    ///
573    /// This provides load balancing by evenly distributing work across cluster members. The
574    /// distribution is deterministic based on element order - the first element goes to member 0,
575    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
576    ///
577    /// # Non-Determinism
578    /// The set of cluster members may asynchronously change over time. Each element is distributed
579    /// based on the current cluster membership _at that point in time_. Depending on when cluster
580    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
581    /// membership is stable, the order of members in the round-robin pattern may change across runs.
582    ///
583    /// # Ordering Requirements
584    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
585    /// order of messages and retries affects the round-robin pattern.
586    ///
587    /// # Example
588    /// ```rust
589    /// # #[cfg(feature = "deploy")] {
590    /// # use hydro_lang::prelude::*;
591    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
592    /// # use futures::StreamExt;
593    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
594    /// let p1 = flow.process::<()>();
595    /// let workers: Cluster<()> = flow.cluster::<()>();
596    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
597    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
598    /// on_worker.send(&p2, TCP.fail_stop().bincode())
599    /// # .first().values() // we use first to assert that each member gets one element
600    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
601    /// // - MemberId::<()>(?): [1]
602    /// // - MemberId::<()>(?): [2]
603    /// // - MemberId::<()>(?): [3]
604    /// // - MemberId::<()>(?): [4]
605    /// # }, |mut stream| async move {
606    /// # let mut results = Vec::new();
607    /// # for w in 0..4 {
608    /// #     results.push(stream.next().await.unwrap());
609    /// # }
610    /// # results.sort();
611    /// # assert_eq!(results, vec![1, 2, 3, 4]);
612    /// # }));
613    /// # }
614    /// ```
615    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
616        self,
617        to: &Cluster<'a, L2>,
618        via: N,
619        nondet_membership: NonDet,
620    ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
621    where
622        T: Serialize + DeserializeOwned,
623    {
624        let ids = track_membership(self.location.source_cluster_members(to));
625        sliced! {
626            let members_snapshot = use(ids, nondet_membership);
627            let elements = use(self.enumerate(), nondet_membership);
628
629            let current_members = members_snapshot
630                .filter(q!(|b| *b))
631                .keys()
632                .assume_ordering::<TotalOrder>(nondet_membership)
633                .collect_vec();
634
635            elements
636                .cross_singleton(current_members)
637                .filter_map(q!(|(data, members)| {
638                    if members.is_empty() {
639                        None
640                    } else {
641                        Some((members[data.0 % members.len()].clone(), data.1))
642                    }
643                }))
644        }
645        .demux(to, via)
646    }
647}
648
649impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
650    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
651    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
652    /// [`bincode`] to serialize/deserialize messages.
653    ///
654    /// This provides load balancing by evenly distributing work across cluster members. The
655    /// distribution is deterministic based on element order - the first element goes to member 0,
656    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
657    ///
658    /// # Non-Determinism
659    /// The set of cluster members may asynchronously change over time. Each element is distributed
660    /// based on the current cluster membership _at that point in time_. Depending on when cluster
661    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
662    /// membership is stable, the order of members in the round-robin pattern may change across runs.
663    ///
664    /// # Ordering Requirements
665    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
666    /// order of messages and retries affects the round-robin pattern.
667    ///
668    /// # Example
669    /// ```rust
670    /// # #[cfg(feature = "deploy")] {
671    /// # use hydro_lang::prelude::*;
672    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
673    /// # use hydro_lang::location::MemberId;
674    /// # use futures::StreamExt;
675    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
676    /// let p1 = flow.process::<()>();
677    /// let workers1: Cluster<()> = flow.cluster::<()>();
678    /// let workers2: Cluster<()> = flow.cluster::<()>();
679    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
680    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
681    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
682    /// on_worker2.send_bincode(&p2)
683    /// # .entries()
684    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
685    /// # }, |mut stream| async move {
686    /// # let mut results = Vec::new();
687    /// # let mut locations = std::collections::HashSet::new();
688    /// # for w in 0..=16 {
689    /// #     let (location, v) = stream.next().await.unwrap();
690    /// #     locations.insert(location);
691    /// #     results.push(v);
692    /// # }
693    /// # results.sort();
694    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
695    /// # assert_eq!(locations.len(), 16);
696    /// # }));
697    /// # }
698    /// ```
699    pub fn round_robin_bincode<L2: 'a>(
700        self,
701        other: &Cluster<'a, L2>,
702        nondet_membership: NonDet,
703    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
704    where
705        T: Serialize + DeserializeOwned,
706    {
707        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
708    }
709
710    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
711    /// the configuration in `via` to set up the message transport.
712    ///
713    /// This provides load balancing by evenly distributing work across cluster members. The
714    /// distribution is deterministic based on element order - the first element goes to member 0,
715    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
716    ///
717    /// # Non-Determinism
718    /// The set of cluster members may asynchronously change over time. Each element is distributed
719    /// based on the current cluster membership _at that point in time_. Depending on when cluster
720    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
721    /// membership is stable, the order of members in the round-robin pattern may change across runs.
722    ///
723    /// # Ordering Requirements
724    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
725    /// order of messages and retries affects the round-robin pattern.
726    ///
727    /// # Example
728    /// ```rust
729    /// # #[cfg(feature = "deploy")] {
730    /// # use hydro_lang::prelude::*;
731    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
732    /// # use hydro_lang::location::MemberId;
733    /// # use futures::StreamExt;
734    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
735    /// let p1 = flow.process::<()>();
736    /// let workers1: Cluster<()> = flow.cluster::<()>();
737    /// let workers2: Cluster<()> = flow.cluster::<()>();
738    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
739    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
740    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
741    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
742    /// # .entries()
743    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
744    /// # }, |mut stream| async move {
745    /// # let mut results = Vec::new();
746    /// # let mut locations = std::collections::HashSet::new();
747    /// # for w in 0..=16 {
748    /// #     let (location, v) = stream.next().await.unwrap();
749    /// #     locations.insert(location);
750    /// #     results.push(v);
751    /// # }
752    /// # results.sort();
753    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
754    /// # assert_eq!(locations.len(), 16);
755    /// # }));
756    /// # }
757    /// ```
758    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
759        self,
760        to: &Cluster<'a, L2>,
761        via: N,
762        nondet_membership: NonDet,
763    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
764    where
765        T: Serialize + DeserializeOwned,
766    {
767        let ids = track_membership(self.location.source_cluster_members(to));
768        sliced! {
769            let members_snapshot = use(ids, nondet_membership);
770            let elements = use(self.enumerate(), nondet_membership);
771
772            let current_members = members_snapshot
773                .filter(q!(|b| *b))
774                .keys()
775                .assume_ordering::<TotalOrder>(nondet_membership)
776                .collect_vec();
777
778            elements
779                .cross_singleton(current_members)
780                .filter_map(q!(|(data, members)| {
781                    if members.is_empty() {
782                        None
783                    } else {
784                        Some((members[data.0 % members.len()].clone(), data.1))
785                    }
786                }))
787        }
788        .demux(to, via)
789    }
790}
791
792impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
793    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
794    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
795    /// using [`bincode`] to serialize/deserialize messages.
796    ///
797    /// Each cluster member sends its local stream elements, and they are collected at the destination
798    /// as a [`KeyedStream`] where keys identify the source cluster member.
799    ///
800    /// # Example
801    /// ```rust
802    /// # #[cfg(feature = "deploy")] {
803    /// # use hydro_lang::prelude::*;
804    /// # use futures::StreamExt;
805    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
806    /// let workers: Cluster<()> = flow.cluster::<()>();
807    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
808    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
809    /// # all_received.entries()
810    /// # }, |mut stream| async move {
811    /// // if there are 4 members in the cluster, we should receive 4 elements
812    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
813    /// # let mut results = Vec::new();
814    /// # for w in 0..4 {
815    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
816    /// # }
817    /// # results.sort();
818    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
819    /// # }));
820    /// # }
821    /// ```
822    ///
823    /// If you don't need to know the source for each element, you can use `.values()`
824    /// to get just the data:
825    /// ```rust
826    /// # #[cfg(feature = "deploy")] {
827    /// # use hydro_lang::prelude::*;
828    /// # use hydro_lang::live_collections::stream::NoOrder;
829    /// # use futures::StreamExt;
830    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
831    /// # let workers: Cluster<()> = flow.cluster::<()>();
832    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
833    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
834    /// # values
835    /// # }, |mut stream| async move {
836    /// # let mut results = Vec::new();
837    /// # for w in 0..4 {
838    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
839    /// # }
840    /// # results.sort();
841    /// // if there are 4 members in the cluster, we should receive 4 elements
842    /// // 1, 1, 1, 1
843    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
844    /// # }));
845    /// # }
846    /// ```
847    pub fn send_bincode<L2>(
848        self,
849        other: &Process<'a, L2>,
850    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
851    where
852        T: Serialize + DeserializeOwned,
853    {
854        self.send(other, TCP.fail_stop().bincode())
855    }
856
857    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
858    /// using the configuration in `via` to set up the message transport.
859    ///
860    /// Each cluster member sends its local stream elements, and they are collected at the destination
861    /// as a [`KeyedStream`] where keys identify the source cluster member.
862    ///
863    /// # Example
864    /// ```rust
865    /// # #[cfg(feature = "deploy")] {
866    /// # use hydro_lang::prelude::*;
867    /// # use futures::StreamExt;
868    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
869    /// let workers: Cluster<()> = flow.cluster::<()>();
870    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
871    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
872    /// # all_received.entries()
873    /// # }, |mut stream| async move {
874    /// // if there are 4 members in the cluster, we should receive 4 elements
875    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
876    /// # let mut results = Vec::new();
877    /// # for w in 0..4 {
878    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
879    /// # }
880    /// # results.sort();
881    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
882    /// # }));
883    /// # }
884    /// ```
885    ///
886    /// If you don't need to know the source for each element, you can use `.values()`
887    /// to get just the data:
888    /// ```rust
889    /// # #[cfg(feature = "deploy")] {
890    /// # use hydro_lang::prelude::*;
891    /// # use hydro_lang::live_collections::stream::NoOrder;
892    /// # use futures::StreamExt;
893    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
894    /// # let workers: Cluster<()> = flow.cluster::<()>();
895    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
896    /// let values: Stream<i32, _, _, NoOrder> =
897    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
898    /// # values
899    /// # }, |mut stream| async move {
900    /// # let mut results = Vec::new();
901    /// # for w in 0..4 {
902    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
903    /// # }
904    /// # results.sort();
905    /// // if there are 4 members in the cluster, we should receive 4 elements
906    /// // 1, 1, 1, 1
907    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
908    /// # }));
909    /// # }
910    /// ```
911    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
912    pub fn send<L2, N: NetworkFor<T>>(
913        self,
914        to: &Process<'a, L2>,
915        via: N,
916    ) -> KeyedStream<
917        MemberId<L>,
918        T,
919        Process<'a, L2>,
920        Unbounded,
921        <O as MinOrder<N::OrderingGuarantee>>::Min,
922        R,
923    >
924    where
925        T: Serialize + DeserializeOwned,
926        O: MinOrder<N::OrderingGuarantee>,
927    {
928        let serialize_pipeline = Some(N::serialize_thunk(false));
929
930        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
931
932        let name = via.name();
933        if to.multiversioned() && name.is_none() {
934            panic!(
935                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
936            );
937        }
938
939        let raw_stream: Stream<
940            (MemberId<L>, T),
941            Process<'a, L2>,
942            Unbounded,
943            <O as MinOrder<N::OrderingGuarantee>>::Min,
944            R,
945        > = Stream::new(
946            to.clone(),
947            HydroNode::Network {
948                name: name.map(ToOwned::to_owned),
949                networking_info: N::networking_info(),
950                serialize_fn: serialize_pipeline.map(|e| e.into()),
951                instantiate_fn: DebugInstantiate::Building,
952                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
953                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
954                metadata: to.new_node_metadata(Stream::<
955                    (MemberId<L>, T),
956                    Process<'a, L2>,
957                    Unbounded,
958                    <O as MinOrder<N::OrderingGuarantee>>::Min,
959                    R,
960                >::collection_kind()),
961            },
962        );
963
964        raw_stream.into_keyed()
965    }
966
967    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
968    /// Broadcasts elements of this stream at each source member to all members of a destination
969    /// cluster, using [`bincode`] to serialize/deserialize messages.
970    ///
971    /// Each source member sends each of its stream elements to **every** member of the cluster
972    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
973    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
974    /// **only data elements** and sends each element to all cluster members.
975    ///
976    /// # Non-Determinism
977    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
978    /// to the current cluster members known _at that point in time_ at the source member. Depending
979    /// on when each source member is notified of membership changes, it will broadcast each element
980    /// to different members.
981    ///
982    /// # Example
983    /// ```rust
984    /// # #[cfg(feature = "deploy")] {
985    /// # use hydro_lang::prelude::*;
986    /// # use hydro_lang::location::MemberId;
987    /// # use futures::StreamExt;
988    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
989    /// # type Source = ();
990    /// # type Destination = ();
991    /// let source: Cluster<Source> = flow.cluster::<Source>();
992    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
993    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
994    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
995    /// # on_destination.entries().send_bincode(&p2).entries()
996    /// // if there are 4 members in the desination, each receives one element from each source member
997    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
998    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
999    /// // - ...
1000    /// # }, |mut stream| async move {
1001    /// # let mut results = Vec::new();
1002    /// # for w in 0..16 {
1003    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1004    /// # }
1005    /// # results.sort();
1006    /// # assert_eq!(results, vec![
1007    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1008    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1009    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1010    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1011    /// # ]);
1012    /// # }));
1013    /// # }
1014    /// ```
1015    pub fn broadcast_bincode<L2: 'a>(
1016        self,
1017        other: &Cluster<'a, L2>,
1018        nondet_membership: NonDet,
1019    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1020    where
1021        T: Clone + Serialize + DeserializeOwned,
1022    {
1023        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1024    }
1025
1026    /// Broadcasts elements of this stream at each source member to all members of a destination
1027    /// cluster, using the configuration in `via` to set up the message transport.
1028    ///
1029    /// Each source member sends each of its stream elements to **every** member of the cluster
1030    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1031    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1032    /// **only data elements** and sends each element to all cluster members.
1033    ///
1034    /// # Non-Determinism
1035    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1036    /// to the current cluster members known _at that point in time_ at the source member. Depending
1037    /// on when each source member is notified of membership changes, it will broadcast each element
1038    /// to different members.
1039    ///
1040    /// # Example
1041    /// ```rust
1042    /// # #[cfg(feature = "deploy")] {
1043    /// # use hydro_lang::prelude::*;
1044    /// # use hydro_lang::location::MemberId;
1045    /// # use futures::StreamExt;
1046    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1047    /// # type Source = ();
1048    /// # type Destination = ();
1049    /// let source: Cluster<Source> = flow.cluster::<Source>();
1050    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1051    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1052    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1053    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1054    /// // if there are 4 members in the desination, each receives one element from each source member
1055    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1056    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1057    /// // - ...
1058    /// # }, |mut stream| async move {
1059    /// # let mut results = Vec::new();
1060    /// # for w in 0..16 {
1061    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1062    /// # }
1063    /// # results.sort();
1064    /// # assert_eq!(results, vec![
1065    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1066    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1067    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1068    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1069    /// # ]);
1070    /// # }));
1071    /// # }
1072    /// ```
1073    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1074    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1075        self,
1076        to: &Cluster<'a, L2>,
1077        via: N,
1078        nondet_membership: NonDet,
1079    ) -> KeyedStream<
1080        MemberId<L>,
1081        T,
1082        Cluster<'a, L2>,
1083        Unbounded,
1084        <O as MinOrder<N::OrderingGuarantee>>::Min,
1085        R,
1086    >
1087    where
1088        T: Clone + Serialize + DeserializeOwned,
1089        O: MinOrder<N::OrderingGuarantee>,
1090    {
1091        let ids = track_membership(self.location.source_cluster_members(to));
1092        sliced! {
1093            let members_snapshot = use(ids, nondet_membership);
1094            let elements = use(self, nondet_membership);
1095
1096            let current_members = members_snapshot.filter(q!(|b| *b));
1097            elements.repeat_with_keys(current_members)
1098        }
1099        .demux(to, via)
1100    }
1101
1102    #[cfg(feature = "sim")]
1103    /// Sends elements of this cluster stream to an external location using bincode serialization.
1104    fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1105    where
1106        T: Serialize + DeserializeOwned,
1107    {
1108        let serialize_pipeline = Some(serialize_bincode::<T>(false));
1109
1110        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1111
1112        let external_port_id = flow_state_borrow.next_external_port();
1113
1114        flow_state_borrow.push_root(HydroRoot::SendExternal {
1115            to_external_key: other.key,
1116            to_port_id: external_port_id,
1117            to_many: false,
1118            unpaired: true,
1119            serialize_fn: serialize_pipeline.map(|e| e.into()),
1120            instantiate_fn: DebugInstantiate::Building,
1121            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1122            op_metadata: HydroIrOpMetadata::new(),
1123        });
1124
1125        ExternalBincodeStream {
1126            process_key: other.key,
1127            port_id: external_port_id,
1128            _phantom: PhantomData,
1129        }
1130    }
1131
1132    #[cfg(feature = "sim")]
1133    /// Sets up a simulation output port for this cluster stream, allowing test code
1134    /// to receive `(member_id, T)` pairs during simulation.
1135    pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1136    where
1137        T: Serialize + DeserializeOwned,
1138    {
1139        let external_location: External<'a, ()> = External {
1140            key: LocationKey::FIRST,
1141            flow_state: self.location.flow_state().clone(),
1142            _phantom: PhantomData,
1143        };
1144
1145        let external = self.send_bincode_external(&external_location);
1146
1147        crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1148    }
1149}
1150
1151impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1152    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1153{
1154    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1155    /// Sends elements of this stream at each source member to specific members of a destination
1156    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1157    ///
1158    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1159    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1160    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1161    /// all members.
1162    ///
1163    /// Each cluster member sends its local stream elements, and they are collected at each
1164    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1165    ///
1166    /// # Example
1167    /// ```rust
1168    /// # #[cfg(feature = "deploy")] {
1169    /// # use hydro_lang::prelude::*;
1170    /// # use futures::StreamExt;
1171    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1172    /// # type Source = ();
1173    /// # type Destination = ();
1174    /// let source: Cluster<Source> = flow.cluster::<Source>();
1175    /// let to_send: Stream<_, Cluster<_>, _> = source
1176    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1177    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1178    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1179    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1180    /// # all_received.entries().send_bincode(&p2).entries()
1181    /// # }, |mut stream| async move {
1182    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1183    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1184    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1185    /// // - ...
1186    /// # let mut results = Vec::new();
1187    /// # for w in 0..16 {
1188    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1189    /// # }
1190    /// # results.sort();
1191    /// # assert_eq!(results, vec![
1192    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1193    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1194    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1195    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1196    /// # ]);
1197    /// # }));
1198    /// # }
1199    /// ```
1200    pub fn demux_bincode(
1201        self,
1202        other: &Cluster<'a, L2>,
1203    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1204    where
1205        T: Serialize + DeserializeOwned,
1206    {
1207        self.demux(other, TCP.fail_stop().bincode())
1208    }
1209
1210    /// Sends elements of this stream at each source member to specific members of a destination
1211    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1212    /// message transport.
1213    ///
1214    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1215    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1216    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1217    /// all members.
1218    ///
1219    /// Each cluster member sends its local stream elements, and they are collected at each
1220    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1221    ///
1222    /// # Example
1223    /// ```rust
1224    /// # #[cfg(feature = "deploy")] {
1225    /// # use hydro_lang::prelude::*;
1226    /// # use futures::StreamExt;
1227    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1228    /// # type Source = ();
1229    /// # type Destination = ();
1230    /// let source: Cluster<Source> = flow.cluster::<Source>();
1231    /// let to_send: Stream<_, Cluster<_>, _> = source
1232    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1233    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1234    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1235    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1236    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1237    /// # }, |mut stream| async move {
1238    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1239    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1240    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1241    /// // - ...
1242    /// # let mut results = Vec::new();
1243    /// # for w in 0..16 {
1244    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1245    /// # }
1246    /// # results.sort();
1247    /// # assert_eq!(results, vec![
1248    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1249    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1250    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1251    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1252    /// # ]);
1253    /// # }));
1254    /// # }
1255    /// ```
1256    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1257    pub fn demux<N: NetworkFor<T>>(
1258        self,
1259        to: &Cluster<'a, L2>,
1260        via: N,
1261    ) -> KeyedStream<
1262        MemberId<L>,
1263        T,
1264        Cluster<'a, L2>,
1265        Unbounded,
1266        <O as MinOrder<N::OrderingGuarantee>>::Min,
1267        R,
1268    >
1269    where
1270        T: Serialize + DeserializeOwned,
1271        O: MinOrder<N::OrderingGuarantee>,
1272    {
1273        self.into_keyed().demux(to, via)
1274    }
1275}
1276
1277#[cfg(test)]
1278mod tests {
1279    #[cfg(feature = "sim")]
1280    use stageleft::q;
1281
1282    #[cfg(feature = "sim")]
1283    use crate::live_collections::sliced::sliced;
1284    #[cfg(feature = "sim")]
1285    use crate::location::{Location, MemberId};
1286    #[cfg(feature = "sim")]
1287    use crate::networking::TCP;
1288    #[cfg(feature = "sim")]
1289    use crate::nondet::nondet;
1290    #[cfg(feature = "sim")]
1291    use crate::prelude::FlowBuilder;
1292
1293    #[cfg(feature = "sim")]
1294    #[test]
1295    fn sim_send_bincode_o2o() {
1296        use crate::networking::TCP;
1297
1298        let mut flow = FlowBuilder::new();
1299        let node = flow.process::<()>();
1300        let node2 = flow.process::<()>();
1301
1302        let (in_send, input) = node.sim_input();
1303
1304        let out_recv = input
1305            .send(&node2, TCP.fail_stop().bincode())
1306            .batch(&node2.tick(), nondet!(/** test */))
1307            .count()
1308            .all_ticks()
1309            .sim_output();
1310
1311        let instances = flow.sim().exhaustive(async || {
1312            in_send.send(());
1313            in_send.send(());
1314            in_send.send(());
1315
1316            let received = out_recv.collect::<Vec<_>>().await;
1317            assert!(received.into_iter().sum::<usize>() == 3);
1318        });
1319
1320        assert_eq!(instances, 4); // 2^{3 - 1}
1321    }
1322
1323    #[cfg(feature = "sim")]
1324    #[test]
1325    fn sim_send_bincode_m2o() {
1326        let mut flow = FlowBuilder::new();
1327        let cluster = flow.cluster::<()>();
1328        let node = flow.process::<()>();
1329
1330        let input = cluster.source_iter(q!(vec![1]));
1331
1332        let out_recv = input
1333            .send(&node, TCP.fail_stop().bincode())
1334            .entries()
1335            .batch(&node.tick(), nondet!(/** test */))
1336            .all_ticks()
1337            .sim_output();
1338
1339        let instances = flow
1340            .sim()
1341            .with_cluster_size(&cluster, 4)
1342            .exhaustive(async || {
1343                out_recv
1344                    .assert_yields_only_unordered(vec![
1345                        (MemberId::from_raw_id(0), 1),
1346                        (MemberId::from_raw_id(1), 1),
1347                        (MemberId::from_raw_id(2), 1),
1348                        (MemberId::from_raw_id(3), 1),
1349                    ])
1350                    .await
1351            });
1352
1353        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1354    }
1355
1356    #[cfg(feature = "sim")]
1357    #[test]
1358    fn sim_send_bincode_multiple_m2o() {
1359        let mut flow = FlowBuilder::new();
1360        let cluster1 = flow.cluster::<()>();
1361        let cluster2 = flow.cluster::<()>();
1362        let node = flow.process::<()>();
1363
1364        let out_recv_1 = cluster1
1365            .source_iter(q!(vec![1]))
1366            .send(&node, TCP.fail_stop().bincode())
1367            .entries()
1368            .sim_output();
1369
1370        let out_recv_2 = cluster2
1371            .source_iter(q!(vec![2]))
1372            .send(&node, TCP.fail_stop().bincode())
1373            .entries()
1374            .sim_output();
1375
1376        let instances = flow
1377            .sim()
1378            .with_cluster_size(&cluster1, 3)
1379            .with_cluster_size(&cluster2, 4)
1380            .exhaustive(async || {
1381                out_recv_1
1382                    .assert_yields_only_unordered(vec![
1383                        (MemberId::from_raw_id(0), 1),
1384                        (MemberId::from_raw_id(1), 1),
1385                        (MemberId::from_raw_id(2), 1),
1386                    ])
1387                    .await;
1388
1389                out_recv_2
1390                    .assert_yields_only_unordered(vec![
1391                        (MemberId::from_raw_id(0), 2),
1392                        (MemberId::from_raw_id(1), 2),
1393                        (MemberId::from_raw_id(2), 2),
1394                        (MemberId::from_raw_id(3), 2),
1395                    ])
1396                    .await;
1397            });
1398
1399        assert_eq!(instances, 1);
1400    }
1401
1402    #[cfg(feature = "sim")]
1403    #[test]
1404    fn sim_send_bincode_o2m() {
1405        let mut flow = FlowBuilder::new();
1406        let cluster = flow.cluster::<()>();
1407        let node = flow.process::<()>();
1408
1409        let input = node.source_iter(q!(vec![
1410            (MemberId::from_raw_id(0), 123),
1411            (MemberId::from_raw_id(1), 456),
1412        ]));
1413
1414        let out_recv = input
1415            .demux(&cluster, TCP.fail_stop().bincode())
1416            .map(q!(|x| x + 1))
1417            .send(&node, TCP.fail_stop().bincode())
1418            .entries()
1419            .sim_output();
1420
1421        flow.sim()
1422            .with_cluster_size(&cluster, 4)
1423            .exhaustive(async || {
1424                out_recv
1425                    .assert_yields_only_unordered(vec![
1426                        (MemberId::from_raw_id(0), 124),
1427                        (MemberId::from_raw_id(1), 457),
1428                    ])
1429                    .await
1430            });
1431    }
1432
1433    #[cfg(feature = "sim")]
1434    #[test]
1435    fn sim_broadcast_bincode_o2m() {
1436        let mut flow = FlowBuilder::new();
1437        let cluster = flow.cluster::<()>();
1438        let node = flow.process::<()>();
1439
1440        let input = node.source_iter(q!(vec![123, 456]));
1441
1442        let out_recv = input
1443            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1444            .map(q!(|x| x + 1))
1445            .send(&node, TCP.fail_stop().bincode())
1446            .entries()
1447            .sim_output();
1448
1449        let mut c_1_produced = false;
1450        let mut c_2_produced = false;
1451        let mut c_1_saw_457_but_not_124 = false;
1452
1453        flow.sim()
1454            .with_cluster_size(&cluster, 2)
1455            .exhaustive(async || {
1456                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1457
1458                // check that order is preserved
1459                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1460                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1461                    c_1_produced = true;
1462                }
1463
1464                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1465                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1466                    c_2_produced = true;
1467                }
1468
1469                if all_out.contains(&(MemberId::from_raw_id(0), 457))
1470                    && !all_out.contains(&(MemberId::from_raw_id(0), 124))
1471                {
1472                    c_1_saw_457_but_not_124 = true;
1473                }
1474            });
1475
1476        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1477
1478        // in at least one execution, the cluster member received 457 but not 124, this tests
1479        // that the simulator properly explores dynamic membership additions (a member that joins after 123 is broadcast)
1480        assert!(c_1_saw_457_but_not_124);
1481    }
1482
1483    #[cfg(feature = "sim")]
1484    #[test]
1485    fn sim_send_bincode_m2m() {
1486        let mut flow = FlowBuilder::new();
1487        let cluster = flow.cluster::<()>();
1488        let node = flow.process::<()>();
1489
1490        let input = node.source_iter(q!(vec![
1491            (MemberId::from_raw_id(0), 123),
1492            (MemberId::from_raw_id(1), 456),
1493        ]));
1494
1495        let out_recv = input
1496            .demux(&cluster, TCP.fail_stop().bincode())
1497            .map(q!(|x| x + 1))
1498            .flat_map_ordered(q!(|x| vec![
1499                (MemberId::from_raw_id(0), x),
1500                (MemberId::from_raw_id(1), x),
1501            ]))
1502            .demux(&cluster, TCP.fail_stop().bincode())
1503            .entries()
1504            .send(&node, TCP.fail_stop().bincode())
1505            .entries()
1506            .sim_output();
1507
1508        flow.sim()
1509            .with_cluster_size(&cluster, 4)
1510            .exhaustive(async || {
1511                out_recv
1512                    .assert_yields_only_unordered(vec![
1513                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1514                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1515                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1516                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1517                    ])
1518                    .await
1519            });
1520    }
1521
1522    #[cfg(feature = "sim")]
1523    #[test]
1524    fn sim_lossy_delayed_forever_o2o() {
1525        use std::collections::HashSet;
1526
1527        use crate::properties::manual_proof;
1528
1529        let mut flow = FlowBuilder::new();
1530        let node = flow.process::<()>();
1531        let node2 = flow.process::<()>();
1532
1533        let received = node
1534            .source_iter(q!(0..3_u32))
1535            .send(&node2, TCP.lossy_delayed_forever().bincode())
1536            .fold(
1537                q!(|| std::collections::HashSet::<u32>::new()),
1538                q!(
1539                    |set, v| {
1540                        set.insert(v);
1541                    },
1542                    commutative = manual_proof!(/** set insert is commutative */)
1543                ),
1544            );
1545
1546        let out_recv = sliced! {
1547            let snapshot = use(received, nondet!(/** test */));
1548            snapshot.into_stream()
1549        }
1550        .sim_output();
1551
1552        let mut saw_non_contiguous = false;
1553
1554        flow.sim().test_safety_only().exhaustive(async || {
1555            let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1556
1557            // Check each individual snapshot for a non-contiguous subset.
1558            for set in &snapshots {
1559                #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1560                if set.len() >= 2 && set.len() < 3 {
1561                    let min = *set.iter().min().unwrap();
1562                    let max = *set.iter().max().unwrap();
1563                    if set.len() < (max - min + 1) as usize {
1564                        saw_non_contiguous = true;
1565                    }
1566                }
1567            }
1568        });
1569
1570        assert!(
1571            saw_non_contiguous,
1572            "Expected at least one execution with a non-contiguous subset of inputs"
1573        );
1574    }
1575}