1 /** 2 * Socket stream 3 */ 4 module river.impls.sock; 5 6 import river.core; 7 import std.socket; 8 9 /** 10 * Provides a stream interface to a `Socket` which has 11 */ 12 public class SockStream : Stream 13 { 14 /** 15 * Underlying socket 16 */ 17 private Socket socket; 18 19 /** 20 * Constructs a new `SockStream` from the provided socket 21 * 22 * Params: 23 * socket = the `Socket` to use as the underlying source/sink 24 */ 25 this(Socket socket) 26 { 27 this.socket = socket; 28 } 29 30 /** 31 * Ensures that the socket is open, if not, then throws an 32 * exception 33 */ 34 private void openCheck() 35 { 36 if(!socket.isAlive()) 37 { 38 throw new StreamException(StreamError.CLOSED); 39 } 40 } 41 42 /** 43 * Reads bytes from the socket into the provided array 44 * and returns without any further waiting, at most the 45 * number of bytes read will be the length of the provided 46 * array, at minimum a single byte 47 * 48 * Params: 49 * toArray = the buffer to read into 50 * Returns: the number of bytes read 51 */ 52 public override ulong read(byte[] toArray) 53 { 54 // Ensure the stream is open 55 openCheck(); 56 57 // Receive from the socket (at most `toArray.length`) 58 ptrdiff_t status = socket.receive(toArray); 59 60 // If the remote end closed the connection 61 if(status == 0) 62 { 63 throw new StreamException(StreamError.CLOSED); 64 } 65 // TODO: Handle like above, but some custom error message, then throw exception 66 else if(status < 0) 67 { 68 // TODO: We should examine the error 69 throw new StreamException(StreamError.OPERATION_FAILED); 70 } 71 // If the message was correctly received 72 else 73 { 74 return status; 75 } 76 } 77 78 /** 79 * Reads bytes from the socket into the provided array 80 * until the array is fully-filled 81 * 82 * Params: 83 * toArray = the buffer to read into 84 * Returns: the number of bytes read 85 */ 86 public override ulong readFully(byte[] toArray) 87 { 88 // Ensure the stream is open 89 openCheck(); 90 91 // Receive from the socket `toArray.length` 92 // TODO: recv can only read a certain number of max bytes, we should 93 // ... decide what to do in such a case 94 ptrdiff_t status = socket.receive(toArray, cast(SocketFlags)MSG_WAITALL); 95 96 // If the remote end closed the connection 97 if(status == 0) 98 { 99 throw new StreamException(StreamError.CLOSED); 100 } 101 // TODO: Handle like above, but some custom error message, then throw exception 102 else if(status < 0) 103 { 104 // TODO: We should examine the error 105 throw new StreamException(StreamError.OPERATION_FAILED); 106 } 107 // If the message was correctly received 108 else 109 { 110 // TODO: Ensure read count > 0 and count == toArray.length (full amount requested was read) 111 if(status == toArray.length) 112 { 113 114 } 115 return status; 116 } 117 } 118 119 /** 120 * Writes bytes to the socket from the provided array 121 * and returns without any further waiting, at most the 122 * number of bytes written will be the length of the provided 123 * array, at minimum a single byte. 124 * 125 * Be aware that is the kernsl's internal buffer is full 126 * and if the `Socket` is in blocking mode that this wil 127 * block until space is available to send at most some of 128 * the bytes in `fromArray`. 129 * 130 * Params: 131 * fromArray = the buffer to write from 132 * Returns: the number of bytes written 133 */ 134 public override ulong write(byte[] fromArray) 135 { 136 // Ensure the stream is open 137 openCheck(); 138 139 // Write to the socket (at most `fromArray.length`) 140 // TODO: Implement me 141 ptrdiff_t status = socket.send(fromArray); 142 143 // On an error 144 if(status < 0) 145 { 146 // TODO: We should examine the error 147 throw new StreamException(StreamError.OPERATION_FAILED); 148 } 149 // If the message was correctly sent 150 else 151 { 152 return status; 153 } 154 } 155 156 /** 157 * Writes bytes to the socket from the provided array 158 * until the array has been fully written 159 * 160 * Params: 161 * fromArray = the buffer to write from 162 * Returns: the number of bytes written 163 */ 164 public override ulong writeFully(byte[] fromArray) 165 { 166 // Ensure the stream is open 167 openCheck(); 168 169 // Write to the socket `fromArray.length` 170 // TODO: send can only write a certain number of max bytes, we should 171 // ... decide what to do in such a case 172 ptrdiff_t status = socket.send(fromArray, cast(SocketFlags)MSG_WAITALL); 173 174 175 // On an error 176 if(status < 0) 177 { 178 // TODO: We should examine the error 179 throw new StreamException(StreamError.OPERATION_FAILED); 180 } 181 // If the message was correctly sent 182 else 183 { 184 // TODO: Ensure read count > 0 and count == toArray.length (full amount requested was read) 185 if(status == fromArray.length) 186 { 187 188 } 189 return status; 190 } 191 } 192 193 /** 194 * Closes the stream 195 */ 196 public override void close() 197 { 198 /* Unblocks any current calls to receive/send and prevents and futher ones */ 199 socket.shutdown(SocketShutdown.BOTH); 200 201 /* Closes the connection */ 202 socket.close(); 203 } 204 } 205 206 version(unittest) 207 { 208 import core.thread; 209 import std.file; 210 import std.stdio : writeln; 211 } 212 213 /** 214 * Tests using `read(ref byte[])` and `readFully(ref byte[])` 215 * on a `SockStream` 216 */ 217 unittest 218 { 219 import river.impls.sock; 220 221 // FIXME: Make this randomnly generated 222 string testDomainStr = "/tmp/riverTestUNIXSock.sock"; 223 UnixAddress testDomain = new UnixAddress(testDomainStr); 224 225 scope(exit) 226 { 227 // Remove the UNIX domain file, else we will get a problem 228 // ... creating it next time we run 229 remove(testDomainStr); 230 } 231 232 Socket server = new Socket(AddressFamily.UNIX, SocketType.STREAM); 233 server.bind(testDomain); 234 server.listen(0); 235 236 class ServerThread : Thread 237 { 238 private Socket serverSocket; 239 240 this(Socket serverSocket) 241 { 242 super(&run); 243 this.serverSocket = serverSocket; 244 } 245 246 private void run() 247 { 248 249 Socket clientSocket = serverSocket.accept(); 250 251 ubyte[] data = [69,255,21]; 252 clientSocket.send(data); 253 254 255 Thread.sleep(dur!("seconds")(2)); 256 // yield(); 257 data = [1,2,3,4,5,5,4,3,2,1]; 258 clientSocket.send(data); 259 260 } 261 } 262 263 Thread serverThread = new ServerThread(server); 264 serverThread.start(); 265 266 Socket clientConnection = new Socket(AddressFamily.UNIX, SocketType.STREAM); 267 clientConnection.connect(testDomain); 268 269 Stream stream = new SockStream(clientConnection); 270 271 // TODO: The below can technically be mixed-in 272 273 byte[] receivedData; 274 receivedData.length = 2; 275 ulong cnt = stream.readFully(receivedData); 276 assert(cnt == 2); 277 assert(receivedData == [69,-1]); 278 279 280 Thread.sleep(dur!("seconds")(2)); 281 byte[] receivedData2; 282 receivedData2.length = 3; 283 cnt = stream.read(receivedData2); 284 writeln(cnt); 285 writeln(receivedData2); 286 assert(cnt >= 1 && cnt <= 3); 287 assert(receivedData2 == [21, 0, 0] || receivedData2 == [21, 1, 0] || receivedData2 == [21, 1, 2]); 288 289 290 // Finally close the stream 291 stream.close(); 292 }