1 // `llevents` contains the lowest level APIs for interacting with rockhopper, other than using the reactor directly.
2 //            does not cover all operations, as some things can be done synchronously from `eventDriver`.
3 module rockhopper.core.llevents;
4 
5 // TODO: when all of these are wrapped at a high level, check the docs for which need fSynchronized!
6 
7 // general imports
8 import rockhopper.core.reactor : llawait, yield;
9 import rockhopper.core.uda : Async;
10 // yield should only be used for the socket wrappers, all others should use llawait only
11 import rockhopper.core.suspends;
12 import eventcore.core : eventDriver;
13 import std.typecons : Tuple, tuple;
14 
15 // dns related imports
16 /* public */ import eventcore.driver : DNSStatus, RefAddress;
17 
18 // thread event related imports
19 import eventcore.driver : EventID;
20 
21 // file/pipe related imports
22 import eventcore.driver : IOMode, FileFD, PipeFD;
23 /* public */ import eventcore.driver : FileOpenMode, OpenStatus, CloseStatus, IOStatus;
24 
25 // process related imports
26 /* public */ import eventcore.driver : Process, ProcessID;
27 
28 // signal related imports
29 /* public */ import eventcore.driver : SignalStatus;
30 
31 // socket related imports
32 import eventcore.driver : StreamSocketFD, StreamListenSocketFD, DatagramSocketFD;
33 /* public */ import eventcore.driver : ConnectStatus, StreamListenOptions, RefAddress;
34 import std.socket : Address;
35 
36 // sleep related imports
37 import std.datetime : Duration, dur;
38 
39 SRStreamConnect streamConnect(Address peer, Address bind) @Async
40 {
41 	return llawait(SuspendSend.streamConnect(SSStreamConnect(peer, bind))).streamConnectValue;
42 }
43 
44 struct StreamListen
45 {
46 	import std.socket : Address, UnknownAddress;
47 	import std.typecons : Nullable, tuple, Tuple;
48 
49 	import rockhopper.core.reactor : cloneRefAddress;
50 
51 	@disable this(ref StreamListen); // prevent implicit copying
52 
53 	Address addr;
54 	StreamListenOptions opts = StreamListenOptions.defaults;
55 	Nullable!StreamListenSocketFD fd;
56 
57 	Tuple!(StreamSocketFD, RefAddress)[] sockets;
58 
59 	void registerListen()
60 	{
61 		assert(fd.isNull);
62 
63 		fd = eventDriver.sockets.listenStream(addr, opts, (_fd, sfd, ad) nothrow {
64 			assert(_fd == fd);
65 
66 			sockets ~= tuple(sfd, cloneRefAddress(ad));
67 		});
68 	}
69 
70 	// uses waitForConnections instead of listenStream, required you bring your own fd, ignores this.addr and this.opts.
71 	void registerWaitConns(StreamListenSocketFD listenfd)
72 	{
73 		assert(fd.isNull);
74 		fd = listenfd;
75 
76 		eventDriver.sockets.waitForConnections(listenfd, (_fd, sfd, ad) nothrow {
77 			assert(_fd == fd);
78 
79 			sockets ~= tuple(sfd, cloneRefAddress(ad));
80 		});
81 	}
82 
83 	void cleanup()
84 	{
85 		assert(!fd.isNull);
86 		eventDriver.sockets.releaseRef(fd.get);
87 	}
88 
89 	Tuple!(StreamSocketFD, RefAddress) wait() @Async
90 	{
91 		while (sockets is null || !sockets.length) yield();
92 
93 		auto s = sockets[0];
94 		sockets = sockets[1 .. $];
95 		return s;
96 	}
97 }
98 
99 SRRW streamRead(StreamSocketFD fd, ubyte[] buf, IOMode mode = IOMode.once) @Async
100 {
101 	return llawait(SuspendSend.streamRead(SSStreamRead(fd, 0, buf, mode))).rwValue;
102 }
103 
104 IOStatus streamWaitForData(StreamSocketFD fd) @Async
105 {
106 	return streamRead(fd, []).status;
107 }
108 
109 SRDgramSendReceive dgramReceive(DatagramSocketFD fd, ubyte[] buf, IOMode mode = IOMode.once) @Async
110 {
111 	return llawait(SuspendSend.dgramReceive(SSDgramReceive(fd, 0, buf, mode))).dgramSendReceiveValue;
112 }
113 
114 SRDgramSendReceive dgramSend(DatagramSocketFD fd, const(ubyte)[] buf, Address target, IOMode mode = IOMode.once) @Async
115 {
116 	return llawait(SuspendSend.dgramSend(SSDgramSend(fd, buf, mode, target))).dgramSendReceiveValue;
117 }
118 
119 SRRW streamWrite(StreamSocketFD fd, const(ubyte)[] buf, IOMode mode = IOMode.once) @Async
120 {
121 	return llawait(SuspendSend.streamWrite(SSStreamWrite(fd, 0, buf, mode))).rwValue;
122 }
123 
124 SRNsLookup nsLookup(string name) @Async
125 {
126 	return llawait(SuspendSend.nsLookup(name)).nsLookupValue;
127 }
128 
129 void waitThreadEvent(EventID evid) @Async
130 {
131 	llawait(SuspendSend.threadEvent(evid));
132 }
133 
134 SRFileOpen fileOpen(string path, FileOpenMode mode) @Async
135 {
136 	return llawait(SuspendSend.fileOpen(SSFileOpen(path, mode))).fileOpenValue;
137 }
138 
139 // this always returns instantly, btw
140 // https://github.com/vibe-d/eventcore/blob/515edf9b7ebf47d08b0cec8a7a5ae2cced30be71/source/eventcore/drivers/threadedfile.d#L250
141 // therefore it doesn't *really* need to go via the reactor, but oh well. Plus releaseref works too.
142 CloseStatus fileClose(FileFD fd) @Async
143 {
144 	return llawait(SuspendSend.fileClose(fd)).fileCloseValue;
145 }
146 
147 SRRW fileRead(FileFD fd, ulong oset, ubyte[] buffer, IOMode mode = IOMode.once) @Async
148 {
149 	return llawait(SuspendSend.fileRead(SSFileRead(fd, oset, buffer, mode))).rwValue;
150 }
151 
152 SRRW pipeRead(PipeFD fd, ubyte[] buffer, IOMode mode = IOMode.once) @Async
153 {
154 	return llawait(SuspendSend.pipeRead(SSPipeRead(fd, 0, buffer, mode))).rwValue;
155 }
156 
157 SRRW fileWrite(FileFD fd, ulong oset, const(ubyte)[] buffer, IOMode mode = IOMode.once) @Async
158 {
159 	return llawait(SuspendSend.fileWrite(SSFileWrite(fd, oset, buffer, mode))).rwValue;
160 }
161 
162 SRRW pipeWrite(PipeFD fd, const(ubyte)[] buffer, IOMode mode = IOMode.once) @Async
163 {
164 	return llawait(SuspendSend.pipeWrite(SSPipeWrite(fd, 0, buffer, mode))).rwValue;
165 }
166 
167 int processWait(ProcessID pid) @Async
168 {
169 	return llawait(SuspendSend.procWait(pid)).procWaitValue;
170 }
171 
172 SignalStatus signalTrap(int sig) @Async
173 {
174 	auto result = llawait(SuspendSend.signalTrap(sig)).signalTrapValue;
175 	eventDriver.signals.releaseRef(result.slID);
176 	return result.status;
177 }
178 
179 void sleep(Duration d) @Async
180 {
181 
182 	// 0ms repeat = don't repeat
183 	auto timer = eventDriver.timers.create();
184 	eventDriver.timers.set(timer, d, dur!"msecs"(0));
185 
186 	llawait(SuspendSend.sleep(timer));
187 }