1 // `syncf` contains fiber synchronization primitives.
2 // They provide familiar looking sync tools with support for fibers, without thread safety.
3 // They are designed to be as low overhead and efficient as possible. (stack allocated structs without locks, etc)
4 // Also check `synct`!
5 
6 
7 module rockhopper.rhapi.syncf;
8 
9 import rockhopper.core.reactor : yield, spawn;
10 import rockhopper.core.llevents : sleep;
11 import rockhopper.core.uda : Async, Synchronized;
12 import std.datetime : Duration;
13 import core.thread.fiber : Fiber;
14 
15 // core.sync.event : Event
struct FEvent
{
	// Value type without reference semantics: copying would silently fork the state,
	// so the copy constructor is disabled. Pass a FEvent* around if sharing is needed.
	@disable this(ref FEvent);

	private bool raised;  // set by notify(), cleared by the last waiter to leave wait()
	private uint waiters; // number of fibers currently inside wait()

	// Raises the event. Every fiber currently in wait() will pass its check the next
	// time it runs. If nobody is waiting, the flag stays raised until a later wait().
	void notify()
	{
		raised = true;
	}

	// Suspends the calling fiber (by repeatedly yielding) until the event is raised.
	// The flag is lowered again once the last registered waiter has left.
	void wait() @Async
	{
		++waiters;

		for (;;)
		{
			if (raised) break;
			yield();
		}

		// auto-reset: the final waiter to leave lowers the flag
		if (--waiters == 0)
			raised = false;
	}
}
34 
35 // this is like an FEvent however instead of just wrapping a bool, it keeps a count,
36 // such that exactly as many wait()s are resolved as notify()s are called.
37 // one notify resolves one wait.
38 // core.sync.semaphore : Semaphore
struct FSemaphore
{
	// Copying is disabled for the same reason as on FEvent: this is a plain value type.
	@disable this(ref FSemaphore);

	private uint count; // notifications that have not yet been consumed by a wait

	// Makes one more permit available; exactly one wait()/tryWait() will consume it.
	void notify()
	{
		++count;
	}

	// Non-blocking acquire.
	// Returns: true if a permit was available (and has now been consumed), false otherwise.
	bool tryWait()
	{
		immutable available = count > 0;
		if (available)
			--count;
		return available;
	}

	// Blocking acquire: yields until a permit is available, then consumes it.
	void wait() @Async
	{
		while (!tryWait())
			yield();
	}
}
60 
61 
62 // non-recursive mutex, can only have one lock at all even from the same fiber
struct FMutex
{
	// Plain value type; copying would duplicate the lock flag, so it is disabled.
	@disable this(ref FMutex);

	private bool locked; // true while some fiber holds the mutex

	// Acquires the mutex, yielding for as long as it is already held.
	// Not recursive: a fiber that already holds the lock and calls lock() again
	// will keep yielding until something else unlocks it.
	void lock() @Async
	{
		while (true)
		{
			if (!locked) break;
			yield();
		}

		locked = true;
	}

	// Releases the mutex. Must only be called while the mutex is locked.
	void unlock()
	{
		assert(locked, "unlocking an unlocked FMutex makes no sense");
		locked = false;
	}
}
81 
82 // core.sync.mutex : Mutex
83 // this is a recursive mutex - the SAME FIBER ONLY can call lock() multiple times without deadlocking
84 // two fibers however still cannot hold a lock at once.
struct FReMutex
{
	// Disable copying of *this* type. (Previously this named FMutex, which left
	// FReMutex copyable and instead declared an unrelated disabled constructor.)
	@disable this(ref FReMutex); // see FEvent::this(ref FEvent)

	private Fiber lockHolder; // null-safety: will be null iff lockcount == 0.
	private uint lockcount;   // how many times the holder has locked without unlocking

	// Acquires the mutex for the calling fiber.
	// If the calling fiber already holds it, the hold count is incremented and the call
	// returns immediately; otherwise the fiber yields until the mutex is completely free.
	void lock() @Async
	{
		auto thisF = Fiber.getThis;

		// Identity comparison (`is`) rather than `==`: we care whether it is the very same
		// fiber object, and we must not treat "no holder" (null) as matching a null getThis.
		if (lockcount > 0 && lockHolder is thisF)
		{
			// recursive - increment lock
			lockcount++;
			return;
		}

		while (lockcount > 0) yield(); // wait for other fiber to unlock if necessary

		assert(lockHolder is null, "when theres no locks, there can't be a holder of locks");

		// initialize lock
		lockHolder = thisF;
		lockcount = 1;
	}

	// Releases one level of the lock. Must be called by the fiber that holds it;
	// the mutex becomes free once every lock() has been matched by an unlock().
	void unlock()
	{
		assert(lockHolder !is null);
		assert(lockcount > 0);
		assert(lockHolder is Fiber.getThis, "you should not unlock a FReMutex held by another fiber");

		lockcount--;
		if (lockcount == 0) lockHolder = null;
	}
}
121 
122 // core.sync.rwmutex : ReadWriteMutex
123 // not re-entrant.
124 // a mutex that can either be locked for reading or writing.
125 // many read locks are allowed at once and zero write locks OR exactly one write lock and zero read locks
struct FRWMutex
{
	// Value type; copying is disabled so lock state cannot be duplicated by accident.
	@disable this(ref FRWMutex);

	private bool writeLocked;
	private uint readLocks; // invariant: writeLocked implies readLocks == 0

	// Acquires the exclusive (write) lock.
	// Yields until there is neither a writer nor any reader. Both conditions are
	// re-checked together on every pass, because a new lock of either kind may have
	// been taken while this fiber was suspended.
	void lockWrite() @Async
	{
		assert(!writeLocked || readLocks == 0, "cannot be any read locks while a write lock is held");

		while (!(readLocks == 0 && !writeLocked))
			yield();

		writeLocked = true;
	}

	// Acquires a shared (read) lock.
	// Only a writer blocks a reader; other readers arriving in the meantime are fine.
	void lockRead() @Async
	{
		assert(!writeLocked || readLocks == 0, "cannot be any read locks while a write lock is held");

		while (writeLocked)
			yield();

		++readLocks;
	}

	// Releases the write lock. Must only be called while the write lock is held.
	void unlockWrite()
	{
		assert(writeLocked);
		assert(readLocks == 0);

		writeLocked = false;
	}

	// Releases one read lock. Must only be called while at least one read lock is held.
	void unlockRead()
	{
		assert(!writeLocked);
		assert(readLocks > 0);

		--readLocks;
	}
}
176 
177 // a lighter-weight thread-unsafe fiber equivalent of the dlang built in `synchronized` attr
178 // only one fiber can be in the process of executing this function at once.
template fSynchronized(alias func)
{
	import std.traits : isSomeFunction, ReturnType, Parameters;

	static assert(isSomeFunction!func, "fSynchronized may only be instantiated with a function");

	// even though this is always true, if we don't if for it, we get more compiler errors than just the assert
	// and nobody needs that, so just be satisfied with the assert.
	static if(isSomeFunction!func)
	{
		// one mutex per instantiation, i.e. per wrapped function
		FMutex m;

		// Calls `func` with the given arguments while holding the per-function mutex,
		// so only one fiber at a time can be inside `func` through this wrapper.
		// Returns: whatever `func` returns (nothing for a void `func`).
		ReturnType!func fSynchronized(Parameters!func args) @Async @Synchronized
		{
			m.lock();
			// release on every exit path, including when func throws,
			// otherwise all later callers would wait forever
			scope (exit) m.unlock();

			// forward the result; `return voidCall();` is valid D when ReturnType is void
			return func(args);
		}
	}
} // i learned templates from scratch for this, and i'm proud of the result :) -- sink
199 
200 // like std typecons Nullable!T but calling .get waits for a value
201 // TODO: there has to be a better name for this surely
struct FGuardedResult(T)
{
	import std.typecons : Nullable;

	@disable this(ref FGuardedResult); // see FEvent::this(ref FEvent)

	private bool _hasValue; // true once set() has been called and until nullify()
	private T value;        // only meaningful while _hasValue is true

	// Returns: whether a value is currently stored.
	bool hasValue() inout @property { return _hasValue; }

	// Stores `val`; any fiber waiting in get() will return it the next time it runs.
	void set(T val)
	{
		value = val;
		_hasValue = true;
	}

	// Clears the stored value, so later get() calls wait again.
	void nullify() {
		_hasValue = false;
		value = T.init;
	}

	// Yields until a value has been set, then returns it. The value stays stored.
	T get() @Async
	{
		while (!_hasValue) yield();
		return value;
	}

	// Non-waiting variant of get().
	// Returns: the stored value wrapped in a Nullable, or a null Nullable if none is set.
	Nullable!T tryGet()
	{
		// D does not implicitly convert T to Nullable!T in a return statement,
		// so the value has to be wrapped explicitly.
		if (_hasValue) return Nullable!T(value);
		else return Nullable!T.init;
	}
}
236 
237 // like a golang WaitGroup
238 // while a semaphore releases one wait per notify (many notify -> many wait),
239 // and an event releases all waits on one notify (one notify -> many wait),
240 // a waitgroup holds waits until a set quantity of notifies have happened (many notify -> one wait)
241 // tip: you can pass the initial amount of notifies required in the constructor instead of using add
struct FWaitGroup
{
	@disable this(ref FWaitGroup); // see FEvent::this(ref FEvent)

	private uint count; // number of done() calls still outstanding

	// Creates a wait group that already expects `initial` calls to done().
	// Declared explicitly because `count` is private and the struct declares a
	// (disabled) copy constructor, so the field-wise literal FWaitGroup(n) cannot be
	// relied on. A default-initialized FWaitGroup still starts at zero.
	this(uint initial)
	{
		count = initial;
	}

	// Registers `amt` more units of work that must be marked done() before wait() returns.
	void add(uint amt)
	{
		count += amt;
	}

	// Marks one unit of work as finished. Must not be called more often than was added.
	void done()
	{
		assert(count > 0);
		count--;
	}

	// Yields until every registered unit of work has been marked done().
	// Returns immediately if nothing is outstanding.
	void wait() @Async
	{
		while (count > 0) yield();
	}
}
264 
265 // kinda like a channel in Go
struct FMessageBox(T)
{
	import std.container : DList;
	import std.typecons : Nullable;


	@disable this(ref FMessageBox); // see FEvent::this(ref FEvent)

	private DList!T queue; // pending messages, oldest at the front

	// Appends a message to the back of the queue. Never waits.
	void send(T val)
	{
		queue.insertBack(val);
	}

	// Yields until a message is available, then removes and returns the oldest one.
	T receive() @Async
	{
		while (queue.empty) yield();

		return takeFront();
	}

	// Non-waiting variant of receive().
	// Returns: the oldest message wrapped in a Nullable, or a null Nullable if the box is empty.
	Nullable!T tryReceive()
	{
		if (queue.empty) return Nullable!T.init;

		// Wrap explicitly: D does not implicitly convert T to Nullable!T on return.
		// Uses the non-yielding helper rather than the @Async receive().
		return Nullable!T(takeFront());
	}

	// Removes and returns the front element. Caller must ensure the queue is not empty.
	private T takeFront()
	{
		auto front = queue.front;
		queue.removeFront();
		return front;
	}
}