1 // `llevents` contains the lowest level APIs for interacting with rockhopper, other than using the reactor directly. 2 // does not cover all operations, as some things can be done synchronously from `eventDriver`. 3 module rockhopper.core.llevents; 4 5 // TODO: when all of these are wrapped at a high level, check the docs for which need fSynchronized! 6 7 // general imports 8 import rockhopper.core.reactor : llawait, yield; 9 import rockhopper.core.uda : Async; 10 // yield should only be used for the socket wrappers, all others should use llawait only 11 import rockhopper.core.suspends; 12 import eventcore.core : eventDriver; 13 import std.typecons : Tuple, tuple; 14 15 // dns related imports 16 /* public */ import eventcore.driver : DNSStatus, RefAddress; 17 18 // thread event related imports 19 import eventcore.driver : EventID; 20 21 // file/pipe related imports 22 import eventcore.driver : IOMode, FileFD, PipeFD; 23 /* public */ import eventcore.driver : FileOpenMode, OpenStatus, CloseStatus, IOStatus; 24 25 // process related imports 26 /* public */ import eventcore.driver : Process, ProcessID; 27 28 // signal related imports 29 /* public */ import eventcore.driver : SignalStatus; 30 31 // socket related imports 32 import eventcore.driver : StreamSocketFD, StreamListenSocketFD, DatagramSocketFD; 33 /* public */ import eventcore.driver : ConnectStatus, StreamListenOptions, RefAddress; 34 import std.socket : Address; 35 36 // sleep related imports 37 import std.datetime : Duration, dur; 38 39 SRStreamConnect streamConnect(Address peer, Address bind) @Async 40 { 41 return llawait(SuspendSend.streamConnect(SSStreamConnect(peer, bind))).streamConnectValue; 42 } 43 44 struct StreamListen 45 { 46 import std.socket : Address, UnknownAddress; 47 import std.typecons : Nullable, tuple, Tuple; 48 49 import rockhopper.core.reactor : cloneRefAddress; 50 51 @disable this(ref StreamListen); // prevent implicit copying 52 53 Address addr; 54 StreamListenOptions opts = StreamListenOptions.defaults; 55 Nullable!StreamListenSocketFD fd; 56 57 Tuple!(StreamSocketFD, RefAddress)[] sockets; 58 59 void registerListen() 60 { 61 assert(fd.isNull); 62 63 fd = eventDriver.sockets.listenStream(addr, opts, (_fd, sfd, ad) nothrow { 64 assert(_fd == fd); 65 66 sockets ~= tuple(sfd, cloneRefAddress(ad)); 67 }); 68 } 69 70 // uses waitForConnections instead of listenStream, required you bring your own fd, ignores this.addr and this.opts. 71 void registerWaitConns(StreamListenSocketFD listenfd) 72 { 73 assert(fd.isNull); 74 fd = listenfd; 75 76 eventDriver.sockets.waitForConnections(listenfd, (_fd, sfd, ad) nothrow { 77 assert(_fd == fd); 78 79 sockets ~= tuple(sfd, cloneRefAddress(ad)); 80 }); 81 } 82 83 void cleanup() 84 { 85 assert(!fd.isNull); 86 eventDriver.sockets.releaseRef(fd.get); 87 } 88 89 Tuple!(StreamSocketFD, RefAddress) wait() @Async 90 { 91 while (sockets is null || !sockets.length) yield(); 92 93 auto s = sockets[0]; 94 sockets = sockets[1 .. $]; 95 return s; 96 } 97 } 98 99 SRRW streamRead(StreamSocketFD fd, ubyte[] buf, IOMode mode = IOMode.once) @Async 100 { 101 return llawait(SuspendSend.streamRead(SSStreamRead(fd, 0, buf, mode))).rwValue; 102 } 103 104 IOStatus streamWaitForData(StreamSocketFD fd) @Async 105 { 106 return streamRead(fd, []).status; 107 } 108 109 SRDgramSendReceive dgramReceive(DatagramSocketFD fd, ubyte[] buf, IOMode mode = IOMode.once) @Async 110 { 111 return llawait(SuspendSend.dgramReceive(SSDgramReceive(fd, 0, buf, mode))).dgramSendReceiveValue; 112 } 113 114 SRDgramSendReceive dgramSend(DatagramSocketFD fd, const(ubyte)[] buf, Address target, IOMode mode = IOMode.once) @Async 115 { 116 return llawait(SuspendSend.dgramSend(SSDgramSend(fd, buf, mode, target))).dgramSendReceiveValue; 117 } 118 119 SRRW streamWrite(StreamSocketFD fd, const(ubyte)[] buf, IOMode mode = IOMode.once) @Async 120 { 121 return llawait(SuspendSend.streamWrite(SSStreamWrite(fd, 0, buf, mode))).rwValue; 122 } 123 124 SRNsLookup nsLookup(string name) @Async 125 { 126 return llawait(SuspendSend.nsLookup(name)).nsLookupValue; 127 } 128 129 void waitThreadEvent(EventID evid) @Async 130 { 131 llawait(SuspendSend.threadEvent(evid)); 132 } 133 134 SRFileOpen fileOpen(string path, FileOpenMode mode) @Async 135 { 136 return llawait(SuspendSend.fileOpen(SSFileOpen(path, mode))).fileOpenValue; 137 } 138 139 // this always returns instantly, btw 140 // https://github.com/vibe-d/eventcore/blob/515edf9b7ebf47d08b0cec8a7a5ae2cced30be71/source/eventcore/drivers/threadedfile.d#L250 141 // therefore it doesn't *really* need to go via the reactor, but oh well. Plus releaseref works too. 142 CloseStatus fileClose(FileFD fd) @Async 143 { 144 return llawait(SuspendSend.fileClose(fd)).fileCloseValue; 145 } 146 147 SRRW fileRead(FileFD fd, ulong oset, ubyte[] buffer, IOMode mode = IOMode.once) @Async 148 { 149 return llawait(SuspendSend.fileRead(SSFileRead(fd, oset, buffer, mode))).rwValue; 150 } 151 152 SRRW pipeRead(PipeFD fd, ubyte[] buffer, IOMode mode = IOMode.once) @Async 153 { 154 return llawait(SuspendSend.pipeRead(SSPipeRead(fd, 0, buffer, mode))).rwValue; 155 } 156 157 SRRW fileWrite(FileFD fd, ulong oset, const(ubyte)[] buffer, IOMode mode = IOMode.once) @Async 158 { 159 return llawait(SuspendSend.fileWrite(SSFileWrite(fd, oset, buffer, mode))).rwValue; 160 } 161 162 SRRW pipeWrite(PipeFD fd, const(ubyte)[] buffer, IOMode mode = IOMode.once) @Async 163 { 164 return llawait(SuspendSend.pipeWrite(SSPipeWrite(fd, 0, buffer, mode))).rwValue; 165 } 166 167 int processWait(ProcessID pid) @Async 168 { 169 return llawait(SuspendSend.procWait(pid)).procWaitValue; 170 } 171 172 SignalStatus signalTrap(int sig) @Async 173 { 174 auto result = llawait(SuspendSend.signalTrap(sig)).signalTrapValue; 175 eventDriver.signals.releaseRef(result.slID); 176 return result.status; 177 } 178 179 void sleep(Duration d) @Async 180 { 181 182 // 0ms repeat = don't repeat 183 auto timer = eventDriver.timers.create(); 184 eventDriver.timers.set(timer, d, dur!"msecs"(0)); 185 186 llawait(SuspendSend.sleep(timer)); 187 }