1 /** 
2  * Pipe-based stream
3  */
4 module river.impls.pipe;
5 
6 import river.core;
7 import std.exception : ErrnoException;
8 import std.conv : to;
9 import river.impls.fd : FDStream;
10 
11 /** 
12  * Provides a stream interface to a UNIX pipe fd-pair
13  */
14 public class PipeStream : Stream
15 {
16     /** 
17      * Underlying FDStreams for the read and write fds
18      * making up the pipe
19      */
20     private FDStream readStream, writeStream;
21 
22     /** 
23      * Constructs a new piped-stream with the file descriptors of the
24      * read and write ends provided
25      *
26      * Params:
27      *   readEnd = the fd of the pipe's read end
28      *   writeEnd = the fd of the pipe's write end
29      */
30 	this(int readEnd, int writeEnd)
31 	{
32         this.readStream = new FDStream(readEnd);
33         this.writeStream = new FDStream(writeEnd);
34 	}
35 
36     /** 
37      * Gets the stream attached to the read-end of the pipe
38      *
39      * Returns: an `FDStream`
40      */
41     public FDStream getReadStream()
42     {
43         return readStream;
44     }
45 
46     /** 
47      * Gets the stream attached to the write-end of the pipe
48      *
49      * Returns: an `FDStream`
50      */
51     public FDStream getWriteStream()
52     {
53         return writeStream;
54     }
55 
56     /** 
57      * Creates a new anonymous pipe and attaches it to a newly created
58      * `PipeStream`
59      *
60      * Returns: the created `PipeStream`, `null` on failure to create
61      * the pipe
62      */
63     public static PipeStream newPipe()
64     {
65         version(Posix)
66         {
67             import core.sys.posix.unistd : pipe;
68 
69             /* Open the pipe */
70             int[2] pipeFd;
71 
72             // Successful pipe creation
73             if(pipe(pipeFd) == 0)
74             {
75                 return new PipeStream(pipeFd[0], pipeFd[1]);
76             }
77             // Failure to create a pipe
78             else
79             {
80                 return null;
81             }
82         }
83         else
84         {
85             pragma(msg, "PipeStream: Cannot construct pipes on this platform");
86             static assert(false);
87         }
88     }
89 
90     /** 
91      * Reads bytes from the pipe into the provided array
92      * and returns without any further waiting, at most the
93      * number of bytes read will be the length of the provided
94      * array, at minimum a single byte
95      *
96      * Params:
97      *   toArray = the buffer to read into
98      * Returns: the number of bytes read
99      */
100     public override ulong read(byte[] toArray)
101     {
102         return readStream.read(toArray);
103     }
104 
105     /** 
106      * Reads bytes from the pipe into the provided array
107      * until the array is fully-filled
108      *
109      * Params:
110      *   toArray = the buffer to read into
111      * Returns: the number of bytes read
112      */
113     public override ulong readFully(byte[] toArray)
114     {
115         return readStream.readFully(toArray);
116     }
117 
118     public override ulong write(byte[] fromArray)
119     {
120         return writeStream.write(fromArray);
121     }
122 
123     public override ulong writeFully(byte[] fromArray)
124     {
125         return writeStream.writeFully(fromArray);
126     }
127 
128     /** 
129      * Closes both pipe pairs
130      */
131     public override void close()
132     {
133         readStream.close();
134         writeStream.close();
135     }
136 }
137 
138 version(unittest)
139 {
140     import core.thread;
141     import std.stdio;
142 }
143 
144 /**
145  * Create a new `PipeStream` where one thread writes to it
146  * and another thread (the main thread) reads from it.
147  *
148  * We have added in some pauses to add entropy to show
149  * how it could go either way and how `read(byte[])`
150  * and `readFully(byte[])` can be used in such situations
151  */
152 unittest
153 {
154     PipeStream myPipe = PipeStream.newPipe();
155     assert(myPipe !is null);
156 
157     class WriterThread : Thread
158     {
159         private PipeStream myPipeStream;
160 
161         this(PipeStream myPipeStream)
162         {
163             this.myPipeStream = myPipeStream;
164             super(&run);
165         }
166 
167         private void run()
168         {
169             byte[] data = [0,69,55];
170             myPipeStream.write(data);
171 
172             Thread.sleep(dur!("seconds")(2));
173 
174             data = [42, 80, 99];
175             myPipeStream.write(data);
176 
177             Thread.sleep(dur!("seconds")(2));
178 
179             data = [100, 102];
180             myPipeStream.write(data);
181         }
182     }
183 
184     Thread writerThread = new WriterThread(myPipe);
185     writerThread.start();
186 
187     Thread.sleep(dur!("seconds")(2));
188 
189     byte[] myReceivedData;
190     myReceivedData.length = 4;
191     ulong cnt = myPipe.read(myReceivedData);
192     assert(cnt == 3 || cnt == 4);
193     assert(myReceivedData == [0, 69,55, 0] || myReceivedData == [0, 69,55, 42]);
194 
195 
196     // By now either [42, 80, 99, 100, 102] or [80, 99, 100, 102]
197 
198     byte[] myReceivedData2;
199     myReceivedData2.length = 4;
200     cnt = myPipe.readFully(myReceivedData2);
201     writeln(cnt);
202     assert(cnt == 4);
203     writeln(myReceivedData2);
204     assert(myReceivedData2 == [42, 80, 99, 100] || myReceivedData2 == [80, 99, 100, 102]);
205 
206     // Close the stream
207     myPipe.close();
208 }
209 
210 // version(unittest)
211 // {
212 //     unittest
213 //     {
214 //         version(linux)
215 //         {
216 //             writeln("Testing fcntl to adjust pipe size to test writeFully()");
217 
218 //             import core.sys.linux.fcntl;
219 
220 //             PipeStream myPipe = PipeStream.newPipe();
221 //             assert(myPipe !is null);
222 
223 //             int allocatedSize = fcntl(myPipe.getWriteStream().getFd(), 1031, 5000);
224 //             writeln("Pipe's internal buffer size allocated is: ", allocatedSize);
225 //             writeln("Checking size (did the kernel lie, piece of shit) ", fcntl(myPipe.getReadStream().getFd(), 1032));
226 
227 //             // TODO: Insert a reader thread that reads sloweer whilst we try write more
228 //             // than an initial `allocatedSize`+1
229 //             class ReaderThread : Thread
230 //             {
231 //                 private Stream stream;
232 //                 this(Stream stream)
233 //                 {
234 //                     this.stream = stream;
235 //                     super(&worker);
236 //                 }
237 
238 //                 private void worker()
239 //                 {
240 //                     writeln("Reader thread is sleeping for 3 seconds...");
241 //                     Thread.sleep(dur!("seconds")(3));
242 //                     writeln("reader is going to now");
243 
244 //                     byte[] singleByte;
245 //                     singleByte.length = 4095;
246 //                     stream.read(singleByte);
247 
248 //                     writeln("Popped off byte: ", singleByte);
249 //                 }
250 //             }
251 
252 //             Thread readerThread = new ReaderThread(myPipe);
253 //             readerThread.start();
254 
255 
256 //             // We must write `allocatedSize`+1`
257 //             byte[] writeBytes;
258 //             writeBytes.length = allocatedSize+1;
259 //             foreach(ref byte writeByte; writeBytes)
260 //             {
261 //                 writeByte = 69;
262 //             }
263 //             writeBytes[0] = 10;
264 
265 //             writeln("Array to be written: ", writeBytes[0..20]);
266             
267 //             writeln("Calling writeFully() now");
268 //             myPipe.writeFully(writeBytes);
269 //             writeln("writeFully() completed");
270 //         }
271         
272 //     }
273     
274 // }
275