1use std::any::type_name;
2use std::cell::RefCell;
3use std::marker::PhantomData;
4use std::rc::Rc;
5
6use slotmap::{SecondaryMap, SlotMap};
7
8#[cfg(feature = "build")]
9use super::compiled::CompiledFlow;
10#[cfg(feature = "build")]
11use super::deploy::{DeployFlow, DeployResult};
12#[cfg(feature = "build")]
13use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
14use super::ir::HydroRoot;
15use crate::location::{Cluster, External, LocationKey, LocationType, Process};
16
17pub enum Sidecar {
20 Simple {
22 location_key: LocationKey,
23 future_expr: Box<syn::Expr>,
24 },
25 Bidi {
29 location_key: LocationKey,
30 sidecar_id: SidecarId,
31 sidecar_closure: Box<syn::Expr>,
32 },
33}
34#[cfg(feature = "sim")]
35#[cfg(stageleft_runtime)]
36use crate::sim::flow::SimFlow;
37use crate::staging_util::Invariant;
38
39#[stageleft::export(ExternalPortId, CycleId, ClockId, SidecarId)]
40crate::newtype_counter! {
41 pub struct ExternalPortId(usize);
43
44 pub struct CycleId(usize);
46
47 pub struct ClockId(usize);
49
50 pub struct SidecarId(usize);
52}
53
54impl CycleId {
55 #[cfg(feature = "build")]
56 pub(crate) fn as_ident(&self) -> syn::Ident {
57 syn::Ident::new(&format!("cycle_{}", self), proc_macro2::Span::call_site())
58 }
59}
60
61impl SidecarId {
62 pub fn idents(&self) -> (syn::Ident, syn::Ident) {
64 let span = proc_macro2::Span::call_site();
65 (
66 syn::Ident::new(&format!("__hydro_sidecar_{}_stream", self), span),
67 syn::Ident::new(&format!("__hydro_sidecar_{}_sink", self), span),
68 )
69 }
70}
71
72pub(crate) type FlowState = Rc<RefCell<FlowStateInner>>;
73
74pub(crate) struct FlowStateInner {
75 roots: Option<Vec<HydroRoot>>,
79
80 next_external_port: ExternalPortId,
82
83 next_cycle_id: CycleId,
85
86 next_clock_id: ClockId,
88
89 next_sidecar_id: SidecarId,
91
92 pub sidecars: Vec<Sidecar>,
95}
96
97impl FlowStateInner {
98 pub fn next_external_port(&mut self) -> ExternalPortId {
99 self.next_external_port.get_and_increment()
100 }
101
102 pub fn next_cycle_id(&mut self) -> CycleId {
103 self.next_cycle_id.get_and_increment()
104 }
105
106 pub fn next_clock_id(&mut self) -> ClockId {
107 self.next_clock_id.get_and_increment()
108 }
109
110 pub fn next_sidecar_id(&mut self) -> SidecarId {
111 self.next_sidecar_id.get_and_increment()
112 }
113
114 pub fn push_root(&mut self, root: HydroRoot) {
115 self.roots
116 .as_mut()
117 .expect("Attempted to add a root to a flow that has already been finalized. No roots can be added after the flow has been compiled.")
118 .push(root);
119 }
120
121 pub fn try_push_root(&mut self, root: HydroRoot) {
122 if let Some(roots) = self.roots.as_mut() {
123 roots.push(root);
124 }
125 }
126}
127
128pub struct FlowBuilder<'a> {
129 flow_state: FlowState,
131
132 locations: SlotMap<LocationKey, LocationType>,
134 location_names: SecondaryMap<LocationKey, String>,
136
137 #[cfg_attr(
139 not(feature = "build"),
140 expect(dead_code, reason = "unused without build")
141 )]
142 flow_name: String,
143
144 finalized: bool,
147
148 _phantom: Invariant<'a>,
153}
154
155impl Drop for FlowBuilder<'_> {
156 fn drop(&mut self) {
157 if !self.finalized && !std::thread::panicking() {
158 panic!(
159 "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
160 );
161 }
162 }
163}
164
165#[expect(missing_docs, reason = "TODO")]
166impl<'a> FlowBuilder<'a> {
167 #[expect(
169 clippy::new_without_default,
170 reason = "call `new` explicitly, not `default`"
171 )]
172 pub fn new() -> Self {
173 let mut name = std::env::var("CARGO_PKG_NAME").unwrap_or_else(|_| "unknown".to_owned());
174 if let Ok(bin_path) = std::env::current_exe()
175 && let Some(bin_name) = bin_path.file_stem()
176 {
177 name = format!("{}/{}", name, bin_name.display());
178 }
179 Self::with_name(name)
180 }
181
182 pub fn with_name(name: impl Into<String>) -> Self {
184 Self {
185 flow_state: Rc::new(RefCell::new(FlowStateInner {
186 roots: Some(vec![]),
187 next_external_port: ExternalPortId::default(),
188 next_cycle_id: CycleId::default(),
189 next_clock_id: ClockId::default(),
190 next_sidecar_id: SidecarId::default(),
191 sidecars: Vec::new(),
192 })),
193 locations: SlotMap::with_key(),
194 location_names: SecondaryMap::new(),
195 flow_name: name.into(),
196 finalized: false,
197 _phantom: PhantomData,
198 }
199 }
200
201 pub(crate) fn flow_state(&self) -> &FlowState {
202 &self.flow_state
203 }
204
205 pub fn process<P>(&mut self) -> Process<'a, P> {
206 let key = self.locations.insert(LocationType::Process);
207 self.location_names.insert(key, type_name::<P>().to_owned());
208 Process {
209 key,
210 flow_state: self.flow_state().clone(),
211 _phantom: PhantomData,
212 }
213 }
214
215 pub fn cluster<C>(&mut self) -> Cluster<'a, C> {
216 let key = self.locations.insert(LocationType::Cluster);
217 self.location_names.insert(key, type_name::<C>().to_owned());
218 Cluster {
219 key,
220 flow_state: self.flow_state().clone(),
221 _phantom: PhantomData,
222 }
223 }
224
225 pub fn external<E>(&mut self) -> External<'a, E> {
226 let key = self.locations.insert(LocationType::External);
227 self.location_names.insert(key, type_name::<E>().to_owned());
228 External {
229 key,
230 flow_state: self.flow_state().clone(),
231 _phantom: PhantomData,
232 }
233 }
234}
235
// Build-time API: finalizing the builder, plus shortcuts that finalize and
// then forward to the resulting `BuiltFlow` / `DeployFlow`.
//
// Method comments in this impl are plain `//` comments on purpose: turning
// all of them into `///` docs would leave the `expect(missing_docs)` below
// unfulfilled. Convert them (and drop the attribute) in one go when the TODO
// is resolved.
#[cfg(feature = "build")]
#[cfg_attr(docsrs, doc(cfg(feature = "build")))]
#[expect(missing_docs, reason = "TODO")]
impl<'a> FlowBuilder<'a> {
    // Consumes the builder and produces the `BuiltFlow` holding its IR roots,
    // locations, location names, sidecars and flow name.
    //
    // Panics if the shared state holds no roots, e.g. for a builder created
    // with `from_built` that never received roots through `replace_ir`.
    pub fn finalize(mut self) -> super::built::BuiltFlow<'a> {
        // Mark first, so the `Drop` impl stays quiet when `self` goes out of
        // scope at the end of this function.
        self.finalized = true;

        // Keep the `RefCell` borrow confined to this block.
        let (mut ir, sidecars) = {
            let mut state = self.flow_state.borrow_mut();
            let ir = state.roots.take().expect(
                "FlowBuilder has no IR roots to finalize (e.g. a builder created with `from_built` must be given roots via `replace_ir` first)",
            );
            let sidecars = std::mem::take(&mut state.sidecars);
            (ir, sidecars)
        };

        super::ir::unify_atomic_ticks(&mut ir);

        // `FlowBuilder` implements `Drop`, so its fields cannot be moved out
        // directly; `mem::take` swaps in empty defaults instead.
        super::built::BuiltFlow {
            ir,
            locations: std::mem::take(&mut self.locations),
            location_names: std::mem::take(&mut self.location_names),
            sidecars,
            flow_name: std::mem::take(&mut self.flow_name),
            _phantom: PhantomData,
        }
    }

    // Shortcut for `finalize()` followed by `BuiltFlow::with_default_optimize`.
    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        self.finalize().with_default_optimize()
    }

    // Shortcut for `finalize()` followed by `BuiltFlow::optimize_with(f)`.
    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroRoot])) -> super::built::BuiltFlow<'a> {
        self.finalize().optimize_with(f)
    }

    // Shortcut for `with_default_optimize()` followed by
    // `DeployFlow::with_process(process, spec)`.
    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_process(process, spec)
    }

    // Shortcut for `with_default_optimize()` followed by
    // `DeployFlow::with_remaining_processes(spec)`.
    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_processes(spec)
    }

    // Shortcut for `with_default_optimize()` followed by
    // `DeployFlow::with_external(process, spec)`.
    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_external(process, spec)
    }

    // Shortcut for `with_default_optimize()` followed by
    // `DeployFlow::with_remaining_externals(spec)`.
    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_externals(spec)
    }

    // Shortcut for `with_default_optimize()` followed by
    // `DeployFlow::with_cluster(cluster, spec)`.
    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_cluster(cluster, spec)
    }

    // Shortcut for `with_default_optimize()` followed by
    // `DeployFlow::with_remaining_clusters(spec)`.
    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.with_default_optimize().with_remaining_clusters(spec)
    }

    // Shortcut for `with_default_optimize::<D>()` followed by
    // `DeployFlow::compile()`. Only available for deploy targets whose
    // `InstantiateEnv` is `()`.
    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
        self.with_default_optimize::<D>().compile()
    }

    // Shortcut for `with_default_optimize()` followed by
    // `DeployFlow::deploy(env)`.
    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
        self.with_default_optimize().deploy(env)
    }

    // Shortcut for `finalize()` followed by `BuiltFlow::sim()`.
    //
    // NOTE(review): the `SimFlow` import at the top of this file is gated on
    // both `feature = "sim"` and `cfg(stageleft_runtime)`, while this method
    // is gated on the feature only — confirm it is never compiled without
    // `stageleft_runtime`.
    #[cfg(feature = "sim")]
    pub fn sim(self) -> SimFlow<'a> {
        self.finalize().sim()
    }

    // Creates a new, not-yet-finalized builder that copies the locations,
    // location names and flow name of `built`.
    //
    // The new builder starts without roots (`roots` is `None`), so roots must
    // be supplied through `replace_ir` before it can be finalized.
    //
    // NOTE(review): the ID counters restart from their defaults and the
    // sidecar list starts empty; nothing here carries over `built`'s
    // sidecars — confirm that is intended.
    pub fn from_built<'b>(built: &super::built::BuiltFlow) -> FlowBuilder<'b> {
        let empty_state = FlowStateInner {
            roots: None,
            next_external_port: ExternalPortId::default(),
            next_cycle_id: CycleId::default(),
            next_clock_id: ClockId::default(),
            next_sidecar_id: SidecarId::default(),
            sidecars: Vec::new(),
        };

        FlowBuilder {
            flow_state: Rc::new(RefCell::new(empty_state)),
            locations: built.locations.clone(),
            location_names: built.location_names.clone(),
            flow_name: built.flow_name.clone(),
            finalized: false,
            _phantom: PhantomData,
        }
    }

    // Replaces the builder's IR roots with `roots`, whether or not roots were
    // present before.
    #[doc(hidden)]
    pub fn replace_ir(&mut self, roots: Vec<HydroRoot>) {
        self.flow_state.borrow_mut().roots = Some(roots);
    }

    // Forwards to `FlowStateInner::next_clock_id` on the shared state.
    #[doc(hidden)]
    pub fn next_clock_id(&mut self) -> ClockId {
        self.flow_state.borrow_mut().next_clock_id()
    }

    // Forwards to `FlowStateInner::next_cycle_id` on the shared state.
    #[doc(hidden)]
    pub fn next_cycle_id(&mut self) -> CycleId {
        self.flow_state.borrow_mut().next_cycle_id()
    }
}