/// Checks a simulator-internal invariant and aborts the whole process if it does not hold.
///
/// The first argument is the condition. The remaining arguments are a format string plus its
/// arguments (anything `format_args!` accepts); they are only evaluated when the condition is
/// false.
///
/// On failure the message is printed to stderr with a `Simulator internal error: ` prefix and
/// `std::process::abort()` is called, so, unlike `assert!`, the failure is not a panic and
/// cannot be caught or unwound.
macro_rules! abort_assert {
    ($cond:expr, $($arg:tt)*) => {
        if !$cond {
            // `format_args!` hands the message straight to `eprintln!` instead of first
            // allocating an intermediate `String` with `format!`.
            eprintln!("Simulator internal error: {}", format_args!($($arg)*));
            std::process::abort();
        }
    };
}
21
22use core::{fmt, panic};
23use std::cell::{Cell, RefCell};
24use std::collections::{HashMap, VecDeque};
25use std::fmt::Debug;
26use std::panic::RefUnwindSafe;
27use std::path::Path;
28use std::pin::{Pin, pin};
29use std::rc::Rc;
30use std::task::ready;
31
32use bytes::Bytes;
33use colored::Colorize;
34use dfir_rs::scheduled::context::DfirErased;
35use futures::{Stream, StreamExt};
36use libloading::Library;
37use serde::Serialize;
38use serde::de::DeserializeOwned;
39use tempfile::TempPath;
40use tokio::sync::mpsc::UnboundedSender;
41use tokio::sync::{Mutex, Notify};
42use tokio_stream::wrappers::UnboundedReceiverStream;
43
44use super::runtime::{Hooks, InlineHooks};
45use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
46use crate::compile::builder::ExternalPortId;
47use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
48use crate::location::dynamic::LocationId;
49use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
50use crate::sim::runtime::SimHook;
51
/// State shared (via `Rc`) between the simulation scheduler and the sender/receiver handles
/// to coordinate around the point where the scheduler has nothing left that it can run.
struct QuiescenceState {
    /// `true` while the scheduler is parked inside `wait_for_resume`; cleared by `resume`.
    quiescent: Cell<bool>,
    /// Notified with `notify_waiters` each time `wait_for_resume` marks the state quiescent.
    quiescence_notify: Notify,
    /// Notified with `notify_waiters` by `resume`; `wait_for_resume` awaits it.
    resume_notify: Notify,
}
60
impl QuiescenceState {
    /// Clears the quiescent flag and wakes every task currently waiting on `resume_notify`.
    fn resume(&self) {
        self.quiescent.set(false);
        self.resume_notify.notify_waiters();
    }

    /// Returns the current value of the quiescent flag.
    fn is_quiescent(&self) -> bool {
        self.quiescent.get()
    }

    /// Returns a future that completes on a later `quiescence_notify` notification.
    fn notified(&self) -> tokio::sync::futures::Notified<'_> {
        self.quiescence_notify.notified()
    }

    /// Marks the state as quiescent, wakes everything waiting on `quiescence_notify`, then
    /// suspends until `resume_notify` is notified. The flag is cleared again before returning.
    async fn wait_for_resume(&self) {
        self.quiescent.set(true);
        self.quiescence_notify.notify_waiters();
        self.resume_notify.notified().await;
        self.quiescent.set(false);
    }
}
86
/// Channel endpoints and bookkeeping for one running simulation instance. It is stored in the
/// `CURRENT_SIM_CONNECTIONS` task-local and read by the sender/receiver handles in this file.
struct SimConnections {
    /// Byte senders keyed by simulator port, taken from the `external_in` map filled by the
    /// dylib entry point.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Byte receiver streams keyed by simulator port, taken from the `external_out` map filled
    /// by the dylib entry point. Each stream sits behind an async mutex.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Like `input_senders`, but with one sender per `u32` key for each port. The handles in
    /// this file index the inner map with a `member_id`.
    cluster_input_senders: HashMap<SimExternalPort, HashMap<u32, Rc<UnboundedSender<Bytes>>>>,
    /// Like `output_receivers`, but with one receiver per `u32` key for each port.
    cluster_output_receivers:
        HashMap<SimExternalPort, HashMap<u32, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
    /// Maps an external port id to the simulator port used as the key of the maps above.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
    /// Quiescence state shared with the scheduler of this instance.
    quiescence: Rc<QuiescenceState>,
}
96
tokio::task_local! {
    // Connections of the simulation instance driving the current task. The value is installed
    // with `CURRENT_SIM_CONNECTIONS.scope(..)` in `CompiledSimInstance::run_without_launching`.
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
100
/// A simulation that has been compiled into a dynamic library and loaded into this process.
pub struct CompiledSim {
    /// Temporary path kept alongside the library. Presumably this is the on-disk dylib, held so
    /// the file is not removed while `lib` is loaded (TODO confirm against the constructor).
    pub(super) _path: TempPath,
    /// The loaded library; the `__hydro_runtime` symbol is looked up in it.
    pub(super) lib: Library,
    /// Registry of external ports; its `registered` map is copied into each instance's connections.
    pub(super) externals_port_registry: SimExternalPortRegistry,
    /// Iteration count used by `fuzz` when it runs with random inputs outside of a fuzzer.
    pub(super) unit_test_fuzz_iterations: usize,
}
108
/// A factory for fresh [`CompiledSimInstance`]s: any `RefUnwindSafe` closure of type
/// `Fn() -> CompiledSimInstance<'a>`. The trait is sealed, so it is only implemented through
/// the blanket impl below.
#[sealed::sealed]
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Blanket implementation for every closure/function with the required shape.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
116
/// Print handler that discards its arguments; handed to the dylib when logging is disabled.
fn null_handler(_discarded: fmt::Arguments<'_>) {}
118
/// Print handler that writes the formatted arguments to stdout, followed by a newline.
fn println_handler(args: fmt::Arguments) {
    println!("{args}");
}
122
/// Print handler that writes the formatted arguments to stderr, followed by a newline.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{args}");
}
126
/// Type of the `__hydro_runtime` symbol resolved from the compiled simulation library.
///
/// The entry point receives four mutable maps (keyed by `usize`) that the caller creates empty
/// and reads back after the call, plus two print handlers. It returns a 4-tuple that
/// `schedule_with_maybe_logger` destructures as
/// `(async_dfirs, tick_dfirs, hooks, inline_hooks)`.
type SimLoaded<'a> = libloading::Symbol<
    'a,
    unsafe extern "Rust" fn(
        should_color: bool,
        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
        cluster_external_out: &mut HashMap<usize, HashMap<u32, UnboundedReceiverStream<Bytes>>>,
        cluster_external_in: &mut HashMap<usize, HashMap<u32, UnboundedSender<Bytes>>>,
        println_handler: fn(fmt::Arguments<'_>),
        eprintln_handler: fn(fmt::Arguments<'_>),
    ) -> (
        // Each entry is (location as a JSON string, optional `u32` id, graph).
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Vec<(&'static str, Option<u32>, DfirErased)>,
        Hooks<&'static str>,
        InlineHooks<&'static str>,
    ),
>;
149
150impl CompiledSim {
151 pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
153 self.with_instantiator(|instantiator| thunk(instantiator()), true)
154 }
155
156 pub fn with_instantiator<T>(
164 &self,
165 thunk: impl FnOnce(&dyn Instantiator) -> T,
166 always_log: bool,
167 ) -> T {
168 let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
169 let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
170 thunk(
171 &(|| CompiledSimInstance {
172 func: func.clone(),
173 externals_port_registry: self.externals_port_registry.clone(),
174 dylib_result: None,
175 log,
176 }),
177 )
178 }
179
    /// Runs `thunk` against freshly created simulation instances, in one of three modes:
    ///
    /// 1. If the `BOLERO_FUZZER` environment variable is set, runs under bolero with a
    ///    `.fuzz-corpus` directory in the current working directory. Unless
    ///    `HYDRO_NO_FAILURE_OUTPUT` is `1`, `BOLERO_FAILURE_OUTPUT` is pointed at the
    ///    reproducer path described below.
    /// 2. Otherwise, if a reproducer file exists at that path, replays it through
    ///    [`Self::fuzz_repro`].
    /// 3. Otherwise, runs `unit_test_fuzz_iterations` iterations with random inputs.
    ///
    /// The reproducer path is `<dir of the caller's source file>/sim-failures/<caller fn name
    /// with "::" replaced by "__">.bin`, where the caller is found by walking the backtrace.
    ///
    /// # Panics
    /// Panics if no suitable caller frame is found, if that frame has no filename, or if any
    /// of the directory/runtime setup steps fail (they are all unwrapped).
    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
        // First backtrace frame that does not belong to the simulator itself.
        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
            .elements()
            .into_iter()
            .find(|e| {
                !e.fn_name.starts_with("hydro_lang::sim::compiled")
                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
                    && !e.fn_name.starts_with("fuzz<")
                    && !e.fn_name.starts_with("<hydro_lang::sim")
            })
            .unwrap();

        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
        let repro_folder = caller_path.parent().unwrap().join("sim-failures");

        let caller_fuzz_repro_path = repro_folder
            .join(caller_fn.fn_name.replace("::", "__"))
            .with_extension("bin");

        if std::env::var("BOLERO_FUZZER").is_ok() {
            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
            std::fs::create_dir_all(&corpus_dir).unwrap();
            // The corpus directory is passed twice as a positional argument and once more as
            // the artifact prefix.
            let libfuzzer_args = format!(
                "{} {} -artifact_prefix={}/ -handle_abrt=0",
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
            );

            std::fs::create_dir_all(&repro_folder).unwrap();

            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
                // NOTE(review): `set_var` is unsafe because it mutates process-global state;
                // this presumably relies on no other thread touching the environment here.
                unsafe {
                    std::env::set_var(
                        "BOLERO_FAILURE_OUTPUT",
                        caller_fuzz_repro_path.to_str().unwrap(),
                    );
                }
            }

            unsafe {
                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
            }

            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: "",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .run_with_replay(move |is_replay| {
                        let mut instance = instantiator();

                        if instance.log {
                            eprintln!(
                                "{}",
                                "\n==== New Simulation Instance ===="
                                    .color(colored::Color::Cyan)
                                    .bold()
                            );
                        }

                        // Replays always log, even if logging was off for the original run.
                        if is_replay {
                            instance.log = true;
                        }

                        // Each iteration gets its own single-threaded runtime.
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
            // A saved reproducer exists: replay exactly that input.
            self.fuzz_repro(existing_bytes, async |compiled| {
                compiled.launch();
                thunk().await
            });
        } else {
            eprintln!(
                "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
                caller_fuzz_repro_path.display(),
                self.unit_test_fuzz_iterations,
            );
            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: ".",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .with_iterations(self.unit_test_fuzz_iterations)
                    .run(move || {
                        let instance = instantiator();
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        }
    }
304
    /// Runs `thunk` once against a single instance, with bolero's "any" scope driven by the
    /// given `bytes` (for example the contents of a saved reproducer file).
    ///
    /// The instance is created through [`Self::with_instance`] and is passed to `thunk`
    /// *without* having been launched; `thunk` decides when (and whether) to launch it.
    ///
    /// # Panics
    /// Panics if the tokio runtime cannot be built.
    pub fn fuzz_repro<'a>(
        &'a self,
        bytes: Vec<u8>,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        self.with_instance(|instance| {
            bolero::bolero_engine::any::scope::with(
                // Byte-backed driver, wrapped as a driver object for the scope.
                Box::new(bolero::bolero_engine::driver::object::Object(
                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
                )),
                || {
                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run_without_launching(thunk).await })
                },
            )
        });
    }
327
    /// Runs `thunk` against fresh simulation instances using bolero's exhaustive mode and
    /// returns how many times the per-instance closure was invoked.
    ///
    /// If the `BOLERO_FUZZER` environment variable is set, prints an error and aborts the
    /// process: exhaustive tests cannot run under a fuzzer.
    ///
    /// # Panics
    /// Panics if a tokio runtime cannot be built.
    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
        if std::env::var("BOLERO_FUZZER").is_ok() {
            eprintln!(
                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
            );
            std::process::abort();
        }

        // The counter is bumped from inside the `move` closure through this reference.
        let mut count = 0;
        let count_mut = &mut count;

        self.with_instantiator(
            |instantiator| {
                bolero::test(bolero::TargetLocation {
                    package_name: "",
                    manifest_dir: "",
                    module_path: "",
                    file: "",
                    line: 0,
                    item_path: "<unknown>::__bolero_item_path__",
                    test_name: None,
                })
                .exhaustive()
                .run_with_replay(move |is_replay| {
                    *count_mut += 1;

                    let mut instance = instantiator();
                    if instance.log {
                        eprintln!(
                            "{}",
                            "\n==== New Simulation Instance ===="
                                .color(colored::Color::Cyan)
                                .bold()
                        );
                    }

                    // Replays always log.
                    if is_replay {
                        instance.log = true;
                    }

                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run(&mut thunk).await })
                })
            },
            false,
        );

        count
    }
389}
390
/// The value returned by the `__hydro_runtime` entry point. `schedule_with_maybe_logger`
/// destructures it as `(async_dfirs, tick_dfirs, hooks, inline_hooks)`; the `&'static str`
/// components are JSON strings that are parsed into `LocationId`s there.
type DylibResult = (
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Hooks<&'static str>,
    InlineHooks<&'static str>,
);
398
/// One instance of a compiled simulation, created by an [`Instantiator`].
pub struct CompiledSimInstance<'a> {
    /// The `__hydro_runtime` entry point of the loaded library.
    func: SimLoaded<'a>,
    /// Registry of external ports for this simulation.
    externals_port_registry: SimExternalPortRegistry,
    /// Result of calling `func`. It is `None` until `run_without_launching` stores it, and is
    /// taken out again when the scheduler is created.
    dylib_result: Option<DylibResult>,
    /// Whether print handlers and scheduler logging are enabled for this instance.
    log: bool,
}
407
408impl<'a> CompiledSimInstance<'a> {
    /// Sets up the instance via `run_without_launching`, launches its scheduler as a local
    /// task, and then runs `thunk` to completion.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }
416
    /// Calls the dylib entry point, wires up the external channels, and runs `thunk` with the
    /// prepared instance inside a `LocalSet`, with `CURRENT_SIM_CONNECTIONS` in scope.
    ///
    /// The scheduler is *not* started here; `thunk` receives the instance and is responsible
    /// for launching or scheduling it.
    async fn run_without_launching(
        mut self,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        // Maps created empty here and passed to the dylib entry point by `&mut`; they are read
        // back below, keyed by the `usize` form of each simulator port.
        let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
        let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
        let mut cluster_external_out: HashMap<usize, HashMap<u32, UnboundedReceiverStream<Bytes>>> =
            HashMap::new();
        let mut cluster_external_in: HashMap<usize, HashMap<u32, UnboundedSender<Bytes>>> =
            HashMap::new();

        // NOTE(review): soundness relies on the loaded symbol really having the `SimLoaded`
        // signature.
        let dylib_result = unsafe {
            (self.func)(
                colored::control::SHOULD_COLORIZE.should_colorize(),
                &mut external_out,
                &mut external_in,
                &mut cluster_external_out,
                &mut cluster_external_in,
                // With logging disabled both handlers discard their input.
                if self.log {
                    println_handler
                } else {
                    null_handler
                },
                if self.log {
                    eprintln_handler
                } else {
                    null_handler
                },
            )
        };

        let registered = &self.externals_port_registry.registered;

        let quiescence = Rc::new(QuiescenceState {
            quiescent: Cell::new(false),
            quiescence_notify: Notify::new(),
            resume_notify: Notify::new(),
        });

        let mut input_senders = HashMap::new();
        let mut output_receivers = HashMap::new();
        let mut cluster_input_senders = HashMap::new();
        let mut cluster_output_receivers = HashMap::new();

        // Move every endpoint that belongs to a registered port out of the `usize`-keyed maps
        // into maps keyed by the simulator port, wrapping them for shared use.
        #[expect(
            clippy::disallowed_methods,
            reason = "inserts into maps also unordered"
        )]
        for sim_port in registered.values() {
            let usize_key = sim_port.into_inner();
            if let Some(sender) = external_in.remove(&usize_key) {
                input_senders.insert(*sim_port, Rc::new(sender));
            }
            if let Some(receiver) = external_out.remove(&usize_key) {
                output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
            }
            if let Some(senders) = cluster_external_in.remove(&usize_key) {
                cluster_input_senders.insert(
                    *sim_port,
                    senders
                        .into_iter()
                        .map(|(member, s)| (member, Rc::new(s)))
                        .collect(),
                );
            }
            if let Some(receivers) = cluster_external_out.remove(&usize_key) {
                cluster_output_receivers.insert(
                    *sim_port,
                    receivers
                        .into_iter()
                        .map(|(member, r)| (member, Rc::new(Mutex::new(r))))
                        .collect(),
                );
            }
        }

        // Stored so that the scheduler can take it when the instance is launched.
        self.dylib_result = Some(dylib_result);

        let local_set = tokio::task::LocalSet::new();
        local_set
            .run_until(CURRENT_SIM_CONNECTIONS.scope(
                RefCell::new(SimConnections {
                    input_senders,
                    output_receivers,
                    cluster_input_senders,
                    cluster_output_receivers,
                    external_registered: self.externals_port_registry.registered.clone(),
                    quiescence: quiescence.clone(),
                }),
                async move {
                    thunk(self).await;
                },
            ))
            .await;
    }
512
513 fn launch(self) {
516 tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
517 }
518
519 pub fn schedule_with_logger<W: std::io::Write>(
522 self,
523 log_writer: W,
524 ) -> impl use<W> + Future<Output = ()> {
525 self.schedule_with_maybe_logger(Some(log_writer))
526 }
527
528 fn schedule_with_maybe_logger<W: std::io::Write>(
529 mut self,
530 log_override: Option<W>,
531 ) -> impl use<W> + Future<Output = ()> {
532 let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();
533
534 let not_ready_observation = async_dfirs
535 .iter()
536 .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
537 .collect();
538
539 let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
540 let connections = connections.borrow();
541 connections.quiescence.clone()
542 });
543
544 let mut launched = LaunchedSim {
545 async_dfirs: async_dfirs
546 .into_iter()
547 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
548 .collect(),
549 possibly_ready_ticks: vec![],
550 not_ready_ticks: tick_dfirs
551 .into_iter()
552 .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
553 .collect(),
554 possibly_ready_observation: vec![],
555 not_ready_observation,
556 hooks: hooks
557 .into_iter()
558 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
559 .collect(),
560 inline_hooks: inline_hooks
561 .into_iter()
562 .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
563 .collect(),
564 log: if self.log {
565 if let Some(w) = log_override {
566 LogKind::Custom(w)
567 } else {
568 LogKind::Stderr
569 }
570 } else {
571 LogKind::Null
572 },
573 quiescence,
574 };
575
576 async move { launched.scheduler().await }
577 }
578}
579
// `SimReceiver` is a `Copy` handle for every `T`, `O`, `R` that satisfies these bounds, so
// `Clone` is implemented as a plain copy.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
587
588impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
    /// Locks this port's output receiver and passes `thunk` a stream of deserialized items
    /// that is aware of scheduler quiescence.
    ///
    /// On each poll the wrapped stream:
    /// - yields the next message, decoded with `bincode`, if one is available;
    /// - ends if the underlying channel has ended;
    /// - ends if no message is available and the scheduler is currently quiescent;
    /// - otherwise waits for the next quiescence notification. When that arrives it re-arms
    ///   the notification future and ends the stream.
    ///
    /// # Panics
    /// Panics if the port is not registered in the current simulation's connections, or if a
    /// message fails to deserialize as `T`.
    async fn with_stream<Out>(
        &self,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            (
                connections.output_receivers.get(port).unwrap().clone(),
                connections.quiescence.clone(),
            )
        });

        // The lock is held for as long as `thunk` runs.
        let mut receiver_stream = receiver.lock().await;
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            match receiver_stream.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            // Nothing buffered: if the scheduler is already idle, no more data can arrive
            // until the test sends new input, so report end-of-stream.
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            let () = ready!(notified_fut.as_mut().poll(cx));
            // Re-arm so a later poll waits for the *next* quiescence notification.
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }
622
623 pub fn assert_no_more(self) -> impl Future<Output = ()>
625 where
626 T: Debug,
627 {
628 FutureTrackingCaller {
629 future: async move {
630 self.with_stream(async |stream| {
631 if let Some(next) = stream.next().await {
632 return Err(format!(
633 "Stream yielded unexpected message: {:?}, expected termination",
634 next
635 ));
636 }
637 Ok(())
638 })
639 .await
640 },
641 }
642 }
643}
644
645impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
646 pub async fn next(&self) -> Option<T> {
649 self.with_stream(async |stream| stream.next().await).await
650 }
651
652 pub async fn collect<C: Default + Extend<T>>(self) -> C {
655 self.with_stream(async |stream| stream.collect().await)
656 .await
657 }
658
659 pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
662 &self,
663 expected: I,
664 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
665 where
666 T: Debug + PartialEq<T2>,
667 {
668 FutureTrackingCaller {
669 future: async {
670 let mut expected: VecDeque<T2> = expected.into_iter().collect();
671
672 while !expected.is_empty() {
673 if let Some(next) = self.next().await {
674 let next_expected = expected.pop_front().unwrap();
675 if next != next_expected {
676 return Err(format!(
677 "Stream yielded unexpected message: {:?}, expected: {:?}",
678 next, next_expected
679 ));
680 }
681 } else {
682 return Err(format!(
683 "Stream ended early, still expected: {:?}",
684 expected
685 ));
686 }
687 }
688
689 Ok(())
690 },
691 }
692 }
693
694 pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
697 &self,
698 expected: I,
699 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
700 where
701 T: Debug + PartialEq<T2>,
702 {
703 ChainedFuture {
704 first: self.assert_yields(expected),
705 second: self.assert_no_more(),
706 first_done: false,
707 }
708 }
709}
710
pin_project_lite::pin_project! {
    // Adapter around a future that resolves to `Result<(), String>`. Its `Future` impl
    // resolves to `()` on `Ok` and panics with the message on `Err`.
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        // The wrapped check; structurally pinned so it can be polled in place.
        #[pin]
        future: F,
    }
}
726
727impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
728 type Output = ();
729
730 #[track_caller]
731 fn poll(
732 mut self: Pin<&mut Self>,
733 cx: &mut std::task::Context<'_>,
734 ) -> std::task::Poll<Self::Output> {
735 match ready!(self.as_mut().project().future.poll(cx)) {
736 Ok(()) => std::task::Poll::Ready(()),
737 Err(e) => panic!("{}", e),
738 }
739 }
740}
741
pin_project_lite::pin_project! {
    // Runs `first` to completion and then `second`. Both are stored up front; `second` is
    // not polled until `first` has finished.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        #[pin]
        first: F1,
        #[pin]
        second: F2,
        // Set once `first` has completed, so that it is never polled again.
        first_done: bool,
    }
}
754
755impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
756 type Output = ();
757
758 #[track_caller]
759 fn poll(
760 mut self: Pin<&mut Self>,
761 cx: &mut std::task::Context<'_>,
762 ) -> std::task::Poll<Self::Output> {
763 if !self.first_done {
764 ready!(self.as_mut().project().first.poll(cx));
765 *self.as_mut().project().first_done = true;
766 }
767
768 self.as_mut().project().second.poll(cx)
769 }
770}
771
772impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
773 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
776 where
777 T: Ord,
778 {
779 self.with_stream(async |stream| {
780 let mut collected: C = stream.collect().await;
781 collected.as_mut().sort();
782 collected
783 })
784 .await
785 }
786
787 pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
790 &self,
791 expected: I,
792 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
793 where
794 T: Debug + PartialEq<T2>,
795 {
796 FutureTrackingCaller {
797 future: async {
798 self.with_stream(async |stream| {
799 let mut expected: Vec<T2> = expected.into_iter().collect();
800
801 while !expected.is_empty() {
802 if let Some(next) = stream.next().await {
803 let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
804 if let Some((i, _)) = idx {
805 expected.swap_remove(i);
806 } else {
807 return Err(format!(
808 "Stream yielded unexpected message: {:?}",
809 next
810 ));
811 }
812 } else {
813 return Err(format!(
814 "Stream ended early, still expected: {:?}",
815 expected
816 ));
817 }
818 }
819
820 Ok(())
821 })
822 .await
823 },
824 }
825 }
826
827 pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
830 &self,
831 expected: I,
832 ) -> impl use<'_, T, T2, I> + Future<Output = ()>
833 where
834 T: Debug + PartialEq<T2>,
835 {
836 ChainedFuture {
837 first: self.assert_yields_unordered(expected),
838 second: self.assert_no_more(),
839 first_done: false,
840 }
841 }
842}
843
844impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
845 fn with_sink<Out>(
846 &self,
847 thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
848 ) -> Out {
849 let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
850 let connections = connections.borrow();
851 (
852 connections
853 .input_senders
854 .get(connections.external_registered.get(&self.0).unwrap())
855 .unwrap()
856 .clone(),
857 connections.quiescence.clone(),
858 )
859 });
860
861 thunk(&move |t| {
862 let res = sender.send(bincode::serialize(&t).unwrap().into());
863 quiescence.resume();
864 res
865 })
866 }
867}
868
869impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
870 pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
873 self.with_sink(|send| {
874 for t in iter {
875 send(t).unwrap();
876 }
877 })
878 }
879}
880
881impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
882 pub fn send(&self, t: T) {
885 self.with_sink(|send| send(t)).unwrap();
886 }
887
888 pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
891 self.with_sink(|send| {
892 for t in iter {
893 send(t).unwrap();
894 }
895 })
896 }
897}
898
// `SimClusterReceiver` is a `Copy` handle for every `T`, `O`, `R` that satisfies these
// bounds, so `Clone` is implemented as a plain copy.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
    for SimClusterReceiver<T, O, R>
{
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
    for SimClusterReceiver<T, O, R>
{
}
911
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
    /// Locks the output receiver of cluster member `member_id` on this port and passes `thunk`
    /// a stream of deserialized items that is aware of scheduler quiescence.
    ///
    /// On each poll the wrapped stream:
    /// - yields the next message, decoded with `bincode`, if one is available;
    /// - ends if the underlying channel has ended;
    /// - ends if no message is available and the scheduler is currently quiescent;
    /// - otherwise waits for the next quiescence notification. When that arrives it re-arms
    ///   the notification future and ends the stream.
    ///
    /// # Panics
    /// Panics if the port is not registered, if there is no receiver for `member_id`, or if a
    /// message fails to deserialize as `T`.
    async fn with_member_stream<Out>(
        &self,
        member_id: u32,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            let receivers = connections.cluster_output_receivers.get(port).unwrap();
            (
                // Indexing panics when the member has no receiver on this port.
                receivers[&member_id].clone(),
                connections.quiescence.clone(),
            )
        });

        // The lock is held for as long as `thunk` runs.
        let mut lock = receiver.lock().await;
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            match lock.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            // Nothing buffered and the scheduler is idle: report end-of-stream.
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            let () = ready!(notified_fut.as_mut().poll(cx));
            // Re-arm so a later poll waits for the *next* quiescence notification.
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }
}
949
impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
    /// Waits for the next message from cluster member `member_id`. Returns `None` once that
    /// member's stream ends, which includes the simulation going quiescent with nothing
    /// buffered.
    pub async fn next(&self, member_id: u32) -> Option<T> {
        self.with_member_stream(member_id, async |stream| stream.next().await)
            .await
    }

    /// Drains the stream of cluster member `member_id` until it ends and gathers every
    /// message into a `C`.
    pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
        self.with_member_stream(member_id, async |stream| stream.collect().await)
            .await
    }
}
963
964impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
965 pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
967 where
968 T: Ord,
969 {
970 self.with_member_stream(member_id, async |stream| {
971 let mut collected: C = stream.collect().await;
972 collected.as_mut().sort();
973 collected
974 })
975 .await
976 }
977}
978
979impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
980 fn with_sink<Out>(
981 &self,
982 thunk: impl FnOnce(
983 &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
984 ) -> Out,
985 ) -> Out {
986 let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
987 let connections = connections.borrow();
988 (
989 connections
990 .cluster_input_senders
991 .get(connections.external_registered.get(&self.0).unwrap())
992 .unwrap()
993 .clone(),
994 connections.quiescence.clone(),
995 )
996 });
997
998 thunk(&move |member_id: u32, t: T| {
999 let payload = bincode::serialize(&t).unwrap();
1000 let res = senders[&member_id].send(Bytes::from(payload));
1001 quiescence.resume();
1002 res
1003 })
1004 }
1005}
1006
1007impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
1008 pub fn send(&self, member_id: u32, t: T) {
1010 self.with_sink(|send| send(member_id, t)).unwrap();
1011 }
1012
1013 pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
1015 self.with_sink(|send| {
1016 for (member_id, t) in iter {
1017 send(member_id, t).unwrap();
1018 }
1019 })
1020 }
1021}
1022
/// Destination for the scheduler's log output.
enum LogKind<W: std::io::Write> {
    /// Discard everything.
    Null,
    /// Print to the process's stderr.
    Stderr,
    /// Forward the raw bytes to a caller-supplied writer.
    Custom(W),
}

/// Lets a `LogKind` be used with `write!` / `writeln!` and other `fmt::Write` consumers.
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Writes `s` to the selected destination. An I/O failure of a custom writer is reported
    /// as `std::fmt::Error`; the other destinations always succeed.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        match self {
            Self::Custom(sink) => sink.write_all(s.as_bytes()).or(Err(std::fmt::Error)),
            Self::Stderr => {
                eprint!("{s}");
                Ok(())
            }
            Self::Null => Ok(()),
        }
    }
}
1042
/// Scheduler state for a launched simulation instance.
struct LaunchedSim<W: std::io::Write> {
    /// Graphs that the scheduler runs (`run_tick`) on every iteration of its loop, each with
    /// its location and optional `u32` id (logged as "Cluster Member").
    async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick graphs that are candidates to be picked and run next.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick graphs that are currently not candidates. They move to `possibly_ready_ticks`
    /// when the async graph at their enclosing location (and same id) makes progress.
    not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Observation points (per async graph) that are candidates to be picked next.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Observation points that are currently not candidates. They move to
    /// `possibly_ready_observation` when the matching async graph makes progress.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Hooks looked up by `(location, optional id)`; their decisions are made in `run_hooks`.
    hooks: Hooks<LocationId>,
    /// Hooks looked up by `(location, optional id)` whose decisions are released while the
    /// tick itself is running.
    inline_hooks: InlineHooks<LocationId>,
    /// Where scheduler log output goes.
    log: LogKind<W>,
    /// Quiescence state shared with the sender/receiver handles.
    quiescence: Rc<QuiescenceState>,
}
1080
impl<W: std::io::Write> LaunchedSim<W> {
    /// The scheduler loop. It never returns on its own; each iteration:
    ///
    /// 1. Yields to other tasks, then runs every async graph once. Whenever one makes
    ///    progress, the ticks and observation points tied to it become "possibly ready".
    /// 2. If any async graph made progress, starts over.
    /// 3. Otherwise filters the "possibly ready" sets by the state of their hooks. If nothing
    ///    remains, it parks until `QuiescenceState::resume` is called; if something remains,
    ///    one candidate is chosen through bolero and either that tick is run or that
    ///    observation's hooks are run.
    async fn scheduler(&mut self) {
        loop {
            tokio::task::yield_now().await;
            let mut any_made_progress = false;
            for (loc, c_id, dfir) in &mut self.async_dfirs {
                if dfir.run_tick().await {
                    any_made_progress = true;
                    // Ticks nested directly under this location (same id) may now have input.
                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
                        .not_ready_ticks
                        .drain(..)
                        .partition(|(tick_loc, tick_c_id, _)| {
                            let LocationId::Tick(_, outer) = tick_loc else {
                                unreachable!()
                            };
                            outer.as_ref() == loc && tick_c_id == c_id
                        });

                    self.possibly_ready_ticks.extend(now_ready);
                    self.not_ready_ticks.extend(still_not_ready);

                    // Same for the observation point of this exact location and id.
                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
                        .not_ready_observation
                        .drain(..)
                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);

                    self.possibly_ready_observation.extend(now_ready_obs);
                    self.not_ready_observation.extend(still_not_ready_obs);
                }
            }

            if any_made_progress {
                continue;
            } else {
                use bolero::generator::*;

                // Keep a tick as a candidate only if all of its hooks are ready and at least
                // one of them already holds a `true` decision or can still make a
                // nontrivial one.
                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
                    .possibly_ready_ticks
                    .drain(..)
                    .partition(|(name, cid, _)| {
                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
                        hooks.iter().all(|hook| hook.is_ready())
                            && hooks.iter().any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_ticks = ready_tick;
                self.not_ready_ticks.append(&mut not_ready_tick);

                // Observation points use the same "any hook" test but no readiness check; a
                // location without registered hooks is never a candidate.
                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
                    .possibly_ready_observation
                    .drain(..)
                    .partition(|(name, cid)| {
                        self.hooks
                            .get(&(name.clone(), *cid))
                            .into_iter()
                            .flatten()
                            .any(|hook| {
                                hook.current_decision().unwrap_or(false)
                                    || hook.can_make_nontrivial_decision()
                            })
                    });

                self.possibly_ready_observation = ready_obs;
                self.not_ready_observation.append(&mut not_ready_obs);

                if self.possibly_ready_ticks.is_empty()
                    && self.possibly_ready_observation.is_empty()
                {
                    // Nothing can run. Before parking, check that no tick is stuck behind a
                    // hook that is not ready.
                    for (name, cid, _) in &self.not_ready_ticks {
                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
                        abort_assert!(
                            hooks.iter().all(|hook| hook.is_ready()),
                            "tick has a hook that never became ready"
                        );
                    }

                    self.quiescence.wait_for_resume().await;
                } else {
                    // Pick one index across both candidate lists (ticks first, then
                    // observations). `any()` comes from the bolero generator import above;
                    // presumably the value is drawn from the active bolero driver.
                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
                        + self.possibly_ready_observation.len()))
                        .any();

                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
                        let next_tick = next_tick_or_obs;
                        let mut removed = self.possibly_ready_ticks.remove(next_tick);

                        match &mut self.log {
                            LogKind::Null => {}
                            LogKind::Stderr => {
                                if let Some(cid) = &removed.1 {
                                    eprintln!(
                                        "\n{}",
                                        format!("Running Tick (Cluster Member {})", cid)
                                            .color(colored::Color::Magenta)
                                            .bold()
                                    )
                                } else {
                                    eprintln!(
                                        "\n{}",
                                        "Running Tick".color(colored::Color::Magenta).bold()
                                    )
                                }
                            }
                            // NOTE(review): unlike the Stderr branch, this header omits the
                            // cluster member id; confirm that this is intentional.
                            LogKind::Custom(writer) => {
                                writeln!(
                                    writer,
                                    "\n{}",
                                    "Running Tick".color(colored::Color::Magenta).bold()
                                )
                                .unwrap();
                            }
                        }

                        // Every line of hook output for this tick is prefixed with "* ".
                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
                            write.write_str(" ")
                        };

                        let mut tick_decision_writer = indenter::indented(&mut self.log)
                            .with_format(indenter::Format::Custom {
                                inserter: &mut asterisk_indenter,
                            });

                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
                        run_hooks(&mut tick_decision_writer, hooks);

                        let run_tick_future = removed.2.run_tick();
                        if let Some(inline_hooks) =
                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
                        {
                            let mut run_tick_future_pinned = pin!(run_tick_future);

                            // Poll the tick first on every round (`biased`). While it is
                            // still pending, the always-ready second branch services inline
                            // hooks that are waiting for a decision.
                            loop {
                                tokio::select! {
                                    biased;
                                    r = &mut run_tick_future_pinned => {
                                        abort_assert!(r, "tick DFIR run_tick() returned false");
                                        break;
                                    }
                                    _ = async {} => {
                                        bolero_generator::any::scope::borrow_with(|driver| {
                                            for hook in inline_hooks.iter_mut() {
                                                if hook.pending_decision() {
                                                    if !hook.has_decision() {
                                                        hook.autonomous_decision(driver);
                                                    }

                                                    hook.release_decision(&mut tick_decision_writer);
                                                }
                                            }
                                        });
                                    }
                                }
                            }
                        } else {
                            abort_assert!(
                                run_tick_future.await,
                                "tick DFIR run_tick() returned false"
                            );
                        }

                        // The tick stays a candidate; it is re-filtered on the next iteration.
                        self.possibly_ready_ticks.push(removed);
                    } else {
                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
                        let mut default_hooks = vec![];
                        let hooks = self
                            .hooks
                            .get_mut(&self.possibly_ready_observation[next_obs])
                            .unwrap_or(&mut default_hooks);

                        run_hooks(&mut self.log, hooks);
                    }
                }
            }
        }
    }
}
1266
/// Makes sure every hook in `hooks` has a decision and then releases all of the decisions,
/// writing any hook output to `tick_decision_writer`.
///
/// Works in two passes over the hooks, inside a borrow of the current bolero driver:
///
/// 1. Hooks that already hold a decision are counted as done, and whether their decision is
///    nontrivial is recorded. Hooks that cannot make a nontrivial decision are asked to
///    decide right away, with `false` as the second argument.
/// 2. Each remaining hook is asked to decide. The second argument is `true` only for the last
///    undecided hook, and only when no nontrivial decision has been made so far. Presumably
///    this forces at least one nontrivial decision per run (TODO confirm against `SimHook`).
///    Every hook then releases its decision.
fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
    // Number of hooks that still have to make a decision.
    let mut remaining_decision_count = hooks.len();
    let mut made_nontrivial_decision = false;

    bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
        hooks.iter_mut().for_each(|hook| {
            if let Some(is_nontrivial) = hook.current_decision() {
                made_nontrivial_decision |= is_nontrivial;
                remaining_decision_count -= 1;
            } else if !hook.can_make_nontrivial_decision() {
                hook.autonomous_decision(driver, false);
                remaining_decision_count -= 1;
            }
        });

        hooks.iter_mut().for_each(|hook| {
            if hook.current_decision().is_none() {
                made_nontrivial_decision |= hook.autonomous_decision(
                    driver,
                    !made_nontrivial_decision && remaining_decision_count == 1,
                );
                remaining_decision_count -= 1;
            }

            hook.release_decision(tick_decision_writer);
        });
    });
}