XRootD
Loading...
Searching...
No Matches
XrdPfcFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19
20#include "XrdPfcFile.hh"
21#include "XrdPfc.hh"
23#include "XrdPfcIO.hh"
24#include "XrdPfcTrace.hh"
25
27#include "XrdSys/XrdSysTimer.hh"
28#include "XrdOss/XrdOss.hh"
29#include "XrdOuc/XrdOucEnv.hh"
31
32#include "XrdCl/XrdClURL.hh"
33
34#include <cassert>
35#include <cstdio>
36#include <sstream>
37#include <unordered_map>
38
39#include <fcntl.h>
40
41using namespace XrdPfc;
42
namespace
{

// Maximum number of times a failed block-to-disk write is retried before
// the block is given up on.
const int BLOCK_WRITE_MAX_ATTEMPTS = 4;

// Shorthand accessor for the cache singleton.
Cache* cache() { return &Cache::GetInstance(); }

}
51
const char *File::m_traceID = "File"; // Component tag used by the TRACE/TRACEF macros.
53
54//------------------------------------------------------------------------------
55
// Construct a cache File object for 'path'.
// iFileSize is the expected full size of the file in bytes; iOffset is stored
// in m_offset (presumably an offset into the original source for partial-file
// setups -- confirm against XrdPfcFile.hh).
// The data and cinfo files are not touched here; that is done in Open().
File::File(const std::string& path, long long iOffset, long long iFileSize) :
   m_ref_cnt(0),
   m_data_file(0),
   m_info_file(0),
   m_cfi(Cache::TheOne().GetTrace(), Cache::TheOne().is_prefetch_enabled()),
   m_filename(path),
   m_offset(iOffset),
   m_file_size(iFileSize),
   m_current_io(m_io_set.end()),
   m_ios_in_detach(0),
   m_non_flushed_cnt(0),
   m_in_sync(false),
   m_detach_time_logged(false),
   m_in_shutdown(false),
   m_state_cond(0),
   m_block_size(0),          // set from cinfo in Open()
   m_num_blocks(0),          // set from cinfo in Open()
   m_resmon_token(-1),       // -1 = not registered with the resource monitor
   m_prefetch_state(kOff),
   m_prefetch_bytes(0),
   m_prefetch_read_cnt(0),
   m_prefetch_hit_cnt(0),
   m_prefetch_score(0)
{}
80
File::~File()
{
   // Destructor only traces; file handles and monitor reporting are released
   // in Close().  NOTE(review): assumes Close() ran before destruction -- the
   // visible code does not enforce this, confirm with Cache's lifecycle.
   TRACEF(Debug, "~File() for ");
}
85
void File::Close()
{
   // Release the cinfo and data file handles and send the final statistics
   // update to the resource monitor.
   //
   // Close is called while nullptr is put into Cache::m_active map, see Cache::dec_ref_count(File*).
   // A stat is called after close to re-check that m_stat_blocks have been reported correctly
   // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
   // in report_and_merge_delta_stats() below.
   //
   // XFS can pre-allocate significant amount of blocks (1 GB at 1GB mark, 4 GB above 4GB) and those
   // get reported in as stat.st_blocks.
   // The reported number is correct in a stat immediately following a close.
   // If one starts off by writing the last byte of the file, this pre-allocation does not get
   // triggered up to that point. But comes back with a vengeance right after.
   //
   // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.

   if (m_info_file)
   {
      TRACEF(Debug, "Close() closing info-file ");
      m_info_file->Close();
      delete m_info_file;
      m_info_file = nullptr;
   }

   if (m_data_file)
   {
      TRACEF(Debug, "Close() closing data-file ");
      m_data_file->Close();
      delete m_data_file;
      m_data_file = nullptr;
   }

   if (m_resmon_token >= 0)
   {
      // Last update of file stats has been sent from the final Sync unless we are in_shutdown --
      // but in this case the file will get unlinked by the cache and reported as purge event.
      // We check if the reported st_blocks so far is correct.
      if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
         struct stat s;
         int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
         if (sr == 0 && s.st_blocks != m_st_blocks) {
            // Report only the difference so the monitor's running total
            // matches actual on-disk usage.
            Stats stats;
            stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
            m_st_blocks = s.st_blocks;
            Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
         }
      }

      Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
   }

   TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
}
138
139//------------------------------------------------------------------------------
140
141File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO* inputIO)
142{
143 File *file = new File(path, offset, fileSize);
144 if ( ! file->Open(inputIO))
145 {
146 delete file;
147 file = 0;
148 }
149 return file;
150}
151
152//------------------------------------------------------------------------------
153
// NOTE(review): the defining signature line is missing from this rendering.
// Given the comment below and the 'return m_st_blocks', this is the shutdown
// entry point called from Cache::Unlink() (presumably
// 'long long File::initiate_shutdown()') -- confirm against the repository.
{
   // Called from Cache::Unlink() when the file is currently open.
   // Cache::Unlink is also called on FSync error and when wrong number of bytes
   // is received from a remote read.
   //
   // From this point onward the file will not be written to, cinfo file will
   // not be updated, and all new read requests will return -ENOENT.
   //
   // File's entry in the Cache's active map is set to nullptr and will be
   // removed from there shortly, in any case, well before this File object
   // shuts down. Cache::Unlink() also reports the appropriate purge event.

   XrdSysCondVarHelper _lck(m_state_cond);

   m_in_shutdown = true;

   // Stop prefetching if it is still registered with the cache.
   if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
   {
      m_prefetch_state = kStopped;
      cache()->DeRegisterPrefetchFile(this);
   }

   // Flush accumulated delta statistics so purge accounting is accurate.
   report_and_merge_delta_stats();

   return m_st_blocks;
}
181
182//------------------------------------------------------------------------------
183
void File::check_delta_stats()
{
   // Called under m_state_cond lock.
   // Push accumulated delta statistics to the resource monitor once the
   // read+write byte count crosses the threshold computed in Open(); skipped
   // during shutdown (Unlink path reports its own final stats).
   // BytesWritten indirectly trigger an unconditional merge through periodic Sync().
   if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
      report_and_merge_delta_stats();
}
191
void File::report_and_merge_delta_stats()
{
   // Called under m_state_cond lock.
   // Send m_delta_stats to the resource monitor, fold them into the running
   // totals (m_stats), and reset the deltas.
   struct stat s;
   m_data_file->Fstat(&s);
   // Do not report st_blocks beyond 4kB round-up over m_file_size. Some FSs report
   // aggressive pre-allocation in this field (XFS, 4GB).
   // Math: a size not 4kB-aligned rounds up to (size/4096 + 1) 4kB units, i.e.
   // ((size >> 12) + 1) << 3 in 512-byte blocks; an aligned size is exactly
   // size >> 9 blocks.
   long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
                                                             : m_file_size >> 9;
   long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
   m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
   m_st_blocks = st_blocks_to_report;
   Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
   m_stats.AddUp(m_delta_stats);
   m_delta_stats.Reset();
}
208
209//------------------------------------------------------------------------------
210
// NOTE(review): the defining signature line is missing from this rendering;
// from the trace text and the use of 'b' this is presumably
// 'void File::BlockRemovedFromWriteQ(Block* b)' -- confirm against the repo.
{
   TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);

   // Drop the write-queue's reference on the block, under the state lock.
   XrdSysCondVarHelper _lck(m_state_cond);
   dec_ref_count(b);
}
218
219void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
220{
221 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
222
223 XrdSysCondVarHelper _lck(m_state_cond);
224
225 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
226 {
227 dec_ref_count(*i);
228 }
229}
230
231//------------------------------------------------------------------------------
232
// NOTE(review): the defining signature line is missing from this rendering;
// presumably 'void File::ioUpdated(IO *io)' -- confirm against the repository.
{
   // Record this IO's remote location in the file's location list.
   std::string loc(io->GetLocation());
   XrdSysCondVarHelper _lck(m_state_cond);
   insert_remote_location(loc);
}
239
240//------------------------------------------------------------------------------
241
// NOTE(review): the defining signature line is missing from this rendering;
// presumably 'bool File::ioActive(IO *io)' -- confirm against the repository.
{
   // Returns true if delay is needed.
   // Called while an IO is detaching: decides whether the IO must linger
   // until its outstanding reads / prefetches / queued block writes drain.

   TRACEF(Debug, "ioActive start for io " << io);

   std::string loc(io->GetLocation());

   {
      XrdSysCondVarHelper _lck(m_state_cond);

      IoSet_i mi = m_io_set.find(io);

      if (mi != m_io_set.end())
      {
         unsigned int n_active_reads = io->m_active_read_reqs;

         // NOTE(review): plain TRACE here (not TRACEF) is inconsistent with
         // the rest of this file -- verify which macro is intended.
         TRACE(Info, "ioActive for io " << io <<
               ", active_reads " << n_active_reads <<
               ", active_prefetches " << io->m_active_prefetches <<
               ", allow_prefetching " << io->m_allow_prefetching <<
               ", ios_in_detach " << m_ios_in_detach);
         TRACEF(Info,
                "\tio_map.size() " << m_io_set.size() <<
                ", block_map.size() " << m_block_map.size() << ", file");

         insert_remote_location(loc);

         // This IO is retiring: stop issuing prefetches on its behalf.
         io->m_allow_prefetching = false;
         io->m_in_detach = true;

         // Check if any IO is still available for prefetching. If not, stop it.
         if (m_prefetch_state == kOn || m_prefetch_state == kHold)
         {
            if ( ! select_current_io_or_disable_prefetching(false) )
            {
               TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
            }
         }

         // On last IO, consider write queue blocks. Note, this also contains
         // blocks being prefetched.

         bool io_active_result;

         if (n_active_reads > 0)
         {
            io_active_result = true;
         }
         else if (m_io_set.size() - m_ios_in_detach == 1)
         {
            // Last non-detached IO: stays active while any blocks are still
            // registered (in RAM / write queue) for this file.
            io_active_result = ! m_block_map.empty();
         }
         else
         {
            io_active_result = io->m_active_prefetches > 0;
         }

         if ( ! io_active_result)
         {
            ++m_ios_in_detach;
         }

         TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");

         return io_active_result;
      }
      else
      {
         TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
         return false;
      }
   }
}
316
317//------------------------------------------------------------------------------
318
// NOTE(review): the defining signature line is missing from this rendering;
// presumably 'void File::RequestSyncOfDetachStats()' -- confirm against the repo.
{
   // Re-arm writing of detach-time statistics on the next sync
   // (see FinalizeSyncBeforeExit, which tests m_detach_time_logged).
   XrdSysCondVarHelper _lck(m_state_cond);
   m_detach_time_logged = false;
}
324
// NOTE(review): the defining signature line is missing from this rendering;
// presumably 'bool File::FinalizeSyncBeforeExit()' -- confirm against the repo.
{
   // Returns true if sync is required.
   // This method is called after corresponding IO is detached from PosixCache.

   XrdSysCondVarHelper _lck(m_state_cond);
   if ( ! m_in_shutdown)
   {
      // Sync is needed when there are writes pending/unflushed or when the
      // detach-time statistics have not yet been written to the cinfo file.
      if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
      {
         report_and_merge_delta_stats();
         m_cfi.WriteIOStatDetach(m_stats);
         m_detach_time_logged = true;
         m_in_sync = true;
         TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
         return true;
      }
   }
   TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
   return false;
}
346
347//------------------------------------------------------------------------------
348
// NOTE(review): the defining signature line is missing from this rendering;
// presumably 'void File::AddIO(IO *io)' -- confirm against the repository.
{
   // Called from Cache::GetFile() when a new IO asks for the file.

   TRACEF(Debug, "AddIO() io = " << (void*)io);

   time_t now = time(0);
   std::string loc(io->GetLocation());

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi == m_io_set.end())
   {
      m_io_set.insert(io);
      io->m_attach_time = now;
      m_delta_stats.IoAttach();

      insert_remote_location(loc);

      // Restart prefetching if it was stopped (e.g. after all IOs detached).
      if (m_prefetch_state == kStopped)
      {
         m_prefetch_state = kOn;
         cache()->RegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
   }

   m_state_cond.UnLock();
}
383
384//------------------------------------------------------------------------------
385
// NOTE(review): the defining signature line is missing from this rendering;
// presumably 'void File::RemoveIO(IO *io)' -- confirm against the repository.
{
   // Called from Cache::ReleaseFile.

   TRACEF(Debug, "RemoveIO() io = " << (void*)io);

   time_t now = time(0);

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi != m_io_set.end())
   {
      // Keep the round-robin prefetch cursor valid when erasing its target.
      if (mi == m_current_io)
      {
         ++m_current_io;
      }

      m_delta_stats.IoDetach(now - io->m_attach_time);
      m_io_set.erase(mi);
      // NOTE(review): decremented unconditionally -- assumes ioActive() has
      // already counted this IO into m_ios_in_detach; confirm that invariant.
      --m_ios_in_detach;

      if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
      {
         TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
         m_prefetch_state = kStopped;
         cache()->DeRegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
   }

   m_state_cond.UnLock();
}
423
424//------------------------------------------------------------------------------
425
bool File::Open(XrdOucCacheIO* inputIO)
{
   // Open (or create) the local data file and the companion cinfo file,
   // validate / initialize the cinfo state, and register the file with the
   // resource monitor.
   // Returns true on success; on failure sets errno and returns false.

   static const char *tpfx = "Open() ";

   TRACEF(Dump, tpfx << "entered");

   // Before touching anything, check with ResourceMonitor if a scan is in progress.
   // This function will wait internally if needed until it is safe to proceed.
   Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);

   // NOTE(review): a line is missing from this rendering here -- 'conf' is used
   // below, presumably
   // 'const Configuration &conf = Cache::GetInstance().RefConfiguration();'.

   XrdOss &myOss = * Cache::GetInstance().GetOss();
   const char *myUser = conf.m_username.c_str();
   XrdOucEnv myEnv;
   struct stat data_stat, info_stat;

   std::string ifn = m_filename + Info::s_infoExtension;

   bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
   bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);

   // Create the data file itself.
   char size_str[32]; sprintf(size_str, "%lld", m_file_size);
   myEnv.Put("oss.asize", size_str);
   myEnv.Put("oss.cgroup", conf.m_data_space.c_str());

   int res;

   if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      return false;
   }

   m_data_file = myOss.newFile(myUser);
   if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_data_file; m_data_file = 0;
      return false;
   }

   // Create and open the companion cinfo (metadata) file.
   myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
   myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
   if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   m_info_file = myOss.newFile(myUser);
   if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_info_file; m_info_file = 0;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   bool initialize_info_file = true;

   if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
   {
      TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
             ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
             ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
             ", block_size=" << (m_cfi.GetBufferSize() >> 10) << "k)");

      // Check if data file exists and is of reasonable size.
      if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
      {
         initialize_info_file = false;
      } else {
         TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
         Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
      }
   }

   // Reconcile the cinfo's recorded checksum state with the current config.
   if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
   {
      if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
          conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
      {
         TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
         initialize_info_file = true;
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
         Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
      } else {
         // TODO: If the file is complete, we don't need to reset net cksums.
         m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
      }
   }

   // Check if we have pfc url arguments.
   long long pfc_blocksize = conf.m_bufferSize;
   int pfc_prefetch = conf.m_prefetch_max_blocks;
   // NOTE(review): the 'if (...)' header of the following block is missing from
   // this rendering -- presumably it tests whether CGI overrides are enabled
   // (conf.m_cgi_blocksize_allowed / conf.m_cgi_prefetch_allowed); confirm.
   {
      parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
   }

   if (initialize_info_file)
   {
      m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
      m_cfi.SetCkSumState(conf.get_cs_Chk());
      m_cfi.ResetNoCkSumTime();
      m_cfi.Write(m_info_file, ifn.c_str());
      m_info_file->Fsync();
      cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
      TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()
             << " block size = " << pfc_blocksize);
   }
   else
   {
      // Existing cinfo file: just refresh its modification time.
      if (futimens(m_info_file->getFD(), NULL)) {
         TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
      }
      if (pfc_blocksize != conf.m_bufferSize) {
         TRACEF(Info, tpfx << "URL CGI pfc.blocksize ignored for an already existing file");
      }
   }

   m_cfi.WriteIOStatAttach();
   m_state_cond.Lock();
   m_block_size = m_cfi.GetBufferSize();
   m_num_blocks = m_cfi.GetNBlocks();
   m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
   m_prefetch_max_blocks_in_flight = pfc_prefetch;
   if (pfc_prefetch != conf.m_prefetch_max_blocks)
      TRACEF(Debug, tpfx << "pfc.prefetch set to " << pfc_prefetch << " via CGI parameter");

   // Remember the current on-disk block count for delta reporting.
   m_data_file->Fstat(&data_stat);
   m_st_blocks = data_stat.st_blocks;

   m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
   // Report threshold: 5% of the file size, clamped to [10 MB, 500 MB].
   constexpr long long MB = 1024 * 1024;
   m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
   // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
   // actual threshold based on return values from register_file_update_stats().

   m_state_cond.UnLock();

   return true;
}
581
void File::parse_pfc_url_args(XrdOucCacheIO* inputIO, long long &pfc_blocksize, int &pfc_prefetch) const
{
   // Extract pfc.blocksize / pfc.prefetch overrides from the input URL's CGI
   // parameters, when allowed by configuration. On successful parse the
   // corresponding output argument is overwritten; otherwise it is left at
   // the default passed in by the caller.

   const Configuration &conf = Cache::TheOne().RefConfiguration();

   XrdCl::URL url(inputIO->Path());
   auto const & urlp = url.GetParams();

   // Helper: fetch parameter 'key' into 'value'; returns false (and clears
   // 'value') when the parameter is not present.
   auto extract = [&](const std::string &key, std::string &value) -> bool {
      auto it = urlp.find(key);
      if (it != urlp.end()) {
         value = it->second;
         return true;
      } else {
         value.clear();
         return false;
      }
   };

   std::string val;
   if (conf.m_cgi_blocksize_allowed && extract("pfc.blocksize", val))
   {
      const char *tpfx = "File::Open::urlcgi pfc.blocksize ";
      long long bsize;
      // NOTE(review): the continuation line(s) of this call are missing from
      // this rendering (trailing arguments / closing parentheses of
      // blocksize_str2value) -- restore from the repository.
      if (Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
      {
         pfc_blocksize = bsize;
      } else {
         TRACEF(Error, tpfx << "Error processing the parameter.");
      }
   }
   if (conf.m_cgi_prefetch_allowed && extract("pfc.prefetch", val))
   {
      const char *tpfx = "File::Open::urlcgi pfc.prefetch ";
      int pref;
      // NOTE(review): the continuation line(s) of this call are missing from
      // this rendering (trailing arguments / closing parentheses of
      // prefetch_str2value) -- restore from the repository.
      if (Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
      {
         pfc_prefetch = pref;
      } else {
         TRACEF(Error, tpfx << "Error processing the parameter.");
      }
   }
}
626
627//------------------------------------------------------------------------------
628
629int File::Fstat(struct stat &sbuff)
630{
631 // Stat on an open file.
632 // Corrects size to actual full size of the file.
633 // Sets atime to 0 if the file is only partially downloaded, in accordance
634 // with pfc.onlyifcached settings.
635 // Called from IO::Fstat() and Cache::Stat() when the file is active.
636 // Returns 0 on success, -errno on error.
637
638 int res;
639
640 if ((res = m_data_file->Fstat(&sbuff))) return res;
641
642 sbuff.st_size = m_file_size;
643
644 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
645 if ( ! is_cached)
646 sbuff.st_atime = 0;
647
648 return 0;
649}
650
651//==============================================================================
652// Read and helpers
653//==============================================================================
654
655bool File::overlap(int blk, // block to query
656 long long blk_size, //
657 long long req_off, // offset of user request
658 int req_size, // size of user request
659 // output:
660 long long &off, // offset in user buffer
661 long long &blk_off, // offset in block
662 int &size) // size to copy
663{
664 const long long beg = blk * blk_size;
665 const long long end = beg + blk_size;
666 const long long req_end = req_off + req_size;
667
668 if (req_off < end && req_end > beg)
669 {
670 const long long ovlp_beg = std::max(beg, req_off);
671 const long long ovlp_end = std::min(end, req_end);
672
673 off = ovlp_beg - req_off;
674 blk_off = ovlp_beg - beg;
675 size = (int) (ovlp_end - ovlp_beg);
676
677 assert(size <= blk_size);
678 return true;
679 }
680 else
681 {
682 return false;
683 }
684}
685
686//------------------------------------------------------------------------------
687
Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
{
   // Allocate RAM and a Block object for block index 'i' and enter it into
   // m_block_map. Returns nullptr when the RAM budget or the Block allocation
   // fails.
   //
   // Must be called w/ state_cond locked.
   // Checks on size etc should be done before.
   //
   // Reference count is 0 so increase it in calling function if you want to
   // catch the block while still in memory.

   const long long off = i * m_block_size;
   const int last_block = m_num_blocks - 1;
   const bool cs_net = cache()->RefConfiguration().is_cschk_net();

   int blk_size, req_size;
   if (i == last_block) {
      blk_size = req_size = m_file_size - off;
      // For network checksums, pad the last request up to a 4 kB boundary.
      if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
   } else {
      blk_size = req_size = m_block_size;
   }

   Block *b = 0;
   char *buf = cache()->RequestRAM(req_size);

   if (buf)
   {
      b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);

      if (b)
      {
         m_block_map[i] = b;

         // Actual Read request is issued in ProcessBlockRequests().

         // Put prefetching on hold while the in-flight block count is at limit.
         if (m_prefetch_state == kOn && (int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
         {
            m_prefetch_state = kHold;
            cache()->DeRegisterPrefetchFile(this);
         }
      }
      else
      {
         // NOTE(review): 'buf' obtained via RequestRAM is not released on this
         // failure path -- confirm whether Cache expects a matching release.
         TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
      }
   }

   return b;
}
735
void File::ProcessBlockRequest(Block *b)
{
   // Issue the remote read for an already prepared block: pgRead (with
   // checksum vector) when network checksums are requested, plain Read
   // otherwise.
   //
   // This *must not* be called with block_map locked.

   // NOTE(review): a line is missing from this rendering here -- 'brh' is used
   // below, presumably
   // 'BlockResponseHandler *brh = new BlockResponseHandler(b);'.

   if (XRD_TRACE What >= TRACE_Dump) {
      char buf[256];
      snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
               b->get_offset()/m_block_size, (void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (void*)b->get_buff(), (void*)brh);
      TRACEF(Dump, "ProcessBlockRequest() " << buf);
   }

   if (b->req_cksum_net())
   {
      b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
                                      b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
   } else {
      b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
   }
}
757
758void File::ProcessBlockRequests(BlockList_t& blks)
759{
760 // This *must not* be called with block_map locked.
761
762 for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
763 {
764 ProcessBlockRequest(*bi);
765 }
766}
767
768//------------------------------------------------------------------------------
769
void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
{
   // Issue remote vector reads for chunks that bypass the cache (RAM full).
   // The chunk list is split so that no single ReadV exceeds
   // XrdProto::maxRvecsz entries; the DirectResponseHandler is created with
   // the number of partial ReadVs it has to wait for.

   int n_chunks = ioVec.size();
   int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;

   TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
          ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);

   DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);

   // Full-size slices first, then one final ReadV with the remainder.
   int pos = 0;
   while (n_chunks > XrdProto::maxRvecsz) {
      io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
      pos += XrdProto::maxRvecsz;
      n_chunks -= XrdProto::maxRvecsz;
   }
   io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
}
788
789//------------------------------------------------------------------------------
790
791int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
792{
793 TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
794
795 long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
796
797 if (rs < 0)
798 {
799 TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
800 return rs;
801 }
802
803 if (rs != expected_size)
804 {
805 TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
806 return -EIO;
807 }
808
809 return (int) rs;
810}
811
812//------------------------------------------------------------------------------
813
int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
{
   // Single-range read entry point. Returns bytes read, a -errno value on
   // error, or -EWOULDBLOCK when the request went asynchronous (rh is then
   // called back from async processing).
   //
   // rrc_func is ONLY called from async processing.
   // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
   // This streamlines implementation of synchronous IO::Read().

   TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);

   m_state_cond.Lock();

   // Refuse reads for files being unlinked or IOs that are already detaching.
   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
      if (ret > 0) {
         // Account the local hit and possibly flush stats to the monitor.
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   // General case: wrap the single range into an IOVec and share the
   // coalescing path with ReadV().
   XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );

   return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
}
848
849//------------------------------------------------------------------------------
850
int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
{
   // Vector-read entry point; same return convention as Read():
   // bytes read, -errno on error, or -EWOULDBLOCK when async.

   TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");

   m_state_cond.Lock();

   // Refuse reads for files being unlinked or IOs that are already detaching.
   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      // const_cast needed as the OSS ReadV interface takes a non-const vector.
      // NOTE(review): assumes the OSS does not modify the entries -- confirm.
      int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
      if (ret > 0) {
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
}
879
880//------------------------------------------------------------------------------
881
882int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
883 ReadReqRH *rh, const char *tpfx)
884{
885 // Non-trivial processing for Read and ReadV.
886 // Entered under lock.
887 //
888 // loop over reqired blocks:
889 // - if on disk, ok;
890 // - if in ram or incoming, inc ref-count
891 // - otherwise request and inc ref count (unless RAM full => request direct)
892 // unlock
893
894 int prefetch_cnt = 0;
895
896 ReadRequest *read_req = nullptr;
897 BlockList_t blks_to_request; // blocks we are issuing a new remote request for
898
899 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
900
901 std::vector<XrdOucIOVec> iovec_disk;
902 std::vector<XrdOucIOVec> iovec_direct;
903 int iovec_disk_total = 0;
904 int iovec_direct_total = 0;
905
906 for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
907 {
908 const XrdOucIOVec &iov = readV[iov_idx];
909 long long iUserOff = iov.offset;
910 int iUserSize = iov.size;
911 char *iUserBuff = iov.data;
912
913 const int idx_first = iUserOff / m_block_size;
914 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
915
916 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
917
918 enum LastBlock_e { LB_other, LB_disk, LB_direct };
919
920 LastBlock_e lbe = LB_other;
921
922 for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
923 {
924 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
925 BlockMap_i bi = m_block_map.find(block_idx);
926
927 // overlap and read
928 long long off; // offset in user buffer
929 long long blk_off; // offset in block
930 int size; // size to copy
931
932 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
933
934 // In RAM or incoming?
935 if (bi != m_block_map.end())
936 {
937 inc_ref_count(bi->second);
938 TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
939
940 if (bi->second->is_finished())
941 {
942 // note, blocks with error should not be here !!!
943 // they should be either removed or reissued in ProcessBlockResponse()
944 assert(bi->second->is_ok());
945
946 blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
947
948 if (bi->second->m_prefetch)
949 ++prefetch_cnt;
950 }
951 else
952 {
953 if ( ! read_req)
954 read_req = new ReadRequest(io, rh);
955
956 // We have a lock on state_cond --> as we register the request before releasing the lock,
957 // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
958
959 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
960 ++read_req->m_n_chunk_reqs;
961 }
962
963 lbe = LB_other;
964 }
965 // On disk?
966 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
967 {
968 TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
969
970 if (lbe == LB_disk)
971 iovec_disk.back().size += size;
972 else
973 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
974 iovec_disk_total += size;
975
976 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
977 ++prefetch_cnt;
978
979 lbe = LB_disk;
980 }
981 // Neither ... then we have to go get it ...
982 else
983 {
984 if ( ! read_req)
985 read_req = new ReadRequest(io, rh);
986
987 // Is there room for one more RAM Block?
988 Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
989 if (b)
990 {
991 TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
992 inc_ref_count(b);
993 blks_to_request.push_back(b);
994
995 b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
996 ++read_req->m_n_chunk_reqs;
997
998 lbe = LB_other;
999 }
1000 else // Nope ... read this directly without caching.
1001 {
1002 TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
1003
1004 iovec_direct_total += size;
1005 read_req->m_direct_done = false;
1006
1007 // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
1008 // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
1009 // is determined in the RequestBlocksDirect().
1010 if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
1011 iovec_direct.back().size += size;
1012 } else {
1013 long long in_offset = block_idx * m_block_size + blk_off;
1014 char *out_pos = iUserBuff + off;
1015 while (size > XrdProto::maxRVdsz) {
1016 iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
1017 in_offset += XrdProto::maxRVdsz;
1018 out_pos += XrdProto::maxRVdsz;
1019 size -= XrdProto::maxRVdsz;
1020 }
1021 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1022 }
1023
1024 lbe = LB_direct;
1025 }
1026 }
1027 } // end for over blocks in an IOVec
1028 } // end for over readV IOVec
1029
1030 inc_prefetch_hit_cnt(prefetch_cnt);
1031
1032 m_state_cond.UnLock();
1033
1034 // First, send out remote requests for new blocks.
1035 if ( ! blks_to_request.empty())
1036 {
1037 ProcessBlockRequests(blks_to_request);
1038 blks_to_request.clear();
1039 }
1040
1041 // Second, send out remote direct read requests.
1042 if ( ! iovec_direct.empty())
1043 {
1044 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1045
1046 TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
1047 }
1048
1049 // Begin synchronous part where we process data that is already in RAM or on disk.
1050
1051 long long bytes_read = 0;
1052 int error_cond = 0; // to be set to -errno
1053
1054 // Third, process blocks that are available in RAM.
1055 if ( ! blks_ready.empty())
1056 {
1057 for (auto &bvi : blks_ready)
1058 {
1059 for (auto &cr : bvi.second)
1060 {
1061 TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
1062 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1063 bytes_read += cr.m_size;
1064 }
1065 }
1066 }
1067
1068 // Fourth, read blocks from disk.
1069 if ( ! iovec_disk.empty())
1070 {
1071 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1072 TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
1073 if (rc >= 0)
1074 {
1075 bytes_read += rc;
1076 }
1077 else
1078 {
1079 error_cond = rc;
1080 TRACEF(Error, tpfx << "failed read from disk");
1081 }
1082 }
1083
1084 // End synchronous part -- update with sync stats and determine actual state of this read.
1085 // Note: remote reads might have already finished during disk-read!
1086
1087 m_state_cond.Lock();
1088
1089 for (auto &bvi : blks_ready)
1090 dec_ref_count(bvi.first, (int) bvi.second.size());
1091
1092 if (read_req)
1093 {
1094 read_req->m_bytes_read += bytes_read;
1095 if (error_cond)
1096 read_req->update_error_cond(error_cond);
1097 read_req->m_stats.m_BytesHit += bytes_read;
1098 read_req->m_sync_done = true;
1099
1100 if (read_req->is_complete())
1101 {
1102 // Almost like FinalizeReadRequest(read_req) -- but no callout!
1103 m_delta_stats.AddReadStats(read_req->m_stats);
1104 check_delta_stats();
1105 m_state_cond.UnLock();
1106
1107 int ret = read_req->return_value();
1108 delete read_req;
1109 return ret;
1110 }
1111 else
1112 {
1113 m_state_cond.UnLock();
1114 return -EWOULDBLOCK;
1115 }
1116 }
1117 else
1118 {
1119 m_delta_stats.m_BytesHit += bytes_read;
1120 check_delta_stats();
1121 m_state_cond.UnLock();
1122
1123 // !!! No callout.
1124
1125 return error_cond ? error_cond : bytes_read;
1126 }
1127}
1128
1129
1130//==============================================================================
1131// WriteBlock and Sync
1132//==============================================================================
1133
{
   // Write the block's RAM buffer into the local cache data file and update
   // cinfo state bits. Runs on a cache write-queue thread, NOT under
   // m_state_cond (the lock is taken only for the state updates below).
   //
   // Block offsets are absolute file offsets; the local data file begins at
   // m_offset, hence the subtraction.
   long long offset = b->m_offset - m_offset;
   long long size = b->get_size();
   ssize_t retval;

   // Use page-writes with checksums when the cache is configured to store
   // them and the block actually carries a checksum vector.
   if (m_cfi.IsCkSumCache())
      if (b->has_cksums())
         retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
      else
         retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
   else
      retval = m_data_file->Write(b->get_buff(), offset, size);

   // Short or failed write: do not mark the block as written, just drop the
   // writer's reference and bail out.
   if (retval < size)
   {
      if (retval < 0) {
         TRACEF(Error, "WriteToDisk() write error " << retval);
      } else {
         TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
      }

      XrdSysCondVarHelper _lck(m_state_cond);

      dec_ref_count(b);

      return;
   }

   const int blk_idx = (b->m_offset - m_offset) / m_block_size;

   // Set written bit.
   TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);

   bool schedule_sync = false;
   {
      XrdSysCondVarHelper _lck(m_state_cond);

      m_cfi.SetBitWritten(blk_idx);

      if (b->m_prefetch)
      {
         m_cfi.SetBitPrefetch(blk_idx);
      }
      // Network checksums were requested but this block arrived without them:
      // cinfo can no longer claim net-checksum coverage.
      if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
      {
         m_cfi.ResetCkSumNet();
      }

      // Set synced bit or stash block index if in actual sync.
      // Synced state is only written out to cinfo file when data file is synced.
      if (m_in_sync)
      {
         m_writes_during_sync.push_back(blk_idx);
      }
      else
      {
         m_cfi.SetBitSynced(blk_idx);
         ++m_non_flushed_cnt;
         // Trigger a sync when the file became complete or enough unflushed
         // blocks accumulated -- unless we are already shutting down.
         if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
             ! m_in_shutdown)
         {
            schedule_sync = true;
            m_in_sync = true;
            m_non_flushed_cnt = 0;
         }
      }
      // As soon as the reference count is decreased on the block, the
      // file object may be deleted. Thus, to avoid holding both locks at a time,
      // we defer the ref count decrease until later if a sync is needed.
      if (!schedule_sync) {
         dec_ref_count(b);
      }
   }

   if (schedule_sync)
   {
      cache()->ScheduleFileSync(this);
      // Re-acquire the state lock only now, after the sync has been queued.
      XrdSysCondVarHelper _lck(m_state_cond);
      dec_ref_count(b);
   }
}
1217
1218//------------------------------------------------------------------------------
1219
{
   // Fsync the data file, then persist and fsync the matching cinfo state.
   // On any sync error the local files are unlinked and the File object is
   // driven into emergency shutdown. Blocks written while the sync was in
   // progress are stashed in m_writes_during_sync and marked synced here;
   // if that made the file complete, Sync() is re-entered once (resync).
   TRACEF(Dump, "Sync()");

   int ret = m_data_file->Fsync();
   bool errorp = false;
   if (ret == XrdOssOK)
   {
      // Snapshot stats under lock, write them out unlocked.
      Stats loc_stats;
      {
         XrdSysCondVarHelper _lck(&m_state_cond);
         report_and_merge_delta_stats();
         loc_stats = m_stats;
      }
      m_cfi.WriteIOStat(loc_stats);
      m_cfi.Write(m_info_file, m_filename.c_str());
      int cret = m_info_file->Fsync();
      if (cret != XrdOssOK)
      {
         TRACEF(Error, "Sync cinfo file sync error " << cret);
         errorp = true;
      }
   }
   else
   {
      TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
      errorp = true;
   }

   if (errorp)
   {
      TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");

      // Unlink will also call this->initiate_emergency_shutdown()
      Cache::GetInstance().UnlinkFile(m_filename, false);

      XrdSysCondVarHelper _lck(&m_state_cond);

      m_writes_during_sync.clear();
      m_in_sync = false;

      return;
   }

   int written_while_in_sync;
   bool resync = false;
   {
      XrdSysCondVarHelper _lck(&m_state_cond);
      // Blocks that landed on disk during the sync are only now marked synced.
      for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
      {
         m_cfi.SetBitSynced(*i);
      }
      written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
      m_writes_during_sync.clear();

      // If there were writes during sync and the file is now complete,
      // let us call Sync again without resetting the m_in_sync flag.
      if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
         resync = true;
      else
         m_in_sync = false;
   }
   TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));

   if (resync)
      Sync();
}
1287
1288
1289//==============================================================================
1290// Block processing
1291//==============================================================================
1292
1293void File::free_block(Block* b)
1294{
1295 // Method always called under lock.
1296 int i = b->m_offset / m_block_size;
1297 TRACEF(Dump, "free_block block " << b << " idx = " << i);
1298 size_t ret = m_block_map.erase(i);
1299 if (ret != 1)
1300 {
1301 // assert might be a better option than a warning
1302 TRACEF(Error, "free_block did not erase " << i << " from map");
1303 }
1304 else
1305 {
1306 cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1307 delete b;
1308 }
1309
1310 if (m_prefetch_state == kHold && (int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1311 {
1312 m_prefetch_state = kOn;
1313 cache()->RegisterPrefetchFile(this);
1314 }
1315}
1316
1317//------------------------------------------------------------------------------
1318
1319bool File::select_current_io_or_disable_prefetching(bool skip_current)
1320{
1321 // Method always called under lock. It also expects prefetch to be active.
1322
1323 int io_size = (int) m_io_set.size();
1324 bool io_ok = false;
1325
1326 if (io_size == 1)
1327 {
1328 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1329 if (io_ok)
1330 {
1331 m_current_io = m_io_set.begin();
1332 }
1333 }
1334 else if (io_size > 1)
1335 {
1336 IoSet_i mi = m_current_io;
1337 if (skip_current && mi != m_io_set.end()) ++mi;
1338
1339 for (int i = 0; i < io_size; ++i)
1340 {
1341 if (mi == m_io_set.end()) mi = m_io_set.begin();
1342
1343 if ((*mi)->m_allow_prefetching)
1344 {
1345 m_current_io = mi;
1346 io_ok = true;
1347 break;
1348 }
1349 ++mi;
1350 }
1351 }
1352
1353 if ( ! io_ok)
1354 {
1355 m_current_io = m_io_set.end();
1356 m_prefetch_state = kStopped;
1357 cache()->DeRegisterPrefetchFile(this);
1358 }
1359
1360 return io_ok;
1361}
1362
1363//------------------------------------------------------------------------------
1364
1365void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1366{
1367 // Called from DirectResponseHandler.
1368 // NOT under lock.
1369
1370 if (error_cond)
1371 TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1372
1373 m_state_cond.Lock();
1374
1375 if (error_cond)
1376 rreq->update_error_cond(error_cond);
1377 else {
1378 rreq->m_stats.m_BytesBypassed += bytes_read;
1379 rreq->m_bytes_read += bytes_read;
1380 }
1381
1382 rreq->m_direct_done = true;
1383
1384 bool rreq_complete = rreq->is_complete();
1385
1386 m_state_cond.UnLock();
1387
1388 if (rreq_complete)
1389 FinalizeReadRequest(rreq);
1390}
1391
void File::ProcessBlockError(Block *b, ReadRequest *rreq)
{
   // Propagate a block's error into one of its subscribed read requests and
   // release that request's reference on the block.
   // Called from ProcessBlockResponse().
   // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
   // Does not manage m_read_req.
   // Will not complete the request.

   TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
          " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));

   rreq->update_error_cond(b->get_error());
   // One chunk of this request is now accounted for (as failed).
   --rreq->m_n_chunk_reqs;

   dec_ref_count(b);
}
1407
1408void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1409{
1410 // Called from ProcessBlockResponse().
1411 // NOT under lock as it does memcopy ofor exisf block data.
1412 // Acquires lock for block, m_read_req and rreq state update.
1413
1414 ReadRequest *rreq = creq.m_read_req;
1415
1416 TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1417 memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1418
1419 m_state_cond.Lock();
1420
1421 rreq->m_bytes_read += creq.m_size;
1422
1423 if (b->get_req_id() == (void*) rreq)
1424 rreq->m_stats.m_BytesMissed += creq.m_size;
1425 else
1426 rreq->m_stats.m_BytesHit += creq.m_size;
1427
1428 --rreq->m_n_chunk_reqs;
1429
1430 if (b->m_prefetch)
1431 inc_prefetch_hit_cnt(1);
1432
1433 dec_ref_count(b);
1434
1435 bool rreq_complete = rreq->is_complete();
1436
1437 m_state_cond.UnLock();
1438
1439 if (rreq_complete)
1440 FinalizeReadRequest(rreq);
1441}
1442
1443void File::FinalizeReadRequest(ReadRequest *rreq)
1444{
1445 // called from ProcessBlockResponse()
1446 // NOT under lock -- does callout
1447 {
1448 XrdSysCondVarHelper _lck(m_state_cond);
1449 m_delta_stats.AddReadStats(rreq->m_stats);
1450 check_delta_stats();
1451 }
1452
1453 rreq->m_rh->Done(rreq->return_value());
1454 delete rreq;
1455}
1456
void File::ProcessBlockResponse(Block *b, int res)
{
   // Central completion handler for a block download: on success the block is
   // handed to the write queue and its chunk requests are satisfied; on
   // failure the error is propagated to same-IO requests and, if other IOs
   // are subscribed, the block request is reissued through one of them.
   static const char* tpfx = "ProcessBlockResponse ";

   TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);

   if (res >= 0 && res != b->get_size())
   {
      // Incorrect number of bytes received, apparently size of the file on the remote
      // is different than what the cache expects it to be.
      TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
      Cache::GetInstance().UnlinkFile(m_filename, false);
   }

   m_state_cond.Lock();

   // Deregister block from IO's prefetch count, if needed.
   if (b->m_prefetch)
   {
      IO *io = b->get_io();
      IoSet_i mi = m_io_set.find(io);
      if (mi != m_io_set.end())
      {
         --io->m_active_prefetches;

         // If failed and IO is still prefetching -- disable prefetching on this IO.
         if (res < 0 && io->m_allow_prefetching)
         {
            TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
            io->m_allow_prefetching = false;

            // Check if any IO is still available for prefetching. If not, stop it.
            if (m_prefetch_state == kOn || m_prefetch_state == kHold)
            {
               if ( ! select_current_io_or_disable_prefetching(false) )
               {
                  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
               }
            }
         }

         // If failed with no subscribers -- delete the block and exit.
         if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
         {
            free_block(b);
            m_state_cond.UnLock();
            return;
         }
         m_prefetch_bytes += b->get_size();
      }
      else
      {
         TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
      }
   }

   if (res == b->get_size())
   {
      // Success: mark downloaded and queue the block for writing to disk.
      b->set_downloaded();
      TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
      if ( ! m_in_shutdown)
      {
         // Increase ref-count for the writer.
         inc_ref_count(b);
         m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
         // No check for writes, report-and-merge forced during Sync().
         cache()->AddWriteTask(b, true);
      }

      // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
      vChunkRequest_t creqs_to_notify;
      creqs_to_notify.swap( b->m_chunk_reqs );

      m_state_cond.UnLock();

      for (auto &creq : creqs_to_notify)
      {
         ProcessBlockSuccess(b, creq);
      }
   }
   else
   {
      // Failure (negative res) or short read (already reported above).
      if (res < 0) {
         bool new_error = b->get_io()->register_block_error(res);
         int tlvl = new_error ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << ", error=" << res);
      } else {
         bool first_p = b->get_io()->register_incomplete_read();
         int tlvl = first_p ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
         // Short read is converted to a remote-IO error (EREMOTEIO where available).
#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
         res = -EIO;
#else
         res = -EREMOTEIO;
#endif
      }
      b->set_error(res);

      // Loop over Block's chunk-reqs vector, error out ones with the same IO.
      // Collect others with a different IO, the first of them will be used to reissue the request.
      // This is then done outside of lock.
      std::list<ReadRequest*> rreqs_to_complete;
      vChunkRequest_t creqs_to_keep;

      for(ChunkRequest &creq : b->m_chunk_reqs)
      {
         ReadRequest *rreq = creq.m_read_req;

         if (rreq->m_io == b->get_io())
         {
            ProcessBlockError(b, rreq);
            if (rreq->is_complete())
            {
               rreqs_to_complete.push_back(rreq);
            }
         }
         else
         {
            creqs_to_keep.push_back(creq);
         }
      }

      bool reissue = false;
      if ( ! creqs_to_keep.empty())
      {
         ReadRequest *rreq = creqs_to_keep.front().m_read_req;

         TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
                b->get_io() << " - reissuing request with my io " << rreq->m_io);

         b->reset_error_and_set_io(rreq->m_io, rreq);
         b->m_chunk_reqs.swap( creqs_to_keep );
         reissue = true;
      }

      m_state_cond.UnLock();

      // Completion callouts and the possible reissue happen without the lock.
      for (auto rreq : rreqs_to_complete)
         FinalizeReadRequest(rreq);

      if (reissue)
         ProcessBlockRequest(b);
   }
}
1603
1604//------------------------------------------------------------------------------
1605
const char* File::lPath() const
{
   // Local path of the cached file, used for logging/tracing.
   return m_filename.c_str();
}
1610
1611//------------------------------------------------------------------------------
1612
int File::offsetIdx(int iIdx) const
{
   // Convert an absolute block index into an index relative to this File's
   // starting offset (m_offset is the absolute offset of the first block).
   return iIdx - m_offset/m_block_size;
}
1617
1618
1619//------------------------------------------------------------------------------
1620
{
   // Issue a prefetch request for the next block that is neither on disk nor
   // in RAM, using the currently selected prefetch-capable IO object.
   // Check that block is not on disk and not in RAM.
   // TODO: Could prefetch several blocks at once!
   //       blks_max could be an argument

   BlockList_t blks;

   TRACEF(DumpXL, "Prefetch() entering.");
   {
      XrdSysCondVarHelper _lck(m_state_cond);

      if (m_prefetch_state != kOn)
      {
         return;
      }

      // Rotate to the next IO that still allows prefetching (skip current).
      if ( ! select_current_io_or_disable_prefetching(true) )
      {
         TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
         return;
      }

      // Select block(s) to fetch -- first block not yet written to disk and
      // not already present in the block map.
      for (int f = 0; f < m_num_blocks; ++f)
      {
         if ( ! m_cfi.TestBitWritten(f))
         {
            int f_act = f + m_offset / m_block_size;

            BlockMap_i bi = m_block_map.find(f_act);
            if (bi == m_block_map.end())
            {
               Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
               if (b)
               {
                  TRACEF(Dump, "Prefetch take block " << f_act);
                  blks.push_back(b);
                  // Note: block ref_cnt not increased, it will be when placed into write queue.

                  inc_prefetch_read_cnt(1);
               }
               else
               {
                  // This shouldn't happen as prefetching stops when RAM is 70% full.
                  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
               }
               break;
            }
         }
      }

      if (blks.empty())
      {
         // No candidate block found -- every block is written or in flight.
         TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
         m_prefetch_state = kComplete;
         cache()->DeRegisterPrefetchFile(this);
      }
      else
      {
         (*m_current_io)->m_active_prefetches += (int) blks.size();
      }
   }

   // Issue the actual network request(s) outside of the lock.
   if ( ! blks.empty())
   {
      ProcessBlockRequests(blks);
   }
}
1690
1691
1692//------------------------------------------------------------------------------
1693
{
   // Current prefetch score; maintained elsewhere in this class -- see the
   // prefetch hit/read counters (update sites not in this view).
   return m_prefetch_score;
}
1698
{
   // Convenience accessor: forward to the cache singleton's error logger.
   return Cache::TheOne().GetLog();
}
1703
{
   // Convenience accessor: forward to the cache singleton's trace object.
   return Cache::TheOne().GetTrace();
}
1708
1709void File::insert_remote_location(const std::string &loc)
1710{
1711 if ( ! loc.empty())
1712 {
1713 size_t p = loc.find_first_of('@');
1714 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1715 }
1716}
1717
1718std::string File::GetRemoteLocations() const
1719{
1720 std::string s;
1721 if ( ! m_remote_locations.empty())
1722 {
1723 size_t sl = 0;
1724 int nl = 0;
1725 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1726 {
1727 sl += i->size();
1728 }
1729 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1730 s = '[';
1731 int j = 1;
1732 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1733 {
1734 s += '"'; s += *i; s += '"';
1735 if (j < nl) s += ',';
1736 }
1737 s += ']';
1738 }
1739 else
1740 {
1741 s = "[]";
1742 }
1743 return s;
1744}
1745
1746//==============================================================================
1747//======================= RESPONSE HANDLERS ==============================
1748//==============================================================================
1749
{
   // Network response for a block download arrived: hand the result to the
   // owning File, then self-destruct (handler is one-shot, heap-allocated).
   m_block->m_file->ProcessBlockResponse(m_block, res);
   delete this;
}
1755
1756//------------------------------------------------------------------------------
1757
{
   // One chunk of a direct (uncached) read completed. Accumulate the result;
   // when the last outstanding chunk reports in, notify the File and delete
   // this one-shot handler.
   m_mutex.Lock();

   int n_left = --m_to_wait;

   if (res < 0) {
      if (m_errno == 0) m_errno = res; // store first reported error
   } else {
      m_bytes_read += res;
   }

   m_mutex.UnLock();

   if (n_left == 0)
   {
      // We were the last responder, so no other thread touches these members
      // anymore -- safe to read them after unlocking.
      m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
      delete this;
   }
}
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACE_Error
Definition XrdPfcTrace.hh:7
#define TRACE_Dump
#define TRACEF(act, x)
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
#define stat(a, b)
Definition XrdPosix.hh:101
#define XRD_TRACE
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Fstat(struct stat *buf)
Definition XrdOss.hh:136
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
long long m_offset
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:163
XrdSysError * GetLog() const
Definition XrdPfc.hh:294
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:215
static ResourceMonitor & ResMon()
Definition XrdPfc.cc:135
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
XrdSysTrace * GetTrace() const
Definition XrdPfc.hh:295
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1188
static const Cache & TheOne()
Definition XrdPfc.cc:133
XrdOss * GetOss() const
Definition XrdPfc.hh:280
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache inf o and output data with disk.
XrdSysTrace * GetTrace() const
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close().
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
bool register_incomplete_read()
Definition XrdPfcIO.hh:90
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31
bool register_block_error(int res)
Definition XrdPfcIO.hh:93
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:70
const char * GetLocation()
Definition XrdPfcIO.hh:44
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
XrdPosixStats Stats
static const int maxRVdsz
Definition XProtocol.hh:688
static const int maxRvecsz
Definition XProtocol.hh:686
long long offset
ReadRequest * m_read_req
Definition XrdPfcFile.hh:91
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:64
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:115
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:116
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:80
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition XrdPfc.hh:119
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:73
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition XrdPfc.hh:112
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:82
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:88
long long m_bufferSize
cache block size, default 128 kB
Definition XrdPfc.hh:107
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:114
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:89
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:117
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:87
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition XrdPfc.hh:118
unsigned short m_seq_id
Definition XrdPfcFile.hh:53
void update_error_cond(int ec)
Definition XrdPfcFile.hh:81
bool is_complete() const
Definition XrdPfcFile.hh:83
int return_value() const
Definition XrdPfcFile.hh:84
long long m_bytes_read
Definition XrdPfcFile.hh:68