1 // `reactor` contains the actual reactor implementation, and the thread-global reactor API 2 module rockhopper.core.reactor; 3 4 // === PUBLIC API === 5 import core.thread.fiber : Fiber; 6 import rockhopper.core.suspends : SuspendSend, SuspendReturn; 7 import rockhopper.core.uda : Async; 8 import std.traits : isCallable; 9 10 public { 11 void spawn(void delegate() fn) 12 { 13 reactor.enqueueFiber(fn); 14 } 15 16 mixin template rhMain(alias fn) if (is(typeof(fn) == void delegate()) || is(typeof(fn) == void function())) 17 { 18 void main() { entrypoint(fn); }; 19 } 20 21 void entrypoint(void delegate() fn) 22 { 23 assert(reactor.fibers.length == 0, "You can only have one entrypoint() call in flight at once on a reactor!"); 24 25 spawn(fn); 26 reactor.loop(); 27 } 28 29 void entrypoint(void function() fn) { 30 entrypoint({ 31 fn(); 32 }); 33 } 34 35 // Fiber.yield() for convenience 36 void yield() @Async 37 { 38 Fiber.yield(); 39 } 40 41 // return type must be the same as WrappedFiber.suspendResult 42 // This function informs the reactor to pause your fiber and potentially entire thread on the given suspend. 43 SuspendReturn llawait(SuspendSend bl) @Async 44 { 45 assert( 46 !reactor._currentFiber.isNull, 47 "You cannot await a suspend if you're not in a fiber (hint: wrap your code in entrypoint({}))" 48 ); 49 auto cf = reactor._currentFiber.get; 50 51 assert(cf.currentSuspend.isNull); 52 cf.currentSuspend = bl; 53 cf.suspendResult.nullify(); 54 55 while (cf.suspendResult.isNull) yield(); 56 57 auto res = cf.suspendResult.get; 58 cf.currentSuspend.nullify(); 59 cf.suspendRegistered = false; 60 61 return res; 62 } 63 64 // bails out of the event loop *now* 65 void earlyExit() 66 { 67 import eventcore.core : eventDriver; 68 69 eventDriver.core.exit(); 70 71 if (!reactor._currentFiber.isNull) 72 yield(); // causes an instant exit when called inside a fiber, or just causes it to exit ASAP from outside 73 } 74 } 75 76 private Reactor reactor; 77 78 // === REACTOR IMPLEMENTATION === 79 80 private struct Reactor 81 { 82 import std.typecons : Nullable; 83 84 // there should only ever be one reactor per thread, in TLS, and it is private to this module 85 // so it should not EVER be copied, but to make sure it isn't, this specifically prevents that. 86 // if this is ever a problem, reintroduce the old class lazy init thing but with a Reactor* instead. 87 @disable this(ref Reactor); 88 89 Nullable!(WrappedFiber*) _currentFiber; 90 91 WrappedFiber*[] fibers; 92 93 void enqueueFiber(void delegate() f) 94 { 95 fibers ~= ralloc!WrappedFiber(f); 96 } 97 98 void loop() 99 { 100 import eventcore.core : eventDriver; 101 import eventcore.driver : ExitReason; 102 import std.array : array; 103 import std.datetime : Duration; 104 105 while (fibers.length) 106 { 107 // step 1: run all fibers (appending to a `fibers` will not include those new ones in the `foreach`) 108 auto fibersBefore = fibers.length; 109 foreach (f; fibers) 110 { 111 _currentFiber = f; 112 f.fiber.call(); 113 } 114 _currentFiber.nullify(); 115 116 // step 2: check for new fibers! 117 // if we have new fibers, don't stop and wait for the current fibers to finish blocking, there's more stuff to do! 118 // instead, just loop back round to the start and keep going 119 auto newFibersAdded = fibers.length > fibersBefore; 120 121 // step 3: remove finished fibers from the list 122 auto filtered = new WrappedFiber*[fibers.length]; 123 auto fi = 0; 124 foreach (f; fibers) 125 { 126 if (f.fiber.state == Fiber.State.TERM) 127 rfree(f); 128 else 129 filtered[fi++] = f; 130 } 131 132 fibers = filtered[0 .. fi]; 133 134 // step 2.5: we have to remove finished fibers BEFORE we loop back around 135 // otherwise, we .call() on a finished fiber - segfault on dmd and ldc, noop on gdc. 136 if (newFibersAdded) continue; 137 138 // step 4: register callbacks on fibers that need it 139 foreach (f; fibers) 140 registerCallbackIfNeeded(f); 141 142 // step 6: run event loop! 143 // when processEvents is called with no params, will wait unless none are queued 144 // instead, we want to just wait indefinitely if there are no queued events, so pass Duration.max 145 // double check that fibers still exist! if all exited, then this would just hang forever. 146 if (fibers.length && ExitReason.exited == eventDriver.core.processEvents(Duration.max)) break; 147 148 // ExitReason.exited -> earlyExit() 149 // .idle -> processed some events 150 // .outOfWaiters -> no fibers have registered suspends (e.g. yield() without a suspend) 151 // .timeout -> impossible, lol 152 } 153 154 if (fibers.length) 155 { 156 foreach (f; fibers) 157 rfree(f); 158 159 fibers = null; 160 } 161 } 162 163 private void registerCallbackIfNeeded(WrappedFiber* f) 164 { 165 import eventcore.core : eventDriver; 166 167 // don't register a callback if there is nothing to register, or it's already done. 168 if (f.currentSuspend.isNull || f.suspendRegistered) 169 return; 170 171 f.suspendRegistered = true; 172 auto relevantSuspend = f.currentSuspend.get; 173 174 final switch (relevantSuspend.kind) with (SuspendSend.Kind) 175 { 176 case nsLookup: 177 // the mixin cannot handle this case due to needing to clone scoped resources 178 auto v = relevantSuspend.nsLookupValue; 179 180 eventDriver.dns.lookupHost(v, (_id, status, scope addrs) nothrow { 181 import std.algorithm : map; 182 import std.array : array; 183 import rockhopper.core.suspends : SuspendReturn, SRNsLookup; 184 185 auto escapedAddrs = addrs.map!(cloneRefAddress).array; 186 187 f.suspendResult = SuspendReturn.nsLookup(SRNsLookup(status, escapedAddrs)); 188 }); 189 break; 190 191 case threadEvent: 192 mixin RegisterCallback!("threadEvent", "events.wait", ["v"], 0); 193 MIXIN_RES(); 194 break; 195 196 case fileOpen: 197 mixin RegisterCallback!("fileOpen", "files.open", ["v.path", "v.mode"], 2, HandleArgumentPos.None, "SRFileOpen"); 198 MIXIN_RES(); 199 break; 200 201 202 case fileClose: 203 mixin RegisterCallback!("fileClose", "files.close", ["v"], 1); 204 MIXIN_RES(); 205 break; 206 207 case fileRead: 208 mixin RegisterCallback!("fileRead", "files.read", ["v.fd", "v.offset", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw"); 209 MIXIN_RES(); 210 break; 211 212 case pipeRead: 213 mixin RegisterCallback!("pipeRead", "pipes.read", ["v.fd", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw"); 214 MIXIN_RES(); 215 break; 216 217 case fileWrite: 218 mixin RegisterCallback!("fileWrite", "files.write", ["v.fd", "v.offset", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw"); 219 MIXIN_RES(); 220 break; 221 222 case pipeWrite: 223 mixin RegisterCallback!("pipeWrite", "pipes.write", ["v.fd", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw"); 224 MIXIN_RES(); 225 break; 226 227 case procWait: 228 mixin RegisterCallback!("procWait", "processes.wait", ["v"], 1); 229 MIXIN_RES(); 230 break; 231 232 233 case signalTrap: 234 mixin RegisterCallback!("signalTrap", "signals.listen", ["v"], 2, HandleArgumentPos.Last, "SRSignalTrap"); 235 MIXIN_RES(); 236 break; 237 238 case streamConnect: 239 mixin RegisterCallback!("streamConnect", "sockets.connectStream", ["v.peerAddress", "v.bindAddress"], 2, HandleArgumentPos.None, "SRStreamConnect"); 240 MIXIN_RES(); 241 break; 242 243 case streamRead: 244 mixin RegisterCallback!("streamRead", "sockets.read", ["v.fd", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw"); 245 MIXIN_RES(); 246 break; 247 248 case dgramReceive: 249 // cannot use the mixin due to a refaddress that needs cloning 250 auto v = relevantSuspend.dgramReceiveValue; 251 252 eventDriver.sockets.receive(v.fd, v.buf, v.ioMode, (_fd, status, read, scope addr) nothrow{ 253 import rockhopper.core.suspends : SuspendReturn, SRDgramSendReceive; 254 255 f.suspendResult = SuspendReturn.dgramSendReceive(SRDgramSendReceive(status, read, cloneRefAddress(addr))); 256 }); 257 break; 258 259 case dgramSend: 260 // cannot use the mixin due to a refaddress that needs cloning 261 auto v = relevantSuspend.dgramSendValue; 262 263 eventDriver.sockets.send(v.fd, v.buf, v.ioMode, v.targetAddress, (_fd, status, written, scope addr) nothrow{ 264 import rockhopper.core.suspends : SuspendReturn, SRDgramSendReceive; 265 266 assert(addr is null); 267 268 f.suspendResult = SuspendReturn.dgramSendReceive(SRDgramSendReceive(status, written, null)); 269 }); 270 break; 271 272 case streamWrite: 273 mixin RegisterCallback!("streamWrite", "sockets.write", ["v.fd", "v.buf", "v.ioMode"], 2, HandleArgumentPos.First, "SRRW", "rw"); 274 MIXIN_RES(); 275 break; 276 277 case sleep: 278 mixin RegisterCallback!("sleep", "timers.wait", ["v"], 0); 279 MIXIN_RES(); 280 break; 281 } 282 } 283 284 struct WrappedFiber 285 { 286 // if this is copied, the state can desync. let's stop that from happening at all. 287 @disable this(ref WrappedFiber); 288 289 this(void delegate() fn) 290 { 291 fiber = ralloc!Fiber(fn); 292 } 293 294 Fiber fiber; 295 // when unset, fiber is unblocked, when set, the suspend the fiber is waiting on 296 Nullable!SuspendSend currentSuspend; 297 // if the current suspend has had its callback registered or not 298 bool suspendRegistered; 299 // when set, the result of the suspend (file data, etc) to be passed back to the fiber 300 Nullable!SuspendReturn suspendResult; 301 302 ~this() 303 { 304 rfree(fiber); 305 } 306 } 307 } 308 309 // === ALLOCATOR === 310 private { 311 import std.experimental.allocator : unbounded; 312 import std.experimental.allocator.building_blocks; 313 import std.algorithm.comparison : max, min; 314 import std.experimental.allocator : make, dispose; 315 316 // allocate in batches of 1kb to reduce amount of tiny fragmented allocs 317 alias Batched = AllocatorList!((n) => Region!Mallocator(max(1024 * 1024, n))); 318 319 // keep hold of reused memory, no size checking happening, but this is okay because 320 // we only use two different size allocations on this allocator, which are segregated. 321 alias FL = FreeList!(Batched, 0, unbounded); 322 323 enum WFSize = Reactor.WrappedFiber.sizeof; 324 enum FSize = __traits(classInstanceSize, Fiber); // equal to typeid(new Fiber({})).initializer.length 325 326 alias Allocator = Segregator!(min(WFSize, FSize), FL, FL); 327 328 Allocator _rallocator; 329 330 auto ralloc(T, A...)(auto ref A a) 331 if (is(T == Reactor.WrappedFiber) || is(T == Fiber)) // enforce only these two types are used for safety reasons. 332 => make!T(_rallocator, a); 333 334 void rfree(T)(auto ref T* p) => dispose(_rallocator, p); 335 336 void rfree(T)(auto ref T p) if (is(T == class)) 337 => dispose(_rallocator, p); 338 } 339 340 // === MIXIN FOR NEATER REGISTERING OF CALLBACKS === 341 342 // TODO: make registering callbacks more compile-time than this is already (maybe ct-ify all of suspends.d) 343 private enum HandleArgumentPos 344 { 345 // this enum should ONLY exist at compile time or something has gone VERY wrong. 346 None, 347 First, 348 Last 349 } 350 351 private mixin template RegisterCallback( 352 // name of suspend enums, and name of function on the event driver 353 string enumName, string edName, 354 // args to event driver and back from callback (not including repeat) 355 string[] edArgs, int cbArgCount, 356 // if an arg of the callback is a repeat of the first param, where 357 HandleArgumentPos hap = HandleArgumentPos.First, 358 // if not "", the name of a function to pass the args to (to construct a struct or sm) 359 string extraCons = "", 360 // if the return type enum has a different name to the sender 361 string returnOverride = "" 362 ) 363 { 364 import std.array : join, array; 365 import std.range : iota; 366 import std.algorithm : map; 367 import std.conv : to; 368 369 static assert(edArgs.length, "should be some arguments to eventDriver.*.*()"); 370 371 enum cbArgs = iota(0, cbArgCount).map!((n) => "__cbArg" ~ n.to!string).array; 372 373 static if (hap == HandleArgumentPos.None) 374 enum cbArgsPadded = cbArgs; 375 else static if (hap == HandleArgumentPos.First) 376 enum cbArgsPadded = ["__handle"] ~ cbArgs; 377 else 378 enum cbArgsPadded = cbArgs ~ ["__handle"]; 379 380 // you're only allowed to make declarations in a mixin template, not statements :( 381 void MIXIN_RES() 382 { 383 auto v = mixin("relevantSuspend." ~ enumName ~ "Value"); 384 385 // for some reason the ide support hates using an IES so I won't 386 enum sEdFuncName = "eventDriver." ~ edName; 387 enum sEdArgs = edArgs.join(","); 388 enum sCbargs = cbArgsPadded.join(","); 389 // assume first argument to event driver is the one we're testing against 390 enum sAssert = (hap == HandleArgumentPos.None ? "" : "assert(__handle==" ~ edArgs[0] ~ ");"); 391 enum sImportedExtraCons = "imported!\"rockhopper.core.suspends\"." ~ extraCons; 392 enum sReturnVal = extraCons.length ? (sImportedExtraCons ~ "(" ~ cbArgs.join(",") ~ ")") : cbArgs.join(","); 393 394 mixin( 395 sEdFuncName ~ "(" ~ sEdArgs ~ ", (" ~ sCbargs ~ ") nothrow {" 396 ~ sAssert 397 ~ "f.suspendResult = SuspendReturn." ~ (returnOverride.length ? returnOverride : enumName) ~ "(" ~ sReturnVal ~ ");" 398 ~ "});" 399 ); 400 } 401 } 402 403 // TODO: this function is kinda homeless :( 404 // utility needed for implementation of both DNS and sockets, to help escape refaddresses outside of the scope. 405 import eventcore.driver : RefAddress; 406 public RefAddress cloneRefAddress(scope RefAddress src) 407 nothrow @trusted 408 { 409 // oops! 410 if (src is null) return null; 411 412 // TODO: does this work for domain sockets? I assume so since EC use it internally? 413 version (Windows) 414 import core.sys.windows.winsock2 : sockaddr_storage, sockaddr; 415 else 416 import core.sys.posix.sys.socket : sockaddr_storage, sockaddr; 417 418 // heap memory to allow escaping instead of stack memory 419 auto storage = new sockaddr_storage; 420 // copy 421 *storage = *(cast(sockaddr_storage*) src.name); 422 423 // construct new refaddress 424 return new RefAddress(cast(sockaddr*) storage, src.nameLen); 425 }