59#if __cplusplus < 201103L
// Small stopwatch built on std::chrono::high_resolution_clock.
// U is the std::ratio describing the unit in which elapsed() reports time;
// the default std::ratio<1, 1> corresponds to whole seconds.
68 template<
typename U = std::ratio<1, 1>>
// Starts measuring at construction time.
72 mytimer_t() : start( clock_t::now() ){ }
// Restarts the measurement from the current instant.
73 void reset(){ start = clock_t::now(); }
// Returns the time passed since construction or the last reset(), converted
// with duration_cast to an integral count of U units.
74 uint64_t elapsed()
const
76 return std::chrono::duration_cast<unit_t>( clock_t::now() - start ).count();
79 typedef std::chrono::high_resolution_clock clock_t;
80 typedef std::chrono::duration<uint64_t, U> unit_t;
// Instant from which elapsed() is measured.
81 std::chrono::time_point<clock_t> start;
// Timer reporting in the default unit of mytimer_t (std::ratio<1, 1>, i.e. seconds).
84 using timer_sec_t = mytimer_t<>;
// Timer reporting in std::nano units (nanoseconds).
85 using timer_nsec_t = mytimer_t<std::nano>;
// Converts the XAttr responses in `in` into xattr_t entries collected in `ret`.
// The first response whose status is not OK aborts the conversion and its
// status is returned to the caller.
// NOTE(review): the lines that build `xa`, hand `ret` over to `out` and return
// on success are not visible in this excerpt.
89 std::vector<XrdCl::xattr_t> &out )
91 std::vector<XrdCl::xattr_t> ret;
92 ret.reserve( in.size() );
93 std::vector<XrdCl::XAttr>::iterator itr = in.begin();
94 for( ; itr != in.end() ; ++itr )
96 if( !itr->status.IsOK() )
return itr->status;
98 ret.push_back( std::move( xa ) );
// Helper that fills `xattrs`: raw responses are gathered in `rsp`, a failed
// status `st` is returned unchanged, otherwise Translate() converts the
// responses into `xattrs`.
// NOTE(review): the call that produces `st` and fills `rsp` is not visible in
// this excerpt.
108 std::vector<XrdCl::xattr_t> &xattrs )
110 std::vector<XrdCl::XAttr> rsp;
112 if( !st.
IsOK() )
return st;
113 return Translate( rsp, xattrs );
// Second helper with the same shape: collect responses in `rsp`, propagate a
// failed status, otherwise convert with Translate().
// NOTE(review): the first parameter and the call that produces `st` are not
// visible in this excerpt.
120 std::vector<XrdCl::xattr_t> &xattrs )
124 std::vector<XrdCl::XAttr> rsp;
126 if( !st.
IsOK() )
return st;
127 return Translate( rsp, xattrs );
// Inspects the per-attribute results in `rsp` and returns the first status
// that is not OK.
// NOTE(review): the call that fills `rsp` from `xattrs` and the return taken
// when every entry succeeded are not visible in this excerpt.
131 const std::vector<XrdCl::xattr_t> &xattrs )
133 std::vector<XrdCl::XAttrStatus> rsp;
135 std::vector<XrdCl::XAttrStatus>::iterator itr = rsp.begin();
136 for( ; itr != rsp.end() ; ++itr )
137 if( !itr->status.IsOK() )
return itr->status;
// Constructor of the abstract copy source.
//   checkSumType - when non-empty, a CheckSumHelper labelled "source" is
//                  created for it and stored in pCkSumHelper
//   addcks       - one extra "source" CheckSumHelper is created per entry and
//                  appended to pAddCksHelpers
150 Source(
const std::string &checkSumType =
"",
151 const std::vector<std::string> &addcks = std::vector<std::string>() ) :
155 if( !checkSumType.empty() )
156 pCkSumHelper =
new XrdCl::CheckSumHelper(
"source", checkSumType );
158 for(
auto &type : addcks )
159 pAddCksHelpers.push_back(
new XrdCl::CheckSumHelper(
"source", type ) );
// Iterates over every additional checksum helper.
// NOTE(review): the loop body and enclosing function header are not visible;
// presumably this is the destructor releasing the helpers - confirm.
165 for(
auto ptr : pAddCksHelpers )
// Interface that every concrete source in this file overrides.
// Set-up step run before chunks are requested.
172 virtual XrdCl::XRootDStatus Initialize() = 0;
// Size of the source as a signed 64-bit value.
177 virtual int64_t GetSize() = 0;
// Requests that reading begins at `offset`.
182 virtual XrdCl::XRootDStatus StartAt( uint64_t offset ) = 0;
// Delivers the next chunk in `ci`; the implementations in this file return
// suContinue when a chunk was delivered and suDone when there is none left.
192 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci ) = 0;
// Reports the checksum through `checkSum` / `checkSumType`.
197 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
198 std::string &checkSumType ) = 0;
// Returns the additional checksums as strings.
203 virtual std::vector<std::string> GetAddCks() = 0;
// Fills `xattrs` with the extended attributes of the source.
208 virtual XrdCl::XRootDStatus
GetXAttr( std::vector<XrdCl::xattr_t> &xattrs ) = 0;
// Not pure virtual: a default body exists but is not visible in this excerpt.
213 virtual XrdCl::XRootDStatus TryOtherServer()
// Helper for the main checksum type; only set when a type was given to the constructor.
220 XrdCl::CheckSumHelper *pCkSumHelper;
// Helpers for the additional checksum types passed to the constructor.
221 std::vector<XrdCl::CheckSumHelper*> pAddCksHelpers;
// Constructor of the abstract copy destination. All option flags start as
// false and no checksum helper is set; when `checkSumType` is non-empty a
// CheckSumHelper labelled "destination" is created for it.
234 Destination(
const std::string &checkSumType =
"" ):
235 pPosc( false ), pForce( false ), pCoerce( false ), pMakeDir( false ),
236 pContinue( false ), pCkSumHelper( 0 )
238 if( !checkSumType.empty() )
239 pCkSumHelper =
new XrdCl::CheckSumHelper(
"destination", checkSumType );
// Virtual destructor (body not visible in this excerpt).
245 virtual ~Destination()
// Interface that every concrete destination in this file overrides.
// Set-up step run before chunks are written.
253 virtual XrdCl::XRootDStatus Initialize() = 0;
// Completion step; the file-based implementation in this file closes the file.
258 virtual XrdCl::XRootDStatus Finalize() = 0;
// Accepts one chunk; the chunk is passed as an rvalue.
266 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci ) = 0;
// The queue-based implementations in this file drain their pending chunk queue here.
271 virtual XrdCl::XRootDStatus Flush() = 0;
// Reports the checksum through `checkSum` / `checkSumType`.
276 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
277 std::string &checkSumType ) = 0;
// Applies the given extended attributes to the destination.
282 virtual XrdCl::XRootDStatus
SetXAttr(
const std::vector<XrdCl::xattr_t> &xattrs ) = 0;
// Size of the destination as a signed 64-bit value.
287 virtual int64_t GetSize() = 0;
// Option setters of Destination. Only the body of SetContinue is visible in
// this excerpt (it stores its argument in pContinue); the other setters
// presumably store into the matching p* flag initialised by the constructor -
// confirm against the full file.
292 void SetPOSC(
bool posc )
300 void SetForce(
bool force )
308 void SetContinue(
bool continue_ )
310 pContinue = continue_;
316 void SetCoerce(
bool coerce )
324 void SetMakeDir(
bool makedir )
// Default implementation: declares a function-local static empty string.
// NOTE(review): the return statement is not visible; presumably `empty` is returned.
332 virtual const std::string& GetLastURL()
const
334 static const std::string empty;
// Default implementation, same shape as GetLastURL() above.
341 virtual const std::string& GetWrtRecoveryRedir()
const
343 static const std::string empty;
// Helper for the destination checksum; null unless a type was given to the constructor.
354 XrdCl::CheckSumHelper *pCkSumHelper;
// Source implementation that reads its data from standard input.
360 class StdInSource:
public Source
// Forwards the checksum configuration to Source and remembers the chunk size
// used for each read.
366 StdInSource(
const std::string &ckSumType, uint32_t chunkSize,
const std::vector<std::string> &addcks ):
367 Source( ckSumType, addcks ),
369 pChunkSize( chunkSize )
// Destructor (body not visible in this excerpt).
377 virtual ~StdInSource()
// Initializes the main checksum helper and then every additional helper,
// returning the first failing status; returns a default (OK) status otherwise.
// NOTE(review): any guard around the pCkSumHelper call is not visible in this excerpt.
385 virtual XrdCl::XRootDStatus Initialize()
389 auto st = pCkSumHelper->Initialize();
390 if( !st.
IsOK() )
return st;
391 for(
auto cksHelper : pAddCksHelpers )
393 st = cksHelper->Initialize();
394 if( !st.
IsOK() )
return st;
397 return XrdCl::XRootDStatus();
// Size of the stdin source (body not visible in this excerpt).
403 virtual int64_t GetSize()
// Starting at an offset is rejected for stdin with the message below; the
// offset parameter is unnamed because it is not used.
411 virtual XrdCl::XRootDStatus StartAt( uint64_t )
414 "Cannot continue from stdin!" );
// Reads up to pChunkSize bytes from file descriptor 0 into a buffer allocated
// with new[]. A failed read is logged and reported as errOSError carrying
// errno; one path returns suDone; otherwise the data is fed to the checksum
// helpers, the buffer is handed to `ci` and suContinue is returned.
// NOTE(review): the read loop, the conditions guarding each return and any
// clean-up of `buffer` on the error/done paths are not visible in this excerpt.
420 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
422 using namespace XrdCl;
423 Log *log = DefaultEnv::GetLog();
425 uint32_t toRead = pChunkSize;
426 char *buffer =
new char[toRead];
428 int64_t bytesRead = 0;
432 int64_t bRead =
read( 0, buffer+offset, toRead );
435 log->
Debug( UtilityMsg,
"Unable to read from stdin: %s",
438 return XRootDStatus( stError, errOSError, errno );
452 return XRootDStatus( stOK, suDone );
456 pCkSumHelper->Update( buffer, bytesRead );
458 for(
auto cksHelper : pAddCksHelpers )
459 cksHelper->Update( buffer, bytesRead );
// The chunk is positioned at the running offset, which then advances by the
// number of bytes read.
461 ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
462 pCurrentOffset += bytesRead;
463 return XRootDStatus( stOK, suContinue );
// Asks `cksHelper` for the checksum; the fallback is errCheckSumError.
// NOTE(review): the condition selecting between the two returns is not
// visible in this excerpt (presumably a null check on cksHelper).
469 virtual XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
470 std::string &checkSum,
471 std::string &checkSumType )
473 using namespace XrdCl;
475 return cksHelper->
GetCheckSum( checkSum, checkSumType );
476 return XRootDStatus( stError, errCheckSumError );
// Checksum of the main type: delegates to GetCheckSumImpl with pCkSumHelper.
482 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
483 std::string &checkSumType )
485 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
// Collects one entry per additional checksum helper, formatted as
// "<type>:<checksum>". The status returned by GetCheckSumImpl is ignored.
// NOTE(review): the GetAddCks() implementations of the other sources in this
// file push `cks` alone, without the "<type>:" prefix - confirm which format
// the callers expect. The declaration of `cks` and the final return are not
// visible in this excerpt.
491 std::vector<std::string> GetAddCks()
493 std::vector<std::string> ret;
494 for(
auto cksHelper : pAddCksHelpers )
496 std::string type = cksHelper->
GetType();
498 GetCheckSumImpl( cksHelper, cks, type );
499 ret.push_back( type +
":" + cks );
// No attributes are produced for stdin: `xattrs` is left untouched and a
// default (OK) status is returned.
507 virtual XrdCl::XRootDStatus
GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
509 return XrdCl::XRootDStatus();
// Copy constructor and copy assignment are declared only (no definition is
// visible in this excerpt).
513 StdInSource(
const StdInSource &other);
514 StdInSource &operator = (
const StdInSource &other);
// Offset assigned to the next chunk; advanced by GetChunk().
516 uint64_t pCurrentOffset;
// Source implementation reading through an XrdCl::File.
523 class XRootDSource:
public Source
// Job that can additionally be cancelled; GetChunkImpl() also locks its `mtx`
// member through pDataConnCB.
525 struct CancellableJob :
public XrdCl::Job
527 virtual void Cancel() = 0;
// Job bound to a source and a reader. Under the mutex it does nothing when
// either pointer is null; otherwise, while the source has fewer connections
// than pMaxNbConn, it tops up the read queue via FillQueue().
// NOTE(review): the method headers (presumably Run and Cancel) are not visible
// in this excerpt.
536 template<
typename READER>
537 struct OnConnJob :
public CancellableJob
539 OnConnJob( XRootDSource *self, READER *reader ) : self( self ), reader( reader )
545 std::unique_lock<std::mutex> lck( mtx );
546 if( !self || !reader )
return;
548 if( self->pNbConn < self->pMaxNbConn )
549 self->FillQueue( reader );
554 std::unique_lock<std::mutex> lck( mtx );
// Delegates to the underlying file object.
570 XrdCl::XRootDStatus TryOtherServer()
572 return pFile->TryOtherServer();
// Constructor: creates the XrdCl::File, starts with an unknown size (-1),
// offset 0, no counted connections and pgread disabled.
// NOTE(review): the chunkSize / doserver parameters and the origin of `val`
// are on lines not visible in this excerpt.
578 XRootDSource(
const XrdCl::URL *url,
580 uint8_t parallelChunks,
581 const std::string &ckSumType,
582 const std::vector<std::string> &addcks,
584 Source( ckSumType, addcks ),
585 pUrl( url ), pFile( new XrdCl::
File() ), pSize( -1 ),
586 pCurrentOffset( 0 ), pChunkSize( chunkSize ),
587 pParallel( parallelChunks ),
588 pNbConn( 0 ), pUsePgRead( false ),
589 pDoServer( doserver )
593 pMaxNbConn = val - 1;
// Destructor: cancels the data-connect callback and closes the file when it
// is still open; the status returned by Close() is not inspected in the lines
// visible here.
599 virtual ~XRootDSource()
602 pDataConnCB->Cancel();
605 if( pFile->IsOpen() )
606 XrdCl::XRootDStatus status = pFile->Close();
// Opens the source file for reading, stats it, initializes the checksum
// helpers for plain local files, records the server the file was opened on and
// installs the on-data-connect handler.
// NOTE(review): several statements (error checks after Open/Stat, bodies of
// the conditionals) are not visible in this excerpt.
613 virtual XrdCl::XRootDStatus Initialize()
615 using namespace XrdCl;
616 Log *log = DefaultEnv::GetLog();
617 log->
Debug( UtilityMsg,
"Opening %s for reading",
618 pUrl->GetObfuscatedURL().c_str() );
// Propagate the "ReadRecovery" setting from the environment to the file.
621 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
622 pFile->SetProperty(
"ReadRecovery", value );
624 XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
629 st = pFile->Stat(
false, statInfo );
// Client-side checksumming only for local, non-metalink files that are not
// being continued.
636 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
638 st = pCkSumHelper->Initialize();
639 if( !st.
IsOK() )
return st;
641 for(
auto cksHelper : pAddCksHelpers )
644 if( !st.
IsOK() )
return st;
651 if( !pUrl->IsLocalFile() ||
652 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
654 pFile->GetProperty(
"LastURL", pDataServer );
658 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
659 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
673 if( pDoServer && !pUrl->IsLocalFile() )
676 DefaultEnv::GetPostMaster()->QueryTransport( pDataServer, StreamQuery::IpStack, obj );
677 std::string *ipstack =
nullptr;
// Writes the IP stack string to stderr prefixed with "!-!".
// NOTE(review): the line that fills `ipstack` from `obj` is not visible here.
679 std::cerr <<
"!-!" << *ipstack << std::endl;
683 SetOnDataConnectHandler( pFile );
685 return XRootDStatus();
// Size of the source (body not visible in this excerpt).
691 virtual int64_t GetSize()
// Sets the offset at which reading begins and reports success.
699 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
701 pCurrentOffset = offset;
703 return XrdCl::XRootDStatus();
// Delivers the next chunk using the file object as reader.
714 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
716 return GetChunkImpl( pFile, ci );
// Reads the extended attributes through the file-scope GetXAttr helper.
722 virtual XrdCl::XRootDStatus
GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
724 return ::GetXAttr( *pFile, xattrs );
// Drains the queue of outstanding chunk handlers, releasing each read buffer.
// NOTE(review): the function header, the wait on the handler and the pop /
// delete of `ch` are not visible in this excerpt.
732 while( !pChunks.empty() )
734 ChunkHandler *ch = pChunks.front();
737 delete [] (
char *)ch->chunk.GetBuffer();
// Checksum of the main type: delegates to GetCheckSumImpl with pCkSumHelper.
745 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
746 std::string &checkSumType )
748 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
// Resolves the checksum in stages: for metalink URLs the redirector is asked
// first and a non-empty answer is returned at once; for local files the
// checksum helper provides it; otherwise the "DataServer" and "LastURL" file
// properties are read.
// NOTE(review): the declaration of `registry`, the extra conditions on the
// local-file path and the query that uses dataServer / lastUrl are not visible
// in this excerpt.
751 XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
752 std::string &checkSum,
753 std::string &checkSumType )
755 if( pUrl->IsMetalink() )
758 XrdCl::VirtualRedirector *redirector = registry.
Get( *pUrl );
759 checkSum = redirector->
GetCheckSum( checkSumType );
760 if( !checkSum.empty() )
return XrdCl::XRootDStatus();
763 if( pUrl->IsLocalFile() )
770 return cksHelper->
GetCheckSum( checkSum, checkSumType );
775 std::string dataServer; pFile->GetProperty(
"DataServer", dataServer );
776 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
// Collects the checksum string of every additional helper; the status
// returned by GetCheckSumImpl is ignored.
// NOTE(review): the declaration of `cks` and the final return are not visible
// in this excerpt.
783 std::vector<std::string> GetAddCks()
785 std::vector<std::string> ret;
786 for(
auto cksHelper : pAddCksHelpers )
788 std::string type = cksHelper->
GetType();
790 GetCheckSumImpl( cksHelper, cks, type );
791 ret.push_back( cks );
// Copy constructor and copy assignment are declared only (no definition is
// visible in this excerpt).
797 XRootDSource(
const XRootDSource &other);
798 XRootDSource &operator = (
const XRootDSource &other);
// Issues asynchronous reads until the queue holds `parallel` chunks or the
// end of the file is reached. `parallel` starts as pParallel and is multiplied
// by the number of connections when that number is non-zero. Each request gets
// its own buffer (new[]) and ChunkHandler; the last request is shortened so it
// does not run past pSize.
// NOTE(review): the statement that refreshes pNbConn, the start of the
// PgRead/Read selection and the push onto pChunks are not visible in this
// excerpt.
805 template<
typename READER>
806 inline void FillQueue( READER *reader )
811 uint16_t parallel = pParallel;
812 if( pNbConn < pMaxNbConn )
815 NbConnectedStrm( pDataServer );
817 if( pNbConn ) parallel *= pNbConn;
819 while( pChunks.size() < parallel && pCurrentOffset < pSize )
821 uint64_t chunkSize = pChunkSize;
822 if( pCurrentOffset + chunkSize > (uint64_t)pSize )
823 chunkSize = pSize - pCurrentOffset;
825 char *buffer =
new char[chunkSize];
826 ChunkHandler *ch =
new ChunkHandler();
828 ? reader->PgRead( pCurrentOffset, chunkSize, buffer, ch )
829 : reader->Read( pCurrentOffset, chunkSize, buffer, ch );
831 pCurrentOffset += chunkSize;
// Creates the OnConnJob for `reader` and stores it in pDataConnCB; nothing
// further is done when no data server is known.
// NOTE(review): the registration that follows the early return is not visible
// in this excerpt.
844 template<
typename READER>
845 void SetOnDataConnectHandler( READER *reader )
848 pDataConnCB.reset(
new OnConnJob<READER>(
this, reader ) );
851 if( pDataServer.empty() )
return;
// Shared implementation of GetChunk(): under the callback mutex, returns
// suDone when no request is queued, otherwise takes ownership of the oldest
// handler. A failed read is logged and its buffer freed; a successful one is
// moved into `ci` and suContinue is returned.
// NOTE(review): the queue refill, the wait for the handler, the pop, the
// failure return and the checksum updates are not visible in this excerpt.
865 template<
typename READER>
866 XrdCl::XRootDStatus GetChunkImpl( READER *reader, XrdCl::PageInfo &ci )
871 using namespace XrdCl;
872 Log *log = DefaultEnv::GetLog();
877 std::unique_lock<std::mutex> lck( pDataConnCB->mtx );
883 if( pChunks.empty() )
884 return XRootDStatus( stOK, suDone );
886 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
892 if( !ch->status.IsOK() )
894 log->
Debug( UtilityMsg,
"Unable read %d bytes at %llu from %s: %s",
895 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
896 pUrl->GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
897 delete [] (
char *)ch->chunk.GetBuffer();
902 ci = std::move( ch->chunk );
904 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
909 for(
auto cksHelper : pAddCksHelpers )
913 return XRootDStatus( stOK, suContinue );
// Response handler for one asynchronous read. It owns a semaphore created
// with count 0 and records the resulting status and chunk.
919 class ChunkHandler:
public XrdCl::ResponseHandler
922 ChunkHandler(): sem( new XrdSysSemaphore(0) ) {}
923 virtual ~ChunkHandler() {
delete sem; }
// Stores a copy of the status and converts the response into `chunk`.
// NOTE(review): the guard around ToChunk(), the release of the arguments and
// the semaphore post are not visible in this excerpt.
924 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
925 XrdCl::AnyObject *response )
927 this->status = *statusval;
931 chunk = ToChunk( response );
// Extracts the payload: a PageInfo response is moved out directly, otherwise
// a ChunkInfo is fetched (its conversion to PageInfo is not visible here).
937 XrdCl::PageInfo ToChunk( XrdCl::AnyObject *response )
939 if( response->
Has<XrdCl::PageInfo>() )
941 XrdCl::PageInfo *resp =
nullptr;
942 response->
Get( resp );
943 return std::move( *resp );
947 XrdCl::ChunkInfo *resp =
nullptr;
948 response->
Get( resp );
954 XrdSysSemaphore *sem;
955 XrdCl::PageInfo chunk;
956 XrdCl::XRootDStatus status;
// Source URL, passed to the constructor as a raw pointer.
959 const XrdCl::URL *pUrl;
// Offset of the next read request to issue.
962 int64_t pCurrentOffset;
// Outstanding read requests, oldest first.
965 std::queue<ChunkHandler*> pChunks;
// Server identifier obtained from the file properties in Initialize().
966 std::string pDataServer;
// Callback job created by SetOnDataConnectHandler(); cancelled in the destructor.
972 std::shared_ptr<CancellableJob> pDataConnCB;
// Source that reads a single member file out of a ZIP archive.
978 class XRootDSourceZip:
public XRootDSource
// `filename` names the member inside `archive`; the remaining arguments are
// forwarded to XRootDSource and a ZipArchive object is created.
// NOTE(review): some parameter lines are not visible in this excerpt.
984 XRootDSourceZip(
const std::string &filename,
985 const XrdCl::URL *archive,
987 uint8_t parallelChunks,
988 const std::string &ckSumType,
989 const std::vector<std::string> &addcks,
991 XRootDSource( archive, chunkSize, parallelChunks, ckSumType,
993 pFilename( filename ),
994 pZipArchive( new XrdCl::ZipArchive() )
// Destructor (body not visible in this excerpt).
1001 virtual ~XRootDSourceZip()
// Opens the member file inside the archive, stats it, initializes the
// checksum helpers for plain local files, records the data server and
// installs the on-data-connect handler with the archive as reader.
// NOTE(review): the archive open call, error checks and conditional bodies
// are not visible in this excerpt.
1012 virtual XrdCl::XRootDStatus Initialize()
1014 using namespace XrdCl;
1015 Log *log = DefaultEnv::GetLog();
1016 log->
Debug( UtilityMsg,
"Opening %s for reading",
1017 pUrl->GetObfuscatedURL().c_str() );
// Propagate the "ReadRecovery" setting from the environment to the archive.
1020 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
1021 pZipArchive->SetProperty(
"ReadRecovery", value );
1027 st = pZipArchive->OpenFile( pFilename );
1031 XrdCl::StatInfo *info = 0;
1032 st = pZipArchive->Stat( info );
// Unlike XRootDSource::Initialize, this condition does not test pContinue.
1041 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper )
1043 auto st = pCkSumHelper->Initialize();
1044 if( !st.
IsOK() )
return st;
1045 for(
auto cksHelper : pAddCksHelpers )
1048 if( !st.
IsOK() )
return st;
1052 if( ( !pUrl->IsLocalFile() && !pZipArchive->IsSecure() ) ||
1053 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1055 pZipArchive->GetProperty(
"DataServer", pDataServer );
1064 SetOnDataConnectHandler( pZipArchive );
1066 return XrdCl::XRootDStatus();
// Delivers the next chunk using the ZIP archive as reader.
1078 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1080 return GetChunkImpl( pZipArchive, ci );
// Checksum of the main type; note the argument order of this class's
// GetCheckSumImpl (helper last).
1086 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1087 std::string &checkSumType )
1089 return GetCheckSumImpl( checkSum, checkSumType, pCkSumHelper );
// ZIP-specific checksum lookup. The parameter order differs from
// XRootDSource::GetCheckSumImpl (helper last here, first there), so this is a
// separate overload.
//  - "zcrc32": the CRC32 stored in the archive for pFilename is fetched and
//    formatted with the "zcrc32:" prefix.
//  - metalink URLs, when the "ZipMtlnCksum" setting is enabled: the
//    redirector's checksum is used if it is non-empty.
//  - plain local files not being continued: the checksum helper is used.
// NOTE(review): declarations of cksum / ckSum / env / registry and the final
// fallback are not visible in this excerpt.
1095 virtual XrdCl::XRootDStatus GetCheckSumImpl( std::string &checkSum,
1096 std::string &checkSumType,
1097 XrdCl::CheckSumHelper *cksHelper )
1100 if( checkSumType ==
"zcrc32" )
1103 auto st = pZipArchive->GetCRC32( pFilename, cksum );
1104 if( !st.
IsOK() )
return st;
1107 ckSum.
Set(
"zcrc32" );
1108 ckSum.
Set(
reinterpret_cast<void*
>( &cksum ),
sizeof( uint32_t ) );
// NOTE(review): the buffer holds 265 bytes while 256 is passed to Get(); this
// is within bounds, but the two sizes look like they were meant to match.
1109 char cksBuffer[265];
1110 ckSum.
Get( cksBuffer, 256 );
1111 checkSum =
"zcrc32:";
1118 env->
GetInt(
"ZipMtlnCksum", useMtlnCksum );
1119 if( useMtlnCksum && pUrl->IsMetalink() )
1122 XrdCl::VirtualRedirector *redirector = registry.
Get( *pUrl );
1123 checkSum = redirector->
GetCheckSum( checkSumType );
1124 if( !checkSum.empty() )
return XrdCl::XRootDStatus();
1128 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && cksHelper && !pContinue )
1129 return cksHelper->
GetCheckSum( checkSum, checkSumType );
// Collects the checksum string of every additional helper via the
// ZIP-specific GetCheckSumImpl; the returned status is ignored.
// NOTE(review): the declaration of `cks` and the final return are not visible
// in this excerpt.
1138 std::vector<std::string> GetAddCks()
1140 std::vector<std::string> ret;
1141 for(
auto cksHelper : pAddCksHelpers )
1143 std::string type = cksHelper->
GetType();
1145 GetCheckSumImpl( cks, type, cksHelper );
1146 ret.push_back( cks );
// No attributes are produced for archive members: `xattrs` is left untouched
// and a default (OK) status is returned.
1154 virtual XrdCl::XRootDStatus
GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1156 return XrdCl::XRootDStatus();
// Copy constructor and copy assignment are declared only (no definition is
// visible in this excerpt).
1161 XRootDSourceZip(
const XRootDSourceZip &other);
1162 XRootDSourceZip &operator = (
const XRootDSourceZip &other);
// Name of the member file inside the archive.
1164 const std::string pFilename;
// Archive object created in the constructor.
1165 XrdCl::ZipArchive *pZipArchive;
// Source that reads with blocking Read/PgRead calls (see GetChunk) and tracks
// completion with a pDone flag.
1171 class XRootDSourceDynamic:
public Source
// Delegates to the underlying file object.
1178 XrdCl::XRootDStatus TryOtherServer()
1180 return pFile->TryOtherServer();
// Constructor: creates the XrdCl::File, starts at offset 0, not done and with
// pgread disabled.
// NOTE(review): the chunkSize parameter line is not visible in this excerpt.
1186 XRootDSourceDynamic(
const XrdCl::URL *url,
1188 const std::string &ckSumType,
1189 const std::vector<std::string> &addcks ):
1190 Source( ckSumType, addcks ),
1191 pUrl( url ), pFile( new XrdCl::
File() ), pCurrentOffset( 0 ),
1192 pChunkSize( chunkSize ), pDone( false ), pUsePgRead( false )
// Destructor: closes the file; the returned status is not inspected in the
// lines visible here.
1199 virtual ~XRootDSourceDynamic()
1201 XrdCl::XRootDStatus status = pFile->Close();
// Opens the source file for reading, initializes the checksum helpers for
// plain local files that are not being continued, and reads the "DataServer"
// property when the connection is not secure or the URL is a local metalink.
// NOTE(review): error checks and conditional bodies are not visible in this
// excerpt.
1208 virtual XrdCl::XRootDStatus Initialize()
1210 using namespace XrdCl;
1211 Log *log = DefaultEnv::GetLog();
1212 log->
Debug( UtilityMsg,
"Opening %s for reading",
1213 pUrl->GetObfuscatedURL().c_str() );
// Propagate the "ReadRecovery" setting from the environment to the file.
1216 DefaultEnv::GetEnv()->GetString(
"ReadRecovery", value );
1217 pFile->SetProperty(
"ReadRecovery", value );
1219 XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
1223 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
1225 auto st = pCkSumHelper->Initialize();
1226 if( !st.
IsOK() )
return st;
1227 for(
auto cksHelper : pAddCksHelpers )
1230 if( !st.
IsOK() )
return st;
1234 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
1235 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1237 std::string datasrv;
1238 pFile->GetProperty(
"DataServer", datasrv );
1247 return XRootDStatus();
// Size of the source (body not visible in this excerpt).
1253 virtual int64_t GetSize()
// Sets the offset at which reading begins and reports success.
1261 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1263 pCurrentOffset = offset;
1265 return XrdCl::XRootDStatus();
// Reads up to pChunkSize bytes at the current offset into a buffer allocated
// with new[], using PgRead when pUsePgRead is set and Read otherwise. The data
// is fed to the checksum helpers for plain local files not being continued,
// the buffer is handed to `ci`, the offset advances by the bytes read and
// suContinue is returned. Two paths return suDone.
// NOTE(review): the conditions guarding the suDone returns, the handling of a
// short read (bytesRead < pChunkSize), error handling after the read and any
// clean-up of `buffer` are not visible in this excerpt.
1277 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1282 using namespace XrdCl;
1285 return XRootDStatus( stOK, suDone );
1290 char *buffer =
new char[pChunkSize];
1291 uint32_t bytesRead = 0;
1293 std::vector<uint32_t> cksums;
1294 XRootDStatus st = pUsePgRead
1295 ? pFile->PgRead( pCurrentOffset, pChunkSize, buffer, cksums, bytesRead )
1296 : pFile->Read( pCurrentOffset, pChunkSize, buffer, bytesRead );
1307 return XRootDStatus( stOK, suDone );
1310 if( bytesRead < pChunkSize )
1314 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
1317 pCkSumHelper->Update( buffer, bytesRead );
1319 for(
auto cksHelper : pAddCksHelpers )
1320 cksHelper->
Update( buffer, bytesRead );
1323 ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
1324 pCurrentOffset += bytesRead;
1326 return XRootDStatus( stOK, suContinue );
// Checksum of the main type: delegates to GetCheckSumImpl with pCkSumHelper.
1332 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1333 std::string &checkSumType )
1335 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
// Resolves the checksum in stages: redirector for metalink URLs (a non-empty
// answer is returned at once), checksum helper for local files, otherwise the
// "DataServer" and "LastURL" file properties are read.
// NOTE(review): the declaration of `registry`, extra conditions on the
// local-file path and the query that uses dataServer / lastUrl are not visible
// in this excerpt.
1338 XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
1339 std::string &checkSum,
1340 std::string &checkSumType )
1342 if( pUrl->IsMetalink() )
1345 XrdCl::VirtualRedirector *redirector = registry.
Get( *pUrl );
1346 checkSum = redirector->
GetCheckSum( checkSumType );
1347 if( !checkSum.empty() )
return XrdCl::XRootDStatus();
1350 if( pUrl->IsLocalFile() )
1357 return cksHelper->
GetCheckSum( checkSum, checkSumType );
1362 std::string dataServer; pFile->GetProperty(
"DataServer", dataServer );
1363 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
// Collects the checksum string of every additional helper; the status
// returned by GetCheckSumImpl is ignored.
// NOTE(review): the declaration of `cks` and the final return are not visible
// in this excerpt.
1370 std::vector<std::string> GetAddCks()
1372 std::vector<std::string> ret;
1373 for(
auto cksHelper : pAddCksHelpers )
1375 std::string type = cksHelper->
GetType();
1377 GetCheckSumImpl( cksHelper, cks, type );
1378 ret.push_back( cks );
// Reads the extended attributes through the file-scope GetXAttr helper.
1386 virtual XrdCl::XRootDStatus
GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1388 return ::GetXAttr( *pFile, xattrs );
// Copy constructor and copy assignment are declared only (no definition is
// visible in this excerpt).
1392 XRootDSourceDynamic(
const XRootDSourceDynamic &other);
1393 XRootDSourceDynamic &operator = (
const XRootDSourceDynamic &other);
// Source URL, passed to the constructor as a raw pointer.
1394 const XrdCl::URL *pUrl;
// Offset of the next read.
1396 int64_t pCurrentOffset;
// Number of bytes requested per read.
1397 uint32_t pChunkSize;
// Source that reads through an XrdCl::XCpCtx built over a list of replica
// URLs (see Initialize).
1405 class XRootDSourceXCp:
public Source
// Stores the configuration only; the XCpCtx itself is created in Initialize().
// No checksum type is forwarded to Source.
1411 XRootDSourceXCp(
const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ):
1412 pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize )
// Builds the replica list and creates the extreme-copy context. For metalink
// URLs the file size is taken from the redirector; otherwise locations are
// looked up through a FileSystem object and turned into
// "root://<address>/<path>" URLs. The replica list is formatted into a string
// stream beginning with "XCp sources: ", then the XCpCtx is created and
// initialized.
// NOTE(review): the locate call producing `st` / `li`, the release of `li`
// and the use of the formatted stream are not visible in this excerpt.
1425 virtual XrdCl::XRootDStatus Initialize()
1428 int64_t fileSize = -1;
1430 if( pUrl->IsMetalink() )
1433 XrdCl::VirtualRedirector *redirector = registry.
Get( *pUrl );
1434 fileSize = redirector->
GetSize();
1439 XrdCl::LocationInfo *li = 0;
1440 XrdCl::FileSystem fs( *pUrl );
1442 if( !st.
IsOK() )
return st;
1445 for( itr = li->
Begin(); itr != li->
End(); ++itr)
1447 std::string url =
"root://" + itr->GetAddress() +
"/" + pUrl->GetPath();
1448 pReplicas.push_back( url );
1454 std::stringstream ss;
1455 ss <<
"XCp sources: ";
1457 std::vector<std::string>::iterator itr;
1458 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1464 pXCpCtx =
new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize );
1466 return pXCpCtx->Initialize();
// Size as reported by the copy context. pXCpCtx is null until Initialize()
// has created it.
1472 virtual int64_t GetSize()
1474 return pXCpCtx->GetSize();
// Body not visible in this excerpt.
1480 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
// Fetches the next chunk from the copy context.
// NOTE(review): the surrounding loop / status handling and the return are not
// visible in this excerpt.
1494 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1496 XrdCl::XRootDStatus st;
1499 st = pXCpCtx->GetChunk( ci );
// For metalink URLs the redirector's checksum is used when non-empty.
// Otherwise each replica is tried in turn and the first successful status is
// returned.
// NOTE(review): the per-replica query call and the return taken when every
// replica fails are not visible in this excerpt.
1508 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1509 std::string &checkSumType )
1511 if( pUrl->IsMetalink() )
1514 XrdCl::VirtualRedirector *redirector = registry.
Get( *pUrl );
1515 checkSum = redirector->
GetCheckSum( checkSumType );
1516 if( !checkSum.empty() )
return XrdCl::XRootDStatus();
1519 std::vector<std::string>::iterator itr;
1520 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1522 XrdCl::URL url( *itr );
1524 checkSumType, url );
1525 if( st.
IsOK() )
return st;
// This source provides no additional checksums: always an empty vector.
1534 std::vector<std::string> GetAddCks()
1536 return std::vector<std::string>();
// Tries each replica in turn through the file-scope GetXAttr helper and
// returns on the first success.
// NOTE(review): the return taken when every replica fails is not visible.
// This loop compares iterators with `<` while the other loops over pReplicas
// use `!=`; both are valid for vector iterators.
1542 virtual XrdCl::XRootDStatus
GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1544 XrdCl::XRootDStatus st;
1545 std::vector<std::string>::iterator itr = pReplicas.begin();
1546 for( ; itr < pReplicas.end() ; ++itr )
1548 st = ::GetXAttr( *itr, xattrs );
1549 if( st.
IsOK() )
return st;
// Copy context; null until Initialize() creates it.
1557 XrdCl::XCpCtx *pXCpCtx;
// Source URL, passed to the constructor as a raw pointer.
1558 const XrdCl::URL *pUrl;
// Replica URLs collected in Initialize().
1559 std::vector<std::string> pReplicas;
// Values below are forwarded to the XCpCtx constructor.
1560 uint32_t pChunkSize;
1561 uint16_t pParallelChunks;
1563 uint64_t pBlockSize;
// Destination implementation that writes to standard output.
1569 class StdOutDestination:
public Destination
// Forwards the checksum type to Destination and starts at offset 0.
1575 StdOutDestination(
const std::string &ckSumType ):
1576 Destination( ckSumType ), pCurrentOffset(0)
// Destructor (body not visible in this excerpt).
1583 virtual ~StdOutDestination()
// Rejects continuation with ENOTSUP ("Cannot continue to stdout."),
// initializes the checksum helper, or returns a default (OK) status.
// NOTE(review): the conditions selecting between these outcomes are not
// visible in this excerpt.
1590 virtual XrdCl::XRootDStatus Initialize()
1594 ENOTSUP,
"Cannot continue to stdout." );
1597 return pCkSumHelper->Initialize();
1598 return XrdCl::XRootDStatus();
// Nothing to finalize for stdout.
1604 virtual XrdCl::XRootDStatus Finalize()
1606 return XrdCl::XRootDStatus();
// Writes one chunk to file descriptor 1. A chunk whose offset differs from
// the expected one is logged as out-of-bounds and rejected with errInternal;
// a failed write is logged and reported as errOSError carrying errno. The
// running offset advances by the number of bytes written.
// NOTE(review): the offset comparison, the write loop, the checksum update
// and the release of the chunk buffer are not visible in this excerpt.
1615 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1617 using namespace XrdCl;
1618 Log *log = DefaultEnv::GetLog();
1622 log->
Error( UtilityMsg,
"Got out-of-bounds chunk, expected offset:"
1623 " %llu, got %llu", (
unsigned long long) pCurrentOffset, (
unsigned long long) ci.
GetOffset() );
1624 return XRootDStatus( stError, errInternal );
1632 wr =
write( 1, cursor, length );
1635 log->
Debug( UtilityMsg,
"Unable to write to stdout: %s",
1638 return XRootDStatus( stError, errOSError, errno );
1640 pCurrentOffset += wr;
1649 return XRootDStatus();
// Nothing is queued for stdout, so flushing is a no-op.
1655 virtual XrdCl::XRootDStatus Flush()
1657 return XrdCl::XRootDStatus();
// Returns the checksum computed by the helper.
// NOTE(review): any null check on pCkSumHelper is not visible in this excerpt.
1663 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1664 std::string &checkSumType )
1667 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
// Attributes are ignored for stdout; a default (OK) status is returned.
1674 virtual XrdCl::XRootDStatus
SetXAttr(
const std::vector<XrdCl::xattr_t> &xattrs )
1676 return XrdCl::XRootDStatus();
// Body not visible in this excerpt.
1682 virtual int64_t GetSize()
// Copy constructor and copy assignment are declared only (no definition is
// visible in this excerpt).
1688 StdOutDestination(
const StdOutDestination &other);
1689 StdOutDestination &operator = (
const StdOutDestination &other);
// Offset expected for the next chunk; advanced by PutChunk().
1690 uint64_t pCurrentOffset;
// Destination implementation writing through an XrdCl::File.
1696 class XRootDDestination:
public Destination
// Creates the file object with virtual redirection disabled, starts with an
// unknown size (-1) and pgwrite disabled, and keeps a reference to the owning
// copy job (the job must therefore outlive this object).
1702 XRootDDestination(
const XrdCl::URL &url, uint8_t parallelChunks,
1703 const std::string &ckSumType,
const XrdCl::ClassicCopyJob &cpjob ):
1704 Destination( ckSumType ),
1705 pUrl( url ), pFile( new XrdCl::
File( XrdCl::
File::DisableVirtRedirect ) ),
1706 pParallel( parallelChunks ), pSize( -1 ), pUsePgWrt( false ), cpjob( cpjob )
// Destructor: removes the cp-target entry when one is configured and, for a
// local destination with POSC enabled whose job did not succeed, removes the
// destination file.
// NOTE(review): the origin of `cptarget`, the chunk clean-up and the log calls
// are not visible in this excerpt.
1713 virtual ~XRootDDestination()
1725 if( !cptarget.empty() )
1727 XrdCl::FileSystem fs(
"file://localhost" );
1728 XrdCl::XRootDStatus st = fs.Rm( cptarget );
1738 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
1740 XrdCl::FileSystem fs( pUrl );
1741 XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
1744 " on failure: %s", st.
ToString().c_str() );
// Opens the destination for writing. Open flags start from Update and gain
// Delete / New / POSC / Force / MakePath depending on the configured options;
// the file is created with mode UR|UW|GR|OR. After opening, an optional
// cp-target symlink to the final location is created, the file is stat'ed and,
// for local files not being continued, the checksum helper is initialized.
// NOTE(review): the conditions guarding most flag additions, the error checks
// and the origin of `cptarget` are not visible in this excerpt.
1751 virtual XrdCl::XRootDStatus Initialize()
1753 using namespace XrdCl;
1754 Log *log = DefaultEnv::GetLog();
1755 log->
Debug( UtilityMsg,
"Opening %s for writing",
1756 pUrl.GetObfuscatedURL().c_str() );
// Propagate the "WriteRecovery" setting from the environment to the file.
1759 DefaultEnv::GetEnv()->GetString(
"WriteRecovery", value );
1760 pFile->SetProperty(
"WriteRecovery", value );
1762 OpenFlags::Flags flags = OpenFlags::Update;
1764 flags |= OpenFlags::Delete;
1765 else if( !pContinue )
1766 flags |= OpenFlags::New;
1769 flags |= OpenFlags::POSC;
1772 flags |= OpenFlags::Force;
1775 flags |= OpenFlags::MakePath;
1777 Access::Mode mode = Access::UR|Access::UW|Access::GR|Access::OR;
1779 XrdCl::XRootDStatus st = pFile->Open( pUrl.GetURL(), flags, mode );
1783 if( ( !pUrl.IsLocalFile() && !pFile->IsSecure() ) ||
1784 ( pUrl.IsLocalFile() && pUrl.IsMetalink() ) )
1786 std::string datasrv;
1787 pFile->GetProperty(
"DataServer", datasrv );
1798 if( !cptarget.empty() )
1800 std::string targeturl;
1801 pFile->GetProperty(
"LastURL", targeturl );
1802 targeturl = URL( targeturl ).GetLocation();
// A failed symlink only produces a warning; it does not fail initialization
// in the lines visible here.
1803 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
1804 log->
Warning( UtilityMsg,
"Could not create cp-target symlink: %s",
1807 log->
Info( UtilityMsg,
"Created cp-target symlink: %s -> %s",
1808 cptarget.c_str(), targeturl.c_str() );
1812 st = pFile->Stat(
false, info );
1818 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1819 return pCkSumHelper->Initialize();
1821 return XRootDStatus();
// Closes the destination file and returns the close status.
1827 virtual XrdCl::XRootDStatus Finalize()
1829 return pFile->Close();
// Queues a chunk for writing. Fails with errUninitialized when the file is
// not open. While fewer than pParallel writes are outstanding the chunk is
// queued directly; otherwise the oldest write is collected first: its buffer
// is freed and, if it failed, the failure is logged and routed through
// CheckIfRetriable() instead of queuing the new chunk.
// NOTE(review): the wait on the oldest handler, the pop from the queue and
// the handling of `ci` on the failure path are not visible in this excerpt.
1838 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1840 using namespace XrdCl;
1841 if( !pFile->IsOpen() )
1844 return XRootDStatus( stError, errUninitialized );
1850 if( pChunks.size() < pParallel )
1851 return QueueChunk( std::move( ci ) );
1857 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
1860 delete [] (
char*)ch->chunk.GetBuffer();
1861 if( !ch->status.IsOK() )
1863 Log *log = DefaultEnv::GetLog();
1864 log->
Debug( UtilityMsg,
"Unable write %d bytes at %llu from %s: %s",
1865 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
1866 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
1874 return CheckIfRetriable( ch->status );
1877 return QueueChunk( std::move( ci ) );
// Body not visible in this excerpt.
1883 virtual int64_t GetSize()
// Drains the queue of outstanding writes, releasing each chunk buffer.
// NOTE(review): the wait on the handler and the pop / delete of `ch` are not
// visible in this excerpt.
1891 void CleanUpChunks()
1893 while( !pChunks.empty() )
1895 ChunkHandler *ch = pChunks.front();
1898 delete [] (
char *)ch->chunk.GetBuffer();
// Wraps the chunk in a ChunkHandler and issues an asynchronous PgWrite or
// Write with it. The buffer is freed on one path, otherwise a default (OK)
// status is returned.
// NOTE(review): the checksum update, the PgWrite/Write selector, the failure
// check and the push onto pChunks are not visible in this excerpt.
1906 XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
1910 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1913 ChunkHandler *ch =
new ChunkHandler( std::move( ci ) );
1914 XrdCl::XRootDStatus st;
1916 ? pFile->PgWrite(ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch->chunk.GetCksums(), ch)
1917 : pFile->Write( ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
1921 delete [] (
char*)ch->chunk.GetBuffer();
1926 return XrdCl::XRootDStatus();
// Collects every outstanding write. A failed write has its status passed
// through CheckIfRetriable() and stored in `st`; every chunk buffer is freed.
// NOTE(review): the wait on each handler, the pop / delete of `ch` and the
// final return are not visible in this excerpt.
1932 virtual XrdCl::XRootDStatus Flush()
1934 XrdCl::XRootDStatus st;
1935 while( !pChunks.empty() )
1937 ChunkHandler *ch = pChunks.front();
1940 if( !ch->status.IsOK() )
1946 st = CheckIfRetriable( ch->status );
1948 delete [] (
char *)ch->chunk.GetBuffer();
// For local files the checksum helper provides the checksum; otherwise the
// "LastURL" property is read and used to build the URL for a query.
// NOTE(review): extra conditions on the local path and the query call itself
// are not visible in this excerpt.
1957 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1958 std::string &checkSumType )
1960 if( pUrl.IsLocalFile() )
1967 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1972 std::string lastUrl; pFile->GetProperty(
"LastURL", lastUrl );
1974 XrdCl::URL( lastUrl ) );
// Applies the attributes through the file-scope SetXAttr helper.
1980 virtual XrdCl::XRootDStatus
SetXAttr(
const std::vector<XrdCl::xattr_t> &xattrs )
1982 return ::SetXAttr( *pFile, xattrs );
// Overrides the Destination default (body not visible in this excerpt;
// presumably returns pLastURL).
1988 const std::string& GetLastURL()
const
// Returns the redirect recorded by CheckIfRetriable().
1996 const std::string& GetWrtRecoveryRedir()
const
1998 return pWrtRecoveryRedir;
// Copy constructor and copy assignment are declared only (no definition is
// visible in this excerpt).
2002 XRootDDestination(
const XRootDDestination &other);
2003 XRootDDestination &operator = (
const XRootDDestination &other);
// Response handler for one asynchronous write. It takes ownership of the
// chunk, owns a semaphore created with count 0 and records the write status.
2008 class ChunkHandler:
public XrdCl::ResponseHandler
2011 ChunkHandler( XrdCl::PageInfo &&ci ):
2012 sem( new XrdSysSemaphore(0) ),
2013 chunk(std::move( ci ) ) {}
2014 virtual ~ChunkHandler() {
delete sem; }
// Stores a copy of the status; the response object is not used.
// NOTE(review): the release of `statusval` and the semaphore post are not
// visible in this excerpt.
2015 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2016 XrdCl::AnyObject * )
2018 this->status = *statusval;
2023 XrdSysSemaphore *sem;
2024 XrdCl::PageInfo chunk;
2025 XrdCl::XRootDStatus status;
// Returns an OK status unchanged. Otherwise reads the "WrtRecoveryRedir" file
// property; when present it is remembered in pWrtRecoveryRedir, and "LastURL"
// is remembered in pLastURL when available.
// NOTE(review): the declaration of `value`, the status returned in the
// retriable case and the final return are not visible in this excerpt.
2028 inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2030 if( status.
IsOK() )
return status;
2037 if( pFile->GetProperty(
"WrtRecoveryRedir", value ) )
2039 pWrtRecoveryRedir = value;
2040 if( pFile->GetProperty(
"LastURL", value ) ) pLastURL = value;
// Destination URL (stored by value).
2047 const XrdCl::URL pUrl;
// Outstanding write requests, oldest first.
2050 std::queue<ChunkHandler *> pChunks;
// Values captured by CheckIfRetriable().
2053 std::string pWrtRecoveryRedir;
2054 std::string pLastURL;
// Owning copy job, held by reference; its result is consulted in the destructor.
2056 const XrdCl::ClassicCopyJob &cpjob;
// Destination that appends the data as a member file of a ZIP archive.
2062 class XRootDZipDestination:
public Destination
// `fn` is the member file name and `size` its size; the checksum type is
// fixed to "zcrc32". A reference to the owning copy job is kept, so the job
// must outlive this object.
2068 XRootDZipDestination(
const XrdCl::URL &url,
const std::string &fn,
2069 int64_t size, uint8_t parallelChunks, XrdCl::ClassicCopyJob &cpjob ):
2070 Destination(
"zcrc32" ),
2071 pUrl( url ), pFilename( fn ), pZip( new XrdCl::ZipArchive() ),
2072 pParallel( parallelChunks ), pSize( size ), cpjob( cpjob )
// Destructor: for a local destination with POSC enabled whose job did not
// succeed, the archive file is removed.
// NOTE(review): the chunk clean-up, the release of pZip and the log calls are
// not visible in this excerpt.
2079 virtual ~XRootDZipDestination()
2088 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
2090 XrdCl::FileSystem fs( pUrl );
2091 XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
2096 " on failure: %s", st.
ToString().c_str() );
// Opens the archive for writing. Open flags start from Update; the archive is
// stat'ed first and New / POSC / Force / MakePath are added depending on the
// outcome and the configured options. After opening, an optional cp-target
// symlink is created, and finally the checksum helper is initialized.
// NOTE(review): the conditions guarding the flag additions, the archive open
// calls, the release of `info` and the origin of `cptarget` are not visible in
// this excerpt.
2104 virtual XrdCl::XRootDStatus Initialize()
2106 using namespace XrdCl;
2107 Log *log = DefaultEnv::GetLog();
2108 log->
Debug( UtilityMsg,
"Opening %s for writing",
2109 pUrl.GetObfuscatedURL().c_str() );
// Propagate the "WriteRecovery" setting from the environment to the archive.
2112 DefaultEnv::GetEnv()->GetString(
"WriteRecovery", value );
2113 pZip->SetProperty(
"WriteRecovery", value );
2115 OpenFlags::Flags flags = OpenFlags::Update;
2117 FileSystem fs( pUrl );
2118 StatInfo *info =
nullptr;
2119 auto st = fs.Stat( pUrl.GetPath(), info );
2121 flags |= OpenFlags::New;
2124 flags |= OpenFlags::POSC;
2127 flags |= OpenFlags::Force;
2130 flags |= OpenFlags::MakePath;
2138 if( !cptarget.empty() )
2140 std::string targeturl;
2141 pZip->GetProperty(
"LastURL", targeturl );
// Unlike XRootDDestination::Initialize, the URL is used as-is here (no
// GetLocation() conversion is visible).
2142 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
2143 log->
Warning( UtilityMsg,
"Could not create cp-target symlink: %s",
2151 return pCkSumHelper->Initialize();
// Obtains the raw "zcrc32" checksum from the helper and stores it in the
// archive metadata; a failure to obtain the checksum is returned as-is.
// NOTE(review): the declaration of `crc32` and the closing of the archive are
// not visible in this excerpt.
2157 virtual XrdCl::XRootDStatus Finalize()
2160 auto st = pCkSumHelper->GetRawCheckSum(
"zcrc32", crc32 );
2161 if( !st.
IsOK() )
return st;
2162 pZip->UpdateMetadata( crc32 );
// Queues a chunk for writing. While fewer than pParallel writes are
// outstanding the chunk is queued directly; otherwise the oldest write is
// collected first: its buffer is freed and, if it failed, the failure is
// logged and routed through CheckIfRetriable() instead of queuing the new
// chunk.
// NOTE(review): the wait on the oldest handler, the pop from the queue and
// the handling of `ci` on the failure path are not visible in this excerpt.
2173 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
2175 using namespace XrdCl;
2180 if( pChunks.size() < pParallel )
2181 return QueueChunk( std::move( ci ) );
2187 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
2190 delete [] (
char*)ch->chunk.GetBuffer();
2191 if( !ch->status.IsOK() )
2193 Log *log = DefaultEnv::GetLog();
2194 log->
Debug( UtilityMsg,
"Unable write %d bytes at %llu from %s: %s",
2195 ch->chunk.GetLength(), (
unsigned long long) ch->chunk.GetOffset(),
2196 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
2203 return CheckIfRetriable( ch->status );
2206 return QueueChunk( std::move( ci ) );
// Body not visible in this excerpt.
2212 virtual int64_t GetSize()
// Drains the queue of outstanding writes, releasing each chunk buffer.
// NOTE(review): the wait on the handler and the pop / delete of `ch` are not
// visible in this excerpt.
2220 void CleanUpChunks()
2222 while( !pChunks.empty() )
2224 ChunkHandler *ch = pChunks.front();
2227 delete [] (
char *)ch->chunk.GetBuffer();
// Wraps the chunk in a ChunkHandler and issues an asynchronous write to the
// archive. The archive write takes only length and buffer (no offset). The
// buffer is freed on one path, otherwise a default (OK) status is returned.
// NOTE(review): the checksum update, the failure check and the push onto
// pChunks are not visible in this excerpt.
2235 XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
2241 ChunkHandler *ch =
new ChunkHandler( std::move( ci ) );
2242 XrdCl::XRootDStatus st;
2249 st = pZip->Write( ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
2253 delete [] (
char*)ch->chunk.GetBuffer();
2258 return XrdCl::XRootDStatus();
// Collects every outstanding write. A failed write has its status passed
// through CheckIfRetriable() and stored in `st`; every chunk buffer is freed.
// NOTE(review): the wait on each handler, the pop / delete of `ch` and the
// final return are not visible in this excerpt.
2264 virtual XrdCl::XRootDStatus Flush()
2266 XrdCl::XRootDStatus st;
2267 while( !pChunks.empty() )
2269 ChunkHandler *ch = pChunks.front();
2272 if( !ch->status.IsOK() )
2278 st = CheckIfRetriable( ch->status );
2280 delete [] (
char *)ch->chunk.GetBuffer();
// Bodies of the next three methods are not visible in this excerpt.
2289 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
2290 std::string &checkSumType )
2298 virtual XrdCl::XRootDStatus
SetXAttr(
const std::vector<XrdCl::xattr_t> &xattrs )
2306 const std::string& GetLastURL()
const
// Returns the redirect recorded by CheckIfRetriable().
2314 const std::string& GetWrtRecoveryRedir()
const
2316 return pWrtRecoveryRedir;
2320 XRootDZipDestination(
const XRootDDestination &other);
2321 XRootDZipDestination &operator = (
const XRootDDestination &other);
// Response handler for one asynchronous archive write. It takes ownership of
// the chunk, owns a semaphore created with count 0 and records the write
// status.
2326 class ChunkHandler:
public XrdCl::ResponseHandler
2329 ChunkHandler( XrdCl::PageInfo &&ci ):
2330 sem( new XrdSysSemaphore(0) ),
2331 chunk( std::move( ci ) ) {}
2332 virtual ~ChunkHandler() {
delete sem; }
// Stores a copy of the status; the response object is not used.
// NOTE(review): the release of `statusval` and the semaphore post are not
// visible in this excerpt.
2333 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2334 XrdCl::AnyObject * )
2336 this->status = *statusval;
2341 XrdSysSemaphore *sem;
2342 XrdCl::PageInfo chunk;
2343 XrdCl::XRootDStatus status;
// Inspects a write status. An OK status is returned unchanged. Otherwise the
// "WrtRecoveryRedir" and "LastURL" properties are queried on pZip and, when
// present, copied into pWrtRecoveryRedir and pLastURL.
// NOTE(review): the declaration of `value`, the nesting of the two property
// checks and the status returned on the error path are in lines this listing
// does not show.
2346 inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
// Nothing to do for a successful status.
2348 if( status.
IsOK() )
return status;
2355 if( pZip->GetProperty(
"WrtRecoveryRedir", value ) )
2357 pWrtRecoveryRedir = value;
2358 if( pZip->GetProperty(
"LastURL", value ) ) pLastURL = value;
2365 const XrdCl::URL pUrl;
2366 std::string pFilename;
2367 XrdCl::ZipArchive *pZip;
2369 std::queue<ChunkHandler *> pChunks;
2372 std::string pWrtRecoveryRedir;
2373 std::string pLastURL;
2374 XrdCl::ClassicCopyJob &cpjob;
2383 using namespace std::chrono;
2384 auto since_epoch = high_resolution_clock::now().time_since_epoch();
2385 return duration_cast<nanoseconds>( since_epoch );
2393 return sec * 1000000000;
2401#if __cplusplus >= 201103L
2402 using namespace std::chrono;
2403 std::this_thread::sleep_for( nanoseconds( nsec ) );
2406 req.tv_sec = nsec /
to_nsec( 1 );
2407 req.tv_nsec = nsec %
to_nsec( 1 );
2408 nanosleep( &req, 0 );
2420 CopyJob( jobId, jobProperties, jobResults )
2434 std::string checkSumMode;
2435 std::string checkSumType;
2436 std::string checkSumPreset;
2437 std::string zipSource;
2438 uint16_t parallelChunks;
2441 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2442 rmOnBadCksum, continue_, zipappend, doserver;
2443 int32_t nbXcpSources;
2445 long long xRateThreshold;
2447 std::vector<std::string> addcksums;
2451 pProperties->Get(
"checkSumPreset", checkSumPreset );
2452 pProperties->Get(
"parallelChunks", parallelChunks );
2458 pProperties->Get(
"dynamicSource", dynamicSource );
2462 pProperties->Get(
"preserveXAttr", preserveXAttr );
2464 pProperties->Get(
"xrateThreshold", xRateThreshold );
2478 if( force && continue_ )
2480 "Invalid argument combination: continue + force." );
2482 if( zipappend && ( continue_ || force ) )
2484 "Invalid argument combination: ( continue | force ) + zip-append." );
2489 std::unique_ptr<timer_sec_t> cptimer;
2490 if( cpTimeout ) cptimer.reset(
new timer_sec_t() );
2495 if( rmOnBadCksum ) posc =
true;
2500 if( checkSumType ==
"auto" )
2503 if( checkSumType.empty() )
2506 log->
Info(
UtilityMsg,
"Using inferred checksum type: %s.", checkSumType.c_str() );
2509 if( cptimer && cptimer->elapsed() > cpTimeout )
2515 std::unique_ptr<Source> src;
2517 src.reset(
new XRootDSourceXCp( &
GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2519 src.reset(
new XRootDSourceZip( zipSource, &
GetSource(), chunkSize, parallelChunks,
2520 checkSumType, addcksums , doserver) );
2521 else if(
GetSource().GetProtocol() ==
"stdio" )
2522 src.reset(
new StdInSource( checkSumType, chunkSize, addcksums ) );
2526 src.reset(
new XRootDSourceDynamic( &
GetSource(), chunkSize, checkSumType, addcksums ) );
2528 src.reset(
new XRootDSource( &
GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2532 if( !st.
IsOK() )
return SourceError( st );
2533 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2535 if( cptimer && cptimer->elapsed() > cpTimeout )
2538 std::unique_ptr<Destination> dest;
2541 if(
GetTarget().GetProtocol() ==
"stdio" )
2542 dest.reset(
new StdOutDestination( checkSumType ) );
2543 else if( zipappend )
2546 size_t pos = fn.rfind(
'/' );
2547 if( pos != std::string::npos )
2548 fn = fn.substr( pos + 1 );
2549 int64_t size = src->GetSize();
2550 dest.reset(
new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *
this ) );
2557 if( src->GetSize() >= 0 )
2560 std::ostringstream o; o << src->GetSize();
2561 params[
"oss.asize"] = o.str();
2565 dest.reset(
new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *
this ) );
2568 dest->SetForce( force );
2569 dest->SetPOSC( posc );
2570 dest->SetCoerce( coerce );
2571 dest->SetMakeDir( makeDir );
2572 dest->SetContinue( continue_ );
2573 st = dest->Initialize();
2574 if( !st.
IsOK() )
return DestinationError( st );
2576 if( cptimer && cptimer->elapsed() > cpTimeout )
2584 size -= dest->GetSize();
2586 if( !st.
IsOK() )
return SetResult( st );
2590 uint64_t total_processed = 0;
2591 uint64_t processed = 0;
2593 uint16_t threshold_interval = parallelChunks;
2594 bool threshold_draining =
false;
2595 timer_nsec_t threshold_timer;
2598 st = src->GetChunk( pageInfo );
2600 return SourceError( st);
2605 if( cptimer && cptimer->elapsed() > cpTimeout )
2610 auto elapsed = (
time_nsec() - start ).count();
2611 double transferred = total_processed + pageInfo.
GetLength();
2612 double expected = double( xRate ) /
to_nsec( 1 ) * elapsed;
2618 transferred > expected )
2620 auto nsec = ( transferred / xRate *
to_nsec( 1 ) ) - elapsed;
2625 if( xRateThreshold )
2627 auto elapsed = threshold_timer.elapsed();
2628 double transferred = processed + pageInfo.
GetLength();
2629 double expected = double( xRateThreshold ) /
to_nsec( 1 ) * elapsed;
2635 transferred < expected &&
2636 threshold_interval == 0 )
2638 if( !threshold_draining )
2641 " trying different source!" );
2644 "The transfer rate dropped below "
2645 "requested threshold!" );
2646 threshold_draining =
true;
2652 threshold_timer.reset();
2653 threshold_interval = parallelChunks;
2654 threshold_draining =
false;
2658 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2661 total_processed += pageInfo.
GetLength();
2664 st = dest->PutChunk( std::move( pageInfo ) );
2669 pResults->Set(
"LastURL", dest->GetLastURL() );
2670 pResults->Set(
"WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2671 return SetResult( st );
2673 return DestinationError( st );
2686 return DestinationError( st );
2693 std::vector<xattr_t> xattrs;
2694 st = src->GetXAttr( xattrs );
2695 if( !st.
IsOK() )
return SourceError( st );
2696 st = dest->SetXAttr( xattrs );
2697 if( !st.
IsOK() )
return DestinationError( st );
2704 if( src->GetSize() >= 0 && size != total_processed )
2706 log->
Error(
UtilityMsg,
"The declared source size is %llu bytes, but "
2707 "received %llu bytes.", (
unsigned long long) size, (
unsigned long long) total_processed );
2710 pResults->Set(
"size", total_processed );
2715 st = dest->Finalize();
2717 return DestinationError( st );
2722 if( checkSumMode !=
"none" )
2725 checkSumMode.c_str() );
2726 std::string sourceCheckSum;
2727 std::string targetCheckSum;
2729 if( cptimer && cptimer->elapsed() > cpTimeout )
2735 timeval oStart, oEnd;
2738 if( checkSumMode ==
"end2end" || checkSumMode ==
"source" ||
2739 !checkSumPreset.empty() )
2741 gettimeofday( &oStart, 0 );
2742 if( !checkSumPreset.empty() )
2744 sourceCheckSum = checkSumType +
":";
2750 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2752 gettimeofday( &oEnd, 0 );
2755 return SourceError( st );
2757 pResults->Set(
"sourceCheckSum", sourceCheckSum );
2760 if( !addcksums.empty() )
2761 pResults->Set(
"additionalCkeckSum", src->GetAddCks() );
2763 if( cptimer && cptimer->elapsed() > cpTimeout )
2769 timeval tStart, tEnd;
2771 if( checkSumMode ==
"end2end" || checkSumMode ==
"target" )
2773 gettimeofday( &tStart, 0 );
2774 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2776 return DestinationError( st );
2777 gettimeofday( &tEnd, 0 );
2778 pResults->Set(
"targetCheckSum", targetCheckSum );
2781 if( cptimer && cptimer->elapsed() > cpTimeout )
2787 auto sanitize_cksum = [](
char c )
2790 if( std::isalpha( c ) )
return std::tolower( c, loc );
2794 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2795 sourceCheckSum.begin(), sanitize_cksum );
2797 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2798 targetCheckSum.begin(), sanitize_cksum );
2803 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2806 if( sourceCheckSum == targetCheckSum )
2815 i.
cksum = sourceCheckSum;
2831 log->
Info(
UtilityMsg,
"Target file removed due to bad checksum!" );
2834 st = dest->Finalize();
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
const char * XrdSysE2T(int errcode)
int Set(const char *csName)
int Get(char *Buff, int Blen)
void Get(Type &object)
Retrieve the object being held.
XRootDStatus Initialize()
Initialize.
const std::string & GetType()
XRootDStatus GetCheckSum(std::string &checkSum, std::string &checkSumType)
void Update(const void *buffer, uint32_t size)
ClassicCopyJob(uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
const URL & GetSource() const
Get source.
CopyJob(uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
virtual bool ShouldCancel(uint16_t jobNum)
Determine whether the job should be canceled.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
Send file/filesystem queries to an XRootD cluster.
XRootDStatus Rm(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Iterator Begin()
Get the location begin iterator.
LocationList::iterator Iterator
Iterator over locations.
Iterator End()
Get the location end iterator.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
A key-value pair map storing both keys and values as strings.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
uint64_t GetSize() const
Get size (in bytes)
const std::string & GetPath() const
Get the path.
std::map< std::string, std::string > ParamsMap
void SetParams(const std::string &params)
Set params.
const ParamsMap & GetParams() const
Get the URL params.
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static XRootDStatus GetLocalCheckSum(std::string &checkSum, const std::string &checkSumType, const std::string &path)
Get a checksum from local file.
static bool HasXAttr(const XrdCl::URL &url)
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
static bool HasPgRW(const XrdCl::URL &url)
virtual long long GetSize() const =0
virtual std::string GetCheckSum(const std::string &type) const =0
virtual const std::vector< std::string > & GetReplicas()=0
Returns a vector with replicas as given in the metalink file.
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
const char *const DefaultCpTarget
const uint16_t errOperationExpired
const uint16_t errNotImplemented
Operation is not implemented.
const uint16_t stError
An error occurred that could potentially be retried.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
const uint16_t errDataError
data is corrupted
const int DefaultSubStreamsPerChannel
const int DefaultCpUsePgWrtRd
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
std::tuple< std::string, std::string > xattr_t
Extended attribute key - value pair.
const uint16_t errNotSupported
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
const uint16_t errNoMoreReplicas
No more replicas to try.
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
const int DefaultZipMtlnCksum
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
@ Read
Open only for reading.
@ Write
Open only for writing.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
uint32_t errNo
Errno, if any.