XRootD
Loading...
Searching...
No Matches
XrdClAsyncPageReader.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
20#define SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
21
23#include "XrdCl/XrdClSocket.hh"
26
27#include <sys/uio.h>
28#include <memory>
29#include <arpa/inet.h>
30
31namespace XrdCl
32{
33
34//------------------------------------------------------------------------------
36//------------------------------------------------------------------------------
38{
39 public:
40
41 //--------------------------------------------------------------------------
46 //--------------------------------------------------------------------------
48 std::vector<uint32_t> &digests ) :
49 chunks( chunks ),
50 digests( digests ),
51 dlen( 0 ),
52 rspoff( 0 ),
53 chindex( 0 ),
54 choff( 0 ),
55 dgindex( 0 ),
56 dgoff( 0 ),
57 iovcnt( 0 ),
58 iovindex( 0 )
59 {
60 uint64_t rdoff = chunks.front().offset;
61 uint32_t rdlen = 0;
62 for( auto &ch : chunks )
63 rdlen += ch.length;
64 int fpglen, lpglen;
65 int pgcnt = XrdOucPgrwUtils::csNum( rdoff, rdlen, fpglen, lpglen);
66 digests.resize( pgcnt );
67 }
68
69 //--------------------------------------------------------------------------
71 //--------------------------------------------------------------------------
73 {
74 }
75
76 //--------------------------------------------------------------------------
78 //--------------------------------------------------------------------------
80 {
81 dlen = rsp->status.bdy.dlen;
82 rspoff = rsp->info.pgread.offset;
83
84 uint64_t bufoff = rspoff - chunks[0].offset;
85 chindex = 0;
86
87 for( chindex = 0; chindex < chunks.size(); ++chindex )
88 {
89 if( chunks[chindex].length < bufoff )
90 {
91 bufoff -= chunks[chindex].length;
92 continue;
93 }
94 break;
95 }
96 choff = bufoff;
97 dgindex = rspoff/XrdSys::PageSize - chunks[0].offset/XrdSys::PageSize;
98 }
99
100 //--------------------------------------------------------------------------
105 //--------------------------------------------------------------------------
106 XRootDStatus Read( Socket &socket, uint32_t &btsread )
107 {
108 if( dlen == 0 || chindex >= chunks.size() )
109 return XRootDStatus();
110 btsread = 0;
111 int nbbts = 0;
112 do
113 {
114 // Prepare the IO vector for receiving the data
115 if( iov.empty() )
116 InitIOV();
117 // read the data into the buffer
118 nbbts = 0;
119 auto st = socket.ReadV( iov.data() + iovindex, iovcnt, nbbts );
120 if( !st.IsOK() )
121 return st;
122 btsread += nbbts;
123 dlen -= nbbts;
124 ShiftIOV( nbbts );
125 if( st.code == suRetry )
126 return st;
127 }
128 while( nbbts > 0 && dlen > 0 && chindex < chunks.size() );
129
130 return XRootDStatus();
131 }
132
133 private:
134
135 //--------------------------------------------------------------------------
137 //--------------------------------------------------------------------------
138 inline static int max_iovcnt()
139 {
140 // make sure it is an even number
141 static const int iovmax = XrdSys::getIovMax() & ~1;
142 return iovmax;
143 }
144
145 //--------------------------------------------------------------------------
147 //--------------------------------------------------------------------------
148 inline void addiov( char *&buf, size_t len )
149 {
150 iov.emplace_back();
151 iov.back().iov_base = buf;
152 iov.back().iov_len = len;
153 buf += len;
154 ++iovcnt;
155 }
156
157 //--------------------------------------------------------------------------
159 //--------------------------------------------------------------------------
160 inline void addiov( char *&buf, uint32_t len, uint32_t &dleft )
161 {
162 if( len > dleft ) len = dleft;
163 addiov( buf, len );
164 dleft -= len;
165 }
166
167 //--------------------------------------------------------------------------
170 //--------------------------------------------------------------------------
171 inline static uint32_t CalcIOVSize( uint32_t dleft )
172 {
173 uint32_t ret = ( dleft / PageWithDigest + 2 ) * 2;
174 return ( ret > uint32_t( max_iovcnt() ) ? max_iovcnt() : ret );
175 }
176
177 //--------------------------------------------------------------------------
179 //--------------------------------------------------------------------------
180 uint32_t CalcRdSize()
181 {
182 // data size in the server response (including digests)
183 uint32_t dleft = dlen;
184 // space in our page buffer
185 uint32_t pgspace = chunks[chindex].length - choff;
186 // space in our digest buffer
187 uint32_t dgspace = sizeof( uint32_t ) * (digests.size() - dgindex ) - dgoff;
188 if( dleft > pgspace + dgspace )
189 dleft = pgspace + dgspace;
190 return dleft;
191 }
192
193 //--------------------------------------------------------------------------
195 //--------------------------------------------------------------------------
196 void InitIOV()
197 {
198 iovindex = 0;
199 // figure out the number of data we can read in one go
200 uint32_t dleft = CalcRdSize();
201 // and reset the I/O vector
202 iov.clear();
203 iovcnt = 0;
204 iov.reserve( CalcIOVSize( dleft ) );
205 // now prepare the page and digest buffers
206 ChunkInfo ch = chunks[chindex];
207 char* pgbuf = static_cast<char*>( ch.buffer ) + choff;
208 uint64_t rdoff = ch.offset + choff;
209 char* dgbuf = reinterpret_cast<char*>( digests.data() + dgindex ) + dgoff;
210 // handle the first digest
211 uint32_t fdglen = sizeof( uint32_t ) - dgoff;
212 addiov( dgbuf, fdglen, dleft );
213 if( dleft == 0 || iovcnt >= max_iovcnt() )
214 return;
215 // handle the first page
216 uint32_t fpglen = XrdSys::PageSize - rdoff % XrdSys::PageSize;
217 addiov( pgbuf, fpglen, dleft );
218 if( dleft == 0 || iovcnt >= max_iovcnt() )
219 return;
220 // handle all the subsequent aligned pages
221 size_t fullpgs = dleft / PageWithDigest;
222 for( size_t i = 0; i < fullpgs; ++i )
223 {
224 addiov( dgbuf, sizeof( uint32_t ), dleft );
225 if( dleft == 0 || iovcnt >= max_iovcnt() )
226 return;
227 addiov( pgbuf, XrdSys::PageSize, dleft );
228 if( dleft == 0 || iovcnt >= max_iovcnt() )
229 return;
230 }
231 // handle the last digest
232 uint32_t ldglen = sizeof( uint32_t );
233 addiov( dgbuf, ldglen, dleft );
234 if( dleft == 0 || iovcnt >= max_iovcnt() )
235 return;
236 // handle the last page
237 addiov( pgbuf, dleft, dleft );
238 }
239
240 //--------------------------------------------------------------------------
242 //--------------------------------------------------------------------------
243 inline void shift( void *&buffer, size_t nbbts )
244 {
245 char *buf = static_cast<char*>( buffer );
246 buf += nbbts;
247 buffer = buf;
248 }
249
250 //--------------------------------------------------------------------------
254 //--------------------------------------------------------------------------
255 inline void shiftdgbuf( uint32_t &btsread )
256 {
257 if( iov[iovindex].iov_len > btsread )
258 {
259 iov[iovindex].iov_len -= btsread;
260 shift( iov[iovindex].iov_base, btsread );
261 dgoff += btsread;
262 btsread = 0;
263 return;
264 }
265
266 btsread -= iov[iovindex].iov_len;
267 iov[iovindex].iov_len = 0;
268 dgoff = 0;
269 digests[dgindex] = ntohl( digests[dgindex] );
270 ++dgindex;
271 ++iovindex;
272 --iovcnt;
273 }
274
275 //--------------------------------------------------------------------------
279 //--------------------------------------------------------------------------
280 inline void shiftpgbuf( uint32_t &btsread )
281 {
282 if( iov[iovindex].iov_len > btsread )
283 {
284 iov[iovindex].iov_len -= btsread;
285 shift( iov[iovindex].iov_base, btsread );
286 choff += btsread;
287 btsread = 0;
288 return;
289 }
290
291 btsread -= iov[iovindex].iov_len;
292 choff += iov[iovindex].iov_len;
293 iov[iovindex].iov_len = 0;
294 ++iovindex;
295 --iovcnt;
296 }
297
298 //--------------------------------------------------------------------------
300 //--------------------------------------------------------------------------
301 void ShiftIOV( uint32_t btsread )
302 {
303 // if iovindex is even it point to digest, otherwise it points to a page
304 if( iovindex % 2 == 0 )
305 shiftdgbuf( btsread );
306 // adjust as many I/O buffers as necessary
307 while( btsread > 0 )
308 {
309 // handle page
310 shiftpgbuf( btsread );
311 if( btsread == 0 ) break;
312 // handle digest
313 shiftdgbuf( btsread );
314 }
315 // if we filled the buffer, move to the next one
316 if( iovcnt == 0 )
317 iov.clear();
318 // do we need to move to the next chunk?
319 if( choff >= chunks[chindex].length )
320 {
321 ++chindex;
322 choff = 0;
323 }
324 }
325
326 ChunkList &chunks; //< list of data chunks to be filled with user data
327 std::vector<uint32_t> &digests; //< list of crc32c digests for every 4KB page of data
328 uint32_t dlen; //< size of the data in the message
329 uint64_t rspoff; //< response offset
330
331 size_t chindex; //< index of the current data buffer
332 size_t choff; //< offset within the current buffer
333 size_t dgindex; //< index of the current digest buffer
334 size_t dgoff; //< offset within the current digest buffer
335
336 std::vector<iovec> iov; //< I/O vector
337 int iovcnt; //< size of the I/O vector
338 size_t iovindex; //< index of the first valid element in the I/O vector
339
340 static const int PageWithDigest = XrdSys::PageSize + sizeof( uint32_t );
341};
342
343} /* namespace XrdEc */
344
345#endif /* SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_ */
ServerResponseStatus status
struct ServerResponseBody_Status bdy
union ServerResponseV2::@207342300141235315373173036347114307032363217365 info
void SetRsp(ServerResponseV2 *rsp)
Sets message data size.
virtual ~AsyncPageReader()
Destructor.
AsyncPageReader(ChunkList &chunks, std::vector< uint32_t > &digests)
XRootDStatus Read(Socket &socket, uint32_t &btsread)
A network socket.
XRootDStatus ReadV(iovec *iov, int iocnt, int &bytesRead)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
int getIovMax()