XRootD
Loading...
Searching...
No Matches
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor.
 
 ~Stream ()
 Destructor.
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty.
 
void Disconnect (bool force=false)
 Disconnect the stream.
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection.
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error.
 
const std::string & GetName () const
 Return stream name.
 
const URLGetURL () const
 Get the URL.
 
XRootDStatus Initialize ()
 Initializer.
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandlerInstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a message has been reconstructed.
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error.
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error.
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed.
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout.
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout.
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream.
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler.
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler.
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending.
 
void SetChannelData (AnyObject *channelData)
 Set the channel data.
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue.
 
void SetJobManager (JobManager *jobManager)
 Set job manager.
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams.
 
void SetPoller (Poller *poller)
 Set the poller.
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager.
 
void SetTransport (TransportHandler *transport)
 Set the transport.
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58 {
59 Disconnected = 0,
60 Connected = 1,
61 Connecting = 2,
62 Error = 3
63 };
@ Disconnected
Not connected.
@ Error
Broken.
@ Connected
Connected.
@ Connecting
In the process of being connected.

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL url,
const URL prefer = URL() 
)

Constructor.

Definition at line 96 of file XrdClStream.cc.

96 :
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pBytesSent( 0 ),
111 pBytesReceived( 0 )
112 {
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115
116 std::ostringstream o;
117 o << pUrl->GetHostId();
118 pStreamName = o.str();
119
120 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126
127 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129
130 pAddressType = Utils::String2AddressType( netStack );
131 if( pAddressType == Utils::AddressType::IPAuto )
132 {
134 if( !( stacks & XrdNetUtils::hasIP64 ) )
135 {
136 if( stacks & XrdNetUtils::hasIPv4 )
137 pAddressType = Utils::AddressType::IPv4;
138 else if( stacks & XrdNetUtils::hasIPv6 )
139 pAddressType = Utils::AddressType::IPv6;
140 }
141 }
142
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148 }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdCl::Utils::IPAuto, XrdCl::Utils::IPv4, XrdCl::Utils::IPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, XrdNetUtils::qryINIF, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

154 {
155 Disconnect( true );
156
157 Log *log = DefaultEnv::GetLog();
158 log->Debug( PostMasterMsg, "[%s] Destroying stream",
159 pStreamName.c_str() );
160
161 MonitorDisconnection( XRootDStatus() );
162
163 SubStreamList::iterator it;
164 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165 delete *it;
166 }
void Disconnect(bool force=false)
Disconnect the stream.

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL url)
Returns
: true is this channel can be collapsed using this URL, false otherwise

Definition at line 1197 of file XrdClStream.cc.

1198 {
1199 Log *log = DefaultEnv::GetLog();
1200
1201 //--------------------------------------------------------------------------
1202 // Resolve all the addresses of the host we're supposed to connect to
1203 //--------------------------------------------------------------------------
1204 std::vector<XrdNetAddr> prefaddrs;
1205 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1206 if( !st.IsOK() )
1207 {
1208 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1209 , pStreamName.c_str(), url.GetHostName().c_str() );
1210 return false;
1211 }
1212
1213 //--------------------------------------------------------------------------
1214 // Resolve all the addresses of the alias
1215 //--------------------------------------------------------------------------
1216 std::vector<XrdNetAddr> aliasaddrs;
1217 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1218 if( !st.IsOK() )
1219 {
1220 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1221 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1222 return false;
1223 }
1224
1225 //--------------------------------------------------------------------------
1226 // Now check if the preferred host is part of the alias
1227 //--------------------------------------------------------------------------
1228 auto itr = prefaddrs.begin();
1229 for( ; itr != prefaddrs.end() ; ++itr )
1230 {
1231 auto itr2 = aliasaddrs.begin();
1232 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1233 if( itr->Same( &*itr2 ) ) return true;
1234 }
1235
1236 return false;
1237 }
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t  subStream)

Disables respective uplink if empty.

Definition at line 585 of file XrdClStream.cc.

586 {
587 XrdSysMutexHelper scopedLock( pMutex );
588 Log *log = DefaultEnv::GetLog();
589
590 if( pSubStreams[subStream]->outQueue->IsEmpty() )
591 {
592 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
593 pSubStreams[subStream]->socket->GetStreamName().c_str() );
594 pSubStreams[subStream]->socket->DisableUplink();
595 }
596 }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool  force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

364 {
365 XrdSysMutexHelper scopedLock( pMutex );
366 SubStreamList::iterator it;
367 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368 {
369 (*it)->socket->Close();
370 (*it)->status = Socket::Disconnected;
371 }
372 }
@ Disconnected
The socket is disconnected.

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().

+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID path)

Connect if needed, otherwise make sure that the underlying socket handler gets write readiness events, it will update the path with what it has actually enabled

Definition at line 187 of file XrdClStream.cc.

188 {
189 XrdSysMutexHelper scopedLock( pMutex );
190
191 //--------------------------------------------------------------------------
192 // We are in the process of connecting the main stream, so we do nothing
193 // because when the main stream connection is established it will connect
194 // all the other streams
195 //--------------------------------------------------------------------------
196 if( pSubStreams[0]->status == Socket::Connecting )
197 return XRootDStatus();
198
199 //--------------------------------------------------------------------------
200 // The main stream is connected, so we can verify whether we have
201 // the up and the down stream connected and ready to handle data.
202 // If anything is not right we fall back to stream 0.
203 //--------------------------------------------------------------------------
204 if( pSubStreams[0]->status == Socket::Connected )
205 {
206 if( pSubStreams[path.down]->status != Socket::Connected )
207 path.down = 0;
208
209 if( pSubStreams[path.up]->status == Socket::Disconnected )
210 {
211 path.up = 0;
212 return pSubStreams[0]->socket->EnableUplink();
213 }
214
215 if( pSubStreams[path.up]->status == Socket::Connected )
216 return pSubStreams[path.up]->socket->EnableUplink();
217
218 return XRootDStatus();
219 }
220
221 //--------------------------------------------------------------------------
222 // The main stream is not connected, we need to check whether enough time
223 // has passed since we last encountered an error (if any) so that we could
224 // re-attempt the connection
225 //--------------------------------------------------------------------------
226 Log *log = DefaultEnv::GetLog();
227 time_t now = ::time(0);
228
229 if( now-pLastStreamError < pStreamErrorWindow )
230 return pLastFatalError;
231
232 gettimeofday( &pConnectionStarted, 0 );
233 ++pConnectionCount;
234
235 //--------------------------------------------------------------------------
236 // Resolve all the addresses of the host we're supposed to connect to
237 //--------------------------------------------------------------------------
238 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239 if( !st.IsOK() )
240 {
241 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242 "the host", pStreamName.c_str() );
243 pLastStreamError = now;
244 st.status = stFatal;
245 pLastFatalError = st;
246 return st;
247 }
248
249 if( pPrefer.IsValid() )
250 {
251 std::vector<XrdNetAddr> addrresses;
252 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253 if( !st.IsOK() )
254 {
255 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257 }
258 else
259 {
260 std::vector<XrdNetAddr> tmp;
261 tmp.reserve( pAddresses.size() );
262 // first add all remaining addresses
263 auto itr = pAddresses.begin();
264 for( ; itr != pAddresses.end() ; ++itr )
265 {
266 if( !HasNetAddr( *itr, addrresses ) )
267 tmp.push_back( *itr );
268 }
269 // then copy all 'preferred' addresses
270 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271 // and keep the result
272 pAddresses.swap( tmp );
273 }
274 }
275
277 pAddresses );
278
279 while( !pAddresses.empty() )
280 {
281 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282 pAddresses.pop_back();
283 pConnectionInitTime = ::time( 0 );
284 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285 if( st.IsOK() )
286 {
287 pSubStreams[0]->status = Socket::Connecting;
288 break;
289 }
290 }
291 return st;
292 }
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
const uint16_t stFatal
Fatal error, it's still an error.

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

348 {
349 XrdSysMutexHelper scopedLock( pMutex );
350 if( pSubStreams[0]->status == Socket::Connecting )
351 {
352 pSubStreams[0]->status = Socket::Disconnected;
353 XrdCl::PathID path( 0, 0 );
354 XrdCl::XRootDStatus st = EnableLink( path );
355 if( !st.IsOK() )
356 OnConnectError( 0, st );
357 }
358 }
XRootDStatus EnableLink(PathID &path)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool IsOK() const
We're fine.

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus  status,
bool  hush = false 
)

Force error.

Definition at line 933 of file XrdClStream.cc.

934 {
935 XrdSysMutexHelper scopedLock( pMutex );
936 Log *log = DefaultEnv::GetLog();
937 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
938 {
939 if( pSubStreams[substream]->status != Socket::Connected ) continue;
940 pSubStreams[substream]->socket->Close();
941 pSubStreams[substream]->status = Socket::Disconnected;
942
943 if( !hush )
944 log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
945 pStreamName.c_str(), status.ToString().c_str() );
946
947 //--------------------------------------------------------------------
948 // Reinsert the stuff that we have failed to sent
949 //--------------------------------------------------------------------
950 if( pSubStreams[substream]->outMsgHelper.msg )
951 {
952 OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
953 pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
954 h.stateful );
955 pIncomingQueue->RemoveMessageHandler(h.handler);
956 pSubStreams[substream]->outMsgHelper.Reset();
957 }
958
959 //--------------------------------------------------------------------
960 // Reinsert the receiving handler and reset any partially read partial
961 //--------------------------------------------------------------------
962 if( pSubStreams[substream]->inMsgHelper.handler )
963 {
964 InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
965 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
966 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
967 if( xrdHandler ) xrdHandler->PartialReceived();
968 h.Reset();
969 }
970 }
971
972 pConnectionCount = 0;
973
974 //------------------------------------------------------------------------
975 // We're done here, unlock the stream mutex to avoid deadlocks and
976 // report the disconnection event to the handlers
977 //------------------------------------------------------------------------
978 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
979 "message handlers.", pStreamName.c_str() );
980
981 SubStreamList::iterator it;
982 OutQueue q;
983 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
984 q.GrabItems( *(*it)->outQueue );
985 scopedLock.UnLock();
986
987 q.Report( status );
988
989 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
990 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
991 }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetName()

const std::string & XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171 {
172 return pStreamName;
173 }

◆ GetURL()

const URL * XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158 {
159 return pUrl;
160 }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

172 {
173 if( !pTransport || !pPoller || !pChannelData )
174 return XRootDStatus( stError, errUninitialized );
175
176 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177 pChannelData, 0, this );
178 pSubStreams.push_back( new SubStreamData() );
179 pSubStreams[0]->socket = s;
180 return XRootDStatus();
181 }
const uint16_t errUninitialized
const uint16_t stError
An error occurred that could potentially be retried.

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t  stream,
MsgHandler *&  incHandler 
)

In case the message is a kXR_status response it needs further attention

Returns
: a MsgHandler in case we need to read out raw data

Definition at line 1166 of file XrdClStream.cc.

1168 {
1169 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1170 if( !mh.handler )
1172
1173 uint16_t action = mh.handler->InspectStatusRsp();
1174 mh.action |= action;
1175
1176 if( action & MsgHandler::RemoveHandler )
1177 pIncomingQueue->RemoveMessageHandler( mh.handler );
1178
1179 if( action & MsgHandler::Raw )
1180 {
1181 incHandler = mh.handler;
1182 return MsgHandler::Raw;
1183 }
1184
1185 if( action & MsgHandler::Corrupted )
1186 return MsgHandler::Corrupted;
1187
1188 if( action & MsgHandler::More )
1189 return MsgHandler::More;
1190
1191 return MsgHandler::None;
1192 }
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > &  msg,
uint16_t  stream 
)

Install a message handler for the given message if there is one available, if the handler want's to be called in the raw mode it will be returned, the message ownership flag is returned in any case

Parameters
msgmessage header
streamstream concerned
Returns
a pair containing the handler and ownership flag

Definition at line 1145 of file XrdClStream.cc.

1146 {
1147 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1148 if( !mh.handler )
1149 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1150 mh.expires,
1151 mh.action );
1152
1153 if( !mh.handler )
1154 return nullptr;
1155
1156 if( mh.action & MsgHandler::Raw )
1157 return mh.handler;
1158 return nullptr;
1159 }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t  subStream)

Call back when a message has been reconstructed.

Definition at line 623 of file XrdClStream.cc.

624 {
625 XrdSysMutexHelper scopedLock( pMutex );
626 pSubStreams[subStream]->status = Socket::Connected;
627
628 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
629 Log *log = DefaultEnv::GetLog();
630 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
631 subStream, ipstack.c_str() );
632
633 if( subStream == 0 )
634 {
635 pLastStreamError = 0;
636 pLastFatalError = XRootDStatus();
637 pConnectionCount = 0;
638 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639 pSessionId = ++sSessCntGen;
640
641 //------------------------------------------------------------------------
642 // Create the streams if they don't exist yet
643 //------------------------------------------------------------------------
644 if( pSubStreams.size() == 1 && numSub > 1 )
645 {
646 for( uint16_t i = 1; i < numSub; ++i )
647 {
648 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
649 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
650 pChannelData, i, this );
651 pSubStreams.push_back( new SubStreamData() );
652 pSubStreams[i]->socket = s;
653 }
654 }
655
656 //------------------------------------------------------------------------
657 // Connect the extra streams, if we fail we move all the outgoing items
658 // to stream 0, we don't need to enable the uplink here, because it
659 // should be already enabled after the handshaking process is completed.
660 //------------------------------------------------------------------------
661 if( pSubStreams.size() > 1 )
662 {
663 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
664 pStreamName.c_str(), pSubStreams.size() - 1 );
665 for( size_t i = 1; i < pSubStreams.size(); ++i )
666 {
667 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
669 if( !st.IsOK() )
670 {
671 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672 pSubStreams[i]->socket->Close();
673 }
674 else
675 {
676 pSubStreams[i]->status = Socket::Connecting;
677 }
678 }
679 }
680
681 //------------------------------------------------------------------------
682 // Inform monitoring
683 //------------------------------------------------------------------------
684 pBytesSent = 0;
685 pBytesReceived = 0;
686 gettimeofday( &pConnectionDone, 0 );
687 Monitor *mon = DefaultEnv::GetMonitor();
688 if( mon )
689 {
690 Monitor::ConnectInfo i;
691 i.server = pUrl->GetHostId();
692 i.sTOD = pConnectionStarted;
693 i.eTOD = pConnectionDone;
694 i.streams = pSubStreams.size();
695
696 AnyObject qryResult;
697 std::string *qryResponse = nullptr;
698 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
699 qryResult.Get( qryResponse );
700
701 if (qryResponse) {
702 i.auth = *qryResponse;
703 delete qryResponse;
704 } else {
705 i.auth = "";
706 }
707
708 mon->Event( Monitor::EvConnect, &i );
709 }
710
711 //------------------------------------------------------------------------
712 // For every connected control-stream call the global on-connect handler
713 //------------------------------------------------------------------------
715 }
716 else if( pOnDataConnJob )
717 {
718 //------------------------------------------------------------------------
719 // For every connected data-stream call the on-connect handler
720 //------------------------------------------------------------------------
721 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
722 }
723 }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t  subStream,
XRootDStatus  status 
)

On connect error.

Definition at line 728 of file XrdClStream.cc.

729 {
730 XrdSysMutexHelper scopedLock( pMutex );
731 Log *log = DefaultEnv::GetLog();
732 pSubStreams[subStream]->socket->Close();
733 time_t now = ::time(0);
734
735 //--------------------------------------------------------------------------
736 // For every connection error call the global connection error handler
737 //--------------------------------------------------------------------------
739
740 //--------------------------------------------------------------------------
741 // If we connected subStream == 0 and cannot connect >0 then we just give
742 // up and move the outgoing messages to another queue
743 //--------------------------------------------------------------------------
744 if( subStream > 0 )
745 {
746 pSubStreams[subStream]->status = Socket::Disconnected;
747 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
748 if( pSubStreams[0]->status == Socket::Connected )
749 {
750 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
751 if( !st.IsOK() )
752 OnFatalError( 0, st, scopedLock );
753 return;
754 }
755
756 if( pSubStreams[0]->status == Socket::Connecting )
757 return;
758
759 OnFatalError( subStream, status, scopedLock );
760 return;
761 }
762
763 //--------------------------------------------------------------------------
764 // Check if we still have time to try and do something in the current window
765 //--------------------------------------------------------------------------
766 time_t elapsed = now-pConnectionInitTime;
767 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
768 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
769
770 //------------------------------------------------------------------------
771 // If we have some IP addresses left we try them
772 //------------------------------------------------------------------------
773 if( !pAddresses.empty() )
774 {
775 XRootDStatus st;
776 do
777 {
778 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
779 pAddresses.pop_back();
780 pConnectionInitTime = ::time( 0 );
781 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
782 }
783 while( !pAddresses.empty() && !st.IsOK() );
784
785 if( !st.IsOK() )
786 OnFatalError( subStream, st, scopedLock );
787
788 return;
789 }
790 //------------------------------------------------------------------------
791 // If we still can retry with the same host name, we sleep until the end
792 // of the connection window and try
793 //------------------------------------------------------------------------
794 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
795 && !status.IsFatal() )
796 {
797 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
798 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
799
800 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
801 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
802 return;
803 }
804 //--------------------------------------------------------------------------
805 // We are out of the connection window, the only thing we can do here
806 // is re-resolving the host name and retrying if we still can
807 //--------------------------------------------------------------------------
808 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
809 {
810 pAddresses.clear();
811 pSubStreams[0]->status = Socket::Disconnected;
812 PathID path( 0, 0 );
813 XRootDStatus st = EnableLink( path );
814 if( !st.IsOK() )
815 OnFatalError( subStream, st, scopedLock );
816 return;
817 }
818
819 //--------------------------------------------------------------------------
820 // Else, we fail
821 //--------------------------------------------------------------------------
822 OnFatalError( subStream, status, scopedLock );
823 }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t  subStream,
XRootDStatus  status 
)

On error.

Definition at line 828 of file XrdClStream.cc.

829 {
830 XrdSysMutexHelper scopedLock( pMutex );
831 Log *log = DefaultEnv::GetLog();
832 pSubStreams[subStream]->socket->Close();
833 pSubStreams[subStream]->status = Socket::Disconnected;
834
835 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
836 pStreamName.c_str(), subStream, status.ToString().c_str() );
837
838 //--------------------------------------------------------------------------
839 // Reinsert the stuff that we have failed to sent
840 //--------------------------------------------------------------------------
841 if( pSubStreams[subStream]->outMsgHelper.msg )
842 {
843 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
844 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
845 h.stateful );
846 pIncomingQueue->RemoveMessageHandler(h.handler);
847 pSubStreams[subStream]->outMsgHelper.Reset();
848 }
849
850 //--------------------------------------------------------------------------
851 // Reinsert the receiving handler and reset any partially read partial
852 //--------------------------------------------------------------------------
853 if( pSubStreams[subStream]->inMsgHelper.handler )
854 {
855 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
856 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
857 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
858 if( xrdHandler ) xrdHandler->PartialReceived();
859 h.Reset();
860 }
861
862 //--------------------------------------------------------------------------
863 // We are dealing with an error of a peripheral stream. If we don't have
864 // anything to send don't bother recovering. Otherwise move the requests
865 // to stream 0 if possible.
866 //--------------------------------------------------------------------------
867 if( subStream > 0 )
868 {
869 if( pSubStreams[subStream]->outQueue->IsEmpty() )
870 return;
871
872 if( pSubStreams[0]->status != Socket::Disconnected )
873 {
874 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
875 if( pSubStreams[0]->status == Socket::Connected )
876 {
877 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
878 if( !st.IsOK() )
879 OnFatalError( 0, st, scopedLock );
880 return;
881 }
882 }
883 OnFatalError( subStream, status, scopedLock );
884 return;
885 }
886
887 //--------------------------------------------------------------------------
888 // If we lost the stream 0 we have lost the session, we re-enable the
889 // stream if we still have things in one of the outgoing queues, otherwise
890 // there is not point to recover at this point.
891 //--------------------------------------------------------------------------
892 if( subStream == 0 )
893 {
894 MonitorDisconnection( status );
895
896 SubStreamList::iterator it;
897 size_t outstanding = 0;
898 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
899 outstanding += (*it)->outQueue->GetSizeStateless();
900
901 if( outstanding )
902 {
903 PathID path( 0, 0 );
904 XRootDStatus st = EnableLink( path );
905 if( !st.IsOK() )
906 {
907 OnFatalError( 0, st, scopedLock );
908 return;
909 }
910 }
911
912 //------------------------------------------------------------------------
913 // We're done here, unlock the stream mutex to avoid deadlocks and
914 // report the disconnection event to the handlers
915 //------------------------------------------------------------------------
916 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
917 "message handlers.", pStreamName.c_str() );
918 OutQueue q;
919 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
920 q.GrabStateful( *(*it)->outQueue );
921 scopedLock.UnLock();
922
923 q.Report( status );
924 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
925 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
926 return;
927 }
928 }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t  subStream,
std::shared_ptr< Message msg,
uint32_t  bytesReceived 
)

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474 {
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
477
478 MsgHandler *handler = nullptr;
479 uint16_t action = 0;
480 {
481 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482 handler = mh.handler;
483 action = mh.action;
484 mh.Reset();
485 }
486
487 if( !IsPartial( *msg ) )
488 {
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490 *pChannelData );
491 if( streamAction & TransportHandler::DigestMsg )
492 return;
493
494 if( streamAction & TransportHandler::RequestClose )
495 {
496 RequestClose( *msg );
497 return;
498 }
499 }
500
501 Log *log = DefaultEnv::GetLog();
502
503 //--------------------------------------------------------------------------
504 // No handler, we discard the message ...
505 //--------------------------------------------------------------------------
506 if( !handler )
507 {
508 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
512 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513 return;
514 }
515
516 //--------------------------------------------------------------------------
517 // We have a handler, so we call the callback
518 //--------------------------------------------------------------------------
519 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520 pStreamName.c_str(), (void*)msg.get() );
521
523 {
524 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526
527 // if we are handling partial response we have to take down the timeout fence
528 if( IsPartial( *msg ) )
529 {
530 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531 if( xrdHandler ) xrdHandler->PartialReceived();
532 }
533
534 return;
535 }
536
537 Job *job = new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
539 }
kXR_char streamid[2]
Definition XProtocol.hh:914
ServerResponseHeader hdr
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t  subStream,
Message msg,
uint32_t  bytesSent 
)

Definition at line 601 of file XrdClStream.cc.

604 {
605 pTransport->MessageSent( msg, subStream, bytesSent,
606 *pChannelData );
607 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
608 pBytesSent += bytesSent;
609 if( h.handler )
610 {
611 // ensure expiration time is assigned if still in queue
612 pIncomingQueue->AssignTimeout( h.handler );
613 // OnStatusReady may cause the handler to delete itself, in
614 // which case the handler or the user callback may also delete msg
615 h.handler->OnStatusReady( msg, XRootDStatus() );
616 }
617 pSubStreams[subStream]->outMsgHelper.Reset();
618 }
void AssignTimeout(MsgHandler *handler)
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AssignTimeout(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), and XrdCl::MsgHandler::OnStatusReady().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t  subStream)

On read timeout.

Definition at line 1050 of file XrdClStream.cc.

1051 {
1052 //--------------------------------------------------------------------------
1053 // We only take the main stream into account
1054 //--------------------------------------------------------------------------
1055 if( substream != 0 )
1056 return true;
1057
1058 //--------------------------------------------------------------------------
1059 // Check if there is no outgoing messages and if the stream TTL is elapesed.
1060 // It is assumed that the underlying transport makes sure that there is no
1061 // pending requests that are not answered, ie. all possible virtual streams
1062 // are de-allocated
1063 //--------------------------------------------------------------------------
1064 Log *log = DefaultEnv::GetLog();
1065 SubStreamList::iterator it;
1066 time_t now = time(0);
1067
1068 XrdSysMutexHelper scopedLock( pMutex );
1069 uint32_t outgoingMessages = 0;
1070 time_t lastActivity = 0;
1071 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1072 {
1073 outgoingMessages += (*it)->outQueue->GetSize();
1074 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1075 if( lastActivity < sockLastActivity )
1076 lastActivity = sockLastActivity;
1077 }
1078
1079 if( !outgoingMessages )
1080 {
1081 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1082 *pChannelData );
1083 if( disconnect )
1084 {
1085 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1086 pStreamName.c_str() );
1087 scopedLock.UnLock();
1088 //----------------------------------------------------------------------
1089 // Important note!
1090 //
1091 // This destroys the Stream object itself, the underlined
1092 // AsyncSocketHandler object (that called this method) and the Channel
1093 // object that aggregates this Stream.
1094 //
1095 // Additionally &(*pUrl) is used by ForceDisconnect to check if we are
1096 // in a Channel that was previously collapsed in a redirect.
1097 //----------------------------------------------------------------------
1099 return false;
1100 }
1101 }
1102
1103 //--------------------------------------------------------------------------
1104 // Check if the stream is broken
1105 //--------------------------------------------------------------------------
1106 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1107 *pChannelData );
1108 if( !st.IsOK() )
1109 {
1110 scopedLock.UnLock();
1111 OnError( substream, st );
1112 return false;
1113 }
1114 return true;
1115 }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t  subStream)

Definition at line 545 of file XrdClStream.cc.

546 {
547 XrdSysMutexHelper scopedLock( pMutex );
548 Log *log = DefaultEnv::GetLog();
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
550 {
551 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
553
554 pSubStreams[subStream]->socket->DisableUplink();
555 return std::make_pair( (Message *)0, (MsgHandler *)0 );
556 }
557
558 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560 h.expires,
561 h.stateful );
562
563 log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
564 "from out-queue to in-queue, starting to send outgoing.",
565 pUrl->GetHostId().c_str(), (void*)h.handler,
566 h.msg->GetObfuscatedDescription().c_str() );
567
568 scopedLock.UnLock();
569
570 if( h.handler )
571 {
572 bool rmMsg = false;
573 pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
574 if( rmMsg )
575 {
576 Log *log = DefaultEnv::GetLog();
577 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
578 pStreamName.c_str() );
579 }
580 h.handler->OnReadyToSend( h.msg );
581 }
582 return std::make_pair( h.msg, h.handler );
583 }
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)

References XrdCl::InQueue::AddMessageHandler(), XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdSysMutexHelper::UnLock(), and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t  subStream)

On write timeout.

Definition at line 1120 of file XrdClStream.cc.

1121 {
1122 return true;
1123 }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t  query,
AnyObject result 
)

Query the stream.

Definition at line 1242 of file XrdClStream.cc.

1243 {
1244 switch( query )
1245 {
1247 {
1248 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1249 return Status();
1250 }
1251
1253 {
1254 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1255 return Status();
1256 }
1257
1259 {
1260 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1261 return Status();
1262 }
1263
1264 default:
1265 return Status( stError, errQueryNotSupported );
1266 }
1267 }
const uint16_t errQueryNotSupported
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler handler)

Register channel event handler.

Definition at line 1128 of file XrdClStream.cc.

1129 {
1130 pChannelEvHandlers.AddHandler( handler );
1131 }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler handler)

Remove a channel event handler.

Definition at line 1136 of file XrdClStream.cc.

1137 {
1138 pChannelEvHandlers.RemoveHandler( handler );
1139 }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message msg,
MsgHandler handler,
bool  stateful,
time_t  expires 
)

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301 {
302 XrdSysMutexHelper scopedLock( pMutex );
303 Log *log = DefaultEnv::GetLog();
304
305 //--------------------------------------------------------------------------
306 // Check the session ID and bounce if needed
307 //--------------------------------------------------------------------------
308 if( msg->GetSessionId() &&
309 (pSubStreams[0]->status != Socket::Connected ||
310 pSessionId != msg->GetSessionId()) )
311 return XRootDStatus( stError, errInvalidSession );
312
313 //--------------------------------------------------------------------------
314 // Decide on the path to send the message
315 //--------------------------------------------------------------------------
316 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317 if( pSubStreams.size() <= path.up )
318 {
319 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320 "substream %d, using 0 instead", pStreamName.c_str(),
321 msg->GetObfuscatedDescription().c_str(), path.up );
322 path.up = 0;
323 }
324
325 log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326 "substream %d expecting answer at %d", pStreamName.c_str(),
327 msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
328
329 //--------------------------------------------------------------------------
330 // Enable *a* path and insert the message to the right queue
331 //--------------------------------------------------------------------------
332 XRootDStatus st = EnableLink( path );
333 if( st.IsOK() )
334 {
335 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337 expires, stateful );
338 }
339 else
340 st.status = stFatal;
341 return st;
342 }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116 {
117 pChannelData = channelData;
118 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108 {
109 pIncomingQueue = incomingQueue;
110 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132 {
133 pJobManager = jobManager;
134 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > &  onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264 {
265 XrdSysMutexHelper scopedLock( pMutex );
266 pOnDataConnJob = onConnJob;
267 }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

+ Here is the caller graph for this function:

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100 {
101 pPoller = poller;
102 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124 {
125 pTaskManager = taskManager;
126 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92 {
93 pTransport = transport;
94 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ Tick()

void XrdCl::Stream::Tick ( time_t  now)

Handle a clock event generated either by socket timeout, or by the task manager event

Definition at line 377 of file XrdClStream.cc.

378 {
379 //--------------------------------------------------------------------------
380 // Check for timed-out requests and incoming handlers
381 //--------------------------------------------------------------------------
382 pMutex.Lock();
383 OutQueue q;
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386 q.GrabExpired( *(*it)->outQueue, now );
387 pMutex.UnLock();
388
389 q.Report( XRootDStatus( stError, errOperationExpired ) );
390 pIncomingQueue->ReportTimeout( now );
391 }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: