1 // `reactor` contains the actual reactor implementation, and the thread-global reactor API
2 module rockhopper.core.reactor;
3 
4 // === PUBLIC API ===
5 import core.thread.fiber : Fiber;
6 import rockhopper.core.suspends : SuspendSend, SuspendReturn;
7 import rockhopper.core.uda : Async;
8 import std.traits : isCallable;
9 
10 public {
11 	void spawn(void delegate() fn)
12 	{
13 		reactor.enqueueFiber(fn);
14 	}
15 
16 	mixin template rhMain(alias fn) if (is(typeof(fn) == void delegate()) || is(typeof(fn) == void function()))
17 	{
18 		void main() { entrypoint(fn); };
19 	}
20 
21 	void entrypoint(void delegate() fn)
22 	{
23 		assert(reactor.fibers.length == 0, "You can only have one entrypoint() call in flight at once on a reactor!");
24 
25 		spawn(fn);
26 		reactor.loop();
27 	}
28 
29 	void entrypoint(void function() fn) {
30 		entrypoint({
31 			fn();
32 		});
33 	}
34 
35 	// Fiber.yield() for convenience
36 	void yield() @Async
37 	{
38 		Fiber.yield();
39 	}
40 
41 	// return type must be the same as WrappedFiber.suspendResult
42 	// This function informs the reactor to pause your fiber and potentially entire thread on the given suspend.
43 	SuspendReturn llawait(SuspendSend bl) @Async
44 	{
45 		assert(
46 			!reactor._currentFiber.isNull,
47 			"You cannot await a suspend if you're not in a fiber (hint: wrap your code in entrypoint({}))"
48 		);
49 		auto cf = reactor._currentFiber.get;
50 
51 		assert(cf.currentSuspend.isNull);
52 		cf.currentSuspend = bl;
53 		cf.suspendResult.nullify();
54 
55 		while (cf.suspendResult.isNull) yield();
56 
57 		auto res = cf.suspendResult.get;
58 		cf.currentSuspend.nullify();
59 		cf.suspendRegistered = false;
60 
61 		return res;
62 	}
63 
64 	// bails out of the event loop *now*
65 	void earlyExit()
66 	{
67 		import eventcore.core : eventDriver;
68 
69 		eventDriver.core.exit();
70 
71 		if (!reactor._currentFiber.isNull)
72 			yield(); // causes an instant exit when called inside a fiber, or just causes it to exit ASAP from outside
73 	}
74 }
75 
76 private Reactor reactor;
77 
78 // === REACTOR IMPLEMENTATION ===
79 
// The per-thread reactor: owns the list of fibers, runs them round-robin, registers their
// suspends with the eventcore driver, and waits on the driver in between rounds.
private struct Reactor
{
	import std.typecons : Nullable;

	// there should only ever be one reactor per thread, in TLS, and it is private to this module
	// so it should not EVER be copied, but to make sure it isn't, this specifically prevents that.
	// if this is ever a problem, reintroduce the old class lazy init thing but with a Reactor* instead.
	@disable this(ref Reactor);

	// the fiber currently being run by loop(), or null when no fiber is executing
	Nullable!(WrappedFiber*) _currentFiber;

	// every fiber the reactor currently owns (allocated with ralloc, released with rfree)
	WrappedFiber*[] fibers;

	// Wraps `f` in a new WrappedFiber and appends it to the list. Does not run it.
	void enqueueFiber(void delegate() f)
	{
		fibers ~= ralloc!WrappedFiber(f);
	}

	// Runs fibers and the event driver until no fibers remain or the driver reports an exit.
	//
	// However this function is left (all fibers finished, earlyExit(), or an exception
	// propagating out of a fiber's call()), every remaining fiber is freed and the reactor is
	// reset to its idle state, so a later entrypoint() starts from a clean slate.
	void loop()
	{
		import eventcore.core : eventDriver;
		import eventcore.driver : ExitReason;
		import std.datetime : Duration;

		// cleanup lives in scope(exit) so that it also runs when a fiber throws: otherwise
		// `_currentFiber` would stay set and `fibers` would keep a terminated fiber around,
		// which the next loop() would then try to call().
		scope (exit)
		{
			_currentFiber.nullify();

			foreach (f; fibers)
				rfree(f);

			fibers = null;
		}

		while (fibers.length)
		{
			// step 1: run all fibers (appending to `fibers` will not include those new ones in the `foreach`)
			auto fibersBefore = fibers.length;
			foreach (f; fibers)
			{
				_currentFiber = f;
				f.fiber.call();
			}
			_currentFiber.nullify();

			// step 2: check for new fibers!
			// if we have new fibers, don't stop and wait for the current fibers to finish blocking, there's more stuff to do!
			// instead, just loop back round to the start and keep going (after step 3)
			auto newFibersAdded = fibers.length > fibersBefore;

			// step 3: remove finished fibers from the list
			// this has to happen BEFORE we loop back around, otherwise we .call() on a finished
			// fiber - segfault on dmd and ldc, noop on gdc.
			auto filtered = new WrappedFiber*[fibers.length];
			size_t fi = 0;
			foreach (f; fibers)
			{
				if (f.fiber.state == Fiber.State.TERM)
					rfree(f);
				else
					filtered[fi++] = f;
			}

			fibers = filtered[0 .. fi];

			if (newFibersAdded) continue;

			// step 4: register callbacks on fibers that need it, and note whether any fiber
			// already has its result (e.g. if a callback was invoked during registration)
			auto resultAlreadyWaiting = false;
			foreach (f; fibers)
			{
				registerCallbackIfNeeded(f);

				if (!f.currentSuspend.isNull && !f.suspendResult.isNull)
					resultAlreadyWaiting = true;
			}

			// step 5: run event loop!
			// when processEvents is called with no params, will wait unless none are queued
			// instead, we want to just wait indefinitely if there are no queued events, so pass Duration.max
			// ...unless a fiber already has a result to consume: then only poll, so that fiber is
			// not held up behind some unrelated event that may be a long way off.
			// double check that fibers still exist! if all exited, then this would just hang forever.
			auto timeout = resultAlreadyWaiting ? Duration.zero : Duration.max;
			if (fibers.length && ExitReason.exited == eventDriver.core.processEvents(timeout)) break;

			// ExitReason.exited -> earlyExit()
			//           .idle -> processed some events
			//           .outOfWaiters -> no fibers have registered suspends (e.g. yield() without a suspend)
			//           .timeout -> only when polling with the zero timeout above
		}
	}

	// Registers the event driver callback for `f`'s current suspend, unless the fiber has no
	// suspend or the callback was already registered. Each callback stores its result in
	// `f.suspendResult`, which llawait() polls for.
	private void registerCallbackIfNeeded(WrappedFiber* f)
	{
		import eventcore.core : eventDriver;

		// don't register a callback if there is nothing to register, or it's already done.
		if (f.currentSuspend.isNull || f.suspendRegistered)
			return;

		f.suspendRegistered = true;
		auto relevantSuspend = f.currentSuspend.get;

		final switch (relevantSuspend.kind) with (SuspendSend.Kind)
		{
		case nsLookup:
			// the mixin cannot handle this case due to needing to clone scoped resources
			auto v = relevantSuspend.nsLookupValue;

			eventDriver.dns.lookupHost(v, (_id, status, scope addrs) nothrow {
				import std.algorithm : map;
				import std.array : array;
				import rockhopper.core.suspends : SuspendReturn, SRNsLookup;

				auto escapedAddrs = addrs.map!(cloneRefAddress).array;

				f.suspendResult = SuspendReturn.nsLookup(SRNsLookup(status, escapedAddrs));
			});
			break;

		case threadEvent:
			mixin RegisterCallback!("threadEvent", "events.wait", ["v"], 0);
			MIXIN_RES();
			break;

		case fileOpen:
			mixin RegisterCallback!("fileOpen", "files.open", ["v.path", "v.mode"], 2, HandleArgumentPos.None, "SRFileOpen");
			MIXIN_RES();
			break;


		case fileClose:
			mixin RegisterCallback!("fileClose", "files.close", ["v"], 1);
			MIXIN_RES();
			break;

		case fileRead:
			mixin RegisterCallback!("fileRead", "files.read", ["v.fd", "v.offset", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw");
			MIXIN_RES();
			break;

		case pipeRead:
			mixin RegisterCallback!("pipeRead", "pipes.read", ["v.fd", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw");
			MIXIN_RES();
			break;

		case fileWrite:
			mixin RegisterCallback!("fileWrite", "files.write", ["v.fd", "v.offset", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw");
			MIXIN_RES();
			break;

		case pipeWrite:
			mixin RegisterCallback!("pipeWrite", "pipes.write", ["v.fd", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw");
			MIXIN_RES();
			break;

		case procWait:
			mixin RegisterCallback!("procWait", "processes.wait", ["v"], 1);
			MIXIN_RES();
			break;


		case signalTrap:
			mixin RegisterCallback!("signalTrap", "signals.listen", ["v"], 2, HandleArgumentPos.Last, "SRSignalTrap");
			MIXIN_RES();
			break;

		case streamConnect:
			mixin RegisterCallback!("streamConnect", "sockets.connectStream", ["v.peerAddress", "v.bindAddress"], 2, HandleArgumentPos.None, "SRStreamConnect");
			MIXIN_RES();
			break;

		case streamRead:
			mixin RegisterCallback!("streamRead", "sockets.read", ["v.fd", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw");
			MIXIN_RES();
			break;

		case dgramReceive:
			// cannot use the mixin due to a refaddress that needs cloning
			auto v = relevantSuspend.dgramReceiveValue;

			eventDriver.sockets.receive(v.fd, v.buf, v.ioMode, (_fd, status, read, scope addr) nothrow{
				import rockhopper.core.suspends : SuspendReturn, SRDgramSendReceive;

				f.suspendResult = SuspendReturn.dgramSendReceive(SRDgramSendReceive(status, read, cloneRefAddress(addr)));
			});
			break;

		case dgramSend:
			// cannot use the mixin due to a refaddress that needs cloning
			auto v = relevantSuspend.dgramSendValue;

			eventDriver.sockets.send(v.fd, v.buf, v.ioMode, v.targetAddress, (_fd, status, written, scope addr) nothrow{
				import rockhopper.core.suspends : SuspendReturn, SRDgramSendReceive;

				assert(addr is null);

				f.suspendResult = SuspendReturn.dgramSendReceive(SRDgramSendReceive(status, written, null));
			});
			break;

		case streamWrite:
			mixin RegisterCallback!("streamWrite", "sockets.write", ["v.fd", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw");
			MIXIN_RES();
			break;

		case sleep:
			mixin RegisterCallback!("sleep", "timers.wait", ["v"], 0);
			MIXIN_RES();
			break;
		}
	}

	// A fiber plus the reactor's bookkeeping for the suspend it is (or is not) waiting on.
	struct WrappedFiber
	{
		// if this is copied, the state can desync. let's stop that from happening at all.
		@disable this(ref WrappedFiber);

		// allocates the underlying Fiber for `fn` from the reactor allocator
		this(void delegate() fn)
		{
			fiber = ralloc!Fiber(fn);
		}

		Fiber fiber;
		// when unset, fiber is unblocked, when set, the suspend the fiber is waiting on
		Nullable!SuspendSend currentSuspend;
		// if the current suspend has had its callback registered or not
		bool suspendRegistered;
		// when set, the result of the suspend (file data, etc) to be passed back to the fiber
		Nullable!SuspendReturn suspendResult;

		// releases the Fiber allocated in the constructor
		~this()
		{
			rfree(fiber);
		}
	}
}
308 
309 // === ALLOCATOR ===
private {
	import std.algorithm.comparison : max, min;
	import std.experimental.allocator : dispose, make, unbounded;
	import std.experimental.allocator.building_blocks;

	// grab memory from malloc in regions of at least 1024 * 1024 bytes (1 MiB),
	// to reduce the amount of tiny fragmented allocs
	alias Batched = AllocatorList!((n) => Region!Mallocator(max(1024 * 1024, n)));

	// free list on top of the batches so freed blocks get reused.
	// no size checking happens here, but this is okay because we only use two different
	// allocation sizes on this allocator, and those are segregated below.
	alias FL = FreeList!(Batched, 0, unbounded);

	// the two allocation sizes in play
	enum WFSize = Reactor.WrappedFiber.sizeof;
	enum FSize = __traits(classInstanceSize, Fiber); // equal to typeid(new Fiber({})).initializer.length

	// split requests at the smaller of the two sizes, one free list on each side of the threshold
	alias Allocator = Segregator!(min(WFSize, FSize), FL, FL);

	Allocator _rallocator;

	// Constructs a `T` from the reactor allocator, forwarding `a` to its constructor.
	// Only WrappedFiber and Fiber are accepted, for safety reasons (see the free list note above).
	auto ralloc(T, A...)(auto ref A a)
	if (is(T == Reactor.WrappedFiber) || is(T == Fiber))
	{
		return make!T(_rallocator, a);
	}

	// Destroys and releases a struct previously obtained from ralloc.
	void rfree(T)(auto ref T* p)
	{
		dispose(_rallocator, p);
	}

	// Destroys and releases a class instance previously obtained from ralloc.
	void rfree(T)(auto ref T p)
	if (is(T == class))
	{
		dispose(_rallocator, p);
	}
}
339 
340 // === MIXIN FOR NEATER REGISTERING OF CALLBACKS ===
341 
342 // TODO: make registering callbacks more compile-time than this is already (maybe ct-ify all of suspends.d)
// Tells `RegisterCallback` whether the event driver callback receives an extra `__handle`
// parameter, and whether that parameter comes before or after the other callback arguments.
private enum HandleArgumentPos
{
	// this enum should ONLY exist at compile time or something has gone VERY wrong.
	// no `__handle` parameter is added to the callback
	None,
	// `__handle` is the callback's first parameter
	First,
	// `__handle` is the callback's last parameter
	Last
}
350 
// Generates `MIXIN_RES()`, which registers an event driver callback for one suspend kind.
// Expects `relevantSuspend`, `f`, `eventDriver` and `SuspendReturn` to be visible where it is mixed in.
// The generated callback stores its result in `f.suspendResult`.
private mixin template RegisterCallback(
	// name of suspend enums, and name of function on the event driver
	string enumName, string edName,
	// args to event driver and back from callback (not including repeat)
	string[] edArgs, int cbArgCount,
	// if an arg of the callback is a repeat of the first param, where
	HandleArgumentPos hap = HandleArgumentPos.First,
	// if not "", the name of a function to pass the args to (to construct a struct or something)
	string extraCons = "",
	// if the return type enum has a different name to the sender
	string returnOverride = ""
)
{
	import std.array : join, array;
	import std.range : iota;
	import std.algorithm : map;
	import std.conv : to;

	static assert(edArgs.length, "should be some arguments to eventDriver.*.*()");

	// __cbArg0, __cbArg1, ... one name per real callback argument
	enum cbArgs = iota(0, cbArgCount).map!((n) => "__cbArg" ~ n.to!string).array;

	// the full callback parameter list, with the repeated handle slotted in where requested
	static if (hap == HandleArgumentPos.First)
		enum cbArgsPadded = ["__handle"] ~ cbArgs;
	else static if (hap == HandleArgumentPos.Last)
		enum cbArgsPadded = cbArgs ~ ["__handle"];
	else
		enum cbArgsPadded = cbArgs;

	// you're only allowed to make declarations in a mixin template, not statements :(
	void MIXIN_RES()
	{
		// `v` is referenced by name from the strings in edArgs, so it must keep this name
		auto v = mixin("relevantSuspend." ~ enumName ~ "Value");

		// plain concatenation rather than an IES, the ide support copes better with it
		enum driverCall = "eventDriver." ~ edName;
		enum driverArgList = edArgs.join(",");
		enum lambdaParams = cbArgsPadded.join(",");

		// assume first argument to event driver is the one we're testing against
		static if (hap == HandleArgumentPos.None)
			enum handleCheck = "";
		else
			enum handleCheck = "assert(__handle==" ~ edArgs[0] ~ ");";

		// which SuspendReturn constructor receives the result
		enum resultCase = returnOverride.length ? returnOverride : enumName;

		// either the raw callback args, or those args wrapped in the extra constructor
		static if (extraCons.length)
			enum resultArgs = "imported!\"rockhopper.core.suspends\"." ~ extraCons ~ "(" ~ cbArgs.join(",") ~ ")";
		else
			enum resultArgs = cbArgs.join(",");

		mixin(
			driverCall ~ "(" ~ driverArgList ~ ", (" ~ lambdaParams ~ ") nothrow {"
				~ handleCheck
				~ "f.suspendResult = SuspendReturn." ~ resultCase ~ "(" ~ resultArgs ~ ");"
			~ "});"
		);
	}
}
402 
403 // TODO: this function is kinda homeless :(
404 // utility needed for implementation of both DNS and sockets, to help escape refaddresses outside of the scope.
405 import eventcore.driver : RefAddress;
// Makes a heap-backed copy of `src` so the address can outlive the scope it was handed out in.
//
// Returns null when `src` is null. Only `src.nameLen` bytes are copied from `src.name`
// (clamped to the size of `sockaddr_storage`): the source buffer is only guaranteed to be
// that long, so copying a whole `sockaddr_storage` would read past its end for the
// smaller address families. The rest of the new storage stays default-initialised.
public RefAddress cloneRefAddress(scope RefAddress src)
nothrow @trusted
{
	// oops!
	if (src is null) return null;

	// TODO: does this work for domain sockets? I assume so since EC use it internally?
	version (Windows)
		import core.sys.windows.winsock2 : sockaddr_storage, sockaddr;
	else
		import core.sys.posix.sys.socket : sockaddr_storage, sockaddr;

	import core.stdc.string : memcpy;

	// heap memory to allow escaping instead of stack memory
	auto storage = new sockaddr_storage;

	auto srcName = src.name;
	auto len = src.nameLen;

	// never copy more than the source claims to hold, nor more than the destination can take
	size_t copyLen = len > 0 ? cast(size_t) len : 0;
	if (copyLen > sockaddr_storage.sizeof)
		copyLen = sockaddr_storage.sizeof;

	if (srcName !is null && copyLen > 0)
		memcpy(storage, srcName, copyLen);

	// construct new refaddress; the reported length never exceeds the backing storage
	return new RefAddress(cast(sockaddr*) storage, cast(typeof(len)) copyLen);
}