XRootD
Loading...
Searching...
No Matches
XrdPfc::File Class Reference

#include <XrdPfcFile.hh>

+ Collaboration diagram for XrdPfc::File:

Public Member Functions

void AddIO (IO *io)
 
void BlockRemovedFromWriteQ (Block *)
 Handle removal of a block from Cache's write queue.
 
void BlocksRemovedFromWriteQ (std::list< Block * > &)
 Handle removal of a set of blocks from Cache's write queue.
 
int dec_ref_cnt ()
 
bool FinalizeSyncBeforeExit ()
 Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
 
int Fstat (struct stat &sbuff)
 
int get_ref_cnt ()
 
size_t GetAccessCnt () const
 
int GetBlockSize () const
 
long long GetFileSize () const
 
const Info::AStatGetLastAccessStats () const
 
const std::string & GetLocalPath () const
 
XrdSysErrorGetLog () const
 
int GetNBlocks () const
 
int GetNDownloadedBlocks () const
 
int GetPrefetchCountOnIO (IO *io)
 
long long GetPrefetchedBytes () const
 
float GetPrefetchScore () const
 
std::string GetRemoteLocations () const
 
XrdSysTraceGetTrace () const
 
int inc_ref_cnt ()
 
long long initiate_emergency_shutdown ()
 
bool ioActive (IO *io)
 Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
 
void ioUpdated (IO *io)
 Notification from IO that it has been updated (remote open).
 
bool is_in_emergency_shutdown ()
 
const char * lPath () const
 Log path.
 
void Prefetch ()
 
int Read (IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
 Normal read.
 
int ReadV (IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
 Vector read.
 
const StatsRefStats () const
 
void RemoveIO (IO *io)
 
void RequestSyncOfDetachStats ()
 Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
 
void StopPrefetchingOnIO (IO *io)
 
void Sync ()
 Sync file cache inf o and output data with disk.
 
void WriteBlockToDisk (Block *b)
 

Static Public Member Functions

static FileFileOpen (const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
 Static constructor that also does Open. Returns null ptr if Open fails.
 

Friends

class BlockResponseHandler
 
class Cache
 
class DirectResponseHandler
 

Detailed Description

Definition at line 202 of file XrdPfcFile.hh.

Member Function Documentation

◆ AddIO()

void File::AddIO ( IO * io)

Definition at line 349 of file XrdPfcFile.cc.

350{
351 // Called from Cache::GetFile() when a new IO asks for the file.
352
353 TRACEF(Debug, "AddIO() io = " << (void*)io);
354
355 time_t now = time(0);
356 std::string loc(io->GetLocation());
357
358 m_state_cond.Lock();
359
360 IoSet_i mi = m_io_set.find(io);
361
362 if (mi == m_io_set.end())
363 {
364 m_io_set.insert(io);
365 io->m_attach_time = now;
366 m_delta_stats.IoAttach();
367
368 insert_remote_location(loc);
369
370 if (m_prefetch_state == kStopped)
371 {
372 m_prefetch_state = kOn;
373 cache()->RegisterPrefetchFile(this);
374 }
375 }
376 else
377 {
378 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
379 }
380
381 m_state_cond.UnLock();
382}
#define TRACEF(act, x)
bool Debug
const char * GetLocation()
Definition XrdPfcIO.hh:44

References Debug, Error, XrdPfc::IO::GetLocation(), and TRACEF.

Referenced by XrdPfc::Cache::GetFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ BlockRemovedFromWriteQ()

void File::BlockRemovedFromWriteQ ( Block * b)

Handle removal of a block from Cache's write queue.

Definition at line 211 of file XrdPfcFile.cc.

212{
213 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
214
215 XrdSysCondVarHelper _lck(m_state_cond);
216 dec_ref_count(b);
217}
long long m_offset

References XrdPfc::Block::m_offset, and TRACEF.

◆ BlocksRemovedFromWriteQ()

void File::BlocksRemovedFromWriteQ ( std::list< Block * > & blocks)

Handle removal of a set of blocks from Cache's write queue.

Definition at line 219 of file XrdPfcFile.cc.

220{
221 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
222
223 XrdSysCondVarHelper _lck(m_state_cond);
224
225 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
226 {
227 dec_ref_count(*i);
228 }
229}

References TRACEF.

Referenced by XrdPfc::Cache::RemoveWriteQEntriesFor().

+ Here is the caller graph for this function:

◆ dec_ref_cnt()

int XrdPfc::File::dec_ref_cnt ( )
inline

Definition at line 288 of file XrdPfcFile.hh.

288{ return --m_ref_cnt; }

◆ FileOpen()

File * File::FileOpen ( const std::string & path,
long long offset,
long long fileSize,
XrdOucCacheIO * inputIO )
static

Static constructor that also does Open. Returns null ptr if Open fails.

Definition at line 141 of file XrdPfcFile.cc.

142{
143 File *file = new File(path, offset, fileSize);
144 if ( ! file->Open(inputIO))
145 {
146 delete file;
147 file = 0;
148 }
149 return file;
150}
XrdOucString File

References File.

Referenced by XrdPfc::Cache::GetFile().

+ Here is the caller graph for this function:

◆ FinalizeSyncBeforeExit()

bool File::FinalizeSyncBeforeExit ( )

Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.

Definition at line 325 of file XrdPfcFile.cc.

326{
327 // Returns true if sync is required.
328 // This method is called after corresponding IO is detached from PosixCache.
329
330 XrdSysCondVarHelper _lck(m_state_cond);
331 if ( ! m_in_shutdown)
332 {
333 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
334 {
335 report_and_merge_delta_stats();
336 m_cfi.WriteIOStatDetach(m_stats);
337 m_detach_time_logged = true;
338 m_in_sync = true;
339 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
340 return true;
341 }
342 }
343 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
344 return false;
345}

References Debug, and TRACEF.

◆ Fstat()

int File::Fstat ( struct stat & sbuff)

Definition at line 632 of file XrdPfcFile.cc.

633{
634 // Stat on an open file.
635 // Corrects size to actual full size of the file.
636 // Sets atime to 0 if the file is only partially downloaded, in accordance
637 // with pfc.onlyifcached settings.
638 // Called from IO::Fstat() and Cache::Stat() when the file is active.
639 // Returns 0 on success, -errno on error.
640
641 int res;
642
643 if ((res = m_data_file->Fstat(&sbuff))) return res;
644
645 sbuff.st_size = m_file_size;
646
647 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
648 if ( ! is_cached)
649 sbuff.st_atime = 0;
650
651 return 0;
652}

References stat.

Referenced by XrdPfc::Cache::ConsiderCached(), and XrdPfc::Cache::Stat().

+ Here is the caller graph for this function:

◆ get_ref_cnt()

int XrdPfc::File::get_ref_cnt ( )
inline

Definition at line 286 of file XrdPfcFile.hh.

286{ return m_ref_cnt; }

◆ GetAccessCnt()

size_t XrdPfc::File::GetAccessCnt ( ) const
inline

Definition at line 276 of file XrdPfcFile.hh.

276{ return m_cfi.GetAccessCnt(); }

◆ GetBlockSize()

int XrdPfc::File::GetBlockSize ( ) const
inline

Definition at line 277 of file XrdPfcFile.hh.

277{ return m_cfi.GetBufferSize(); }

◆ GetFileSize()

long long XrdPfc::File::GetFileSize ( ) const
inline

Definition at line 267 of file XrdPfcFile.hh.

267{ return m_file_size; }

◆ GetLastAccessStats()

const Info::AStat * XrdPfc::File::GetLastAccessStats ( ) const
inline

Definition at line 275 of file XrdPfcFile.hh.

275{ return m_cfi.GetLastAccessStats(); }

◆ GetLocalPath()

const std::string & XrdPfc::File::GetLocalPath ( ) const
inline

Definition at line 262 of file XrdPfcFile.hh.

262{ return m_filename; }

Referenced by XrdPfc::Cache::AddWriteTask(), and XrdPfc::Cache::ReleaseFile().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError * File::GetLog ( ) const

Definition at line 1702 of file XrdPfcFile.cc.

1703{
1704 return Cache::TheOne().GetLog();
1705}
XrdSysError * GetLog() const
Definition XrdPfc.hh:294
static const Cache & TheOne()
Definition XrdPfc.cc:133

References XrdPfc::Cache::GetLog(), and XrdPfc::Cache::TheOne().

+ Here is the call graph for this function:

◆ GetNBlocks()

int XrdPfc::File::GetNBlocks ( ) const
inline

Definition at line 278 of file XrdPfcFile.hh.

278{ return m_cfi.GetNBlocks(); }

◆ GetNDownloadedBlocks()

int XrdPfc::File::GetNDownloadedBlocks ( ) const
inline

Definition at line 279 of file XrdPfcFile.hh.

279{ return m_cfi.GetNDownloadedBlocks(); }

◆ GetPrefetchCountOnIO()

int XrdPfc::File::GetPrefetchCountOnIO ( IO * io)

◆ GetPrefetchedBytes()

long long XrdPfc::File::GetPrefetchedBytes ( ) const
inline

Definition at line 280 of file XrdPfcFile.hh.

280{ return m_prefetch_bytes; }

◆ GetPrefetchScore()

float File::GetPrefetchScore ( ) const

Definition at line 1697 of file XrdPfcFile.cc.

1698{
1699 return m_prefetch_score;
1700}

◆ GetRemoteLocations()

std::string File::GetRemoteLocations ( ) const

Definition at line 1721 of file XrdPfcFile.cc.

1722{
1723 std::string s;
1724 if ( ! m_remote_locations.empty())
1725 {
1726 size_t sl = 0;
1727 int nl = 0;
1728 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1729 {
1730 sl += i->size();
1731 }
1732 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1733 s = '[';
1734 int j = 1;
1735 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1736 {
1737 s += '"'; s += *i; s += '"';
1738 if (j < nl) s += ',';
1739 }
1740 s += ']';
1741 }
1742 else
1743 {
1744 s = "[]";
1745 }
1746 return s;
1747}

◆ GetTrace()

XrdSysTrace * File::GetTrace ( ) const

Definition at line 1707 of file XrdPfcFile.cc.

1708{
1709 return Cache::TheOne().GetTrace();
1710}
XrdSysTrace * GetTrace() const
Definition XrdPfc.hh:295

References XrdPfc::Cache::GetTrace(), and XrdPfc::Cache::TheOne().

+ Here is the call graph for this function:

◆ inc_ref_cnt()

int XrdPfc::File::inc_ref_cnt ( )
inline

Definition at line 287 of file XrdPfcFile.hh.

287{ return ++m_ref_cnt; }

◆ initiate_emergency_shutdown()

long long File::initiate_emergency_shutdown ( )

Definition at line 154 of file XrdPfcFile.cc.

155{
156 // Called from Cache::Unlink() when the file is currently open.
157 // Cache::Unlink is also called on FSync error and when wrong number of bytes
158 // is received from a remote read.
159 //
160 // From this point onward the file will not be written to, cinfo file will
161 // not be updated, and all new read requests will return -ENOENT.
162 //
163 // File's entry in the Cache's active map is set to nullptr and will be
164 // removed from there shortly, in any case, well before this File object
165 // shuts down. Cache::Unlink() also reports the appropriate purge event.
166
167 XrdSysCondVarHelper _lck(m_state_cond);
168
169 m_in_shutdown = true;
170
171 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
172 {
173 m_prefetch_state = kStopped;
174 cache()->DeRegisterPrefetchFile(this);
175 }
176
177 report_and_merge_delta_stats();
178
179 return m_st_blocks;
180}

Referenced by XrdPfc::Cache::UnlinkFile().

+ Here is the caller graph for this function:

◆ ioActive()

bool File::ioActive ( IO * io)

Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()

Definition at line 242 of file XrdPfcFile.cc.

243{
244 // Returns true if delay is needed.
245
246 TRACEF(Debug, "ioActive start for io " << io);
247
248 std::string loc(io->GetLocation());
249
250 {
251 XrdSysCondVarHelper _lck(m_state_cond);
252
253 IoSet_i mi = m_io_set.find(io);
254
255 if (mi != m_io_set.end())
256 {
257 unsigned int n_active_reads = io->m_active_read_reqs;
258
259 TRACE(Info, "ioActive for io " << io <<
260 ", active_reads " << n_active_reads <<
261 ", active_prefetches " << io->m_active_prefetches <<
262 ", allow_prefetching " << io->m_allow_prefetching <<
263 ", ios_in_detach " << m_ios_in_detach);
264 TRACEF(Info,
265 "\tio_map.size() " << m_io_set.size() <<
266 ", block_map.size() " << m_block_map.size() << ", file");
267
268 insert_remote_location(loc);
269
270 io->m_allow_prefetching = false;
271 io->m_in_detach = true;
272
273 // Check if any IO is still available for prfetching. If not, stop it.
274 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
275 {
276 if ( ! select_current_io_or_disable_prefetching(false) )
277 {
278 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
279 }
280 }
281
282 // On last IO, consider write queue blocks. Note, this also contains
283 // blocks being prefetched.
284
285 bool io_active_result;
286
287 if (n_active_reads > 0)
288 {
289 io_active_result = true;
290 }
291 else if (m_io_set.size() - m_ios_in_detach == 1)
292 {
293 io_active_result = ! m_block_map.empty();
294 }
295 else
296 {
297 io_active_result = io->m_active_prefetches > 0;
298 }
299
300 if ( ! io_active_result)
301 {
302 ++m_ios_in_detach;
303 }
304
305 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
306
307 return io_active_result;
308 }
309 else
310 {
311 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
312 return false;
313 }
314 }
315}
#define TRACE(act, x)
Definition XrdTrace.hh:63
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:70

References Debug, Error, XrdPfc::IO::GetLocation(), XrdPfc::IO::m_active_read_reqs, TRACE, and TRACEF.

+ Here is the call graph for this function:

◆ ioUpdated()

void File::ioUpdated ( IO * io)

Notification from IO that it has been updated (remote open).

Definition at line 233 of file XrdPfcFile.cc.

234{
235 std::string loc(io->GetLocation());
236 XrdSysCondVarHelper _lck(m_state_cond);
237 insert_remote_location(loc);
238}

References XrdPfc::IO::GetLocation().

+ Here is the call graph for this function:

◆ is_in_emergency_shutdown()

bool XrdPfc::File::is_in_emergency_shutdown ( )
inline

Definition at line 291 of file XrdPfcFile.hh.

291{ return m_in_shutdown; }

◆ lPath()

const char * File::lPath ( ) const

Log path.

Definition at line 1609 of file XrdPfcFile.cc.

1610{
1611 return m_filename.c_str();
1612}

Referenced by XrdPfc::Cache::ProcessWriteTasks(), and XrdPfc::Cache::RemoveWriteQEntriesFor().

+ Here is the caller graph for this function:

◆ Prefetch()

void File::Prefetch ( )

Definition at line 1624 of file XrdPfcFile.cc.

1625{
1626 // Check that block is not on disk and not in RAM.
1627 // TODO: Could prefetch several blocks at once!
1628 // blks_max could be an argument
1629
1630 BlockList_t blks;
1631
1632 TRACEF(DumpXL, "Prefetch() entering.");
1633 {
1634 XrdSysCondVarHelper _lck(m_state_cond);
1635
1636 if (m_prefetch_state != kOn)
1637 {
1638 return;
1639 }
1640
1641 if ( ! select_current_io_or_disable_prefetching(true) )
1642 {
1643 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1644 return;
1645 }
1646
1647 // Select block(s) to fetch.
1648 for (int f = 0; f < m_num_blocks; ++f)
1649 {
1650 if ( ! m_cfi.TestBitWritten(f))
1651 {
1652 int f_act = f + m_offset / m_block_size;
1653
1654 BlockMap_i bi = m_block_map.find(f_act);
1655 if (bi == m_block_map.end())
1656 {
1657 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1658 if (b)
1659 {
1660 TRACEF(Dump, "Prefetch take block " << f_act);
1661 blks.push_back(b);
1662 // Note: block ref_cnt not increased, it will be when placed into write queue.
1663
1664 inc_prefetch_read_cnt(1);
1665 }
1666 else
1667 {
1668 // This shouldn't happen as prefetching stops when RAM is 70% full.
1669 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1670 }
1671 break;
1672 }
1673 }
1674 }
1675
1676 if (blks.empty())
1677 {
1678 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1679 m_prefetch_state = kComplete;
1680 cache()->DeRegisterPrefetchFile(this);
1681 }
1682 else
1683 {
1684 (*m_current_io)->m_active_prefetches += (int) blks.size();
1685 }
1686 }
1687
1688 if ( ! blks.empty())
1689 {
1690 ProcessBlockRequests(blks);
1691 }
1692}
std::list< Block * > BlockList_t

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::Prefetch().

+ Here is the caller graph for this function:

◆ Read()

int File::Read ( IO * io,
char * buff,
long long offset,
int size,
ReadReqRH * rh )

Normal read.

Definition at line 817 of file XrdPfcFile.cc.

818{
819 // rrc_func is ONLY called from async processing.
820 // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
821 // This streamlines implementation of synchronous IO::Read().
822
823 TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
824
825 m_state_cond.Lock();
826
827 if (m_in_shutdown || io->m_in_detach)
828 {
829 m_state_cond.UnLock();
830 return m_in_shutdown ? -ENOENT : -EBADF;
831 }
832
833 // Shortcut -- file is fully downloaded.
834
835 if (m_cfi.IsComplete())
836 {
837 m_state_cond.UnLock();
838 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
839 if (ret > 0) {
840 XrdSysCondVarHelper _lck(m_state_cond);
841 m_delta_stats.AddBytesHit(ret);
842 check_delta_stats();
843 }
844 return ret;
845 }
846
847 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
848
849 return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
850}
unsigned short m_seq_id
Definition XrdPfcFile.hh:53

References Xrd::hex1, XrdPfc::ReadReqRH::m_seq_id, and TRACEF.

Referenced by XrdPfc::IOFileBlock::Read().

+ Here is the caller graph for this function:

◆ ReadV()

int File::ReadV ( IO * io,
const XrdOucIOVec * readV,
int readVnum,
ReadReqRH * rh )

Vector read.

Definition at line 854 of file XrdPfcFile.cc.

855{
856 TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
857
858 m_state_cond.Lock();
859
860 if (m_in_shutdown || io->m_in_detach)
861 {
862 m_state_cond.UnLock();
863 return m_in_shutdown ? -ENOENT : -EBADF;
864 }
865
866 // Shortcut -- file is fully downloaded.
867
868 if (m_cfi.IsComplete())
869 {
870 m_state_cond.UnLock();
871 int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
872 if (ret > 0) {
873 XrdSysCondVarHelper _lck(m_state_cond);
874 m_delta_stats.AddBytesHit(ret);
875 check_delta_stats();
876 }
877 return ret;
878 }
879
880 return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
881}

References TRACEF.

◆ RefStats()

const Stats & XrdPfc::File::RefStats ( ) const
inline

Definition at line 281 of file XrdPfcFile.hh.

281{ return m_stats; }

◆ RemoveIO()

void File::RemoveIO ( IO * io)

Definition at line 386 of file XrdPfcFile.cc.

387{
388 // Called from Cache::ReleaseFile.
389
390 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
391
392 time_t now = time(0);
393
394 m_state_cond.Lock();
395
396 IoSet_i mi = m_io_set.find(io);
397
398 if (mi != m_io_set.end())
399 {
400 if (mi == m_current_io)
401 {
402 ++m_current_io;
403 }
404
405 m_delta_stats.IoDetach(now - io->m_attach_time);
406 m_io_set.erase(mi);
407 --m_ios_in_detach;
408
409 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
410 {
411 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
412 m_prefetch_state = kStopped;
413 cache()->DeRegisterPrefetchFile(this);
414 }
415 }
416 else
417 {
418 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
419 }
420
421 m_state_cond.UnLock();
422}

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::ReleaseFile().

+ Here is the caller graph for this function:

◆ RequestSyncOfDetachStats()

void File::RequestSyncOfDetachStats ( )

Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.

Definition at line 319 of file XrdPfcFile.cc.

320{
321 XrdSysCondVarHelper _lck(m_state_cond);
322 m_detach_time_logged = false;
323}

◆ StopPrefetchingOnIO()

void XrdPfc::File::StopPrefetchingOnIO ( IO * io)

◆ Sync()

void File::Sync ( )

Sync file cache inf o and output data with disk.

Definition at line 1223 of file XrdPfcFile.cc.

1224{
1225 TRACEF(Dump, "Sync()");
1226
1227 int ret = m_data_file->Fsync();
1228 bool errorp = false;
1229 if (ret == XrdOssOK)
1230 {
1231 Stats loc_stats;
1232 {
1233 XrdSysCondVarHelper _lck(&m_state_cond);
1234 report_and_merge_delta_stats();
1235 loc_stats = m_stats;
1236 }
1237 m_cfi.WriteIOStat(loc_stats);
1238 m_cfi.Write(m_info_file, m_filename.c_str());
1239 int cret = m_info_file->Fsync();
1240 if (cret != XrdOssOK)
1241 {
1242 TRACEF(Error, "Sync cinfo file sync error " << cret);
1243 errorp = true;
1244 }
1245 }
1246 else
1247 {
1248 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1249 errorp = true;
1250 }
1251
1252 if (errorp)
1253 {
1254 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1255
1256 // Unlink will also call this->initiate_emergency_shutdown()
1257 Cache::GetInstance().UnlinkFile(m_filename, false);
1258
1259 XrdSysCondVarHelper _lck(&m_state_cond);
1260
1261 m_writes_during_sync.clear();
1262 m_in_sync = false;
1263
1264 return;
1265 }
1266
1267 int written_while_in_sync;
1268 bool resync = false;
1269 {
1270 XrdSysCondVarHelper _lck(&m_state_cond);
1271 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1272 {
1273 m_cfi.SetBitSynced(*i);
1274 }
1275 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1276 m_writes_during_sync.clear();
1277
1278 // If there were writes during sync and the file is now complete,
1279 // let us call Sync again without resetting the m_in_sync flag.
1280 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1281 resync = true;
1282 else
1283 m_in_sync = false;
1284 }
1285 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1286
1287 if (resync)
1288 Sync();
1289}
#define XrdOssOK
Definition XrdOss.hh:50
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1188
void Sync()
Sync file cache inf o and output data with disk.
XrdPosixStats Stats

References Error, XrdPfc::Cache::GetInstance(), Sync(), TRACEF, XrdPfc::Cache::UnlinkFile(), and XrdOssOK.

Referenced by Sync().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ WriteBlockToDisk()

void File::WriteBlockToDisk ( Block * b)

Definition at line 1137 of file XrdPfcFile.cc.

1138{
1139 // write block buffer into disk file
1140 long long offset = b->m_offset - m_offset;
1141 long long size = b->get_size();
1142 ssize_t retval;
1143
1144 if (m_cfi.IsCkSumCache())
1145 if (b->has_cksums())
1146 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1147 else
1148 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1149 else
1150 retval = m_data_file->Write(b->get_buff(), offset, size);
1151
1152 if (retval < size)
1153 {
1154 if (retval < 0) {
1155 TRACEF(Error, "WriteToDisk() write error " << retval);
1156 } else {
1157 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1158 }
1159
1160 XrdSysCondVarHelper _lck(m_state_cond);
1161
1162 dec_ref_count(b);
1163
1164 return;
1165 }
1166
1167 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1168
1169 // Set written bit.
1170 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1171
1172 bool schedule_sync = false;
1173 {
1174 XrdSysCondVarHelper _lck(m_state_cond);
1175
1176 m_cfi.SetBitWritten(blk_idx);
1177
1178 if (b->m_prefetch)
1179 {
1180 m_cfi.SetBitPrefetch(blk_idx);
1181 }
1182 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1183 {
1184 m_cfi.ResetCkSumNet();
1185 }
1186
1187 // Set synced bit or stash block index if in actual sync.
1188 // Synced state is only written out to cinfo file when data file is synced.
1189 if (m_in_sync)
1190 {
1191 m_writes_during_sync.push_back(blk_idx);
1192 }
1193 else
1194 {
1195 m_cfi.SetBitSynced(blk_idx);
1196 ++m_non_flushed_cnt;
1197 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1198 ! m_in_shutdown)
1199 {
1200 schedule_sync = true;
1201 m_in_sync = true;
1202 m_non_flushed_cnt = 0;
1203 }
1204 }
1205 // As soon as the reference count is decreased on the block, the
1206 // file object may be deleted. Thus, to avoid holding both locks at a time,
1207 // we defer the ref count decrease until later if a sync is needed
1208 if (!schedule_sync) {
1209 dec_ref_count(b);
1210 }
1211 }
1212
1213 if (schedule_sync)
1214 {
1215 cache()->ScheduleFileSync(this);
1216 XrdSysCondVarHelper _lck(m_state_cond);
1217 dec_ref_count(b);
1218 }
1219}
int get_size() const
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:215
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:122

References Error, XrdPfc::Block::get_buff(), XrdPfc::Block::get_size(), XrdPfc::Cache::GetInstance(), XrdPfc::Block::has_cksums(), XrdPfc::Block::m_offset, XrdPfc::Block::m_prefetch, XrdPfc::Block::ref_cksum_vec(), XrdPfc::Block::req_cksum_net(), and TRACEF.

Referenced by XrdPfc::Cache::ProcessWriteTasks().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Friends And Related Symbol Documentation

◆ BlockResponseHandler

friend class BlockResponseHandler
friend

Definition at line 205 of file XrdPfcFile.hh.

References BlockResponseHandler.

Referenced by BlockResponseHandler.

◆ Cache

friend class Cache
friend

Definition at line 204 of file XrdPfcFile.hh.

References Cache.

Referenced by Cache.

◆ DirectResponseHandler

friend class DirectResponseHandler
friend

Definition at line 206 of file XrdPfcFile.hh.

References DirectResponseHandler.

Referenced by DirectResponseHandler.


The documentation for this class was generated from the following files: