// `synct` provides synchronization primitives, like those in `syncf`, that are additionally thread safe: they lock
// between fibers across multiple threads' reactors.
// These should be preferred over core.sync: instead of e.g. only one *thread* being allowed to lock a mutex, only one
// *fiber* is allowed to lock that mutex, across many threads.
5 
6 module rockhopper.rhapi.synct;
7 import rockhopper.core.uda : Async, ThreadSafe, Synchronized;
8 
9 import rockhopper.core.llevents : waitThreadEvent;
10 import core.atomic : atomicOp;
11 
// note for future me:
13 // `synchronized` methods lock on the whole instance, not per-method:
14 // https://dlang.org/spec/class.html#synchronized-methods #16.6.1
15 
16 // These need to be classes so we can use `synchronized(this)` and `synchronized` members
17 
// A one-shot, cross-thread event: `wait()` suspends the calling fiber until some fiber (on any thread) calls
// `notify()`. Every fiber that is waiting at the time of the `notify()` is woken.
final shared @ThreadSafe class TEvent
{
@ThreadSafe:

	import eventcore.core : eventDriver;
	import eventcore.driver : EventID, EventDriver;
	import std.typecons : Tuple, tuple;
	import core.thread.osthread : Thread;

	alias ThreadEventsTy = Tuple!(shared(EventDriver), EventID)[typeof(Thread.getThis.id)];

	// you may only await an event from the thread that created it, so we need one event per thread.
	// This MUST be a per-instance field: `__gshared` on a member means "static", which would make every TEvent
	// in the program share (and race on) a single map while each instance only locks its own monitor.
	// SAFETY: only accessed within `synchronized`, where the `shared` qualifier is cast away.
	private ThreadEventsTy threadEvents;

	// Wakes every fiber currently waiting on this event, then forgets the per-thread events.
	synchronized void notify()
	{
		// safe: we hold this instance's monitor for the whole method
		auto evs = cast(ThreadEventsTy*) &threadEvents;

		foreach (_, tup; *evs)
			tup[0].events.trigger(tup[1], true);

		// reset at same time since all those thread events are done with now
		// expect the freshly awoken threads to free the resources we're removing the references to here
		*evs = ThreadEventsTy.init;
	}

	// Suspends the calling fiber until the next `notify()` on this event.
	void wait() @Async
	{
		auto tid = Thread.getThis.id;
		EventID ev = void; // always assigned in one of the two branches below

		synchronized (this)
		{
			// safe: we hold this instance's monitor inside this block
			auto evs = cast(ThreadEventsTy*) &threadEvents;

			if (auto existing = tid in *evs)
			{
				// another fiber on this thread is already waiting: share its event and take an extra reference
				ev = (*existing)[1];
				eventDriver.events.addRef(ev);
			}
			else
			{
				ev = eventDriver.events.create();
				(*evs)[tid] = tuple(cast(shared) eventDriver, ev);
			}
		}

		waitThreadEvent(ev);
		// threadEvents has now been cleared so this is safe to clean up.
		eventDriver.events.releaseRef(ev); // free resources like a good citizen
	}
}
67 
68 // like TEvent but has a stateful triggered/untriggered value, idk if theres a good name for this
69 /* final shared @ThreadSafe class TStatefulEvent
70 {
71 @ThreadSafe:
72 
73 	private bool triggered;
74 	private TEvent ev = new TEvent;
75 
76 	synchronized void notify()
77 	{
78 		if (triggered)
79 			return; // i don't think this is necessary but can't hurt.
80 
81 		triggered = true;
82 
83 		ev.notify();
84 	}
85 
86 	synchronized void reset()
87 	{
88 		if (triggered)
89 		{
90 			triggered = false;
91 			// safety: by here no fibers should have a waiting ec event anymore so we can wipe them.
92 			threadEvents = ThreadEventsTy.init;
93 		}
94 		// resetting an event that has not been triggered is a no-op! - otherwise it could hang fibers forever.
95 	}
96 
97 	void wait() @Async
98 	{
99 		while (!triggered)
100 		{
101 			ev.wait();
102 			// if the thread event resolves, triggered may still be false because another fiber got there before us,
103 			// and reset the event. to prevent this case, we loop around again.
104 		}
105 	}
106 } */
107 
// A counting semaphore shared between fibers on any number of threads.
// `notify()` adds one permit; `wait()` suspends the calling fiber until it can take one; `tryWait()` never suspends.
final shared @ThreadSafe class TSemaphore
{
@ThreadSafe:

	// Per-instance permit count. (`__gshared` here would make it static, i.e. ONE counter for every semaphore.)
	// we always access this inside of a `synchronized` so atomics are unnecessary;
	// MAKE SURE YOU NEVER USE THIS OUTSIDE OF A `synchronized(this)` OR `synchronized` METHOD
	private uint count;

	// Allocated in the constructor: a `= new TEvent` field initializer is evaluated once at compile time,
	// so every semaphore would end up pointing at the same TEvent.
	private TEvent notifyEv;

	this()
	{
		notifyEv = new shared TEvent;
	}

	// Unqualified view of `count`. Only call this while holding this instance's monitor.
	private ref uint lockedCount()
	{
		return *cast(uint*) &count;
	}

	// Adds one permit and wakes all waiting fibers so that one of them can claim it.
	synchronized void notify()
	{
		lockedCount() += 1;

		// wake up the fibers!
		notifyEv.notify();
	}

	// Takes a permit if one is available. Returns: true if a permit was taken, false otherwise. Never suspends.
	synchronized bool tryWait()
	{
		if (lockedCount() == 0)
			return false;

		lockedCount() -= 1;
		return true;
	}

	// Suspends the calling fiber until a permit could be taken.
	void wait() @Async
	{
		// try and "wake" this fiber: tryWait is synchronized, so only one thread (and therefore fiber) may
		// claim a permit at a time. if someone else got there first, wait until we're told there are new notifies.
		// NOTE(review): the monitor is released before notifyEv.wait() registers this fiber, so a notify() that
		// lands in between is not observed until the following notify() - confirm whether that is acceptable.
		while (!tryWait())
			notifyEv.wait();
	}
}
154 
155 // non-recursive mutex
156 // im PRETTY SURE this is safe but don't shoot me if it isn't im doing my best
// A mutex owned by a fiber rather than by a thread. Not recursive: locking it again from the fiber that
// already holds it waits forever.
final shared @ThreadSafe class TMutex
{
@ThreadSafe:

	// only read/written while holding this instance's monitor
	private bool locked;

	// Allocated in the constructor: a `= new TEvent` field initializer is evaluated once at compile time,
	// so every mutex would end up pointing at (and being woken through) the same TEvent.
	private TEvent unlockEv;

	this()
	{
		unlockEv = new shared TEvent;
	}

	// Suspends the calling fiber until it has acquired the mutex.
	void lock() @Async
	{
		while (true)
		{
			// see if we get to take the mutex!
			synchronized (this)
			{
				if (!locked)
				{
					// we can lock!
					locked = true;
					return;
				}
			}

			// we didn't get it this time, efficiently wait for an unlock.
			// NOTE(review): the monitor is released before unlockEv.wait() registers this fiber, so an unlock()
			// that lands in between is not observed until the following unlock() - confirm this is acceptable.
			unlockEv.wait();
		}
	}

	// Releases the mutex and wakes every fiber waiting in `lock()` so they can race for it.
	// Calling this on a mutex that is not locked is a programming error (asserts).
	synchronized void unlock()
	{
		assert(locked, "unlocking an unlocked TMutex makes no sense");
		locked = false;
		unlockEv.notify();
	}
}
191 
192 // TODO: remutex
193 // TODO: rwmutex
194 
195 // an equivalent of the `synchronized` attribute that also ensures mut-ex of fibers, not just threads.
// Wraps `func` in a function with the same parameters and return type that holds a TMutex for the duration of
// the call. Each instantiation (i.e. each distinct `func`) gets its own mutex.
template tSynchronized(alias func)
{
	import std.traits : isSomeFunction, ReturnType, Parameters;

	static assert(isSomeFunction!func, "tSynchronized may only be instantiated with a function");

	// even though this is always true, if we don't if for it, we get more compiler errors than just the assert
	// and nobody needs that, so just be satisfied with the assert.
	static if (isSomeFunction!func)
	{
		// `shared` so that (a) there is a single mutex for all threads rather than one per thread (module-level
		// variables are thread-local by default) and (b) TMutex's shared methods are callable on it.
		// It must also actually be constructed - a default-initialised class reference is null.
		private shared TMutex m = new shared TMutex;

		ReturnType!func tSynchronized(Parameters!func args) @Async @Synchronized
		{
			m.lock();
			// release the mutex however we leave, including when func throws
			scope (exit)
				m.unlock();

			// forward func's result to the caller (returning a void expression is fine for void functions)
			return func(args);
		}
	}
}
216 
217 // a nullable in which trying to get the value while its null waits for a result to be set
// A nullable slot shared between fibers on any thread: `get()` suspends while the slot is empty and returns the
// value once one has been `set()`; `tryGet()` never suspends.
final shared @ThreadSafe class TGuardedResult(T)
{
@ThreadSafe:
	import std.typecons : Nullable;

	// Allocated in the constructor: a `= new TEvent` field initializer is evaluated once at compile time,
	// so every instance would end up pointing at the same TEvent.
	private TEvent setEv;
	// both only read/written while holding this instance's monitor
	private bool hasValue;
	private T value;

	this()
	{
		setEv = new shared TEvent;
	}

	// Unqualified view of `value`. Only call this while holding this instance's monitor.
	private ref T lockedValue()
	{
		return *cast(T*) &value;
	}

	// Stores `val` and wakes every fiber waiting in `get()`.
	synchronized void set(T val)
	{
		lockedValue() = val; // store the argument (the original assigned the type `T` here)
		hasValue = true;
		setEv.notify();
	}

	// Empties the slot; subsequent `get()` calls wait for the next `set()`.
	synchronized void nullify()
	{
		hasValue = false;
		lockedValue() = T.init;
	}

	// Suspends the calling fiber until a value is present, then returns it. The value stays in the slot.
	T get() @Async
	{
		// check under the lock; if there is nothing there yet, wait for a set() and check again
		while (true)
		{
			synchronized (this)
			{
				if (hasValue)
					return lockedValue();
			}

			setEv.wait();
		}
	}

	// Returns: the current value wrapped in a Nullable, or a null Nullable if the slot is empty. Never suspends.
	synchronized Nullable!T tryGet()
	{
		// T does not implicitly convert to Nullable!T, so wrap explicitly
		if (hasValue)
			return Nullable!T(lockedValue());

		return Nullable!T.init;
	}
}
261 
262 // like a golang WaitGroup
263 // see FWaitGroup for more detailed notes on how this works and how it differs from a semaphore
// Counts outstanding pieces of work: `add()` raises the counter, `done()` lowers it by one, and `wait()` suspends
// the calling fiber until the counter is zero.
final shared @ThreadSafe class TWaitGroup
{
@ThreadSafe:
	// Allocated in the constructors: a `= new TEvent` field initializer is evaluated once at compile time,
	// so every wait group would end up pointing at the same TEvent.
	private TEvent doneEv;

	// Per-instance counter. (`__gshared` here would make it static, i.e. ONE counter for every wait group.)
	// safety: apart from construction, this is only accessed inside `synchronized` sections.
	private uint count;

	// Creates a wait group with a count of zero.
	this()
	{
		doneEv = new shared TEvent;
	}

	// Creates a wait group that starts with a count of `c`.
	this(uint c)
	{
		doneEv = new shared TEvent;
		count = c;
	}

	// Unqualified view of `count`. Only call this while holding this instance's monitor.
	private ref uint lockedCount()
	{
		return *cast(uint*) &count;
	}

	// Raises the counter by `amt`.
	synchronized void add(uint amt)
	{
		lockedCount() += amt;
	}

	// Lowers the counter by one and wakes waiters so they can re-check it.
	// Calling this when the counter is already zero is a programming error (asserts).
	synchronized void done()
	{
		assert(lockedCount() > 0);
		lockedCount() -= 1;
		doneEv.notify();
	}

	// Suspends the calling fiber until the counter is zero; returns immediately if it already is.
	void wait() @Async
	{
		while (true)
		{
			synchronized (this)
			{
				if (lockedCount() == 0)
					return;
			}

			// NOTE(review): the monitor is released before doneEv.wait() registers this fiber, so a done() that
			// lands in between is not observed until the following done() - confirm this is acceptable.
			doneEv.wait();
		}
	}
}
303 
304 // kinda like a channel in go
// An unbounded FIFO queue of messages shared between fibers on any thread.
// `send()` never suspends; `receive()` suspends while the queue is empty; `tryReceive()` never suspends.
final shared @ThreadSafe class TMessageBox(T)
{
@ThreadSafe:
	import std.container : DList;
	import std.typecons : Nullable;

	// Allocated in the constructor: a `= new TEvent` field initializer is evaluated once at compile time,
	// so every message box would end up pointing at the same TEvent.
	private TEvent sendEv;
	// only accessed while holding this instance's monitor
	private DList!T queue;

	this()
	{
		sendEv = new shared TEvent;
	}

	// Unqualified view of `queue` (DList's methods are not `shared`, so they cannot be called on the field
	// directly). Only call this while holding this instance's monitor.
	private ref DList!T lockedQueue()
	{
		return *cast(DList!T*) &queue;
	}

	// Appends `val` to the queue and wakes fibers waiting in `receive()`.
	synchronized void send(T val)
	{
		lockedQueue().insertBack(val);
		sendEv.notify();
	}

	// Suspends the calling fiber until a message is available, then removes and returns the oldest one.
	T receive() @Async
	{
		while (true)
		{
			// tryReceive takes the lock exactly once per attempt, the same as an inline synchronized block would
			auto msg = tryReceive();
			if (!msg.isNull)
				return msg.get;

			sendEv.wait();
		}
	}

	// Removes and returns the oldest message wrapped in a Nullable, or a null Nullable if the queue is empty.
	// Never suspends.
	synchronized Nullable!T tryReceive()
	{
		if (lockedQueue().empty)
			return Nullable!T.init;

		auto f = lockedQueue().front;
		lockedQueue().removeFront();
		// T does not implicitly convert to Nullable!T, so wrap explicitly
		return Nullable!T(f);
	}
}