1 // `threading` contains fundamental threading utilities akin to the functions in `reactor`.
2 
3 module rockhopper.core.threading;
4 
5 import rockhopper.core.reactor : entrypoint, spawn;
6 import rockhopper.core.llevents : waitThreadEvent;
7 import rockhopper.core.uda : Async;
8 
9 import eventcore.driver : EventDriver;
10 import eventcore.core : eventDriver;
11 import core.thread.osthread : Thread;
12 import std.typecons : Tuple, tuple;
13 
14 struct ThreadHandle
15 {
16 	Thread th;
17 	shared(EventDriver) ed;
18 
19 	void join() { joinThread(th); }
20 
21 	void spawn(void delegate() fn)
22 	{
23 		assert(th.isRunning, "cannot spawn in a thread that is finished");
24 		spawnInThread(ed, fn);
25 	}
26 }
27 
28 // spawns a new thread and runs the given function as a fiber in that thread's reactor
29 ThreadHandle spawnThread(void delegate() fn) @Async
30 {
31 	auto ev = eventDriver.events.create();
32 	shared sd = cast(shared) eventDriver;
33 
34 	shared(EventDriver) res;
35 
36 	auto t = new Thread({
37 		// send over a shared reference to this driver
38 		res = cast(shared) eventDriver;
39 		// tell the other thread that that's done
40 		sd.events.trigger(ev, true);
41 
42 		// run the fiber!
43 		entrypoint(fn);
44 	}).start();
45 
46 	assert(t !is null);
47 
48 	// wait for `res` to be set
49 	waitThreadEvent(ev);
50 	assert(res !is null, "after the second thread fires the event, it should have send it's driver too.");
51 
52 	return ThreadHandle(t, res);
53 }
54 
55 private void _spawnInThread_springboard(void delegate() f) @trusted nothrow
56 {
57 	try
58 	{
59 		spawn(f);
60 	}
61 	catch (Exception e)
62 	{
63 		try
64 		{
65 			import std.stdio : stderr;
66 
67 			stderr.writeln("[rockhopper.core.threading.spawnInThread] failed to spawn fiber in remote thread: ", e);
68 		}
69 		catch (Exception)
70 		{
71 			// oh nuts.
72 			assert(0);
73 		}
74 	}
75 }
76 
77 // runs the fiber in another thread's event loop
78 // REQUIRES that that thread's event loop is already running! if `entrypoint` has exited, this won't work.
79 void spawnInThread(shared(EventDriver) ed, void delegate() fn)
80 {
81 	ed.core.runInOwnerThread(&_spawnInThread_springboard, fn);
82 }
83 
84 // waits for a thread to exit asynchronously
85 void joinThread(Thread th) @Async
86 {
87 	auto ev = eventDriver.events.create();
88 	shared ed = cast(shared) eventDriver;
89 
90 	// create an event that triggers when `th` exits
91 	auto t = new Thread({
92 		th.join();
93 
94 		ed.events.trigger(ev, true);
95 	}).start();
96 
97 	// wait for it, then clean up `t`
98 	waitThreadEvent(ev);
99 	t.join();
100 }