1 /** 
2  * Socket stream
3  */
4 module river.impls.sock;
5 
6 import river.core;
7 import std.socket;
8 
9 /** 
 * Provides a stream interface over a caller-supplied `Socket`
11  */
public class SockStream : Stream
{
    /** 
     * Underlying socket
     */
    private Socket socket;

    /** 
     * Constructs a new `SockStream` from the provided socket
     *
     * Params:
     *   socket = the `Socket` to use as the underlying source/sink
     */
    this(Socket socket)
    {
        this.socket = socket;
    }

    /** 
     * Ensures that the socket is open, if not, then throws an
     * exception
     *
     * Throws: `StreamException` with `StreamError.CLOSED` if
     * the socket reports that it is not alive
     */
    private void openCheck()
    {
        if(!socket.isAlive())
        {
            throw new StreamException(StreamError.CLOSED);
        }
    }

    /** 
     * Performs a single receive on the socket and translates
     * the returned status into either a byte count or an exception
     *
     * Params:
     *   buffer = the (non-empty) buffer to receive into
     *   flags = the flags to pass to the receive call
     * Returns: the number of bytes received (always non-zero)
     * Throws: `StreamException` with `StreamError.CLOSED` if the
     * receive returned zero, or with `StreamError.OPERATION_FAILED`
     * if it returned a negative status
     */
    private size_t receiveChecked(byte[] buffer, SocketFlags flags)
    {
        ptrdiff_t status = socket.receive(buffer, flags);

        // If the remote end closed the connection
        if(status == 0)
        {
            throw new StreamException(StreamError.CLOSED);
        }
        // On an error
        else if(status < 0)
        {
            // TODO: We should examine the error
            throw new StreamException(StreamError.OPERATION_FAILED);
        }

        return cast(size_t)status;
    }

    /** 
     * Performs a single send on the socket and translates
     * the returned status into either a byte count or an exception
     *
     * Params:
     *   buffer = the buffer to send from
     * Returns: the number of bytes accepted by the send call
     * Throws: `StreamException` with `StreamError.OPERATION_FAILED`
     * if the send returned a negative status
     */
    private size_t sendChecked(byte[] buffer)
    {
        ptrdiff_t status = socket.send(buffer, SocketFlags.NONE);

        // On an error
        if(status < 0)
        {
            // TODO: We should examine the error
            throw new StreamException(StreamError.OPERATION_FAILED);
        }

        return cast(size_t)status;
    }

    /** 
     * Reads bytes from the socket into the provided array
     * using a single receive call. At most the number of
     * bytes read will be the length of the provided array,
     * at minimum a single byte (unless the provided array
     * is empty, in which case nothing is read)
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     * Throws: `StreamException` if the stream is closed or
     * the receive failed
     */
    public override ulong read(byte[] toArray)
    {
        // Ensure the stream is open
        openCheck();

        // A zero-length receive returns 0, which would otherwise
        // be indistinguishable from the remote end closing
        if(toArray.length == 0)
        {
            return 0;
        }

        // Receive from the socket (at most `toArray.length`)
        return receiveChecked(toArray, SocketFlags.NONE);
    }

    /** 
     * Reads bytes from the socket into the provided array
     * until the array is fully-filled
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read, which is always
     * the length of the provided array
     * Throws: `StreamException` if the stream is closed (or
     * becomes closed before the array is filled) or a receive
     * failed. In that case part of the array may already have
     * been written to
     */
    public override ulong readFully(byte[] toArray)
    {
        // Ensure the stream is open
        openCheck();

        // Keep receiving into the unfilled tail; a receive may
        // return fewer bytes than requested even with `MSG_WAITALL`,
        // so a single call is not enough to honour the contract
        size_t filled = 0;
        while(filled < toArray.length)
        {
            filled += receiveChecked(toArray[filled..$], cast(SocketFlags)MSG_WAITALL);
        }

        return filled;
    }

    /** 
     * Writes bytes to the socket from the provided array
     * using a single send call. At most the number of bytes
     * written will be the length of the provided array.
     *
     * Be aware that if the kernel's internal buffer is full
     * and if the `Socket` is in blocking mode that this will
     * block until space is available to send at most some of
     * the bytes in `fromArray`.
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: the number of bytes written
     * Throws: `StreamException` if the stream is closed or
     * the send failed
     */
    public override ulong write(byte[] fromArray)
    {
        // Ensure the stream is open
        openCheck();

        // Write to the socket (at most `fromArray.length`)
        return sendChecked(fromArray);
    }

    /** 
     * Writes bytes to the socket from the provided array
     * until the array has been fully written
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: the number of bytes written, which is always
     * the length of the provided array
     * Throws: `StreamException` if the stream is closed or a
     * send failed. In that case part of the array may already
     * have been sent
     */
    public override ulong writeFully(byte[] fromArray)
    {
        // Ensure the stream is open
        openCheck();

        // A send may accept only part of the data, so keep
        // sending the unsent tail until nothing remains
        size_t written = 0;
        while(written < fromArray.length)
        {
            size_t accepted = sendChecked(fromArray[written..$]);

            // Defensive: no progress on a non-empty send would
            // otherwise make this loop spin forever
            if(accepted == 0)
            {
                throw new StreamException(StreamError.OPERATION_FAILED);
            }

            written += accepted;
        }

        return written;
    }

    /** 
     * Closes the stream by shutting down both directions
     * of the socket and then closing it
     */
    public override void close()
    {
        /* Unblocks any current calls to receive/send and prevents any further ones */
        socket.shutdown(SocketShutdown.BOTH);

        /* Closes the connection */
        socket.close();
    }
}
205 
206 version(unittest)
207 {
208     import core.thread;
209     import std.file;
210     import std.stdio : writeln;
211 }
212 
213 /**
 * Tests using `read(byte[])` and `readFully(byte[])`
215  * on a `SockStream`
216  */
unittest
{
    // FIXME: Make this randomly generated
    string testDomainStr = "/tmp/riverTestUNIXSock.sock";
    UnixAddress testDomain = new UnixAddress(testDomainStr);

    // A socket file left behind by an earlier aborted run
    // would make the bind below fail, so clear it first
    if(exists(testDomainStr))
    {
        remove(testDomainStr);
    }

    scope(exit)
    {
        // Remove the UNIX domain file, else we will get a problem
        // ... creating it next time we run. Guarded so that a
        // failure before the file exists is not masked by a
        // second exception from `remove`
        if(exists(testDomainStr))
        {
            remove(testDomainStr);
        }
    }

    Socket server = new Socket(AddressFamily.UNIX, SocketType.STREAM);

    // Release the listening socket however the test ends
    scope(exit)
    {
        server.close();
    }

    server.bind(testDomain);
    server.listen(0);
    
    /** 
     * Accepts a single connection, sends three bytes, waits
     * two seconds and then sends a further ten bytes
     */
    class ServerThread : Thread
    {
        private Socket serverSocket;

        this(Socket serverSocket)
        {
            super(&run);
            this.serverSocket = serverSocket;
        }

        private void run()
        {
            Socket clientSocket = serverSocket.accept();

            // Release the accepted connection once both sends are done
            scope(exit)
            {
                clientSocket.close();
            }

            ubyte[] data = [69,255,21];
            clientSocket.send(data);

            Thread.sleep(dur!("seconds")(2));
            data = [1,2,3,4,5,5,4,3,2,1];
            clientSocket.send(data);
        }
    }

    Thread serverThread = new ServerThread(server);
    serverThread.start();

    Socket clientConnection = new Socket(AddressFamily.UNIX, SocketType.STREAM);
    clientConnection.connect(testDomain);

    Stream stream = new SockStream(clientConnection);

    // TODO: The below can technically be mixed-in

    // Exactly two of the first three bytes sent must be returned
    // (255 reads back as -1 because the buffer is of signed bytes)
    byte[] receivedData;
    receivedData.length = 2;
    ulong cnt = stream.readFully(receivedData);
    assert(cnt == 2);
    assert(receivedData == [69,-1]);

    // The left-over byte (21) is always available; depending on
    // timing some of the second message may be available as well
    Thread.sleep(dur!("seconds")(2));
    byte[] receivedData2;
    receivedData2.length = 3;
    cnt = stream.read(receivedData2);
    writeln(cnt);
    writeln(receivedData2);
    assert(cnt >= 1 && cnt <= 3);
    assert(receivedData2 == [21, 0, 0] || receivedData2 == [21, 1, 0] || receivedData2 == [21, 1, 2]);

    // Wait for the server side to finish sending before tearing
    // down the connection
    serverThread.join();

    // Finally close the stream
    stream.close();
}