1use std::fmt::{Debug, Formatter};
13use std::marker::PhantomData;
14
15use proc_macro2::Span;
16use quote::quote;
17use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
18use stageleft::{QuotedWithContextWithProps, quote_type};
19
20use super::dynamic::LocationId;
21use super::{Location, MemberId};
22use crate::compile::builder::FlowState;
23use crate::location::LocationKey;
24use crate::location::member_id::TaglessMemberId;
25use crate::staging_util::{Invariant, get_this_crate};
26
27pub struct Cluster<'a, ClusterTag> {
37 pub(crate) key: LocationKey,
38 pub(crate) flow_state: FlowState,
39 pub(crate) _phantom: Invariant<'a, ClusterTag>,
40}
41
42impl<C> Debug for Cluster<'_, C> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 write!(f, "Cluster({})", self.key)
45 }
46}
47
48impl<C> Eq for Cluster<'_, C> {}
49impl<C> PartialEq for Cluster<'_, C> {
50 fn eq(&self, other: &Self) -> bool {
51 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
52 }
53}
54
55impl<C> Clone for Cluster<'_, C> {
56 fn clone(&self) -> Self {
57 Cluster {
58 key: self.key,
59 flow_state: self.flow_state.clone(),
60 _phantom: PhantomData,
61 }
62 }
63}
64
65impl<'a, C> super::dynamic::DynLocation for Cluster<'a, C> {
66 fn id(&self) -> LocationId {
67 LocationId::Cluster(self.key)
68 }
69
70 fn flow_state(&self) -> &FlowState {
71 &self.flow_state
72 }
73
74 fn is_top_level() -> bool {
75 true
76 }
77
78 fn multiversioned(&self) -> bool {
79 false }
81}
82
83impl<'a, C> Location<'a> for Cluster<'a, C> {
84 type Root = Cluster<'a, C>;
85
86 fn root(&self) -> Self::Root {
87 self.clone()
88 }
89}
90
#[cfg(feature = "sim")]
impl<'a, C> Cluster<'a, C> {
    /// Creates an input into this cluster for use with the simulator.
    ///
    /// Builds an `External` location handle (key `LocationKey::FIRST`, sharing
    /// this cluster's flow state), opens a bincode source from it with
    /// `source_external_bincode`, and returns:
    ///
    /// 1. a `SimClusterSender` wrapping the port id of that source, and
    /// 2. the stream that the source produces at this cluster.
    ///
    /// `T` must implement serde's `Serialize` and `DeserializeOwned`.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_input<T>(
        &self,
    ) -> (
        crate::sim::SimClusterSender<
            T,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
        crate::live_collections::Stream<
            T,
            Self,
            crate::live_collections::boundedness::Unbounded,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        use crate::location::Location;

        // The external endpoint shares this cluster's flow state.
        let flow_state = self.flow_state.clone();
        let sim_external: crate::location::External<'a, ()> = crate::location::External {
            key: LocationKey::FIRST,
            flow_state,
            _phantom: PhantomData,
        };

        let (port, stream) = self.source_external_bincode(&sim_external);
        let sender = crate::sim::SimClusterSender(port.port_id, PhantomData);

        (sender, stream)
    }
}
133
134pub struct ClusterIds<'a> {
139 pub key: LocationKey,
141 pub _phantom: PhantomData<&'a ()>,
143}
144
145impl<'a> Clone for ClusterIds<'a> {
146 fn clone(&self) -> Self {
147 Self {
148 key: self.key,
149 _phantom: Default::default(),
150 }
151 }
152}
153
154impl<'a, Ctx> FreeVariableWithContextWithProps<Ctx, ()> for ClusterIds<'a> {
155 type O = &'a [TaglessMemberId];
156
157 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
158 where
159 Self: Sized,
160 {
161 let ident = syn::Ident::new(
162 &format!("__hydro_lang_cluster_ids_{}", self.key),
163 Span::call_site(),
164 );
165
166 (
167 QuoteTokens {
168 prelude: None,
169 expr: Some(quote! { #ident }),
170 },
171 (),
172 )
173 }
174}
175
176impl<'a, Ctx> QuotedWithContextWithProps<'a, &'a [TaglessMemberId], Ctx, ()> for ClusterIds<'a> {}
177
/// Implemented by cluster location types to expose their tag type.
///
/// [`ClusterSelfId`] uses this bound on a location's root to name the
/// `MemberId<Tag>` type it expands to.
pub trait IsCluster {
    /// The type-level tag of the cluster.
    type Tag;
}
183
184impl<C> IsCluster for Cluster<'_, C> {
185 type Tag = C;
186}
187
188pub static CLUSTER_SELF_ID: ClusterSelfId = ClusterSelfId { _private: &() };
191
/// The type of [`CLUSTER_SELF_ID`].
///
/// Its only field is private, so code outside this module cannot build a value
/// with a struct literal; the static is the instance this file provides.
#[derive(Clone, Copy)]
pub struct ClusterSelfId<'a> {
    /// Unit reference that carries the `'a` lifetime and keeps the struct
    /// unconstructible from outside the module.
    _private: &'a (),
}
200
201impl<'a, L> FreeVariableWithContextWithProps<L, ()> for ClusterSelfId<'a>
202where
203 L: Location<'a>,
204 <L as Location<'a>>::Root: IsCluster,
205{
206 type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
207
208 fn to_tokens(self, ctx: &L) -> (QuoteTokens, ())
209 where
210 Self: Sized,
211 {
212 let LocationId::Cluster(cluster_id) = ctx.root().id() else {
213 unreachable!()
214 };
215
216 let ident = syn::Ident::new(
217 &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
218 Span::call_site(),
219 );
220 let root = get_this_crate();
221 let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
222
223 (
224 QuoteTokens {
225 prelude: None,
226 expr: Some(
227 quote! { #root::__staged::location::MemberId::<#c_type>::from_tagless((#ident).clone()) },
228 ),
229 },
230 (),
231 )
232 }
233}
234
235impl<'a, L>
236 QuotedWithContextWithProps<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L, ()>
237 for ClusterSelfId<'a>
238where
239 L: Location<'a>,
240 <L as Location<'a>>::Root: IsCluster,
241{
242}
243
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use super::CLUSTER_SELF_ID;
    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId, MembershipEvent};
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Every member of two clusters (sizes 3 and 4) sends its own
    /// `CLUSTER_SELF_ID` to a process; the process must receive exactly the
    /// member IDs 0..3 and 0..4, in any order.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_self_id() {
        let mut flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();

        let out_recv = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID]))
            .send(&node, TCP.fail_stop().bincode())
            .values()
            .merge_unordered(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID]))
                    .send(&node, TCP.fail_stop().bincode())
                    .values(),
            )
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered([0, 1, 2, 0, 1, 2, 3].map(MemberId::from_raw_id))
                    .await
            });
    }

    /// Each of two cluster members batches `[1, 2, 3]` into ticks, counts per
    /// tick, and sends the counts to a process; summed per member, the counts
    /// must total 3 for both members.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_with_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(
                &cluster.tick(),
                nondet!(
                    /// Batch boundaries are arbitrary; the test sums the
                    /// per-tick counts for each member before asserting.
                ),
            )
            .count()
            .all_ticks()
            .send(&node, TCP.fail_stop().bincode())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<MemberId<()>, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` reports both sides on failure, unlike `assert!(a == b)`.
                assert_eq!(grouped.len(), 2);
                for total in grouped.into_values() {
                    assert_eq!(total, 3);
                }
            });

        // NOTE(review): `count` is whatever `exhaustive` returns; 106 is presumably
        // the number of executions it explored for this program. Confirm before
        // changing the expected value.
        assert_eq!(count, 106);
    }

    /// A process observing the membership of a two-member cluster must see
    /// exactly one `Joined` event for member 0 and one for member 1.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_membership() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = node
            .source_cluster_members(&cluster)
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), MembershipEvent::Joined),
                        (MemberId::from_raw_id(1), MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}