// `syncf` contains fiber synchronization primitives.
// They provide familiar-looking sync tools with support for fibers, but without thread safety.
// They are designed to be as low-overhead and efficient as possible (stack-allocated structs without locks, etc.)
// Also check `synct`!


module rockhopper.rhapi.syncf;

import rockhopper.core.reactor : yield, spawn;
import rockhopper.core.llevents : sleep;
import rockhopper.core.uda : Async, Synchronized;
import std.datetime : Duration;
import core.thread.fiber : Fiber;

// core.sync.event : Event
struct FEvent
{
    // this does not have reference semantics, so purposefully prevent footgunning; use an FEvent* if you need it.
    @disable this(ref FEvent);

    private bool raised;
    private uint waiters;

    void notify() { raised = true; }

    void wait() @Async
    {
        waiters++;
        while (!raised) yield();
        waiters--;
        if (waiters == 0) raised = false; // auto-reset once the last waiter has woken
    }
}

// this is like an FEvent, however instead of just wrapping a bool, it keeps a count,
// such that exactly as many wait()s are resolved as notify()s are called.
// one notify resolves one wait.
// core.sync.semaphore : Semaphore
struct FSemaphore
{
    @disable this(ref FSemaphore); // see FEvent::this(ref FEvent)

    private uint count;

    void notify() { count++; }

    bool tryWait()
    {
        if (count == 0) return false;
        count--;
        return true;
    }

    void wait() @Async
    {
        while (count == 0) yield();
        count--;
    }
}


// non-recursive mutex - only one lock may be held at once, even by the same fiber
struct FMutex
{
    @disable this(ref FMutex);

    private bool locked;

    void lock() @Async
    {
        while (locked) yield();
        locked = true;
    }

    void unlock()
    {
        assert(locked, "unlocking an unlocked FMutex makes no sense");
        locked = false;
    }
}

// core.sync.mutex : Mutex
// this is a recursive mutex - the SAME FIBER ONLY can call lock() multiple times without deadlocking.
// two fibers however still cannot hold a lock at once.
struct FReMutex
{
    @disable this(ref FReMutex); // see FEvent::this(ref FEvent)

    private Fiber lockHolder; // null-safety: will be null iff lockcount == 0.
    private uint lockcount;

    void lock() @Async
    {
        auto thisF = Fiber.getThis;

        if (lockHolder is thisF)
        {
            assert(lockcount > 0, "when there is a lockholder, there must be a lock");
            // recursive - increment lock
            lockcount++;
            return;
        }

        while (lockcount > 0) yield(); // wait for the other fiber to unlock if necessary

        assert(lockHolder is null, "when there are no locks, there can't be a holder of locks");

        // initialize lock
        lockHolder = thisF;
        lockcount = 1;
    }

    void unlock()
    {
        assert(lockHolder !is null);
        assert(lockcount > 0);
        assert(lockHolder is Fiber.getThis, "you should not unlock an FReMutex held by another fiber");
        lockcount--;
        if (lockcount == 0) lockHolder = null;
    }
}
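
// A usage sketch, not part of the original module: it only exercises the uncontended paths,
// which never actually yield(), so it makes no assumptions about how the reactor is started.
// Under a running reactor you would typically spawn fibers that wait() while another notifies.
unittest
{
    FEvent ev;
    ev.notify();
    ev.wait(); // already raised, so this returns immediately and auto-resets

    FSemaphore sem;
    sem.notify();
    sem.notify();
    assert(sem.tryWait());
    assert(sem.tryWait());
    assert(!sem.tryWait()); // both notifies consumed

    FMutex m;
    m.lock(); // uncontended, so no yield
    m.unlock();
}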

// core.sync.rwmutex : ReadWriteMutex
// not re-entrant.
// a mutex that can either be locked for reading or for writing.
// either many read locks and zero write locks are held, OR exactly one write lock and zero read locks.
struct FRWMutex
{
    @disable this(ref FRWMutex); // see FEvent::this(ref FEvent)

    private bool writeLocked;
    private uint readLocks; // writeLocked => readLocks == 0 (in the mathematical sense of =>)

    void lockWrite() @Async
    {
        // note that after waiting for all read locks to be freed, a write lock may have been placed again!
        // (or vice versa) so we must keep waiting until there are no locks AT ALL before locking.

        if (writeLocked)
            assert(readLocks == 0, "cannot be any read locks while a write lock is held");

        // wait for all locks of any kind to be freed
        while (writeLocked || readLocks > 0) yield();

        writeLocked = true;
    }

    void lockRead() @Async
    {
        if (writeLocked)
        {
            assert(readLocks == 0, "cannot be any read locks while a write lock is held");

            // it's okay if someone else gets a read lock while we're yielding
            while (writeLocked) yield();
        }

        readLocks++;
    }

    void unlockWrite()
    {
        assert(writeLocked);
        assert(readLocks == 0);

        writeLocked = false;
    }

    void unlockRead()
    {
        assert(!writeLocked);
        assert(readLocks > 0);

        readLocks--;
    }
}
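
// A usage sketch, not part of the original module: again only the uncontended paths, which never
// yield(), so no reactor is assumed.
unittest
{
    FRWMutex rw;

    // any number of readers may hold the lock at once
    rw.lockRead();
    rw.lockRead();
    rw.unlockRead();
    rw.unlockRead();

    // a writer may only take the lock once no read locks remain
    rw.lockWrite();
    rw.unlockWrite();
}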

// a lighter-weight, thread-unsafe, fiber equivalent of the dlang built-in `synchronized` attribute:
// only one fiber can be in the process of executing this function at once.
template fSynchronized(alias func)
{
    import std.traits : isSomeFunction, ReturnType, Parameters;

    static assert(isSomeFunction!func, "fSynchronized may only be instantiated with a function");

    // even though this is always true, if we don't guard with a static if, we get more compiler errors
    // than just the assert, and nobody needs that, so just be satisfied with the assert.
    static if (isSomeFunction!func)
    {
        FMutex m;

        ReturnType!func fSynchronized(Parameters!func args) @Async @Synchronized
        {
            m.lock();
            scope (exit) m.unlock(); // release even if func throws
            return func(args);
        }
    }
} // i learned templates from scratch for this, and i'm proud of the result :) -- sink

// like std.typecons Nullable!T, but calling .get waits for a value
// TODO: there has to be a better name for this surely
struct FGuardedResult(T)
{
    import std.typecons : Nullable;

    @disable this(ref FGuardedResult); // see FEvent::this(ref FEvent)

    private bool _hasValue;
    private T value;

    bool hasValue() inout @property { return _hasValue; }

    void set(T val)
    {
        value = val;
        _hasValue = true;
    }

    void nullify()
    {
        _hasValue = false;
        value = T.init;
    }

    T get() @Async
    {
        while (!_hasValue) yield();
        return value;
    }

    Nullable!T tryGet()
    {
        if (_hasValue) return Nullable!T(value);
        else return Nullable!T.init;
    }
}

// like a golang WaitGroup
// while a semaphore releases one wait per notify (many notify -> many wait),
// and an event releases all waits on one notify (one notify -> many wait),
// a waitgroup holds wait()s until a set number of done() calls have happened (many done -> one wait)
// tip: you can pass the initial count in the constructor instead of using add
struct FWaitGroup
{
    @disable this(ref FWaitGroup); // see FEvent::this(ref FEvent)

    private uint count;

    void add(uint amt)
    {
        count += amt;
    }

    void done()
    {
        assert(count > 0);
        count--;
    }

    void wait() @Async
    {
        while (count > 0) yield();
    }
}

// kinda like a channel in Go
struct FMessageBox(T)
{
    import std.container : DList;
    import std.typecons : Nullable;

    @disable this(ref FMessageBox); // see FEvent::this(ref FEvent)

    private DList!T queue;

    void send(T val)
    {
        queue.insertBack(val);
    }

    T receive() @Async
    {
        while (queue.empty) yield();

        auto front = queue.front;
        queue.removeFront();
        return front;
    }

    Nullable!T tryReceive()
    {
        if (queue.empty) return Nullable!T.init;

        return Nullable!T(receive());
    }
}
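
// A usage sketch, not part of the original module: only the non-blocking paths are exercised
// (nothing here ever actually yield()s), so no reactor is assumed.
unittest
{
    FWaitGroup wg;
    wg.add(2);
    wg.done();
    wg.done();
    wg.wait(); // count is already zero, so this returns without yielding

    FMessageBox!int box;
    box.send(1);
    box.send(2);
    assert(box.receive() == 1);      // queue is non-empty, so no yield
    assert(box.tryReceive().get == 2);
    assert(box.tryReceive().isNull); // empty again

    FGuardedResult!string res;
    assert(!res.hasValue);
    res.set("done");
    assert(res.get == "done"); // value already present, so no yield
    assert(res.tryGet.get == "done");
}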