XRootD
Loading...
Searching...
No Matches
XrdClXRootDMsgHandler.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26#define __XRD_CL_XROOTD_MSG_HANDLER_HH__
27
31#include "XrdCl/XrdClMessage.hh"
33#include "XrdCl/XrdClLog.hh"
35
41
46
48#include "XrdOuc/XrdOucUtils.hh"
49
50#include <sys/uio.h>
51#include <arpa/inet.h> // for network unmarshaling stuff
52
53#include <array>
54#include <list>
55#include <memory>
56#include <atomic>
57#include <memory>
58
59namespace XrdCl
60{
61 class PostMaster;
62 class SIDManager;
63 class URL;
64 class LocalFileHandler;
65 class Socket;
66
67 //----------------------------------------------------------------------------
68 // Single entry in the redirect-trace-back
69 //----------------------------------------------------------------------------
71 {
79
80 RedirectEntry( const URL &from, const URL &to, Type type ) :
81 from( from ), to( to ), type( type )
82 {
83
84 }
85
90
91 std::string ToString( bool prevok = true )
92 {
93 const std::string tostr = to.GetLocation();
94 const std::string fromstr = from.GetLocation();
95
96 if( prevok )
97 {
98 switch( type )
99 {
100 case EntryRedirect: return "Redirected from: " + fromstr + " to: "
101 + tostr;
102
103 case EntryRedirectOnWait: return "Server responded with wait. "
104 "Falling back to virtual redirector: " + tostr;
105
106 case EntryRetry: return "Retrying: " + tostr;
107
108 case EntryWait: return "Waited at server request. Resending: "
109 + tostr;
110 }
111 }
112 return "Failed at: " + fromstr + ", retrying at: " + tostr;
113 }
114 };
115
116 //----------------------------------------------------------------------------
118 //----------------------------------------------------------------------------
120 {
121 friend class HandleRspJob;
122
123 public:
124 //------------------------------------------------------------------------
133 //------------------------------------------------------------------------
135 ResponseHandler *respHandler,
136 const URL *url,
137 std::shared_ptr<SIDManager> sidMgr,
138 LocalFileHandler *lFileHandler):
139 pRequest( msg ),
140 pResponseHandler( respHandler ),
141 pUrl( *url ),
142 pEffectiveDataServerUrl( 0 ),
143 pSidMgr( sidMgr ),
144 pLFileHandler( lFileHandler ),
145 pExpiration( 0 ),
146 pRedirectAsAnswer( false ),
147 pOksofarAsAnswer( false ),
148 pHasLoadBalancer( false ),
149 pHasSessionId( false ),
150 pChunkList( 0 ),
151 pKBuff( 0 ),
152 pRedirectCounter( 0 ),
153 pNotAuthorizedCounter( 0 ),
154
155 pAsyncOffset( 0 ),
156 pAsyncChunkIndex( 0 ),
157
158 pPgWrtCksumBuff( 4 ),
159 pPgWrtCurrentPageOffset( 0 ),
160 pPgWrtCurrentPageNb( 0 ),
161
162 pOtherRawStarted( false ),
163
164 pFollowMetalink( false ),
165
166 pStateful( false ),
167
168 pAggregatedWaitTime( 0 ),
169
170 pMsgInFly( false ),
171 pSendingState( 0 ),
172
173 pTimeoutFence( false ),
174
175 pDirListStarted( false ),
176 pDirListWithStat( false ),
177
178 pCV( 0 ),
179
180 pSslErrCnt( 0 )
181 {
182 pPostMaster = DefaultEnv::GetPostMaster();
183 if( msg->GetSessionId() )
184 pHasSessionId = true;
185
186 Log *log = DefaultEnv::GetLog();
187 log->Debug( ExDbgMsg, "[%s] MsgHandler created: %p (message: %s ).",
188 pUrl.GetHostId().c_str(), (void*)this,
189 pRequest->GetObfuscatedDescription().c_str() );
190
191 ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
192 if( ntohs( hdr->requestid ) == kXR_pgread )
193 {
194 ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
195 pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
196 ntohl( pgrdreq->rlen ) ) );
197 }
198
199 //----------------------------------------------------------------------
200 // Pass the reader our pUrl, not *url. The latter is a reference, likely
201 // from FileStateHandler such as *pDataServer. Accessing that throughout
202 // our lifetime may lead to concurrent access. In the case of read-
203 // recovery the FileStateHandler may entirely reallocate the url object.
204 //----------------------------------------------------------------------
205 if( ntohs( hdr->requestid ) == kXR_readv )
206 pBodyReader.reset( new AsyncVectorReader( pUrl, *pRequest ) );
207 else if( ntohs( hdr->requestid ) == kXR_read )
208 pBodyReader.reset( new AsyncRawReader( pUrl, *pRequest ) );
209 else
210 pBodyReader.reset( new AsyncDiscardReader( pUrl, *pRequest ) );
211 }
212
213 //------------------------------------------------------------------------
215 //------------------------------------------------------------------------
217 {
218 DumpRedirectTraceBack();
219
220 if( !pHasSessionId )
221 delete pRequest;
222 delete pEffectiveDataServerUrl;
223
224 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
225 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
226 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
227 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
228 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
229 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
230
231 Log *log = DefaultEnv::GetLog();
232 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: %p.",
233 pUrl.GetHostId().c_str(), (void*)this );
234 }
235
236 //------------------------------------------------------------------------
242 //------------------------------------------------------------------------
243 virtual uint16_t Examine( std::shared_ptr<Message> &msg ) override;
244
245 //------------------------------------------------------------------------
254 //------------------------------------------------------------------------
255 virtual uint16_t InspectStatusRsp() override;
256
257 //------------------------------------------------------------------------
261 //------------------------------------------------------------------------
262 virtual uint16_t GetSid() const override;
263
264 //------------------------------------------------------------------------
268 //------------------------------------------------------------------------
269 virtual void Process() override;
270
271 //------------------------------------------------------------------------
281 //------------------------------------------------------------------------
283 Socket *socket,
284 uint32_t &bytesRead ) override;
285
286 //------------------------------------------------------------------------
291 //------------------------------------------------------------------------
292 virtual uint8_t OnStreamEvent( StreamEvent event,
293 XRootDStatus status ) override;
294
295 //------------------------------------------------------------------------
297 //------------------------------------------------------------------------
298 virtual void OnStatusReady( const Message *message,
299 XRootDStatus status ) override;
300
301 //------------------------------------------------------------------------
303 //------------------------------------------------------------------------
304 virtual bool IsRaw() const override;
305
306 //------------------------------------------------------------------------
315 //------------------------------------------------------------------------
317 uint32_t &bytesWritten ) override;
318
319 //------------------------------------------------------------------------
323 //------------------------------------------------------------------------
324 void WaitDone( time_t now );
325
326 //------------------------------------------------------------------------
328 //------------------------------------------------------------------------
329 void SetExpiration( time_t expiration )
330 {
331 pExpiration = expiration;
332 }
333
334 //------------------------------------------------------------------------
336 //------------------------------------------------------------------------
337 time_t GetExpiration() override
338 {
339 return pExpiration;
340 }
341
342 //------------------------------------------------------------------------
345 //------------------------------------------------------------------------
346 void SetRedirectAsAnswer( bool redirectAsAnswer )
347 {
348 pRedirectAsAnswer = redirectAsAnswer;
349 }
350
351 //------------------------------------------------------------------------
354 //------------------------------------------------------------------------
355 void SetOksofarAsAnswer( bool oksofarAsAnswer )
356 {
357 pOksofarAsAnswer = oksofarAsAnswer;
358 }
359
360 //------------------------------------------------------------------------
362 //------------------------------------------------------------------------
363 const Message *GetRequest() const
364 {
365 return pRequest;
366 }
367
368 //------------------------------------------------------------------------
370 //------------------------------------------------------------------------
371 void SetLoadBalancer( const HostInfo &loadBalancer )
372 {
373 if( !loadBalancer.url.IsValid() )
374 return;
375 pLoadBalancer = loadBalancer;
376 pHasLoadBalancer = true;
377 }
378
379 //------------------------------------------------------------------------
381 //------------------------------------------------------------------------
382 void SetHostList( HostList *hostList )
383 {
384 pHosts.reset( hostList );
385 }
386
387 //------------------------------------------------------------------------
389 //------------------------------------------------------------------------
390 void SetChunkList( ChunkList *chunkList )
391 {
392 pChunkList = chunkList;
393 if( pBodyReader )
394 pBodyReader->SetChunkList( chunkList );
395 if( chunkList )
396 pChunkStatus.resize( chunkList->size() );
397 else
398 pChunkStatus.clear();
399 }
400
401 void SetCrc32cDigests( std::vector<uint32_t> && crc32cDigests )
402 {
403 pCrc32cDigests = std::move( crc32cDigests );
404 }
405
406 //------------------------------------------------------------------------
408 //------------------------------------------------------------------------
410 {
411 pKBuff = kbuff;
412 }
413
414 //------------------------------------------------------------------------
416 //------------------------------------------------------------------------
417 void SetRedirectCounter( uint16_t redirectCounter )
418 {
419 pRedirectCounter = redirectCounter;
420 }
421
422 void SetFollowMetalink( bool followMetalink )
423 {
424 pFollowMetalink = followMetalink;
425 }
426
427 void SetStateful( bool stateful )
428 {
429 pStateful = stateful;
430 }
431
432 //------------------------------------------------------------------------
436 //------------------------------------------------------------------------
437 void PartialReceived();
438
439 void OnReadyToSend( [[maybe_unused]] Message *msg ) override
440 {
441 pSendingState |= kSawReadySend;
442 }
443
444 private:
445
446 // bit flags used with pSendingState
447 static constexpr int kSendDone = 0x0001;
448 static constexpr int kSawResp = 0x0002;
449 static constexpr int kFinalResp = 0x0004;
450 static constexpr int kSawReadySend = 0x0008;
451
452 //------------------------------------------------------------------------
454 //------------------------------------------------------------------------
455 void HandleError( XRootDStatus status );
456
457 //------------------------------------------------------------------------
459 //------------------------------------------------------------------------
460 Status RetryAtServer( const URL &url, RedirectEntry::Type entryType );
461
462 //------------------------------------------------------------------------
464 //------------------------------------------------------------------------
465 void HandleResponse();
466
467 //------------------------------------------------------------------------
469 //------------------------------------------------------------------------
470 XRootDStatus *ProcessStatus();
471
472 //------------------------------------------------------------------------
475 //------------------------------------------------------------------------
476 Status ParseResponse( AnyObject *&response );
477
478 //------------------------------------------------------------------------
481 //------------------------------------------------------------------------
482 Status ParseXAttrResponse( char *data, size_t len, AnyObject *&response );
483
484 //------------------------------------------------------------------------
487 //------------------------------------------------------------------------
488 Status RewriteRequestRedirect( const URL &newUrl );
489
490 //------------------------------------------------------------------------
492 //------------------------------------------------------------------------
493 Status RewriteRequestWait();
494
495 //------------------------------------------------------------------------
497 //------------------------------------------------------------------------
498 void UpdateTriedCGI(uint32_t errNo=0);
499
500 //------------------------------------------------------------------------
502 //------------------------------------------------------------------------
503 void SwitchOnRefreshFlag();
504
505 //------------------------------------------------------------------------
508 //------------------------------------------------------------------------
509 void HandleRspOrQueue();
510
511 //------------------------------------------------------------------------
513 //------------------------------------------------------------------------
514 void HandleLocalRedirect( URL *url );
515
516 //------------------------------------------------------------------------
521 //------------------------------------------------------------------------
522 bool IsRetriable();
523
524 //------------------------------------------------------------------------
531 //------------------------------------------------------------------------
532 bool OmitWait( Message &request, const URL &url );
533
534 //------------------------------------------------------------------------
540 //------------------------------------------------------------------------
541 bool RetriableErrorResponse( const Status &status );
542
543 //------------------------------------------------------------------------
545 //------------------------------------------------------------------------
546 void DumpRedirectTraceBack();
547
554 //------------------------------------------------------------------------
555 template<typename T>
556 Status ReadFromBuffer( char *&buffer, size_t &buflen, T& result );
557
558 //------------------------------------------------------------------------
565 //------------------------------------------------------------------------
566 Status ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result );
567
568 //------------------------------------------------------------------------
576 //------------------------------------------------------------------------
577 Status ReadFromBuffer( char *&buffer, size_t &buflen, size_t size,
578 std::string &result );
579
580 //------------------------------------------------------------------------
581 // Helper struct for async reading of chunks
582 //------------------------------------------------------------------------
583 struct ChunkStatus
584 {
585 ChunkStatus(): sizeError( false ), done( false ) {}
586 bool sizeError;
587 bool done;
588 };
589
590 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
591
592 static const size_t CksumSize = sizeof( uint32_t );
593 static const size_t PageWithCksum = XrdSys::PageSize + CksumSize;
594 static const size_t MaxSslErrRetry = 3;
595
596 inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
597 {
598 uint32_t pgcnt = 0;
599 uint32_t remainder = offset % XrdSys::PageSize;
600 if( remainder > 0 )
601 {
602 // account for the first unaligned page
603 ++pgcnt;
604 // the size of the 1st unaligned page
605 uint32_t _1stpg = XrdSys::PageSize - remainder;
606 if( _1stpg + CksumSize > dlen )
607 _1stpg = dlen - CksumSize;
608 dlen -= _1stpg + CksumSize;
609 }
610 pgcnt += dlen / PageWithCksum;
611 if( dlen % PageWithCksum )
612 ++ pgcnt;
613 return pgcnt;
614 }
615
616 Message *pRequest;
617 std::shared_ptr<Message> pResponse; //< the ownership is shared with MsgReader
618 std::vector<std::shared_ptr<Message>> pPartialResps; //< the ownership is shared with MsgReader
619 ResponseHandler *pResponseHandler;
620 URL pUrl;
621 URL *pEffectiveDataServerUrl;
622 PostMaster *pPostMaster;
623 std::shared_ptr<SIDManager> pSidMgr;
624 LocalFileHandler *pLFileHandler;
625 XRootDStatus pStatus;
626 Status pLastError;
627 time_t pExpiration;
628 bool pRedirectAsAnswer;
629 bool pOksofarAsAnswer;
630 std::unique_ptr<HostList> pHosts;
631 bool pHasLoadBalancer;
632 HostInfo pLoadBalancer;
633 bool pHasSessionId;
634 std::string pRedirectUrl;
635 ChunkList *pChunkList;
636 std::vector<uint32_t> pCrc32cDigests;
637 XrdSys::KernelBuffer *pKBuff;
638 std::vector<ChunkStatus> pChunkStatus;
639 uint16_t pRedirectCounter;
640 uint16_t pNotAuthorizedCounter;
641
642 uint32_t pAsyncOffset;
643 uint32_t pAsyncChunkIndex;
644
645 std::unique_ptr<AsyncPageReader> pPageReader;
646 std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
647
648 Buffer pPgWrtCksumBuff;
649 uint32_t pPgWrtCurrentPageOffset;
650 uint32_t pPgWrtCurrentPageNb;
651
652 bool pOtherRawStarted;
653
654 bool pFollowMetalink;
655
656 bool pStateful;
657 int pAggregatedWaitTime;
658
659 std::unique_ptr<RedirectEntry> pRdirEntry;
660 RedirectTraceBack pRedirectTraceBack;
661
662 bool pMsgInFly;
663 std::atomic<int> pSendingState;
664
665 //------------------------------------------------------------------------
666 // true if MsgHandler is both in inQueue and installed in respective
667 // Stream (this could happen if server gave oksofar response), otherwise
668 // false
669 //------------------------------------------------------------------------
670 std::atomic<bool> pTimeoutFence;
671
672 //------------------------------------------------------------------------
673 // if we are serving chunked data to the user's handler in case of
674 // kXR_dirlist we need to memorize if the response contains stat info or
675 // not (the information is only encoded in the first chunk)
676 //------------------------------------------------------------------------
677 bool pDirListStarted;
678 bool pDirListWithStat;
679
680 //------------------------------------------------------------------------
681 // synchronization is needed in case the MsgHandler has been configured
682 // to serve kXR_oksofar as a response to the user's handler
683 //------------------------------------------------------------------------
684 XrdSysCondVar pCV;
685
686 //------------------------------------------------------------------------
687 // Count of consecutive `errTlsSslError` errors
688 //------------------------------------------------------------------------
689 size_t pSslErrCnt;
690 };
691}
692
693#endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_pgread
Definition XProtocol.hh:142
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
Handle diagnostics.
Definition XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
Handle XRootD stream IDs.
A network socket.
URL representation.
Definition XrdClURL.hh:31
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
void SetFollowMetalink(bool followMetalink)
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
const Message * GetRequest() const
Get the request pointer.
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
void OnReadyToSend(Message *msg) override
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
void SetOksofarAsAnswer(bool oksofarAsAnswer)
time_t GetExpiration() override
Get a timestamp after which we give up.
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
virtual bool IsRaw() const override
Are we a raw writer or not?
void SetRedirectAsAnswer(bool redirectAsAnswer)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
const uint64_t ExDbgMsg
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
URL url
URL of the host.
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.