XRootD
Loading...
Searching...
No Matches
XrdPollE.icc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d P o l l E . i c c */
4/* */
5/* (c) 2008 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <fcntl.h>
32#include <cstdlib>
33#include <sys/types.h>
34#include <sys/stat.h>
35#include <sys/epoll.h>
36#include <sys/eventfd.h>
37
38#include "Xrd/XrdPollE.hh"
39#include "Xrd/XrdScheduler.hh"
40
41/******************************************************************************/
42/* n e w P o l l e r */
43/******************************************************************************/
44
45XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
46{
47 int pfd, wfd, bytes, alignment, pagsz = getpagesize();
48 struct epoll_event *pp;
49
50// Open the /dev/poll driver
51//
52#ifndef EPOLL_CLOEXEC
53 if ((pfd = epoll_create(maxfd)) >= 0) fcntl(pfd, F_SETFD, FD_CLOEXEC);
54 else
55#else
56 if ((pfd = epoll_create1(EPOLL_CLOEXEC)) < 0)
57#endif
58 {Log.Emsg("Poll", errno, "create epoll device"); return 0;}
59
60 if ((wfd = eventfd(0, EFD_CLOEXEC)) < 0)
61 {Log.Emsg("Poll", errno,
62 "create an eventfd as the wait-poller descriptor");
63 close(pfd);
64 return 0;
65 }
66
67// Calculate the size of the poll table and allocate it
68//
69 bytes = maxfd * sizeof(struct epoll_event);
70 alignment = (bytes < pagsz ? 1024 : pagsz);
71 if (posix_memalign((void **)&pp, alignment, bytes))
72 {Log.Emsg("Poll", ENOMEM, "create poll table");
73 close(wfd);
74 close(pfd);
75 return 0;
76 }
77
78// Create new poll object
79//
80 memset((void *)pp, 0, bytes);
81 return (XrdPoll *)new XrdPollE(pp, maxfd, pfd, wfd);
82}
83
84/******************************************************************************/
85/* D e s t r u c t o r */
86/******************************************************************************/
87
89{
90 if (PollTab) free(PollTab);
91 if (WaitFd >= 0) close(WaitFd);
92 if (PollDfd >= 0) close(PollDfd);
93}
94
95/******************************************************************************/
96/* A d d W a i t F d */
97/******************************************************************************/
98
99int XrdPollE::AddWaitFd()
100{
101 const unsigned int myPollEvts = EPOLLIN;
102 struct epoll_event myEvent = {myPollEvts, {(void *)&WaitFd}};
103
104// Add the waitfd to the poll set
105//
106 if (epoll_ctl(PollDfd, EPOLL_CTL_ADD, WaitFd, &myEvent) < 0)
107 {int rc = errno;
108 Log.Emsg("Poll", rc, "include the wait FD in the poll set");
109 return rc;
110 }
111
112 return 0;
113}
114
115/******************************************************************************/
116/* D i s a b l e */
117/******************************************************************************/
118
119void XrdPollE::Disable(XrdPollInfo &pInfo, const char *etxt)
120{
121
122// Simply return if the link is already disabled
123//
124 if (!pInfo.isEnabled) return;
125
126// If Linux 2.6.9 we use EPOLLONESHOT to automatically disable a polled fd.
127// So, the Disable() method need not do anything. Prior kernels did not have
128// this mechanism so we need to do this manually.
129//
130#ifndef EPOLLONESHOT
131 struct epoll_event myEvents = {0, (void *)&pInfo};
132
133// Disable this fd. Unlike solaris, epoll_ctl() does not block when the pollfd
134// is being waited upon by another thread.
135//
136 if (epoll_ctl(PollDfd, EPOLL_CTL_MOD, pInfo.FD, &myEvents))
137 {Log.Emsg("Poll", errno, "disable link", lp->ID); return;}
138#endif
139
140// Trace this event
141//
142 pInfo.isEnabled = false;
143 TRACEI(POLL, "Poller " <<PID <<" async disabling link " <<pInfo.FD);
144
145// Check if this link needs to be rescheduled. If so, the caller better have
146// the link opMutex lock held for this to work!
147//
148 if (etxt && Finish(pInfo, etxt)) Sched.Schedule((XrdJob *)&pInfo.Link);
149}
150
151/******************************************************************************/
152/* E n a b l e */
153/******************************************************************************/
154
156{
157 struct epoll_event myEvents = {ePollEvents, {(void *)&pInfo}};
158
159// Simply return if the link is already enabled
160//
161 if (pInfo.isEnabled) return 1;
162
163// Enable this fd. Unlike solaris, epoll_ctl() does not block when the pollfd
164// is being waited upon by another thread.
165//
166 pInfo.isEnabled = true;
167 if (epoll_ctl(PollDfd, EPOLL_CTL_MOD, pInfo.FD, &myEvents))
168 {Log.Emsg("Poll", errno, "enable link", pInfo.Link.ID);
169 pInfo.isEnabled = false;
170 return 0;
171 }
172
173// Do final processing
174//
175 TRACE(POLL, "Poller " <<PID <<" enabled " <<pInfo.Link.ID);
176 numEnabled++;
177 return 1;
178}
179
180/******************************************************************************/
181/* E x c l u d e */
182/******************************************************************************/
183
185{
186
187// Make sure this link is not enabled
188//
189 if (pInfo.isEnabled)
190 {Log.Emsg("Poll", "Detach of enabled link", pInfo.Link.ID);
191 Disable(pInfo);
192 }
193
194// Forcibly remove the link from the poll set
195//
196 remFD(pInfo, 0);
197
198// Wait to make sure the poll thread has completed handling the last set of
199// events which may have included this Link. The handing includes examining the
200// Link's PollInfo before deciding if to schedule the Link. After we return,
201// the PollInfo may be reset and the Link could be subsequently reused.
202//
203 Wait4Poller();
204}
205
206/******************************************************************************/
207/* H a n d l e W a i t F d */
208/******************************************************************************/
209
210void XrdPollE::HandleWaitFd(const unsigned int events)
211{
212// Used by the polling thread to signal waiters that a polling loop has been
213// completed.
214//
215 eventfd_t ic, cnt;
216
217// We don't expect anything but EPOLLIN. But if we get an error (errors
218// can be reported, despite not being selected events) abort rather than
219// potentially keeping the poll thread busy repeatedly looping.
220//
221 if (!(events & EPOLLIN) || (events & (EPOLLERR | EPOLLHUP)))
222 {char eBuff[64];
223 Log.Emsg("Poll", "wait-poller handler:", x2Text(events, eBuff));
224 if (events & (EPOLLERR | EPOLLHUP))
225 abort();
226 return;
227 }
228
229 if (eventfd_read(WaitFd, &cnt) < 0)
230 {Log.Emsg("Poll", errno, "read from the wait-poller descriptor");
231 return;
232 }
233
234 for (ic=0;ic<cnt;++ic) WaitFdSem.Post();
235}
236
237/******************************************************************************/
238/* I n c l u d e */
239/******************************************************************************/
240
242{
243 struct epoll_event myEvent = {0, {(void *)&pInfo}};
244 int rc;
245
246// Add this fd to the poll set
247//
248 if ((rc = epoll_ctl(PollDfd, EPOLL_CTL_ADD, pInfo.FD, &myEvent)) < 0)
249 Log.Emsg("Poll", errno, "include link", pInfo.Link.ID);
250
251// All done
252//
253 return rc == 0;
254}
255
256/******************************************************************************/
257/* r e m F D */
258/******************************************************************************/
259
260void XrdPollE::remFD(XrdPollInfo &pInfo, unsigned int events)
261{
262 struct epoll_event myEvents = {0, {(void *)&pInfo}};
263
264// It works out that ONESHOT mode or even CTL_MOD requests do not necessarily
265// prevent epoll_wait() from returning on an error event. So, we must manually
266// remove the fd from the set and assume the logic was actually correct. If it
267// wasn't then the client will eventually timeout and retry the request. This
268// may cause a double remove; so we don't consider these an arror.
269//
270 if (pInfo.FD > 0)
271 {TRACEI(POLL, "Poller " <<PID <<" removing FD " <<pInfo.FD);
272 if (epoll_ctl(PollDfd, EPOLL_CTL_DEL, pInfo.FD, &myEvents)
273 && (errno != ENOENT || events != 0))
274 {const char *why;
275 char buff[96];
276 if (events & (EPOLLHUP | EPOLLRDHUP)) why = "sever";
277 else if (events & EPOLLERR) why = "error";
278 else why = "disc";
279 snprintf(buff, sizeof(buff), "exclude fd %d during %s (%x) event; link",
280 pInfo.FD, why, events);
281 Log.Emsg("Poll", errno, buff, pInfo.Link.ID);
282 }
283 }
284}
285
286/******************************************************************************/
287/* S t a r t */
288/******************************************************************************/
289
290void XrdPollE::Start(XrdSysSemaphore *syncsem, int &retcode)
291{
292 char eBuff[64];
293 int rc, i, numpolled, num2sched;
294 unsigned int waitFdEvents;
295 bool haveWaiters;
296 XrdJob *jfirst, *jlast;
297 const short pollOK = EPOLLIN | EPOLLPRI;
298 XrdLink *lp;
299 XrdPollInfo *pInfo;
300
301 if ((rc = AddWaitFd()))
302 {retcode = rc;
303 syncsem->Post();
304 return;
305 }
306
307// Indicate to the starting thread that all went well
308//
309 retcode = 0;
310 syncsem->Post();
311
312// Now start dispatching links that are ready
313//
314 do {do {numpolled = epoll_wait(PollDfd, PollTab, PollMax, -1);}
315 while (numpolled < 0 && errno == EINTR);
316 if (numpolled == 0) continue;
317 if (numpolled < 0)
318 {Log.Emsg("Poll", errno, "poll for events");
319 abort();
320 }
321 numEvents += numpolled;
322
323 // Checkout which links must be dispatched (no need to lock)
324 //
325 jfirst = jlast = 0; num2sched = 0;
326 haveWaiters = false; waitFdEvents = 0;
327 for (i = 0; i < numpolled; i++)
328 {if (PollTab[i].data.ptr == &WaitFd)
329 {haveWaiters = true; waitFdEvents = PollTab[i].events;}
330 else if ((pInfo = (XrdPollInfo *)PollTab[i].data.ptr))
331 {if (!(pInfo->isEnabled) && pInfo->FD >= 0)
332 remFD(*pInfo, PollTab[i].events);
333 else {pInfo->isEnabled = 0;
334 if (!(PollTab[i].events & pollOK)
335 || (PollTab[i].events & POLLRDHUP))
336 Finish(*pInfo, x2Text(PollTab[i].events, eBuff));
337 lp = &(pInfo->Link);
338 lp->NextJob = jfirst; jfirst = (XrdJob *)lp;
339 if (!jlast) jlast=(XrdJob *)lp;
340 num2sched++;
341#ifndef EPOLLONESHOT
342 PollTab[i].events = 0;
343 if (epoll_ctl(PollDfd,EPOLL_CTL_MOD,pInfo.FD,&PollTab[i]))
344 Log.Emsg("Poll",errno,"disable link",pInfo.Link.ID);
345#endif
346 }
347 } else Log.Emsg("Poll", "null link event!!!!");
348 }
349
350 // Schedule the polled links
351 //
352 if (num2sched == 1) Sched.Schedule(jfirst);
353 else if (num2sched) Sched.Schedule(num2sched, jfirst, jlast);
354
355 if (haveWaiters) HandleWaitFd(waitFdEvents);
356 } while(1);
357}
358
359/******************************************************************************/
360/* W a i t 4 P o l l e r */
361/******************************************************************************/
362
363void XrdPollE::Wait4Poller()
364{
365// Makes the caller wait for the polling thread to complete an event processing
366// loop.
367//
368 if (eventfd_write(WaitFd, 1) < 0)
369 {Log.Emsg("Poll", errno, "write to the wait-poller descriptor");
370 return;
371 }
372
373 WaitFdSem.Wait();
374}
375
376/******************************************************************************/
377/* x 2 T e x t */
378/******************************************************************************/
379
380const char *XrdPollE::x2Text(unsigned int events, char *buff)
381{
382 if (events & EPOLLERR) return "socket error";
383
384 if (events & (EPOLLHUP | EPOLLRDHUP)) return "hangup";
385
386 sprintf(buff, "unusual event (%.4x)", events);
387 return buff;
388}
#define EPOLLRDHUP
Definition XrdPollE.hh:37
#define close(a)
Definition XrdPosix.hh:48
#define TRACE(act, x)
Definition XrdTrace.hh:63
#define TRACEI(act, x)
Definition XrdTrace.hh:66
XrdJob * NextJob
Definition XrdJob.hh:46
const char * x2Text(unsigned int evf, char *buff)
Definition XrdPollE.icc:380
void Exclude(XrdPollInfo &pInfo)
Definition XrdPollE.icc:184
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
Definition XrdPollE.icc:119
void Start(XrdSysSemaphore *syncp, int &rc)
Definition XrdPollE.icc:290
int Enable(XrdPollInfo &pInfo)
Definition XrdPollE.icc:155
int Include(XrdPollInfo &pInfo)
Definition XrdPollE.icc:241
XrdLink & Link
int PID
Definition XrdPoll.hh:82
int numEvents
Definition XrdPoll.hh:133
static XrdPoll * newPoller(int pollid, int numfd)
Definition XrdPollE.icc:45
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition XrdPoll.cc:204
int numEnabled
Definition XrdPoll.hh:132
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysError Log
Definition XrdConfig.cc:113