XRootD
Loading...
Searching...
No Matches
XrdPfc::File Class Reference

#include <XrdPfcFile.hh>

Collaboration diagram for XrdPfc::File:

Public Member Functions

void AddIO (IO *io)
void BlockRemovedFromWriteQ (Block *)
 Handle removal of a block from Cache's write queue.
void BlocksRemovedFromWriteQ (std::list< Block * > &)
 Handle removal of a set of blocks from Cache's write queue.
int dec_ref_cnt ()
bool FinalizeSyncBeforeExit ()
 Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
int Fstat (struct stat &sbuff)
int get_ref_cnt ()
size_t GetAccessCnt () const
int GetBlockSize () const
long long GetFileSize () const
const Info::AStatGetLastAccessStats () const
const std::string & GetLocalPath () const
XrdSysErrorGetLog () const
int GetNBlocks () const
int GetNDownloadedBlocks () const
int GetPrefetchCountOnIO (IO *io)
long long GetPrefetchedBytes () const
float GetPrefetchScore () const
std::string GetRemoteLocations () const
XrdSysTraceGetTrace () const
int inc_ref_cnt ()
long long initiate_emergency_shutdown ()
bool ioActive (IO *io)
 Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close().
void ioUpdated (IO *io)
 Notification from IO that it has been updated (remote open).
bool is_in_emergency_shutdown ()
const char * lPath () const
 Log path.
void Prefetch ()
int Read (IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
 Normal read.
int ReadV (IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
 Vector read.
const StatsRefStats () const
void RemoveIO (IO *io)
void RequestSyncOfDetachStats ()
 Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void StopPrefetchingOnIO (IO *io)
void Sync ()
 Sync file cache inf o and output data with disk.
void WriteBlockToDisk (Block *b)

Static Public Member Functions

static File * FileOpen (const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
 Static constructor that also does Open. Returns null ptr if Open fails.

Friends

class BlockResponseHandler
class Cache
class DirectResponseHandler

Detailed Description

Definition at line 202 of file XrdPfcFile.hh.

Member Function Documentation

◆ AddIO()

void File::AddIO ( IO * io)

Definition at line 349 of file XrdPfcFile.cc.

350{
351 // Called from Cache::GetFile() when a new IO asks for the file.
352
353 TRACEF(Debug, "AddIO() io = " << (void*)io);
354
355 time_t now = time(0);
356 std::string loc(io->GetLocation());
357
358 m_state_cond.Lock();
359
360 IoSet_i mi = m_io_set.find(io);
361
362 if (mi == m_io_set.end())
363 {
364 m_io_set.insert(io);
365 io->m_attach_time = now;
366 m_delta_stats.IoAttach();
367
368 insert_remote_location(loc);
369
370 if (m_prefetch_state == kStopped)
371 {
372 m_prefetch_state = kOn;
373 cache()->RegisterPrefetchFile(this);
374 }
375 }
376 else
377 {
378 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
379 }
380
381 m_state_cond.UnLock();
382}
#define TRACEF(act, x)
bool Debug
const char * GetLocation()
Definition XrdPfcIO.hh:44

References Debug, Error, XrdPfc::IO::GetLocation(), and TRACEF.

Referenced by XrdPfc::Cache::GetFile().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ BlockRemovedFromWriteQ()

void File::BlockRemovedFromWriteQ ( Block * b)

Handle removal of a block from Cache's write queue.

Definition at line 211 of file XrdPfcFile.cc.

212{
213 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
214
215 XrdSysCondVarHelper _lck(m_state_cond);
216 dec_ref_count(b);
217}
long long m_offset

References XrdPfc::Block::m_offset, and TRACEF.

◆ BlocksRemovedFromWriteQ()

void File::BlocksRemovedFromWriteQ ( std::list< Block * > & blocks)

Handle removal of a set of blocks from Cache's write queue.

Definition at line 219 of file XrdPfcFile.cc.

220{
221 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
222
223 XrdSysCondVarHelper _lck(m_state_cond);
224
225 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
226 {
227 dec_ref_count(*i);
228 }
229}

References TRACEF.

Referenced by XrdPfc::Cache::RemoveWriteQEntriesFor().

Here is the caller graph for this function:

◆ dec_ref_cnt()

int XrdPfc::File::dec_ref_cnt ( )
inline

Definition at line 288 of file XrdPfcFile.hh.

288{ return --m_ref_cnt; }

◆ FileOpen()

File * File::FileOpen ( const std::string & path,
long long offset,
long long fileSize,
XrdOucCacheIO * inputIO )
static

Static constructor that also does Open. Returns null ptr if Open fails.

Definition at line 141 of file XrdPfcFile.cc.

142{
143 File *file = new File(path, offset, fileSize);
144 if ( ! file->Open(inputIO))
145 {
146 delete file;
147 file = 0;
148 }
149 return file;
150}
XrdOucString File

References File.

Referenced by XrdPfc::Cache::GetFile().

Here is the caller graph for this function:

◆ FinalizeSyncBeforeExit()

bool File::FinalizeSyncBeforeExit ( )

Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.

Definition at line 325 of file XrdPfcFile.cc.

326{
327 // Returns true if sync is required.
328 // This method is called after corresponding IO is detached from PosixCache.
329
330 XrdSysCondVarHelper _lck(m_state_cond);
331 if ( ! m_in_shutdown)
332 {
333 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
334 {
335 report_and_merge_delta_stats();
336 m_cfi.WriteIOStatDetach(m_stats);
337 m_detach_time_logged = true;
338 m_in_sync = true;
339 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
340 return true;
341 }
342 }
343 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
344 return false;
345}

References Debug, and TRACEF.

◆ Fstat()

int File::Fstat ( struct stat & sbuff)

Definition at line 629 of file XrdPfcFile.cc.

630{
631 // Stat on an open file.
632 // Corrects size to actual full size of the file.
633 // Sets atime to 0 if the file is only partially downloaded, in accordance
634 // with pfc.onlyifcached settings.
635 // Called from IO::Fstat() and Cache::Stat() when the file is active.
636 // Returns 0 on success, -errno on error.
637
638 int res;
639
640 if ((res = m_data_file->Fstat(&sbuff))) return res;
641
642 sbuff.st_size = m_file_size;
643
644 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
645 if ( ! is_cached)
646 sbuff.st_atime = 0;
647
648 return 0;
649}

References stat.

Referenced by XrdPfc::Cache::ConsiderCached(), and XrdPfc::Cache::Stat().

Here is the caller graph for this function:

◆ get_ref_cnt()

int XrdPfc::File::get_ref_cnt ( )
inline

Definition at line 286 of file XrdPfcFile.hh.

286{ return m_ref_cnt; }

◆ GetAccessCnt()

size_t XrdPfc::File::GetAccessCnt ( ) const
inline

Definition at line 276 of file XrdPfcFile.hh.

276{ return m_cfi.GetAccessCnt(); }

◆ GetBlockSize()

int XrdPfc::File::GetBlockSize ( ) const
inline

Definition at line 277 of file XrdPfcFile.hh.

277{ return m_cfi.GetBufferSize(); }

◆ GetFileSize()

long long XrdPfc::File::GetFileSize ( ) const
inline

Definition at line 267 of file XrdPfcFile.hh.

267{ return m_file_size; }

◆ GetLastAccessStats()

const Info::AStat * XrdPfc::File::GetLastAccessStats ( ) const
inline

Definition at line 275 of file XrdPfcFile.hh.

275{ return m_cfi.GetLastAccessStats(); }

◆ GetLocalPath()

const std::string & XrdPfc::File::GetLocalPath ( ) const
inline

Definition at line 262 of file XrdPfcFile.hh.

262{ return m_filename; }

Referenced by XrdPfc::Cache::AddWriteTask(), and XrdPfc::Cache::ReleaseFile().

Here is the caller graph for this function:

◆ GetLog()

XrdSysError * File::GetLog ( ) const

Definition at line 1699 of file XrdPfcFile.cc.

1700{
1701 return Cache::TheOne().GetLog();
1702}
XrdSysError * GetLog() const
Definition XrdPfc.hh:294
static const Cache & TheOne()
Definition XrdPfc.cc:133

References XrdPfc::Cache::GetLog(), and XrdPfc::Cache::TheOne().

Here is the call graph for this function:

◆ GetNBlocks()

int XrdPfc::File::GetNBlocks ( ) const
inline

Definition at line 278 of file XrdPfcFile.hh.

278{ return m_cfi.GetNBlocks(); }

◆ GetNDownloadedBlocks()

int XrdPfc::File::GetNDownloadedBlocks ( ) const
inline

Definition at line 279 of file XrdPfcFile.hh.

279{ return m_cfi.GetNDownloadedBlocks(); }

◆ GetPrefetchCountOnIO()

int XrdPfc::File::GetPrefetchCountOnIO ( IO * io)

◆ GetPrefetchedBytes()

long long XrdPfc::File::GetPrefetchedBytes ( ) const
inline

Definition at line 280 of file XrdPfcFile.hh.

280{ return m_prefetch_bytes; }

◆ GetPrefetchScore()

float File::GetPrefetchScore ( ) const

Definition at line 1694 of file XrdPfcFile.cc.

1695{
1696 return m_prefetch_score;
1697}

◆ GetRemoteLocations()

std::string File::GetRemoteLocations ( ) const

Definition at line 1718 of file XrdPfcFile.cc.

1719{
1720 std::string s;
1721 if ( ! m_remote_locations.empty())
1722 {
1723 size_t sl = 0;
1724 int nl = 0;
1725 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1726 {
1727 sl += i->size();
1728 }
1729 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1730 s = '[';
1731 int j = 1;
1732 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1733 {
1734 s += '"'; s += *i; s += '"';
1735 if (j < nl) s += ',';
1736 }
1737 s += ']';
1738 }
1739 else
1740 {
1741 s = "[]";
1742 }
1743 return s;
1744}

◆ GetTrace()

XrdSysTrace * File::GetTrace ( ) const

Definition at line 1704 of file XrdPfcFile.cc.

1705{
1706 return Cache::TheOne().GetTrace();
1707}
XrdSysTrace * GetTrace() const
Definition XrdPfc.hh:295

References XrdPfc::Cache::GetTrace(), and XrdPfc::Cache::TheOne().

Here is the call graph for this function:

◆ inc_ref_cnt()

int XrdPfc::File::inc_ref_cnt ( )
inline

Definition at line 287 of file XrdPfcFile.hh.

287{ return ++m_ref_cnt; }

◆ initiate_emergency_shutdown()

long long File::initiate_emergency_shutdown ( )

Definition at line 154 of file XrdPfcFile.cc.

155{
156 // Called from Cache::Unlink() when the file is currently open.
157 // Cache::Unlink is also called on FSync error and when wrong number of bytes
158 // is received from a remote read.
159 //
160 // From this point onward the file will not be written to, cinfo file will
161 // not be updated, and all new read requests will return -ENOENT.
162 //
163 // File's entry in the Cache's active map is set to nullptr and will be
164 // removed from there shortly, in any case, well before this File object
165 // shuts down. Cache::Unlink() also reports the appropriate purge event.
166
167 XrdSysCondVarHelper _lck(m_state_cond);
168
169 m_in_shutdown = true;
170
171 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
172 {
173 m_prefetch_state = kStopped;
174 cache()->DeRegisterPrefetchFile(this);
175 }
176
177 report_and_merge_delta_stats();
178
179 return m_st_blocks;
180}

Referenced by XrdPfc::Cache::UnlinkFile().

Here is the caller graph for this function:

◆ ioActive()

bool File::ioActive ( IO * io)

Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close().

Definition at line 242 of file XrdPfcFile.cc.

243{
244 // Returns true if delay is needed.
245
246 TRACEF(Debug, "ioActive start for io " << io);
247
248 std::string loc(io->GetLocation());
249
250 {
251 XrdSysCondVarHelper _lck(m_state_cond);
252
253 IoSet_i mi = m_io_set.find(io);
254
255 if (mi != m_io_set.end())
256 {
257 unsigned int n_active_reads = io->m_active_read_reqs;
258
259 TRACE(Info, "ioActive for io " << io <<
260 ", active_reads " << n_active_reads <<
261 ", active_prefetches " << io->m_active_prefetches <<
262 ", allow_prefetching " << io->m_allow_prefetching <<
263 ", ios_in_detach " << m_ios_in_detach);
264 TRACEF(Info,
265 "\tio_map.size() " << m_io_set.size() <<
266 ", block_map.size() " << m_block_map.size() << ", file");
267
268 insert_remote_location(loc);
269
270 io->m_allow_prefetching = false;
271 io->m_in_detach = true;
272
273 // Check if any IO is still available for prfetching. If not, stop it.
274 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
275 {
276 if ( ! select_current_io_or_disable_prefetching(false) )
277 {
278 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
279 }
280 }
281
282 // On last IO, consider write queue blocks. Note, this also contains
283 // blocks being prefetched.
284
285 bool io_active_result;
286
287 if (n_active_reads > 0)
288 {
289 io_active_result = true;
290 }
291 else if (m_io_set.size() - m_ios_in_detach == 1)
292 {
293 io_active_result = ! m_block_map.empty();
294 }
295 else
296 {
297 io_active_result = io->m_active_prefetches > 0;
298 }
299
300 if ( ! io_active_result)
301 {
302 ++m_ios_in_detach;
303 }
304
305 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
306
307 return io_active_result;
308 }
309 else
310 {
311 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
312 return false;
313 }
314 }
315}
#define TRACE(act, x)
Definition XrdTrace.hh:63
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:70

References Debug, Error, XrdPfc::IO::GetLocation(), XrdPfc::IO::m_active_read_reqs, TRACE, and TRACEF.

Here is the call graph for this function:

◆ ioUpdated()

void File::ioUpdated ( IO * io)

Notification from IO that it has been updated (remote open).

Definition at line 233 of file XrdPfcFile.cc.

234{
235 std::string loc(io->GetLocation());
236 XrdSysCondVarHelper _lck(m_state_cond);
237 insert_remote_location(loc);
238}

References XrdPfc::IO::GetLocation().

Here is the call graph for this function:

◆ is_in_emergency_shutdown()

bool XrdPfc::File::is_in_emergency_shutdown ( )
inline

Definition at line 291 of file XrdPfcFile.hh.

291{ return m_in_shutdown; }

◆ lPath()

const char * File::lPath ( ) const

Log path.

Definition at line 1606 of file XrdPfcFile.cc.

1607{
1608 return m_filename.c_str();
1609}

Referenced by XrdPfc::Cache::ProcessWriteTasks(), and XrdPfc::Cache::RemoveWriteQEntriesFor().

Here is the caller graph for this function:

◆ Prefetch()

void File::Prefetch ( )

Definition at line 1621 of file XrdPfcFile.cc.

1622{
1623 // Check that block is not on disk and not in RAM.
1624 // TODO: Could prefetch several blocks at once!
1625 // blks_max could be an argument
1626
1627 BlockList_t blks;
1628
1629 TRACEF(DumpXL, "Prefetch() entering.");
1630 {
1631 XrdSysCondVarHelper _lck(m_state_cond);
1632
1633 if (m_prefetch_state != kOn)
1634 {
1635 return;
1636 }
1637
1638 if ( ! select_current_io_or_disable_prefetching(true) )
1639 {
1640 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1641 return;
1642 }
1643
1644 // Select block(s) to fetch.
1645 for (int f = 0; f < m_num_blocks; ++f)
1646 {
1647 if ( ! m_cfi.TestBitWritten(f))
1648 {
1649 int f_act = f + m_offset / m_block_size;
1650
1651 BlockMap_i bi = m_block_map.find(f_act);
1652 if (bi == m_block_map.end())
1653 {
1654 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1655 if (b)
1656 {
1657 TRACEF(Dump, "Prefetch take block " << f_act);
1658 blks.push_back(b);
1659 // Note: block ref_cnt not increased, it will be when placed into write queue.
1660
1661 inc_prefetch_read_cnt(1);
1662 }
1663 else
1664 {
1665 // This shouldn't happen as prefetching stops when RAM is 70% full.
1666 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1667 }
1668 break;
1669 }
1670 }
1671 }
1672
1673 if (blks.empty())
1674 {
1675 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1676 m_prefetch_state = kComplete;
1677 cache()->DeRegisterPrefetchFile(this);
1678 }
1679 else
1680 {
1681 (*m_current_io)->m_active_prefetches += (int) blks.size();
1682 }
1683 }
1684
1685 if ( ! blks.empty())
1686 {
1687 ProcessBlockRequests(blks);
1688 }
1689}
std::list< Block * > BlockList_t

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::Prefetch().

Here is the caller graph for this function:

◆ Read()

int File::Read ( IO * io,
char * buff,
long long offset,
int size,
ReadReqRH * rh )

Normal read.

Definition at line 814 of file XrdPfcFile.cc.

815{
816 // rrc_func is ONLY called from async processing.
817 // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
818 // This streamlines implementation of synchronous IO::Read().
819
820 TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
821
822 m_state_cond.Lock();
823
824 if (m_in_shutdown || io->m_in_detach)
825 {
826 m_state_cond.UnLock();
827 return m_in_shutdown ? -ENOENT : -EBADF;
828 }
829
830 // Shortcut -- file is fully downloaded.
831
832 if (m_cfi.IsComplete())
833 {
834 m_state_cond.UnLock();
835 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
836 if (ret > 0) {
837 XrdSysCondVarHelper _lck(m_state_cond);
838 m_delta_stats.AddBytesHit(ret);
839 check_delta_stats();
840 }
841 return ret;
842 }
843
844 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
845
846 return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
847}
unsigned short m_seq_id
Definition XrdPfcFile.hh:53

References Xrd::hex1, XrdPfc::ReadReqRH::m_seq_id, and TRACEF.

Referenced by XrdPfc::IOFileBlock::Read().

Here is the caller graph for this function:

◆ ReadV()

int File::ReadV ( IO * io,
const XrdOucIOVec * readV,
int readVnum,
ReadReqRH * rh )

Vector read.

Definition at line 851 of file XrdPfcFile.cc.

852{
853 TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
854
855 m_state_cond.Lock();
856
857 if (m_in_shutdown || io->m_in_detach)
858 {
859 m_state_cond.UnLock();
860 return m_in_shutdown ? -ENOENT : -EBADF;
861 }
862
863 // Shortcut -- file is fully downloaded.
864
865 if (m_cfi.IsComplete())
866 {
867 m_state_cond.UnLock();
868 int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
869 if (ret > 0) {
870 XrdSysCondVarHelper _lck(m_state_cond);
871 m_delta_stats.AddBytesHit(ret);
872 check_delta_stats();
873 }
874 return ret;
875 }
876
877 return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
878}

References TRACEF.

◆ RefStats()

const Stats & XrdPfc::File::RefStats ( ) const
inline

Definition at line 281 of file XrdPfcFile.hh.

281{ return m_stats; }

◆ RemoveIO()

void File::RemoveIO ( IO * io)

Definition at line 386 of file XrdPfcFile.cc.

387{
388 // Called from Cache::ReleaseFile.
389
390 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
391
392 time_t now = time(0);
393
394 m_state_cond.Lock();
395
396 IoSet_i mi = m_io_set.find(io);
397
398 if (mi != m_io_set.end())
399 {
400 if (mi == m_current_io)
401 {
402 ++m_current_io;
403 }
404
405 m_delta_stats.IoDetach(now - io->m_attach_time);
406 m_io_set.erase(mi);
407 --m_ios_in_detach;
408
409 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
410 {
411 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
412 m_prefetch_state = kStopped;
413 cache()->DeRegisterPrefetchFile(this);
414 }
415 }
416 else
417 {
418 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
419 }
420
421 m_state_cond.UnLock();
422}

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::ReleaseFile().

Here is the caller graph for this function:

◆ RequestSyncOfDetachStats()

void File::RequestSyncOfDetachStats ( )

Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.

Definition at line 319 of file XrdPfcFile.cc.

320{
321 XrdSysCondVarHelper _lck(m_state_cond);
322 m_detach_time_logged = false;
323}

◆ StopPrefetchingOnIO()

void XrdPfc::File::StopPrefetchingOnIO ( IO * io)

◆ Sync()

void File::Sync ( )

Sync file cache inf o and output data with disk.

Definition at line 1220 of file XrdPfcFile.cc.

1221{
1222 TRACEF(Dump, "Sync()");
1223
1224 int ret = m_data_file->Fsync();
1225 bool errorp = false;
1226 if (ret == XrdOssOK)
1227 {
1228 Stats loc_stats;
1229 {
1230 XrdSysCondVarHelper _lck(&m_state_cond);
1231 report_and_merge_delta_stats();
1232 loc_stats = m_stats;
1233 }
1234 m_cfi.WriteIOStat(loc_stats);
1235 m_cfi.Write(m_info_file, m_filename.c_str());
1236 int cret = m_info_file->Fsync();
1237 if (cret != XrdOssOK)
1238 {
1239 TRACEF(Error, "Sync cinfo file sync error " << cret);
1240 errorp = true;
1241 }
1242 }
1243 else
1244 {
1245 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1246 errorp = true;
1247 }
1248
1249 if (errorp)
1250 {
1251 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1252
1253 // Unlink will also call this->initiate_emergency_shutdown()
1254 Cache::GetInstance().UnlinkFile(m_filename, false);
1255
1256 XrdSysCondVarHelper _lck(&m_state_cond);
1257
1258 m_writes_during_sync.clear();
1259 m_in_sync = false;
1260
1261 return;
1262 }
1263
1264 int written_while_in_sync;
1265 bool resync = false;
1266 {
1267 XrdSysCondVarHelper _lck(&m_state_cond);
1268 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1269 {
1270 m_cfi.SetBitSynced(*i);
1271 }
1272 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1273 m_writes_during_sync.clear();
1274
1275 // If there were writes during sync and the file is now complete,
1276 // let us call Sync again without resetting the m_in_sync flag.
1277 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1278 resync = true;
1279 else
1280 m_in_sync = false;
1281 }
1282 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1283
1284 if (resync)
1285 Sync();
1286}
#define XrdOssOK
Definition XrdOss.hh:50
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1188
void Sync()
Sync file cache inf o and output data with disk.
XrdPosixStats Stats

References Error, XrdPfc::Cache::GetInstance(), Sync(), TRACEF, XrdPfc::Cache::UnlinkFile(), and XrdOssOK.

Referenced by Sync().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ WriteBlockToDisk()

void File::WriteBlockToDisk ( Block * b)

Definition at line 1134 of file XrdPfcFile.cc.

1135{
1136 // write block buffer into disk file
1137 long long offset = b->m_offset - m_offset;
1138 long long size = b->get_size();
1139 ssize_t retval;
1140
1141 if (m_cfi.IsCkSumCache())
1142 if (b->has_cksums())
1143 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1144 else
1145 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1146 else
1147 retval = m_data_file->Write(b->get_buff(), offset, size);
1148
1149 if (retval < size)
1150 {
1151 if (retval < 0) {
1152 TRACEF(Error, "WriteToDisk() write error " << retval);
1153 } else {
1154 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1155 }
1156
1157 XrdSysCondVarHelper _lck(m_state_cond);
1158
1159 dec_ref_count(b);
1160
1161 return;
1162 }
1163
1164 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1165
1166 // Set written bit.
1167 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1168
1169 bool schedule_sync = false;
1170 {
1171 XrdSysCondVarHelper _lck(m_state_cond);
1172
1173 m_cfi.SetBitWritten(blk_idx);
1174
1175 if (b->m_prefetch)
1176 {
1177 m_cfi.SetBitPrefetch(blk_idx);
1178 }
1179 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1180 {
1181 m_cfi.ResetCkSumNet();
1182 }
1183
1184 // Set synced bit or stash block index if in actual sync.
1185 // Synced state is only written out to cinfo file when data file is synced.
1186 if (m_in_sync)
1187 {
1188 m_writes_during_sync.push_back(blk_idx);
1189 }
1190 else
1191 {
1192 m_cfi.SetBitSynced(blk_idx);
1193 ++m_non_flushed_cnt;
1194 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1195 ! m_in_shutdown)
1196 {
1197 schedule_sync = true;
1198 m_in_sync = true;
1199 m_non_flushed_cnt = 0;
1200 }
1201 }
1202 // As soon as the reference count is decreased on the block, the
1203 // file object may be deleted. Thus, to avoid holding both locks at a time,
1204 // we defer the ref count decrease until later if a sync is needed
1205 if (!schedule_sync) {
1206 dec_ref_count(b);
1207 }
1208 }
1209
1210 if (schedule_sync)
1211 {
1212 cache()->ScheduleFileSync(this);
1213 XrdSysCondVarHelper _lck(m_state_cond);
1214 dec_ref_count(b);
1215 }
1216}
int get_size() const
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:215
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:122

References Error, XrdPfc::Block::get_buff(), XrdPfc::Block::get_size(), XrdPfc::Cache::GetInstance(), XrdPfc::Block::has_cksums(), XrdPfc::Block::m_offset, XrdPfc::Block::m_prefetch, XrdPfc::Block::ref_cksum_vec(), XrdPfc::Block::req_cksum_net(), and TRACEF.

Referenced by XrdPfc::Cache::ProcessWriteTasks().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ BlockResponseHandler

friend class BlockResponseHandler
friend

Definition at line 205 of file XrdPfcFile.hh.

References BlockResponseHandler.

Referenced by BlockResponseHandler.

◆ Cache

friend class Cache
friend

Definition at line 204 of file XrdPfcFile.hh.

References Cache.

Referenced by Cache.

◆ DirectResponseHandler

friend class DirectResponseHandler
friend

Definition at line 206 of file XrdPfcFile.hh.

References DirectResponseHandler.

Referenced by DirectResponseHandler.


The documentation for this class was generated from the following files: