1use std::collections::{HashMap, HashSet};
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23 ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, LocationKey, LocationType, Process};
26use crate::staging_util::Invariant;
27use crate::telemetry::Sidecar;
28
29pub struct DeployFlow<'a, D>
30where
31 D: Deploy<'a>,
32{
33 pub(super) ir: Vec<HydroRoot>,
34
35 pub(super) locations: SlotMap<LocationKey, LocationType>,
36 pub(super) location_names: SecondaryMap<LocationKey, String>,
37
38 pub(super) processes: SparseSecondaryMap<LocationKey, D::Process>,
40 pub(super) clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
41 pub(super) externals: SparseSecondaryMap<LocationKey, D::External>,
42
43 pub(super) sidecars: Vec<super::builder::Sidecar>,
45
46 pub(super) flow_name: String,
48
49 pub(super) _phantom: Invariant<'a, D>,
50}
51
52impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
53 pub fn ir(&self) -> &Vec<HydroRoot> {
54 &self.ir
55 }
56
57 pub fn flow_name(&self) -> &str {
59 &self.flow_name
60 }
61
62 pub fn with_process<P>(
63 mut self,
64 process: &Process<P>,
65 spec: impl IntoProcessSpec<'a, D>,
66 ) -> Self {
67 self.processes.insert(
68 process.key,
69 spec.into_process_spec()
70 .build(process.key, &self.location_names[process.key]),
71 );
72 self
73 }
74
75 #[doc(hidden)]
77 pub fn with_process_erased(
78 mut self,
79 process_loc_key: LocationKey,
80 spec: impl IntoProcessSpec<'a, D>,
81 ) -> Self {
82 assert_eq!(
83 Some(&LocationType::Process),
84 self.locations.get(process_loc_key),
85 "No process with the given `LocationKey` was found."
86 );
87 self.processes.insert(
88 process_loc_key,
89 spec.into_process_spec()
90 .build(process_loc_key, &self.location_names[process_loc_key]),
91 );
92 self
93 }
94
95 pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
96 mut self,
97 spec: impl Fn() -> S,
98 ) -> Self {
99 for (location_key, &location_type) in self.locations.iter() {
100 if LocationType::Process == location_type {
101 self.processes
102 .entry(location_key)
103 .expect("location was removed")
104 .or_insert_with(|| {
105 spec()
106 .into_process_spec()
107 .build(location_key, &self.location_names[location_key])
108 });
109 }
110 }
111 self
112 }
113
114 pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
115 self.clusters.insert(
116 cluster.key,
117 spec.build(cluster.key, &self.location_names[cluster.key]),
118 );
119 self
120 }
121
122 #[doc(hidden)]
124 pub fn with_cluster_erased(
125 mut self,
126 cluster_loc_key: LocationKey,
127 spec: impl ClusterSpec<'a, D>,
128 ) -> Self {
129 assert_eq!(
130 Some(&LocationType::Cluster),
131 self.locations.get(cluster_loc_key),
132 "No cluster with the given `LocationKey` was found."
133 );
134 self.clusters.insert(
135 cluster_loc_key,
136 spec.build(cluster_loc_key, &self.location_names[cluster_loc_key]),
137 );
138 self
139 }
140
141 pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
142 mut self,
143 spec: impl Fn() -> S,
144 ) -> Self {
145 for (location_key, &location_type) in self.locations.iter() {
146 if LocationType::Cluster == location_type {
147 self.clusters
148 .entry(location_key)
149 .expect("location was removed")
150 .or_insert_with(|| {
151 spec().build(location_key, &self.location_names[location_key])
152 });
153 }
154 }
155 self
156 }
157
158 pub fn with_external<P>(
159 mut self,
160 external: &External<P>,
161 spec: impl ExternalSpec<'a, D>,
162 ) -> Self {
163 self.externals.insert(
164 external.key,
165 spec.build(external.key, &self.location_names[external.key]),
166 );
167 self
168 }
169
170 pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
171 mut self,
172 spec: impl Fn() -> S,
173 ) -> Self {
174 for (location_key, &location_type) in self.locations.iter() {
175 if LocationType::External == location_type {
176 self.externals
177 .entry(location_key)
178 .expect("location was removed")
179 .or_insert_with(|| {
180 spec().build(location_key, &self.location_names[location_key])
181 });
182 }
183 }
184 self
185 }
186
187 pub fn with_sidecar_all(mut self, sidecar: &impl Sidecar) -> Self {
189 for (location_key, &location_type) in self.locations.iter() {
190 if !matches!(location_type, LocationType::Process | LocationType::Cluster) {
191 continue;
192 }
193
194 let location_name = &self.location_names[location_key];
195
196 let future_expr = sidecar.to_expr(
197 self.flow_name(),
198 location_key,
199 location_type,
200 location_name,
201 "e::format_ident!("{}", super::DFIR_IDENT),
202 );
203 self.sidecars.push(super::builder::Sidecar::Simple {
204 location_key,
205 future_expr: Box::new(future_expr),
206 });
207 }
208
209 self
210 }
211
212 pub fn with_sidecar_internal(
214 mut self,
215 location_key: LocationKey,
216 sidecar: &impl Sidecar,
217 ) -> Self {
218 let location_type = self.locations[location_key];
219 let location_name = &self.location_names[location_key];
220 let future_expr = sidecar.to_expr(
221 self.flow_name(),
222 location_key,
223 location_type,
224 location_name,
225 "e::format_ident!("{}", super::DFIR_IDENT),
226 );
227 self.sidecars.push(super::builder::Sidecar::Simple {
228 location_key,
229 future_expr: Box::new(future_expr),
230 });
231 self
232 }
233
234 pub fn with_sidecar_process(self, process: &Process<()>, sidecar: &impl Sidecar) -> Self {
236 self.with_sidecar_internal(process.key, sidecar)
237 }
238
239 pub fn with_sidecar_cluster(self, cluster: &Cluster<()>, sidecar: &impl Sidecar) -> Self {
241 self.with_sidecar_internal(cluster.key, sidecar)
242 }
243
244 pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
249 CompiledFlow {
252 dfir: build_inner(&mut self.ir),
253 extra_stmts: SparseSecondaryMap::new(),
254 sidecars: SparseSecondaryMap::new(),
255 _phantom: PhantomData,
256 }
257 }
258
259 pub fn compile(mut self) -> CompiledFlow<'a>
263 where
264 D: Deploy<'a, InstantiateEnv = ()>,
265 {
266 self.compile_internal(&mut ())
267 }
268
269 pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> {
273 let mut seen_tees: HashMap<_, _> = HashMap::new();
274 let mut seen_cluster_members = HashSet::new();
275 let mut extra_stmts = SparseSecondaryMap::new();
276 for leaf in self.ir.iter_mut() {
277 leaf.compile_network::<D>(
278 &mut extra_stmts,
279 &mut seen_tees,
280 &mut seen_cluster_members,
281 &self.processes,
282 &self.clusters,
283 &self.externals,
284 env,
285 );
286 }
287
288 let mut sidecars: SparseSecondaryMap<LocationKey, Vec<syn::Expr>> =
291 SparseSecondaryMap::new();
292 for decl in std::mem::take(&mut self.sidecars) {
293 match decl {
294 super::builder::Sidecar::Simple {
295 location_key,
296 future_expr,
297 } => {
298 sidecars
299 .entry(location_key)
300 .expect("location was removed")
301 .or_default()
302 .push(*future_expr);
303 }
304 super::builder::Sidecar::Bidi {
305 location_key,
306 sidecar_id,
307 sidecar_closure,
308 } => {
309 use syn::parse_quote;
310
311 let (stream_ident, sink_ident) = sidecar_id.idents();
312
313 let sidecar_closure_expr: &syn::Expr = &sidecar_closure;
314 let setup_stmt: syn::Stmt = parse_quote! {
315 let (#stream_ident, #sink_ident) = (#sidecar_closure_expr)();
316 };
317 extra_stmts
318 .entry(location_key)
319 .expect("location was removed")
320 .or_default()
321 .push(setup_stmt);
322 }
323 }
324 }
325
326 CompiledFlow {
327 dfir: build_inner(&mut self.ir),
328 extra_stmts,
329 sidecars,
330 _phantom: PhantomData,
331 }
332 }
333
334 fn cluster_id_stmts(&self, extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>) {
336 #[expect(
337 clippy::disallowed_methods,
338 reason = "nondeterministic iteration order, will be sorted"
339 )]
340 let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
341 all_clusters_sorted.sort();
342
343 for cluster_key in all_clusters_sorted {
344 let self_id_ident = syn::Ident::new(
345 &format!("__hydro_lang_cluster_self_id_{}", cluster_key),
346 Span::call_site(),
347 );
348 let self_id_expr = D::cluster_self_id().splice_untyped();
349 extra_stmts
350 .entry(cluster_key)
351 .expect("location was removed")
352 .or_default()
353 .push(syn::parse_quote! {
354 let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
355 });
356
357 let process_cluster_locations = self.location_names.keys().filter(|&location_key| {
358 self.processes.contains_key(location_key)
359 || self.clusters.contains_key(location_key)
360 });
361 for other_location in process_cluster_locations {
362 let other_id_ident = syn::Ident::new(
363 &format!("__hydro_lang_cluster_ids_{}", cluster_key),
364 Span::call_site(),
365 );
366 let other_id_expr = D::cluster_ids(cluster_key).splice_untyped();
367 extra_stmts
368 .entry(other_location)
369 .expect("location was removed")
370 .or_default()
371 .push(syn::parse_quote! {
372 let #other_id_ident = #other_id_expr;
373 });
374 }
375 }
376 }
377
378 #[must_use]
386 pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
387 let CompiledFlow {
388 dfir,
389 mut extra_stmts,
390 mut sidecars,
391 _phantom,
392 } = self.compile_internal(env);
393
394 let mut compiled = dfir;
395 self.cluster_id_stmts(&mut extra_stmts);
396 let mut meta = D::Meta::default();
397
398 let (processes, clusters, externals) = (
399 self.processes
400 .into_iter()
401 .filter(|&(node_key, ref node)| {
402 if let Some(ir) = compiled.remove(node_key) {
403 node.instantiate(
404 env,
405 &mut meta,
406 ir,
407 extra_stmts.remove(node_key).as_deref().unwrap_or_default(),
408 sidecars.remove(node_key).as_deref().unwrap_or_default(),
409 );
410 true
411 } else {
412 false
413 }
414 })
415 .collect::<SparseSecondaryMap<_, _>>(),
416 self.clusters
417 .into_iter()
418 .filter(|&(cluster_key, ref cluster)| {
419 if let Some(ir) = compiled.remove(cluster_key) {
420 cluster.instantiate(
421 env,
422 &mut meta,
423 ir,
424 extra_stmts
425 .remove(cluster_key)
426 .as_deref()
427 .unwrap_or_default(),
428 sidecars.remove(cluster_key).as_deref().unwrap_or_default(),
429 );
430 true
431 } else {
432 false
433 }
434 })
435 .collect::<SparseSecondaryMap<_, _>>(),
436 self.externals
437 .into_iter()
438 .inspect(|&(external_key, ref external)| {
439 assert!(!extra_stmts.contains_key(external_key));
440 assert!(!sidecars.contains_key(external_key));
441 external.instantiate(env, &mut meta, Default::default(), &[], &[]);
442 })
443 .collect::<SparseSecondaryMap<_, _>>(),
444 );
445
446 for location_key in self.locations.keys() {
447 if let Some(node) = processes.get(location_key) {
448 node.update_meta(&meta);
449 } else if let Some(cluster) = clusters.get(location_key) {
450 cluster.update_meta(&meta);
451 } else if let Some(external) = externals.get(location_key) {
452 external.update_meta(&meta);
453 }
454 }
455
456 let mut seen_tees_connect = HashMap::new();
457 for leaf in self.ir.iter_mut() {
458 leaf.connect_network(&mut seen_tees_connect);
459 }
460
461 DeployResult {
462 location_names: self.location_names,
463 processes,
464 clusters,
465 externals,
466 }
467 }
468}
469
470pub struct DeployResult<'a, D: Deploy<'a>> {
471 location_names: SecondaryMap<LocationKey, String>,
472 processes: SparseSecondaryMap<LocationKey, D::Process>,
473 clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
474 externals: SparseSecondaryMap<LocationKey, D::External>,
475}
476
477impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
478 pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
479 let LocationId::Process(location_key) = p.id() else {
480 panic!("Process ID expected")
481 };
482 self.processes.get(location_key).unwrap()
483 }
484
485 pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
486 let LocationId::Cluster(location_key) = c.id() else {
487 panic!("Cluster ID expected")
488 };
489 self.clusters.get(location_key).unwrap()
490 }
491
492 pub fn get_external<P>(&self, e: &External<P>) -> &D::External {
493 self.externals.get(e.key).unwrap()
494 }
495
496 pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, &str, &D::Process)> {
497 self.location_names
498 .iter()
499 .filter_map(|(location_key, location_name)| {
500 self.processes
501 .get(location_key)
502 .map(|process| (LocationId::Process(location_key), &**location_name, process))
503 })
504 }
505
506 pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> {
507 self.location_names
508 .iter()
509 .filter_map(|(location_key, location_name)| {
510 self.clusters
511 .get(location_key)
512 .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
513 })
514 }
515
516 #[deprecated(note = "use `connect` instead")]
517 pub async fn connect_bytes<M>(
518 &self,
519 port: ExternalBytesPort<M>,
520 ) -> (
521 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
522 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
523 ) {
524 self.connect(port).await
525 }
526
527 #[deprecated(note = "use `connect` instead")]
528 pub async fn connect_sink_bytes<M>(
529 &self,
530 port: ExternalBytesPort<M>,
531 ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
532 self.connect(port).await.1
533 }
534
535 pub async fn connect_bincode<
536 InT: Serialize + 'static,
537 OutT: DeserializeOwned + 'static,
538 Many,
539 >(
540 &self,
541 port: ExternalBincodeBidi<InT, OutT, Many>,
542 ) -> (
543 Pin<Box<dyn Stream<Item = OutT>>>,
544 Pin<Box<dyn Sink<InT, Error = Error>>>,
545 ) {
546 self.externals
547 .get(port.process_key)
548 .unwrap()
549 .as_bincode_bidi(port.port_id)
550 .await
551 }
552
553 #[deprecated(note = "use `connect` instead")]
554 pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
555 &self,
556 port: ExternalBincodeSink<T, Many>,
557 ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
558 self.connect(port).await
559 }
560
561 #[deprecated(note = "use `connect` instead")]
562 pub async fn connect_source_bytes(
563 &self,
564 port: ExternalBytesPort,
565 ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
566 self.connect(port).await.0
567 }
568
569 #[deprecated(note = "use `connect` instead")]
570 pub async fn connect_source_bincode<
571 T: Serialize + DeserializeOwned + 'static,
572 O: Ordering,
573 R: Retries,
574 >(
575 &self,
576 port: ExternalBincodeStream<T, O, R>,
577 ) -> Pin<Box<dyn Stream<Item = T>>> {
578 self.connect(port).await
579 }
580
581 pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
582 &'b self,
583 port: P,
584 ) -> <P as ConnectableAsync<&'b Self>>::Output {
585 port.connect(self).await
586 }
587}
588
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Returns the raw client port behind `port` on its external node.
    ///
    /// # Panics
    /// Panics if no external node is stored for the port's process key.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let external = self.externals.get(port.process_key).unwrap();
        external.raw_port(port.port_id)
    }
}
604
/// A port that can be connected asynchronously given a context `Ctx`.
pub trait ConnectableAsync<Ctx> {
    /// What a successful connection yields.
    type Output;

    /// Consumes the port and connects it using `ctx`.
    fn connect(self, ctx: Ctx) -> impl std::future::Future<Output = Self::Output>;
}
610
611impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
612 type Output = (
613 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
614 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
615 );
616
617 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
618 ctx.externals
619 .get(self.process_key)
620 .unwrap()
621 .as_bytes_bidi(self.port_id)
622 .await
623 }
624}
625
626impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
627 ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
628{
629 type Output = Pin<Box<dyn Stream<Item = T>>>;
630
631 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
632 ctx.externals
633 .get(self.process_key)
634 .unwrap()
635 .as_bincode_source(self.port_id)
636 .await
637 }
638}
639
640impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
641 for ExternalBincodeSink<T, Many>
642{
643 type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
644
645 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
646 ctx.externals
647 .get(self.process_key)
648 .unwrap()
649 .as_bincode_sink(self.port_id)
650 .await
651 }
652}