1 module rockhopper.rhapi.file;
2 
3 import rockhopper.core.reactor;
4 import rockhopper.core.uda;
5 
6 public import eventcore.driver : FileOpenMode;
7 import eventcore.driver : FileFD, PipeFD, OpenStatus, IOStatus;
8 
9 import std.traits : isInstanceOf;
10 
11 
12 class FileException(S) : Exception
13 {
14 	static if (!is(S == void))
15 	{
16 		S status;
17 
18 		this(S st, string msg)
19 		{
20 			status = st;
21 			super(msg);
22 		}
23 	}
24 }
25 
26 alias FileIOException = FileException!IOStatus;
27 alias FileOpenException = FileException!OpenStatus;
28 
29 // throws FileException!S if v != OK
30 private void check(alias OK, alias TEM)(typeof(OK) v)
31 {
32 	import std.exception : enforce;
33 	import std.conv : to;
34 
35 	enforce(v == OK, new FileException!(typeof(OK))(v, TEM(v.to!string)));
36 }
37 
38 // a quick note on types
39 // int / HANDLE  system FD or handle
40 // FILE* C       stdio.h streams <-- *THIS IS A RED HERRING, RESIST THE URGE TO USE THIS HERE*
41 // phobos file   wraps a FILE*
42 // FileFD        wraps an int / HANDLE
43 // rh File       wraps a FileFD
44 
45 // copyable via refcounting, auto closes on last detach
46 // modelled after the implementation of phobos/std/stdio.d/File
47 // if pipe, then reads and writes will not accept offsets etc. use the appropriate one for your FD type
48 private struct Handle(bool IS_PIPE = false, bool READABLE = true, bool WRITEABLE = true)
49 {
50 	import std.meta : AliasSeq;
51 	import core.memory : pureMalloc, pureFree;
52 	import core.atomic : atomicStore, atomicLoad, atomicOp;
53 	import eventcore.core : eventDriver;
54 
55 	import rockhopper.core.llevents : fileOpen, fileClose, fileRead, fileWrite, pipeRead, pipeWrite;
56 	import rockhopper.rhapi.syncf : FMutex; // TODO: TMutex
57 
58 	public enum bool isPipe = IS_PIPE;
59 	public enum bool isReadable = READABLE;
60 	public enum bool isWriteable = WRITEABLE;
61 
62 	// make it easy to refer to the necessary driver
63 	static if(IS_PIPE)
64 		private auto driverfp = () => eventDriver.pipes;
65 	else
66 		private auto driverfp = () => eventDriver.files;
67 
68 	static if (IS_PIPE)
69 		private alias FD = PipeFD;
70 	else
71 		private alias FD = FileFD;
72 
73 	// === STORAGE ===
74 
75 	// struct to keep it all close together in memory
76 	private struct Impl {
77 		FD fd; // null if another instance closes this file
78 		shared uint refCount;
79 		bool noAutoClose; // if true, never close automatically
80 		FMutex mutex;
81 	}
82 	private Impl* _impl;
83 
84 	// === CONSTRUCTORS ===
85 
86 	this(int handle, bool noAutoClose = true, uint refs = 1) nothrow
87 	{
88 		initialize(driverfp().adopt(handle), noAutoClose, refs);
89 	}
90 
91 	this(FD handle, bool noAutoClose = true, uint refs = 1) @nogc nothrow
92 	{
93 		initialize(handle, noAutoClose, refs);
94 	}
95 
96 	private void initialize(FD handle, bool noAutoClose = false, uint refs = 1) @nogc nothrow
97 	{
98 		assert(!_impl);
99 		_impl = cast(Impl*) pureMalloc(Impl.sizeof);
100 		*_impl = Impl.init; // overwrite uninitialized memory from malloc!
101 		assert(_impl);
102 
103 
104 		_impl.fd = handle;
105 		_impl.noAutoClose = noAutoClose;
106 		atomicStore(_impl.refCount, refs);
107 	}
108 
109 	// i've checked, async *struct* constructors are safe!
110 	static if (!IS_PIPE)
111 	this(string name, FileOpenMode mode) @Async
112 	{
113 		auto fd = fileOpen(name, mode);
114 		check!(OpenStatus.ok, (s) => "open failed: " ~ s)(fd.status);
115 
116 		initialize(fd.fd);
117 	}
118 
119 	// copy constructors between pipes of different types
120 	static if (IS_PIPE)
121 	{
122 		this(P)(scope P rhs)
123 		if (
124 			// pipew and piper -> pipea
125 			(WRITEABLE && READABLE && (is(P == Handle!(true, true, false)) || is(P == Handle!(true, false, true))))
126 			// pipea -> pipew
127 			|| (WRITEABLE && !READABLE && is(P == Handle!true))
128 			// pipea -> piper
129 			|| (!WRITEABLE && READABLE && is(P == Handle!true))
130 		)
131 		{
132 			if (!rhs._impl) return;
133 			assert(atomicLoad(rhs._impl.refCount));
134 			atomicOp!"+="(rhs._impl.refCount, 1); // add a ref
135 			_impl = cast(Impl*) rhs._impl; // duplicate the impl pointer
136 		}
137 	}
138 
139 	// === LIFECYCLE ===
140 
141 	// +1 refcount for fresh instances
142 	this(this)
143 	{
144 		if (!_impl) return; // null impl, whatever
145 		assert(atomicLoad(_impl.refCount));
146 		atomicOp!"+="(_impl.refCount, 1);
147 	}
148 
149 	~this() => detach();
150 
151 	// -1 refcount & free & close
152 	void detach()
153 	{
154 		if (!_impl) return;
155 		scope(exit) _impl = null;
156 
157 		if (atomicOp!"-="(_impl.refCount, 1) == 0)
158 		{
159 			scope(exit) pureFree(_impl);
160 
161 			if (!_impl.noAutoClose)
162 				driverfp().releaseRef(_impl.fd); // closes the fd synchronously
163 		}
164 	}
165 
166 	// === INTERNAL UTILS ===
167 
168 	private T withLock(T)(T delegate() f)
169 	{
170 		_impl.mutex.lock();
171 		scope(exit) _impl.mutex.unlock();
172 		return f();
173 	}
174 
175 	// === FILE OPERATIONS ===
176 
177 	// replaces the currently open file of this instance with a new one
178 	static if (!IS_PIPE)
179 	void open(string name, FileOpenMode mode) @Async
180 	{
181 		detach(); // if this instance points to a file, detach
182 
183 		auto opened = fileOpen(name, mode);
184 		// if this fails, leaves the file with no _impl attached cleanly.
185 		check!(OpenStatus.ok, (s) => "open failed: " ~ s)(opened.status);
186 
187 		initialize(opened.fd);
188 	}
189 
190 	// need to check fd because will be null if another instance closed the file.
191 	@property bool isOpen() const pure nothrow
192 		=> _impl !is null && _impl.fd;
193 
194 	// if open, closes, else does nothing
195 	void close()
196 	{
197 		if (!_impl) return;
198 
199 		scope(exit)
200 		{
201 			// basically the logic from detach()
202 			if (atomicOp!"-="(_impl.refCount, 1) == 0)
203 				pureFree(_impl);
204 			_impl = null;
205 		}
206 
207 		if (!_impl.fd) return; // already closed on another thread
208 		scope(exit) _impl.fd = typeof(_impl.fd).init; // why not
209 
210 		driverfp().releaseRef(_impl.fd); // close
211 	}
212 
213 	static if (!IS_PIPE)
214 	void sync()
215 	{
216 		// TODO: Windows support
217 		// TODO: Darwin support
218 		import core.sys.posix.unistd : fsync;
219 		import std.exception : errnoEnforce;
220 
221 		errnoEnforce(fsync(fileno) == 0);
222 	}
223 
224 	@property int fileno() const
225 	{
226 		auto fd = _impl.fd.value.value;
227 		assert(fd < int.max);
228 		return cast(int) fd;
229 	}
230 
231 	static if (!IS_PIPE)
232 	@property ulong length() const
233 	{
234 		return eventDriver.files.getSize(_impl.fd);
235 	}
236 
237 	// TODO: truncate
238 
239 	// make the offset arg only exist for files without having to declare the functions twice
240 	static if (IS_PIPE)
241 		private alias OFFSET_ARGS = AliasSeq!();
242 	else
243 		private alias OFFSET_ARGS = AliasSeq!ulong;
244 
245 	static if (READABLE)
246 	ulong read(OFFSET_ARGS oset, ubyte[] buf) @Async
247 	{
248 		return withLock({
249 			static if (IS_PIPE)
250 				auto res = pipeRead(_impl.fd, buf);
251 			else
252 				auto res = fileRead(_impl.fd, oset[0], buf);
253 
254 			check!(IOStatus.ok, (s) => "read error: " ~ s)(res.status);
255 			return res.bytesRWd;
256 		});
257 	}
258 
259 	static if (WRITEABLE)
260 	ulong write(OFFSET_ARGS oset, const(ubyte)[] buf) @Async
261 	{
262 		return withLock({
263 			static if (IS_PIPE)
264 				auto res = pipeWrite(_impl.fd, buf);
265 			else
266 				auto res = fileWrite(_impl.fd, oset[0], buf);
267 
268 			check!(IOStatus.ok, (s) => "write error: " ~ s)(res.status);
269 			return res.bytesRWd;
270 		});
271 	}
272 }
273 
274 // wraps a file handle
275 alias FileH = Handle!false;
276 // wraps one end of a pipe handle
277 alias PipeEndHR = Handle!(true, true, false);
278 alias PipeEndHW = Handle!(true, false);
279 alias PipeEndHA = Handle!true;
280 
281 // wraps a complete pipe with a read and write end
282 struct Pipe
283 {
284 	private PipeEndHR _read;
285 	private PipeEndHW _write;
286 
287 	@property PipeEndHR readEnd() nothrow { return _read; }
288 	@property PipeEndHW writeEnd() nothrow { return _write; }
289 
290 	@property auto readStream() nothrow { return streamify(_read); }
291 	@property auto writeStream() nothrow { return streamify(_write); }
292 
293 	// generally should be unnecessary, as both pipes will automatically close themselves when
294 	// there are no more references to them
295 	void close()
296 	{
297 		_read.close();
298 		_write.close();
299 	}
300 
301 	// creates a pipe!
302 	// cannot just be a default constructor for struct reasons
303 	static Pipe create()
304 	{
305 		// https://github.com/dlang/phobos/blob/c970ca6/std/process.d#L2756
306 		version (Posix)
307 		{
308 			import core.sys.posix.unistd : pipe;
309 			int[2] fds;
310 			check!(0, (_) => "failed to open pipe")(pipe(fds));
311 
312 			return Pipe(PipeEndHR(fds[0], false), PipeEndHW(fds[1], false));
313 		}
314 		else
315 		{
316 			// TODO
317 		}
318 	}
319 }
320 
321 enum SeekOrigin
322 {
323 	set,
324 	curr,
325 	end
326 }
327 
328 struct Stream(H) if (isInstanceOf!(Handle, H))
329 {
330 	H handle;
331 
332 	enum isSeekable = !H.isPipe;
333 	enum isReadable = H.isReadable;
334 	enum isWriteable = H.isWriteable;
335 
336 	// position management
337 	static if (isSeekable)
338 	{
339 		private ulong _index;
340 		private bool _eof;
341 
342 		@property ulong tell() const
343 		{
344 			return _index;
345 		}
346 
347 		// forward length from handle for convenience
348 		@property ulong length() const
349 		{
350 			return handle.length;
351 		}
352 
353 		@property bool isEof() const
354 		{
355 			return _eof;
356 		}
357 
358 		void seek(long oset, SeekOrigin origin = SeekOrigin.set)
359 		{
360 			import std.exception : enforce;
361 
362 			auto len = length;
363 
364 			final switch (origin) with (SeekOrigin)
365 			{
366 			case set:
367 				enforce(oset < len, "cannot seek past the end of the stream");
368 				_index = oset;
369 				break;
370 
371 			case curr:
372 				enforce(oset >= 0 || -oset > _index, "cannot seek before the start of the stream");
373 				enforce((oset + _index) <= len, "cannot seek past the end of the stream");
374 				_index += oset;
375 				break;
376 
377 			case end:
378 				enforce(oset <= 0, "cannot seek past the end of the stream");
379 				_index = len + oset;
380 				break;
381 			}
382 
383 			checkEOF(len);
384 		}
385 
386 		private void checkEOF()
387 		{
388 			checkEOF(length);
389 		}
390 
391 		private void checkEOF(ulong l)
392 		{
393 			_eof = _index == l;
394 		}
395 	}
396 
397 	static if (isReadable)
398 	// reads into a buffer, updating the index and eof state as necessary, returning the amount written
399 	ulong rawRead(ubyte[] buffer) @Async
400 	{
401 		static if (!isSeekable) return handle.read(buffer);
402 		else
403 		{
404 			ulong bytesXfered;
405 
406 			// we only check if this read will EOF using the file length if an error occurs,
407 			// as checking length() every time would be relatively expensive.
408 			try
409 			{
410 				bytesXfered = handle.read(_index, buffer);
411 			}
412 			catch (FileIOException e)
413 			{
414 				auto fileLen = length();
415 
416 				if (
417 					// error state MIGHT BE an EOF
418 					e.status == IOStatus.error
419 					// this read indeed would have EOFed
420 					&& _index + buffer.length > fileLen)
421 				{
422 					bytesXfered = fileLen - _index;
423 					_eof = true;
424 				}
425 				else
426 					throw e;
427 			}
428 
429 			_index += bytesXfered;
430 
431 			return bytesXfered;
432 		}
433 	}
434 
435 	static if (isWriteable)
436 	// reads into a buffer, updating the index and eof state as necessary, returning the amount written
437 	ulong rawWrite(const(ubyte)[] buffer) @Async
438 	{
439 		static if (!isSeekable) return handle.write(buffer);
440 		else
441 		{
442 			ulong bytesXfered;
443 
444 			try
445 			{
446 				bytesXfered = handle.write(_index, buffer);
447 			}
448 			catch (FileIOException e)
449 			{
450 				auto fileLen = length();
451 
452 				if (
453 					// error state MIGHT BE an EOF
454 					e.status == IOStatus.error
455 					// this read indeed would have EOFed
456 					&& _index + buffer.length > fileLen)
457 				{
458 					bytesXfered = fileLen - _index;
459 					_eof = true;
460 				}
461 				else
462 					throw e;
463 			}
464 
465 			_index += bytesXfered;
466 
467 			return bytesXfered;
468 		}
469 	}
470 
471 	static if (isReadable)
472 	ubyte[] rawRead(ulong upTo) @Async
473 	{
474 		ubyte[] buf = new ubyte[upTo];
475 		auto xfered = rawRead(buf);
476 		return buf[0 .. xfered];
477 	}
478 
479 	// TODO: nice apis
480 }
481 
482 // this sucks but idk what else to do because of template arg inference
483 auto streamify(H)(scope H value) if (isInstanceOf!(Handle, H))
484 {
485 	return Stream!H(value);
486 }
487 
488 import core.sys.posix.unistd : STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO;
489 
490 auto getStdin() @property // @suppress(dscanner.confusing.function_attributes)
491 {
492 	int fd;
493 	// TODO: other platforms than POSIX
494 	version (Posix)
495 	{
496 		import core.sys.posix.unistd : dup;
497 		fd = dup(STDIN_FILENO);
498 		assert(fd);
499 	}
500 
501 	return streamify(PipeEndHR(fd, false));
502 }
503 
504 auto getStdout() @property // @suppress(dscanner.confusing.function_attributes)
505 {
506 	int fd;
507 	version (Posix)
508 	{
509 		import core.sys.posix.unistd : dup;
510 
511 		fd = dup(STDOUT_FILENO);
512 		assert(fd);
513 	}
514 
515 	return streamify(PipeEndHW(fd, false));
516 }
517 
518 auto getStderr() @property // @suppress(dscanner.confusing.function_attributes)
519 {
520 	int fd;
521 	version (Posix)
522 	{
523 		import core.sys.posix.unistd : dup;
524 
525 		fd = dup(STDERR_FILENO);
526 		assert(fd);
527 	}
528 
529 	return streamify(PipeEndHW(fd, false));
530 }