1 /** 
2  * Socket stream
3  */
4 module river.impls.sock;
5 
6 import river.core;
7 import std.socket;
8 
9 /** 
10  * Provides a stream interface to a `Socket` which has 
11  */
12 public class SockStream : Stream
13 {
14     /** 
15      * Underlying socket
16      */
17     private Socket socket;
18 
19     /** 
20      * Constructs a new `SockStream` from the provided socket
21      *
22      * Params:
23      *   socket = the `Socket` to use as the underlying source/sink
24      */
25     this(Socket socket)
26     {
27         this.socket = socket;
28     }
29 
30     /** 
31      * Ensures that the socket is open, if not, then throws an
32      * exception
33      */
34     private void openCheck()
35     {
36         if(!socket.isAlive())
37         {
38             throw new StreamException(StreamError.CLOSED);
39         }
40     }
41 
42     /** 
43      * Reads bytes from the socket into the provided array
44      * and returns without any further waiting, at most the
45      * number of bytes read will be the length of the provided
46      * array, at minimum a single byte
47      *
48      * Params:
49      *   toArray = the buffer to read into
50      * Returns: the number of bytes read
51      */
52     public override ulong read(byte[] toArray)
53     {
54         // Ensure the stream is open
55         openCheck();
56 
57         // Receive from the socket (at most `toArray.length`)
58         ptrdiff_t status = socket.receive(toArray);
59 
60         // If the remote end closed the connection
61         if(status == 0)
62         {
63             throw new StreamException(StreamError.CLOSED);
64         }
65         // TODO: Handle like above, but some custom error message, then throw exception
66         else if(status < 0)
67         {
68             // TODO: We should examine the error
69             throw new StreamException(StreamError.OPERATION_FAILED);
70         }
71         // If the message was correctly received
72         else
73         {
74             return status;
75         }
76     }
77 
78     // TODO: We should not allow toArray.length == 0 below
79     // ... confusing between our internal socket closing
80     // ... and hence returning 0 or 0 bytes read
81     // ... WHEN we request with 0 length (occuring because
82     // ... of the toArray.length == 0)
83 
84     /** 
85      * Reads bytes from the socket into the provided array
86      * until the array is fully-filled
87      *
88      * Params:
89      *   toArray = the buffer to read into
90      * Returns: the number of bytes read
91      */
92     public override ulong readFully(byte[] toArray)
93     {
94         // Ensure the stream is open
95         openCheck();
96 
97         // Receive from the socket `toArray.length`
98         ptrdiff_t status = socket.receive(toArray, cast(SocketFlags)MSG_WAITALL);
99 
100         // If the remote end closed the connection
101         if(status == 0)
102         {
103             throw new StreamException(StreamError.CLOSED);
104         }
105         // TODO: Handle like above, but some custom error message, then throw exception
106         else if(status < 0)
107         {
108             // TODO: We should examine the error
109             throw new StreamException(StreamError.OPERATION_FAILED);
110         }
111         // If the message was correctly received
112         else
113         {
114             return status;
115         }
116     }
117 
118     /** 
119      * Writes bytes to the socket from the provided array
120      * and returns without any further waiting, at most the
121      * number of bytes written will be the length of the provided
122      * array, at minimum a single byte.
123      *
124      * Be aware that is the kernsl's internal buffer is full
125      * and if the `Socket` is in blocking mode that this wil
126      * block until space is available to send at most some of
127      * the bytes in `fromArray`.
128      *
129      * Params:
130      *   fromArray = the buffer to write from
131      * Returns: the number of bytes written
132      */
133     public override ulong write(byte[] fromArray)
134     {
135         // Ensure the stream is open
136         openCheck();
137 
138         // Write to the socket (at most `fromArray.length`)
139         ptrdiff_t status = socket.send(fromArray);
140 
141         // On an error
142         if(status < 0)
143         {
144             // TODO: We should examine the error
145             throw new StreamException(StreamError.OPERATION_FAILED);
146         }
147         // If the message was correctly sent
148         else
149         {
150             return status;
151         }
152     }
153 
154     /** 
155      * Writes bytes to the socket from the provided array
156      * until the array has been fully written
157      *
158      * Params:
159      *   fromArray = the buffer to write from
160      * Returns: the number of bytes written
161      */
162     public override ulong writeFully(byte[] fromArray)
163     {
164         // Ensure the stream is open
165         openCheck();
166 
167         /** 
168          * Perform a write till the number of bytes requested is fulfilled,
169          * we have to do it in this matter as it doesn't seem that MSG_WAITALL
170          * will work as done in `socket.receive()`
171          */
172         long totalBytesRequested = fromArray.length;
173         long totalBytesGot = 0;
174         while(totalBytesGot < totalBytesRequested)
175         {
176             /* Write remaining bytes into correct offset */
177             ptrdiff_t status = socket.send(fromArray[0+totalBytesGot..totalBytesRequested]);
178 
179             // On successful write
180             if(status > 0)
181             {
182                 totalBytesGot += status;
183             }
184             // On write error
185             else if(status == 0)
186             {
187                 throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status 0");
188             }
189             // On write error
190             else
191             {
192                 throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status <0");
193             }
194         }
195 
196         assert(totalBytesGot == totalBytesRequested);
197         
198         return totalBytesGot;
199     }
200 
201     /** 
202      * Closes the stream
203      */
204     public override void close()
205     {
206         /* Unblocks any current calls to receive/send and prevents and futher ones */
207         socket.shutdown(SocketShutdown.BOTH);
208 
209         /* Closes the connection */
210         socket.close();
211     }
212 }
213 
214 version(unittest)
215 {
216     import core.thread;
217     import std.file;
218     import std.stdio : writeln;
219     import river.impls.sock;
220 }
221 
222 /**
223  * Tests using `read(ref byte[])` and `readFully(ref byte[])`
224  * on a `SockStream`
225  */
226 unittest
227 {
228     string testDomainStr = "/tmp/riverTestUNIXSock.sock";
229     UnixAddress testDomain = new UnixAddress(testDomainStr);
230 
231     scope(exit)
232     {
233         // Remove the UNIX domain file, else we will get a problem
234         // ... creating it next time we run
235         remove(testDomainStr);
236     }
237 
238     Socket server = new Socket(AddressFamily.UNIX, SocketType.STREAM);
239     server.bind(testDomain);
240     server.listen(0);
241     
242     class ServerThread : Thread
243     {
244         private Socket serverSocket;
245 
246         this(Socket serverSocket)
247         {
248             super(&run);
249             this.serverSocket = serverSocket;
250         }
251 
252         private void run()
253         {
254             /** 
255              * Accept the socket and create a `SockStream`
256              * from it to test out writing
257              */   
258             Socket clientSocket = serverSocket.accept();
259             Stream clientStream = new SockStream(clientSocket);
260 
261             ubyte[] data = [69,255,21];
262             clientStream.writeFully(cast(byte[])data);
263 
264 
265             Thread.sleep(dur!("seconds")(2));
266             // yield();
267             data = [1,2,3,4,5,5,4,3,2,1];
268 
269             // We catch an exception here as sometimes the main
270             // ... thread may reach the stream.close() which
271             // ... causes the connection to close and 
272             // ... -1 internally returned hence throwing
273             // ... this error. This is fine as we are really
274             // ... testing reads below. WriteFully is being
275             // ... tested so much so as to just test if it works
276             try
277             {
278                 clientStream.writeFully(cast(byte[])data);
279             }
280             catch(StreamException e)
281             {
282                 
283             }
284             
285         }
286     }
287 
288     Thread serverThread = new ServerThread(server);
289     serverThread.start();
290 
291     Socket clientConnection = new Socket(AddressFamily.UNIX, SocketType.STREAM);
292     clientConnection.connect(testDomain);
293 
294     Stream stream = new SockStream(clientConnection);
295 
296     // TODO: The below can technically be mixed-in
297 
298     byte[] receivedData;
299     receivedData.length = 2;
300     ulong cnt = stream.readFully(receivedData);
301     assert(cnt == 2);
302     assert(receivedData == [69,-1]);
303 
304 
305     Thread.sleep(dur!("seconds")(2));
306     byte[] receivedData2;
307     receivedData2.length = 3;
308     cnt = stream.read(receivedData2);
309     writeln(cnt);
310     writeln(receivedData2);
311     assert(cnt >= 1 && cnt <= 3);
312     assert(receivedData2 == [21, 0, 0] || receivedData2 == [21, 1, 0] || receivedData2 == [21, 1, 2]);
313 
314 
315     // Finally close the stream
316     stream.close();
317 }