module rockhopper.rhapi.file;

import rockhopper.core.reactor;
import rockhopper.core.uda;

public import eventcore.driver : FileOpenMode;
import eventcore.driver : FileFD, PipeFD, OpenStatus, IOStatus;

import std.traits : isInstanceOf;


/// Exception thrown by this module when an operation reports a non-OK status.
/// For any `S` other than `void`, the offending status value is kept in `status`.
class FileException(S) : Exception
{
    static if (!is(S == void))
    {
        /// The status value that caused this exception.
        S status;

        this(S st, string msg)
        {
            status = st;
            super(msg);
        }
    }
}

alias FileIOException = FileException!IOStatus;
alias FileOpenException = FileException!OpenStatus;

// Throws FileException!(typeof(OK)) if v != OK.
// TEM maps the stringified status to the exception message; it is only evaluated on failure
// because enforce takes its second argument lazily.
private void check(alias OK, alias TEM)(typeof(OK) v)
{
    import std.exception : enforce;
    import std.conv : to;

    enforce(v == OK, new FileException!(typeof(OK))(v, TEM(v.to!string)));
}

// a quick note on types
// int / HANDLE   system FD or handle
// FILE*          C stdio.h streams <-- *THIS IS A RED HERRING, RESIST THE URGE TO USE THIS HERE*
// phobos file    wraps a FILE*
// FileFD         wraps an int / HANDLE
// rh File        wraps a FileFD

// copyable via refcounting, auto closes on last detach
// modelled after the implementation of phobos/std/stdio.d/File
// if pipe, then reads and writes will not accept offsets etc.
// use the appropriate one for your FD type
private struct Handle(bool IS_PIPE = false, bool READABLE = true, bool WRITEABLE = true)
{
    import std.meta : AliasSeq;
    import core.memory : pureMalloc, pureFree;
    import core.atomic : atomicStore, atomicLoad, atomicOp;
    import eventcore.core : eventDriver;

    import rockhopper.core.llevents : fileOpen, fileClose, fileRead, fileWrite, pipeRead, pipeWrite;
    import rockhopper.rhapi.syncf : FMutex; // TODO: TMutex

    public enum bool isPipe = IS_PIPE;
    public enum bool isReadable = READABLE;
    public enum bool isWriteable = WRITEABLE;

    // make it easy to refer to the necessary driver
    // (a static function rather than a per-instance function pointer field, so that copies of
    // the handle carry nothing but the _impl pointer)
    static if (IS_PIPE)
        private static auto driverfp() => eventDriver.pipes;
    else
        private static auto driverfp() => eventDriver.files;

    static if (IS_PIPE)
        private alias FD = PipeFD;
    else
        private alias FD = FileFD;

    // === STORAGE ===

    // struct to keep it all close together in memory
    private struct Impl
    {
        FD fd; // null if another instance closes this file
        shared uint refCount;
        bool noAutoClose; // if true, never close automatically
        FMutex mutex;
    }

    // shared between all copies of this handle; allocated with pureMalloc in initialize()
    private Impl* _impl;

    // === CONSTRUCTORS ===

    // adopts a raw system descriptor via the driver.
    // note that noAutoClose defaults to TRUE here, unlike initialize().
    this(int handle, bool noAutoClose = true, uint refs = 1) nothrow
    {
        initialize(driverfp().adopt(handle), noAutoClose, refs);
    }

    // wraps an existing driver descriptor. noAutoClose defaults to TRUE here as well.
    this(FD handle, bool noAutoClose = true, uint refs = 1) @nogc nothrow
    {
        initialize(handle, noAutoClose, refs);
    }

    // allocates the shared Impl and stores the descriptor, the auto-close flag and the
    // initial reference count. must only be called while no Impl is attached.
    private void initialize(FD handle, bool noAutoClose = false, uint refs = 1) @nogc nothrow
    {
        assert(!_impl);
        _impl = cast(Impl*) pureMalloc(Impl.sizeof);
        // check the allocation BEFORE touching the memory
        assert(_impl);
        *_impl = Impl.init; // overwrite uninitialized memory from malloc!

        _impl.fd = handle;
        _impl.noAutoClose = noAutoClose;
        atomicStore(_impl.refCount, refs);
    }

    // i've checked, async *struct* constructors are safe!
    // opens the named file; throws FileOpenException if the open status is not ok.
    // the resulting handle auto-closes (initialize's default noAutoClose = false).
    static if (!IS_PIPE)
        this(string name, FileOpenMode mode) @Async
        {
            auto fd = fileOpen(name, mode);
            check!(OpenStatus.ok, (s) => "open failed: " ~ s)(fd.status);

            initialize(fd.fd);
        }

    // copy constructors between pipes of different types
    static if (IS_PIPE)
    {
        this(P)(scope P rhs)
        if (
            // pipew and piper -> pipea
            (WRITEABLE && READABLE && (is(P == Handle!(true, true, false)) || is(P == Handle!(true, false, true))))
            // pipea -> pipew
            || (WRITEABLE && !READABLE && is(P == Handle!true))
            // pipea -> piper
            || (!WRITEABLE && READABLE && is(P == Handle!true))
        )
        {
            if (!rhs._impl) return;
            assert(atomicLoad(rhs._impl.refCount));
            atomicOp!"+="(rhs._impl.refCount, 1); // add a ref
            _impl = cast(Impl*) rhs._impl; // duplicate the impl pointer
        }
    }

    // === LIFECYCLE ===

    // +1 refcount for fresh instances
    this(this)
    {
        if (!_impl) return; // null impl, whatever
        assert(atomicLoad(_impl.refCount));
        atomicOp!"+="(_impl.refCount, 1);
    }

    ~this() => detach();

    // -1 refcount & free & close
    void detach()
    {
        if (!_impl) return;
        scope(exit) _impl = null;

        if (atomicOp!"-="(_impl.refCount, 1) == 0)
        {
            scope(exit) pureFree(_impl);

            // fd is reset to its init value by close() on another copy; in that case the
            // descriptor has already been released and must not be released a second time.
            if (!_impl.noAutoClose && _impl.fd)
                driverfp().releaseRef(_impl.fd); // closes the fd synchronously
        }
    }

    // === INTERNAL UTILS ===

    // runs f while holding the Impl's mutex, unlocking again even if f throws
    private T withLock(T)(T delegate() f)
    {
        _impl.mutex.lock();
        scope(exit) _impl.mutex.unlock();
        return f();
    }

    // === FILE OPERATIONS ===

    // replaces the currently open file of this instance with a new one
    static if (!IS_PIPE)
        void open(string name, FileOpenMode mode) @Async
        {
            detach(); // if this instance points to a file, detach

            auto opened = fileOpen(name, mode);
            // if this fails, leaves the file with no _impl attached cleanly.
            check!(OpenStatus.ok, (s) => "open failed: " ~ s)(opened.status);

            initialize(opened.fd);
        }

    // need to check fd because will be null if another instance closed the file.
    @property bool isOpen() const pure nothrow
        => _impl !is null && _impl.fd;

    // if open, closes, else does nothing.
    // unlike detach(), this releases the descriptor regardless of noAutoClose and regardless of
    // how many other copies still reference the Impl; those copies then see a null fd.
    void close()
    {
        if (!_impl) return;

        // registered first, so it runs LAST: drop our reference after the fd has been dealt with
        scope(exit)
        {
            // basically the logic from detach()
            if (atomicOp!"-="(_impl.refCount, 1) == 0)
                pureFree(_impl);
            _impl = null;
        }

        if (!_impl.fd) return; // already closed on another thread
        scope(exit) _impl.fd = typeof(_impl.fd).init; // why not

        driverfp().releaseRef(_impl.fd); // close
    }

    // calls fsync on the underlying descriptor; throws ErrnoException on failure
    static if (!IS_PIPE)
        void sync()
        {
            // TODO: Windows support
            // TODO: Darwin support
            import core.sys.posix.unistd : fsync;
            import std.exception : errnoEnforce;

            errnoEnforce(fsync(fileno) == 0);
        }

    // the raw descriptor value as an int. requires an attached Impl.
    @property int fileno() const
    {
        auto fd = _impl.fd.value.value;
        assert(fd < int.max);
        return cast(int) fd;
    }

    // size of the file as reported by the file driver. requires an attached Impl.
    static if (!IS_PIPE)
        @property ulong length() const
        {
            return eventDriver.files.getSize(_impl.fd);
        }

    // TODO: truncate

    // make the offset arg only exist for files without having to declare the functions twice
    static if (IS_PIPE)
        private alias OFFSET_ARGS = AliasSeq!();
    else
        private alias OFFSET_ARGS = AliasSeq!ulong;

    // reads into buf (for files: starting at the given offset) while holding the handle's mutex.
    // returns the number of bytes transferred; throws FileIOException if the status is not ok.
    static if (READABLE)
        ulong read(OFFSET_ARGS oset, ubyte[] buf) @Async
        {
            return withLock({
                static if (IS_PIPE)
                    auto res = pipeRead(_impl.fd, buf);
                else
                    auto res = fileRead(_impl.fd, oset[0], buf);

                check!(IOStatus.ok, (s) => "read error: " ~ s)(res.status);
                return res.bytesRWd;
            });
        }

    // writes buf (for files: starting at the given offset) while holding the handle's mutex.
    // returns the number of bytes transferred; throws FileIOException if the status is not ok.
    static if (WRITEABLE)
        ulong write(OFFSET_ARGS oset, const(ubyte)[] buf) @Async
        {
            return withLock({
                static if (IS_PIPE)
                    auto res = pipeWrite(_impl.fd, buf);
                else
                    auto res = fileWrite(_impl.fd, oset[0], buf);

                check!(IOStatus.ok, (s) => "write error: " ~ s)(res.status);
                return res.bytesRWd;
            });
        }
}

// wraps a file handle
alias FileH = Handle!false;
// wraps one end of a pipe handle
alias PipeEndHR = Handle!(true, true, false);
alias PipeEndHW = Handle!(true, false);
alias PipeEndHA = Handle!true;

// wraps a complete pipe with a read and write end
struct Pipe
{
    private PipeEndHR _read;
    private PipeEndHW _write;

    // these return refcounted copies of the stored ends
    @property PipeEndHR readEnd() nothrow { return _read; }
    @property PipeEndHW writeEnd() nothrow { return _write; }

    @property auto readStream() nothrow { return streamify(_read); }
    @property auto writeStream() nothrow { return streamify(_write); }

    // generally should be unnecessary, as both pipes will automatically close themselves when
    // there are no more references to them
    void close()
    {
        _read.close();
        _write.close();
    }

    // creates a pipe!
    // cannot just be a default constructor for struct reasons
    // throws FileException!int if pipe() does not return 0.
    static Pipe create()
    {
        // https://github.com/dlang/phobos/blob/c970ca6/std/process.d#L2756
        version (Posix)
        {
            import core.sys.posix.unistd : pipe;
            int[2] fds;
            check!(0, (_) => "failed to open pipe")(pipe(fds));

            // noAutoClose = false: both ends close themselves on last detach
            return Pipe(PipeEndHR(fds[0], false), PipeEndHW(fds[1], false));
        }
        else
        {
            // TODO
            static assert(false, "Pipe.create is only implemented for POSIX platforms");
        }
    }
}

enum SeekOrigin
{
    set,
    curr,
    end
}

// a positioned view over a Handle: for files it tracks a current index and an EOF flag,
// for pipes it simply forwards reads and writes.
struct Stream(H) if (isInstanceOf!(Handle, H))
{
    H handle;

    enum isSeekable = !H.isPipe;
    enum isReadable = H.isReadable;
    enum isWriteable = H.isWriteable;

    // position management
    static if (isSeekable)
    {
        private ulong _index;
        private bool _eof;

        // current position within the file
        @property ulong tell() const
        {
            return _index;
        }

        // forward length from handle for convenience
        @property ulong length() const
        {
            return handle.length;
        }

        @property bool isEof() const
        {
            return _eof;
        }

        // moves the current position.
        //   set:  oset is an absolute position, 0 <= oset <= length
        //   curr: oset is relative to the current position
        //   end:  oset is relative to the end and must be <= 0
        // landing exactly on the end is allowed for every origin and sets the EOF flag.
        // throws if the target would lie before the start or past the end of the file.
        void seek(long oset, SeekOrigin origin = SeekOrigin.set)
        {
            import std.exception : enforce;

            enum beforeStart = "cannot seek before the start of the stream";
            enum pastEnd = "cannot seek past the end of the stream";

            immutable len = length;
            // |oset| as an unsigned value; doing the negation in ulong keeps long.min well-defined
            immutable ulong mag = oset < 0 ? 0UL - cast(ulong) oset : cast(ulong) oset;

            final switch (origin) with (SeekOrigin)
            {
                case set:
                    enforce(oset >= 0, beforeStart);
                    enforce(mag <= len, pastEnd);
                    _index = mag;
                    break;

                case curr:
                    if (oset >= 0)
                    {
                        // phrased as a subtraction so that _index + mag cannot overflow
                        enforce(_index <= len && mag <= len - _index, pastEnd);
                        _index += mag;
                    }
                    else
                    {
                        enforce(mag <= _index, beforeStart);
                        // only reachable if the file shrank underneath us
                        enforce(_index - mag <= len, pastEnd);
                        _index -= mag;
                    }
                    break;

                case end:
                    enforce(oset <= 0, pastEnd);
                    enforce(mag <= len, beforeStart);
                    _index = len - mag;
                    break;
            }

            checkEOF(len);
        }

        // refreshes the EOF flag using the current file length
        private void checkEOF()
        {
            checkEOF(length);
        }

        // refreshes the EOF flag against an already-known length
        private void checkEOF(ulong l)
        {
            _eof = _index == l;
        }
    }

    static if (isReadable)
        // reads into a buffer, updating the index and eof state as necessary, returning the
        // amount read
        ulong rawRead(ubyte[] buffer) @Async
        {
            static if (!isSeekable) return handle.read(buffer);
            else
            {
                ulong bytesXfered;

                // we only check if this read will EOF using the file length if an error occurs,
                // as checking length() every time would be relatively expensive.
                try
                {
                    bytesXfered = handle.read(_index, buffer);
                }
                catch (FileIOException e)
                {
                    auto fileLen = length();

                    if (
                        // error state MIGHT BE an EOF
                        e.status == IOStatus.error
                        // this read indeed would have EOFed
                        && _index + buffer.length > fileLen)
                    {
                        // clamp: if the file shrank below _index the subtraction would wrap
                        bytesXfered = fileLen > _index ? fileLen - _index : 0;
                        _eof = true;
                    }
                    else
                        throw e;
                }

                _index += bytesXfered;

                return bytesXfered;
            }
        }

    static if (isWriteable)
        // writes from a buffer, updating the index and eof state as necessary, returning the
        // amount written
        // NOTE(review): an IOStatus.error on a write that would cross the current end of the
        // file is reported as a short write up to the end plus EOF, mirroring rawRead -
        // confirm that this is the intended behaviour for writes.
        ulong rawWrite(const(ubyte)[] buffer) @Async
        {
            static if (!isSeekable) return handle.write(buffer);
            else
            {
                ulong bytesXfered;

                try
                {
                    bytesXfered = handle.write(_index, buffer);
                }
                catch (FileIOException e)
                {
                    auto fileLen = length();

                    if (
                        // error state MIGHT BE an EOF
                        e.status == IOStatus.error
                        // this write indeed would have EOFed
                        && _index + buffer.length > fileLen)
                    {
                        // clamp: if the file shrank below _index the subtraction would wrap
                        bytesXfered = fileLen > _index ? fileLen - _index : 0;
                        _eof = true;
                    }
                    else
                        throw e;
                }

                _index += bytesXfered;

                return bytesXfered;
            }
        }

    static if (isReadable)
        // allocates a buffer of upTo bytes, reads into it, and returns the slice that was filled
        ubyte[] rawRead(ulong upTo) @Async
        {
            ubyte[] buf = new ubyte[upTo];
            auto xfered = rawRead(buf);
            return buf[0 .. xfered];
        }

    // TODO: nice apis
}

// this sucks but idk what else to do because of template arg inference
auto streamify(H)(scope H value) if (isInstanceOf!(Handle, H))
{
    return Stream!H(value);
}

import core.sys.posix.unistd : STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO;

// duplicates one of the standard descriptors and returns the duplicate.
// throws ErrnoException if dup() fails (dup signals failure with -1; 0 is a valid descriptor).
private int dupStdDescriptor(int stdFd)
{
    int fd;
    // TODO: other platforms than POSIX
    version (Posix)
    {
        import core.sys.posix.unistd : dup;
        import std.exception : errnoEnforce;

        fd = dup(stdFd);
        errnoEnforce(fd != -1, "failed to duplicate standard stream descriptor");
    }

    return fd;
}

// a readable pipe stream over a duplicate of stdin; the duplicate auto-closes on last detach
auto getStdin() @property // @suppress(dscanner.confusing.function_attributes)
{
    return streamify(PipeEndHR(dupStdDescriptor(STDIN_FILENO), false));
}

// a writeable pipe stream over a duplicate of stdout; the duplicate auto-closes on last detach
auto getStdout() @property // @suppress(dscanner.confusing.function_attributes)
{
    return streamify(PipeEndHW(dupStdDescriptor(STDOUT_FILENO), false));
}

// a writeable pipe stream over a duplicate of stderr; the duplicate auto-closes on last detach
auto getStderr() @property // @suppress(dscanner.confusing.function_attributes)
{
    return streamify(PipeEndHW(dupStdDescriptor(STDERR_FILENO), false));
}