XRootD
Loading...
Searching...
No Matches
XrdClXCpSrc.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClXCpSrc.hh"
26#include "XrdCl/XrdClXCpCtx.hh"
27#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClUtils.hh"
31
32#include <cmath>
33#include <cstdlib>
34
35namespace XrdCl
36{
37
39{
40 public:
41
42 ChunkHandler( XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd ) :
43 pSrc( src->Self() ), pOffset( offset ), pSize( size ), pBuffer( buffer ), pHandle( handle ), pUsePgRead( usepgrd )
44 {
45
46 }
47
48 virtual ~ChunkHandler()
49 {
50 pSrc->Delete();
51 }
52
53 virtual void HandleResponse( XRootDStatus *status, AnyObject *response )
54 {
55 PageInfo *chunk = 0;
56 if( response ) // get the response
57 {
58 ToPgInfo( response, chunk );
59 delete response;
60 }
61
62 if( !chunk && status->IsOK() ) // if the response is not there make sure the status is error
63 {
64 *status = XRootDStatus( stError, errInternal );
65 }
66
67 if( status->IsOK() && chunk->GetLength() != pSize ) // the file size on the server is different
68 { // than the one specified in metalink file
69 *status = XRootDStatus( stError, errDataError );
70 }
71
72 if( !status->IsOK() )
73 {
74 delete[] pBuffer;
75 delete chunk;
76 chunk = 0;
77 }
78
79 pSrc->ReportResponse( status, chunk, pHandle );
80
81 delete this;
82 }
83
84 private:
85
86 void ToPgInfo( AnyObject *response, PageInfo *&chunk )
87 {
88 if( pUsePgRead )
89 {
90 response->Get( chunk );
91 response->Set( ( int* )0 );
92 }
93 else
94 {
95 ChunkInfo *rsp = nullptr;
96 response->Get( rsp );
97 chunk = new PageInfo( rsp->offset, rsp->length, rsp->buffer );
98 }
99 }
100
XCpSrc *pSrc;      // source that issued the request (obtained via Self(), released in the destructor)
uint64_t pOffset;  // file offset of the request
uint64_t pSize;    // requested length; a reply of a different length is reported as errDataError
char *pBuffer;     // buffer the data is read into; freed in HandleResponse() on failure
File *pHandle;     // File the request was issued on, passed back to XCpSrc::ReportResponse()
bool pUsePgRead;   // true: issued with PgRead (reply holds a PageInfo); false: Read (reply holds a ChunkInfo)
107};
108
109
110XCpSrc::XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx ) :
111 pChunkSize( chunkSize ), pParallel( parallel ), pFileSize( fileSize ), pThread(),
112 pCtx( ctx->Self() ), pFile( 0 ), pCurrentOffset( 0 ), pBlkEnd( 0 ), pDataTransfered( 0 ), pRefCount( 1 ),
113 pRunning( false ), pStartTime( 0 ), pTransferTime( 0 ), pUsePgRead( false )
114{
115}
116
117XCpSrc::~XCpSrc()
118{
119 pCtx->RemoveSrc( this );
120 // we release ctx, it is always Delete() by its creator
121 // not by us.
122 pCtx->Release();
123}
124
126{
127 pRunning = true;
128 int rc = pthread_create( &pThread, 0, Run, this );
129 if( rc ) pRunning = false;
130 return rc;
131}
132
133void* XCpSrc::Run( void* arg )
134{
135 XCpSrc *me = static_cast<XCpSrc*>( arg );
136 me->StartDownloading();
137 me->Delete();
138 return 0;
139}
140
//------------------------------------------------------------------------------
// Download loop of a source (runs in the thread started for it).
//
// Opens a replica (Initialize()) and then repeatedly issues reads
// (ReadChunks()). It asks for more work (GetWork()) when its block runs out,
// and processes the statuses reported by the read callbacks. On a read error
// it switches to another replica (Recover()).
//
// Every return path leaves pRunning == false. When the source gives up, a
// null chunk is put into the context so the consumer does not get stuck.
//------------------------------------------------------------------------------
void XCpSrc::StartDownloading()
{
  XRootDStatus st = Initialize();
  if( !st.IsOK() )
  {
    pRunning = false;
    // notify those who wait for the file
    // size, they won't get it from this
    // source
    pCtx->NotifyInitExpectant();
    // put a null chunk so we are sure
    // the main thread doesn't get stuck
    // at the sync queue
    pCtx->PutChunk( 0 );
    return;
  }

  // start counting transfer time
  pStartTime = time( 0 );

  while( pRunning )
  {
    // issue as many reads as the parallelism limit allows
    st = ReadChunks();
    if( st.IsOK() && st.code == suPartial )
    {
      // we have only ongoing transfers
      // so we can already ask for new block
      if( GetWork().IsOK() ) continue;
    }
    else if( st.IsOK() && st.code == suDone )
    {
      // if we are done, try to get more work,
      // if successful continue
      if( GetWork().IsOK() ) continue;
      // keep track of the time before we go idle
      pTransferTime += time( 0 ) - pStartTime;
      // check if the overall download process is
      // done, this makes the thread wait until
      // either the download is done, or a source
      // went to error, or a 60s timeout has been
      // reached (the timeout is there so we can
      // check if a source degraded in the meanwhile
      // and now we can steal from it)
      if( !pCtx->AllDone() )
      {
        // reset start time after pause
        pStartTime = time( 0 );
        continue;
      }
      // stop counting
      // otherwise we are done here
      pRunning = false;
      return;
    }

    // Take the next reported status. If ReadChunks() failed to dispatch a
    // request, it has already reported that error through ReportResponse(),
    // so it shows up here as well.
    XRootDStatus *status = pReports.Get();
    if( !status->IsOK() )
    {
      Log *log = DefaultEnv::GetLog();
      std::string myHost = URL( pUrl ).GetHostName();
      log->Error( UtilityMsg, "Failed to read chunk from %s: %s", myHost.c_str(), status->GetErrorMessage().c_str() );

      // try to continue with another replica
      if( !Recover().IsOK() )
      {
        delete status;
        pRunning = false;
        // notify idle sources, they might be
        // interested in taking over my workload
        pCtx->NotifyIdleSrc();
        // put a null chunk so we are sure
        // the main thread doesn't get stuck
        // at the sync queue
        pCtx->PutChunk( 0 );
        // if we have data we need to wait for someone to take over
        // unless the extreme copy is over, in this case we don't care
        // NOTE(review): the loop body is empty; it relies on AllDone()
        // waiting internally (see the 60s timeout comment above) rather
        // than spinning - confirm
        while( HasData() && !pCtx->AllDone() );

        return;
      }
    }
    delete status;
  }
}
224
225XRootDStatus XCpSrc::Initialize()
226{
227 Log *log = DefaultEnv::GetLog();
228 XRootDStatus st;
229
230 do
231 {
232 if( !pCtx->GetNextUrl( pUrl ) )
233 {
234 log->Error( UtilityMsg, "Failed to initialize XCp source, no more replicas to try" );
235 return XRootDStatus( stError );
236 }
237
238 log->Debug( UtilityMsg, "Opening %s for reading", pUrl.c_str() );
239
240 std::string value;
241 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
242
243 pFile = new File();
244 pFile->SetProperty( "ReadRecovery", value );
245
246 st = pFile->Open( pUrl, OpenFlags::Read );
247 if( !st.IsOK() )
248 {
249 log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
250 DeletePtr( pFile );
251 continue;
252 }
253
254 URL url( pUrl );
255 if( ( !url.IsLocalFile() && !pFile->IsSecure() ) ||
256 ( url.IsLocalFile() && url.IsMetalink() ) )
257 {
258 std::string datasrv;
259 pFile->GetProperty( "DataServer", datasrv );
260 //--------------------------------------------------------------------
261 // Decide whether we can use PgRead
262 //--------------------------------------------------------------------
264 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
265 pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
266 }
267
268 if( pFileSize < 0 )
269 {
270 StatInfo *statInfo = 0;
271 st = pFile->Stat( false, statInfo );
272 if( !st.IsOK() )
273 {
274 log->Warning( UtilityMsg, "Failed to stat %s: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
275 DeletePtr( pFile );
276 continue;
277 }
278 pFileSize = statInfo->GetSize();
279 pCtx->SetFileSize( pFileSize );
280 delete statInfo;
281 }
282 }
283 while( !st.IsOK() );
284
285 std::pair<uint64_t, uint64_t> p = pCtx->GetBlock();
286 pCurrentOffset = p.first;
287 pBlkEnd = p.second + p.first;
288
289 return st;
290}
291
292XRootDStatus XCpSrc::Recover()
293{
294 Log *log = DefaultEnv::GetLog();
295 XRootDStatus st;
296
297 do
298 {
299 if( !pCtx->GetNextUrl( pUrl ) )
300 {
301 log->Error( UtilityMsg, "Failed to initialize XCp source, no more replicas to try" );
302 return XRootDStatus( stError );
303 }
304
305 log->Debug( UtilityMsg, "Opening %s for reading", pUrl.c_str() );
306
307 std::string value;
308 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
309
310 pFile = new File();
311 pFile->SetProperty( "ReadRecovery", value );
312
313 st = pFile->Open( pUrl, OpenFlags::Read );
314 if( !st.IsOK() )
315 {
316 DeletePtr( pFile );
317 log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
318 }
319
320 URL url( pUrl );
321 if( ( !url.IsLocalFile() && pFile->IsSecure() ) ||
322 ( url.IsLocalFile() && url.IsMetalink() ) )
323 {
324 std::string datasrv;
325 pFile->GetProperty( "DataServer", datasrv );
326 //--------------------------------------------------------------------
327 // Decide whether we can use PgRead
328 //--------------------------------------------------------------------
330 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
331 pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
332 }
333 }
334 while( !st.IsOK() );
335
336 pRecovered.insert( pOngoing.begin(), pOngoing.end() );
337 pOngoing.clear();
338
339 // since we have a brand new source, we need
340 // to restart transfer rate statistics
341 pTransferTime = 0;
342 pStartTime = time( 0 );
343 pDataTransfered = 0;
344
345 return st;
346}
347
348XRootDStatus XCpSrc::ReadChunks()
349{
350 XrdSysMutexHelper lck( pMtx );
351
352 while( pOngoing.size() < pParallel && !pRecovered.empty() )
353 {
354 std::pair<uint64_t, uint64_t> p;
355 std::map<uint64_t, uint64_t>::iterator itr = pRecovered.begin();
356 p = *itr;
357 pOngoing.insert( p );
358 pRecovered.erase( itr );
359
360 char *buffer = new char[p.second];
361 ChunkHandler *handler = new ChunkHandler( this, p.first, p.second, buffer, pFile, pUsePgRead );
362 XRootDStatus st = pUsePgRead
363 ? pFile->PgRead( p.first, p.second, buffer, handler )
364 : pFile->Read( p.first, p.second, buffer, handler );
365 if( !st.IsOK() )
366 {
367 delete[] buffer;
368 delete handler;
369 ReportResponse( new XRootDStatus( st ), 0, pFile );
370 return st;
371 }
372 }
373
374 while( pOngoing.size() < pParallel && pCurrentOffset < pBlkEnd )
375 {
376 uint64_t chunkSize = pChunkSize;
377 if( pCurrentOffset + chunkSize > pBlkEnd )
378 chunkSize = pBlkEnd - pCurrentOffset;
379 pOngoing[pCurrentOffset] = chunkSize;
380 char *buffer = new char[chunkSize];
381 ChunkHandler *handler = new ChunkHandler( this, pCurrentOffset, chunkSize, buffer, pFile, pUsePgRead );
382 XRootDStatus st = pUsePgRead
383 ? pFile->PgRead( pCurrentOffset, chunkSize, buffer, handler )
384 : pFile->Read( pCurrentOffset, chunkSize, buffer, handler );
385 pCurrentOffset += chunkSize;
386 if( !st.IsOK() )
387 {
388 delete[] buffer;
389 delete handler;
390 ReportResponse( new XRootDStatus( st ), 0, pFile );
391 return st;
392 }
393 }
394
395 if( pOngoing.empty() ) return XRootDStatus( stOK, suDone );
396
397 if( pRecovered.empty() && pCurrentOffset >= pBlkEnd ) return XRootDStatus( stOK, suPartial );
398
399 return XRootDStatus( stOK, suContinue );
400}
401
//------------------------------------------------------------------------------
// Called for every finished read request: from ChunkHandler::HandleResponse(),
// or directly from ReadChunks() when a request could not be dispatched.
//
// @param status : status of the request. Ownership is taken: it is either
//                 queued in pReports for the download thread or deleted here.
// @param chunk  : the data read (0 on error). It is handed to the context,
//                 or freed with DeleteChunk() if the chunk is no longer ours.
// @param handle : the File the request was issued on
//------------------------------------------------------------------------------
void XCpSrc::ReportResponse( XRootDStatus *status, PageInfo *chunk, File *handle )
{
  XrdSysMutexHelper lck( pMtx );
  bool ignore = false;

  if( status->IsOK() )
  {
    // if the status is OK remove it from
    // the list of ongoing transfers, if it
    // was not on the list we ignore the
    // response (this could happen due to
    // source change or stealing)
    ignore = !pOngoing.erase( chunk->GetOffset() );
  }
  else if( FilesEqual( pFile, handle ) )
  {
    // if the status is NOT OK, and pFile
    // match the handle it means that we see
    // an error for the first time, map the
    // broken file to the number of outstanding
    // asynchronous operations and reset the pointer
    // NOTE(review): pOngoing.size() equals the number of requests still in
    // flight on this handle only if no other source has stolen ongoing
    // chunks (Steal() erases them from pOngoing while their replies are
    // still pending) - confirm that the count cannot drop to zero early
    pFailed[pFile] = pOngoing.size();
    pFile = 0;
  }
  else
    // a further error from an already broken file: the download thread
    // has been notified once already, drop this status
    DeletePtr( status );

  if( !FilesEqual( pFile, handle ) )
  {
    // if the pFile does not match the handle,
    // it means that this response came from
    // a broken source, decrement the count of
    // outstanding async operations for this src,
    --pFailed[handle];
    if( pFailed[handle] == 0 )
    {
      // if this was the last outstanding operation
      // close the file and delete it
      pFailed.erase( handle );
      XRootDStatus st = handle->Close();
      delete handle;
    }
  }

  lck.UnLock();

  // wake up the download thread (status is null if it was dropped above)
  if( status ) pReports.Put( status );

  if( ignore )
  {
    DeleteChunk( chunk );
    return;
  }

  if( chunk )
  {
    pDataTransfered += chunk->GetLength();
    pCtx->PutChunk( chunk );
  }
}
462
//------------------------------------------------------------------------------
// Take over (part of) the work of another source.
//
// If `src` is no longer running, everything it still has is taken. Otherwise
// a share proportional to our transfer rate is taken. The first non-empty
// item is used, in this order:
//   1. the unread rest of its block,
//   2. its recovered chunks,
//   3. its ongoing chunks, only if we are much faster (fraction > 0.7).
// Ongoing chunks taken from `src` are re-read by us. Their late replies to
// `src` are no longer in its pOngoing and are therefore ignored by its
// ReportResponse().
//
// @param src : the source to steal from; may be null (then nothing happens)
//------------------------------------------------------------------------------
void XCpSrc::Steal( XCpSrc *src )
{
  if( !src ) return;

  // use the address of the mutex to form an
  // order for acquiring the locks.
  // (the mutex with the higher address is always locked first, so two
  // sources stealing from each other cannot deadlock)
  XrdSysMutexHelper lck1, lck2;
  if ( std::less{}(&pMtx, &src->pMtx) )
  {
    lck2.Lock( &src->pMtx );
    lck1.Lock( &pMtx );
  }
  else
  {
    lck1.Lock( &pMtx );
    lck2.Lock( &src->pMtx );
  }

  Log *log = DefaultEnv::GetLog();
  std::string myHost = URL( pUrl ).GetHostName(), srcHost = URL( src->pUrl ).GetHostName();

  if( !src->pRunning )
  {
    // the source we are stealing from is in error state, we can have everything

    pRecovered.insert( src->pOngoing.begin(), src->pOngoing.end() );
    pRecovered.insert( src->pRecovered.begin(), src->pRecovered.end() );
    pCurrentOffset = src->pCurrentOffset;
    pBlkEnd = src->pBlkEnd;

    src->pOngoing.clear();
    src->pRecovered.clear();
    src->pCurrentOffset = 0;
    src->pBlkEnd = 0;

    // a broken source might be waiting for
    // someone to take over his data, so we
    // need to notify
    pCtx->NotifyIdleSrc();

    log->Debug( UtilityMsg, "%s: Stealing everything from %s", myHost.c_str(), srcHost.c_str() );

    return;
  }

  // the source we are stealing from is just slower, only take part of its work
  // so we want a fraction of its work we want for ourself
  // (fraction = our share of the combined transfer rate, in [0, 1])
  uint64_t myTransferRate = TransferRate(), srcTransferRate = src->TransferRate();
  if( myTransferRate == 0 ) return;
  double fraction = double( myTransferRate ) / double( myTransferRate + srcTransferRate );

  if( src->pCurrentOffset < src->pBlkEnd )
  {
    // the source still has a block of data
    uint64_t blkSize = src->pBlkEnd - src->pCurrentOffset;
    uint64_t steal = static_cast<uint64_t>( round( fraction * blkSize ) );
    // if after stealing there will be less than one chunk
    // take everything
    if( blkSize - steal <= pChunkSize )
      steal = blkSize;

    // we take the tail of the block, the source keeps the head
    pCurrentOffset = src->pBlkEnd - steal;
    pBlkEnd = src->pBlkEnd;
    src->pBlkEnd -= steal;

    log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of block from %s", myHost.c_str(), fraction, srcHost.c_str() );

    return;
  }

  if( !src->pRecovered.empty() )
  {
    size_t count = static_cast<size_t>( round( fraction * src->pRecovered.size() ) );
    while( count-- )
    {
      std::map<uint64_t, uint64_t>::iterator itr = src->pRecovered.begin();
      pRecovered.insert( *itr );
      src->pRecovered.erase( itr );
    }

    log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of recovered chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );

    return;
  }

  // * a fraction < 0.5 means that we are actually slower (so it does
  //   not make sense to steal ongoing's from someone who's faster)
  // * a fraction ~ 0.5 means that we have more or less the same transfer
  //   rate (similarly, it doesn't make sense to steal)
  // * we need to be really faster than the source (though, this is an
  //   arbitrary choice) to actually steal something
  if( !src->pOngoing.empty() && fraction > 0.7 )
  {
    size_t count = static_cast<size_t>( round( fraction * src->pOngoing.size() ) );
    while( count-- )
    {
      std::map<uint64_t, uint64_t>::iterator itr = src->pOngoing.begin();
      pRecovered.insert( *itr );
      src->pOngoing.erase( itr );
    }

    log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of ongoing chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
  }
}
567
568XRootDStatus XCpSrc::GetWork()
569{
570 std::pair<uint64_t, uint64_t> p = pCtx->GetBlock();
571
572 if( p.second > 0 )
573 {
574 XrdSysMutexHelper lck( pMtx );
575 pCurrentOffset = p.first;
576 pBlkEnd = p.first + p.second;
577
578 Log *log = DefaultEnv::GetLog();
579 std::string myHost = URL( pUrl ).GetHostName();
580 log->Debug( UtilityMsg, "%s got next block", myHost.c_str() );
581
582 return XRootDStatus();
583 }
584
585 // WeakestLink() increases ref count on wLink, so decrease after
586 XCpSrc *wLink = pCtx->WeakestLink( this );
587 Steal( wLink );
588 if( wLink ) wLink->Delete();
589
590 // if we managed to steal something declare success
591 if( pCurrentOffset < pBlkEnd || !pRecovered.empty() ) return XRootDStatus();
592 // otherwise return an error
593 return XRootDStatus( stError, errInvalidOp );
594}
595
597{
598 time_t duration = pTransferTime + time( 0 ) - pStartTime;
599 return pDataTransfered / ( duration + 1 ); // add one to avoid floating point exception
600}
601
602} /* namespace XrdCl */
XrdOucString File
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
ChunkHandler(XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd)
virtual ~ChunkHandler()
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A file.
Definition XrdClFile.hh:46
Handle an async response.
static bool HasPgRW(const XrdCl::URL &url)
void NotifyInitExpectant()
void RemoveSrc(XCpSrc *src)
void PutChunk(PageInfo *chunk)
friend class ChunkHandler
static void DeleteChunk(PageInfo *&chunk)
XCpSrc * Self()
XCpSrc(uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx)
uint64_t TransferRate()
void Lock(XrdSysMutex *Mutex)
const uint16_t suPartial
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const int DefaultCpUsePgWrtRd
const uint64_t UtilityMsg
const uint16_t suDone
const uint16_t suContinue
Response NullRef< Response >::value
XrdSysError Log
Definition XrdConfig.cc:113
@ Read
Open only for reading.
uint32_t GetLength() const
Get the data length.
bool IsOK() const
We're fine.