// `task` provides a very minimal task implementation on top of fibers.

module rockhopper.rhapi.task;

import rockhopper.core.reactor : spawn; // used in constructor and in then
import rockhopper.rhapi.syncf : FEvent; // used for syncing the fibers
import rockhopper.core.uda : Async;

import std.typecons : Nullable; // used by tryGetRes

import std.traits : isSomeFunction, Parameters, ReturnType; // used for template inference stuff

// A handle to a computation that is run via `spawn` and signals an FEvent
// once it has finished. `T` is the result type; `Task!void` carries no value.
//
// Instances are created through `tSpawn`, `taskify`, `completedTask` or `then`;
// the constructors themselves are private.
struct Task(T)
{
	// true when the task produces a value, false for Task!void
	enum VALUED = !is(T == void);

	// notified once the wrapped callable has returned (or immediately for completed tasks)
	private FEvent ev;

	// storage for the result; only present on valued tasks
	static if (VALUED)
		private T res;

	// disable copying struct
	@disable this(ref Task);

	// may only call this with T function() or T delegate()
	// we use a separate function (tSpawn) to wrap this so we can infer the type T from F.
	//
	// NOTE(review): the spawned closure accesses `res` and `ev` through `this`,
	// so the Task presumably has to stay at a stable address until the callable
	// has finished - confirm against how `spawn` schedules its argument.
	private this(F)(F dg)
	{
		static assert(
			isSomeFunction!F && Parameters!F.length == 0 && is(ReturnType!F == T),
			"Task!T must be constructed from a zero-argument callable returning T"
		);

		spawn({
			static if (VALUED)
				res = dg();
			else
				dg();

			// publish completion only after the result (if any) has been stored
			ev.notify();
		});
	}

	static if (VALUED)
	{
		// builds an already-finished task holding `value`; used by completedTask
		private this(T value)
		{
			res = value;
			ev.notify(); // mark the event up front so the task reads as finished
		}
	}

	// true once the task's event has been signaled
	bool isFinished() inout @property
	{
		return ev.isSignaled;
	}

	// result type of tryGetRes: Nullable!T for valued tasks, plain bool for void tasks
	static if (VALUED)
		alias MAYBERES = Nullable!T;
	else
		alias MAYBERES = bool;

	// Non-waiting query of the result.
	// Valued tasks: the result wrapped in a Nullable, or an empty Nullable if not finished.
	// Void tasks: whether the task has finished.
	MAYBERES tryGetRes()
	{
		static if (!VALUED)
			return isFinished;
		else
			return isFinished ? MAYBERES(res) : MAYBERES.init;
	}

	// Waits on the task's event, then returns the result (nothing for void tasks).
	T waitRes() @Async
	{
		ev.wait();
		static if (VALUED) return res;
	}

	// Chains a continuation onto a valued task: `fn` receives this task's result.
	// Returns a new task typed after the return type of `fn`.
	auto then(F)(F fn)
	if (VALUED && isSomeFunction!F && Parameters!F.length == 1 && is(Parameters!F[0] == T))
	{
		return Task!(ReturnType!F)({
			return fn(waitRes());
		});
	}

	// Chains a continuation onto a void task: `fn` takes no arguments.
	// Returns a new task typed after the return type of `fn`.
	auto then(F)(F fn)
	if (!VALUED && isSomeFunction!F && Parameters!F.length == 0)
	{
		return Task!(ReturnType!F)({
			waitRes();
			// Forward fn's result. Without the `return`, this delegate would be
			// void-returning and a value-returning `fn` would fail the
			// constructor's ReturnType check. Returning a void expression is
			// legal, so void continuations keep working.
			return fn();
		});
	}
}

// Runs `fn` as a task; the task's result type is inferred from the return type of `fn`.
auto tSpawn(F)(F fn)
if (isSomeFunction!F && Parameters!F.length == 0)
{
	return Task!(ReturnType!F)(fn);
}

// Wraps a call to `F` with the given arguments into a task.
auto taskify(alias F)(Parameters!F params)
{
	return tSpawn({ return F(params); });
}

// Creates a task that is already finished and holds `value`.
auto completedTask(T)(T value)
{
	return Task!T(value);
}

// Waits for every task in `tasks`, one after another, discarding the results.
void waitAllTasks(T)(Task!T*[] tasks) @Async
{
	foreach (t; tasks)
		t.waitRes();
}

// heterogeneous version: the tasks are passed as template arguments and may
// have different result types
void waitAllTasks(TASKS...)() @Async
{
	import std.traits : isInstanceOf;

	static foreach (t; TASKS)
	{
		static assert(isInstanceOf!(Task, typeof(t)), "Cannot pass a value to waitAllTasks that is not a Task");

		t.waitRes();
	}
}

// Waits until at least one of `tasks` has finished.
// Each task gets a continuation that notifies a shared local event.
// NOTE(review): with an empty `tasks` nothing ever notifies the event, so the
// final wait presumably never returns - confirm whether callers rely on that.
void waitAnyTask(T)(Task!T*[] tasks) @Async
{
	FEvent ev;

	foreach (t; tasks)
	{
		static if (is(T == void))
			t.then({ ev.notify(); });
		else
			t.then((T _) { ev.notify(); });
	}

	ev.wait();
}

// heterogeneous version: the tasks are passed as template arguments and may
// have different result types
void waitAnyTask(TASKS...)() @Async
{
	import std.traits : isInstanceOf, TemplateArgsOf;

	FEvent ev;

	static foreach (t; TASKS)
	{{
		alias TTask = typeof(t);

		static assert(isInstanceOf!(Task, TTask), "Cannot pass a value to waitAnyTask that is not a Task");

		// the task's result type decides which continuation signature `then` accepts
		alias TValue = TemplateArgsOf!TTask[0];

		static if (is(TValue == void))
			t.then({ ev.notify(); });
		else
			t.then((TValue _) { ev.notify(); });
	}}

	ev.wait();
}

// like tSpawn but runs on another thread. The returned task is for use on *this* thread.
//
// The outer task starts a thread via spawnThread; inside that thread `fn` is run
// as a task and waited on, and the outer task then joins the thread.
// NOTE(review): whether `join` suspends only the calling fiber or blocks the
// whole thread depends on rockhopper.core.threading - confirm.
auto tSpawnAsThread(F)(F fn)
if (isSomeFunction!F && Parameters!F.length == 0)
{
	import rockhopper.core.threading : spawnThread;

	alias T = ReturnType!F;

	return tSpawn({
		// slot the worker thread writes its result into
		static if (!is(T == void))
			shared ReturnType!F res;

		spawnThread({
			static if (is(T == void))
				tSpawn(fn).waitRes();
			else
				res = tSpawn(fn).waitRes();
		}).join();

		static if (!is(T == void))
			return res;
	});
}