/**
 * Pipe-based stream
 */
module river.impls.pipe;

import river.core;
import std.exception : ErrnoException;
import std.conv : to;
import river.impls.fd : FDStream;

/**
 * Provides a stream interface to a UNIX pipe fd-pair
 *
 * Reads are served by an `FDStream` wrapping the pipe's read end
 * and writes by an `FDStream` wrapping the pipe's write end.
 */
public class PipeStream : Stream
{
    /**
     * Underlying FDStreams for the read and write fds
     * making up the pipe
     */
    private FDStream readStream, writeStream;

    /**
     * Constructs a new piped-stream with the file descriptors of the
     * read and write ends provided
     *
     * Params:
     *   readEnd = the fd of the pipe's read end
     *   writeEnd = the fd of the pipe's write end
     */
    this(int readEnd, int writeEnd)
    {
        this.readStream = new FDStream(readEnd);
        this.writeStream = new FDStream(writeEnd);
    }

    /**
     * Creates a new anonymous pipe and attaches it to a newly created
     * `PipeStream`
     *
     * Only available on POSIX platforms; compilation fails elsewhere.
     *
     * Returns: the created `PipeStream`, `null` on failure to create
     * the pipe
     */
    public static PipeStream newPipe()
    {
        version(Posix)
        {
            import core.sys.posix.unistd : pipe;

            /* Receives the read end at index 0 and the write end at index 1 */
            int[2] pipeFd;

            // Successful pipe creation
            if(pipe(pipeFd) == 0)
            {
                return new PipeStream(pipeFd[0], pipeFd[1]);
            }
            // Failure to create a pipe
            else
            {
                return null;
            }
        }
        else
        {
            pragma(msg, "PipeStream: Cannot construct pipes on this platform");
            static assert(false);
        }
    }

    /**
     * Reads bytes from the pipe into the provided array
     * and returns without any further waiting, at most the
     * number of bytes read will be the length of the provided
     * array, at minimum a single byte
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     */
    public override ulong read(byte[] toArray)
    {
        return readStream.read(toArray);
    }

    /**
     * Reads bytes from the pipe into the provided array
     * until the array is fully-filled
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     */
    public override ulong readFully(byte[] toArray)
    {
        return readStream.readFully(toArray);
    }

    /**
     * Writes bytes from the provided array to the pipe's write end
     * by delegating to the underlying `FDStream`
     *
     * NOTE(review): presumably this may write fewer bytes than
     * provided, mirroring `read(byte[])` - confirm against
     * `FDStream.write`
     *
     * Params:
     *   fromArray = the buffer holding the bytes to write
     * Returns: the value returned by the underlying stream's
     * `write` (presumably the number of bytes written)
     */
    public override ulong write(byte[] fromArray)
    {
        return writeStream.write(fromArray);
    }

    /**
     * Writes bytes from the provided array to the pipe's write end
     * by delegating to the underlying `FDStream`'s `writeFully`
     *
     * NOTE(review): presumably this returns only once the whole
     * array has been written, mirroring `readFully(byte[])` -
     * confirm against `FDStream.writeFully`
     *
     * Params:
     *   fromArray = the buffer holding the bytes to write
     * Returns: the value returned by the underlying stream's
     * `writeFully` (presumably the number of bytes written)
     */
    public override ulong writeFully(byte[] fromArray)
    {
        return writeStream.writeFully(fromArray);
    }

    /**
     * Closes both ends of the pipe, the read end first and
     * then the write end
     *
     * The write end is closed even if closing the read end
     * throws, so that its file descriptor is not leaked.
     */
    public override void close()
    {
        // Runs when this scope exits, whether normally or by exception
        scope(exit) writeStream.close();

        readStream.close();
    }
}

version(unittest)
{
    import core.thread;
}

/**
 * Create a new `PipeStream` where one thread writes to it
 * and another thread (the main thread) reads from it.
 *
 * We have added in some pauses to add entropy to show
 * how it could go either way and how `read(byte[])`
 * and `readFully(byte[])` can be used in such situations
 */
unittest
{
    PipeStream myPipe = PipeStream.newPipe();
    assert(myPipe !is null);

    class WriterThread : Thread
    {
        private PipeStream myPipeStream;

        this(PipeStream myPipeStream)
        {
            this.myPipeStream = myPipeStream;
            super(&run);
        }

        private void run()
        {
            byte[] data = [0,69,55];
            myPipeStream.write(data);

            Thread.sleep(dur!("seconds")(2));

            data = [42, 80, 99];
            myPipeStream.write(data);

            Thread.sleep(dur!("seconds")(2));

            data = [100, 102];
            myPipeStream.write(data);
        }
    }

    Thread writerThread = new WriterThread(myPipe);
    writerThread.start();

    Thread.sleep(dur!("seconds")(2));

    // Depending on timing, the writer's second write may or may not
    // have landed yet, so either 3 or 4 bytes are available here
    byte[] myReceivedData;
    myReceivedData.length = 4;
    ulong cnt = myPipe.read(myReceivedData);
    assert(cnt == 3 || cnt == 4);
    assert(myReceivedData == [0, 69,55, 0] || myReceivedData == [0, 69,55, 42]);

    // The data still to be read is now either [42, 80, 99, 100, 102]
    // or [80, 99, 100, 102]
    byte[] myReceivedData2;
    myReceivedData2.length = 4;
    cnt = myPipe.readFully(myReceivedData2);
    assert(cnt == 4);
    assert(myReceivedData2 == [42, 80, 99, 100] || myReceivedData2 == [80, 99, 100, 102]);

    // Wait for the writer to finish (rethrowing any failure it hit)
    // before tearing down the pipe it writes to
    writerThread.join();

    // Close the stream
    myPipe.close();
}