hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33 membership.fold(
34 q!(|| false),
35 q!(|present, event| {
36 match event {
37 MembershipEvent::Joined => *present = true,
38 MembershipEvent::Left => *present = false,
39 }
40 }),
41 )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45 let root = get_this_crate();
46
47 if is_demux {
48 parse_quote! {
49 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50 |(id, data)| {
51 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52 }
53 )
54 }
55 } else {
56 parse_quote! {
57 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58 |data| {
59 #root::runtime_support::bincode::serialize(&data).unwrap().into()
60 }
61 )
62 }
63 }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67 serialize_bincode_with_type(is_demux, "e_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71 let root = get_this_crate();
72 if let Some(c_type) = tagged {
73 parse_quote! {
74 |res| {
75 let (id, b) = res.unwrap();
76 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77 }
78 }
79 } else {
80 parse_quote! {
81 |res| {
82 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83 }
84 }
85 }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89 deserialize_bincode_with_type(tagged, "e_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95 /// using [`bincode`] to serialize/deserialize messages.
96 ///
97 /// The returned stream captures the elements received at the destination, where values will
98 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
100 /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
101 /// dropped no further messages will be sent.
102 ///
103 /// # Example
104 /// ```rust
105 /// # #[cfg(feature = "deploy")] {
106 /// # use hydro_lang::prelude::*;
107 /// # use futures::StreamExt;
108 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109 /// let p1 = flow.process::<()>();
110 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111 /// let p2 = flow.process::<()>();
112 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113 /// // 1, 2, 3
114 /// # on_p2.send_bincode(&p_out)
115 /// # }, |mut stream| async move {
116 /// # for w in 1..=3 {
117 /// # assert_eq!(stream.next().await, Some(w));
118 /// # }
119 /// # }));
120 /// # }
121 /// ```
122 pub fn send_bincode<L2>(
123 self,
124 other: &Process<'a, L2>,
125 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126 where
127 T: Serialize + DeserializeOwned,
128 {
129 self.send(other, TCP.fail_stop().bincode())
130 }
131
132 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133 /// using the configuration in `via` to set up the message transport.
134 ///
135 /// The returned stream captures the elements received at the destination, where values will
136 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
138 /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
139 /// dropped no further messages will be sent.
140 ///
141 /// # Example
142 /// ```rust
143 /// # #[cfg(feature = "deploy")] {
144 /// # use hydro_lang::prelude::*;
145 /// # use futures::StreamExt;
146 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147 /// let p1 = flow.process::<()>();
148 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149 /// let p2 = flow.process::<()>();
150 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151 /// // 1, 2, 3
152 /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153 /// # }, |mut stream| async move {
154 /// # for w in 1..=3 {
155 /// # assert_eq!(stream.next().await, Some(w));
156 /// # }
157 /// # }));
158 /// # }
159 /// ```
160 pub fn send<L2, N: NetworkFor<T>>(
161 self,
162 to: &Process<'a, L2>,
163 via: N,
164 ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
165 where
166 T: Serialize + DeserializeOwned,
167 O: MinOrder<N::OrderingGuarantee>,
168 {
169 let serialize_pipeline = Some(N::serialize_thunk(false));
170 let deserialize_pipeline = Some(N::deserialize_thunk(None));
171
172 let name = via.name();
173 if to.multiversioned() && name.is_none() {
174 panic!(
175 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
176 );
177 }
178
179 Stream::new(
180 to.clone(),
181 HydroNode::Network {
182 name: name.map(ToOwned::to_owned),
183 networking_info: N::networking_info(),
184 serialize_fn: serialize_pipeline.map(|e| e.into()),
185 instantiate_fn: DebugInstantiate::Building,
186 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
187 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
188 metadata: to.new_node_metadata(Stream::<
189 T,
190 Process<'a, L2>,
191 Unbounded,
192 <O as MinOrder<N::OrderingGuarantee>>::Min,
193 R,
194 >::collection_kind()),
195 },
196 )
197 }
198
199 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
200 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
201 /// using [`bincode`] to serialize/deserialize messages.
202 ///
203 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
204 /// membership information. This is a common pattern in distributed systems for broadcasting data to
205 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
206 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
207 /// each element to all cluster members.
208 ///
209 /// # Non-Determinism
210 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
211 /// to the current cluster members _at that point in time_. Depending on when we are notified of
212 /// membership changes, we will broadcast each element to different members.
213 ///
214 /// # Example
215 /// ```rust
216 /// # #[cfg(feature = "deploy")] {
217 /// # use hydro_lang::prelude::*;
218 /// # use futures::StreamExt;
219 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
220 /// let p1 = flow.process::<()>();
221 /// let workers: Cluster<()> = flow.cluster::<()>();
222 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
223 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
224 /// # on_worker.send_bincode(&p2).entries()
225 /// // if there are 4 members in the cluster, each receives one element
226 /// // - MemberId::<()>(0): [123]
227 /// // - MemberId::<()>(1): [123]
228 /// // - MemberId::<()>(2): [123]
229 /// // - MemberId::<()>(3): [123]
230 /// # }, |mut stream| async move {
231 /// # let mut results = Vec::new();
232 /// # for w in 0..4 {
233 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
234 /// # }
235 /// # results.sort();
236 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
237 /// # }));
238 /// # }
239 /// ```
240 pub fn broadcast_bincode<L2: 'a>(
241 self,
242 other: &Cluster<'a, L2>,
243 nondet_membership: NonDet,
244 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
245 where
246 T: Clone + Serialize + DeserializeOwned,
247 {
248 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
249 }
250
251 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
252 /// using the configuration in `via` to set up the message transport.
253 ///
254 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
255 /// membership information. This is a common pattern in distributed systems for broadcasting data to
256 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
257 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
258 /// each element to all cluster members.
259 ///
260 /// # Non-Determinism
261 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
262 /// to the current cluster members _at that point in time_. Depending on when we are notified of
263 /// membership changes, we will broadcast each element to different members.
264 ///
265 /// # Example
266 /// ```rust
267 /// # #[cfg(feature = "deploy")] {
268 /// # use hydro_lang::prelude::*;
269 /// # use futures::StreamExt;
270 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
271 /// let p1 = flow.process::<()>();
272 /// let workers: Cluster<()> = flow.cluster::<()>();
273 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
274 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
275 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
276 /// // if there are 4 members in the cluster, each receives one element
277 /// // - MemberId::<()>(0): [123]
278 /// // - MemberId::<()>(1): [123]
279 /// // - MemberId::<()>(2): [123]
280 /// // - MemberId::<()>(3): [123]
281 /// # }, |mut stream| async move {
282 /// # let mut results = Vec::new();
283 /// # for w in 0..4 {
284 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
285 /// # }
286 /// # results.sort();
287 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
288 /// # }));
289 /// # }
290 /// ```
291 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
292 self,
293 to: &Cluster<'a, L2>,
294 via: N,
295 nondet_membership: NonDet,
296 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
297 where
298 T: Clone + Serialize + DeserializeOwned,
299 O: MinOrder<N::OrderingGuarantee>,
300 {
301 let ids = track_membership(self.location.source_cluster_members(to));
302 sliced! {
303 let members_snapshot = use(ids, nondet_membership);
304 let elements = use(self, nondet_membership);
305
306 let current_members = members_snapshot.filter(q!(|b| *b));
307 elements.repeat_with_keys(current_members)
308 }
309 .demux(to, via)
310 }
311
312 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
313 /// serialization. The external process can receive these elements by establishing a TCP
314 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
315 ///
316 /// # Example
317 /// ```rust
318 /// # #[cfg(feature = "deploy")] {
319 /// # use hydro_lang::prelude::*;
320 /// # use futures::StreamExt;
321 /// # tokio_test::block_on(async move {
322 /// let mut flow = FlowBuilder::new();
323 /// let process = flow.process::<()>();
324 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
325 /// let external = flow.external::<()>();
326 /// let external_handle = numbers.send_bincode_external(&external);
327 ///
328 /// let mut deployment = hydro_deploy::Deployment::new();
329 /// let nodes = flow
330 /// .with_process(&process, deployment.Localhost())
331 /// .with_external(&external, deployment.Localhost())
332 /// .deploy(&mut deployment);
333 ///
334 /// deployment.deploy().await.unwrap();
335 /// // establish the TCP connection
336 /// let mut external_recv_stream = nodes.connect(external_handle).await;
337 /// deployment.start().await.unwrap();
338 ///
339 /// for w in 1..=3 {
340 /// assert_eq!(external_recv_stream.next().await, Some(w));
341 /// }
342 /// # });
343 /// # }
344 /// ```
345 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
346 where
347 T: Serialize + DeserializeOwned,
348 {
349 let serialize_pipeline = Some(serialize_bincode::<T>(false));
350
351 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
352
353 let external_port_id = flow_state_borrow.next_external_port();
354
355 flow_state_borrow.push_root(HydroRoot::SendExternal {
356 to_external_key: other.key,
357 to_port_id: external_port_id,
358 to_many: false,
359 unpaired: true,
360 serialize_fn: serialize_pipeline.map(|e| e.into()),
361 instantiate_fn: DebugInstantiate::Building,
362 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
363 op_metadata: HydroIrOpMetadata::new(),
364 });
365
366 ExternalBincodeStream {
367 process_key: other.key,
368 port_id: external_port_id,
369 _phantom: PhantomData,
370 }
371 }
372
373 #[cfg(feature = "sim")]
374 /// Sets up a simulation output port for this stream, allowing test code to receive elements
375 /// sent to this stream during simulation.
376 pub fn sim_output(self) -> SimReceiver<T, O, R>
377 where
378 T: Serialize + DeserializeOwned,
379 {
380 let external_location: External<'a, ()> = External {
381 key: LocationKey::FIRST,
382 flow_state: self.location.flow_state().clone(),
383 _phantom: PhantomData,
384 };
385
386 let external = self.send_bincode_external(&external_location);
387
388 SimReceiver(external.port_id, PhantomData)
389 }
390}
391
392impl<'a, T, L: Location<'a> + NoTick, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
393 /// Creates an external output for embedded deployment mode.
394 ///
395 /// The `name` parameter specifies the name of the field in the generated
396 /// `EmbeddedOutputs` struct that will receive elements from this stream.
397 /// The generated function will accept an `EmbeddedOutputs` struct with an
398 /// `impl FnMut(T)` field with this name.
399 pub fn embedded_output(self, name: impl Into<String>) {
400 let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
401
402 self.location
403 .flow_state()
404 .borrow_mut()
405 .push_root(HydroRoot::EmbeddedOutput {
406 ident,
407 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408 op_metadata: HydroIrOpMetadata::new(),
409 });
410 }
411}
412
413impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
414 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
415{
416 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
417 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
418 /// using [`bincode`] to serialize/deserialize messages.
419 ///
420 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
421 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
422 /// this API allows precise targeting of specific cluster members rather than broadcasting to
423 /// all members.
424 ///
425 /// # Example
426 /// ```rust
427 /// # #[cfg(feature = "deploy")] {
428 /// # use hydro_lang::prelude::*;
429 /// # use futures::StreamExt;
430 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
431 /// let p1 = flow.process::<()>();
432 /// let workers: Cluster<()> = flow.cluster::<()>();
433 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
434 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
435 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
436 /// .demux_bincode(&workers);
437 /// # on_worker.send_bincode(&p2).entries()
438 /// // if there are 4 members in the cluster, each receives one element
439 /// // - MemberId::<()>(0): [0]
440 /// // - MemberId::<()>(1): [1]
441 /// // - MemberId::<()>(2): [2]
442 /// // - MemberId::<()>(3): [3]
443 /// # }, |mut stream| async move {
444 /// # let mut results = Vec::new();
445 /// # for w in 0..4 {
446 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
447 /// # }
448 /// # results.sort();
449 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
450 /// # }));
451 /// # }
452 /// ```
453 pub fn demux_bincode(
454 self,
455 other: &Cluster<'a, L2>,
456 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
457 where
458 T: Serialize + DeserializeOwned,
459 {
460 self.demux(other, TCP.fail_stop().bincode())
461 }
462
463 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
464 /// using the configuration in `via` to set up the message transport.
465 ///
466 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
467 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
468 /// this API allows precise targeting of specific cluster members rather than broadcasting to
469 /// all members.
470 ///
471 /// # Example
472 /// ```rust
473 /// # #[cfg(feature = "deploy")] {
474 /// # use hydro_lang::prelude::*;
475 /// # use futures::StreamExt;
476 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
477 /// let p1 = flow.process::<()>();
478 /// let workers: Cluster<()> = flow.cluster::<()>();
479 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
480 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
481 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
482 /// .demux(&workers, TCP.fail_stop().bincode());
483 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
484 /// // if there are 4 members in the cluster, each receives one element
485 /// // - MemberId::<()>(0): [0]
486 /// // - MemberId::<()>(1): [1]
487 /// // - MemberId::<()>(2): [2]
488 /// // - MemberId::<()>(3): [3]
489 /// # }, |mut stream| async move {
490 /// # let mut results = Vec::new();
491 /// # for w in 0..4 {
492 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
493 /// # }
494 /// # results.sort();
495 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
496 /// # }));
497 /// # }
498 /// ```
499 pub fn demux<N: NetworkFor<T>>(
500 self,
501 to: &Cluster<'a, L2>,
502 via: N,
503 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
504 where
505 T: Serialize + DeserializeOwned,
506 O: MinOrder<N::OrderingGuarantee>,
507 {
508 self.into_keyed().demux(to, via)
509 }
510}
511
512impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
513 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
514 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
515 /// [`bincode`] to serialize/deserialize messages.
516 ///
517 /// This provides load balancing by evenly distributing work across cluster members. The
518 /// distribution is deterministic based on element order - the first element goes to member 0,
519 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
520 ///
521 /// # Non-Determinism
522 /// The set of cluster members may asynchronously change over time. Each element is distributed
523 /// based on the current cluster membership _at that point in time_. Depending on when cluster
524 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
525 /// membership is stable, the order of members in the round-robin pattern may change across runs.
526 ///
527 /// # Ordering Requirements
528 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
529 /// order of messages and retries affects the round-robin pattern.
530 ///
531 /// # Example
532 /// ```rust
533 /// # #[cfg(feature = "deploy")] {
534 /// # use hydro_lang::prelude::*;
535 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
536 /// # use futures::StreamExt;
537 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
538 /// let p1 = flow.process::<()>();
539 /// let workers: Cluster<()> = flow.cluster::<()>();
540 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
541 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
542 /// on_worker.send_bincode(&p2)
543 /// # .first().values() // we use first to assert that each member gets one element
544 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
545 /// // - MemberId::<()>(?): [1]
546 /// // - MemberId::<()>(?): [2]
547 /// // - MemberId::<()>(?): [3]
548 /// // - MemberId::<()>(?): [4]
549 /// # }, |mut stream| async move {
550 /// # let mut results = Vec::new();
551 /// # for w in 0..4 {
552 /// # results.push(stream.next().await.unwrap());
553 /// # }
554 /// # results.sort();
555 /// # assert_eq!(results, vec![1, 2, 3, 4]);
556 /// # }));
557 /// # }
558 /// ```
559 pub fn round_robin_bincode<L2: 'a>(
560 self,
561 other: &Cluster<'a, L2>,
562 nondet_membership: NonDet,
563 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
564 where
565 T: Serialize + DeserializeOwned,
566 {
567 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
568 }
569
570 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
571 /// the configuration in `via` to set up the message transport.
572 ///
573 /// This provides load balancing by evenly distributing work across cluster members. The
574 /// distribution is deterministic based on element order - the first element goes to member 0,
575 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
576 ///
577 /// # Non-Determinism
578 /// The set of cluster members may asynchronously change over time. Each element is distributed
579 /// based on the current cluster membership _at that point in time_. Depending on when cluster
580 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
581 /// membership is stable, the order of members in the round-robin pattern may change across runs.
582 ///
583 /// # Ordering Requirements
584 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
585 /// order of messages and retries affects the round-robin pattern.
586 ///
587 /// # Example
588 /// ```rust
589 /// # #[cfg(feature = "deploy")] {
590 /// # use hydro_lang::prelude::*;
591 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
592 /// # use futures::StreamExt;
593 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
594 /// let p1 = flow.process::<()>();
595 /// let workers: Cluster<()> = flow.cluster::<()>();
596 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
597 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
598 /// on_worker.send(&p2, TCP.fail_stop().bincode())
599 /// # .first().values() // we use first to assert that each member gets one element
600 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
601 /// // - MemberId::<()>(?): [1]
602 /// // - MemberId::<()>(?): [2]
603 /// // - MemberId::<()>(?): [3]
604 /// // - MemberId::<()>(?): [4]
605 /// # }, |mut stream| async move {
606 /// # let mut results = Vec::new();
607 /// # for w in 0..4 {
608 /// # results.push(stream.next().await.unwrap());
609 /// # }
610 /// # results.sort();
611 /// # assert_eq!(results, vec![1, 2, 3, 4]);
612 /// # }));
613 /// # }
614 /// ```
615 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
616 self,
617 to: &Cluster<'a, L2>,
618 via: N,
619 nondet_membership: NonDet,
620 ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
621 where
622 T: Serialize + DeserializeOwned,
623 {
624 let ids = track_membership(self.location.source_cluster_members(to));
625 sliced! {
626 let members_snapshot = use(ids, nondet_membership);
627 let elements = use(self.enumerate(), nondet_membership);
628
629 let current_members = members_snapshot
630 .filter(q!(|b| *b))
631 .keys()
632 .assume_ordering::<TotalOrder>(nondet_membership)
633 .collect_vec();
634
635 elements
636 .cross_singleton(current_members)
637 .filter_map(q!(|(data, members)| {
638 if members.is_empty() {
639 None
640 } else {
641 Some((members[data.0 % members.len()].clone(), data.1))
642 }
643 }))
644 }
645 .demux(to, via)
646 }
647}
648
649impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
650 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
651 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
652 /// [`bincode`] to serialize/deserialize messages.
653 ///
654 /// This provides load balancing by evenly distributing work across cluster members. The
655 /// distribution is deterministic based on element order - the first element goes to member 0,
656 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
657 ///
658 /// # Non-Determinism
659 /// The set of cluster members may asynchronously change over time. Each element is distributed
660 /// based on the current cluster membership _at that point in time_. Depending on when cluster
661 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
662 /// membership is stable, the order of members in the round-robin pattern may change across runs.
663 ///
664 /// # Ordering Requirements
665 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
666 /// order of messages and retries affects the round-robin pattern.
667 ///
668 /// # Example
669 /// ```rust
670 /// # #[cfg(feature = "deploy")] {
671 /// # use hydro_lang::prelude::*;
672 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
673 /// # use hydro_lang::location::MemberId;
674 /// # use futures::StreamExt;
675 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
676 /// let p1 = flow.process::<()>();
677 /// let workers1: Cluster<()> = flow.cluster::<()>();
678 /// let workers2: Cluster<()> = flow.cluster::<()>();
679 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
680 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
681 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
682 /// on_worker2.send_bincode(&p2)
683 /// # .entries()
684 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
685 /// # }, |mut stream| async move {
686 /// # let mut results = Vec::new();
687 /// # let mut locations = std::collections::HashSet::new();
688 /// # for w in 0..=16 {
689 /// # let (location, v) = stream.next().await.unwrap();
690 /// # locations.insert(location);
691 /// # results.push(v);
692 /// # }
693 /// # results.sort();
694 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
695 /// # assert_eq!(locations.len(), 16);
696 /// # }));
697 /// # }
698 /// ```
699 pub fn round_robin_bincode<L2: 'a>(
700 self,
701 other: &Cluster<'a, L2>,
702 nondet_membership: NonDet,
703 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
704 where
705 T: Serialize + DeserializeOwned,
706 {
707 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
708 }
709
710 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
711 /// the configuration in `via` to set up the message transport.
712 ///
713 /// This provides load balancing by evenly distributing work across cluster members. The
714 /// distribution is deterministic based on element order - the first element goes to member 0,
715 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
716 ///
717 /// # Non-Determinism
718 /// The set of cluster members may asynchronously change over time. Each element is distributed
719 /// based on the current cluster membership _at that point in time_. Depending on when cluster
720 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
721 /// membership is stable, the order of members in the round-robin pattern may change across runs.
722 ///
723 /// # Ordering Requirements
724 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
725 /// order of messages and retries affects the round-robin pattern.
726 ///
727 /// # Example
728 /// ```rust
729 /// # #[cfg(feature = "deploy")] {
730 /// # use hydro_lang::prelude::*;
731 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
732 /// # use hydro_lang::location::MemberId;
733 /// # use futures::StreamExt;
734 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
735 /// let p1 = flow.process::<()>();
736 /// let workers1: Cluster<()> = flow.cluster::<()>();
737 /// let workers2: Cluster<()> = flow.cluster::<()>();
738 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
739 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
740 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
741 /// on_worker2.send(&p2, TCP.fail_stop().bincode())
742 /// # .entries()
743 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
744 /// # }, |mut stream| async move {
745 /// # let mut results = Vec::new();
746 /// # let mut locations = std::collections::HashSet::new();
747 /// # for w in 0..=16 {
748 /// # let (location, v) = stream.next().await.unwrap();
749 /// # locations.insert(location);
750 /// # results.push(v);
751 /// # }
752 /// # results.sort();
753 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
754 /// # assert_eq!(locations.len(), 16);
755 /// # }));
756 /// # }
757 /// ```
758 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
759 self,
760 to: &Cluster<'a, L2>,
761 via: N,
762 nondet_membership: NonDet,
763 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
764 where
765 T: Serialize + DeserializeOwned,
766 {
767 let ids = track_membership(self.location.source_cluster_members(to));
768 sliced! {
769 let members_snapshot = use(ids, nondet_membership);
770 let elements = use(self.enumerate(), nondet_membership);
771
772 let current_members = members_snapshot
773 .filter(q!(|b| *b))
774 .keys()
775 .assume_ordering::<TotalOrder>(nondet_membership)
776 .collect_vec();
777
778 elements
779 .cross_singleton(current_members)
780 .filter_map(q!(|(data, members)| {
781 if members.is_empty() {
782 None
783 } else {
784 Some((members[data.0 % members.len()].clone(), data.1))
785 }
786 }))
787 }
788 .demux(to, via)
789 }
790}
791
792impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
793 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
794 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
795 /// using [`bincode`] to serialize/deserialize messages.
796 ///
797 /// Each cluster member sends its local stream elements, and they are collected at the destination
798 /// as a [`KeyedStream`] where keys identify the source cluster member.
799 ///
800 /// # Example
801 /// ```rust
802 /// # #[cfg(feature = "deploy")] {
803 /// # use hydro_lang::prelude::*;
804 /// # use futures::StreamExt;
805 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
806 /// let workers: Cluster<()> = flow.cluster::<()>();
807 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
808 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
809 /// # all_received.entries()
810 /// # }, |mut stream| async move {
811 /// // if there are 4 members in the cluster, we should receive 4 elements
812 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
813 /// # let mut results = Vec::new();
814 /// # for w in 0..4 {
815 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
816 /// # }
817 /// # results.sort();
818 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
819 /// # }));
820 /// # }
821 /// ```
822 ///
823 /// If you don't need to know the source for each element, you can use `.values()`
824 /// to get just the data:
825 /// ```rust
826 /// # #[cfg(feature = "deploy")] {
827 /// # use hydro_lang::prelude::*;
828 /// # use hydro_lang::live_collections::stream::NoOrder;
829 /// # use futures::StreamExt;
830 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
831 /// # let workers: Cluster<()> = flow.cluster::<()>();
832 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
833 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
834 /// # values
835 /// # }, |mut stream| async move {
836 /// # let mut results = Vec::new();
837 /// # for w in 0..4 {
838 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
839 /// # }
840 /// # results.sort();
841 /// // if there are 4 members in the cluster, we should receive 4 elements
842 /// // 1, 1, 1, 1
843 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
844 /// # }));
845 /// # }
846 /// ```
847 pub fn send_bincode<L2>(
848 self,
849 other: &Process<'a, L2>,
850 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
851 where
852 T: Serialize + DeserializeOwned,
853 {
854 self.send(other, TCP.fail_stop().bincode())
855 }
856
857 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
858 /// using the configuration in `via` to set up the message transport.
859 ///
860 /// Each cluster member sends its local stream elements, and they are collected at the destination
861 /// as a [`KeyedStream`] where keys identify the source cluster member.
862 ///
863 /// # Example
864 /// ```rust
865 /// # #[cfg(feature = "deploy")] {
866 /// # use hydro_lang::prelude::*;
867 /// # use futures::StreamExt;
868 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
869 /// let workers: Cluster<()> = flow.cluster::<()>();
870 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
871 /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
872 /// # all_received.entries()
873 /// # }, |mut stream| async move {
874 /// // if there are 4 members in the cluster, we should receive 4 elements
875 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
876 /// # let mut results = Vec::new();
877 /// # for w in 0..4 {
878 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
879 /// # }
880 /// # results.sort();
881 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
882 /// # }));
883 /// # }
884 /// ```
885 ///
886 /// If you don't need to know the source for each element, you can use `.values()`
887 /// to get just the data:
888 /// ```rust
889 /// # #[cfg(feature = "deploy")] {
890 /// # use hydro_lang::prelude::*;
891 /// # use hydro_lang::live_collections::stream::NoOrder;
892 /// # use futures::StreamExt;
893 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
894 /// # let workers: Cluster<()> = flow.cluster::<()>();
895 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
896 /// let values: Stream<i32, _, _, NoOrder> =
897 /// numbers.send(&process, TCP.fail_stop().bincode()).values();
898 /// # values
899 /// # }, |mut stream| async move {
900 /// # let mut results = Vec::new();
901 /// # for w in 0..4 {
902 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
903 /// # }
904 /// # results.sort();
905 /// // if there are 4 members in the cluster, we should receive 4 elements
906 /// // 1, 1, 1, 1
907 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
908 /// # }));
909 /// # }
910 /// ```
911 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
912 pub fn send<L2, N: NetworkFor<T>>(
913 self,
914 to: &Process<'a, L2>,
915 via: N,
916 ) -> KeyedStream<
917 MemberId<L>,
918 T,
919 Process<'a, L2>,
920 Unbounded,
921 <O as MinOrder<N::OrderingGuarantee>>::Min,
922 R,
923 >
924 where
925 T: Serialize + DeserializeOwned,
926 O: MinOrder<N::OrderingGuarantee>,
927 {
928 let serialize_pipeline = Some(N::serialize_thunk(false));
929
930 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
931
932 let name = via.name();
933 if to.multiversioned() && name.is_none() {
934 panic!(
935 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
936 );
937 }
938
939 let raw_stream: Stream<
940 (MemberId<L>, T),
941 Process<'a, L2>,
942 Unbounded,
943 <O as MinOrder<N::OrderingGuarantee>>::Min,
944 R,
945 > = Stream::new(
946 to.clone(),
947 HydroNode::Network {
948 name: name.map(ToOwned::to_owned),
949 networking_info: N::networking_info(),
950 serialize_fn: serialize_pipeline.map(|e| e.into()),
951 instantiate_fn: DebugInstantiate::Building,
952 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
953 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
954 metadata: to.new_node_metadata(Stream::<
955 (MemberId<L>, T),
956 Process<'a, L2>,
957 Unbounded,
958 <O as MinOrder<N::OrderingGuarantee>>::Min,
959 R,
960 >::collection_kind()),
961 },
962 );
963
964 raw_stream.into_keyed()
965 }
966
967 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
968 /// Broadcasts elements of this stream at each source member to all members of a destination
969 /// cluster, using [`bincode`] to serialize/deserialize messages.
970 ///
971 /// Each source member sends each of its stream elements to **every** member of the cluster
972 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
973 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
974 /// **only data elements** and sends each element to all cluster members.
975 ///
976 /// # Non-Determinism
977 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
978 /// to the current cluster members known _at that point in time_ at the source member. Depending
979 /// on when each source member is notified of membership changes, it will broadcast each element
980 /// to different members.
981 ///
982 /// # Example
983 /// ```rust
984 /// # #[cfg(feature = "deploy")] {
985 /// # use hydro_lang::prelude::*;
986 /// # use hydro_lang::location::MemberId;
987 /// # use futures::StreamExt;
988 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
989 /// # type Source = ();
990 /// # type Destination = ();
991 /// let source: Cluster<Source> = flow.cluster::<Source>();
992 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
993 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
994 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
995 /// # on_destination.entries().send_bincode(&p2).entries()
/// // if there are 4 members in the destination, each receives one element from each source member
997 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
998 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
999 /// // - ...
1000 /// # }, |mut stream| async move {
1001 /// # let mut results = Vec::new();
1002 /// # for w in 0..16 {
1003 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1004 /// # }
1005 /// # results.sort();
1006 /// # assert_eq!(results, vec![
1007 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1008 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1009 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1010 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1011 /// # ]);
1012 /// # }));
1013 /// # }
1014 /// ```
1015 pub fn broadcast_bincode<L2: 'a>(
1016 self,
1017 other: &Cluster<'a, L2>,
1018 nondet_membership: NonDet,
1019 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1020 where
1021 T: Clone + Serialize + DeserializeOwned,
1022 {
1023 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1024 }
1025
1026 /// Broadcasts elements of this stream at each source member to all members of a destination
1027 /// cluster, using the configuration in `via` to set up the message transport.
1028 ///
1029 /// Each source member sends each of its stream elements to **every** member of the cluster
1030 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1031 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1032 /// **only data elements** and sends each element to all cluster members.
1033 ///
1034 /// # Non-Determinism
1035 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1036 /// to the current cluster members known _at that point in time_ at the source member. Depending
1037 /// on when each source member is notified of membership changes, it will broadcast each element
1038 /// to different members.
1039 ///
1040 /// # Example
1041 /// ```rust
1042 /// # #[cfg(feature = "deploy")] {
1043 /// # use hydro_lang::prelude::*;
1044 /// # use hydro_lang::location::MemberId;
1045 /// # use futures::StreamExt;
1046 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1047 /// # type Source = ();
1048 /// # type Destination = ();
1049 /// let source: Cluster<Source> = flow.cluster::<Source>();
1050 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1051 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1052 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1053 /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
/// // if there are 4 members in the destination, each receives one element from each source member
1055 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1056 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1057 /// // - ...
1058 /// # }, |mut stream| async move {
1059 /// # let mut results = Vec::new();
1060 /// # for w in 0..16 {
1061 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1062 /// # }
1063 /// # results.sort();
1064 /// # assert_eq!(results, vec![
1065 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1066 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1067 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1068 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1069 /// # ]);
1070 /// # }));
1071 /// # }
1072 /// ```
1073 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1074 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1075 self,
1076 to: &Cluster<'a, L2>,
1077 via: N,
1078 nondet_membership: NonDet,
1079 ) -> KeyedStream<
1080 MemberId<L>,
1081 T,
1082 Cluster<'a, L2>,
1083 Unbounded,
1084 <O as MinOrder<N::OrderingGuarantee>>::Min,
1085 R,
1086 >
1087 where
1088 T: Clone + Serialize + DeserializeOwned,
1089 O: MinOrder<N::OrderingGuarantee>,
1090 {
1091 let ids = track_membership(self.location.source_cluster_members(to));
1092 sliced! {
1093 let members_snapshot = use(ids, nondet_membership);
1094 let elements = use(self, nondet_membership);
1095
1096 let current_members = members_snapshot.filter(q!(|b| *b));
1097 elements.repeat_with_keys(current_members)
1098 }
1099 .demux(to, via)
1100 }
1101
#[cfg(feature = "sim")]
/// Registers a `SendExternal` root that forwards this cluster stream to the external
/// location `other`, with elements serialized via bincode, and returns a handle
/// carrying the external key and the newly allocated port id.
fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
where
    T: Serialize + DeserializeOwned,
{
    let serializer = Some(serialize_bincode::<T>(false));

    // The mutable borrow of the flow state is held until the end of this function.
    let mut flow_state = self.location.flow_state().borrow_mut();
    let port_id = flow_state.next_external_port();

    let root = HydroRoot::SendExternal {
        to_external_key: other.key,
        to_port_id: port_id,
        to_many: false,
        unpaired: true,
        serialize_fn: serializer.map(|expr| expr.into()),
        instantiate_fn: DebugInstantiate::Building,
        // Take ownership of this stream's IR node, leaving a placeholder behind.
        input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
        op_metadata: HydroIrOpMetadata::new(),
    };
    flow_state.push_root(root);

    ExternalBincodeStream {
        process_key: other.key,
        port_id,
        _phantom: PhantomData,
    }
}
1131
#[cfg(feature = "sim")]
/// Exposes this cluster stream as a simulation output, so that test code can
/// receive `(member_id, T)` pairs while the simulation runs.
pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
where
    T: Serialize + DeserializeOwned,
{
    // The simulator-side sink is modelled as an external location with the first
    // location key, sharing this stream's flow state.
    let sim_sink: External<'a, ()> = External {
        key: LocationKey::FIRST,
        flow_state: self.location.flow_state().clone(),
        _phantom: PhantomData,
    };

    let port_id = self.send_bincode_external(&sim_sink).port_id;
    crate::sim::SimClusterReceiver(port_id, PhantomData)
}
1149}
1150
1151impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1152 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1153{
1154 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1155 /// Sends elements of this stream at each source member to specific members of a destination
1156 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1157 ///
1158 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1159 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1160 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1161 /// all members.
1162 ///
1163 /// Each cluster member sends its local stream elements, and they are collected at each
1164 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1165 ///
1166 /// # Example
1167 /// ```rust
1168 /// # #[cfg(feature = "deploy")] {
1169 /// # use hydro_lang::prelude::*;
1170 /// # use futures::StreamExt;
1171 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1172 /// # type Source = ();
1173 /// # type Destination = ();
1174 /// let source: Cluster<Source> = flow.cluster::<Source>();
1175 /// let to_send: Stream<_, Cluster<_>, _> = source
1176 /// .source_iter(q!(vec![0, 1, 2, 3]))
1177 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1178 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1179 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1180 /// # all_received.entries().send_bincode(&p2).entries()
1181 /// # }, |mut stream| async move {
1182 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1183 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1184 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1185 /// // - ...
1186 /// # let mut results = Vec::new();
1187 /// # for w in 0..16 {
1188 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1189 /// # }
1190 /// # results.sort();
1191 /// # assert_eq!(results, vec![
1192 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1193 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1194 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1195 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1196 /// # ]);
1197 /// # }));
1198 /// # }
1199 /// ```
1200 pub fn demux_bincode(
1201 self,
1202 other: &Cluster<'a, L2>,
1203 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1204 where
1205 T: Serialize + DeserializeOwned,
1206 {
1207 self.demux(other, TCP.fail_stop().bincode())
1208 }
1209
1210 /// Sends elements of this stream at each source member to specific members of a destination
1211 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1212 /// message transport.
1213 ///
1214 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1215 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1216 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1217 /// all members.
1218 ///
1219 /// Each cluster member sends its local stream elements, and they are collected at each
1220 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1221 ///
1222 /// # Example
1223 /// ```rust
1224 /// # #[cfg(feature = "deploy")] {
1225 /// # use hydro_lang::prelude::*;
1226 /// # use futures::StreamExt;
1227 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1228 /// # type Source = ();
1229 /// # type Destination = ();
1230 /// let source: Cluster<Source> = flow.cluster::<Source>();
1231 /// let to_send: Stream<_, Cluster<_>, _> = source
1232 /// .source_iter(q!(vec![0, 1, 2, 3]))
1233 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1234 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1235 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1236 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1237 /// # }, |mut stream| async move {
1238 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1239 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1240 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1241 /// // - ...
1242 /// # let mut results = Vec::new();
1243 /// # for w in 0..16 {
1244 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1245 /// # }
1246 /// # results.sort();
1247 /// # assert_eq!(results, vec![
1248 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1249 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1250 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1251 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1252 /// # ]);
1253 /// # }));
1254 /// # }
1255 /// ```
1256 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1257 pub fn demux<N: NetworkFor<T>>(
1258 self,
1259 to: &Cluster<'a, L2>,
1260 via: N,
1261 ) -> KeyedStream<
1262 MemberId<L>,
1263 T,
1264 Cluster<'a, L2>,
1265 Unbounded,
1266 <O as MinOrder<N::OrderingGuarantee>>::Min,
1267 R,
1268 >
1269 where
1270 T: Serialize + DeserializeOwned,
1271 O: MinOrder<N::OrderingGuarantee>,
1272 {
1273 self.into_keyed().demux(to, via)
1274 }
1275}
1276
1277#[cfg(test)]
1278mod tests {
1279 #[cfg(feature = "sim")]
1280 use stageleft::q;
1281
1282 #[cfg(feature = "sim")]
1283 use crate::live_collections::sliced::sliced;
1284 #[cfg(feature = "sim")]
1285 use crate::location::{Location, MemberId};
1286 #[cfg(feature = "sim")]
1287 use crate::networking::TCP;
1288 #[cfg(feature = "sim")]
1289 use crate::nondet::nondet;
1290 #[cfg(feature = "sim")]
1291 use crate::prelude::FlowBuilder;
1292
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_o2o() {
    let mut flow = FlowBuilder::new();
    let sender = flow.process::<()>();
    let receiver = flow.process::<()>();

    let (in_send, input) = sender.sim_input();

    // Count the elements that arrive on the receiver in each batch.
    let per_batch_counts = input
        .send(&receiver, TCP.fail_stop().bincode())
        .batch(&receiver.tick(), nondet!(/** test */))
        .count()
        .all_ticks();
    let out_recv = per_batch_counts.sim_output();

    let instances = flow.sim().exhaustive(async || {
        for _ in 0..3 {
            in_send.send(());
        }

        // However the batches are split, the counts must add up to the three inputs.
        let received = out_recv.collect::<Vec<_>>().await;
        let total: usize = received.into_iter().sum();
        assert!(total == 3);
    });

    assert_eq!(instances, 4); // 2^{3 - 1}
}
1322
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_m2o() {
    let mut flow = FlowBuilder::new();
    let cluster = flow.cluster::<()>();
    let node = flow.process::<()>();

    // Every cluster member emits a single `1` and sends it to the process.
    let out_recv = cluster
        .source_iter(q!(vec![1]))
        .send(&node, TCP.fail_stop().bincode())
        .entries()
        .batch(&node.tick(), nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let instances = flow
        .sim()
        .with_cluster_size(&cluster, 4)
        .exhaustive(async || {
            // One entry per member, in any order.
            let expected = vec![
                (MemberId::from_raw_id(0), 1),
                (MemberId::from_raw_id(1), 1),
                (MemberId::from_raw_id(2), 1),
                (MemberId::from_raw_id(3), 1),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });

    assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
}
1355
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_multiple_m2o() {
    let mut flow = FlowBuilder::new();
    let cluster1 = flow.cluster::<()>();
    let cluster2 = flow.cluster::<()>();
    let node = flow.process::<()>();

    // Two independent cluster-to-process channels, each with its own output.
    let from_cluster1 = cluster1
        .source_iter(q!(vec![1]))
        .send(&node, TCP.fail_stop().bincode())
        .entries();
    let out_recv_1 = from_cluster1.sim_output();

    let from_cluster2 = cluster2
        .source_iter(q!(vec![2]))
        .send(&node, TCP.fail_stop().bincode())
        .entries();
    let out_recv_2 = from_cluster2.sim_output();

    let instances = flow
        .sim()
        .with_cluster_size(&cluster1, 3)
        .with_cluster_size(&cluster2, 4)
        .exhaustive(async || {
            // Three members in the first cluster, each contributing a `1`.
            let expected_1 = vec![
                (MemberId::from_raw_id(0), 1),
                (MemberId::from_raw_id(1), 1),
                (MemberId::from_raw_id(2), 1),
            ];
            out_recv_1.assert_yields_only_unordered(expected_1).await;

            // Four members in the second cluster, each contributing a `2`.
            let expected_2 = vec![
                (MemberId::from_raw_id(0), 2),
                (MemberId::from_raw_id(1), 2),
                (MemberId::from_raw_id(2), 2),
                (MemberId::from_raw_id(3), 2),
            ];
            out_recv_2.assert_yields_only_unordered(expected_2).await;
        });

    assert_eq!(instances, 1);
}
1401
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_o2m() {
    let mut flow = FlowBuilder::new();
    let cluster = flow.cluster::<()>();
    let node = flow.process::<()>();

    // Address one value to member 0 and one to member 1.
    let addressed = node.source_iter(q!(vec![
        (MemberId::from_raw_id(0), 123),
        (MemberId::from_raw_id(1), 456),
    ]));

    // Each targeted member increments its value and sends it back to the process.
    let echoed = addressed
        .demux(&cluster, TCP.fail_stop().bincode())
        .map(q!(|v| v + 1))
        .send(&node, TCP.fail_stop().bincode())
        .entries();
    let out_recv = echoed.sim_output();

    flow.sim()
        .with_cluster_size(&cluster, 4)
        .exhaustive(async || {
            let expected = vec![
                (MemberId::from_raw_id(0), 124),
                (MemberId::from_raw_id(1), 457),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });
}
1432
#[cfg(feature = "sim")]
#[test]
fn sim_broadcast_bincode_o2m() {
    let mut flow = FlowBuilder::new();
    let cluster = flow.cluster::<()>();
    let node = flow.process::<()>();

    let out_recv = node
        .source_iter(q!(vec![123, 456]))
        .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
        .map(q!(|v| v + 1))
        .send(&node, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    // Flags accumulated across all explored executions.
    let mut member0_got_both = false;
    let mut member1_got_both = false;
    let mut member0_got_only_second = false;

    flow.sim()
        .with_cluster_size(&cluster, 2)
        .exhaustive(async || {
            let all_out = out_recv.collect_sorted::<Vec<_>>().await;

            let m0_has_124 = all_out.contains(&(MemberId::from_raw_id(0), 124));
            let m0_has_457 = all_out.contains(&(MemberId::from_raw_id(0), 457));
            let m1_has_124 = all_out.contains(&(MemberId::from_raw_id(1), 124));
            let m1_has_457 = all_out.contains(&(MemberId::from_raw_id(1), 457));

            // check that order is preserved: seeing the first value implies seeing the second
            if m0_has_124 {
                assert!(m0_has_457);
                member0_got_both = true;
            }

            if m1_has_124 {
                assert!(m1_has_457);
                member1_got_both = true;
            }

            if m0_has_457 && !m0_has_124 {
                member0_got_only_second = true;
            }
        });

    // in at least one execution each, the cluster member received both messages
    assert!(member0_got_both && member1_got_both);

    // in at least one execution, the cluster member received 457 but not 124, this tests
    // that the simulator properly explores dynamic membership additions (a member that joins after 123 is broadcast)
    assert!(member0_got_only_second);
}
1482
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_m2m() {
    let mut flow = FlowBuilder::new();
    let cluster = flow.cluster::<()>();
    let node = flow.process::<()>();

    let addressed = node.source_iter(q!(vec![
        (MemberId::from_raw_id(0), 123),
        (MemberId::from_raw_id(1), 456),
    ]));

    // Stage 1: deliver to members 0 and 1, increment, and re-address each value to
    // both members 0 and 1.
    let readdressed = addressed
        .demux(&cluster, TCP.fail_stop().bincode())
        .map(q!(|v| v + 1))
        .flat_map_ordered(q!(|v| vec![
            (MemberId::from_raw_id(0), v),
            (MemberId::from_raw_id(1), v),
        ]));

    // Stage 2: cluster-to-cluster demux, then report everything back to the process.
    let out_recv = readdressed
        .demux(&cluster, TCP.fail_stop().bincode())
        .entries()
        .send(&node, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    flow.sim()
        .with_cluster_size(&cluster, 4)
        .exhaustive(async || {
            let expected = vec![
                (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
                (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
                (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
                (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });
}
1521
#[cfg(feature = "sim")]
#[test]
fn sim_lossy_delayed_forever_o2o() {
    use std::collections::HashSet;

    use crate::properties::manual_proof;

    let mut flow = FlowBuilder::new();
    let sender = flow.process::<()>();
    let receiver = flow.process::<()>();

    // Accumulate whatever arrives over the lossy channel into a set.
    let delivered = sender
        .source_iter(q!(0..3_u32))
        .send(&receiver, TCP.lossy_delayed_forever().bincode())
        .fold(
            q!(|| std::collections::HashSet::<u32>::new()),
            q!(
                |seen, value| {
                    seen.insert(value);
                },
                commutative = manual_proof!(/** set insert is commutative */)
            ),
        );

    let out_recv = sliced! {
        let snapshot = use(delivered, nondet!(/** test */));
        snapshot.into_stream()
    }
    .sim_output();

    let mut saw_non_contiguous = false;

    flow.sim().test_safety_only().exhaustive(async || {
        let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;

        // Look at every snapshot on its own: a two-element set whose values do not
        // form a contiguous range is one that skips a value between its min and max.
        for set in &snapshots {
            #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
            if set.len() == 2 {
                let lowest = *set.iter().min().unwrap();
                let highest = *set.iter().max().unwrap();
                let span = (highest - lowest + 1) as usize;
                if set.len() < span {
                    saw_non_contiguous = true;
                }
            }
        }
    });

    assert!(
        saw_non_contiguous,
        "Expected at least one execution with a non-contiguous subset of inputs"
    );
}
1575}