XRootD
Loading...
Searching...
No Matches
XrdClMessageUtils.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClLog.hh"
35
37
38namespace XrdCl
39{
40 //----------------------------------------------------------------------------
41 // Send a message
42 //----------------------------------------------------------------------------
44 Message *msg,
45 ResponseHandler *handler,
46 MessageSendParams &sendParams,
47 LocalFileHandler *lFileHandler )
48 {
49 //--------------------------------------------------------------------------
50 // Get the stuff needed to send the message
51 //--------------------------------------------------------------------------
52 Log *log = DefaultEnv::GetLog();
54 XRootDStatus st;
55
56 if( !postMaster )
58
59 log->Dump( XRootDMsg, "[%s] Sending message %s",
60 url.GetHostId().c_str(), msg->GetObfuscatedDescription().c_str() );
61
62 //--------------------------------------------------------------------------
63 // Get an instance of SID manager object
64 //--------------------------------------------------------------------------
65 std::shared_ptr<SIDManager> sidMgr( SIDMgrPool::Instance().GetSIDMgr( url ) );
67
68 //--------------------------------------------------------------------------
69 // Allocate the SID and marshall the message
70 //--------------------------------------------------------------------------
71 st = sidMgr->AllocateSID( req->streamid );
72 if( !st.IsOK() )
73 {
74 log->Error( XRootDMsg, "[%s] Unable to allocate stream id",
75 url.GetHostId().c_str() );
76 return st;
77 }
78
79 //--------------------------------------------------------------------------
80 // Make sure that in case of checkpoint xeq request the embedded request
81 // SID is matching
82 //--------------------------------------------------------------------------
83 if( req->requestid == kXR_chkpoint )
84 {
86 if( r->chkpoint.opcode == kXR_ckpXeq )
87 {
89 xeq->header.streamid[0] = req->streamid[0];
90 xeq->header.streamid[1] = req->streamid[1];
91 }
92 }
93
95
96 //--------------------------------------------------------------------------
97 // Create and set up the message handler
98 //--------------------------------------------------------------------------
99 XRootDMsgHandler *msgHandler;
100 msgHandler = new XRootDMsgHandler( msg, handler, &url, sidMgr, lFileHandler );
101 msgHandler->SetExpiration( sendParams.expires );
102 msgHandler->SetRedirectAsAnswer( !sendParams.followRedirects );
103 msgHandler->SetOksofarAsAnswer( sendParams.chunkedResponse );
104 msgHandler->SetChunkList( sendParams.chunkList );
105 msgHandler->SetKernelBuffer( sendParams.kbuff );
106 msgHandler->SetRedirectCounter( sendParams.redirectLimit );
107 msgHandler->SetStateful( sendParams.stateful );
108 msgHandler->SetCrc32cDigests( std::move( sendParams.crc32cDigests ) );
109
110 if( sendParams.loadBalancer.url.IsValid() )
111 msgHandler->SetLoadBalancer( sendParams.loadBalancer );
112
113 HostList *list = 0;
114 if( sendParams.hostList )
115 {
116 list = sendParams.hostList;
117 sendParams.hostList = nullptr;
118 }
119 else
120 list = new HostList();
121 list->push_back( url );
122 msgHandler->SetHostList( list );
123
124 //--------------------------------------------------------------------------
125 // Send the message
126 //--------------------------------------------------------------------------
127 st = postMaster->Send( url, msg, msgHandler, sendParams.stateful,
128 sendParams.expires );
129 if( !st.IsOK() )
130 {
132 log->Error( XRootDMsg, "[%s] Unable to send the message %s: %s",
133 url.GetHostId().c_str(), msg->GetObfuscatedDescription().c_str(),
134 st.ToString().c_str() );
135
136 // we need to reassign req as its current value might have been
137 // invalidated in the meanwhile due to a realloc
138 req = (ClientRequestHdr*)msg->GetBuffer();
139 // Release the SID as the request was never sent
140 sidMgr->ReleaseSID( req->streamid );
141 delete msgHandler;
142 return st;
143 }
144 return XRootDStatus();
145 }
146
147 //----------------------------------------------------------------------------
148 // Redirect a message
149 //----------------------------------------------------------------------------
151 Message *msg,
152 ResponseHandler *handler,
153 MessageSendParams &sendParams,
154 LocalFileHandler *lFileHandler )
155 {
156 //--------------------------------------------------------------------------
157 // Register a new virtual redirector
158 //--------------------------------------------------------------------------
160 Status st = registry.Register( url );
161 if( !st.IsOK() )
162 return st;
163
164 //--------------------------------------------------------------------------
165 // Get the stuff needed to send the message
166 //--------------------------------------------------------------------------
167 Log *log = DefaultEnv::GetLog();
169
170 if( !postMaster )
172
173 log->Dump( XRootDMsg, "[%s] Redirecting message %s",
174 url.GetHostId().c_str(), msg->GetObfuscatedDescription().c_str() );
175
177
178 //--------------------------------------------------------------------------
179 // Create and set up the message handler
180 //--------------------------------------------------------------------------
181 XRootDMsgHandler *msgHandler;
182 msgHandler = new XRootDMsgHandler( msg, handler, &url, std::shared_ptr<SIDManager>(), lFileHandler );
183 msgHandler->SetExpiration( sendParams.expires );
184 msgHandler->SetRedirectAsAnswer( !sendParams.followRedirects );
185 msgHandler->SetOksofarAsAnswer( sendParams.chunkedResponse );
186 msgHandler->SetChunkList( sendParams.chunkList );
187 msgHandler->SetRedirectCounter( sendParams.redirectLimit );
188 msgHandler->SetFollowMetalink( true );
189
190 HostInfo info( url, true );
192 sendParams.loadBalancer = info;
193 msgHandler->SetLoadBalancer( info );
194
195 HostList *list = 0;
196 list = new HostList();
197 list->push_back( info );
198 msgHandler->SetHostList( list );
199
200 //--------------------------------------------------------------------------
201 // Redirect the message
202 //--------------------------------------------------------------------------
203 st = postMaster->Redirect( url, msg, msgHandler );
204 if( !st.IsOK() )
205 {
207 log->Error( XRootDMsg, "[%s] Unable to send the message %s: %s",
208 url.GetHostId().c_str(), msg->GetObfuscatedDescription().c_str(),
209 st.ToString().c_str() );
210 delete msgHandler;
211 delete list;
212 return st;
213 }
214 return Status();
215 }
216
217 //----------------------------------------------------------------------------
218 // Process sending params
219 //----------------------------------------------------------------------------
221 {
222 //--------------------------------------------------------------------------
223 // Timeout
224 //--------------------------------------------------------------------------
225 Env *env = DefaultEnv::GetEnv();
226 if( sendParams.timeout == 0 )
227 {
228 int requestTimeout = DefaultRequestTimeout;
229 env->GetInt( "RequestTimeout", requestTimeout );
230 sendParams.timeout = requestTimeout;
231 }
232
233 if( sendParams.expires == 0 )
234 sendParams.expires = ::time(0)+sendParams.timeout;
235
236 //--------------------------------------------------------------------------
237 // Redirect limit
238 //--------------------------------------------------------------------------
239 if( sendParams.redirectLimit == 0 )
240 {
241 int redirectLimit = DefaultRedirectLimit;
242 env->GetInt( "RedirectLimit", redirectLimit );
243 sendParams.redirectLimit = redirectLimit;
244 }
245 }
246
247 //----------------------------------------------------------------------------
249 //----------------------------------------------------------------------------
251 const URL::ParamsMap &newCgi,
252 bool replace,
253 const std::string &newPath )
254 {
255 ClientRequest *req = (ClientRequest *)msg->GetBuffer();
256 switch( req->header.requestid )
257 {
258 case kXR_chmod:
259 case kXR_mkdir:
260 case kXR_mv:
261 case kXR_open:
262 case kXR_rm:
263 case kXR_rmdir:
264 case kXR_stat:
265 case kXR_truncate:
266 {
267 //----------------------------------------------------------------------
268 // Get the pointer to the appropriate path
269 //----------------------------------------------------------------------
270 char *path = msg->GetBuffer( 24 );
271 size_t length = req->header.dlen;
272 if( req->header.requestid == kXR_mv )
273 {
274 for( int i = 0; i < req->header.dlen; ++i, ++path, --length )
275 if( *path == ' ' )
276 break;
277 ++path;
278 --length;
279 }
280
281 //----------------------------------------------------------------------
282 // Create a fake URL from an existing CGI
283 //----------------------------------------------------------------------
284 char *pathWithNull = new char[length+1];
285 memcpy( pathWithNull, path, length );
286 pathWithNull[length] = 0;
287 std::ostringstream o;
288 o << "fake://fake:111/" << pathWithNull;
289 delete [] pathWithNull;
290
291 URL currentPath( o.str() );
292 URL::ParamsMap currentCgi = currentPath.GetParams();
293 MergeCGI( currentCgi, newCgi, replace );
294 currentPath.SetParams( currentCgi );
295 if( !newPath.empty() )
296 currentPath.SetPath( newPath );
297 std::string newPathWitParams = currentPath.GetPathWithFilteredParams();
298
299 //----------------------------------------------------------------------
300 // Write the path with the new cgi appended to the message
301 //----------------------------------------------------------------------
302 uint32_t newDlen = req->header.dlen - length + newPathWitParams.size();
303 msg->ReAllocate( 24+newDlen );
304 req = (ClientRequest *)msg->GetBuffer();
305 path = msg->GetBuffer( 24 );
306 if( req->header.requestid == kXR_mv )
307 {
308 for( int i = 0; i < req->header.dlen; ++i, ++path )
309 if( *path == ' ' )
310 break;
311 ++path;
312 }
313 memcpy( path, newPathWitParams.c_str(), newPathWitParams.size() );
314 req->header.dlen = newDlen;
315 break;
316 }
317 case kXR_locate:
318 {
319 Env *env = DefaultEnv::GetEnv();
320 int preserveLocateTried = DefaultPreserveLocateTried;
321 env->GetInt( "PreserveLocateTried", preserveLocateTried );
322
323 if( !preserveLocateTried ) break;
324
325 //----------------------------------------------------------------------
326 // In case of locate we only want to preserve tried/triedrc CGI info
327 //----------------------------------------------------------------------
328 URL::ParamsMap triedCgi;
329 URL::ParamsMap::const_iterator itr = newCgi.find( "triedrc" );
330 if( itr != newCgi.end() )
331 triedCgi[itr->first] = itr->second;
332 itr = newCgi.find( "tried" );
333 if( itr != newCgi.end() )
334 triedCgi[itr->first] = itr->second;
335
336 //----------------------------------------------------------------------
337 // Is there anything to do?
338 //----------------------------------------------------------------------
339 if( triedCgi.empty() ) break;
340
341 //----------------------------------------------------------------------
342 // Get the pointer to the appropriate path
343 //----------------------------------------------------------------------
344 char *path = msg->GetBuffer( 24 );
345 size_t length = req->header.dlen;
346
347 //----------------------------------------------------------------------
348 // Create a fake URL from an existing CGI
349 //----------------------------------------------------------------------
350 std::string strpath( path, length );
351 std::ostringstream o;
352 o << "fake://fake:111/" << strpath;
353
354 URL currentPath( o.str() );
355 URL::ParamsMap currentCgi = currentPath.GetParams();
356 MergeCGI( currentCgi, triedCgi, replace );
357 currentPath.SetParams( currentCgi );
358 std::string pathWitParams = currentPath.GetPathWithFilteredParams();
359
360 //----------------------------------------------------------------------
361 // Write the path with the new cgi appended to the message
362 //----------------------------------------------------------------------
363 uint32_t newDlen = pathWitParams.size();
364 msg->ReAllocate( 24+newDlen );
365 req = (ClientRequest *)msg->GetBuffer();
366 path = msg->GetBuffer( 24 );
367 memcpy( path, pathWitParams.c_str(), pathWitParams.size() );
368 req->header.dlen = newDlen;
369 break;
370 }
371 }
373 }
374
375 //------------------------------------------------------------------------
377 //------------------------------------------------------------------------
379 const URL::ParamsMap &cgi2,
380 bool replace )
381 {
382 URL::ParamsMap::const_iterator it;
383 for( it = cgi2.begin(); it != cgi2.end(); ++it )
384 {
385 if( replace || cgi1.find( it->first ) == cgi1.end() )
386 cgi1[it->first] = it->second;
387 else
388 {
389 std::string &v = cgi1[it->first];
390 if( v.empty() )
391 v = it->second;
392 else
393 {
394 v += ',';
395 v += it->second;
396 }
397 }
398 }
399 }
400
401 //------------------------------------------------------------------------
403 //------------------------------------------------------------------------
404 Status MessageUtils::CreateXAttrVec( const std::vector<xattr_t> &attrs,
405 std::vector<char> &avec )
406 {
407 if( attrs.empty() )
408 return Status();
409
410 if( attrs.size() > xfaLimits::kXR_faMaxVars )
411 return Status( stError, errInvalidArgs );
412
413 //----------------------------------------------------------------------
414 // Calculate the name and value vector lengths
415 //----------------------------------------------------------------------
416
417 // 2 bytes for rc + 1 byte for null character at the end
418 static const int name_overhead = 3;
419 // 4 bytes for value length
420 static const int value_overhead = 4;
421
422 size_t nlen = 0, vlen = 0;
423 for( auto itr = attrs.begin(); itr != attrs.end(); ++itr )
424 {
425 nlen += std::get<xattr_name>( *itr ).size() + name_overhead;
426 vlen += std::get<xattr_value>( *itr ).size() + value_overhead;
427 }
428
429 if( nlen > xfaLimits::kXR_faMaxNlen )
430 return Status( stError, errInvalidArgs );
431
432 if( vlen > xfaLimits::kXR_faMaxVlen )
433 return Status( stError, errInvalidArgs );
434
435 //----------------------------------------------------------------------
436 // Create name and value vectors
437 //----------------------------------------------------------------------
438 avec.resize( nlen + vlen, 0 );
439 char *nvec = avec.data(), *vvec = avec.data() + nlen;
440
441 for( auto itr = attrs.begin(); itr != attrs.end(); ++itr )
442 {
443 const std::string &name = std::get<xattr_name>( *itr );
444 nvec = ClientFattrRequest::NVecInsert( name.c_str(), nvec );
445 const std::string &value = std::get<xattr_value>( *itr );
446 vvec = ClientFattrRequest::VVecInsert( value.c_str(), vvec );
447 }
448
449 return Status();
450 }
451
452 //------------------------------------------------------------------------
453 // Create xattr name vector
454 //------------------------------------------------------------------------
455 Status MessageUtils::CreateXAttrVec( const std::vector<std::string> &attrs,
456 std::vector<char> &nvec )
457 {
458 if( attrs.empty() )
459 return Status();
460
461 if( attrs.size() > xfaLimits::kXR_faMaxVars )
462 return Status( stError, errInvalidArgs );
463
464 //----------------------------------------------------------------------
465 // Calculate the name and value vector lengths
466 //----------------------------------------------------------------------
467
468 // 2 bytes for rc + 1 byte for null character at the end
469 static const int name_overhead = 3;
470
471 size_t nlen = 0;
472 for( auto itr = attrs.begin(); itr != attrs.end(); ++itr )
473 nlen += itr->size() + name_overhead;
474
475 if( nlen > xfaLimits::kXR_faMaxNlen )
476 return Status( stError, errInvalidArgs );
477
478 //----------------------------------------------------------------------
479 // Create name vector
480 //----------------------------------------------------------------------
481 nvec.resize( nlen, 0 );
482 char *nptr = nvec.data();
483
484 for( auto itr = attrs.begin(); itr != attrs.end(); ++itr )
485 nptr = ClientFattrRequest::NVecInsert( itr->c_str(), nptr );
486
487 return Status();
488 }
489}
#define kXR_isManager
#define kXR_attrMeta
kXR_char streamid[2]
Definition XProtocol.hh:156
static const int kXR_ckpXeq
Definition XProtocol.hh:216
struct ClientRequestHdr header
Definition XProtocol.hh:846
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_open
Definition XProtocol.hh:122
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_locate
Definition XProtocol.hh:139
#define kXR_attrVirtRdr
@ kXR_faMaxVars
Definition XProtocol.hh:280
@ kXR_faMaxVlen
Definition XProtocol.hh:282
@ kXR_faMaxNlen
Definition XProtocol.hh:281
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static Status CreateXAttrVec(const std::vector< xattr_t > &attrs, std::vector< char > &avec)
Create xattr vector.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
A hub for dispatching and receiving messages.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
XRootDStatus Register(const URL &url)
Creates a new virtual redirector and registers it (async).
Handle an async response.
static SIDMgrPool & Instance()
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:402
std::string GetPathWithFilteredParams() const
Get the path with params, filters out 'xrdcl.'.
Definition XrdClURL.cc:331
void SetPath(const std::string &path)
Set the path.
Definition XrdClURL.hh:225
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
void SetFollowMetalink(bool followMetalink)
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
void SetOksofarAsAnswer(bool oksofarAsAnswer)
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
void SetRedirectAsAnswer(bool redirectAsAnswer)
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
const int DefaultPreserveLocateTried
const int DefaultRedirectLimit
const uint16_t errUninitialized
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
static char * VVecInsert(const char *value, char *buffer)
Definition XProtocol.cc:188
static char * NVecInsert(const char *name, char *buffer)
Definition XProtocol.cc:172
URL url
URL of the host.
uint32_t flags
Host type.
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Procedure execution status.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.