Skip to main content

hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2//!
3//! NOTE: This module runs inside bolero's `catch_unwind` scope, which silently
4//! swallows panics. Internal invariant checks should use `abort_assert!`
5//! rather than `panic!`/`assert!`.
6//!
7//! TODO(mingwei): Panics inside the tick DFIR (generated code in the dylib) are
8//! also caught by bolero's `catch_unwind`. Consider a mechanism to detect and
9//! propagate those as well.
10
/// Like `assert!`, but calls `std::process::abort()` instead of `panic!()`.
/// Use for internal invariants that must not be silently caught by bolero.
///
/// Accepts either a bare condition (`abort_assert!(cond)`), in which case the stringified
/// condition is reported, or a condition followed by `format!`-style message arguments.
macro_rules! abort_assert {
    // Bare condition (optionally with a trailing comma): report the condition's source text.
    ($cond:expr $(,)?) => {
        abort_assert!($cond, "assertion failed: {}", stringify!($cond))
    };
    ($cond:expr, $($arg:tt)+) => {
        if !$cond {
            // `format_args!` avoids allocating an intermediate `String` for the message.
            eprintln!("Simulator internal error: {}", format_args!($($arg)+));
            std::process::abort();
        }
    };
}
21
22use core::{fmt, panic};
23use std::cell::{Cell, RefCell};
24use std::collections::{HashMap, VecDeque};
25use std::fmt::Debug;
26use std::panic::RefUnwindSafe;
27use std::path::Path;
28use std::pin::{Pin, pin};
29use std::rc::Rc;
30use std::task::ready;
31
32use bytes::Bytes;
33use colored::Colorize;
34use dfir_rs::scheduled::context::DfirErased;
35use futures::{Stream, StreamExt};
36use libloading::Library;
37use serde::Serialize;
38use serde::de::DeserializeOwned;
39use tempfile::TempPath;
40use tokio::sync::mpsc::UnboundedSender;
41use tokio::sync::{Mutex, Notify};
42use tokio_stream::wrappers::UnboundedReceiverStream;
43
44use super::runtime::{Hooks, InlineHooks};
45use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
46use crate::compile::builder::ExternalPortId;
47use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
48use crate::location::dynamic::LocationId;
49use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
50use crate::sim::runtime::SimHook;
51
52struct QuiescenceState {
53    /// Set to true when the scheduler reaches quiescence; reset to false when new input is sent.
54    quiescent: Cell<bool>,
55    /// Notified when the scheduler reaches quiescence (wakes receivers waiting for data).
56    quiescence_notify: Notify,
57    /// Notified when new input is sent, signaling the scheduler to resume.
58    resume_notify: Notify,
59}
60
61impl QuiescenceState {
62    /// Signal that new input has been sent, waking the scheduler if it was quiescent.
63    fn resume(&self) {
64        self.quiescent.set(false);
65        self.resume_notify.notify_waiters();
66    }
67
68    /// Whether the scheduler is currently quiescent (no more progress possible without input).
69    fn is_quiescent(&self) -> bool {
70        self.quiescent.get()
71    }
72
73    /// Returns a future that completes when the scheduler next reaches quiescence.
74    fn notified(&self) -> tokio::sync::futures::Notified<'_> {
75        self.quiescence_notify.notified()
76    }
77
78    /// Enter quiescence and wait for new input before continuing.
79    async fn wait_for_resume(&self) {
80        self.quiescent.set(true);
81        self.quiescence_notify.notify_waiters();
82        self.resume_notify.notified().await;
83        self.quiescent.set(false);
84    }
85}
86
/// Per-instance channel endpoints connecting the test harness to the simulated program's
/// external ports. Stored in `CURRENT_SIM_CONNECTIONS` while a simulation instance runs and
/// looked up from there by the receiver handles (e.g. `SimReceiver::with_stream`).
struct SimConnections {
    /// Input channel for each non-cluster external port, fed by the test harness.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Output stream for each non-cluster external port. Wrapped in a `Mutex` so a reader can
    /// hold the stream exclusively while it consumes messages.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Like `input_senders`, but keyed additionally by the `u32` cluster member id.
    cluster_input_senders: HashMap<SimExternalPort, HashMap<u32, Rc<UnboundedSender<Bytes>>>>,
    /// Like `output_receivers`, but keyed additionally by the `u32` cluster member id.
    cluster_output_receivers:
        HashMap<SimExternalPort, HashMap<u32, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
    /// Maps the user-facing `ExternalPortId` of a handle to the simulator's `SimExternalPort`.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
    /// Quiescence tracking shared with the scheduler; lets readers detect that no more
    /// messages can arrive without new input.
    quiescence: Rc<QuiescenceState>,
}
96
// Channel endpoints of the simulation instance currently being driven. Installed by
// `CompiledSimInstance::run_without_launching` for the duration of the test thunk, and read
// when the scheduler is set up and by receiver handles. Accessing it outside that scope panics.
tokio::task_local! {
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
100
101/// A handle to a compiled Hydro simulation, which can be instantiated and run.
102pub struct CompiledSim {
103    pub(super) _path: TempPath,
104    pub(super) lib: Library,
105    pub(super) externals_port_registry: SimExternalPortRegistry,
106    pub(super) unit_test_fuzz_iterations: usize,
107}
108
109#[sealed::sealed]
110/// A trait implemented by closures that can instantiate a compiled simulation.
111///
112/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
113pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
114#[sealed::sealed]
115impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
116
/// Print handler that discards its output; used when logging is disabled.
fn null_handler(_: fmt::Arguments) {}
118
/// Print handler that forwards the formatted message, plus a newline, to standard output.
fn println_handler(args: fmt::Arguments) {
    println!("{args}");
}
122
/// Print handler that forwards the formatted message, plus a newline, to standard error.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{args}");
}
126
/// Creates a simulation instance, returning:
/// - A list of async DFIRs to run (all process / cluster logic outside a tick)
/// - A list of tick DFIRs to run (where the &'static str is for the tick location id)
/// - A mapping of hooks for non-deterministic decisions at tick-input boundaries
/// - A mapping of inline hooks for non-deterministic decisions inside ticks
///
/// This is the type of the `__hydro_runtime` symbol resolved from the compiled library. It is
/// called across the library boundary, so it must match the exported function's signature
/// exactly, and the return tuple must stay identical to `DylibResult`.
///
/// The caller passes empty `external_*` / `cluster_external_*` maps. After the call it reads
/// them back keyed by the raw `usize` port id, and the cluster variants are additionally keyed
/// by a `u32` member id. The location-id strings in the returned tuple are JSON and are parsed
/// by the caller. The `Option<u32>` next to each location id is presumably the cluster member
/// id (`None` for non-cluster locations) — TODO confirm.
///
/// `println_handler` / `eprintln_handler` receive the library's printed output; the caller
/// passes a no-op handler for both when logging is disabled.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        should_color: bool,
        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
        cluster_external_out: &mut HashMap<usize, HashMap<u32, UnboundedReceiverStream<Bytes>>>,
        cluster_external_in: &mut HashMap<usize, HashMap<u32, UnboundedSender<Bytes>>>,
        println_handler: fn(fmt::Arguments<'_>),
        eprintln_handler: fn(fmt::Arguments<'_>),
    ) -> (
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
149
150impl CompiledSim {
151    /// Executes the given closure with a single instance of the compiled simulation.
152    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
153        self.with_instantiator(|instantiator| thunk(instantiator()), true)
154    }
155
156    /// Executes the given closure with an [`Instantiator`], which can be called to create
157    /// independent instances of the simulation. This is useful for fuzzing, where we need to
158    /// re-execute the simulation several times with different decisions.
159    ///
160    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
161    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
162    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
163    pub fn with_instantiator<T>(
164        &self,
165        thunk: impl FnOnce(&dyn Instantiator) -> T,
166        always_log: bool,
167    ) -> T {
168        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
169        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
170        thunk(
171            &(|| CompiledSimInstance {
172                func: func.clone(),
173                externals_port_registry: self.externals_port_registry.clone(),
174                dylib_result: None,
175                log,
176            }),
177        )
178    }
179
180    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
181    /// closure will be repeatedly executed with instances of the Hydro program where the
182    /// batching boundaries, order of messages, and retries are varied.
183    ///
184    /// During development, you should run the test that invokes this function with the `cargo sim`
185    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
186    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
187    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
188    /// be executed, and if no reproducer is found a small number of random executions will be
189    /// performed.
190    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
191        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
192            .elements()
193            .into_iter()
194            .find(|e| {
195                !e.fn_name.starts_with("hydro_lang::sim::compiled")
196                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
197                    && !e.fn_name.starts_with("fuzz<")
198                    && !e.fn_name.starts_with("<hydro_lang::sim")
199            })
200            .unwrap();
201
202        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
203        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
204
205        let caller_fuzz_repro_path = repro_folder
206            .join(caller_fn.fn_name.replace("::", "__"))
207            .with_extension("bin");
208
209        if std::env::var("BOLERO_FUZZER").is_ok() {
210            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
211            std::fs::create_dir_all(&corpus_dir).unwrap();
212            let libfuzzer_args = format!(
213                "{} {} -artifact_prefix={}/ -handle_abrt=0",
214                corpus_dir.to_str().unwrap(),
215                corpus_dir.to_str().unwrap(),
216                corpus_dir.to_str().unwrap(),
217            );
218
219            std::fs::create_dir_all(&repro_folder).unwrap();
220
221            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
222                unsafe {
223                    std::env::set_var(
224                        "BOLERO_FAILURE_OUTPUT",
225                        caller_fuzz_repro_path.to_str().unwrap(),
226                    );
227                }
228            }
229
230            unsafe {
231                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
232            }
233
234            self.with_instantiator(
235                |instantiator| {
236                    bolero::test(bolero::TargetLocation {
237                        package_name: "",
238                        manifest_dir: "",
239                        module_path: "",
240                        file: "",
241                        line: 0,
242                        item_path: "<unknown>::__bolero_item_path__",
243                        test_name: None,
244                    })
245                    .run_with_replay(move |is_replay| {
246                        let mut instance = instantiator();
247
248                        if instance.log {
249                            eprintln!(
250                                "{}",
251                                "\n==== New Simulation Instance ===="
252                                    .color(colored::Color::Cyan)
253                                    .bold()
254                            );
255                        }
256
257                        if is_replay {
258                            instance.log = true;
259                        }
260
261                        tokio::runtime::Builder::new_current_thread()
262                            .build()
263                            .unwrap()
264                            .block_on(async { instance.run(&mut thunk).await })
265                    })
266                },
267                false,
268            );
269        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
270            self.fuzz_repro(existing_bytes, async |compiled| {
271                compiled.launch();
272                thunk().await
273            });
274        } else {
275            eprintln!(
276                "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
277                caller_fuzz_repro_path.display(),
278                self.unit_test_fuzz_iterations,
279            );
280            self.with_instantiator(
281                |instantiator| {
282                    bolero::test(bolero::TargetLocation {
283                        package_name: "",
284                        manifest_dir: "",
285                        module_path: "",
286                        file: ".",
287                        line: 0,
288                        item_path: "<unknown>::__bolero_item_path__",
289                        test_name: None,
290                    })
291                    .with_iterations(self.unit_test_fuzz_iterations)
292                    .run(move || {
293                        let instance = instantiator();
294                        tokio::runtime::Builder::new_current_thread()
295                            .build()
296                            .unwrap()
297                            .block_on(async { instance.run(&mut thunk).await })
298                    })
299                },
300                false,
301            );
302        }
303    }
304
305    /// Executes the given closure with a single instance of the compiled simulation, using the
306    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
307    /// failure found during fuzzing.
308    pub fn fuzz_repro<'a>(
309        &'a self,
310        bytes: Vec<u8>,
311        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
312    ) {
313        self.with_instance(|instance| {
314            bolero::bolero_engine::any::scope::with(
315                Box::new(bolero::bolero_engine::driver::object::Object(
316                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
317                )),
318                || {
319                    tokio::runtime::Builder::new_current_thread()
320                        .build()
321                        .unwrap()
322                        .block_on(async { instance.run_without_launching(thunk).await })
323                },
324            )
325        });
326    }
327
328    /// Exhaustively searches all possible executions of the simulation. The provided
329    /// closure will be repeatedly executed with instances of the Hydro program where the
330    /// batching boundaries, order of messages, and retries are varied.
331    ///
332    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
333    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
334    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
335    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
336    ///
337    /// Returns the number of distinct executions explored.
338    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
339        if std::env::var("BOLERO_FUZZER").is_ok() {
340            eprintln!(
341                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
342            );
343            std::process::abort();
344        }
345
346        let mut count = 0;
347        let count_mut = &mut count;
348
349        self.with_instantiator(
350            |instantiator| {
351                bolero::test(bolero::TargetLocation {
352                    package_name: "",
353                    manifest_dir: "",
354                    module_path: "",
355                    file: "",
356                    line: 0,
357                    item_path: "<unknown>::__bolero_item_path__",
358                    test_name: None,
359                })
360                .exhaustive()
361                .run_with_replay(move |is_replay| {
362                    *count_mut += 1;
363
364                    let mut instance = instantiator();
365                    if instance.log {
366                        eprintln!(
367                            "{}",
368                            "\n==== New Simulation Instance ===="
369                                .color(colored::Color::Cyan)
370                                .bold()
371                        );
372                    }
373
374                    if is_replay {
375                        instance.log = true;
376                    }
377
378                    tokio::runtime::Builder::new_current_thread()
379                        .build()
380                        .unwrap()
381                        .block_on(async { instance.run(&mut thunk).await })
382                })
383            },
384            false,
385        );
386
387        count
388    }
389}
390
// The value returned by the library's `__hydro_runtime` entry point (see `SimLoaded`), in
// order: async DFIRs, tick DFIRs, tick-boundary hooks, and inline hooks. Location ids are
// JSON-encoded strings that are parsed when the simulation is scheduled.
//
// This must be a tuple because it is referenced from generated code in `graph.rs`.
type DylibResult = (
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Hooks<&'static str>,
    InlineHooks<&'static str>,
);
398
/// A single instance of a compiled Hydro simulation, which provides methods to interactively
/// execute the simulation, feed inputs, and receive outputs.
pub struct CompiledSimInstance<'a> {
    /// The library's `__hydro_runtime` entry point, borrowed from the owning `CompiledSim`.
    func: SimLoaded<'a>,
    /// External port registry, cloned from the owning `CompiledSim`.
    externals_port_registry: SimExternalPortRegistry,
    /// Output of `func`: `None` until `run_without_launching` invokes it, and taken (reset to
    /// `None`) when the simulation is scheduled.
    dylib_result: Option<DylibResult>,
    /// Whether to forward the library's printed output and log the simulation trace.
    log: bool,
}
407
impl<'a> CompiledSimInstance<'a> {
    /// Runs `thunk` against this instance with the simulation scheduler already launched.
    ///
    /// Equivalent to `run_without_launching` with a thunk that first calls `launch` and then
    /// awaits the user-provided `thunk`.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }

    /// Instantiates the simulation from the library and runs `thunk` with access to its
    /// external ports, without starting the scheduler.
    ///
    /// The channels produced by the library are re-keyed from raw `usize` ids to
    /// `SimExternalPort`s and installed in `CURRENT_SIM_CONNECTIONS` for the duration of
    /// `thunk`. `thunk` runs inside a fresh `tokio::task::LocalSet`, so `spawn_local` (used by
    /// `launch`) is available. `thunk` receives this instance back and decides how to schedule
    /// the simulation.
    async fn run_without_launching(
        mut self,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        // Out-parameters populated by the library, keyed by raw external port id.
        let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
        let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
        let mut cluster_external_out: HashMap<usize, HashMap<u32, UnboundedReceiverStream<Bytes>>> =
            HashMap::new();
        let mut cluster_external_in: HashMap<usize, HashMap<u32, UnboundedSender<Bytes>>> =
            HashMap::new();

        // SAFETY: `self.func` was resolved with the `SimLoaded` signature, which must match the
        // library's exported `__hydro_runtime` function exactly. NOTE(review): keep in sync with
        // the code generator.
        let dylib_result = unsafe {
            (self.func)(
                colored::control::SHOULD_COLORIZE.should_colorize(),
                &mut external_out,
                &mut external_in,
                &mut cluster_external_out,
                &mut cluster_external_in,
                // Only forward the library's printed output when logging is enabled.
                if self.log {
                    println_handler
                } else {
                    null_handler
                },
                if self.log {
                    eprintln_handler
                } else {
                    null_handler
                },
            )
        };

        let registered = &self.externals_port_registry.registered;

        // Shared between the scheduler (via `CURRENT_SIM_CONNECTIONS`) and all receivers.
        let quiescence = Rc::new(QuiescenceState {
            quiescent: Cell::new(false),
            quiescence_notify: Notify::new(),
            resume_notify: Notify::new(),
        });

        let mut input_senders = HashMap::new();
        let mut output_receivers = HashMap::new();
        let mut cluster_input_senders = HashMap::new();
        let mut cluster_output_receivers = HashMap::new();

        // Move each registered port's channel(s) out of the library's maps, re-keyed by
        // `SimExternalPort`. Entries for ids that are not registered stay in the local maps.
        #[expect(
            clippy::disallowed_methods,
            reason = "inserts into maps also unordered"
        )]
        for sim_port in registered.values() {
            let usize_key = sim_port.into_inner();
            if let Some(sender) = external_in.remove(&usize_key) {
                input_senders.insert(*sim_port, Rc::new(sender));
            }
            if let Some(receiver) = external_out.remove(&usize_key) {
                output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
            }
            if let Some(senders) = cluster_external_in.remove(&usize_key) {
                cluster_input_senders.insert(
                    *sim_port,
                    senders
                        .into_iter()
                        .map(|(member, s)| (member, Rc::new(s)))
                        .collect(),
                );
            }
            if let Some(receivers) = cluster_external_out.remove(&usize_key) {
                cluster_output_receivers.insert(
                    *sim_port,
                    receivers
                        .into_iter()
                        .map(|(member, r)| (member, Rc::new(Mutex::new(r))))
                        .collect(),
                );
            }
        }

        // Stashed for `schedule_with_maybe_logger`, which takes it when the scheduler starts.
        self.dylib_result = Some(dylib_result);

        let local_set = tokio::task::LocalSet::new();
        local_set
            .run_until(CURRENT_SIM_CONNECTIONS.scope(
                RefCell::new(SimConnections {
                    input_senders,
                    output_receivers,
                    cluster_input_senders,
                    cluster_output_receivers,
                    external_registered: self.externals_port_registry.registered.clone(),
                    quiescence: quiescence.clone(),
                }),
                async move {
                    thunk(self).await;
                },
            ))
            .await;
    }

    /// Launches the simulation, which will asynchronously simulate the Hydro program. This
    /// should be invoked before receiving any messages.
    ///
    /// The scheduler is spawned with `tokio::task::spawn_local` and reads
    /// `CURRENT_SIM_CONNECTIONS` immediately, so this must be called from within the thunk
    /// passed to `run_without_launching`.
    fn launch(self) {
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }

    /// Returns a future that schedules simulation with the given logger for reporting the
    /// simulation trace.
    ///
    /// The logger is only used when this instance has logging enabled; otherwise the trace is
    /// discarded.
    ///
    /// # Panics
    /// Panics if called outside the `CURRENT_SIM_CONNECTIONS` scope (i.e. not from within the
    /// thunk passed to `run_without_launching`), or if this instance was already scheduled.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }

    /// Builds the `LaunchedSim` scheduler from the library output and returns its future.
    ///
    /// This is not `async`: the library result is taken and `CURRENT_SIM_CONNECTIONS` is read
    /// synchronously at call time. Only the scheduler itself runs when the future is polled.
    fn schedule_with_maybe_logger<W: std::io::Write>(
        mut self,
        log_override: Option<W>,
    ) -> impl use<W> + Future<Output = ()> {
        // Panics if the library was never invoked or the result was already taken.
        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();

        // Location ids arrive as JSON strings and are parsed back here. Every async DFIR starts
        // out in the "not ready" observation set.
        let not_ready_observation = async_dfirs
            .iter()
            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
            .collect();

        let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            connections.quiescence.clone()
        });

        let mut launched = LaunchedSim {
            async_dfirs: async_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_ticks: vec![],
            not_ready_ticks: tick_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_observation: vec![],
            not_ready_observation,
            hooks: hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            inline_hooks: inline_hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            // `log_override` only takes effect when logging is enabled for this instance.
            log: if self.log {
                if let Some(w) = log_override {
                    LogKind::Custom(w)
                } else {
                    LogKind::Stderr
                }
            } else {
                LogKind::Null
            },
            quiescence,
        };

        async move { launched.scheduler().await }
    }
}
579
580impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
581    fn clone(&self) -> Self {
582        *self
583    }
584}
585
586impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
587
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
    /// Runs `thunk` with a stream of bincode-deserialized messages from this receiver's
    /// external port.
    ///
    /// The stream yields `None` once no more messages can arrive without new input. That
    /// happens when the underlying channel is closed, or when it is empty and the scheduler
    /// is (or becomes) quiescent. The port's receiver stays locked while `thunk` runs.
    ///
    /// # Panics
    /// Panics if the port is not registered, if called outside the `CURRENT_SIM_CONNECTIONS`
    /// scope, or if a message fails to deserialize.
    async fn with_stream<Out>(
        &self,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            (
                connections.output_receivers.get(port).unwrap().clone(),
                connections.quiescence.clone(),
            )
        });

        let mut receiver_stream = receiver.lock().await;
        // Created up front: a tokio `Notified` receives `notify_waiters` wakeups from the moment
        // it is created, so a quiescence signal before the first poll is not missed.
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            // Drain buffered messages first, so data sent before quiescence is still delivered.
            match receiver_stream.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            // Channel is empty and the scheduler is already quiescent: nothing more can arrive.
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            // Otherwise wait for either a new message (waker registered by the poll above) or
            // the next quiescence notification, which also ends the stream.
            let () = ready!(notified_fut.as_mut().poll(cx));
            // Re-arm the notification in case the stream is polled again.
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }

    /// Asserts that the stream has ended and no more messages can possibly arrive.
    ///
    /// The returned future panics, at the caller's `.await`, if another message is received.
    pub fn assert_no_more(self) -> impl Future<Output = ()>
    where
        T: Debug,
    {
        FutureTrackingCaller {
            future: async move {
                self.with_stream(async |stream| {
                    if let Some(next) = stream.next().await {
                        return Err(format!(
                            "Stream yielded unexpected message: {:?}, expected termination",
                            next
                        ));
                    }
                    Ok(())
                })
                .await
            },
        }
    }
}
644
645impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
646    /// Receives the next message from the external bincode stream. This will wait until a message
647    /// is available, or return `None` if no more messages can possibly arrive.
648    pub async fn next(&self) -> Option<T> {
649        self.with_stream(async |stream| stream.next().await).await
650    }
651
652    /// Collects all remaining messages from the external bincode stream into a collection. This
653    /// will wait until no more messages can possibly arrive.
654    pub async fn collect<C: Default + Extend<T>>(self) -> C {
655        self.with_stream(async |stream| stream.collect().await)
656            .await
657    }
658
659    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
660    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
661    pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
662        &self,
663        expected: I,
664    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
665    where
666        T: Debug + PartialEq<T2>,
667    {
668        FutureTrackingCaller {
669            future: async {
670                let mut expected: VecDeque<T2> = expected.into_iter().collect();
671
672                while !expected.is_empty() {
673                    if let Some(next) = self.next().await {
674                        let next_expected = expected.pop_front().unwrap();
675                        if next != next_expected {
676                            return Err(format!(
677                                "Stream yielded unexpected message: {:?}, expected: {:?}",
678                                next, next_expected
679                            ));
680                        }
681                    } else {
682                        return Err(format!(
683                            "Stream ended early, still expected: {:?}",
684                            expected
685                        ));
686                    }
687                }
688
689                Ok(())
690            },
691        }
692    }
693
694    /// Asserts that the stream yields only the expected sequence of messages, in order,
695    /// and then ends.
696    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
697        &self,
698        expected: I,
699    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
700    where
701        T: Debug + PartialEq<T2>,
702    {
703        ChainedFuture {
704            first: self.assert_yields(expected),
705            second: self.assert_no_more(),
706            first_done: false,
707        }
708    }
709}
710
pin_project_lite::pin_project! {
    // A future that tracks the location of the `.await` call for better panic messages.
    //
    // `#[track_caller]` is important for us to create assertion methods because it makes
    // the panic backtrace show up at that method (instead of inside the call tree within
    // that method). This is e.g. what `Option::unwrap` uses. Unfortunately, `#[track_caller]`
    // does not work correctly for async methods (or `dyn Future` either), so we have to
    // create these concrete future types that (1) have `#[track_caller]` on their `poll()`
    // method and (2) have the `panic!` triggered in their `poll()` method (or in a directly
    // nested concrete future).
    //
    // The wrapped future reports a failed check as `Err(message)`; `poll` turns that into a
    // panic carrying `message`.
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        // The check being driven; `Ok(())` means it passed.
        #[pin]
        future: F,
    }
}
726
727impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
728    type Output = ();
729
730    #[track_caller]
731    fn poll(
732        mut self: Pin<&mut Self>,
733        cx: &mut std::task::Context<'_>,
734    ) -> std::task::Poll<Self::Output> {
735        match ready!(self.as_mut().project().future.poll(cx)) {
736            Ok(()) => std::task::Poll::Ready(()),
737            Err(e) => panic!("{}", e),
738        }
739    }
740}
741
pin_project_lite::pin_project! {
    // A future that first awaits the first future, then the second, propagating caller info.
    //
    // See [`FutureTrackingCaller`] for context.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        // Polled until it completes.
        #[pin]
        first: F1,
        // Polled only after `first` has completed.
        #[pin]
        second: F2,
        // Set once `first` has completed, so it is never polled again.
        first_done: bool,
    }
}
754
755impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
756    type Output = ();
757
758    #[track_caller]
759    fn poll(
760        mut self: Pin<&mut Self>,
761        cx: &mut std::task::Context<'_>,
762    ) -> std::task::Poll<Self::Output> {
763        if !self.first_done {
764            ready!(self.as_mut().project().first.poll(cx));
765            *self.as_mut().project().first_done = true;
766        }
767
768        self.as_mut().project().second.poll(cx)
769    }
770}
771
772impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
773    /// Collects all remaining messages from the external bincode stream into a collection,
774    /// sorting them. This will wait until no more messages can possibly arrive.
775    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
776    where
777        T: Ord,
778    {
779        self.with_stream(async |stream| {
780            let mut collected: C = stream.collect().await;
781            collected.as_mut().sort();
782            collected
783        })
784        .await
785    }
786
787    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
788    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
789    pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
790        &self,
791        expected: I,
792    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
793    where
794        T: Debug + PartialEq<T2>,
795    {
796        FutureTrackingCaller {
797            future: async {
798                self.with_stream(async |stream| {
799                    let mut expected: Vec<T2> = expected.into_iter().collect();
800
801                    while !expected.is_empty() {
802                        if let Some(next) = stream.next().await {
803                            let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
804                            if let Some((i, _)) = idx {
805                                expected.swap_remove(i);
806                            } else {
807                                return Err(format!(
808                                    "Stream yielded unexpected message: {:?}",
809                                    next
810                                ));
811                            }
812                        } else {
813                            return Err(format!(
814                                "Stream ended early, still expected: {:?}",
815                                expected
816                            ));
817                        }
818                    }
819
820                    Ok(())
821                })
822                .await
823            },
824        }
825    }
826
827    /// Asserts that the stream yields only the expected sequence of messages, in some order,
828    /// and then ends.
829    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
830        &self,
831        expected: I,
832    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
833    where
834        T: Debug + PartialEq<T2>,
835    {
836        ChainedFuture {
837            first: self.assert_yields_unordered(expected),
838            second: self.assert_no_more(),
839            first_done: false,
840        }
841    }
842}
843
844impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
845    fn with_sink<Out>(
846        &self,
847        thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
848    ) -> Out {
849        let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
850            let connections = connections.borrow();
851            (
852                connections
853                    .input_senders
854                    .get(connections.external_registered.get(&self.0).unwrap())
855                    .unwrap()
856                    .clone(),
857                connections.quiescence.clone(),
858            )
859        });
860
861        thunk(&move |t| {
862            let res = sender.send(bincode::serialize(&t).unwrap().into());
863            quiescence.resume();
864            res
865        })
866    }
867}
868
869impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
870    /// Sends several messages to the external bincode sink. The messages will be asynchronously
871    /// processed as part of the simulation, in non-deterministic order.
872    pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
873        self.with_sink(|send| {
874            for t in iter {
875                send(t).unwrap();
876            }
877        })
878    }
879}
880
881impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
882    /// Sends a message to the external bincode sink. The message will be asynchronously processed
883    /// as part of the simulation.
884    pub fn send(&self, t: T) {
885        self.with_sink(|send| send(t)).unwrap();
886    }
887
888    /// Sends several messages to the external bincode sink. The messages will be asynchronously
889    /// processed as part of the simulation.
890    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
891        self.with_sink(|send| {
892            for t in iter {
893                send(t).unwrap();
894            }
895        })
896    }
897}
898
// Written by hand rather than derived: `#[derive(Clone, Copy)]` would add `T: Clone`/`T: Copy`
// (and similar) bounds on the type parameters, which this handle does not need.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
    for SimClusterReceiver<T, O, R>
{
    // The type is `Copy` (see the impl below), so cloning is a plain bitwise copy.
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
    for SimClusterReceiver<T, O, R>
{
}
911
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
    /// Runs `thunk` over a stream of bincode-deserialized messages that cluster member
    /// `member_id` sent to this external output port.
    ///
    /// The stream yields any message already waiting in the member's channel. It reports
    /// end-of-stream (`None`) when the channel closes, when the simulation reports quiescence
    /// with no message waiting, or once the quiescence notification fires. The member's
    /// receiver lock is held for the entire call.
    ///
    /// Panics if the port is not registered in the current simulation, if `member_id` has no
    /// receiver for this port, or if a message fails to deserialize.
    async fn with_member_stream<Out>(
        &self,
        member_id: u32,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            let receivers = connections.cluster_output_receivers.get(port).unwrap();
            (
                receivers[&member_id].clone(),
                connections.quiescence.clone(),
            )
        });

        // Held until this function returns, so other readers of this member wait on it.
        let mut lock = receiver.lock().await;
        // Created before the stream's first quiescence check so a notification sent in between
        // is not lost. NOTE(review): this relies on `notified()` behaving like
        // `tokio::sync::Notify::notified`, which registers interest on creation — confirm.
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            // Buffered messages take priority over reporting end-of-stream.
            match lock.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            // Nothing is waiting and the simulation reports quiescence: end the stream.
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            // Otherwise stay pending until the channel or the quiescence notification wakes us.
            let () = ready!(notified_fut.as_mut().poll(cx));
            // Re-arm the notification so the stream can be polled again after ending.
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }
}
949
950impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
951    /// Receives the next value from a specific cluster member.
952    pub async fn next(&self, member_id: u32) -> Option<T> {
953        self.with_member_stream(member_id, async |stream| stream.next().await)
954            .await
955    }
956
957    /// Collects all remaining values from a specific cluster member into a collection.
958    pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
959        self.with_member_stream(member_id, async |stream| stream.collect().await)
960            .await
961    }
962}
963
964impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
965    /// Collects all remaining values from a specific cluster member, sorted.
966    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
967    where
968        T: Ord,
969    {
970        self.with_member_stream(member_id, async |stream| {
971            let mut collected: C = stream.collect().await;
972            collected.as_mut().sort();
973            collected
974        })
975        .await
976    }
977}
978
979impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
980    fn with_sink<Out>(
981        &self,
982        thunk: impl FnOnce(
983            &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
984        ) -> Out,
985    ) -> Out {
986        let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
987            let connections = connections.borrow();
988            (
989                connections
990                    .cluster_input_senders
991                    .get(connections.external_registered.get(&self.0).unwrap())
992                    .unwrap()
993                    .clone(),
994                connections.quiescence.clone(),
995            )
996        });
997
998        thunk(&move |member_id: u32, t: T| {
999            let payload = bincode::serialize(&t).unwrap();
1000            let res = senders[&member_id].send(Bytes::from(payload));
1001            quiescence.resume();
1002            res
1003        })
1004    }
1005}
1006
1007impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
1008    /// Sends a value to a specific cluster member.
1009    pub fn send(&self, member_id: u32, t: T) {
1010        self.with_sink(|send| send(member_id, t)).unwrap();
1011    }
1012
1013    /// Sends multiple values to specific cluster members.
1014    pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
1015        self.with_sink(|send| {
1016            for (member_id, t) in iter {
1017                send(member_id, t).unwrap();
1018            }
1019        })
1020    }
1021}
1022
/// Destination for the simulator's human-readable log.
enum LogKind<W: std::io::Write> {
    /// Discard all output.
    Null,
    /// Print output to the process's standard error.
    Stderr,
    /// Write output, as UTF-8 bytes, to a caller-supplied `std::io::Write` sink.
    Custom(W),
}

// Lets `LogKind` be used wherever a `std::fmt::Write` is expected, even though the custom sink
// is a byte-oriented `std::io::Write`.
// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    fn write_str(&mut self, text: &str) -> std::fmt::Result {
        match self {
            // I/O failures are collapsed into the opaque `fmt::Error`.
            LogKind::Custom(sink) => {
                return sink
                    .write_all(text.as_bytes())
                    .map_err(|_| std::fmt::Error);
            }
            LogKind::Stderr => eprint!("{}", text),
            LogKind::Null => {}
        }
        Ok(())
    }
}
1042
/// A running simulation, which manages the async DFIRs, tick DFIRs, and hook-based
/// scheduling decisions for non-deterministic operators like `batch` and `assume_ordering`.
///
/// The scheduler loops between three kinds of work:
/// - **Async DFIRs**: long-running top-level dataflows (one per process/cluster member) that
///   produce data consumed by ticks and observations.
/// - **Ticks**: tick-scoped DFIRs that execute a single tick. Before running, their associated
///   hooks (e.g. from `batch`) are resolved to decide what data to release into the tick.
/// - **Observations**: top-level locations that have hooks (e.g. from `assume_ordering` on a
///   non-tick stream) needing decisions, but no tick DFIR to execute. The scheduler just
///   resolves their hooks.
///
/// In the `(LocationId, Option<u32>, ..)` entries below, the `Option<u32>` is the cluster
/// member ID used in the tick log (presumably `None` for non-cluster locations — confirm).
/// Ticks and observations move from the "not ready" lists to the "possibly ready" lists
/// whenever the async DFIR for their location makes progress.
struct LaunchedSim<W: std::io::Write> {
    /// Top-level async DFIRs, one per process/cluster member. These run continuously and
    /// produce data that feeds into ticks and observations.
    async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick DFIRs whose parent async DFIR has made progress, so they may be ready to run.
    /// The scheduler further filters these by checking whether their hooks have pending decisions.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick DFIRs whose parent async DFIR has not yet made progress since they were last checked.
    not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Top-level locations whose async DFIR has made progress and whose hooks (from top-level
    /// `assume_ordering`) may have ordering decisions to resolve. Unlike ticks, these have no
    /// DFIR to execute — only hook resolution.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Top-level locations whose async DFIR has not yet made progress since they were last checked.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Hooks keyed by (location, cluster_member_id). These are resolved *before* a tick runs
    /// (for `batch` hooks) or standalone (for top-level `assume_ordering` hooks via observations).
    hooks: Hooks<LocationId>,
    /// Inline hooks keyed by (tick location, cluster_member_id). These are resolved *during*
    /// tick execution via a `tokio::select!` loop, for operators like `assume_ordering` inside
    /// a tick that block on ordering decisions while the tick DFIR is running.
    inline_hooks: InlineHooks<LocationId>,
    /// Destination for the human-readable scheduling log: "Running Tick" banners and the
    /// decisions released by hooks.
    log: LogKind<W>,
    /// Represents quiescence state of the simulation.
    quiescence: Rc<QuiescenceState>,
}
1080
1081impl<W: std::io::Write> LaunchedSim<W> {
1082    async fn scheduler(&mut self) {
1083        loop {
1084            tokio::task::yield_now().await;
1085            let mut any_made_progress = false;
1086            for (loc, c_id, dfir) in &mut self.async_dfirs {
1087                if dfir.run_tick().await {
1088                    any_made_progress = true;
1089                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
1090                        .not_ready_ticks
1091                        .drain(..)
1092                        .partition(|(tick_loc, tick_c_id, _)| {
1093                            let LocationId::Tick(_, outer) = tick_loc else {
1094                                unreachable!()
1095                            };
1096                            outer.as_ref() == loc && tick_c_id == c_id
1097                        });
1098
1099                    self.possibly_ready_ticks.extend(now_ready);
1100                    self.not_ready_ticks.extend(still_not_ready);
1101
1102                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
1103                        .not_ready_observation
1104                        .drain(..)
1105                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);
1106
1107                    self.possibly_ready_observation.extend(now_ready_obs);
1108                    self.not_ready_observation.extend(still_not_ready_obs);
1109                }
1110            }
1111
1112            if any_made_progress {
1113                continue;
1114            } else {
1115                use bolero::generator::*;
1116
1117                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
1118                    .possibly_ready_ticks
1119                    .drain(..)
1120                    .partition(|(name, cid, _)| {
1121                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
1122                        // All hooks must be ready (have received input or have a last value)
1123                        hooks.iter().all(|hook| hook.is_ready())
1124                            // And at least one hook must be able to make progress
1125                            && hooks.iter().any(|hook| {
1126                                hook.current_decision().unwrap_or(false)
1127                                    || hook.can_make_nontrivial_decision()
1128                            })
1129                    });
1130
1131                self.possibly_ready_ticks = ready_tick;
1132                self.not_ready_ticks.append(&mut not_ready_tick);
1133
1134                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
1135                    .possibly_ready_observation
1136                    .drain(..)
1137                    .partition(|(name, cid)| {
1138                        self.hooks
1139                            .get(&(name.clone(), *cid))
1140                            .into_iter()
1141                            .flatten()
1142                            .any(|hook| {
1143                                hook.current_decision().unwrap_or(false)
1144                                    || hook.can_make_nontrivial_decision()
1145                            })
1146                    });
1147
1148                self.possibly_ready_observation = ready_obs;
1149                self.not_ready_observation.append(&mut not_ready_obs);
1150
1151                if self.possibly_ready_ticks.is_empty()
1152                    && self.possibly_ready_observation.is_empty()
1153                {
1154                    // If any tick is blocked because a hook is not ready, that's a
1155                    // simulator bug — it means a singleton never received a value.
1156                    for (name, cid, _) in &self.not_ready_ticks {
1157                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
1158                        abort_assert!(
1159                            hooks.iter().all(|hook| hook.is_ready()),
1160                            "tick has a hook that never became ready"
1161                        );
1162                    }
1163
1164                    // Signal quiescence and wait for new input.
1165                    self.quiescence.wait_for_resume().await;
1166                } else {
1167                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
1168                        + self.possibly_ready_observation.len()))
1169                        .any();
1170
1171                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
1172                        let next_tick = next_tick_or_obs;
1173                        let mut removed = self.possibly_ready_ticks.remove(next_tick);
1174
1175                        match &mut self.log {
1176                            LogKind::Null => {}
1177                            LogKind::Stderr => {
1178                                if let Some(cid) = &removed.1 {
1179                                    eprintln!(
1180                                        "\n{}",
1181                                        format!("Running Tick (Cluster Member {})", cid)
1182                                            .color(colored::Color::Magenta)
1183                                            .bold()
1184                                    )
1185                                } else {
1186                                    eprintln!(
1187                                        "\n{}",
1188                                        "Running Tick".color(colored::Color::Magenta).bold()
1189                                    )
1190                                }
1191                            }
1192                            LogKind::Custom(writer) => {
1193                                writeln!(
1194                                    writer,
1195                                    "\n{}",
1196                                    "Running Tick".color(colored::Color::Magenta).bold()
1197                                )
1198                                .unwrap();
1199                            }
1200                        }
1201
1202                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
1203                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
1204                            write.write_str(" ")
1205                        };
1206
1207                        let mut tick_decision_writer = indenter::indented(&mut self.log)
1208                            .with_format(indenter::Format::Custom {
1209                                inserter: &mut asterisk_indenter,
1210                            });
1211
1212                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
1213                        run_hooks(&mut tick_decision_writer, hooks);
1214
1215                        let run_tick_future = removed.2.run_tick();
1216                        if let Some(inline_hooks) =
1217                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
1218                        {
1219                            let mut run_tick_future_pinned = pin!(run_tick_future);
1220
1221                            loop {
1222                                tokio::select! {
1223                                    biased;
1224                                    r = &mut run_tick_future_pinned => {
1225                                        abort_assert!(r, "tick DFIR run_tick() returned false");
1226                                        break;
1227                                    }
1228                                    _ = async {} => {
1229                                        bolero_generator::any::scope::borrow_with(|driver| {
1230                                            for hook in inline_hooks.iter_mut() {
1231                                                if hook.pending_decision() {
1232                                                    if !hook.has_decision() {
1233                                                        hook.autonomous_decision(driver);
1234                                                    }
1235
1236                                                    hook.release_decision(&mut tick_decision_writer);
1237                                                }
1238                                            }
1239                                        });
1240                                    }
1241                                }
1242                            }
1243                        } else {
1244                            abort_assert!(
1245                                run_tick_future.await,
1246                                "tick DFIR run_tick() returned false"
1247                            );
1248                        }
1249
1250                        self.possibly_ready_ticks.push(removed);
1251                    } else {
1252                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
1253                        let mut default_hooks = vec![];
1254                        let hooks = self
1255                            .hooks
1256                            .get_mut(&self.possibly_ready_observation[next_obs])
1257                            .unwrap_or(&mut default_hooks);
1258
1259                        run_hooks(&mut self.log, hooks);
1260                    }
1261                }
1262            }
1263        }
1264    }
1265}
1266
1267fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1268    let mut remaining_decision_count = hooks.len();
1269    let mut made_nontrivial_decision = false;
1270
1271    bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1272        // first, scan manual decisions
1273        hooks.iter_mut().for_each(|hook| {
1274            if let Some(is_nontrivial) = hook.current_decision() {
1275                made_nontrivial_decision |= is_nontrivial;
1276                remaining_decision_count -= 1;
1277            } else if !hook.can_make_nontrivial_decision() {
1278                // if no nontrivial decision is possible, make a trivial one
1279                // (we need to do this in the first pass to force nontrivial decisions
1280                // on the remaining hooks)
1281                hook.autonomous_decision(driver, false);
1282                remaining_decision_count -= 1;
1283            }
1284        });
1285
1286        hooks.iter_mut().for_each(|hook| {
1287            if hook.current_decision().is_none() {
1288                made_nontrivial_decision |= hook.autonomous_decision(
1289                    driver,
1290                    !made_nontrivial_decision && remaining_decision_count == 1,
1291                );
1292                remaining_decision_count -= 1;
1293            }
1294
1295            hook.release_decision(tick_decision_writer);
1296        });
1297    });
1298}