XRootD
Loading...
Searching...
No Matches
XrdSysIOEventsPollE.icc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S y s I O E v e n t s P o l l E . i c c */
4/* */
5/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdlib>
32#include <sys/types.h>
33#include <sys/stat.h>
34#include <sys/epoll.h>
35
37#include "XrdSys/XrdSysE2T.hh"
38#ifndef Atomic
39#define Atomic(x) x
40#endif
41
42
43/******************************************************************************/
44/* C l a s s P o l l E */
45/******************************************************************************/
46
47namespace XrdSys
48{
49namespace IOEvents
50{
51class PollE : public Poller
52{
53public:
54
55static int AllocMem(void **memP, int slots);
56
57 PollE(struct epoll_event *ptab, int numfd, int pfd, int pFD[2])
58 : Poller(pFD[0], pFD[1]), pollTab(ptab), cbNow(0),
59 pollDfd(pfd), pollMax(numfd), pollNum(1), numPoll(0),
60 cbCurr(0)
61 {}
62 ~PollE() {Stop();}
63
64protected:
65
66 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg);
67
68 void Exclude(Channel *cP, bool &isLocked, bool dover=1);
69
70 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
71
72 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
73
74 void Shutdown();
75
76private:
77 int AllocPT(int slots);
78 void Dispatch(Channel *cP, uint32_t pollEv);
79 bool Process(int curr);
80
81struct epoll_event *pollTab;
82 Channel *cbNow;
83 int pollDfd;
84 int pollMax;
85 Atomic(int) pollNum;
86 int numPoll;
87 int cbCurr;
88static void *deadChP;
89};
90 void *PollE::deadChP = 0;
91};
92};
93
94/******************************************************************************/
95/* C l a s s P o l l e r */
96/******************************************************************************/
97/******************************************************************************/
98/* Static: n e w P o l l e r */
99/******************************************************************************/
100
102XrdSys::IOEvents::Poller::newPoller(int pipeFD[2],
103 int &eNum,
104 const char **eTxt)
105
106{
107 static const int allocFD = 1024;
108 struct epoll_event *pp, myEvent = {(EPOLLIN | EPOLLPRI), {0}};
109 int pfd;
110
111// Open the /dev/poll driver
112//
113#ifndef EPOLL_CLOEXEC
114 if ((pfd = epoll_create(allocFD)) >= 0) fcntl(pfd, F_SETFD, FD_CLOEXEC);
115 else
116#else
117 if ((pfd = epoll_create1(EPOLL_CLOEXEC)) < 0)
118#endif
119 {eNum = errno;
120 if (eTxt) *eTxt = "creating epoll device";
121 return 0;
122 }
123
124// Add the request side of the pipe fd to the poll set (always fd[0])
125//
126 if (epoll_ctl(pfd, EPOLL_CTL_ADD, pipeFD[0], &myEvent))
127 { eNum = errno;
128 *eTxt = "adding communication pipe";
129 return 0;
130 }
131
132// Allocate the poll table
133//
134 if ((eNum = XrdSys::IOEvents::PollE::AllocMem((void **)&pp, allocFD)))
135 {eNum = ENOMEM;
136 if (eTxt) *eTxt = "creating epoll table";
137 close(pfd);
138 return 0;
139 }
140
141// Create new poll object
142//
143 return (Poller *)new PollE(pp, allocFD, pfd, pipeFD);
144}
145
146/******************************************************************************/
147/* C l a s s P o l l E */
148/******************************************************************************/
149/******************************************************************************/
150/* A l l o c M e m */
151/******************************************************************************/
152
153int XrdSys::IOEvents::PollE::AllocMem(void **memP, int slots)
154{
155 int rc, bytes, alignment, pagsz = getpagesize();
156
157// Calculate the size of the poll table and allocate it
158//
159 bytes = slots * sizeof(struct epoll_event);
160 alignment = (bytes < pagsz ? 1024 : pagsz);
161 if (!(rc = posix_memalign(memP, alignment, bytes))) memset(*memP, 0, bytes);
162 return rc;
163}
164
165/******************************************************************************/
166/* Private: A l l o c P T */
167/******************************************************************************/
168
169int XrdSys::IOEvents::PollE::AllocPT(int slots)
170{
171 struct epoll_event *pp;
172
173// Calclulate new slots
174//
175 if (pollMax >= slots) slots = pollMax + 256;
176 else slots = pollMax + (slots/256*256) + (slots%256 ? 256 : 0);
177
178// Allocate a new table and if successful, replace the old one
179//
180 if (!AllocMem((void **)&pp, slots))
181 {free(pollTab);
182 pollTab = pp;
183 pollMax = slots;
184 }
185
186// All done
187//
188 return 0;
189}
190
191/******************************************************************************/
192/* Protected: B e g i n */
193/******************************************************************************/
194
196 int &retcode,
197 const char **eTxt)
198{
199 int numpolled, pollN;
200 Channel *cP;
201
202// Indicate to the starting thread that all went well
203//
204 retcode = 0;
205 *eTxt = 0;
206 syncsem->Post();
207
208// Now start dispatching channels that are ready. We use the wakePend flag to
209// keep the chatter down when we actually wakeup.
210//
211 do {do {numpolled = epoll_wait(pollDfd, pollTab, pollMax, TmoGet());}
212 while (numpolled < 0 && errno == EINTR);
213 CPP_ATOMIC_STORE(wakePend, true, std::memory_order_release);
214 numPoll = numpolled;
215 if (numpolled == 0) CbkTMO();
216 else if (numpolled < 0)
217 {int rc = errno;
218 //--------------------------------------------------------------
219 // If we are in a child process and the epoll file descriptor
220 // has been closed, there is an immense chance the fork will be
221 // followed by an exec, in which case we don't want to abort
222 //--------------------------------------------------------------
223 if( rc == EBADF && parentPID != getpid() ) return;
224 std::cerr <<"EPoll: "<<XrdSysE2T(rc)<<" polling for events "<<std::endl;
225 abort();
226 }
227 else for (int i = 0; i < numpolled; i++)
228 {if ((cP = (Channel *)pollTab[i].data.ptr))
229 {cbCurr = i; Dispatch(cP, pollTab[i].events);}
230 else if (!Process(i)) return;
231 }
232
233 pollN = AtomicGet(pollNum);
234 if (pollMax < pollN) AllocPT(pollN);
235
236 } while(1);
237}
238
239/******************************************************************************/
240/* Private: D i s p a t c h */
241/******************************************************************************/
242
243void XrdSys::IOEvents::PollE::Dispatch(XrdSys::IOEvents::Channel *cP,
244 uint32_t pollEv)
245{
246 static const uint32_t pollER = EPOLLERR| EPOLLHUP;
247 static const uint32_t pollOK = EPOLLIN | EPOLLPRI | EPOLLOUT;
248 static const uint32_t pollRD = EPOLLIN | EPOLLPRI;
249 static const uint32_t pollWR = EPOLLOUT;
250 const char *eTxt;
251 int eNum, events = 0;
252 bool isLocked = false;
253
254// Make sure this not a dispatch to a dead channel (rare but true)
255//
256 if (cP == (XrdSys::IOEvents::Channel *)&deadChP) return;
257
258// Translate the event to something reasonable
259//
260 if (pollEv & pollER)
261 {eTxt = "polling";
262 eNum = (pollEv & EPOLLERR ? EPIPE : ECONNRESET); // Error or HUP
263 }
264 else if (pollEv & pollOK)
265 {if (pollEv & pollRD) events |= CallBack::ReadyToRead;
266 if (pollEv & pollWR) events |= CallBack::ReadyToWrite;
267 eNum = 0; eTxt = 0;
268 }
269 else {eTxt = "polling"; eNum = EIO;}
270
271// Execute the callback
272//
273 cbNow = cP;
274 if (!CbkXeq(cP, events, eNum, eTxt)) Exclude(cP, isLocked, 0);
275 cbNow = 0;
276}
277
278/******************************************************************************/
279/* Protected: E x c l u d e */
280/******************************************************************************/
281
283 bool &isLocked, bool dover)
284{
285
286// Remove this channel from the poll set. We ignore errors as the descriptor
287// may have been closed prior to this call (though this shouldn't happen).
288//
289 epoll_ctl(pollDfd, EPOLL_CTL_DEL, cP->GetFD(), 0);
290 AtomicDec(pollNum);
291
292// If we need to verify this action, sync with the poller thread (note that the
293// poller thread will not ask for this action unless it wants to deadlock). We
294// may actually deadlock anyway if the channel lock is held. We are allowed to
295// release it if the caller locked it. This will prevent a deadlock. Otherwise,
296// if we are in a callback and this channel is not the one that initiated the
297// exclude then we must make sure that we cancel any pending callback to the
298// excluded channel as it may have been deleted and we won't know that here.
299//
300 if (dover)
301 {PipeData cmdbuff;
302 if (isLocked)
303 {isLocked = false;
304 UnLockChannel(cP);
305 }
306 cmdbuff.req = PipeData::RmFD;
307 cmdbuff.fd = cP->GetFD();
308 SendCmd(cmdbuff);
309 } else {
310 if (cbNow && cbNow != cP)
311 for (int i = cbCurr+1; i < numPoll; i++)
312 {if (cP == (Channel *)pollTab[i].data.ptr)
313 pollTab[i].data.ptr = &deadChP;
314 }
315 }
316}
317
318/******************************************************************************/
319/* Protected: I n c l u d e */
320/******************************************************************************/
321
323 int &eNum,
324 const char **eTxt,
325 bool &isLocked)
326{
327 struct epoll_event myEvent = {0, {(void *)cP}};
328 int events = cP->GetEvents();
329
330// Establish new event mask
331//
332 if (events & Channel:: readEvents) myEvent.events = EPOLLIN | EPOLLPRI;
333 if (events & Channel::writeEvents) myEvent.events |= EPOLLOUT;
334
335// Add this fd to the poll set
336//
337 if (epoll_ctl(pollDfd, EPOLL_CTL_ADD, cP->GetFD(), &myEvent))
338 {eNum = errno;
339 if (eTxt) *eTxt = "adding channel";
340 return false;
341 }
342
343// All went well. Bump the number in the set. The poller thread will
344// reallocate the poll table if need be.
345//
346 AtomicInc(pollNum);
347 return true;
348}
349
350/******************************************************************************/
351/* Protected: M o d i f y */
352/******************************************************************************/
353
355 int &eNum,
356 const char **eTxt,
357 bool &isLocked)
358{
359 struct epoll_event myEvents = {0, {(void *)cP}};
360 int events = cP->GetEvents();
361
362// Establish new event mask
363//
364 if (events & Channel:: readEvents) myEvents.events |= EPOLLIN | EPOLLPRI;
365 if (events & Channel::writeEvents) myEvents.events |= EPOLLOUT;
366
367// Modify this fd. Unlike solaris, epoll_ctl() does not block when the pollfd
368// is being waited upon by another thread.
369//
370 if (epoll_ctl(pollDfd, EPOLL_CTL_MOD, cP->GetFD(), &myEvents))
371 {eNum = errno;
372 if (eTxt) *eTxt = "modifying poll events";
373 return false;
374 }
375
376// All done
377//
378 return true;
379}
380
381/******************************************************************************/
382/* Private: P r o c e s s */
383/******************************************************************************/
384
385bool XrdSys::IOEvents::PollE::Process(int curr)
386{
387// Get the pipe request and check out actions of interest.
388//
389 if (GetRequest())
390 { if (reqBuff.req == PipeData::RmFD)
391 {Channel *cP;
392 for (int i = curr+1; i < numPoll; i++)
393 {if ((cP = (Channel *)pollTab[i].data.ptr)
394 && cP != (XrdSys::IOEvents::Channel *)&deadChP
395 && reqBuff.fd == cP->GetFD()) pollTab[i].data.ptr=&deadChP;
396 }
397 reqBuff.theSem->Post();
398 }
399 else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
400 return false;
401 }
402 }
403
404// Return true
405//
406 return true;
407}
408
409/******************************************************************************/
410/* Protected: S h u t d o w n */
411/******************************************************************************/
412
414{
415 static XrdSysMutex shutMutex;
416
417// To avoid race conditions, we serialize this code
418//
419 shutMutex.Lock();
420
421// Release the poll table
422//
423 if (pollTab) {free(pollTab); pollTab = 0;}
424
425// Close the epoll file descriptor
426//
427 if (pollDfd >= 0) {close(pollDfd); pollDfd = -1;}
428
429// All done
430//
431 shutMutex.UnLock();
432}
#define close(a)
Definition XrdPosix.hh:48
#define eMsg(x)
#define Atomic(type)
#define AtomicInc(x)
#define CPP_ATOMIC_STORE(x, val, order)
#define AtomicDec(x)
#define AtomicGet(x)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
PollE(struct epoll_event *ptab, int numfd, int pfd, int pFD[2])
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
static int AllocMem(void **memP, int slots)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)