// hydro_lang/compile/deploy.rs

1use std::collections::{HashMap, HashSet};
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23    ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, LocationKey, LocationType, Process};
26use crate::staging_util::Invariant;
27use crate::telemetry::Sidecar;
28
/// A flow whose locations are being bound to concrete deployment targets of the
/// backend `D`, ready to be compiled and deployed.
pub struct DeployFlow<'a, D>
where
    D: Deploy<'a>,
{
    /// IR roots of the flow. Compiling the network rewrites these in place.
    pub(super) ir: Vec<HydroRoot>,

    /// Every location of the flow together with its [`LocationType`].
    pub(super) locations: SlotMap<LocationKey, LocationType>,
    /// Name of each location; handed to the specs and sidecars when they are built.
    pub(super) location_names: SecondaryMap<LocationKey, String>,

    /// Deployed instances of each process in the flow
    pub(super) processes: SparseSecondaryMap<LocationKey, D::Process>,
    /// Deployed instances of each cluster in the flow
    pub(super) clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
    /// Deployed instances of each external in the flow
    pub(super) externals: SparseSecondaryMap<LocationKey, D::External>,

    /// Compile-time sidecar directives (both simple futures and external TCP sidecars).
    pub(super) sidecars: Vec<super::builder::Sidecar>,

    /// Application name used in telemetry.
    pub(super) flow_name: String,

    /// Marker carrying the `'a` lifetime and the backend type `D`.
    pub(super) _phantom: Invariant<'a, D>,
}
51
52impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
    /// Returns the IR roots of this flow.
    pub fn ir(&self) -> &Vec<HydroRoot> {
        &self.ir
    }
56
    /// Application name used in telemetry.
    ///
    /// This is also the name handed to each [`Sidecar`] when its expression is generated.
    pub fn flow_name(&self) -> &str {
        &self.flow_name
    }
61
62    pub fn with_process<P>(
63        mut self,
64        process: &Process<P>,
65        spec: impl IntoProcessSpec<'a, D>,
66    ) -> Self {
67        self.processes.insert(
68            process.key,
69            spec.into_process_spec()
70                .build(process.key, &self.location_names[process.key]),
71        );
72        self
73    }
74
75    /// TODO(mingwei): unstable API
76    #[doc(hidden)]
77    pub fn with_process_erased(
78        mut self,
79        process_loc_key: LocationKey,
80        spec: impl IntoProcessSpec<'a, D>,
81    ) -> Self {
82        assert_eq!(
83            Some(&LocationType::Process),
84            self.locations.get(process_loc_key),
85            "No process with the given `LocationKey` was found."
86        );
87        self.processes.insert(
88            process_loc_key,
89            spec.into_process_spec()
90                .build(process_loc_key, &self.location_names[process_loc_key]),
91        );
92        self
93    }
94
95    pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
96        mut self,
97        spec: impl Fn() -> S,
98    ) -> Self {
99        for (location_key, &location_type) in self.locations.iter() {
100            if LocationType::Process == location_type {
101                self.processes
102                    .entry(location_key)
103                    .expect("location was removed")
104                    .or_insert_with(|| {
105                        spec()
106                            .into_process_spec()
107                            .build(location_key, &self.location_names[location_key])
108                    });
109            }
110        }
111        self
112    }
113
114    pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
115        self.clusters.insert(
116            cluster.key,
117            spec.build(cluster.key, &self.location_names[cluster.key]),
118        );
119        self
120    }
121
122    /// TODO(mingwei): unstable API
123    #[doc(hidden)]
124    pub fn with_cluster_erased(
125        mut self,
126        cluster_loc_key: LocationKey,
127        spec: impl ClusterSpec<'a, D>,
128    ) -> Self {
129        assert_eq!(
130            Some(&LocationType::Cluster),
131            self.locations.get(cluster_loc_key),
132            "No cluster with the given `LocationKey` was found."
133        );
134        self.clusters.insert(
135            cluster_loc_key,
136            spec.build(cluster_loc_key, &self.location_names[cluster_loc_key]),
137        );
138        self
139    }
140
141    pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
142        mut self,
143        spec: impl Fn() -> S,
144    ) -> Self {
145        for (location_key, &location_type) in self.locations.iter() {
146            if LocationType::Cluster == location_type {
147                self.clusters
148                    .entry(location_key)
149                    .expect("location was removed")
150                    .or_insert_with(|| {
151                        spec().build(location_key, &self.location_names[location_key])
152                    });
153            }
154        }
155        self
156    }
157
158    pub fn with_external<P>(
159        mut self,
160        external: &External<P>,
161        spec: impl ExternalSpec<'a, D>,
162    ) -> Self {
163        self.externals.insert(
164            external.key,
165            spec.build(external.key, &self.location_names[external.key]),
166        );
167        self
168    }
169
170    pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
171        mut self,
172        spec: impl Fn() -> S,
173    ) -> Self {
174        for (location_key, &location_type) in self.locations.iter() {
175            if LocationType::External == location_type {
176                self.externals
177                    .entry(location_key)
178                    .expect("location was removed")
179                    .or_insert_with(|| {
180                        spec().build(location_key, &self.location_names[location_key])
181                    });
182            }
183        }
184        self
185    }
186
187    /// Adds a [`Sidecar`] to all processes and clusters in the flow.
188    pub fn with_sidecar_all(mut self, sidecar: &impl Sidecar) -> Self {
189        for (location_key, &location_type) in self.locations.iter() {
190            if !matches!(location_type, LocationType::Process | LocationType::Cluster) {
191                continue;
192            }
193
194            let location_name = &self.location_names[location_key];
195
196            let future_expr = sidecar.to_expr(
197                self.flow_name(),
198                location_key,
199                location_type,
200                location_name,
201                &quote::format_ident!("{}", super::DFIR_IDENT),
202            );
203            self.sidecars.push(super::builder::Sidecar::Simple {
204                location_key,
205                future_expr: Box::new(future_expr),
206            });
207        }
208
209        self
210    }
211
212    /// Adds a [`Sidecar`] to the given location.
213    pub fn with_sidecar_internal(
214        mut self,
215        location_key: LocationKey,
216        sidecar: &impl Sidecar,
217    ) -> Self {
218        let location_type = self.locations[location_key];
219        let location_name = &self.location_names[location_key];
220        let future_expr = sidecar.to_expr(
221            self.flow_name(),
222            location_key,
223            location_type,
224            location_name,
225            &quote::format_ident!("{}", super::DFIR_IDENT),
226        );
227        self.sidecars.push(super::builder::Sidecar::Simple {
228            location_key,
229            future_expr: Box::new(future_expr),
230        });
231        self
232    }
233
234    /// Adds a [`Sidecar`] to a specific process in the flow.
235    pub fn with_sidecar_process(self, process: &Process<()>, sidecar: &impl Sidecar) -> Self {
236        self.with_sidecar_internal(process.key, sidecar)
237    }
238
239    /// Adds a [`Sidecar`] to a specific cluster in the flow.
240    pub fn with_sidecar_cluster(self, cluster: &Cluster<()>, sidecar: &impl Sidecar) -> Self {
241        self.with_sidecar_internal(cluster.key, sidecar)
242    }
243
244    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) without networking.
245    /// Useful for generating Mermaid diagrams of the DFIR.
246    ///
247    /// (This returned DFIR will not compile due to the networking missing).
248    pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
249        // NOTE: `build_inner` does not actually mutate the IR, but `&mut` is required
250        // only because the shared traversal logic requires it
251        CompiledFlow {
252            dfir: build_inner(&mut self.ir),
253            extra_stmts: SparseSecondaryMap::new(),
254            sidecars: SparseSecondaryMap::new(),
255            _phantom: PhantomData,
256        }
257    }
258
    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) including networking.
    ///
    /// (This does not compile the DFIR itself, instead use [`Self::deploy`] to compile & deploy the DFIR).
    ///
    /// Only available for backends whose instantiation environment is `()`, since no
    /// environment is supplied by the caller.
    pub fn compile(mut self) -> CompiledFlow<'a>
    where
        D: Deploy<'a, InstantiateEnv = ()>,
    {
        self.compile_internal(&mut ())
    }
268
    /// Same as [`Self::compile`] but does not invalidate `self`, for internal use.
    ///
    /// Empties `self.sidecars` and modifies `self.ir`, leaving `self` in a partial state.
    ///
    /// Steps:
    /// 1. Runs network compilation on every IR root, collecting per-location extra statements.
    /// 2. Drains the sidecar directives: simple sidecars become per-location future
    ///    expressions, bidirectional sidecars become per-location setup statements.
    /// 3. Builds the DFIR from the (now rewritten) IR.
    pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> {
        // Accumulators shared across all IR roots during network compilation.
        let mut seen_tees: HashMap<_, _> = HashMap::new();
        let mut seen_cluster_members = HashSet::new();
        let mut extra_stmts = SparseSecondaryMap::new();
        for leaf in self.ir.iter_mut() {
            leaf.compile_network::<D>(
                &mut extra_stmts,
                &mut seen_tees,
                &mut seen_cluster_members,
                &self.processes,
                &self.clusters,
                &self.externals,
                env,
            );
        }

        // Process sidecar declarations — compile-time directives that
        // produce futures to spawn on each location's LocalSet.
        let mut sidecars: SparseSecondaryMap<LocationKey, Vec<syn::Expr>> =
            SparseSecondaryMap::new();
        // `mem::take` leaves `self.sidecars` empty, so the directives are consumed exactly once.
        for decl in std::mem::take(&mut self.sidecars) {
            match decl {
                // A simple sidecar contributes one future expression to its location.
                super::builder::Sidecar::Simple {
                    location_key,
                    future_expr,
                } => {
                    sidecars
                        .entry(location_key)
                        .expect("location was removed")
                        .or_default()
                        .push(*future_expr);
                }
                // A bidirectional sidecar contributes a setup statement that calls its
                // closure and binds the returned pair to the sidecar's stream/sink idents.
                super::builder::Sidecar::Bidi {
                    location_key,
                    sidecar_id,
                    sidecar_closure,
                } => {
                    use syn::parse_quote;

                    let (stream_ident, sink_ident) = sidecar_id.idents();

                    let sidecar_closure_expr: &syn::Expr = &sidecar_closure;
                    let setup_stmt: syn::Stmt = parse_quote! {
                        let (#stream_ident, #sink_ident) = (#sidecar_closure_expr)();
                    };
                    extra_stmts
                        .entry(location_key)
                        .expect("location was removed")
                        .or_default()
                        .push(setup_stmt);
                }
            }
        }

        CompiledFlow {
            dfir: build_inner(&mut self.ir),
            extra_stmts,
            sidecars,
            _phantom: PhantomData,
        }
    }
333
    /// Creates the variables for cluster IDs and adds them into `extra_stmts`.
    ///
    /// For every configured cluster, visited in sorted key order:
    /// * the cluster's own statements gain a `__hydro_lang_cluster_self_id_{key}` binding
    ///   holding a reference to a leaked box of `D::cluster_self_id()`;
    /// * every location that has a configured process or cluster gains a
    ///   `__hydro_lang_cluster_ids_{key}` binding initialised from `D::cluster_ids(key)`.
    fn cluster_id_stmts(&self, extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>) {
        #[expect(
            clippy::disallowed_methods,
            reason = "nondeterministic iteration order, will be sorted"
        )]
        let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
        all_clusters_sorted.sort();

        for cluster_key in all_clusters_sorted {
            // Binding for the cluster member's own ID, emitted only on the cluster itself.
            let self_id_ident = syn::Ident::new(
                &format!("__hydro_lang_cluster_self_id_{}", cluster_key),
                Span::call_site(),
            );
            let self_id_expr = D::cluster_self_id().splice_untyped();
            extra_stmts
                .entry(cluster_key)
                .expect("location was removed")
                .or_default()
                .push(syn::parse_quote! {
                    let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
                });

            // Binding for this cluster's member IDs, emitted on every configured
            // process and cluster (including this cluster itself).
            let process_cluster_locations = self.location_names.keys().filter(|&location_key| {
                self.processes.contains_key(location_key)
                    || self.clusters.contains_key(location_key)
            });
            for other_location in process_cluster_locations {
                // Both the identifier and the expression depend only on `cluster_key`,
                // not on `other_location`.
                let other_id_ident = syn::Ident::new(
                    &format!("__hydro_lang_cluster_ids_{}", cluster_key),
                    Span::call_site(),
                );
                let other_id_expr = D::cluster_ids(cluster_key).splice_untyped();
                extra_stmts
                    .entry(other_location)
                    .expect("location was removed")
                    .or_default()
                    .push(syn::parse_quote! {
                        let #other_id_ident = #other_id_expr;
                    });
            }
        }
    }
377
    /// Compiles and deploys the flow.
    ///
    /// Rough outline of steps:
    /// * Compiles the Hydro into DFIR.
    /// * Instantiates nodes as configured.
    /// * Compiles the corresponding DFIR into binaries for nodes as needed.
    /// * Connects up networking as needed.
    ///
    /// Processes and clusters for which compilation produced no DFIR are neither
    /// instantiated nor included in the returned [`DeployResult`]; externals are always
    /// instantiated and kept.
    ///
    /// # Panics
    ///
    /// Panics if any extra statements or sidecars were recorded for an external location.
    #[must_use]
    pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
        let CompiledFlow {
            dfir,
            mut extra_stmts,
            mut sidecars,
            _phantom,
        } = self.compile_internal(env);

        let mut compiled = dfir;
        // Add the cluster ID bindings on top of the statements produced by compilation.
        self.cluster_id_stmts(&mut extra_stmts);
        let mut meta = D::Meta::default();

        let (processes, clusters, externals) = (
            self.processes
                .into_iter()
                // The filter closure has side effects: it instantiates each node that has
                // compiled DFIR and drops the ones that do not.
                .filter(|&(node_key, ref node)| {
                    if let Some(ir) = compiled.remove(node_key) {
                        node.instantiate(
                            env,
                            &mut meta,
                            ir,
                            extra_stmts.remove(node_key).as_deref().unwrap_or_default(),
                            sidecars.remove(node_key).as_deref().unwrap_or_default(),
                        );
                        true
                    } else {
                        false
                    }
                })
                .collect::<SparseSecondaryMap<_, _>>(),
            self.clusters
                .into_iter()
                // Same treatment as processes: instantiate when DFIR exists, otherwise drop.
                .filter(|&(cluster_key, ref cluster)| {
                    if let Some(ir) = compiled.remove(cluster_key) {
                        cluster.instantiate(
                            env,
                            &mut meta,
                            ir,
                            extra_stmts
                                .remove(cluster_key)
                                .as_deref()
                                .unwrap_or_default(),
                            sidecars.remove(cluster_key).as_deref().unwrap_or_default(),
                        );
                        true
                    } else {
                        false
                    }
                })
                .collect::<SparseSecondaryMap<_, _>>(),
            self.externals
                .into_iter()
                // Externals are instantiated with a default graph and no statements or
                // sidecars; none may have been recorded for them.
                .inspect(|&(external_key, ref external)| {
                    assert!(!extra_stmts.contains_key(external_key));
                    assert!(!sidecars.contains_key(external_key));
                    external.instantiate(env, &mut meta, Default::default(), &[], &[]);
                })
                .collect::<SparseSecondaryMap<_, _>>(),
        );

        // Once every node has been instantiated, hand the accumulated metadata to each one.
        for location_key in self.locations.keys() {
            if let Some(node) = processes.get(location_key) {
                node.update_meta(&meta);
            } else if let Some(cluster) = clusters.get(location_key) {
                cluster.update_meta(&meta);
            } else if let Some(external) = externals.get(location_key) {
                external.update_meta(&meta);
            }
        }

        // Finally, connect the network for every IR root.
        let mut seen_tees_connect = HashMap::new();
        for leaf in self.ir.iter_mut() {
            leaf.connect_network(&mut seen_tees_connect);
        }

        DeployResult {
            location_names: self.location_names,
            processes,
            clusters,
            externals,
        }
    }
468}
469
/// The outcome of [`DeployFlow::deploy`]: the instantiated nodes of a flow, keyed by
/// location.
pub struct DeployResult<'a, D: Deploy<'a>> {
    /// Name of each location in the flow.
    location_names: SecondaryMap<LocationKey, String>,
    /// Instantiated processes.
    processes: SparseSecondaryMap<LocationKey, D::Process>,
    /// Instantiated clusters.
    clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
    /// Instantiated externals.
    externals: SparseSecondaryMap<LocationKey, D::External>,
}
476
477impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
478    pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
479        let LocationId::Process(location_key) = p.id() else {
480            panic!("Process ID expected")
481        };
482        self.processes.get(location_key).unwrap()
483    }
484
485    pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
486        let LocationId::Cluster(location_key) = c.id() else {
487            panic!("Cluster ID expected")
488        };
489        self.clusters.get(location_key).unwrap()
490    }
491
    /// Returns the deployed external for `e`, looked up by its location key.
    ///
    /// # Panics
    ///
    /// Panics if no deployed external is stored for `e` in this result.
    pub fn get_external<P>(&self, e: &External<P>) -> &D::External {
        self.externals.get(e.key).unwrap()
    }
495
496    pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, &str, &D::Process)> {
497        self.location_names
498            .iter()
499            .filter_map(|(location_key, location_name)| {
500                self.processes
501                    .get(location_key)
502                    .map(|process| (LocationId::Process(location_key), &**location_name, process))
503            })
504    }
505
506    pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> {
507        self.location_names
508            .iter()
509            .filter_map(|(location_key, location_name)| {
510                self.clusters
511                    .get(location_key)
512                    .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
513            })
514    }
515
    /// Connects to a raw bytes port, returning its stream and sink halves.
    ///
    /// Thin wrapper that forwards to [`Self::connect`].
    #[deprecated(note = "use `connect` instead")]
    pub async fn connect_bytes<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> (
        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
    ) {
        self.connect(port).await
    }
526
    /// Connects to a raw bytes port and returns only its sink half.
    ///
    /// Forwards to [`Self::connect`] and discards the stream half of the pair.
    #[deprecated(note = "use `connect` instead")]
    pub async fn connect_sink_bytes<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
        self.connect(port).await.1
    }
534
535    pub async fn connect_bincode<
536        InT: Serialize + 'static,
537        OutT: DeserializeOwned + 'static,
538        Many,
539    >(
540        &self,
541        port: ExternalBincodeBidi<InT, OutT, Many>,
542    ) -> (
543        Pin<Box<dyn Stream<Item = OutT>>>,
544        Pin<Box<dyn Sink<InT, Error = Error>>>,
545    ) {
546        self.externals
547            .get(port.process_key)
548            .unwrap()
549            .as_bincode_bidi(port.port_id)
550            .await
551    }
552
553    #[deprecated(note = "use `connect` instead")]
554    pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
555        &self,
556        port: ExternalBincodeSink<T, Many>,
557    ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
558        self.connect(port).await
559    }
560
    /// Connects to a raw bytes port and returns only its stream half.
    ///
    /// Forwards to [`Self::connect`] and discards the sink half of the pair.
    // NOTE(review): unlike `connect_bytes` / `connect_sink_bytes`, this takes
    // `ExternalBytesPort` with its default type parameter rather than a generic `M`.
    #[deprecated(note = "use `connect` instead")]
    pub async fn connect_source_bytes(
        &self,
        port: ExternalBytesPort,
    ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
        self.connect(port).await.0
    }
568
569    #[deprecated(note = "use `connect` instead")]
570    pub async fn connect_source_bincode<
571        T: Serialize + DeserializeOwned + 'static,
572        O: Ordering,
573        R: Retries,
574    >(
575        &self,
576        port: ExternalBincodeStream<T, O, R>,
577    ) -> Pin<Box<dyn Stream<Item = T>>> {
578        self.connect(port).await
579    }
580
    /// Connects to an external port of any kind that implements [`ConnectableAsync`]
    /// for a reference to this result.
    ///
    /// The returned value is whatever the port's [`ConnectableAsync::Output`] is.
    pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
        &'b self,
        port: P,
    ) -> <P as ConnectableAsync<&'b Self>>::Output {
        port.connect(self).await
    }
587}
588
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Get the raw port handle.
    ///
    /// # Panics
    ///
    /// Panics if the external that owns `port` is not part of this result.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let external = self.externals.get(port.process_key).unwrap();
        external.raw_port(port.port_id)
    }
}
604
/// A port-like value that can be asynchronously connected given a context `Ctx`.
pub trait ConnectableAsync<Ctx> {
    /// What a successful connection yields (for example a stream, a sink, or both).
    type Output;

    /// Consumes the port and connects it using `ctx`.
    fn connect(self, ctx: Ctx) -> impl Future<Output = Self::Output>;
}
610
611impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
612    type Output = (
613        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
614        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
615    );
616
617    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
618        ctx.externals
619            .get(self.process_key)
620            .unwrap()
621            .as_bytes_bidi(self.port_id)
622            .await
623    }
624}
625
626impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
627    ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
628{
629    type Output = Pin<Box<dyn Stream<Item = T>>>;
630
631    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
632        ctx.externals
633            .get(self.process_key)
634            .unwrap()
635            .as_bincode_source(self.port_id)
636            .await
637    }
638}
639
640impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
641    for ExternalBincodeSink<T, Many>
642{
643    type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
644
645    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
646        ctx.externals
647            .get(self.process_key)
648            .unwrap()
649            .as_bincode_sink(self.port_id)
650            .await
651    }
652}