XRootD
Loading...
Searching...
No Matches
XrdPfcFile.hh
Go to the documentation of this file.
1#ifndef __XRDPFC_FILE_HH__
2#define __XRDPFC_FILE_HH__
3//----------------------------------------------------------------------------------
4// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5// Author: Alja Mrak-Tadel, Matevz Tadel
6//----------------------------------------------------------------------------------
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//----------------------------------------------------------------------------------
20
21#include "XrdPfcTypes.hh"
22#include "XrdPfcInfo.hh"
23#include "XrdPfcStats.hh"
24
25#include "XrdOuc/XrdOucCache.hh"
26#include "XrdOuc/XrdOucIOVec.hh"
27
28#include <functional>
29#include <list>
30#include <map>
31#include <set>
32#include <string>
33
34class XrdJob;
35struct XrdOucIOVec;
36
37namespace XrdPfc
38{
39class File;
42class IO;
43
44struct ReadVBlockListRAM;
45struct ReadVChunkListRAM;
46struct ReadVBlockListDisk;
47struct ReadVChunkListDisk;
48
50{
52 int m_n_chunks = 0; // Only set for ReadV().
53 unsigned short m_seq_id;
54 XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
55
56 ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
57 m_seq_id(sid), m_iocb(iocb)
58 {}
59};
60
61// -------------------------------------------------------------
62
64{
66 ReadReqRH *m_rh; // Internal callback created in IO::Read().
67
68 long long m_bytes_read = 0;
69 int m_error_cond = 0; // to be set to -errno
72
74 bool m_sync_done = false;
75 bool m_direct_done = true;
76
78 m_io(io), m_rh(rh)
79 {}
80
82
83 bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
85};
86
87// -------------------------------------------------------------
88
90{
92 char *m_buf; // Where to place the data chunk.
93 long long m_off; // Offset *within* the corresponding block.
94 int m_size; // Size of the data chunk.
95
96 ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
97 m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
98 {}
99};
100
101using vChunkRequest_t = std::vector<ChunkRequest>;
102using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
103
104// ================================================================
105
// Block -- bookkeeping for one block of a File: the data buffer, its offset
// and sizes, the IO / requestor that asked for it, and its completion state
// (downloaded flag or stored negative errno).
// NOTE(review): this listing omits several member declarations (e.g. m_file,
// m_size, m_req_size, m_downloaded, m_refcnt, m_cksum_vec) and the tail of the
// constructor's initializer list; consult the real header before editing.
106class Block
107{
108public:
110 IO *m_io; // IO that handled current request, used for == / != comparisons only
111 void *m_req_id; // Identity of requestor -- used for stats.
112
113 char *m_buff;
114 long long m_offset;
118 int m_errno; // stores negative errno
124
126
    // Stores the owning file, requesting IO, requestor id, buffer, offset and
    // the two sizes. NOTE(review): the initializers that consume m_prefetch
    // and cks_net are not visible in this listing.
127 Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
128 bool m_prefetch, bool cks_net) :
129 m_file(f), m_io(io), m_req_id(rid),
130 m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
133 {}
134
    // Plain accessors for the buffer geometry.
135 char* get_buff() const { return m_buff; }
136 int get_size() const { return m_size; }
137 int get_req_size() const { return m_req_size; }
138 long long get_offset() const { return m_offset; }
139
    // Plain accessors for the owning file and the current requestor.
140 File* get_file() const { return m_file; }
141 IO* get_io() const { return m_io; }
142 void* get_req_id() const { return m_req_id; }
143
    // A block is finished once it is either downloaded or has a non-zero error.
144 bool is_finished() const { return m_downloaded || m_errno != 0; }
145 bool is_ok() const { return m_downloaded; }
146 bool is_failed() const { return m_errno != 0; }
147
148 void set_downloaded() { m_downloaded = true; }
149 void set_error(int err) { m_errno = err; }
150 int get_error() const { return m_errno; }
151
    // Clears the stored error and re-associates the block with another IO and
    // requestor id. Presumably used to retry a failed block -- confirm at caller.
152 void reset_error_and_set_io(IO *io, void *rid)
153 {
154 m_errno = 0;
155 m_io = io;
156 m_req_id = rid;
157 }
158
159 bool req_cksum_net() const { return m_req_cksum_net; }
    // True when the checksum vector holds at least one entry.
160 bool has_cksums() const { return ! m_cksum_vec.empty(); }
164};
165
166using BlockList_t = std::list<Block*>;
167using BlockList_i = std::list<Block*>::iterator;
168
169// ================================================================
170
172{
173public:
175
177
178 void Done(int result) override;
179};
180
181// ----------------------------------------------------------------
182
184{
185public:
191 int m_errno = 0;
192
193 DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
194 m_file(file), m_read_req(rreq), m_to_wait(to_wait)
195 {}
196
197 void Done(int result) override;
198};
199
200// ================================================================
201
// File -- per-file state of the cache: the data and info file handles, the
// set of attached IO objects, sync bookkeeping, the map of in-memory blocks,
// statistics, and prefetch state.
// NOTE(review): this listing omits a number of lines (additional friend
// declarations, some public methods and all //! doc comments); consult the
// real header before editing.
202class File
203{
204 friend class Cache;
207public:
208 // Constructor, destructor, Open() and Close() are private.
209
    // Static constructor that also does Open. Returns null ptr if Open fails.
211 static File* FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO* inputIO);
212
215
    // Handle removal of a set of blocks from Cache's write queue.
217 void BlocksRemovedFromWriteQ(std::list<Block*>&);
218
    // Normal read.
220 int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
221
    // Vector read.
223 int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
224
225 //----------------------------------------------------------------------
    // Notification from IO that it has been updated (remote open).
227 //----------------------------------------------------------------------
228 void ioUpdated(IO *io);
229
230 //----------------------------------------------------------------------
    // Initiate close. Return true if still IO active.
    // Used in XrdPosixXrootd::Close().
233 //----------------------------------------------------------------------
234 bool ioActive(IO *io);
235
236 //----------------------------------------------------------------------
239 //----------------------------------------------------------------------
241
242 //----------------------------------------------------------------------
245 //----------------------------------------------------------------------
247
248 //----------------------------------------------------------------------
    // Sync file cache info and output data with disk.
250 //----------------------------------------------------------------------
251 void Sync();
252
253 void WriteBlockToDisk(Block* b);
254
255 void Prefetch();
256
257 float GetPrefetchScore() const;
258
    // Log path.
260 const char* lPath() const;
261
262 const std::string& GetLocalPath() const { return m_filename; }
263
264 XrdSysError* GetLog() const;
265 XrdSysTrace* GetTrace() const;
266
267 long long GetFileSize() const { return m_file_size; }
268
269 void AddIO(IO *io);
272 void RemoveIO(IO *io);
273
274 std::string GetRemoteLocations() const;
    // The next five accessors simply forward to the Info member m_cfi.
275 const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
276 size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
277 int GetBlockSize() const { return m_cfi.GetBufferSize(); }
278 int GetNBlocks() const { return m_cfi.GetNBlocks(); }
279 int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
280 long long GetPrefetchedBytes() const { return m_prefetch_bytes; }
281 const Stats& RefStats() const { return m_stats; }
282
283 int Fstat(struct stat &sbuff);
284
285 // These three methods are called under Cache's m_active lock
286 int get_ref_cnt() { return m_ref_cnt; }
287 int inc_ref_cnt() { return ++m_ref_cnt; }
288 int dec_ref_cnt() { return --m_ref_cnt; }
289
290 long long initiate_emergency_shutdown();
291 bool is_in_emergency_shutdown() { return m_in_shutdown; }
292
293private:
295 File(const std::string &path, long long offset, long long fileSize);
296
298 ~File();
299
301 void Close();
302
304 bool Open(XrdOucCacheIO* inputIO);
305 void parse_pfc_url_args(XrdOucCacheIO* inputIO, long long &pfc_blocksize, int &pfc_prefetch) const;
306
307 static const char *m_traceID;
308
    // Reference count of this File; see get/inc/dec_ref_cnt() above.
309 int m_ref_cnt;
310
311 XrdOssDF *m_data_file;
312 XrdOssDF *m_info_file;
313 Info m_cfi;
314
    // Set once by the constructor arguments (all three are const).
315 const std::string m_filename;
316 const long long m_offset;
317 const long long m_file_size;
318
319 // IO objects attached to this file.
320
321 typedef std::set<IO*> IoSet_t;
322 typedef IoSet_t::iterator IoSet_i;
323
324 IoSet_t m_io_set;
325 IoSet_i m_current_io;
326 int m_ios_in_detach;
327
328 // FSync
329
    // NOTE(review): std::vector is used here but <vector> is not among the
    // includes visible in this listing; presumably pulled in transitively.
330 std::vector<int> m_writes_during_sync;
331 int m_non_flushed_cnt;
332 bool m_in_sync;
333 bool m_detach_time_logged;
334 bool m_in_shutdown;
335
336 // Block state and management
337
338 typedef std::list<int> IntList_t;
339 typedef IntList_t::iterator IntList_i;
340
341 typedef std::map<int, Block*> BlockMap_t;
342 typedef BlockMap_t::iterator BlockMap_i;
343
344 BlockMap_t m_block_map;
345 XrdSysCondVar m_state_cond;
346 long long m_block_size;
347 int m_num_blocks;
348
349 // Stats and ResourceMonitor interface
350
351 Stats m_stats;
352 Stats m_delta_stats;
353 long long m_st_blocks;
354 long long m_resmon_report_threshold;
355 int m_resmon_token;
356
357 void check_delta_stats();
358 void report_and_merge_delta_stats();
359
360 std::set<std::string> m_remote_locations;
361 void insert_remote_location(const std::string &loc);
362
363 // Prefetch
364
365 enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
366
367 PrefetchState_e m_prefetch_state;
368 int m_prefetch_max_blocks_in_flight;
369
370 long long m_prefetch_bytes;
371 int m_prefetch_read_cnt;
372 int m_prefetch_hit_cnt;
373 float m_prefetch_score; // cached
374
    // Both counters refresh the cached score whenever they change by a
    // non-zero amount.
375 void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
376 void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
    // Score = hit count / read count, as a float.
    // NOTE(review): no guard against m_prefetch_read_cnt == 0 -- confirm that
    // a hit can never be recorded before the first prefetch read.
377 void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
378
379 // Helpers
380
381 bool overlap(int blk, // block to query
382 long long blk_size, //
383 long long req_off, // offset of user request
384 int req_size, // size of user request
385 // output:
386 long long &off, // offset in user buffer
387 long long &blk_off, // offset in block
388 int &size);
389
390 // Read & ReadV
391
392 Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
393
394 void ProcessBlockRequest (Block *b);
395 void ProcessBlockRequests(BlockList_t& blks);
396
397 void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
398
399 int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
400
401 int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
402 ReadReqRH *rh, const char *tpfx);
403
404 void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
405 void ProcessBlockError(Block *b, ReadRequest *rreq);
406 void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
407 void FinalizeReadRequest(ReadRequest *rreq);
408
409 void ProcessBlockResponse(Block *b, int res);
410
411 // Block management
412
    // inc_ref_count / dec_ref_count are defined inline after the class body.
413 void inc_ref_count(Block* b);
414 void dec_ref_count(Block* b, int count = 1);
415 void free_block(Block*);
416
417 bool select_current_io_or_disable_prefetching(bool skip_current);
418
419 int offsetIdx(int idx) const;
420};
421
422//------------------------------------------------------------------------------
423
// Take one additional reference on block b by incrementing its m_refcnt.
// No locking is done here; per the note below the caller already holds the lock.
424inline void File::inc_ref_count(Block* b)
425{
426 // Method always called under lock.
427 b->m_refcnt++;
428}
429
430//------------------------------------------------------------------------------
431
// Drop `count` references from block b and hand the block to free_block()
// once its reference count reaches zero.
// NOTE(review): relies on assert(); <cassert> is not among the includes
// visible in this listing, so it is presumably pulled in transitively.
432inline void File::dec_ref_count(Block* b, int count)
433{
434 // Method always called under lock.
    // Only a finished block (downloaded or failed) may have references released.
435 assert(b->is_finished());
436 b->m_refcnt -= count;
    // Releasing more references than were taken is a programming error.
437 assert(b->m_refcnt >= 0);
438
439 if (b->m_refcnt == 0)
440 {
441 free_block(b);
442 }
443}
444
445}
446
447#endif
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString File
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
bool is_finished() const
bool is_ok() const
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
bool is_failed() const
long long m_offset
File * get_file() const
vCkSum_t m_cksum_vec
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
void Done(int result) override
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
int GetNBlocks() const
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
long long GetPrefetchedBytes() const
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
int inc_ref_cnt()
int GetPrefetchCountOnIO(IO *io)
const Stats & RefStats() const
void Sync()
Sync file cache info and output data with disk.
XrdSysTrace * GetTrace() const
int dec_ref_cnt()
int get_ref_cnt()
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
long long GetFileSize() const
const std::string & GetLocalPath() const
void RemoveIO(IO *io)
friend class Cache
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
bool is_in_emergency_shutdown()
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close().
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
Statistics of cache utilisation by a File object.
std::list< Block * > BlockList_t
std::vector< ChunkRequest >::iterator vChunkRequest_i
std::vector< ChunkRequest > vChunkRequest_t
std::vector< uint32_t > vCkSum_t
std::list< Block * >::iterator BlockList_i
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
Definition XrdPfcFile.hh:96
ReadRequest * m_read_req
Definition XrdPfcFile.hh:91
Access statistics.
Definition XrdPfcInfo.hh:57
XrdOucCacheIOCB * m_iocb
Definition XrdPfcFile.hh:54
unsigned short m_seq_id
Definition XrdPfcFile.hh:53
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition XrdPfcFile.hh:56
void update_error_cond(int ec)
Definition XrdPfcFile.hh:81
ReadRequest(IO *io, ReadReqRH *rh)
Definition XrdPfcFile.hh:77
bool is_complete() const
Definition XrdPfcFile.hh:83
int return_value() const
Definition XrdPfcFile.hh:84
long long m_bytes_read
Definition XrdPfcFile.hh:68