1 /** 
2  * Pipe-based stream
3  */
4 module river.impls.pipe;
5 
6 import river.core;
7 import std.exception : ErrnoException;
8 import std.conv : to;
9 import river.impls.fd : FDStream;
10 
11 /** 
12  * Provides a stream interface to a UNIX pipe fd-pair
13  */
public class PipeStream : Stream
{
    /** 
     * Underlying FDStreams for the read and write fds
     * making up the pipe
     */
    private FDStream readStream, writeStream;

    /** 
     * Constructs a new piped-stream with the file descriptors of the
     * read and write ends provided
     *
     * Params:
     *   readEnd = the fd of the pipe's read end
     *   writeEnd = the fd of the pipe's write end
     */
    this(int readEnd, int writeEnd)
    {
        this.readStream = new FDStream(readEnd);
        this.writeStream = new FDStream(writeEnd);
    }

    /** 
     * Creates a new anonymous pipe and attaches it to a newly created
     * `PipeStream`
     *
     * Returns: the created `PipeStream`, `null` on failure to create
     * the pipe
     */
    public static PipeStream newPipe()
    {
        version(Posix)
        {
            import core.sys.posix.unistd : pipe;

            /* pipe() fills index 0 with the read end and index 1 with the write end */
            int[2] pipeFd;

            // Failure to create a pipe
            if(pipe(pipeFd) != 0)
            {
                return null;
            }

            // Successful pipe creation
            return new PipeStream(pipeFd[0], pipeFd[1]);
        }
        else
        {
            static assert(false, "PipeStream: Cannot construct pipes on this platform");
        }
    }

    /** 
     * Reads bytes from the pipe into the provided array
     * and returns without any further waiting, at most the
     * number of bytes read will be the length of the provided
     * array, at minimum a single byte
     *
     * The call is forwarded to the `FDStream` wrapping the read end.
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     */
    public override ulong read(byte[] toArray)
    {
        return readStream.read(toArray);
    }

    /** 
     * Reads bytes from the pipe into the provided array
     * until the array is fully-filled
     *
     * The call is forwarded to the `FDStream` wrapping the read end.
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     */
    public override ulong readFully(byte[] toArray)
    {
        return readStream.readFully(toArray);
    }

    /** 
     * Writes bytes from the provided array to the pipe by
     * forwarding the call to the `FDStream` wrapping the write end
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: whatever the underlying `FDStream.write` returns
     * (presumably the number of bytes written - see `FDStream`)
     */
    public override ulong write(byte[] fromArray)
    {
        return writeStream.write(fromArray);
    }

    /** 
     * Writes the provided array to the pipe by forwarding the
     * call to `writeFully` of the `FDStream` wrapping the write end
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: whatever the underlying `FDStream.writeFully` returns
     * (presumably the number of bytes written - see `FDStream`)
     */
    public override ulong writeFully(byte[] fromArray)
    {
        return writeStream.writeFully(fromArray);
    }

    /** 
     * Closes both ends of the pipe
     *
     * The write end is closed even if closing the read end
     * throws, so that neither file descriptor is leaked
     */
    public override void close()
    {
        // Runs when this scope is left, whether normally or by exception
        scope(exit) writeStream.close();

        readStream.close();
    }
}
117 
118 version(unittest)
119 {
120     import core.thread;
121 }
122 
123 /**
124  * Create a new `PipeStream` where one thread writes to it
125  * and another thread (the main thread) reads from it.
126  *
127  * We have added in some pauses to add entropy to show
128  * how it could go either way and how `read(byte[])`
129  * and `readFully(byte[])` can be used in such situations
130  */
unittest
{
    PipeStream myPipe = PipeStream.newPipe();
    assert(myPipe !is null);

    /* Writes three small chunks to the pipe with pauses in between */
    class WriterThread : Thread
    {
        private PipeStream myPipeStream;

        this(PipeStream myPipeStream)
        {
            this.myPipeStream = myPipeStream;
            super(&run);
        }

        private void run()
        {
            byte[] data = [0,69,55];
            myPipeStream.write(data);

            Thread.sleep(dur!("seconds")(2));

            data = [42, 80, 99];
            myPipeStream.write(data);

            Thread.sleep(dur!("seconds")(2));

            data = [100, 102];
            myPipeStream.write(data);
        }
    }

    Thread writerThread = new WriterThread(myPipe);
    writerThread.start();

    // Pause for as long as the writer does between its first two
    // writes, so the second chunk may or may not have arrived yet
    Thread.sleep(dur!("seconds")(2));

    byte[] myReceivedData;
    myReceivedData.length = 4;
    ulong cnt = myPipe.read(myReceivedData);

    // Either only the first chunk was available (last slot keeps its
    // zero-initialised value) or the second chunk had already begun
    assert(
        (cnt == 3 && myReceivedData == [0, 69, 55, 0]) ||
        (cnt == 4 && myReceivedData == [0, 69, 55, 42])
    );

    // Remember which case we hit as `cnt` is re-used below
    bool consumedFromSecondChunk = (cnt == 4);

    // By now either [42, 80, 99, 100, 102] or [80, 99, 100, 102]
    // remains to be read (some of it possibly not yet written)

    byte[] myReceivedData2;
    myReceivedData2.length = 4;
    cnt = myPipe.readFully(myReceivedData2);
    assert(cnt == 4);

    if(consumedFromSecondChunk)
    {
        assert(myReceivedData2 == [80, 99, 100, 102]);
    }
    else
    {
        assert(myReceivedData2 == [42, 80, 99, 100]);
    }

    // Wait for the writer to finish before tearing the pipe down; join()
    // also rethrows anything the writer thread threw
    writerThread.join();

    // Close the stream
    myPipe.close();
}