37#include <unordered_map>
46const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
52const char *File::m_traceID =
"File";
56File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
60 m_cfi(
Cache::TheOne().GetTrace(),
Cache::TheOne().is_prefetch_enabled()),
63 m_file_size(iFileSize),
64 m_current_io(m_io_set.end()),
68 m_detach_time_logged(false),
74 m_prefetch_state(kOff),
76 m_prefetch_read_cnt(0),
77 m_prefetch_hit_cnt(0),
104 m_info_file->Close();
106 m_info_file =
nullptr;
112 m_data_file->Close();
114 m_data_file =
nullptr;
117 if (m_resmon_token >= 0)
122 if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
125 if (sr == 0 && s.st_blocks != m_st_blocks) {
128 m_st_blocks = s.st_blocks;
136 TRACEF(
Debug,
"Close() finished, prefetch score = " << m_prefetch_score);
143 File *file =
new File(path, offset, fileSize);
144 if ( ! file->Open(inputIO))
169 m_in_shutdown =
true;
171 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
173 m_prefetch_state = kStopped;
174 cache()->DeRegisterPrefetchFile(
this);
177 report_and_merge_delta_stats();
184void File::check_delta_stats()
189 report_and_merge_delta_stats();
192void File::report_and_merge_delta_stats()
196 m_data_file->
Fstat(&s);
199 long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
201 long long st_blocks_to_report = std::min((
long long) s.st_blocks, max_st_blocks_to_report);
203 m_st_blocks = st_blocks_to_report;
205 m_stats.
AddUp(m_delta_stats);
206 m_delta_stats.
Reset();
213 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
221 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
225 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
237 insert_remote_location(loc);
253 IoSet_i mi = m_io_set.find(io);
255 if (mi != m_io_set.end())
260 ", active_reads " << n_active_reads <<
261 ", active_prefetches " << io->m_active_prefetches <<
262 ", allow_prefetching " << io->m_allow_prefetching <<
263 ", ios_in_detach " << m_ios_in_detach);
265 "\tio_map.size() " << m_io_set.size() <<
266 ", block_map.size() " << m_block_map.size() <<
", file");
268 insert_remote_location(loc);
270 io->m_allow_prefetching =
false;
271 io->m_in_detach =
true;
274 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
276 if ( ! select_current_io_or_disable_prefetching(
false) )
278 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
285 bool io_active_result;
287 if (n_active_reads > 0)
289 io_active_result =
true;
291 else if (m_io_set.size() - m_ios_in_detach == 1)
293 io_active_result = ! m_block_map.empty();
297 io_active_result = io->m_active_prefetches > 0;
300 if ( ! io_active_result)
305 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
307 return io_active_result;
311 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
322 m_detach_time_logged =
false;
331 if ( ! m_in_shutdown)
333 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
335 report_and_merge_delta_stats();
336 m_cfi.WriteIOStatDetach(m_stats);
337 m_detach_time_logged =
true;
339 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
343 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
355 time_t now = time(0);
360 IoSet_i mi = m_io_set.find(io);
362 if (mi == m_io_set.end())
365 io->m_attach_time = now;
366 m_delta_stats.IoAttach();
368 insert_remote_location(loc);
370 if (m_prefetch_state == kStopped)
372 m_prefetch_state = kOn;
373 cache()->RegisterPrefetchFile(
this);
378 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
381 m_state_cond.UnLock();
392 time_t now = time(0);
396 IoSet_i mi = m_io_set.find(io);
398 if (mi != m_io_set.end())
400 if (mi == m_current_io)
405 m_delta_stats.IoDetach(now - io->m_attach_time);
409 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
411 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
412 m_prefetch_state = kStopped;
413 cache()->DeRegisterPrefetchFile(
this);
418 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
421 m_state_cond.UnLock();
430 static const char *tpfx =
"Open() ";
432 TRACEF(Dump, tpfx <<
"entered");
443 struct stat data_stat, info_stat;
447 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
448 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
451 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
452 myEnv.
Put(
"oss.asize", size_str);
464 m_data_file = myOss.
newFile(myUser);
465 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
469 delete m_data_file; m_data_file = 0;
473 myEnv.
Put(
"oss.asize",
"64k");
479 m_data_file->Close();
delete m_data_file; m_data_file = 0;
483 m_info_file = myOss.
newFile(myUser);
484 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
488 delete m_info_file; m_info_file = 0;
489 m_data_file->Close();
delete m_data_file; m_data_file = 0;
493 bool initialize_info_file =
true;
495 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
497 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
498 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
499 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
500 ", block_size=" << (m_cfi.GetBufferSize() >> 10) <<
"k)");
503 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
505 initialize_info_file =
false;
507 TRACEF(Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
508 m_cfi.ResetAllAccessStats();
509 m_data_file->Ftruncate(0);
514 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.
get_cs_Chk())
519 TRACEF(Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
520 initialize_info_file =
true;
521 m_cfi.ResetAllAccessStats();
522 m_data_file->Ftruncate(0);
535 parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
538 if (initialize_info_file)
540 m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
542 m_cfi.ResetNoCkSumTime();
543 m_cfi.Write(m_info_file, ifn.c_str());
544 m_info_file->Fsync();
545 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
546 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.GetNBlocks()
547 <<
" block size = " << pfc_blocksize);
551 if (futimens(m_info_file->getFD(), NULL)) {
555 TRACEF(Info, tpfx <<
"URL CGI pfc.blocksize ignored for an already existing file");
559 m_cfi.WriteIOStatAttach();
561 m_block_size = m_cfi.GetBufferSize();
562 m_num_blocks = m_cfi.GetNBlocks();
563 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped;
564 m_prefetch_max_blocks_in_flight = pfc_prefetch;
566 TRACEF(
Debug, tpfx <<
"pfc.prefetch set to " << pfc_prefetch <<
" via CGI parameter");
568 m_data_file->Fstat(&data_stat);
569 m_st_blocks = data_stat.st_blocks;
572 constexpr long long MB = 1024 * 1024;
573 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
577 m_state_cond.UnLock();
582void File::parse_pfc_url_args(XrdOucCacheIO* inputIO,
long long &pfc_blocksize,
int &pfc_prefetch)
const
586 XrdCl::URL url(inputIO->
Path());
587 auto const & urlp = url.GetParams();
589 auto extract = [&](
const std::string &key, std::string &value) ->
bool {
590 auto it = urlp.find(key);
591 if (it != urlp.end()) {
603 const char *tpfx =
"File::Open::urlcgi pfc.blocksize ";
605 if (
Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
608 pfc_blocksize = bsize;
610 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
615 const char *tpfx =
"File::Open::urlcgi pfc.prefetch ";
617 if (
Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
622 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
640 if ((res = m_data_file->Fstat(&sbuff)))
return res;
642 sbuff.st_size = m_file_size;
644 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
655bool File::overlap(
int blk,
664 const long long beg = blk * blk_size;
665 const long long end = beg + blk_size;
666 const long long req_end = req_off + req_size;
668 if (req_off < end && req_end > beg)
670 const long long ovlp_beg = std::max(beg, req_off);
671 const long long ovlp_end = std::min(end, req_end);
673 off = ovlp_beg - req_off;
674 blk_off = ovlp_beg - beg;
675 size = (int) (ovlp_end - ovlp_beg);
677 assert(size <= blk_size);
688Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
696 const long long off = i * m_block_size;
697 const int last_block = m_num_blocks - 1;
698 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
700 int blk_size, req_size;
701 if (i == last_block) {
702 blk_size = req_size = m_file_size - off;
703 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
705 blk_size = req_size = m_block_size;
709 char *buf = cache()->RequestRAM(req_size);
713 b =
new (std::nothrow) Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
721 if (m_prefetch_state == kOn && (
int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
723 m_prefetch_state = kHold;
724 cache()->DeRegisterPrefetchFile(
this);
729 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
736void File::ProcessBlockRequest(
Block *b)
744 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
745 b->
get_offset()/m_block_size, (
void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (
void*)b->get_buff(), (
void*)brh);
746 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
762 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
764 ProcessBlockRequest(*bi);
770void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
772 int n_chunks = ioVec.size();
775 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
776 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
786 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
791int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
793 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
795 long long rs = m_data_file->ReadV(ioVec.data(), (
int) ioVec.size());
799 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
803 if (rs != expected_size)
805 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
824 if (m_in_shutdown || io->m_in_detach)
826 m_state_cond.UnLock();
827 return m_in_shutdown ? -ENOENT : -EBADF;
832 if (m_cfi.IsComplete())
834 m_state_cond.UnLock();
835 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
838 m_delta_stats.AddBytesHit(ret);
844 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
846 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
853 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
857 if (m_in_shutdown || io->m_in_detach)
859 m_state_cond.UnLock();
860 return m_in_shutdown ? -ENOENT : -EBADF;
865 if (m_cfi.IsComplete())
867 m_state_cond.UnLock();
868 int ret = m_data_file->ReadV(
const_cast<XrdOucIOVec*
>(readV), readVnum);
871 m_delta_stats.AddBytesHit(ret);
877 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
882int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
894 int prefetch_cnt = 0;
899 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
901 std::vector<XrdOucIOVec> iovec_disk;
902 std::vector<XrdOucIOVec> iovec_direct;
903 int iovec_disk_total = 0;
904 int iovec_direct_total = 0;
906 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
913 const int idx_first = iUserOff / m_block_size;
914 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
916 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
918 enum LastBlock_e { LB_other, LB_disk, LB_direct };
920 LastBlock_e lbe = LB_other;
922 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
925 BlockMap_i bi = m_block_map.find(block_idx);
932 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
935 if (bi != m_block_map.end())
937 inc_ref_count(bi->second);
938 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
940 if (bi->second->is_finished())
944 assert(bi->second->is_ok());
946 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
948 if (bi->second->m_prefetch)
954 read_req =
new ReadRequest(io, rh);
959 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
966 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
968 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
971 iovec_disk.back().size += size;
973 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
974 iovec_disk_total += size;
976 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
985 read_req =
new ReadRequest(io, rh);
988 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
991 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
993 blks_to_request.push_back(b);
995 b->
m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
1002 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
1004 iovec_direct_total += size;
1011 iovec_direct.back().size += size;
1013 long long in_offset = block_idx * m_block_size + blk_off;
1014 char *out_pos = iUserBuff + off;
1021 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1030 inc_prefetch_hit_cnt(prefetch_cnt);
1032 m_state_cond.UnLock();
1035 if ( ! blks_to_request.empty())
1037 ProcessBlockRequests(blks_to_request);
1038 blks_to_request.clear();
1042 if ( ! iovec_direct.empty())
1044 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1046 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
1051 long long bytes_read = 0;
1055 if ( ! blks_ready.empty())
1057 for (
auto &bvi : blks_ready)
1059 for (
auto &cr : bvi.second)
1061 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
1062 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1063 bytes_read += cr.m_size;
1069 if ( ! iovec_disk.empty())
1071 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1072 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
1087 m_state_cond.Lock();
1089 for (
auto &bvi : blks_ready)
1090 dec_ref_count(bvi.first, (
int) bvi.second.size());
1103 m_delta_stats.AddReadStats(read_req->
m_stats);
1104 check_delta_stats();
1105 m_state_cond.UnLock();
1113 m_state_cond.UnLock();
1114 return -EWOULDBLOCK;
1119 m_delta_stats.m_BytesHit += bytes_read;
1120 check_delta_stats();
1121 m_state_cond.UnLock();
1125 return error_cond ? error_cond : bytes_read;
1137 long long offset = b->
m_offset - m_offset;
1141 if (m_cfi.IsCkSumCache())
1145 retval = m_data_file->pgWrite(b->
get_buff(), offset, size, 0, 0);
1147 retval = m_data_file->Write(b->
get_buff(), offset, size);
1152 TRACEF(
Error,
"WriteToDisk() write error " << retval);
1154 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1164 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1167 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1169 bool schedule_sync =
false;
1173 m_cfi.SetBitWritten(blk_idx);
1177 m_cfi.SetBitPrefetch(blk_idx);
1181 m_cfi.ResetCkSumNet();
1188 m_writes_during_sync.push_back(blk_idx);
1192 m_cfi.SetBitSynced(blk_idx);
1193 ++m_non_flushed_cnt;
1194 if ((m_cfi.IsComplete() || m_non_flushed_cnt >=
Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1197 schedule_sync =
true;
1199 m_non_flushed_cnt = 0;
1205 if (!schedule_sync) {
1212 cache()->ScheduleFileSync(
this);
1224 int ret = m_data_file->Fsync();
1225 bool errorp =
false;
1231 report_and_merge_delta_stats();
1232 loc_stats = m_stats;
1234 m_cfi.WriteIOStat(loc_stats);
1235 m_cfi.Write(m_info_file, m_filename.c_str());
1236 int cret = m_info_file->Fsync();
1239 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1245 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1251 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1258 m_writes_during_sync.clear();
1264 int written_while_in_sync;
1265 bool resync =
false;
1268 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1270 m_cfi.SetBitSynced(*i);
1272 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1273 m_writes_during_sync.clear();
1277 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1282 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1293void File::free_block(
Block* b)
1296 int i = b->
m_offset / m_block_size;
1297 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1298 size_t ret = m_block_map.erase(i);
1302 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1310 if (m_prefetch_state == kHold && (
int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1312 m_prefetch_state = kOn;
1313 cache()->RegisterPrefetchFile(
this);
1319bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1323 int io_size = (int) m_io_set.size();
1328 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1331 m_current_io = m_io_set.begin();
1334 else if (io_size > 1)
1336 IoSet_i mi = m_current_io;
1337 if (skip_current && mi != m_io_set.end()) ++mi;
1339 for (
int i = 0; i < io_size; ++i)
1341 if (mi == m_io_set.end()) mi = m_io_set.begin();
1343 if ((*mi)->m_allow_prefetching)
1355 m_current_io = m_io_set.end();
1356 m_prefetch_state = kStopped;
1357 cache()->DeRegisterPrefetchFile(
this);
1365void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1371 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1373 m_state_cond.Lock();
1386 m_state_cond.UnLock();
1389 FinalizeReadRequest(rreq);
1416 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1419 m_state_cond.Lock();
1424 rreq->m_stats.m_BytesMissed += creq.
m_size;
1426 rreq->m_stats.m_BytesHit += creq.
m_size;
1428 --rreq->m_n_chunk_reqs;
1431 inc_prefetch_hit_cnt(1);
1435 bool rreq_complete = rreq->is_complete();
1437 m_state_cond.UnLock();
1440 FinalizeReadRequest(rreq);
1448 XrdSysCondVarHelper _lck(m_state_cond);
1449 m_delta_stats.AddReadStats(rreq->
m_stats);
1450 check_delta_stats();
1457void File::ProcessBlockResponse(
Block *b,
int res)
1459 static const char* tpfx =
"ProcessBlockResponse ";
1461 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1463 if (res >= 0 && res != b->
get_size())
1467 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1471 m_state_cond.Lock();
1477 IoSet_i mi = m_io_set.find(io);
1478 if (mi != m_io_set.end())
1480 --io->m_active_prefetches;
1483 if (res < 0 && io->m_allow_prefetching)
1485 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1486 io->m_allow_prefetching =
false;
1489 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1491 if ( ! select_current_io_or_disable_prefetching(
false) )
1493 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1499 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1502 m_state_cond.UnLock();
1516 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1517 if ( ! m_in_shutdown)
1523 cache()->AddWriteTask(b,
true);
1530 m_state_cond.UnLock();
1532 for (
auto &creq : creqs_to_notify)
1534 ProcessBlockSuccess(b, creq);
1543 <<
", io=" << b->
get_io() <<
", error=" << res);
1548 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1549#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1560 std::list<ReadRequest*> rreqs_to_complete;
1569 ProcessBlockError(b, rreq);
1572 rreqs_to_complete.push_back(rreq);
1577 creqs_to_keep.push_back(creq);
1581 bool reissue =
false;
1582 if ( ! creqs_to_keep.empty())
1584 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1586 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1587 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1594 m_state_cond.UnLock();
1596 for (
auto rreq : rreqs_to_complete)
1597 FinalizeReadRequest(rreq);
1600 ProcessBlockRequest(b);
1608 return m_filename.c_str();
1613int File::offsetIdx(
int iIdx)
const
1615 return iIdx - m_offset/m_block_size;
1629 TRACEF(DumpXL,
"Prefetch() entering.");
1633 if (m_prefetch_state != kOn)
1638 if ( ! select_current_io_or_disable_prefetching(
true) )
1640 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1645 for (
int f = 0; f < m_num_blocks; ++f)
1647 if ( ! m_cfi.TestBitWritten(f))
1649 int f_act = f + m_offset / m_block_size;
1651 BlockMap_i bi = m_block_map.find(f_act);
1652 if (bi == m_block_map.end())
1654 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1657 TRACEF(Dump,
"Prefetch take block " << f_act);
1661 inc_prefetch_read_cnt(1);
1666 TRACEF(Warning,
"Prefetch allocation failed for block " << f_act);
1675 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1676 m_prefetch_state = kComplete;
1677 cache()->DeRegisterPrefetchFile(
this);
1681 (*m_current_io)->m_active_prefetches += (int) blks.size();
1685 if ( ! blks.empty())
1687 ProcessBlockRequests(blks);
1696 return m_prefetch_score;
1709void File::insert_remote_location(
const std::string &loc)
1713 size_t p = loc.find_first_of(
'@');
1714 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1721 if ( ! m_remote_locations.empty())
1725 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1729 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1732 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1734 s +=
'"'; s += *i; s +=
'"';
1735 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
const char * XrdSysE2T(int errcode)
virtual int Fstat(struct stat *buf)
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void * get_req_id() const
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
XrdSysError * GetLog() const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
XrdSysTrace * GetTrace() const
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
static const Cache & TheOne()
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache info and output data with disk.
XrdSysTrace * GetTrace() const
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close().
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
cache block size, default 128 kB
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
std::string m_meta_space
oss space for metadata files (cinfo)
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
std::string m_username
username passed to oss plugin
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
void update_error_cond(int ec)