// `synct` has synchronization primitives like `syncf` that are thread safe, and will lock between fibers across
// multiple threads' reactors.
// These should be preferred over core.sync: instead of e.g. one thread being allowed to lock a mutex, only one fiber
// is allowed to lock that mutex, across many threads.

module rockhopper.rhapi.synct;
import rockhopper.core.uda : Async, ThreadSafe, Synchronized;

import rockhopper.core.llevents : waitThreadEvent;
import core.atomic : atomicOp;

// note for future me:
// `synchronized` methods lock on the whole instance, not per-method:
// https://dlang.org/spec/class.html#synchronized-methods #16.6.1

// These need to be classes so we can use `synchronized(this)` and `synchronized` members.
//
// Two language rules shape the code below:
//  * `__gshared` on a member variable implies `static`, so it must NOT be used for per-instance state.
//    Per-instance state is stored in ordinary (implicitly `shared`) fields and is only touched while the
//    instance monitor is held.
//  * a class-typed field initializer (`Foo f = new Foo;`) is evaluated once at compile time and the single resulting
//    object is referenced by every instance, so per-instance helper objects are created in constructors instead.

/// A one-shot broadcast event: `notify` wakes every fiber currently blocked in `wait`, on any thread.
final shared @ThreadSafe class TEvent
{
@ThreadSafe:

	import eventcore.core : eventDriver;
	import eventcore.driver : EventID, EventDriver;
	import std.typecons : Tuple, tuple;
	import core.thread.osthread : Thread;

	alias ThreadEventsTy = Tuple!(shared(EventDriver), EventID)[typeof(Thread.getThis.id)];

	// you may only await an event from the thread that created it, so we need one event per thread
	// SAFETY: only accessed (through `pendingEvents`) while holding this instance's monitor
	private ThreadEventsTy threadEvents;

	// Unshared view of `threadEvents`. Callers MUST hold the instance monitor for as long as they use the result.
	private ThreadEventsTy* pendingEvents()
	{
		return cast(ThreadEventsTy*)&threadEvents;
	}

	/// Triggers the per-thread event of every thread that has a waiting fiber, then forgets those events.
	synchronized void notify()
	{
		auto pending = pendingEvents();

		foreach (_, tup; *pending)
			tup[0].events.trigger(tup[1], true);

		// reset at same time since all those thread events are done with now
		// expect the freshly awoken threads to free the resources we're removing the references to here
		*pending = ThreadEventsTy.init;
	}

	/// Suspends the calling fiber until the next `notify`.
	///
	/// NOTE(review): users of this class check their own state under their own lock, release it, and only then call
	/// `wait`. A `notify` that lands in that gap is not seen by this waiter, which then sleeps until the following
	/// `notify`. Closing the gap needs a register-then-check protocol; confirm whether callers can tolerate it.
	void wait() @Async
	{
		auto tid = Thread.getThis.id;
		EventID ev = void; // always assigned

		synchronized (this)
		{
			auto pending = pendingEvents();

			if (auto existing = tid in *pending)
			{
				ev = (*existing)[1];
				eventDriver.events.addRef(ev); // just in case two fibers wait this TEvent on the same thread :)
			}
			else
			{
				ev = eventDriver.events.create();
				(*pending)[tid] = tuple(cast(shared) eventDriver, ev);
			}
		}

		waitThreadEvent(ev);
		// threadEvents has now been cleared so this is safe to clean up.
		eventDriver.events.releaseRef(ev); // free resources like a good citizen
	}
}

// like TEvent but has a stateful triggered/untriggered value, idk if there's a good name for this
/* final shared @ThreadSafe class TStatefulEvent
{
@ThreadSafe:

	private bool triggered;
	private TEvent ev = new TEvent;

	synchronized void notify()
	{
		if (triggered)
			return; // i don't think this is necessary but can't hurt.

		triggered = true;

		ev.notify();
	}

	synchronized void reset()
	{
		if (triggered)
		{
			triggered = false;
			// safety: by here no fibers should have a waiting ec event anymore so we can wipe them.
			threadEvents = ThreadEventsTy.init;
		}
		// resetting an event that has not been triggered is a no-op! - otherwise it could hang fibers forever.
	}

	void wait() @Async
	{
		while (!triggered)
		{
			ev.wait();
			// if the thread event resolves, triggered may still be false because another fiber got there before us,
			// and reset the event. to prevent this case, we loop around again.
		}
	}
} */

/// Counting semaphore: `notify` adds a permit, `wait` suspends the fiber until it can take one.
final shared @ThreadSafe class TSemaphore
{
@ThreadSafe:

	// Only read or written while holding the instance monitor. Read-modify-write on a `shared` integer must go
	// through `atomicOp`; the atomicity itself is redundant under the lock.
	private uint count;
	private TEvent notifyEv;

	this()
	{
		notifyEv = new shared TEvent;
	}

	/// Adds one permit and wakes waiting fibers so they can race for it.
	synchronized void notify()
	{
		atomicOp!"+="(count, 1);

		// wake up the fibers!
		notifyEv.notify();
	}

	/// Takes a permit if one is available right now. Returns: `true` if a permit was taken.
	synchronized bool tryWait()
	{
		if (count == 0)
			return false;

		atomicOp!"-="(count, 1);
		return true;
	}

	/// Suspends the calling fiber until a permit could be taken.
	void wait() @Async
	{
		while (true)
		{
			// try and "wake" this fiber, only one thread and therefore fiber is allowed in this block at a time.
			synchronized (this)
			{
				if (count > 0)
				{
					atomicOp!"-="(count, 1);
					return;
				}
			}

			// someone else got here first! release the lock and wait until we're told there are new notifies.
			notifyEv.wait();
		}
	}
}

// non-recursive mutex
// im PRETTY SURE this is safe but don't shoot me if it isn't im doing my best
final shared @ThreadSafe class TMutex
{
@ThreadSafe:

	private bool locked;
	private TEvent unlockEv;

	this()
	{
		unlockEv = new shared TEvent;
	}

	/// Suspends the calling fiber until it owns the mutex.
	void lock() @Async
	{
		while (true)
		{
			// see if we get to take the mutex!
			synchronized (this)
			{
				if (!locked)
				{
					// we can lock!
					locked = true;
					return;
				}
			}

			// we didn't get it this time, efficiently wait for an unlock.
			unlockEv.wait();
		}
	}

	/// Releases the mutex and wakes the fibers waiting in `lock`. Asserts that the mutex is currently locked.
	synchronized void unlock()
	{
		assert(locked, "unlocking an unlocked TMutex makes no sense");
		locked = false;
		unlockEv.notify();
	}
}

// TODO: remutex
// TODO: rwmutex

// an equivalent of the `synchronized` attribute that also ensures mut-ex of fibers, not just threads.
template tSynchronized(alias func)
{
	import std.traits : isSomeFunction, ReturnType, Parameters;

	static assert(isSomeFunction!func, "tSynchronized may only be instantiated with a function");

	// even though this is always true, if we don't `static if` for it, we get more compiler errors than just the
	// assert and nobody needs that, so just be satisfied with the assert.
	static if (isSomeFunction!func)
	{
		// One mutex per wrapped function, shared by all threads. It is created on first use because a class
		// instance with a constructor cannot simply be default-initialized here.
		private shared TMutex m;

		/// Calls `func(args)` while holding the per-function mutex and forwards its result.
		ReturnType!func tSynchronized(Parameters!func args) @Async @Synchronized
		{
			import std.concurrency : initOnce;

			auto mtx = initOnce!m(new shared TMutex);

			mtx.lock();
			// release the mutex even if `func` throws, otherwise every later caller would block forever
			scope (exit)
				mtx.unlock();

			return func(args);
		}
	}
}

// a nullable in which trying to get the value while it's null waits for a result to be set
final shared @ThreadSafe class TGuardedResult(T)
{
@ThreadSafe:
	import std.typecons : Nullable;

	private TEvent setEv;
	private bool hasValue;
	private T value;

	this()
	{
		setEv = new shared TEvent;
	}

	/// Stores `val` and wakes every fiber blocked in `get`.
	synchronized void set(T val)
	{
		value = val;
		hasValue = true;
		setEv.notify();
	}

	/// Clears the stored value; subsequent `get` calls block until the next `set`.
	synchronized void nullify()
	{
		hasValue = false;
		value = T.init;
	}

	/// Returns the stored value, suspending the calling fiber until one has been set.
	T get() @Async
	{
		// check under the lock; if there is nothing yet, drop the lock and sleep until the next `set`
		while (true)
		{
			synchronized (this)
			{
				if (hasValue)
					return value;
			}

			setEv.wait();
		}
	}

	/// Returns the stored value if there is one, or a null `Nullable` otherwise. Never blocks on the event.
	synchronized Nullable!T tryGet()
	{
		if (!hasValue)
			return Nullable!T.init;

		// D does not implicitly construct a Nullable!T from a T in a return statement, so wrap explicitly
		T current = value;
		return Nullable!T(current);
	}
}

// like a golang WaitGroup
// see FWaitGroup for more detailed notes on how this works and how it differs from a semaphore
final shared @ThreadSafe class TWaitGroup
{
@ThreadSafe:
	private TEvent doneEv;
	// safety: this is only accessed inside `synchronized` sections (and in constructors, before publication).
	private uint count;

	this()
	{
		doneEv = new shared TEvent;
	}

	this(uint c)
	{
		doneEv = new shared TEvent;
		count = c;
	}

	/// Registers `amt` more outstanding pieces of work.
	synchronized void add(uint amt)
	{
		atomicOp!"+="(count, amt);
	}

	/// Marks one piece of work as finished and wakes waiters so they can re-check the counter.
	synchronized void done()
	{
		assert(count > 0);
		atomicOp!"-="(count, 1);
		doneEv.notify();
	}

	/// Suspends the calling fiber until the counter is zero.
	void wait() @Async
	{
		while (true)
		{
			synchronized (this)
			{
				if (count == 0)
					return;
			}

			doneEv.wait();
		}
	}
}

// kinda like a channel in go
final shared @ThreadSafe class TMessageBox(T)
{
@ThreadSafe:
	import std.container : DList;
	import std.typecons : Nullable;

	private TEvent sendEv;
	private DList!T queue;

	this()
	{
		sendEv = new shared TEvent;
	}

	// Unshared view of `queue` (DList has no `shared` methods).
	// Callers MUST hold the instance monitor for as long as they use the result.
	private DList!T* rawQueue()
	{
		return cast(DList!T*)&queue;
	}

	/// Appends `val` to the queue and wakes fibers blocked in `receive`.
	synchronized void send(T val)
	{
		rawQueue().insertBack(val);
		sendEv.notify();
	}

	/// Removes and returns the oldest queued message, suspending the calling fiber while the queue is empty.
	T receive() @Async
	{
		while (true)
		{
			synchronized (this)
			{
				auto q = rawQueue();
				if (!q.empty)
				{
					auto f = q.front;
					q.removeFront();
					return f;
				}
			}

			sendEv.wait();
		}
	}

	/// Removes and returns the oldest queued message if there is one, or a null `Nullable` otherwise.
	synchronized Nullable!T tryReceive()
	{
		auto q = rawQueue();
		if (q.empty)
			return Nullable!T.init;

		// in a good display of why synct is less efficient than syncf,
		// FMessageBox just calls receive() here for free, but that causes an extra lock here :(
		auto f = q.front;
		q.removeFront();
		return Nullable!T(f);
	}
}