// rust.eventbus/src/lib.rs
//
// 344 lines
// 13 KiB
// Rust

/*
* Event Bus
* Copyright (C) 2018 Soni L.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
/// Registers an event hook.
///
/// Usage: `register_hook!(bus, priority, type, handler)`
///
/// * `bus` - a `&EventBus` to register the hook on.
/// * `priority` - the hook's priority, forwarded to `register`.
/// * `type` - the event type the hook handles. It must not mention generic parameters of the
///   enclosing item, because it is used inside a `static`.
/// * `handler` - the hook itself, forwarded to `register`.
///
/// The event ID for `type` is looked up once per call site (via `get_event_id`) and cached in a
/// `static`, so repeated registrations from the same call site skip the slow lookup.
#[macro_export]
macro_rules! register_hook {
    ($b:expr, $p:expr, $t:ty, $h:expr) => {
        {
            // hygiene: bind every argument exactly once before doing anything else.
            let bus: &$crate::EventBus = $b;
            let hook = $h;
            let pri = $p;
            {
                // All standard-library paths are written as `::std::...` so the expansion does
                // not depend on what `std` happens to name at the call site.
                static EVENT_ID: ::std::sync::atomic::AtomicUsize = ::std::sync::atomic::ATOMIC_USIZE_INIT;
                static EVENT_ID_INIT: ::std::sync::Once = ::std::sync::ONCE_INIT;
                // no generics allowed: a `static` cannot name the enclosing item's generic
                // parameters, so this rejects a `$t` that would make the cached ID wrong.
                static _DUMMY: ::std::marker::PhantomData<dyn Fn(&mut $t) + Send + Sync> = ::std::marker::PhantomData;
                EVENT_ID_INIT.call_once(|| {
                    EVENT_ID.store($crate::get_event_id::<$t>(), ::std::sync::atomic::Ordering::Relaxed);
                });
                let id = EVENT_ID.load(::std::sync::atomic::Ordering::Relaxed);
                $crate::register::<$t, _>(bus, pri, hook, id)
            }
        }
    }
}
/// Posts an event.
///
/// Usage: `post_event!(bus, event, type, type, ...)`
///
/// * `bus` - a `&EventBus` to post the event on.
/// * `event` - a `&mut` reference to the event value.
/// * `type, ...` - one or more types the event is posted as; `event` must coerce to `&mut type`
///   for each of them. None of them may mention generic parameters of the enclosing item.
///
/// The handlers registered for every listed type are merged by priority and invoked in that
/// order. If the event is cancellable, dispatch stops as soon as a handler leaves it cancelled.
///
/// Evaluates to `true` if the event is cancellable and ended up cancelled, `false` otherwise.
///
/// The event's cancellability and each type's event ID are computed once per call site and
/// cached in `static`s.
#[macro_export]
macro_rules! post_event {
    // prefer (bus, ty, ty, ty, ..., evt) but rust is just bad at parsing
    ($b:expr, $e:expr, $($t:ty),+) => {
        {
            // hygiene
            let bus: &$crate::EventBus = $b;
            // event setup, this may evaluate to any type.
            let event: &mut _ = $e;
            {
                // it is a logic error for an event's cancellability to change based on its value.
                // All standard-library paths are written as `::std::...` so the expansion does
                // not depend on what `std` happens to name at the call site.
                static CANCELLABLE: ::std::sync::atomic::AtomicBool = ::std::sync::atomic::ATOMIC_BOOL_INIT;
                static CANCELLABLE_INIT: ::std::sync::Once = ::std::sync::ONCE_INIT;
                CANCELLABLE_INIT.call_once(|| {
                    CANCELLABLE.store(<dyn $crate::Event>::cancellable(event), ::std::sync::atomic::Ordering::Relaxed);
                });
                let cancellable = CANCELLABLE.load(::std::sync::atomic::Ordering::Relaxed);
                // Start from an empty handler sequence; each listed type merges its own
                // handlers into it by shadowing `event_handlers`.
                #[allow(unused_mut)]
                let mut event_handlers = ::std::iter::empty::<(i32, Box<dyn Fn(&mut _)>)>();
                $(
                    // declared outside the block below so it outlives the iterator borrowing it.
                    let handlers;
                    #[allow(unused_mut)]
                    let mut event_handlers = {
                        // event type setup
                        static EVENT_ID: ::std::sync::atomic::AtomicUsize = ::std::sync::atomic::ATOMIC_USIZE_INIT;
                        static EVENT_ID_INIT: ::std::sync::Once = ::std::sync::ONCE_INIT;
                        // no generics allowed.
                        static _DUMMY: ::std::marker::PhantomData<dyn Fn(&mut $t) + Send + Sync> = ::std::marker::PhantomData;
                        EVENT_ID_INIT.call_once(|| {
                            EVENT_ID.store($crate::get_event_id::<$t>(), ::std::sync::atomic::Ordering::Relaxed);
                        });
                        let id = EVENT_ID.load(::std::sync::atomic::Ordering::Relaxed);
                        // handler retrieval and invocation
                        {
                            let subevent: &mut $t = event;
                            handlers = $crate::get_post_targets::<$t>(bus, subevent, id);
                        }
                        $crate::type_hint_trick(event, event_handlers, handlers.iter().cloned(), |f| move |evt| f(evt))
                    };
                )+
                for (_pri, fun) in event_handlers {
                    fun(event);
                    if cancellable && <_ as $crate::Event>::cancelled(event) {
                        break;
                    }
                }
                cancellable && <_ as $crate::Event>::cancelled(event)
            }
        }
    }
}
/// Basic trait for defining an event.
///
/// Every method has a default implementation describing a non-cancellable event, so an empty
/// `impl Event for MyType {}` is enough for events that never need to be cancelled.
pub trait Event: 'static {
/// Returns whether this event is cancellable.
///
/// When this is true, `cancelled` and `set_cancelled` should also be implemented.
///
/// Note: While this method does take a `&self`, it's a logic error for an event's
/// cancellability to change based on its value. The `&self` is just for trait objectification.
/// You have been warned!
///
/// The default implementation returns `false`.
fn cancellable(&self) -> bool {
false
}
/// Returns whether this event has been cancelled.
///
/// The default implementation returns `false`.
fn cancelled(&self) -> bool {
false
}
/// Sets whether this event is cancelled.
///
/// # Panics
///
/// Panics if this event is not cancellable. The default implementation always panics.
fn set_cancelled(&mut self, cancel: bool) {
// Consume the argument so the default body does not trigger an unused-variable warning.
let _ = cancel;
panic!("not cancellable");
}
}
/// An event bus.
///
/// Hooks are attached to a bus with `register_hook!` and run with `post_event!`.
pub struct EventBus {
// Index of this bus in each event type's per-bus handler list (see `get_post_targets`).
id: usize,
// NOTE(review): in the code visible here this list is only default-initialised in `new`;
// nothing pushes to it or calls its entries. Presumably reserved for cleanup on drop — confirm.
dropper: Mutex<LinkedList<Box<dyn Fn(usize) + Send + Sync>>>,
}
// Implementation detail of `post_event!`; public only so the macro expansion can call it.
//
// Merges the handlers accumulated so far (`event_handlers`, taking `&mut T`) with the handlers
// of one more event type (`handlers`, taking `&mut C`) into a single iterator of
// `(priority, boxed handler)` pairs, joined on priority.
//
// * `_event` is never read in the body; presumably it only lets the caller's event value pin `T`
//   for type inference.
// * `convert` adapts a `&mut C` handler into something callable with `&mut T`.
//
// NOTE(review): `merge_join_by` expects both inputs to already be ordered by the join key —
// confirm against the itertools documentation.
#[doc(hidden)]
pub fn type_hint_trick<T: Event + Sized, A, B, C: Event + ?Sized, E>(_event: &mut T, event_handlers: A, handlers: B, convert: fn(::std::sync::Arc<dyn Fn(&mut C) + Send + Sync>) -> E) -> impl ::std::iter::Iterator<Item=(i32, Box<dyn for<'a> Fn(&'a mut T) + 'static>)> where
A: ::std::iter::Iterator<Item=(i32, Box<dyn Fn(&mut T)>)>,
B: ::std::iter::Iterator<Item=(i32, ::std::sync::Arc<dyn Fn(&mut C) + Send + Sync>)>,
E: Fn(&mut T) + 'static,
{
itertools::merge_join_by(event_handlers, handlers, |l, r| l.0.cmp(&r.0))
.map(move |eob: itertools::EitherOrBoth<(i32, Box<dyn Fn(&mut T)>), (i32, ::std::sync::Arc<dyn Fn(&mut C) + Send + Sync>)>| ({
// Priority of the merged entry: the left item's if present, otherwise the right item's.
// The `0` fallback only applies if neither side is present.
eob.as_ref().left().map(|v| v.0).or_else(|| eob.as_ref().right().map(|v| v.0)).unwrap_or(0)
}, Box::new(move |evt: &mut _| {
// Run the already-accumulated handler first, then the newly merged one.
eob.as_ref().left().map(|x| (x.1)(evt));
eob.as_ref().right().map(|x| convert(x.1.clone())(evt));
}) as Box<_>))
}
#[macro_use]
extern crate lazy_static;
extern crate anymap;
extern crate itertools;
//use std::marker::PhantomData;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::ATOMIC_USIZE_INIT;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Mutex;
use std::collections::LinkedList;
use std::any::Any;
use std::sync::Arc;
use std::sync::RwLock;
use std::mem;
/* only exists as a module to keep us from accidentally misusing these */
mod id_map {
    use std::marker::PhantomData;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::ATOMIC_USIZE_INIT;
    use std::sync::atomic::Ordering as AtomicOrdering;
    use std::sync::Mutex;
    use std::sync::Arc;
    use super::Event;
    use super::Handlers;

    lazy_static! {
        // Type-keyed map holding one `EventId<T>` record per event type seen so far.
        static ref EVENT_ID_MAP: Mutex<::anymap::Map<::anymap::any::Any + Send + Sync>> = Mutex::new(::anymap::Map::new());
    }

    /// The ID allocated to event type `T`; the `PhantomData` only ties the record to `T`.
    struct EventId<T: Event + ?Sized> {
        id: usize,
        _t: PhantomData<dyn Fn(&mut T) + Send + Sync>
    }

    /// Returns the ID of event type `T`, allocating it on first use.
    ///
    /// Allocation appends a fresh, empty `Handlers<T>` to the global `HANDLERS` list and then
    /// takes the next value of a process-wide counter as the ID.
    pub fn get_event_id<T: Event + ?Sized>() -> usize {
        static EVENT_ID_COUNT: AtomicUsize = ATOMIC_USIZE_INIT;
        let mut known_ids = EVENT_ID_MAP.lock().expect("failed to allocate event id");
        let record = known_ids.entry::<EventId<T>>().or_insert_with(|| {
            let slot: Arc<Handlers<T>> = Arc::new(Handlers::default());
            let mut registry = super::HANDLERS.write().expect("???");
            Arc::make_mut(&mut registry).push(slot);
            let id = EVENT_ID_COUNT.fetch_add(1, AtomicOrdering::SeqCst);
            EventId { id, _t: PhantomData }
        });
        record.id
    }
}
/// Low-level event handling function, returns the event ID for use with get_post_targets.
///
/// The same ID is also what `register` expects for the same `T`.
///
/// This function is (extremely) slow! Every call locks a global mutex and looks `T` up in a
/// type-keyed map; `register_hook!` and `post_event!` cache the result per call site instead of
/// calling this each time.
pub fn get_event_id<T: Event + ?Sized>() -> usize {
// Thin public wrapper: the real logic lives in the private `id_map` module.
id_map::get_event_id::<T>()
}
// A vector behind an `Arc`, itself behind an `RwLock`.
type RwArcVec<T> = RwLock<Arc<Vec<T>>>;
// All handlers for one event type `T`: the outer vector is indexed by bus ID, and each inner
// vector holds that bus's `(priority, handler)` pairs.
struct Handlers<T: Event + ?Sized>(RwArcVec<RwArcVec<(i32, Arc<dyn Fn(&mut T) + Send + Sync>)>>);
// Written by hand rather than derived; the default is an empty handler table.
impl<T: Event + ?Sized> Default for Handlers<T> {
fn default() -> Self {
Handlers(Default::default())
}
}
lazy_static! {
// Global table of type-erased `Handlers<T>` values, indexed by event ID. Entries are pushed by
// `id_map::get_event_id` and downcast back to `Handlers<T>` by `get_post_targets` and `register`.
static ref HANDLERS: RwArcVec<Arc<dyn Any + Send + Sync>> = Default::default();
}
/// Low-level event handling function: looks up the handlers to call when posting an event of
/// type `T` on `bus`.
///
/// Useful for simulating inheritance, by fetching the targets of several event types for the
/// same event value.
///
/// Returns a shared snapshot of the `(priority, handler)` list registered for `T` on `bus`, or
/// an empty list when `id` is unknown or nothing was ever registered for this bus.
///
/// # Panics
///
/// Panics if `id` doesn't match the given `T` (but not if the given `id` doesn't exist).
pub fn get_post_targets<T: Event + ?Sized>(bus: &EventBus, _event: &mut T, id: usize) -> Arc<Vec<(i32, Arc<dyn Fn(&mut T) + Send + Sync>)>> {
    let registry = HANDLERS.read().expect("failed to lock for reading");
    // An unknown event id simply has no targets.
    let erased = match registry.get(id) {
        Some(entry) => Arc::clone(entry),
        None => return Arc::new(Vec::new()),
    };
    let typed = Arc::downcast::<Handlers<T>>(erased).expect("Wrong id");
    let per_bus = typed.0.read().expect("failed to lock for reading");
    let targets = match per_bus.get(bus.id) {
        Some(hooks) => {
            let current = hooks.read().expect("failed to lock for reading");
            Arc::clone(&*current)
        }
        None => Arc::new(Vec::new()),
    };
    targets
}
/// Low-level event handling function, registers an event.
///
/// `handler` is added to the hooks of event type `T` on `bus`. Hooks are kept sorted by
/// ascending `priority`, which is the order in which `post_event!` runs them. `id` must be the
/// value returned by `get_event_id::<T>()`.
///
/// This function is kinda slow. It doesn't block currently executing hooks (and, in fact, may be
/// safely called from a running hook), but does block threads from starting hook execution.
///
/// # Panics
///
/// Panics if `id` doesn't match the given `T`, or if `id` doesn't exist.
pub fn register<T: Event + ?Sized, F: for<'a> Fn(&'a mut T) + Send + Sync + 'static>(bus: &EventBus, priority: i32, handler: F, id: usize) {
    HANDLERS.read().expect("failed to lock for reading").get(id).map(|arc| Arc::clone(arc))
        .map(|any| Arc::downcast::<Handlers<T>>(any).expect("Wrong id"))
        .map(|handlers| {
            let mut lock = handlers.0.read().expect("failed to lock for reading");
            // The per-event list is indexed by *bus* id, so it has to be grown until it holds
            // a slot for this bus (not for the event id).
            if lock.len() <= bus.id {
                mem::drop(lock);
                let mut wlock = handlers.0.write().expect("failed to lock for writing");
                if wlock.len() <= bus.id { // another thread may have touched this
                    let vec = Arc::get_mut(&mut wlock).expect("failed to mutate busses");
                    let count = bus.id - vec.len() + 1;
                    vec.reserve_exact(count);
                    for _ in 0..count {
                        vec.push(Default::default());
                    }
                }
                // there's no way to downgrade a write lock, but as long as we never shrink the
                // vecs it should be fine.
                mem::drop(wlock);
                lock = handlers.0.read().expect("failed to lock for reading");
            }
            let option = lock.get(bus.id);
            // last RwLock
            let hooks = option.expect("failed to make vecs");
            let hook: (i32, Arc<dyn Fn(&mut T) + Send + Sync + 'static>) = (priority, Arc::new(handler));
            let mut wlock = hooks.write().expect("failed to lock for writing");
            // Copy-on-write: posts that already hold the previous list keep using it untouched.
            let vec = Arc::make_mut(&mut wlock);
            // would be nice if Result had a .coalesce()
            let pos = match vec.binary_search_by(|probe| probe.0.cmp(&priority)) { Ok(p) => p, Err(p) => p };
            vec.insert(pos, hook);
        }).expect("No such id");
}
// Next never-used bus ID; `EventBus::new` increments it when no recycled ID is available.
static BUS_ID: AtomicUsize = ATOMIC_USIZE_INIT;
lazy_static! {
// this is the closest to high-performance we can get with std
// would be nicer if this didn't lock the whole list
// Bus IDs available for reuse; `EventBus::new` pops from the front.
// NOTE(review): nothing visible in this file pushes IDs onto this list yet.
static ref FREED_BUS_IDS: Mutex<LinkedList<usize>> = Mutex::new(LinkedList::new());
}
impl EventBus {
/// Makes a new EventBus.
pub fn new() -> EventBus {
EventBus {
id: FREED_BUS_IDS.lock().unwrap().pop_front().unwrap_or_else(|| BUS_ID.fetch_add(1, AtomicOrdering::SeqCst)),
dropper: Default::default(),
}
}
}
// Cleanup is not implemented yet: dropping a bus currently releases nothing. Its ID is not
// returned to `FREED_BUS_IDS`, the `dropper` callbacks are not run, and its registered handlers
// stay in the global handler table.
// NOTE(review): returning the ID for reuse without first clearing this bus's handlers would let
// a later bus with the same ID see them — handle both together when implementing this.
impl Drop for EventBus {
fn drop(&mut self) {
// TODO
}
}
// test with good error messages
#[cfg(test)]
mod useful_test {
use super::{Event, EventBus};
// Minimal event carrying a counter that the handlers mutate.
struct MyEvent {
i: i32
}
// Second, trait-object event type, used to post the same value as `dyn DynEvent`.
trait DynEvent : Event {
}
impl DynEvent for MyEvent {
}
// Relies on the default methods, so `MyEvent` is not cancellable.
impl Event for MyEvent {
}
fn add_handler(e: &mut MyEvent) {
/* adds 1 */
e.i += 1;
}
fn no_handler(e: &mut MyEvent) {
/* does nothing */
e.i += 0;
}
// End-to-end check of `register_hook!` and `post_event!` on a single bus.
#[test]
fn test_usage_internal() {
let event_bus = EventBus::new();
//let handler_id = event_bus.register(add_handler, 0);
register_hook!(&event_bus, 0, MyEvent, add_handler);
let mut event = MyEvent { i: 3 };
assert_eq!(event.i, 3);
// Posting as `MyEvent` runs `add_handler` once (3 -> 4).
post_event!(&event_bus, &mut event, MyEvent);
// No hook is registered for `dyn DynEvent`, so this post leaves the counter unchanged.
post_event!(&event_bus, &mut event, dyn DynEvent);
assert_eq!(event.i, 4);
// A second hook that adds zero: the next post still only adds one (4 -> 5).
register_hook!(&event_bus, 1, MyEvent, no_handler);
post_event!(&event_bus, &mut event, MyEvent);
assert_eq!(event.i, 5);
// The unregister call and its assertion below are commented out; this final post therefore
// still runs `add_handler` and its result is not asserted.
//event_bus.unregister(handler_id);
post_event!(&event_bus, &mut event, MyEvent);
//assert_eq!(event.i, 5);
}
}