XRootD
Loading...
Searching...
No Matches
XrdCephBuffer::XrdCephBufferAlgSimple Class Reference

#include <XrdCephBufferAlgSimple.hh>

+ Inheritance diagram for XrdCephBuffer::XrdCephBufferAlgSimple:
+ Collaboration diagram for XrdCephBuffer::XrdCephBufferAlgSimple:

Public Member Functions

 XrdCephBufferAlgSimple (std::unique_ptr< IXrdCephBufferData > buffer, std::unique_ptr< ICephIOAdapter > cephio, int fd, bool useStriperlessReads=true)
 
virtual ~XrdCephBufferAlgSimple ()
 
virtual IXrdCephBufferData * buffer ()
 
virtual const IXrdCephBufferData * buffer () const
 
virtual ssize_t flushWriteCache () override
 remember to flush the cache on final writes
 
virtual ssize_t read (volatile void *buff, off_t offset, size_t blen) override
 read data through the buffer
 
virtual ssize_t read_aio (XrdSfsAio *aoip) override
 possible aio based code
 
virtual ssize_t write (const void *buff, off_t offset, size_t blen) override
 write data through the buffer
 
virtual ssize_t write_aio (XrdSfsAio *aoip) override
 possible aio based code
 
- Public Member Functions inherited from XrdCephBuffer::IXrdCephBufferAlg
virtual ~IXrdCephBufferAlg ()
 

Protected Member Functions

virtual ssize_t rawRead (void *buff, off_t offset, size_t blen)
 
virtual ssize_t rawWrite (void *buff, off_t offset, size_t blen)
 

Detailed Description

Non-async buffering code for non-aio read operations. Create a single buffer of a given size. For reads, if the data is in the buffer, read and return the available bytes; if there is no useful data in the buffer, fill the full buffer and return the requested read. If the data is only partially in the buffer for the range requested, return only that subset; the client should check and make an additional call for the data not returned. If 0 bytes are returned, it should be assumed that the read is at the end of the file.

Definition at line 27 of file XrdCephBufferAlgSimple.hh.

Constructor & Destructor Documentation

◆ XrdCephBufferAlgSimple()

XrdCephBufferAlgSimple::XrdCephBufferAlgSimple ( std::unique_ptr< IXrdCephBufferData > buffer,
std::unique_ptr< ICephIOAdapter > cephio,
int  fd,
bool  useStriperlessReads = true 
)

Definition at line 20 of file XrdCephBufferAlgSimple.cc.

22 :
23m_bufferdata(std::move(buffer)), m_cephio(std::move(cephio)), m_fd(fd),
24m_useStriperlessReads(useStriperlessReads) {
25
26}
virtual const IXrdCephBufferData * buffer() const

◆ ~XrdCephBufferAlgSimple()

XrdCephBufferAlgSimple::~XrdCephBufferAlgSimple ( )
virtual

Definition at line 28 of file XrdCephBufferAlgSimple.cc.

28 {
29 int prec = std::cout.precision();
30 float bytesBuffered = m_stats_bytes_fromceph - m_stats_bytes_bypassed;
31 float cacheUseFraction = bytesBuffered > 0 ? (1.*(m_stats_bytes_toclient-m_stats_bytes_bypassed)/bytesBuffered) : 1. ;
32
33 BUFLOG("XrdCephBufferAlgSimple::Destructor, fd=" << m_fd
34 << ", retrieved_bytes=" << m_stats_bytes_fromceph
35 << ", bypassed_bytes=" << m_stats_bytes_bypassed
36 << ", delivered_bytes=" << m_stats_bytes_toclient
37 << std::setprecision(4)
38 << ", cache_hit_frac=" << cacheUseFraction << std::setprecision(prec));
39 m_fd = -1;
40}
#define BUFLOG(x)

References BUFLOG.

Member Function Documentation

◆ buffer() [1/2]

virtual IXrdCephBufferData * XrdCephBuffer::XrdCephBufferAlgSimple::buffer ( )
inlinevirtual

Definition at line 43 of file XrdCephBufferAlgSimple.hh.

43{return m_bufferdata.get();}

◆ buffer() [2/2]

virtual const IXrdCephBufferData * XrdCephBuffer::XrdCephBufferAlgSimple::buffer ( ) const
inlinevirtual

Definition at line 42 of file XrdCephBufferAlgSimple.hh.

42{return m_bufferdata.get();}

◆ flushWriteCache()

ssize_t XrdCephBufferAlgSimple::flushWriteCache ( )
overridevirtual

remember to flush the cache on final writes

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 354 of file XrdCephBufferAlgSimple.cc.

354 {
355 // Set a lock for any attempt at a simultaneous operation
356 // Use recursive, as write (and read) also calls the lock and don't want to deadlock
357 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex); //
358 // BUFLOG("flushWriteCache: " << m_bufferStartingOffset << " " << m_bufferLength);
359 ssize_t rc(-1);
360 if (m_bufferLength == 0) {
361 BUFLOG("Empty buffer to flush: ");
362 rc = 0; // not an issue
363 }
364
365 if (m_bufferLength > 0) {
366 rc = m_cephio->write(m_bufferStartingOffset, m_bufferLength);
367 if (rc < 0) {
368 BUFLOG("WriteBuffer write step failed: " << rc);
369 }
370 } // some bytes to write
371
372 // reset values
373 m_bufferLength=0;
374 m_bufferStartingOffset=0;
375 m_bufferdata->invalidate();
376 // return bytes written, or errorcode if failure
377 return rc;
378}

References BUFLOG.

Referenced by write().

+ Here is the caller graph for this function:

◆ rawRead()

ssize_t XrdCephBufferAlgSimple::rawRead ( void *  buff,
off_t  offset,
size_t  blen 
)
protectedvirtual

Definition at line 381 of file XrdCephBufferAlgSimple.cc.

381 {
382 return -ENOSYS;
383}

◆ rawWrite()

ssize_t XrdCephBufferAlgSimple::rawWrite ( void *  buff,
off_t  offset,
size_t  blen 
)
protectedvirtual

Definition at line 385 of file XrdCephBufferAlgSimple.cc.

385 {
386 return -ENOSYS;
387}

◆ read()

ssize_t XrdCephBufferAlgSimple::read ( volatile void *  buff,
off_t  offset,
size_t  blen 
)
overridevirtual

read data through the buffer

If the requested read is as large as, or larger than, the buffer capacity, bypass the cache and read straight from storage. The cache is invalidated in that case anyway.

In principle, only a single pass through the loop should ever be needed; however, when a read request crosses the boundary of the buffer, two reads are needed: the first to read out the remainder of the current buffer, and the second to read the rest of the data from the refilled buffer.

If we need to load data in the cache, do it here.

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 90 of file XrdCephBufferAlgSimple.cc.

90 {
91 // Set a lock for any attempt at a simultaneous operation
92 // Use recursive, as flushCache also calls the lock and don't want to deadlock
93 // No call to flushCache should happen in a read, but be consistent
94 // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << offset << " " << blen);
95 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex); //
96 // BUFLOG("XrdCephBufferAlgSimple::read: postLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << offset << " " << blen);
97
98 // BUFLOG("XrdCephBufferAlgSimple::read status:"
99 // << "\n\tRead off/len/end: " << offset << "/" << blen << "/(" << (offset+blen) <<")"
100 // << "\n\tBuffer: start/length/end/cap: " << m_bufferStartingOffset << "/" << m_bufferLength << "/"
101 // << (m_bufferStartingOffset + m_bufferLength) << "/" << m_bufferdata->capacity()
102 // );
103 if (blen == 0) return 0;
104
109 if (blen >= m_bufferdata->capacity()) {
110 //BUFLOG("XrdCephBufferAlgSimple::read: Readthrough cache: fd: " << m_fd
111 // << " " << offset << " " << blen);
112 // larger than cache, so read through, and invalidate the cache anyway
113 m_bufferdata->invalidate();
114 m_bufferLength =0; // ensure cached data is set to zero length
115 // #FIXME JW: const_cast is probably a bit poor.
116
117 ssize_t rc = ceph_posix_maybestriper_pread (m_fd, const_cast<void*>(buf), blen, offset, m_useStriperlessReads);
118 if (rc > 0) {
119 m_stats_bytes_fromceph += rc;
120 m_stats_bytes_toclient += rc;
121 m_stats_bytes_bypassed += rc;
122 }
123 return rc;
124 }
125
126 ssize_t rc(-1);
127 size_t bytesRemaining = blen; // track how many bytes still need to be read
128 off_t offsetDelta = 0;
129 size_t bytesRead = 0;
135 while (bytesRemaining > 0) {
136 // BUFLOG("In loop: " << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining << " " << m_bufferLength);
137
138 bool loadCache = false;
139 // run some checks to see if we need to fill the cache.
140 if (m_bufferLength == 0) {
141 // no data in buffer
142 loadCache = true;
143 } else if (offset < m_bufferStartingOffset) {
144 // offset before any cache data
145 loadCache = true;
146 } else if (offset >= (off_t) (m_bufferStartingOffset + m_bufferLength) ) {
147 // offset is beyond the stored data
148 loadCache = true;
149 } else if ((offset - m_bufferStartingOffset + offsetDelta) >= (off_t)m_bufferLength) {
150 // we have now read to the end of the buffers data
151 loadCache = true;
152 }
153
158 if (loadCache) {
159 // BUFLOG("XrdCephBufferAlgSimple::read: preLock: " << std::hash<std::thread::id>{}(std::this_thread::get_id()) << " " << "Filling the cache");
160 m_bufferdata->invalidate();
161 m_bufferLength =0; // set lengh of data stored to 0
162 rc = m_cephio->read(offset + offsetDelta, m_bufferdata->capacity()); // fill the cache
163 // BUFLOG("LoadCache ReadToCache: " << rc << " " << offset + offsetDelta << " " << m_bufferdata->capacity() );
164 if (rc < 0) {
165 BUFLOG("LoadCache Error: " << rc);
166 return rc;// TODO return correct errors
167 }
168 m_stats_bytes_fromceph += rc;
169 m_bufferStartingOffset = offset + offsetDelta;
170 m_bufferLength = rc;
171 if (rc == 0) {
172 // We should be at the end of file, with nothing more to read, and nothing that could be returned
173 // break out of the loop.
174 break;
175 }
176 }
177
178
179 //now read as much data as possible
180 off_t bufPosition = offset + offsetDelta - m_bufferStartingOffset;
181 rc = m_bufferdata->readBuffer( (void*) &(((char*)buf)[offsetDelta]) , bufPosition , bytesRemaining);
182 // BUFLOG("Fill result: " << offsetDelta << " " << bufPosition << " " << bytesRemaining << " " << rc)
183 if (rc < 0 ) {
184 BUFLOG("Reading from Cache Failed: " << rc << " " << offset << " "
185 << offsetDelta << " " << m_bufferStartingOffset << " "
186 << bufPosition << " "
187 << bytesRemaining );
188 return rc; // TODO return correct errors
189 }
190 if (rc == 0) {
191 // no bytes returned; much be at end of file
192 //BUFLOG("No bytes returned: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining);
193 break; // leave the loop even though bytesremaing is probably >=0.
194 //i.e. requested a full buffers worth, but only a fraction of the file is here.
195 }
196 m_stats_bytes_toclient += rc;
197 // BUFLOG("End of loop: " << rc << " " << offset << " + " << offsetDelta << "; " << blen << " : " << bytesRemaining);
198 offsetDelta += rc;
199 bytesRemaining -= rc;
200 bytesRead += rc;
201
202 } // while bytesremaing
203
204 return bytesRead;
205}
ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper)

References BUFLOG, and ceph_posix_maybestriper_pread().

+ Here is the call graph for this function:

◆ read_aio()

ssize_t XrdCephBufferAlgSimple::read_aio ( XrdSfsAio * aoip)
overridevirtual

possible aio based code

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 43 of file XrdCephBufferAlgSimple.cc.

43 {
44 // Currently this is not supported, and callers using this should recieve the appropriate error code
45 //return -ENOSYS;
46
47 ssize_t rc(-ENOSYS);
48 if (!aoip) {
49 return -EINVAL;
50 }
51
52 volatile void * buf = aoip->sfsAio.aio_buf;
53 size_t blen = aoip->sfsAio.aio_nbytes;
54 off_t offset = aoip->sfsAio.aio_offset;
55
56 // translate the aio read into a simple sync read.
57 // hopefully don't get too many out of sequence reads to effect the caching
58 rc = read(buf, offset, blen);
59
60 aoip->Result = rc;
61 aoip->doneRead();
62
63 return rc;
64
65}
#define read(a, b, c)
Definition XrdPosix.hh:82
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
ssize_t Result
Definition XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62

References aiocb::aio_buf, aiocb::aio_nbytes, aiocb::aio_offset, XrdSfsAio::doneRead(), read, XrdSfsAio::Result, and XrdSfsAio::sfsAio.

+ Here is the call graph for this function:

◆ write()

ssize_t XrdCephBufferAlgSimple::write ( const void *  buff,
off_t  offset,
size_t  blen 
)
overridevirtual

write data through the buffer

We expect the next write to be in order and well defined. Determine the expected offset and compare it against the offset provided. The expected offset is the end of the buffer. m_bufferStartingOffset is the offset in ceph that buffer[0] represents.

We should be equally careful if the offset of the buffer start is not aligned sensibly. Log this only for now, but #TODO: this should become an error condition, for over-cautious behaviour.

Provide some sanity checking for the write to the buffer. We raise an error on these conditions, as there is no immediate solution that is satisfactory.

Track how many bytes are left to write.

Typically we would expect only one pass through the loop, i.e. the write request is smaller than the buffer. If it is bigger, or the request straddles the end of the buffer, another pass will be needed.

If the cache is already full, let's flush to disk now

Check again if we can write data into the storage

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 207 of file XrdCephBufferAlgSimple.cc.

207 {
208 // Set a lock for any attempt at a simultaneous operation
209 // Use recursive, as flushCache also calls the lock and don't want to deadlock
210 const std::lock_guard<std::recursive_mutex> lock(m_data_mutex);
211
212 // take the data in buf and put it into the cache; when the cache is full, write to underlying storage
213 // remember to flush the cache at the end of operations ...
214 ssize_t rc(-1);
215 // ssize_t bytesWrittenToStorage(0);
216
217 if (blen == 0) {
218 return 0; // nothing to write; are we done?
219 }
220
227 off_t expected_offset = (off_t)(m_bufferStartingOffset + m_bufferLength);
228
229 if ((offset != expected_offset) && (m_bufferLength > 0) ) {
230 // for the moment we just log that there is some non expected offset value
231 // TODO, might be dangerous to flush the cache on non-aligned writes ...
232 BUFLOG("Non expected offset: " << rc << " " << offset << " " << expected_offset);
233 // rc = flushWriteCache();
234 // if (rc < 0) {
235 // return rc; // TODO return correct errors
236 // }
237 } // mismatched offset
238
241 if ( (m_bufferStartingOffset % m_bufferdata->capacity()) != 0 ) {
242 BUFLOG(" Non aligned offset?" << m_bufferStartingOffset << " "
243 << m_bufferdata->capacity() << " " << m_bufferStartingOffset % m_bufferdata->capacity() );
244 }
245
246 // Commmented out below. It would be good to pass writes, which are larger than the buffer size,
247 // straight-through. However if the ranges are not well aligned, this could be an issue.
248 // And, what then to do about a possible partial filled buffer?
249
250 // if (blen >= m_bufferdata->capacity()) {
251 // // TODO, might be dangerous to flush the cache on non-aligned writes ...
252 // // flush the cache now, if needed
253 // rc = flushWriteCache();
254 // if (rc < 0) {
255 // return rc; // TODO return correct errors
256 // }
257 // bytesWrittenToStorage += rc;
258
259 // // Size is larger than the buffer; send the write straight through
260 // std::clog << "XrdCephBufferAlgSimple::write: Readthrough cache: fd: " << m_fd
261 // << " " << offset << " " << blen << std::endl;
262 // // larger than cache, so read through, and invalidate the cache anyway
263 // m_bufferdata->invalidate();
264 // m_bufferLength=0;
265 // m_bufferStartingOffset=0;
266 // rc = ceph_posix_pwrite(m_fd, buf, blen, offset);
267 // if (rc < 0) {
268 // return rc; // TODO return correct errors
269 // }
270 // bytesWrittenToStorage += rc;
271 // return rc;
272 // }
273
278 if ((offset != expected_offset) && (m_bufferLength > 0) ) {
279 BUFLOG("Error trying to write out of order: expected at: " << expected_offset
280 << " got offset" << offset << " of len " << blen);
281 return -EINVAL;
282 }
283 if (offset < 0) {
284 BUFLOG("Got a negative offset: " << offset);
285 return -EINVAL;
286 }
287
288
289 size_t bytesRemaining = blen;
290 size_t bytesWritten = 0;
291
295 while (bytesRemaining > 0) {
299 if (m_bufferLength == m_bufferdata->capacity()) {
300 rc = flushWriteCache();
301 if (rc < 0) {
302 return rc;
303 }
304 // bytesWrittenToStorage += rc;
305 } // at capacity;
306
307 if (m_bufferLength == 0) {
308 // cache is currently empty, so set the 'reference' to the external offset now
309 m_bufferStartingOffset = offset + bytesWritten;
310 }
311 //add data to the cache from buf, from buf[offsetDelta] to the cache at position m_bufferLength
312 // make sure to write only as many bytes as left in the cache.
313 size_t nBytesToWrite = std::min(bytesRemaining, m_bufferdata->capacity()-m_bufferLength);
314 const void* bufAtOffset = (void*)((char*)buf + bytesWritten); // nasty cast as void* doesn't do arithmetic
315 if (nBytesToWrite == 0) {
316 BUFLOG( "Wanting to write 0 bytes; why is that?");
317 }
318 rc = m_bufferdata->writeBuffer(bufAtOffset, m_bufferLength, nBytesToWrite, 0);
319 if (rc < 0) {
320 BUFLOG( "WriteBuffer step failed: " << rc << " " << m_bufferLength << " " << blen << " " << offset );
321 return rc; // pass the error condidition upwards
322 }
323 if (rc != (ssize_t)nBytesToWrite) {
324 BUFLOG( "WriteBuffer returned unexpected number of bytes: " << rc << " Expected: " << nBytesToWrite << " "
325 << m_bufferLength << " " << blen << " " << offset );
326 return -EBADE; // is bad exchange error best errno here?
327 }
328
329 // lots of repetition here; #TODO try to reduce
330 m_bufferLength += rc;
331 bytesWritten += rc;
332 bytesRemaining -= rc;
333
334 } // while byteRemaining
335
339 if (m_bufferLength == m_bufferdata->capacity()){
340 rc = flushWriteCache();
341 if (rc < 0)
342 {
343 return rc; // TODO return correct errors
344 }
345 // bytesWrittenToStorage += rc;
346 } // at capacity;
347
348 //BUFLOG( "WriteBuffer " << bytesWritten << " " << bytesWrittenToStorage << " " << offset << " " << blen << " " );
349 return bytesWritten;
350}
virtual ssize_t flushWriteCache() override
remember to flush the cache on final writes

References BUFLOG, and flushWriteCache().

+ Here is the call graph for this function:

◆ write_aio()

ssize_t XrdCephBufferAlgSimple::write_aio ( XrdSfsAio * aoip)
overridevirtual

possible aio based code

Implements XrdCephBuffer::IXrdCephBufferAlg.

Definition at line 67 of file XrdCephBufferAlgSimple.cc.

67 {
68 // Currently this is not supported, and callers using this should recieve the appropriate error code
69 // return -ENOSYS;
70
71 ssize_t rc(-ENOSYS);
72 if (!aoip) {
73 return -EINVAL;
74 }
75
76 // volatile void * buf = aoip->sfsAio.aio_buf;
77 // size_t blen = aoip->sfsAio.aio_nbytes;
78 // off_t offset = aoip->sfsAio.aio_offset;
79 size_t blen = aoip->sfsAio.aio_nbytes;
80 off_t offset = aoip->sfsAio.aio_offset;
81
82 rc = write(const_cast<const void*>(aoip->sfsAio.aio_buf), offset, blen);
83 aoip->Result = rc;
84 aoip->doneWrite();
85 return rc;
86
87}
#define write(a, b, c)
Definition XrdPosix.hh:115
virtual void doneWrite()=0

References aiocb::aio_buf, aiocb::aio_nbytes, aiocb::aio_offset, XrdSfsAio::doneWrite(), XrdSfsAio::Result, XrdSfsAio::sfsAio, and write.

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: