1 /** 
2  * FD-based stream
3  */
4 module river.impls.fd;
5 
6 import river.core;
7 
8 /** 
9  * Provides a base for streams based on a file descriptor
10  */
11 public class FDStream : Stream
12 {
13     /** 
14      * Underlying file descriptor
15      */
16     protected const int fd;
17 
18     /** 
19      * Creates a new `FDStream` with the backing read/write file
20      * descriptor being the one provided
21      *
22      * Params:
23      *   fd = the read/write file descriptor
24      */
25     this(int fd)
26     {
27         this.fd = fd;
28     }
29 
30     /** 
31      * Reads bytes from the file descriptor into the provided array
32      * and returns without any further waiting, at most the
33      * number of bytes read will be the length of the provided
34      * array, at minimum a single byte
35      *
36      * Params:
37      *   toArray = the buffer to read into
38      * Returns: the number of bytes read
39      */
40     public override ulong read(byte[] toArray)
41     {
42         version(Posix)
43         {
44             import core.sys.posix.unistd : read, ssize_t;
45 
46             ssize_t status = read(fd, toArray.ptr, toArray.length);
47 
48             if(status > 0)
49             {
50                 return status;
51             }
52             else if(status == 0)
53             {
54                 throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status 0");
55             }
56             else
57             {
58                 throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status <0");
59             }
60         }
61         else
62         {
63             pragma(msg, "FDStream: The read() call is not implemented for your platform");
64             static assert(false);
65         }
66     }
67 
68     /** 
69      * Reads bytes from the file descriptor into the provided array
70      * until the array is fully-filled
71      *
72      * Params:
73      *   toArray = the buffer to read into
74      * Returns: the number of bytes read
75      */
76     public override ulong readFully(byte[] toArray)
77     {
78         version(Posix)
79         {
80             import core.sys.posix.unistd : read, ssize_t;
81 
82             /** 
83             * Perform a read till the number of bytes requested is fulfilled
84             */
85             long totalBytesRequested = toArray.length;
86             long totalBytesGot = 0;
87             while(totalBytesGot < totalBytesRequested)
88             {
89                 /* Read remaining bytes into correct offset */
90                 ssize_t status = read(fd, toArray.ptr+totalBytesGot, totalBytesRequested-totalBytesGot);
91 
92                 if(status > 0)
93                 {
94                     totalBytesGot += status;
95                 }
96                 else if(status == 0)
97                 {
98                     throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status 0");
99                 }
100                 else
101                 {
102                     throw new StreamException(StreamError.OPERATION_FAILED, "Could not read, status <0");
103                 }
104             }
105 
106             assert(totalBytesGot == totalBytesRequested);
107             return totalBytesGot;
108         }
109         else
110         {
111             pragma(msg, "FDStream: The readFully() call is not implemented for your platform");
112             static assert(false);
113         }
114     }
115 
116     public override ulong write(byte[] fromArray)
117     {
118         version(Posix)
119         {
120             import core.sys.posix.unistd : write, ssize_t;
121 
122             ssize_t status = write(fd, fromArray.ptr, fromArray.length);
123 
124             if(status > 0)
125             {
126                 return status;
127             }
128             else if(status == 0)
129             {
130                 throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status 0");
131             }
132             else
133             {
134                 throw new StreamException(StreamError.OPERATION_FAILED, "Could not write, status <0");
135             }
136         }
137         else
138         {
139             pragma(msg, "FDStream: The write() call is not implemented for your platform");
140             static assert(false);
141         }
142     }
143 
144     public override ulong writeFully(byte[] fromArray)
145     {
146         // TODO: Implement me
147         return 0;
148     }
149 
150     /**
151      * Closes the file descriptor
152      */
153     public override void close()
154     {
155         version(Posix)
156         {
157             import core.sys.posix.unistd : close;
158 
159             // TODO: Do something with the error code of both calls to `close`
160             close(fd);
161         }
162         else
163         {
164             pragma(msg, "FDStream: The close() call is not implemented for your platform");
165             static assert(false);
166         }
167     }
168 }