XRootD
Loading...
Searching...
No Matches
XrdClChannel.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClChannel.hh"
27#include "XrdCl/XrdClStream.hh"
28#include "XrdCl/XrdClSocket.hh"
30#include "XrdCl/XrdClLog.hh"
34
35#include <ctime>
36
37namespace XrdCl
38{
40 {
41 public:
42 //------------------------------------------------------------------------
43 // Constructor
44 //------------------------------------------------------------------------
// Create a tick task bound to one channel.
//   channel - pointer saved in pChannel; Run() calls Tick() on it
//   hostId  - text appended to the task name that is passed to SetName()
45 TickGeneratorTask( XrdCl::Channel *channel, const std::string &hostId ):
46 pChannel( channel )
47 {
48 std::string name = "TickGeneratorTask for: ";
49 name += hostId;
50 SetName( name ); // resulting name: "TickGeneratorTask for: <hostId>"
51 }
52
53 //------------------------------------------------------------------------
54 // Run the task
55 //------------------------------------------------------------------------
// Deliver one tick to the channel and compute the follow-up timestamp.
//   now - timestamp forwarded unchanged to Channel::Tick()
// Returns 0 when pChannel is null, otherwise now plus the
// "TimeoutResolution" value read from the default environment.
// NOTE(review): presumably the task manager treats the result as the next
// run time and 0 as "do not run again" - confirm against the Task interface.
56 time_t Run( time_t now )
57 {
58 XrdSysMutexHelper lck( pMtx ); // same mutex guards the code that resets pChannel to 0
59 if( !pChannel ) return 0; // channel pointer was cleared: nothing to tick
60
61 using namespace XrdCl;
62 pChannel->Tick( now );
63
64 Env *env = DefaultEnv::GetEnv();
// Start from the compiled-in default; GetInt's bool result is ignored here.
// NOTE(review): presumably the value is left untouched when the key is
// not set - confirm in Env::GetInt.
65 int timeoutResolution = DefaultTimeoutResolution;
66 env->GetInt( "TimeoutResolution", timeoutResolution );
67 return now+timeoutResolution;
68 }
69
71 {
72 XrdSysMutexHelper lck( pMtx );
73 pChannel = 0;
74 }
75
76 private:
77 XrdCl::Channel *pChannel;
78 XrdSysMutex pMtx;
79 };
80
81 //----------------------------------------------------------------------------
82 // Constructor
83 //----------------------------------------------------------------------------
// Build a channel: initialize the transport's channel data, create and
// configure the stream, and register the periodic tick task.
//   url         - source of the host id, params and protocol copied into
//                 pUrl; also passed to the transport's InitializeChannel()
//   poller      - stored in pPoller and handed to the stream
//   transport   - stored in pTransport and handed to the stream
//   taskManager - stored in pTaskManager, handed to the stream, and used to
//                 register the tick task
//   jobManager  - stored in pJobManager and handed to the stream
//   prefurl     - forwarded to the Stream constructor
84 Channel::Channel( const URL &url,
85 Poller *poller,
86 TransportHandler *transport,
87 TaskManager *taskManager,
88 JobManager *jobManager,
89 const URL &prefurl ):
90 pUrl( url.GetHostId() ),
91 pPoller( poller ),
92 pTransport( transport ),
93 pTaskManager( taskManager ),
94 pTickGenerator( 0 ), // stays null until the task is created at the end of the body
95 pJobManager( jobManager )
96 {
97 Env *env = DefaultEnv::GetEnv();
98 Log *log = DefaultEnv::GetLog();
99
// Interval added to the current time when the tick task is registered
// below; starts from the compiled-in default, then the environment is asked.
100 int timeoutResolution = DefaultTimeoutResolution;
101 env->GetInt( "TimeoutResolution", timeoutResolution );
102
103 pTransport->InitializeChannel( url, pChannelData ); // transport receives pChannelData by reference
104 log->Debug( PostMasterMsg, "Creating new channel to: %s",
105 url.GetChannelId().c_str() );
106
// pUrl was built from the host id only; copy params and protocol over too.
107 pUrl.SetParams( url.GetParams() );
108 pUrl.SetProtocol( url.GetProtocol() );
109
110 //--------------------------------------------------------------------------
111 // Create the stream
112 //--------------------------------------------------------------------------
// The stream is given pointers to this channel's members (pUrl, pIncoming,
// pChannelData), so it refers to them rather than holding copies.
113 pStream = new Stream( &pUrl, prefurl );
114 pStream->SetTransport( transport );
115 pStream->SetPoller( poller );
116 pStream->SetIncomingQueue( &pIncoming );
117 pStream->SetTaskManager( taskManager );
118 pStream->SetJobManager( jobManager );
119 pStream->SetChannelData( &pChannelData );
// NOTE(review): the value returned by Initialize() is discarded here, so a
// failed initialization would go unnoticed by this constructor - confirm
// that this is intended.
120 pStream->Initialize();
121
122 //--------------------------------------------------------------------------
123 // Register the task generating timeout events
124 //--------------------------------------------------------------------------
// Registered with time = current time + timeoutResolution.
// NOTE(review): the third RegisterTask argument is left at its default;
// presumably the task manager then owns and deletes the task - confirm.
125 pTickGenerator = new TickGeneratorTask( this, pUrl.GetChannelId() );
126 pTaskManager->RegisterTask( pTickGenerator, ::time(0)+timeoutResolution );
127 }
128
129 //----------------------------------------------------------------------------
130 // Destructor
131 //----------------------------------------------------------------------------
133 {
134 pTickGenerator->Invalidate();
135 delete pStream;
136 pTransport->FinalizeChannel( pChannelData );
137 }
138
139 //----------------------------------------------------------------------------
140 // Send the message asynchronously
141 //----------------------------------------------------------------------------
143 MsgHandler *handler,
144 bool stateful,
145 time_t expires )
146
147 {
148 return pStream->Send( msg, handler, stateful, expires );
149 }
150
151 //----------------------------------------------------------------------------
152 // Handle a time event
153 //----------------------------------------------------------------------------
// Pass a time event on to the stream.
//   now - timestamp handed unchanged to Stream::Tick()
154 void Channel::Tick( time_t now )
155 {
156 pStream->Tick( now ); // the channel does no time handling of its own here
157 }
158
159 //----------------------------------------------------------------------------
160 // Force disconnect of all streams
161 //----------------------------------------------------------------------------
163 {
164 return ForceDisconnect(false);
165 }
166
167 //----------------------------------------------------------------------------
168 // Force disconnect of all streams
169 //----------------------------------------------------------------------------
171 {
172 //--------------------------------------------------------------------------
173 // Disconnect and recreate the streams
174 //--------------------------------------------------------------------------
175 pStream->ForceError( Status( stError, errOperationInterrupted ), hush );
176
177 return Status();
178 }
179
180 //----------------------------------------------------------------------------
181 // Force reconnect
182 //----------------------------------------------------------------------------
184 {
185 pStream->ForceConnect();
186 return Status();
187 }
188
189 //------------------------------------------------------------------------
190 // Get the number of connected data streams
191 //------------------------------------------------------------------------
193 {
194 return XRootDTransport::NbConnectedStrm( pChannelData );
195 }
196
197 //------------------------------------------------------------------------
198 // Set the on-connect handler for data streams
199 //------------------------------------------------------------------------
// Hand the on-connect job for data streams over to the stream.
//   onConnJob - shared pointer to the job, passed by reference straight to
//               Stream::SetOnDataConnectHandler(); the channel keeps no
//               copy of its own in this function
200 void Channel::SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
201 {
202 pStream->SetOnDataConnectHandler( onConnJob );
203 }
204
205 //------------------------------------------------------------------------
206 // Check if channel can be collapsed using given URL
207 //------------------------------------------------------------------------
// Ask the stream whether this channel can be collapsed using `url`.
//   url - URL passed unchanged to Stream::CanCollapse()
// Returns whatever Stream::CanCollapse() reports; the decision is made
// entirely by the stream.
208 bool Channel::CanCollapse( const URL &url )
209 {
210 return pStream->CanCollapse( url );
211 }
212
213 //------------------------------------------------------------------------
214 // Decrement file object instance count bound to this channel
215 //------------------------------------------------------------------------
217 {
218 pTransport->DecFileInstCnt( pChannelData );
219 }
220
221 //----------------------------------------------------------------------------
222 // Query the transport handler
223 //----------------------------------------------------------------------------
// Route a query either to the transport handler or to the stream.
//   query  - numeric query code; values below 2000 go to the transport
//            (together with pChannelData), all others go to the stream
//   result - output object passed through to whichever Query() is called
// Returns the Status produced by the selected Query() call.
// NOTE(review): 2000 is presumably the boundary between transport-level and
// stream-level query codes - confirm where the codes are defined.
224 Status Channel::QueryTransport( uint16_t query, AnyObject &result )
225 {
226 if( query < 2000 )
227 return pTransport->Query( query, result, pChannelData );
228 return pStream->Query( query, result );
229 }
230
231 //----------------------------------------------------------------------------
232 // Register channel event handler
233 //----------------------------------------------------------------------------
235 {
236 pStream->RegisterEventHandler( handler );
237 }
238
239 //------------------------------------------------------------------------
240 // Remove a channel event handler
241 //------------------------------------------------------------------------
243 {
244 pStream->RemoveEventHandler( handler );
245 }
246}
A communication channel between the client and the server.
uint16_t NbConnectedStrm()
Get the number of connected data streams.
Status ForceReconnect()
Force reconnect.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
~Channel()
Destructor.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
Status ForceDisconnect()
Force disconnect of all streams.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
void Tick(time_t now)
Handle a time event.
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
Channel(const URL &url, Poller *poller, TransportHandler *transport, TaskManager *taskManager, JobManager *jobManager, const URL &prefurl=URL())
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A synchronized queue.
Handle diagnostics.
Definition XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
Interface for socket pollers.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
void SetTransport(TransportHandler *transport)
Set the transport.
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
bool CanCollapse(const URL &url)
void SetPoller(Poller *poller)
Set the poller.
void ForceConnect()
Force connection.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
void SetTaskManager(TaskManager *taskManager)
Set task manager.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void SetJobManager(JobManager *jobManager)
Set job manager.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Tick(time_t now)
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetChannelData(AnyObject *channelData)
Set the channel data.
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
void SetName(const std::string &name)
Set name of the task.
time_t Run(time_t now)
TickGeneratorTask(XrdCl::Channel *channel, const std::string &hostId)
Perform the handshake and the authentication for each physical stream.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual void FinalizeChannel(AnyObject &channelData)=0
Finalize channel.
virtual void InitializeChannel(const URL &url, AnyObject &channelData)=0
Initialize channel.
virtual void DecFileInstCnt(AnyObject &channelData)=0
Decrement file object instance count bound to this channel.
URL representation.
Definition XrdClURL.hh:31
std::string GetChannelId() const
Definition XrdClURL.cc:512
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:402
void SetProtocol(const std::string &protocol)
Set protocol.
Definition XrdClURL.hh:126
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultTimeoutResolution
const uint16_t errOperationInterrupted
Procedure execution status.