XRootD
Loading...
Searching...
No Matches
XrdCl::TaskManager Class Reference

#include <XrdClTaskManager.hh>

+ Collaboration diagram for XrdCl::TaskManager:

Public Member Functions

 TaskManager ()
 Constructor.
 
 ~TaskManager ()
 Destructor.
 
void RegisterTask (Task *task, time_t time, bool own=true)
 
void RunTasks ()
 Run the tasks - this loops infinitely.
 
bool Start ()
 Start the manager.
 
bool Stop ()
 
void UnregisterTask (Task *task)
 

Detailed Description

Run short tasks at a given time in the future

The task manager just runs one extra thread so the execution of one tasks may interfere with the execution of another

Definition at line 75 of file XrdClTaskManager.hh.

Constructor & Destructor Documentation

◆ TaskManager()

XrdCl::TaskManager::TaskManager ( )

Constructor.

Definition at line 48 of file XrdClTaskManager.cc.

48 : pResolution(1), pRunnerThread(0), pRunning(false)
49 {}

◆ ~TaskManager()

XrdCl::TaskManager::~TaskManager ( )

Destructor.

Definition at line 54 of file XrdClTaskManager.cc.

55 {
56 TaskSet::iterator it, itE;
57 for( it = pTasks.begin(); it != pTasks.end(); ++it )
58 if( it->own )
59 delete it->task;
60 }

Member Function Documentation

◆ RegisterTask()

void XrdCl::TaskManager::RegisterTask ( Task task,
time_t  time,
bool  own = true 
)

Run the given task at the given time.

Parameters
tasktask to be run
timetime at which the task should be run
owndetermines whether the task object should be destroyed when no longer needed

Definition at line 127 of file XrdClTaskManager.cc.

128 {
129 Log *log = DefaultEnv::GetLog();
130
131 log->Debug( TaskMgrMsg, "Registering task: \"%s\" to be run at: [%s]",
132 task->GetName().c_str(), Utils::TimeToString(time).c_str() );
133
134 XrdSysMutexHelper scopedLock( pMutex );
135 pTasks.insert( TaskHelper( task, time, own ) );
136 }
static Log * GetLog()
Get default log.
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
const uint64_t TaskMgrMsg
XrdSysError Log
Definition XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultEnv::GetLog(), XrdCl::Task::GetName(), XrdCl::TaskMgrMsg, and XrdCl::Utils::TimeToString().

Referenced by XrdCl::Channel::Channel(), XrdCl::ForkHandler::Child(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Stream::OnConnectError(), and XrdCl::XRootDMsgHandler::Process().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RunTasks()

void XrdCl::TaskManager::RunTasks ( )

Run the tasks - this loops infinitely.

Definition at line 153 of file XrdClTaskManager.cc.

154 {
155 Log *log = DefaultEnv::GetLog();
156
157 //--------------------------------------------------------------------------
158 // We want the thread to be cancelable only when we sleep between tasks
159 // execution
160 //--------------------------------------------------------------------------
161 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 );
162
163 for(;;)
164 {
165 pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, 0 );
166 pMutex.Lock();
167
168 //------------------------------------------------------------------------
169 // Remove the tasks from the active set - super inefficient,
170 // but, hopefully, never really necessary. We first need to build a list
171 // of iterators because it is impossible to remove elements from
172 // a multiset when iterating over it
173 //------------------------------------------------------------------------
174 TaskList::iterator listIt = pToBeUnregistered.begin();
175 TaskSet::iterator it, itE;
176 std::list<TaskSet::iterator> iteratorList;
177 std::list<TaskSet::iterator>::iterator itRem;
178 for( ; listIt != pToBeUnregistered.end(); ++listIt )
179 {
180 for( it = pTasks.begin(); it != pTasks.end(); ++it )
181 {
182 if( it->task == *listIt )
183 iteratorList.push_back( it );
184 }
185 }
186
187 for( itRem = iteratorList.begin(); itRem != iteratorList.end(); ++itRem )
188 {
189 Task *tsk = (*itRem)->task;
190 bool own = (*itRem)->own;
191 log->Debug( TaskMgrMsg, "Removing task: \"%s\"", tsk->GetName().c_str() );
192 pTasks.erase( *itRem );
193 if( own )
194 delete tsk;
195 }
196
197 pToBeUnregistered.clear();
198
199 //------------------------------------------------------------------------
200 // Select the tasks to be run
201 //------------------------------------------------------------------------
202 time_t now = time(0);
203 std::list<TaskHelper> toRun;
204 std::list<TaskHelper>::iterator trIt;
205
206 it = pTasks.begin();
207 itE = pTasks.upper_bound( TaskHelper( 0, now ) );
208
209 for( ; it != itE; ++it )
210 toRun.push_back( TaskHelper( it->task, 0, it->own ) );
211
212 pTasks.erase( pTasks.begin(), itE );
213 pMutex.UnLock();
214
215 //------------------------------------------------------------------------
216 // Run the tasks and reinsert them if necessary
217 //------------------------------------------------------------------------
218 for( trIt = toRun.begin(); trIt != toRun.end(); ++trIt )
219 {
220 log->Dump( TaskMgrMsg, "Running task: \"%s\"",
221 trIt->task->GetName().c_str() );
222 time_t schedule = trIt->task->Run( now );
223 if( schedule )
224 {
225 log->Dump( TaskMgrMsg, "Will rerun task \"%s\" at [%s]",
226 trIt->task->GetName().c_str(),
227 Utils::TimeToString(schedule).c_str() );
228 pMutex.Lock();
229 pTasks.insert( TaskHelper( trIt->task, schedule, trIt->own ) );
230 pMutex.UnLock();
231 }
232 else
233 {
234 log->Debug( TaskMgrMsg, "Done with task: \"%s\"",
235 trIt->task->GetName().c_str() );
236 if( trIt->own )
237 delete trIt->task;
238 }
239 }
240
241 //------------------------------------------------------------------------
242 // Enable the cancellation and go to sleep
243 //------------------------------------------------------------------------
244 pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, 0 );
245 pthread_testcancel();
246 XrdSysTimer::Wait( pResolution*1000 );
247 }
248 }
static void Wait(int milliseconds)

References XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::Task::GetName(), XrdSysMutex::Lock(), XrdCl::TaskMgrMsg, XrdCl::Utils::TimeToString(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by RunRunnerThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Start()

bool XrdCl::TaskManager::Start ( )

Start the manager.

Definition at line 65 of file XrdClTaskManager.cc.

66 {
67 XrdSysMutexHelper scopedLock( pOpMutex );
68 Log *log = DefaultEnv::GetLog();
69 log->Debug( TaskMgrMsg, "Starting the task manager..." );
70
71 if( pRunning )
72 {
73 log->Error( TaskMgrMsg, "The task manager is already running" );
74 return false;
75 }
76
77 int ret = ::pthread_create( &pRunnerThread, 0, ::RunRunnerThread, this );
78 if( ret != 0 )
79 {
80 log->Error( TaskMgrMsg, "Unable to spawn the task runner thread: %s",
81 XrdSysE2T( errno ) );
82 return false;
83 }
84 pRunning = true;
85 log->Debug( TaskMgrMsg, "Task manager started" );
86 return true;
87 }
static void * RunRunnerThread(void *arg)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), RunRunnerThread(), XrdCl::TaskMgrMsg, and XrdSysE2T().

+ Here is the call graph for this function:

◆ Stop()

bool XrdCl::TaskManager::Stop ( )

Stop the manager

Will wait until the currently running task completes

Definition at line 92 of file XrdClTaskManager.cc.

93 {
94 XrdSysMutexHelper scopedLock( pOpMutex );
95 Log *log = DefaultEnv::GetLog();
96 log->Debug( TaskMgrMsg, "Stopping the task manager..." );
97 if( !pRunning )
98 {
99 log->Error( TaskMgrMsg, "The task manager is not running" );
100 return false;
101 }
102
103 if( ::pthread_cancel( pRunnerThread ) != 0 )
104 {
105 log->Error( TaskMgrMsg, "Unable to cancel the task runner thread: %s",
106 XrdSysE2T( errno ) );
107 return false;
108 }
109
110 void *threadRet;
111 int ret = pthread_join( pRunnerThread, (void **)&threadRet );
112 if( ret != 0 )
113 {
114 log->Error( TaskMgrMsg, "Failed to join the task runner thread: %s",
115 XrdSysE2T( errno ) );
116 return false;
117 }
118
119 pRunning = false;
120 log->Debug( TaskMgrMsg, "Task manager stopped" );
121 return true;
122 }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::TaskMgrMsg, and XrdSysE2T().

+ Here is the call graph for this function:

◆ UnregisterTask()

void XrdCl::TaskManager::UnregisterTask ( Task task)

Remove a task, the unregistration process is asynchronous and may be performed at any point in the future, the function just queues the request. Unregistered task gets destroyed if it was owned by the task manager.

Definition at line 141 of file XrdClTaskManager.cc.

142 {
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( TaskMgrMsg, "Requesting unregistration of: \"%s\"",
145 task->GetName().c_str() );
146 XrdSysMutexHelper scopedLock( pMutex );
147 pToBeUnregistered.push_back( task );
148 }

References XrdCl::Log::Debug(), XrdCl::DefaultEnv::GetLog(), XrdCl::Task::GetName(), and XrdCl::TaskMgrMsg.

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: