1 // `threading` contains fundamental threading utilities akin to the functions in `reactor`. 2 3 module rockhopper.core.threading; 4 5 import rockhopper.core.reactor : entrypoint, spawn; 6 import rockhopper.core.llevents : waitThreadEvent; 7 import rockhopper.core.uda : Async; 8 9 import eventcore.driver : EventDriver; 10 import eventcore.core : eventDriver; 11 import core.thread.osthread : Thread; 12 import std.typecons : Tuple, tuple; 13 14 struct ThreadHandle 15 { 16 Thread th; 17 shared(EventDriver) ed; 18 19 void join() { joinThread(th); } 20 21 void spawn(void delegate() fn) 22 { 23 assert(th.isRunning, "cannot spawn in a thread that is finished"); 24 spawnInThread(ed, fn); 25 } 26 } 27 28 // spawns a new thread and runs the given function as a fiber in that thread's reactor 29 ThreadHandle spawnThread(void delegate() fn) @Async 30 { 31 auto ev = eventDriver.events.create(); 32 shared sd = cast(shared) eventDriver; 33 34 shared(EventDriver) res; 35 36 auto t = new Thread({ 37 // send over a shared reference to this driver 38 res = cast(shared) eventDriver; 39 // tell the other thread that that's done 40 sd.events.trigger(ev, true); 41 42 // run the fiber! 43 entrypoint(fn); 44 }).start(); 45 46 assert(t !is null); 47 48 // wait for `res` to be set 49 waitThreadEvent(ev); 50 assert(res !is null, "after the second thread fires the event, it should have send it's driver too."); 51 52 return ThreadHandle(t, res); 53 } 54 55 private void _spawnInThread_springboard(void delegate() f) @trusted nothrow 56 { 57 try 58 { 59 spawn(f); 60 } 61 catch (Exception e) 62 { 63 try 64 { 65 import std.stdio : stderr; 66 67 stderr.writeln("[rockhopper.core.threading.spawnInThread] failed to spawn fiber in remote thread: ", e); 68 } 69 catch (Exception) 70 { 71 // oh nuts. 72 assert(0); 73 } 74 } 75 } 76 77 // runs the fiber in another thread's event loop 78 // REQUIRES that that thread's event loop is already running! if `entrypoint` has exited, this won't work. 79 void spawnInThread(shared(EventDriver) ed, void delegate() fn) 80 { 81 ed.core.runInOwnerThread(&_spawnInThread_springboard, fn); 82 } 83 84 // waits for a thread to exit asynchronously 85 void joinThread(Thread th) @Async 86 { 87 auto ev = eventDriver.events.create(); 88 shared ed = cast(shared) eventDriver; 89 90 // create an event that triggers when `th` exits 91 auto t = new Thread({ 92 th.join(); 93 94 ed.events.trigger(ev, true); 95 }).start(); 96 97 // wait for it, then clean up `t` 98 waitThreadEvent(ev); 99 t.join(); 100 }