/**
 * Pipe-based stream
 */
module river.impls.pipe;

import river.core;
import std.exception : ErrnoException;
import std.conv : to;
import river.impls.fd : FDStream;

/**
 * Provides a stream interface to a UNIX pipe fd-pair
 *
 * Each end of the pipe is wrapped in its own `FDStream`;
 * reads are forwarded to the read end and writes to the
 * write end.
 */
public class PipeStream : Stream
{
    /**
     * Underlying FDStreams for the read and write fds
     * making up the pipe
     */
    private FDStream readStream, writeStream;

    /**
     * Constructs a new piped-stream with the file descriptors of the
     * read and write ends provided
     *
     * Params:
     *   readEnd = the fd of the pipe's read end
     *   writeEnd = the fd of the pipe's write end
     */
    this(int readEnd, int writeEnd)
    {
        this.readStream = new FDStream(readEnd);
        this.writeStream = new FDStream(writeEnd);
    }

    /**
     * Gets the stream attached to the read-end of the pipe
     *
     * Returns: an `FDStream`
     */
    public FDStream getReadStream()
    {
        return readStream;
    }

    /**
     * Gets the stream attached to the write-end of the pipe
     *
     * Returns: an `FDStream`
     */
    public FDStream getWriteStream()
    {
        return writeStream;
    }

    /**
     * Creates a new anonymous pipe and attaches it to a newly created
     * `PipeStream`
     *
     * Only available on POSIX platforms; compilation fails elsewhere.
     *
     * Returns: the created `PipeStream`, `null` on failure to create
     * the pipe
     */
    public static PipeStream newPipe()
    {
        version(Posix)
        {
            import core.sys.posix.unistd : pipe;

            /* Open the pipe: index 0 receives the read end, index 1 the write end */
            int[2] pipeFd;

            // Successful pipe creation
            if(pipe(pipeFd) == 0)
            {
                return new PipeStream(pipeFd[0], pipeFd[1]);
            }
            // Failure to create a pipe
            else
            {
                return null;
            }
        }
        else
        {
            pragma(msg, "PipeStream: Cannot construct pipes on this platform");
            static assert(false);
        }
    }

    /**
     * Reads bytes from the pipe into the provided array
     * and returns without any further waiting, at most the
     * number of bytes read will be the length of the provided
     * array, at minimum a single byte
     *
     * The call is forwarded to the read-end `FDStream`.
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     */
    public override ulong read(byte[] toArray)
    {
        return readStream.read(toArray);
    }

    /**
     * Reads bytes from the pipe into the provided array
     * until the array is fully-filled
     *
     * The call is forwarded to the read-end `FDStream`.
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     */
    public override ulong readFully(byte[] toArray)
    {
        return readStream.readFully(toArray);
    }

    /**
     * Writes bytes from the provided array to the pipe by
     * forwarding the call to the write-end `FDStream`
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: whatever the write-end stream's `write(byte[])`
     * returns (presumably the number of bytes written)
     */
    public override ulong write(byte[] fromArray)
    {
        return writeStream.write(fromArray);
    }

    /**
     * Writes the provided array to the pipe by forwarding the
     * call to the write-end stream's `writeFully(byte[])`
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: whatever the write-end stream's `writeFully(byte[])`
     * returns (presumably the number of bytes written)
     */
    public override ulong writeFully(byte[] fromArray)
    {
        return writeStream.writeFully(fromArray);
    }

    /**
     * Closes both ends of the pipe
     *
     * The read end is closed first. The write end is closed
     * even if closing the read end throws, so that a failure
     * on one end never leaks the other end's file descriptor.
     */
    public override void close()
    {
        // Registered before the read end is closed so it always runs
        scope(exit) writeStream.close();

        readStream.close();
    }
}

version(unittest)
{
    import core.thread;
    import std.stdio;
}

/**
 * Create a new `PipeStream` where one thread writes to it
 * and another thread (the main thread) reads from it.
 *
 * We have added in some pauses to add entropy to show
 * how it could go either way and how `read(byte[])`
 * and `readFully(byte[])` can be used in such situations
 */
unittest
{
    PipeStream myPipe = PipeStream.newPipe();
    assert(myPipe !is null);

    class WriterThread : Thread
    {
        private PipeStream myPipeStream;

        this(PipeStream myPipeStream)
        {
            this.myPipeStream = myPipeStream;
            super(&run);
        }

        private void run()
        {
            byte[] data = [0,69,55];
            myPipeStream.write(data);

            Thread.sleep(dur!("seconds")(2));

            data = [42, 80, 99];
            myPipeStream.write(data);

            Thread.sleep(dur!("seconds")(2));

            data = [100, 102];
            myPipeStream.write(data);
        }
    }

    Thread writerThread = new WriterThread(myPipe);
    writerThread.start();

    Thread.sleep(dur!("seconds")(2));

    // Depending on timing the second chunk may or may not have landed yet
    byte[] myReceivedData;
    myReceivedData.length = 4;
    ulong cnt = myPipe.read(myReceivedData);
    assert(cnt == 3 || cnt == 4);
    assert(myReceivedData == [0, 69,55, 0] || myReceivedData == [0, 69,55, 42]);


    // By now either [42, 80, 99, 100, 102] or [80, 99, 100, 102]

    byte[] myReceivedData2;
    myReceivedData2.length = 4;
    cnt = myPipe.readFully(myReceivedData2);
    writeln(cnt);
    assert(cnt == 4);
    writeln(myReceivedData2);
    assert(myReceivedData2 == [42, 80, 99, 100] || myReceivedData2 == [80, 99, 100, 102]);

    // Wait for the writer to finish so the pipe is never closed
    // underneath a thread that may still be inside `write()`
    writerThread.join();

    // Close the stream
    myPipe.close();
}

// Disabled test for `writeFully()`: shrinks the pipe's kernel buffer and then
// writes one byte more than it can hold while a slow reader drains it.
// NOTE(review): 1031 and 1032 appear to be Linux's F_SETPIPE_SZ and
// F_GETPIPE_SZ fcntl commands - confirm before re-enabling.
//
// version(unittest)
// {
//     unittest
//     {
//         version(linux)
//         {
//             writeln("Testing fcntl to adjust pipe size to test writeFully()");

//             import core.sys.linux.fcntl;

//             PipeStream myPipe = PipeStream.newPipe();
//             assert(myPipe !is null);

//             int allocatedSize = fcntl(myPipe.getWriteStream().getFd(), 1031, 5000);
//             writeln("Pipe's internal buffer size allocated is: ", allocatedSize);
//             writeln("Checking size (as reported back by the kernel) ", fcntl(myPipe.getReadStream().getFd(), 1032));

//             // TODO: Insert a reader thread that reads slower whilst we try write more
//             // than an initial `allocatedSize`+1
//             class ReaderThread : Thread
//             {
//                 private Stream stream;
//                 this(Stream stream)
//                 {
//                     this.stream = stream;
//                     super(&worker);
//                 }

//                 private void worker()
//                 {
//                     writeln("Reader thread is sleeping for 3 seconds...");
//                     Thread.sleep(dur!("seconds")(3));
//                     writeln("reader is going to now");

//                     byte[] singleByte;
//                     singleByte.length = 4095;
//                     stream.read(singleByte);

//                     writeln("Popped off byte: ", singleByte);
//                 }
//             }

//             Thread readerThread = new ReaderThread(myPipe);
//             readerThread.start();


//             // We must write `allocatedSize`+1
//             byte[] writeBytes;
//             writeBytes.length = allocatedSize+1;
//             foreach(ref byte writeByte; writeBytes)
//             {
//                 writeByte = 69;
//             }
//             writeBytes[0] = 10;

//             writeln("Array to be written: ", writeBytes[0..20]);

//             writeln("Calling writeFully() now");
//             myPipe.writeFully(writeBytes);
//             writeln("writeFully() completed");
//         }

//     }

// }