1 /** 2 * Socket stream 3 */ 4 module river.impls.sock; 5 6 import river.core; 7 import std.socket; 8 9 /** 10 * Provides a stream interface to a `Socket` which has 11 */ 12 public class SockStream : Stream 13 { 14 /** 15 * Underlying socket 16 */ 17 private Socket socket; 18 19 /** 20 * Constructs a new `SockStream` from the provided socket 21 * 22 * Params: 23 * socket = the `Socket` to use as the underlying source/sink 24 */ 25 this(Socket socket) 26 { 27 this.socket = socket; 28 } 29 30 /** 31 * Ensures that the socket is open, if not, then throws an 32 * exception 33 */ 34 private void openCheck() 35 { 36 if(!socket.isAlive()) 37 { 38 throw new StreamException(StreamError.CLOSED); 39 } 40 } 41 42 /** 43 * Reads bytes from the socket into the provided array 44 * and returns without any further waiting, at most the 45 * number of bytes read will be the length of the provided 46 * array, at minimum a single byte 47 * 48 * Params: 49 * toArray = the buffer to read into 50 * Returns: the number of bytes read 51 */ 52 public override ulong read(byte[] toArray) 53 { 54 // Ensure the stream is open 55 openCheck(); 56 57 // Receive from the socket (at most `toArray.length`) 58 ptrdiff_t status = socket.receive(toArray); 59 60 // If the remote end closed the connection 61 if(status == 0) 62 { 63 throw new StreamException(StreamError.CLOSED); 64 } 65 // TODO: Handle like above, but some custom error message, then throw exception 66 else if(status < 0) 67 { 68 // TODO: We should examine the error 69 throw new StreamException(StreamError.OPERATION_FAILED); 70 } 71 // If the message was correctly received 72 else 73 { 74 return status; 75 } 76 } 77 78 // TODO: We should not allow toArray.length == 0 below 79 // ... confusing between our internal socket closing 80 // ... and hence returning 0 or 0 bytes read 81 // ... WHEN we request with 0 length (occuring because 82 // ... of the toArray.length == 0) 83 84 /** 85 * Reads bytes from the socket into the provided array 86 * until the array is fully-filled 87 * 88 * Params: 89 * toArray = the buffer to read into 90 * Returns: the number of bytes read 91 */ 92 public override ulong readFully(byte[] toArray) 93 { 94 // Ensure the stream is open 95 openCheck(); 96 97 // Receive from the socket `toArray.length` 98 ptrdiff_t status = socket.receive(toArray, cast(SocketFlags)MSG_WAITALL); 99 100 // If the remote end closed the connection 101 if(status == 0) 102 { 103 throw new StreamException(StreamError.CLOSED); 104 } 105 // TODO: Handle like above, but some custom error message, then throw exception 106 else if(status < 0) 107 { 108 // TODO: We should examine the error 109 throw new StreamException(StreamError.OPERATION_FAILED); 110 } 111 // If the message was correctly received 112 else 113 { 114 return status; 115 } 116 } 117 118 /** 119 * Writes bytes to the socket from the provided array 120 * and returns without any further waiting, at most the 121 * number of bytes written will be the length of the provided 122 * array, at minimum a single byte. 123 * 124 * Be aware that is the kernsl's internal buffer is full 125 * and if the `Socket` is in blocking mode that this wil 126 * block until space is available to send at most some of 127 * the bytes in `fromArray`. 128 * 129 * Params: 130 * fromArray = the buffer to write from 131 * Returns: the number of bytes written 132 */ 133 public override ulong write(byte[] fromArray) 134 { 135 // Ensure the stream is open 136 openCheck(); 137 138 // Write to the socket (at most `fromArray.length`) 139 ptrdiff_t status = socket.send(fromArray); 140 141 // On an error 142 if(status < 0) 143 { 144 // TODO: We should examine the error 145 throw new StreamException(StreamError.OPERATION_FAILED); 146 } 147 // If the message was correctly sent 148 else 149 { 150 return status; 151 } 152 } 153 154 /** 155 * Writes bytes to the socket from the provided array 156 * until the array has been fully written 157 * 158 * Params: 159 * fromArray = the buffer to write from 160 * Returns: the number of bytes written 161 */ 162 public override ulong writeFully(byte[] fromArray) 163 { 164 // Ensure the stream is open 165 openCheck(); 166 167 /** 168 * Perform a write till the number of bytes requested is fulfilled, 169 * we have to do it in this matter as it doesn't seem that MSG_WAITALL 170 * will work as done in `socket.receive()` 171 */ 172 long totalBytesRequested = fromArray.length; 173 long totalBytesGot = 0; 174 while(totalBytesGot < totalBytesRequested) 175 { 176 /* Write remaining bytes into correct offset */ 177 ptrdiff_t status = socket.send(fromArray[0+totalBytesGot..totalBytesRequested]); 178 179 // On successful write 180 if(status > 0) 181 { 182 totalBytesGot += status; 183 } 184 // On write error 185 else if(status == 0) 186 { 187 throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status 0"); 188 } 189 // On write error 190 else 191 { 192 throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status <0"); 193 } 194 } 195 196 assert(totalBytesGot == totalBytesRequested); 197 198 return totalBytesGot; 199 } 200 201 /** 202 * Closes the stream 203 */ 204 public override void close() 205 { 206 /* Unblocks any current calls to receive/send and prevents and futher ones */ 207 socket.shutdown(SocketShutdown.BOTH); 208 209 /* Closes the connection */ 210 socket.close(); 211 } 212 } 213 214 version(unittest) 215 { 216 import core.thread; 217 import std.file; 218 import std.stdio : writeln; 219 import river.impls.sock; 220 } 221 222 /** 223 * Tests using `read(ref byte[])` and `readFully(ref byte[])` 224 * on a `SockStream` 225 */ 226 unittest 227 { 228 string testDomainStr = "/tmp/riverTestUNIXSock.sock"; 229 UnixAddress testDomain = new UnixAddress(testDomainStr); 230 231 scope(exit) 232 { 233 // Remove the UNIX domain file, else we will get a problem 234 // ... creating it next time we run 235 remove(testDomainStr); 236 } 237 238 Socket server = new Socket(AddressFamily.UNIX, SocketType.STREAM); 239 server.bind(testDomain); 240 server.listen(0); 241 242 class ServerThread : Thread 243 { 244 private Socket serverSocket; 245 246 this(Socket serverSocket) 247 { 248 super(&run); 249 this.serverSocket = serverSocket; 250 } 251 252 private void run() 253 { 254 /** 255 * Accept the socket and create a `SockStream` 256 * from it to test out writing 257 */ 258 Socket clientSocket = serverSocket.accept(); 259 Stream clientStream = new SockStream(clientSocket); 260 261 ubyte[] data = [69,255,21]; 262 clientStream.writeFully(cast(byte[])data); 263 264 265 Thread.sleep(dur!("seconds")(2)); 266 // yield(); 267 data = [1,2,3,4,5,5,4,3,2,1]; 268 269 // We catch an exception here as sometimes the main 270 // ... thread may reach the stream.close() which 271 // ... causes the connection to close and 272 // ... -1 internally returned hence throwing 273 // ... this error. This is fine as we are really 274 // ... testing reads below. WriteFully is being 275 // ... tested so much so as to just test if it works 276 try 277 { 278 clientStream.writeFully(cast(byte[])data); 279 } 280 catch(StreamException e) 281 { 282 283 } 284 285 } 286 } 287 288 Thread serverThread = new ServerThread(server); 289 serverThread.start(); 290 291 Socket clientConnection = new Socket(AddressFamily.UNIX, SocketType.STREAM); 292 clientConnection.connect(testDomain); 293 294 Stream stream = new SockStream(clientConnection); 295 296 // TODO: The below can technically be mixed-in 297 298 byte[] receivedData; 299 receivedData.length = 2; 300 ulong cnt = stream.readFully(receivedData); 301 assert(cnt == 2); 302 assert(receivedData == [69,-1]); 303 304 305 Thread.sleep(dur!("seconds")(2)); 306 byte[] receivedData2; 307 receivedData2.length = 3; 308 cnt = stream.read(receivedData2); 309 writeln(cnt); 310 writeln(receivedData2); 311 assert(cnt >= 1 && cnt <= 3); 312 assert(receivedData2 == [21, 0, 0] || receivedData2 == [21, 1, 0] || receivedData2 == [21, 1, 2]); 313 314 315 // Finally close the stream 316 stream.close(); 317 }