1 /** 
2  * FD-based stream
3  */
4 module river.impls.fd;
5 
6 import river.core;
7 
8 /** 
9  * Provides a base for streams based on a file descriptor
10  */
public class FDStream : Stream
{
    /** 
     * Underlying file descriptor
     */
    protected const int fd;

    /** 
     * Creates a new `FDStream` with the backing read/write file
     * descriptor being the one provided
     *
     * Params:
     *   fd = the read/write file descriptor
     */
    this(int fd)
    {
        this.fd = fd;
    }

    /** 
     * Reads bytes from the file descriptor into the provided array
     * and returns as soon as a single underlying read has delivered
     * data. At most `toArray.length` bytes are read; for a non-empty
     * buffer at least one byte is read.
     *
     * A read interrupted by a signal (`EINTR`) is transparently
     * retried. An empty buffer is a no-op that returns `0` without
     * touching the file descriptor.
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read
     * Throws: StreamException with `StreamError.OPERATION_FAILED` if
     * the underlying read returns 0 (nothing more to read) or fails
     */
    public override ulong read(byte[] toArray)
    {
        version(Posix)
        {
            import core.stdc.errno : errno, EINTR;
            import core.sys.posix.unistd : read, ssize_t;

            /* Nothing was requested, so a status of 0 would not be an error */
            if(toArray.length == 0)
            {
                return 0;
            }

            /* Retry when a signal interrupted the call before any data arrived */
            ssize_t status;
            do
            {
                status = read(fd, toArray.ptr, toArray.length);
            }
            while(status < 0 && errno == EINTR);

            if(status > 0)
            {
                return status;
            }
            else if(status == 0)
            {
                throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status 0");
            }
            else
            {
                throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status <0");
            }
        }
        else
        {
            pragma(msg, "FDStream: The read() call is not implemented for your platform");
            static assert(false);
        }
    }

    /** 
     * Reads bytes from the file descriptor into the provided array
     * until the array is fully-filled
     *
     * Reads interrupted by a signal (`EINTR`) are transparently
     * retried so that a partially filled buffer is not abandoned.
     *
     * Params:
     *   toArray = the buffer to read into
     * Returns: the number of bytes read, always `toArray.length`
     * Throws: StreamException with `StreamError.OPERATION_FAILED` if
     * the underlying read returns 0 before the buffer is full, or fails
     */
    public override ulong readFully(byte[] toArray)
    {
        version(Posix)
        {
            import core.stdc.errno : errno, EINTR;
            import core.sys.posix.unistd : read, ssize_t;

            /** 
            * Perform a read till the number of bytes requested is fulfilled
            */
            immutable size_t totalBytesRequested = toArray.length;
            size_t totalBytesGot = 0;
            while(totalBytesGot < totalBytesRequested)
            {
                /* Read remaining bytes into correct offset */
                ssize_t status = read(fd, toArray.ptr+totalBytesGot, totalBytesRequested-totalBytesGot);

                if(status > 0)
                {
                    totalBytesGot += status;
                }
                else if(status == 0)
                {
                    throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status 0");
                }
                else if(errno == EINTR)
                {
                    /* Interrupted by a signal before any data arrived, try again */
                    continue;
                }
                else
                {
                    throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status <0");
                }
            }

            assert(totalBytesGot == totalBytesRequested);
            return totalBytesGot;
        }
        else
        {
            pragma(msg, "FDStream: The readFully() call is not implemented for your platform");
            static assert(false);
        }
    }

    /** 
     * Writes bytes from the provided array to the file descriptor
     * using a single underlying write. At most `fromArray.length`
     * bytes are written; for a non-empty array at least one byte
     * is written.
     *
     * A write interrupted by a signal (`EINTR`) is transparently
     * retried. An empty array is a no-op that returns `0` without
     * touching the file descriptor.
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: the number of bytes written
     * Throws: StreamException with `StreamError.OPERATION_FAILED` if
     * the underlying write reports 0 bytes written or fails
     */
    public override ulong write(byte[] fromArray)
    {
        version(Posix)
        {
            import core.stdc.errno : errno, EINTR;
            import core.sys.posix.unistd : write, ssize_t;

            /* Nothing to write, so a status of 0 would not be an error */
            if(fromArray.length == 0)
            {
                return 0;
            }

            /* Retry when a signal interrupted the call before any data was written */
            ssize_t status;
            do
            {
                status = write(fd, fromArray.ptr, fromArray.length);
            }
            while(status < 0 && errno == EINTR);

            if(status > 0)
            {
                return status;
            }
            else if(status == 0)
            {
                throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status 0");
            }
            else
            {
                throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status <0");
            }
        }
        else
        {
            pragma(msg, "FDStream: The write() call is not implemented for your platform");
            static assert(false);
        }
    }

    /** 
     * Writes bytes from the provided array to the file descriptor
     * until the whole array has been written
     *
     * Writes interrupted by a signal (`EINTR`) are transparently
     * retried so that a partially written buffer is not abandoned.
     *
     * Params:
     *   fromArray = the buffer to write from
     * Returns: the number of bytes written, always `fromArray.length`
     * Throws: StreamException with `StreamError.OPERATION_FAILED` if
     * the underlying write reports 0 bytes written or fails
     */
    public override ulong writeFully(byte[] fromArray)
    {
        // TODO: Add a unit test for this, we should do it in something that
        // ... has a fixed internal buffer
        version(Posix)
        {
            import core.stdc.errno : errno, EINTR;
            import core.sys.posix.unistd : write, ssize_t;

            /** 
            * Perform a write till the number of bytes requested is fulfilled
            */
            immutable size_t totalBytesRequested = fromArray.length;
            size_t totalBytesWritten = 0;
            while(totalBytesWritten < totalBytesRequested)
            {
                /* Write the remaining bytes, starting from the correct offset */
                ssize_t status = write(fd, fromArray.ptr+totalBytesWritten, totalBytesRequested-totalBytesWritten);

                if(status > 0)
                {
                    totalBytesWritten += status;
                }
                else if(status == 0)
                {
                    throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status 0");
                }
                else if(errno == EINTR)
                {
                    /* Interrupted by a signal before any data was written, try again */
                    continue;
                }
                else
                {
                    throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status <0");
                }
            }

            assert(totalBytesWritten == totalBytesRequested);
            return totalBytesWritten;
        }
        else
        {
            pragma(msg, "FDStream: The writeFully() call is not implemented for your platform");
            static assert(false);
        }
    }

    /**
     * Closes the file descriptor
     *
     * The result of the underlying close is not checked. Calling
     * this more than once closes the same descriptor number again.
     */
    public override void close()
    {
        version(Posix)
        {
            import core.sys.posix.unistd : close;

            /* Close the file descriptor */
            close(fd);
        }
        else
        {
            pragma(msg, "FDStream: The close() call is not implemented for your platform");
            static assert(false);
        }
    }

    /** 
     * Retrieve the underlying file descriptor this stream is attached to
     *
     * Returns: the fd as an `int`
     */
    public final int getFd()
    {
        return fd;
    }
}