Skip to main content

hydro_lang/deploy/
deploy_graph_containerized_ecs.rs

1//! Deployment backend for ECS that generates manifests describing the binaries,
2//! ports, and service naming needed to package and orchestrate Hydro applications.
3
4use std::cell::RefCell;
5use std::collections::BTreeMap;
6use std::pin::Pin;
7use std::rc::Rc;
8
9use bytes::Bytes;
10use dfir_lang::graph::DfirGraph;
11use futures::{Sink, Stream};
12use proc_macro2::Span;
13use serde::{Deserialize, Serialize};
14use stageleft::QuotedWithContext;
15use syn::parse_quote;
16use tracing::{instrument, trace};
17
18/// Manifest for exporting - describes all processes, clusters, and their configuration
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct HydroManifest {
21    /// Process definitions (single-instance services)
22    pub processes: BTreeMap<String, ProcessManifest>,
23    /// Cluster definitions (multi-instance services)
24    pub clusters: BTreeMap<String, ClusterManifest>,
25}
26
27/// Information the build toolchain needs to compile a trybuild binary.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct BuildConfig {
30    /// Path to the trybuild project directory
31    pub project_dir: String,
32    /// Path to the target directory
33    pub target_dir: String,
34    /// Example/binary name to build
35    pub bin_name: String,
36    /// Package name containing the example (for -p flag)
37    pub package_name: String,
38    /// Features to enable
39    pub features: Vec<String>,
40}
41
42/// Information about an exposed port.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(tag = "protocol", rename_all = "lowercase")]
45pub enum PortInfo {
46    /// A TCP listener.
47    Tcp {
48        /// The port number.
49        port: u16,
50    },
51}
52
53/// Manifest entry for a single process
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ProcessManifest {
56    /// Build toolchain info for this binary
57    pub build: BuildConfig,
58    /// Ports that need to be exposed, keyed by external port identifier
59    pub ports: BTreeMap<String, PortInfo>,
60    /// Task family name (used for ECS service discovery)
61    pub task_family: String,
62}
63
64/// Manifest entry for a cluster (multiple instances of the same service)
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ClusterManifest {
67    /// Build configuration for this cluster (same binary for all instances)
68    pub build: BuildConfig,
69    /// Ports that need to be exposed, keyed by external port identifier
70    pub ports: BTreeMap<String, PortInfo>,
71    /// Default number of instances
72    pub default_count: usize,
73    /// Task family prefix (instances will be named {prefix}-0, {prefix}-1, etc.)
74    pub task_family_prefix: String,
75}
76
77use super::deploy_runtime_containerized_ecs::*;
78use crate::compile::builder::ExternalPortId;
79use crate::compile::deploy::DeployResult;
80use crate::compile::deploy_provider::{
81    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
82};
83use crate::compile::trybuild::generate::create_graph_trybuild;
84use crate::location::dynamic::LocationId;
85use crate::location::member_id::TaglessMemberId;
86use crate::location::{LocationKey, MembershipEvent, NetworkHint};
87
88/// Represents a process running in an ecs deployment
89#[derive(Clone)]
90pub struct EcsDeployProcess {
91    id: LocationKey,
92    name: String,
93    next_port: Rc<RefCell<u16>>,
94
95    exposed_ports: Rc<RefCell<BTreeMap<String, PortInfo>>>,
96
97    trybuild_config:
98        Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
99}
100
101impl Node for EcsDeployProcess {
102    type Port = u16;
103    type Meta = ();
104    type InstantiateEnv = EcsDeploy;
105
106    #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
107    fn next_port(&self) -> Self::Port {
108        let port = {
109            let mut borrow = self.next_port.borrow_mut();
110            let port = *borrow;
111            *borrow += 1;
112            port
113        };
114
115        port
116    }
117
118    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
119    fn update_meta(&self, _meta: &Self::Meta) {}
120
121    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
122    fn instantiate(
123        &self,
124        _env: &mut Self::InstantiateEnv,
125        meta: &mut Self::Meta,
126        graph: DfirGraph,
127        extra_stmts: &[syn::Stmt],
128        sidecars: &[syn::Expr],
129    ) {
130        let (bin_name, config) = create_graph_trybuild(
131            graph,
132            extra_stmts,
133            sidecars,
134            Some(&self.name),
135            crate::compile::trybuild::generate::DeployMode::Containerized,
136            crate::compile::trybuild::generate::LinkingMode::Static,
137        );
138
139        // Store the trybuild config for export
140        *self.trybuild_config.borrow_mut() = Some((bin_name, config));
141    }
142}
143
144impl EcsDeployProcess {
145    /// Expose a TCP port on this process for external access.
146    ///
147    /// This method records the port in the manifest's `ports` map so that
148    /// downstream tooling (CDK, deployment scripts) can configure security
149    /// groups, load balancers, and service discovery accordingly.
150    pub fn expose_port(&self, port: u16) {
151        let port_name = format!("exposed-{}", port);
152        self.exposed_ports
153            .borrow_mut()
154            .insert(port_name, PortInfo::Tcp { port });
155    }
156}
157
158/// Represents a logical cluster, which can be a variable amount of individual containers.
159#[derive(Clone)]
160pub struct EcsDeployCluster {
161    id: LocationKey,
162    name: String,
163    next_port: Rc<RefCell<u16>>,
164
165    exposed_ports: Rc<RefCell<BTreeMap<String, PortInfo>>>,
166
167    count: usize,
168
169    /// Stored trybuild config for export
170    trybuild_config:
171        Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
172}
173
174impl Node for EcsDeployCluster {
175    type Port = u16;
176    type Meta = ();
177    type InstantiateEnv = EcsDeploy;
178
179    #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
180    fn next_port(&self) -> Self::Port {
181        let port = {
182            let mut borrow = self.next_port.borrow_mut();
183            let port = *borrow;
184            *borrow += 1;
185            port
186        };
187
188        port
189    }
190
191    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
192    fn update_meta(&self, _meta: &Self::Meta) {}
193
194    #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, extra_stmts = extra_stmts.len()))]
195    fn instantiate(
196        &self,
197        _env: &mut Self::InstantiateEnv,
198        _meta: &mut Self::Meta,
199        graph: DfirGraph,
200        extra_stmts: &[syn::Stmt],
201        sidecars: &[syn::Expr],
202    ) {
203        let (bin_name, config) = create_graph_trybuild(
204            graph,
205            extra_stmts,
206            sidecars,
207            Some(&self.name),
208            crate::compile::trybuild::generate::DeployMode::Containerized,
209            crate::compile::trybuild::generate::LinkingMode::Static,
210        );
211
212        // Store the trybuild config for export
213        *self.trybuild_config.borrow_mut() = Some((bin_name, config));
214    }
215}
216
217/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
218#[derive(Clone, Debug)]
219pub struct EcsDeployExternal {
220    name: String,
221    next_port: Rc<RefCell<u16>>,
222}
223
224impl Node for EcsDeployExternal {
225    type Port = u16;
226    type Meta = ();
227    type InstantiateEnv = EcsDeploy;
228
229    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
230    fn next_port(&self) -> Self::Port {
231        let port = {
232            let mut borrow = self.next_port.borrow_mut();
233            let port = *borrow;
234            *borrow += 1;
235            port
236        };
237
238        port
239    }
240
241    #[instrument(level = "trace", skip_all, fields(name = self.name))]
242    fn update_meta(&self, _meta: &Self::Meta) {}
243
244    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
245    fn instantiate(
246        &self,
247        _env: &mut Self::InstantiateEnv,
248        meta: &mut Self::Meta,
249        graph: DfirGraph,
250        extra_stmts: &[syn::Stmt],
251        sidecars: &[syn::Expr],
252    ) {
253        trace!(name: "surface", surface = graph.surface_syntax_string());
254    }
255}
256
257impl EcsDeployCluster {
258    /// Expose a TCP port on every member of this cluster for external access.
259    ///
260    /// The binary running on this cluster must bind a `TcpListener` on this port.
261    /// This method records the port in the manifest's `ports` map so that
262    /// downstream tooling (CDK, deployment scripts) can configure security
263    /// groups, load balancers, and service discovery accordingly.
264    pub fn expose_port(&self, port: u16) {
265        let port_name = format!("exposed-{}", port);
266        self.exposed_ports
267            .borrow_mut()
268            .insert(port_name, PortInfo::Tcp { port });
269    }
270}
271
272type DynSourceSink<Out, In, InErr> = (
273    Pin<Box<dyn Stream<Item = Out>>>,
274    Pin<Box<dyn Sink<In, Error = InErr>>>,
275);
276
277impl<'a> RegisterPort<'a, EcsDeploy> for EcsDeployExternal {
278    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
279    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {}
280
281    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
282    fn as_bytes_bidi(
283        &self,
284        _external_port_id: ExternalPortId,
285    ) -> impl Future<
286        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
287    > + 'a {
288        async { unimplemented!() }
289    }
290
291    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
292    fn as_bincode_bidi<InT, OutT>(
293        &self,
294        _external_port_id: ExternalPortId,
295    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
296    where
297        InT: Serialize + 'static,
298        OutT: serde::de::DeserializeOwned + 'static,
299    {
300        async { unimplemented!() }
301    }
302
303    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
304    fn as_bincode_sink<T>(
305        &self,
306        _external_port_id: ExternalPortId,
307    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
308    where
309        T: Serialize + 'static,
310    {
311        async { unimplemented!() }
312    }
313
314    #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
315    fn as_bincode_source<T>(
316        &self,
317        _external_port_id: ExternalPortId,
318    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
319    where
320        T: serde::de::DeserializeOwned + 'static,
321    {
322        async { unimplemented!() }
323    }
324}
325
326/// Represents an aws ecs deployment.
327pub struct EcsDeploy;
328
329impl Default for EcsDeploy {
330    fn default() -> Self {
331        Self::new()
332    }
333}
334
335impl EcsDeploy {
336    /// Creates a new ecs deployment.
337    pub fn new() -> Self {
338        Self
339    }
340
341    /// Add an internal ecs process to the deployment.
342    pub fn add_ecs_process(&mut self) -> EcsDeployProcessSpec {
343        EcsDeployProcessSpec
344    }
345
346    /// Add an internal ecs cluster to the deployment.
347    pub fn add_ecs_cluster(&mut self, count: usize) -> EcsDeployClusterSpec {
348        EcsDeployClusterSpec { count }
349    }
350
351    /// Add an external process to the deployment.
352    pub fn add_external(&self, name: String) -> EcsDeployExternalSpec {
353        EcsDeployExternalSpec { name }
354    }
355
356    /// Export a deployment manifest describing each process and cluster: its
357    /// binary name, exposed ports, and ECS task-family naming.
358    ///
359    /// The returned [`HydroManifest`] is typically serialized to JSON and
360    /// consumed by a build script (to compile the trybuild binaries) and a
361    /// deployment tool (CDK, custom scripts, etc.) to create container images
362    /// and orchestrate services.
363    #[instrument(level = "trace", skip_all)]
364    pub fn export(&self, nodes: &DeployResult<'_, Self>) -> HydroManifest {
365        let mut manifest = HydroManifest {
366            processes: BTreeMap::new(),
367            clusters: BTreeMap::new(),
368        };
369
370        // processes
371        for (location_id, name_hint, process) in nodes.get_all_processes() {
372            let LocationId::Process(_) = location_id else {
373                unreachable!()
374            };
375
376            let (bin_name, trybuild_config) = process
377                .trybuild_config
378                .borrow()
379                .clone()
380                .expect("trybuild_config should be set after instantiate");
381
382            manifest.processes.insert(
383                name_hint.to_owned(),
384                ProcessManifest {
385                    build: build_info_from_config(&bin_name, &trybuild_config),
386                    ports: process.exposed_ports.borrow().clone(),
387                    task_family: process.name.clone(),
388                },
389            );
390        }
391
392        // clusters
393        for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
394            let LocationId::Cluster(_) = location_id else {
395                unreachable!()
396            };
397
398            let (bin_name, trybuild_config) = cluster
399                .trybuild_config
400                .borrow()
401                .clone()
402                .expect("trybuild_config should be set after instantiate");
403
404            manifest.clusters.insert(
405                name_hint.to_owned(),
406                ClusterManifest {
407                    build: build_info_from_config(&bin_name, &trybuild_config),
408                    ports: cluster.exposed_ports.borrow().clone(),
409                    default_count: cluster.count,
410                    task_family_prefix: cluster.name.clone(),
411                },
412            );
413        }
414
415        manifest
416    }
417}
418
419fn build_info_from_config(
420    bin_name: &str,
421    config: &crate::compile::trybuild::generate::TrybuildConfig,
422) -> BuildConfig {
423    let mut features = vec!["hydro___feature_ecs_runtime".to_owned()];
424    if let Some(extra) = &config.features {
425        features.extend(extra.clone());
426    }
427    let crate_name = config
428        .project_dir
429        .file_name()
430        .and_then(|n| n.to_str())
431        .unwrap_or("unknown")
432        .replace("_", "-");
433
434    let package_name = format!("{}-hydro-trybuild", crate_name);
435
436    BuildConfig {
437        project_dir: config.project_dir.to_string_lossy().into_owned(),
438        target_dir: config.target_dir.to_string_lossy().into_owned(),
439        bin_name: bin_name.to_owned(),
440        package_name,
441        features,
442    }
443}
444
445impl<'a> Deploy<'a> for EcsDeploy {
446    type InstantiateEnv = Self;
447    type Process = EcsDeployProcess;
448    type Cluster = EcsDeployCluster;
449    type External = EcsDeployExternal;
450    type Meta = ();
451
452    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
453    fn o2o_sink_source(
454        _env: &mut Self::InstantiateEnv,
455        p1: &Self::Process,
456        p1_port: &<Self::Process as Node>::Port,
457        p2: &Self::Process,
458        p2_port: &<Self::Process as Node>::Port,
459        name: Option<&str>,
460        networking_info: &crate::networking::NetworkingInfo,
461    ) -> (syn::Expr, syn::Expr) {
462        match networking_info {
463            crate::networking::NetworkingInfo::Tcp {
464                fault: crate::networking::TcpFault::FailStop,
465            } => {}
466            _ => panic!("Unsupported networking info: {:?}", networking_info),
467        }
468
469        deploy_containerized_o2o(
470            &p2.name,
471            name.expect("channel name is required for containerized deployment"),
472        )
473    }
474
475    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
476    fn o2o_connect(
477        p1: &Self::Process,
478        p1_port: &<Self::Process as Node>::Port,
479        p2: &Self::Process,
480        p2_port: &<Self::Process as Node>::Port,
481    ) -> Box<dyn FnOnce()> {
482        let serialized = format!(
483            "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
484            p1.name, p2.name
485        );
486
487        Box::new(move || {
488            trace!(name: "o2o_connect thunk", %serialized);
489        })
490    }
491
492    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
493    fn o2m_sink_source(
494        _env: &mut Self::InstantiateEnv,
495        p1: &Self::Process,
496        p1_port: &<Self::Process as Node>::Port,
497        c2: &Self::Cluster,
498        c2_port: &<Self::Cluster as Node>::Port,
499        name: Option<&str>,
500        networking_info: &crate::networking::NetworkingInfo,
501    ) -> (syn::Expr, syn::Expr) {
502        match networking_info {
503            crate::networking::NetworkingInfo::Tcp {
504                fault: crate::networking::TcpFault::FailStop,
505            } => {}
506            _ => panic!("Unsupported networking info: {:?}", networking_info),
507        }
508
509        deploy_containerized_o2m(
510            name.expect("channel name is required for containerized deployment"),
511        )
512    }
513
514    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
515    fn o2m_connect(
516        p1: &Self::Process,
517        p1_port: &<Self::Process as Node>::Port,
518        c2: &Self::Cluster,
519        c2_port: &<Self::Cluster as Node>::Port,
520    ) -> Box<dyn FnOnce()> {
521        let serialized = format!(
522            "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
523            p1.name, c2.name
524        );
525
526        Box::new(move || {
527            trace!(name: "o2m_connect thunk", %serialized);
528        })
529    }
530
531    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
532    fn m2o_sink_source(
533        _env: &mut Self::InstantiateEnv,
534        c1: &Self::Cluster,
535        c1_port: &<Self::Cluster as Node>::Port,
536        p2: &Self::Process,
537        p2_port: &<Self::Process as Node>::Port,
538        name: Option<&str>,
539        networking_info: &crate::networking::NetworkingInfo,
540    ) -> (syn::Expr, syn::Expr) {
541        match networking_info {
542            crate::networking::NetworkingInfo::Tcp {
543                fault: crate::networking::TcpFault::FailStop,
544            } => {}
545            _ => panic!("Unsupported networking info: {:?}", networking_info),
546        }
547
548        deploy_containerized_m2o(
549            &p2.name,
550            name.expect("channel name is required for containerized deployment"),
551        )
552    }
553
554    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
555    fn m2o_connect(
556        c1: &Self::Cluster,
557        c1_port: &<Self::Cluster as Node>::Port,
558        p2: &Self::Process,
559        p2_port: &<Self::Process as Node>::Port,
560    ) -> Box<dyn FnOnce()> {
561        let serialized = format!(
562            "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
563            c1.name, p2.name
564        );
565
566        Box::new(move || {
567            trace!(name: "m2o_connect thunk", %serialized);
568        })
569    }
570
571    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
572    fn m2m_sink_source(
573        _env: &mut Self::InstantiateEnv,
574        c1: &Self::Cluster,
575        c1_port: &<Self::Cluster as Node>::Port,
576        c2: &Self::Cluster,
577        c2_port: &<Self::Cluster as Node>::Port,
578        name: Option<&str>,
579        networking_info: &crate::networking::NetworkingInfo,
580    ) -> (syn::Expr, syn::Expr) {
581        match networking_info {
582            crate::networking::NetworkingInfo::Tcp {
583                fault: crate::networking::TcpFault::FailStop,
584            } => {}
585            _ => panic!("Unsupported networking info: {:?}", networking_info),
586        }
587
588        deploy_containerized_m2m(
589            name.expect("channel name is required for containerized deployment"),
590        )
591    }
592
593    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
594    fn m2m_connect(
595        c1: &Self::Cluster,
596        c1_port: &<Self::Cluster as Node>::Port,
597        c2: &Self::Cluster,
598        c2_port: &<Self::Cluster as Node>::Port,
599    ) -> Box<dyn FnOnce()> {
600        let serialized = format!(
601            "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
602            c1.name, c2.name
603        );
604
605        Box::new(move || {
606            trace!(name: "m2m_connect thunk", %serialized);
607        })
608    }
609
610    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
611    fn e2o_many_source(
612        extra_stmts: &mut Vec<syn::Stmt>,
613        p2: &Self::Process,
614        p2_port: &<Self::Process as Node>::Port,
615        codec_type: &syn::Type,
616        shared_handle: String,
617    ) -> syn::Expr {
618        p2.exposed_ports
619            .borrow_mut()
620            .insert(shared_handle.clone(), PortInfo::Tcp { port: *p2_port });
621
622        let socket_ident = syn::Ident::new(
623            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
624            Span::call_site(),
625        );
626
627        let source_ident = syn::Ident::new(
628            &format!("__hydro_deploy_many_{}_source", &shared_handle),
629            Span::call_site(),
630        );
631
632        let sink_ident = syn::Ident::new(
633            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
634            Span::call_site(),
635        );
636
637        let membership_ident = syn::Ident::new(
638            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
639            Span::call_site(),
640        );
641
642        let bind_addr = format!("0.0.0.0:{}", p2_port);
643
644        extra_stmts.push(syn::parse_quote! {
645            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
646        });
647
648        let root = crate::staging_util::get_this_crate();
649
650        extra_stmts.push(syn::parse_quote! {
651            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
652        });
653
654        parse_quote!(#source_ident)
655    }
656
657    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
658    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
659        let sink_ident = syn::Ident::new(
660            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
661            Span::call_site(),
662        );
663        parse_quote!(#sink_ident)
664    }
665
666    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?codec_type, %shared_handle))]
667    fn e2o_source(
668        extra_stmts: &mut Vec<syn::Stmt>,
669        p1: &Self::External,
670        p1_port: &<Self::External as Node>::Port,
671        p2: &Self::Process,
672        p2_port: &<Self::Process as Node>::Port,
673        codec_type: &syn::Type,
674        shared_handle: String,
675    ) -> syn::Expr {
676        // Record the port for manifest export
677        p2.exposed_ports
678            .borrow_mut()
679            .insert(shared_handle.clone(), PortInfo::Tcp { port: *p2_port });
680
681        let source_ident = syn::Ident::new(
682            &format!("__hydro_deploy_{}_source", &shared_handle),
683            Span::call_site(),
684        );
685
686        let bind_addr = format!("0.0.0.0:{}", p2_port);
687
688        // Always use LazySinkSource for external connections - it creates both sink and source
689        // which is needed for bidirectional connections (unpaired: false)
690        let socket_ident = syn::Ident::new(
691            &format!("__hydro_deploy_{}_socket", &shared_handle),
692            Span::call_site(),
693        );
694
695        let sink_ident = syn::Ident::new(
696            &format!("__hydro_deploy_{}_sink", &shared_handle),
697            Span::call_site(),
698        );
699
700        extra_stmts.push(syn::parse_quote! {
701            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
702        });
703
704        let create_expr = deploy_containerized_external_sink_source_ident(bind_addr, socket_ident);
705
706        extra_stmts.push(syn::parse_quote! {
707            let (#sink_ident, #source_ident) = (#create_expr).split();
708        });
709
710        parse_quote!(#source_ident)
711    }
712
713    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
714    fn e2o_connect(
715        p1: &Self::External,
716        p1_port: &<Self::External as Node>::Port,
717        p2: &Self::Process,
718        p2_port: &<Self::Process as Node>::Port,
719        many: bool,
720        server_hint: NetworkHint,
721    ) -> Box<dyn FnOnce()> {
722        let serialized = format!(
723            "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
724            p1.name, p2.name
725        );
726
727        Box::new(move || {
728            trace!(name: "e2o_connect thunk", %serialized);
729        })
730    }
731
732    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
733    fn o2e_sink(
734        p1: &Self::Process,
735        p1_port: &<Self::Process as Node>::Port,
736        p2: &Self::External,
737        p2_port: &<Self::External as Node>::Port,
738        shared_handle: String,
739    ) -> syn::Expr {
740        let sink_ident = syn::Ident::new(
741            &format!("__hydro_deploy_{}_sink", &shared_handle),
742            Span::call_site(),
743        );
744        parse_quote!(#sink_ident)
745    }
746
747    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
748    fn cluster_ids(
749        of_cluster: LocationKey,
750    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
751        cluster_ids()
752    }
753
754    #[instrument(level = "trace", skip_all)]
755    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
756        cluster_self_id()
757    }
758
759    #[instrument(level = "trace", skip_all, fields(?location_id))]
760    fn cluster_membership_stream(
761        _env: &mut Self::InstantiateEnv,
762        _at_location: &LocationId,
763        location_id: &LocationId,
764    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
765    {
766        cluster_membership_stream(location_id)
767    }
768}
769
770#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location))]
771fn get_ecs_image_name(name_hint: &str, location: LocationKey) -> String {
772    let name_hint = name_hint
773        .split("::")
774        .last()
775        .unwrap()
776        .to_ascii_lowercase()
777        .replace(".", "-")
778        .replace("_", "-")
779        .replace("::", "-");
780
781    format!("hy-{name_hint}-{location}")
782}
783
784/// Represents a Process running in an ecs deployment
785#[derive(Clone)]
786pub struct EcsDeployProcessSpec;
787
788impl<'a> ProcessSpec<'a, EcsDeploy> for EcsDeployProcessSpec {
789    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
790    fn build(self, id: LocationKey, name_hint: &'_ str) -> <EcsDeploy as Deploy<'a>>::Process {
791        EcsDeployProcess {
792            id,
793            name: get_ecs_image_name(name_hint, id),
794            next_port: Rc::new(RefCell::new(10001)),
795            exposed_ports: Rc::new(RefCell::new(BTreeMap::new())),
796            trybuild_config: Rc::new(RefCell::new(None)),
797        }
798    }
799}
800
801/// Represents a Cluster running across `count` ecs tasks.
802#[derive(Clone)]
803pub struct EcsDeployClusterSpec {
804    count: usize,
805}
806
807impl<'a> ClusterSpec<'a, EcsDeploy> for EcsDeployClusterSpec {
808    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
809    fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::Cluster {
810        EcsDeployCluster {
811            id,
812            name: get_ecs_image_name(name_hint, id),
813            next_port: Rc::new(RefCell::new(10001)),
814            exposed_ports: Rc::new(RefCell::new(BTreeMap::new())),
815            count: self.count,
816            trybuild_config: Rc::new(RefCell::new(None)),
817        }
818    }
819}
820
821/// Represents an external process outside of the management of hydro deploy.
822pub struct EcsDeployExternalSpec {
823    name: String,
824}
825
826impl<'a> ExternalSpec<'a, EcsDeploy> for EcsDeployExternalSpec {
827    #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
828    fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::External {
829        EcsDeployExternal {
830            name: self.name,
831            next_port: Rc::new(RefCell::new(10000)),
832        }
833    }
834}