/**
 * FD-based stream
 */
module river.impls.fd;

import river.core;

/**
 * Provides a base for streams based on a file descriptor
 *
 * All I/O is performed directly on the descriptor with the POSIX
 * `read`/`write`/`close` calls; on non-POSIX platforms compilation
 * fails with an explanatory message.
 */
public class FDStream : Stream
{
    /**
     * Underlying file descriptor
     */
    protected const int fd;

    /**
     * Creates a new `FDStream` with the backing read/write file
     * descriptor being the one provided
     *
     * Params:
     *   fd = the read/write file descriptor
     */
    this(int fd)
    {
        this.fd = fd;
    }

    /**
     * Reads bytes from the file descriptor into the provided array
     * and returns without any further waiting, at most the
     * number of bytes read will be the length of the provided
     * array, at minimum a single byte (unless the provided array
     * is empty, in which case nothing is read and 0 is returned)
     *
     * A call interrupted by a signal (`EINTR`) is transparently
     * retried rather than reported as a failure.
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     * Throws:
     *   StreamException if the underlying `read` reports
     *   end-of-file (status 0) or an error (status < 0)
     */
    public override ulong read(byte[] toArray)
    {
        version(Posix)
        {
            import core.stdc.errno : errno, EINTR;
            import core.sys.posix.unistd : read, ssize_t;

            /* Nothing was asked for; a 0 from read() here would not mean end-of-file */
            if(toArray.length == 0)
            {
                return 0;
            }

            /* Retry for as long as the call is merely interrupted by a signal */
            ssize_t status;
            do
            {
                status = read(fd, toArray.ptr, toArray.length);
            }
            while(status < 0 && errno == EINTR);

            if(status > 0)
            {
                return cast(ulong)status;
            }
            else if(status == 0)
            {
                throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status 0");
            }
            else
            {
                throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status <0");
            }
        }
        else
        {
            static assert(false, "FDStream: The read() call is not implemented for your platform");
        }
    }

    /**
     * Reads bytes from the file descriptor into the provided array
     * until the array is fully-filled
     *
     * Short reads are continued from the correct offset and calls
     * interrupted by a signal (`EINTR`) are retried without losing
     * the bytes already received. An empty array returns 0
     * immediately.
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read (always `toArray.length`)
     * Throws:
     *   StreamException if the underlying `read` reports
     *   end-of-file (status 0) or an error (status < 0) before
     *   the array has been filled
     */
    public override ulong readFully(byte[] toArray)
    {
        version(Posix)
        {
            import core.stdc.errno : errno, EINTR;
            import core.sys.posix.unistd : read, ssize_t;

            /**
             * Perform a read till the number of bytes requested is fulfilled
             */
            const size_t totalBytesRequested = toArray.length;
            size_t totalBytesGot = 0;
            while(totalBytesGot < totalBytesRequested)
            {
                /* Read remaining bytes into correct offset */
                ssize_t status = read(fd, toArray.ptr+totalBytesGot, totalBytesRequested-totalBytesGot);

                if(status > 0)
                {
                    totalBytesGot += cast(size_t)status;
                }
                else if(status == 0)
                {
                    throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status 0");
                }
                else if(errno == EINTR)
                {
                    /* Interrupted by a signal before any data arrived, try again */
                    continue;
                }
                else
                {
                    throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status <0");
                }
            }

            assert(totalBytesGot == totalBytesRequested);
            return totalBytesGot;
        }
        else
        {
            static assert(false, "FDStream: The readFully() call is not implemented for your platform");
        }
    }

    /**
     * Writes bytes from the provided array to the file descriptor
     * with a single underlying `write`, which may accept fewer
     * bytes than the length of the provided array
     *
     * A call interrupted by a signal (`EINTR`) is transparently
     * retried rather than reported as a failure. An empty array
     * writes nothing and returns 0.
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: the number of bytes written
     * Throws:
     *   StreamException if the underlying `write` reports that
     *   nothing was written (status 0) or an error (status < 0)
     */
    public override ulong write(byte[] fromArray)
    {
        version(Posix)
        {
            import core.stdc.errno : errno, EINTR;
            import core.sys.posix.unistd : write, ssize_t;

            /* Nothing to write; a 0 from write() here would not indicate a failure */
            if(fromArray.length == 0)
            {
                return 0;
            }

            /* Retry for as long as the call is merely interrupted by a signal */
            ssize_t status;
            do
            {
                status = write(fd, fromArray.ptr, fromArray.length);
            }
            while(status < 0 && errno == EINTR);

            if(status > 0)
            {
                return cast(ulong)status;
            }
            else if(status == 0)
            {
                throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status 0");
            }
            else
            {
                throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status <0");
            }
        }
        else
        {
            static assert(false, "FDStream: The write() call is not implemented for your platform");
        }
    }

    /**
     * Writes bytes from the provided array to the file descriptor
     * until the whole array has been written
     *
     * Short writes are continued from the correct offset and calls
     * interrupted by a signal (`EINTR`) are retried without losing
     * track of the bytes already written. An empty array returns 0
     * immediately.
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: the number of bytes written (always `fromArray.length`)
     * Throws:
     *   StreamException if the underlying `write` reports that
     *   nothing was written (status 0) or an error (status < 0)
     *   before the whole array has been written
     */
    public override ulong writeFully(byte[] fromArray)
    {
        // TODO: Add a unit test for this, we should do it in something that
        // ... has a fixed internal buffer
        version(Posix)
        {
            import core.stdc.errno : errno, EINTR;
            import core.sys.posix.unistd : write, ssize_t;

            /**
             * Perform a write till the number of bytes requested is fulfilled
             */
            const size_t totalBytesRequested = fromArray.length;
            size_t totalBytesGot = 0;
            while(totalBytesGot < totalBytesRequested)
            {
                /* Write remaining bytes from correct offset */
                ssize_t status = write(fd, fromArray.ptr+totalBytesGot, totalBytesRequested-totalBytesGot);

                if(status > 0)
                {
                    totalBytesGot += cast(size_t)status;
                }
                else if(status == 0)
                {
                    throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status 0");
                }
                else if(errno == EINTR)
                {
                    /* Interrupted by a signal before any data was accepted, try again */
                    continue;
                }
                else
                {
                    throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status <0");
                }
            }

            assert(totalBytesGot == totalBytesRequested);
            return totalBytesGot;
        }
        else
        {
            static assert(false, "FDStream: The writeFully() call is not implemented for your platform");
        }
    }

    /**
     * Closes the file descriptor
     *
     * The result of the underlying `close` is not inspected.
     */
    public override void close()
    {
        version(Posix)
        {
            import core.sys.posix.unistd : close;

            /* Close the file descriptor */
            close(fd);
        }
        else
        {
            static assert(false, "FDStream: The close() call is not implemented for your platform");
        }
    }

    /**
     * Retrieve the underlying file descriptor this stream is attached to
     *
     * Returns: the fd as an `int`
     */
    public final int getFd()
    {
        return fd;
    }
}