XRootD
Loading...
Searching...
No Matches
XrdPollPoll.icc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d P o l l P o l l . i c c */
4/* */
5/* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <signal.h>
31#include <cstdlib>
32
33#include "Xrd/XrdLinkCtl.hh"
34#include "Xrd/XrdPollPoll.hh"
35#include "Xrd/XrdScheduler.hh"
36
37#include <vector>
38
39/******************************************************************************/
40/* n e w P o l l e r */
41/******************************************************************************/
42
43XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
44{
45 int bytes, alignment, pagsz = getpagesize();
46 struct pollfd *pp;
47
48// Calculate the size of the poll table and allocate it
49//
50 bytes = maxfd * sizeof(struct pollfd);
51 alignment = (bytes < pagsz ? 1024 : pagsz);
52 if (posix_memalign((void **)&pp, alignment, bytes))
53 {Log.Emsg("Poll", ENOMEM, "create poll table");
54 return 0;
55 }
56
57// Create new poll object
58//
59 memset((void *)pp, 0, bytes);
60 return (XrdPoll *)new XrdPollPoll(pp, maxfd);
61}
62
63/******************************************************************************/
64/* C o n s t r u c t o r */
65/******************************************************************************/
66
67XrdPollPoll::XrdPollPoll(struct pollfd *pp, int numfd)
68{
69
70// Initialize the standard stuff
71//
72 PollTab = pp;
73 PollTNum= 0;
74 PollQ = 0;
75 maxent = numfd;
76}
77
78/******************************************************************************/
79/* D e s t r u c t o r */
80/******************************************************************************/
81
83{
84 if (PollTab) free(PollTab);
85}
86
87/******************************************************************************/
88/* I n c l u d e */
89/******************************************************************************/
90
92{
93 struct pollfd *pfd;
94 int ptnum;
95
96// Lock down the poll data structure
97//
98 PollMutex.Lock();
99
100// Get the next entry to be used
101//
102 ptnum = 0;
103 while((ptnum < PollTNum) && (PollTab[ptnum].fd != -1)) ptnum++;
104
105// Make sure we have enough table entries to add this link
106//
107 if (ptnum > maxent)
108 {Log.Emsg("Attach","Attach",pInfo.Link.ID,"failed; poll table overflow.");
109 PollMutex.UnLock();
110 return 0;
111 }
112
113// Initialize the polltable entry
114//
115 pfd = &(PollTab[ptnum]);
116 pfd->fd = -pInfo.FD;
117 pfd->events = POLLIN | POLLRDNORM;
118 pfd->revents = 0;
119
120// Record relevant information in the link
121//
122 pInfo.PollEnt = pfd;
123 if (ptnum == PollTNum) PollTNum++;
124
125// All done
126//
127 PollMutex.UnLock();
128 return 1;
129}
130
131/******************************************************************************/
132/* D i s a b l e */
133/******************************************************************************/
134
135void XrdPollPoll::Disable(XrdPollInfo &pInfo, const char *etxt)
136{
137 XrdSysSemaphore mySem(0);
138 PipeData cmdbuff[2];
139 int myerrno = 0;
140
141// Check if this link is in the pollQ. If so, remove it.
142//
143 if (pInfo.inQ) dqLink(&pInfo);
144
145// Simply return if the link is already disabled
146//
147 if (!pInfo.isEnabled) return;
148
149// Trace this event
150//
151 TRACEI(POLL, "Poller " <<PID <<" async disabling link FD " <<pInfo.FD);
152
153// Send a disable request to the poller thread handling this link. We need to
154// wait until the operation is actually completed before returning.
155//
156 memset(&cmdbuff, 0, sizeof(cmdbuff));
157 cmdbuff[0].req = PipeData::DiFD;
158 cmdbuff[0].Parms.Arg.fd = pInfo.FD;
159 cmdbuff[0].Parms.Arg.ent = pInfo.PollEnt - PollTab;
160 cmdbuff[1].req = PipeData::Post;
161 cmdbuff[1].Parms.theSem = &mySem;
162 PollPipe.Lock();
163 if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
165
166// Verify that all went well and if termination wanted, terminate the link
167//
168 if (myerrno) Log.Emsg("Poll", myerrno, "disable link", pInfo.Link.ID);
169 else {mySem.Wait();
170 if (etxt && Finish(pInfo, etxt))
171 Sched.Schedule((XrdJob *)&pInfo.Link);
172 }
173}
174
175/******************************************************************************/
176/* E n a b l e */
177/******************************************************************************/
178
180{
181 PipeData cmdbuff;
182 int myerrno = 0;
183
184// Simply return if the link is already enabled
185//
186 if (pInfo.isEnabled) return 1;
187
188// Add this link element to the queue
189//
190 PollMutex.Lock();
191 pInfo.Next = PollQ;
192 PollQ = &pInfo;
193 pInfo.inQ = true;
194 PollMutex.UnLock();
195
196// Send an enable request to the poller thread handling this link
197//
198 TRACEI(POLL, "sending poller " <<PID <<" enable for link " <<pInfo.FD);
199 cmdbuff.req = PipeData::EnFD;
200 cmdbuff.Parms.Arg.fd = pInfo.FD;
201 cmdbuff.Parms.Arg.ent = pInfo.PollEnt - PollTab;
202 PollPipe.Lock();
203 if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
205
206// Verify that all went well. Note that the link stays in the pollQ.
207//
208 if (myerrno)
209 {Log.Emsg("Poll", myerrno, "enable link", pInfo.Link.ID); return 0;}
210
211// All done
212//
213 return 1;
214}
215
216/******************************************************************************/
217/* E x c l u d e */
218/******************************************************************************/
219
221{
222 XrdSysSemaphore mySem(0);
223 PipeData cmdbuff[2];
224 int myerrno = 0;
225
226// Make sure this link is not enabled
227//
228 if (pInfo.isEnabled)
229 {Log.Emsg("Poll", "Detach of enabled link", pInfo.Link.ID);
230 Disable(pInfo);
231 }
232 else if (pInfo.inQ) dqLink(&pInfo);
233
234// Send a deatch request to the poller thread handling this link
235//
236 TRACEI(POLL, "sending poller " <<PID <<" detach for link " <<pInfo.FD);
237 cmdbuff[0].req = PipeData::RmFD;
238 cmdbuff[0].Parms.Arg.fd = pInfo.FD;
239 cmdbuff[0].Parms.Arg.ent = pInfo.PollEnt - PollTab;
240 cmdbuff[1].req = PipeData::Post;
241 cmdbuff[1].Parms.theSem = &mySem;
242 PollPipe.Lock();
243 if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
245
246// Verify that all went well and if termination wanted, terminate the link
247//
248 if (myerrno) Log.Emsg("Poll", myerrno, "detach link", pInfo.Link.ID);
249 else mySem.Wait();
250}
251
252/******************************************************************************/
253/* S t a r t */
254/******************************************************************************/
255
256void XrdPollPoll::Start(XrdSysSemaphore *syncsem, int &retcode)
257{
258 int numpolled, num2sched;
259 XrdJob *jfirst, *jlast;
260 XrdPollInfo *plp, *nlp, *pInfo;
261 XrdLink *lp;
262 short pollevents;
263 const short pollOK = POLLIN | POLLRDNORM;
264
265// Set up he first entry in the poll table to be our communications port
266//
267 PollTab[0].fd = ReqFD;
268 PollTab[0].events = pollOK;
269 PollTab[0].revents = 0;
270 PollTNum = 1;
271
272// Signal the caller to continue
273//
274 retcode = 0;
275 syncsem->Post();
276
277// Now do the main poll loop
278//
279 std::vector<struct pollfd> PollTabCopy;
280 do {// Duplicate the polling table so we don't need to hold the PollMutex
281 // while we are sleeping in the poll()
282 PollMutex.Lock();
283 PollTabCopy.resize(PollTNum);
284 memcpy(PollTabCopy.data(), PollTab, sizeof(struct pollfd) * PollTNum);
285 PollMutex.UnLock();
286
287 do {numpolled = poll(PollTabCopy.data(), PollTabCopy.size(), -1);}
288 while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
289
290 // Check if we had a polling error
291 //
292 if (numpolled < 0)
293 {if (errno != EINTR) Restart(errno);
294 else numInterrupts++;
295 continue;
296 }
297 numEvents += numpolled;
298
299 // Note this thread is the only one that writes directly to the poll
300 // table (everything else is a read). Hence, it's OK to assume that
301 // the table after the poll() is unchanged and we can write back the
302 // revents field.
303 PollMutex.Lock();
304 for (size_t idx=0; idx<PollTabCopy.size(); idx++)
305 PollTab[idx].revents = PollTabCopy[idx].revents;
306
307 // Check out base poll table entry, we can do this without a lock
308 //
309 if (PollTab[0].revents & pollOK)
310 {PollMutex.UnLock();
311 doRequests(numpolled);
312 if (--numpolled <= 0) continue;
313 PollMutex.Lock();
314 }
315
316 // Checkout which links must be dispatched (do this locked)
317 //
318 plp = 0; nlp = PollQ; jfirst = jlast = 0; num2sched = 0;
319 while ((pInfo = nlp) && numpolled > 0)
320 {if ((pollevents = pInfo->PollEnt->revents))
321 {pInfo->PollEnt->fd = -pInfo->PollEnt->fd;
322 if (plp) nlp = plp->Next = pInfo->Next;
323 else nlp = PollQ = pInfo->Next;
324 numpolled--; pInfo->inQ = false;
325 if (!(pollevents & pollOK))
326 Finish(*pInfo, Poll2Text(pollevents));
327 lp = &(pInfo->Link);
328 if (!(pInfo->isEnabled))
329 Log.Emsg("Poll", "Disabled event occurred for", lp->ID);
330 else {pInfo->isEnabled = false;
331 lp->NextJob = jfirst; jfirst = (XrdJob *)lp;
332 if (!jlast) jlast=(XrdJob *)lp;
333 num2sched++;
334 continue;
335 }
336 }
337 plp = pInfo; nlp = pInfo->Next;
338 }
339 if (numpolled) Recover(numpolled);
340 PollMutex.UnLock();
341
342 // Schedule the polled links
343 //
344 if (num2sched == 1) Sched.Schedule(jfirst);
345 else if (num2sched) Sched.Schedule(num2sched, jfirst, jlast);
346 } while(1);
347}
348
349/******************************************************************************/
350/* P r i v a t e M e t h o d s */
351/******************************************************************************/
352/******************************************************************************/
353/* d o D e t a c h */
354/******************************************************************************/
355// Detach a given offset in the poll table, `pti`, from the PollTab.
356//
357// This method must be called with the PollMutex held.
359{
360 int lastent;
361
362// Get some starting values
363//
364 if ((lastent = PollTNum-1) < 0)
365 {Log.Emsg("Poll","Underflow during detach"); abort();}
366
367 if (pti == lastent)
368 do {PollTNum--;} while(PollTNum && PollTab[PollTNum-1].fd == -1);
369}
370
371/******************************************************************************/
372/* d o R e q u e s t s */
373/******************************************************************************/
374// This must be called with the PollMutex unlocked
375void XrdPollPoll::doRequests(int maxreq)
376{
377 const char *act;
378 int pti, ptfd, num2do;
379 XrdPollInfo *piP;
380
381// To keep ourselves from being swamped, base request read-aheads on the number
382// of pending poll events.
383//
384 num2do = (maxreq < 3 ? -1 : maxreq);
385
386// Now process all poll table manipulation requests
387//
388 while(num2do-- && getRequest())
389 {XrdSysMutexHelper PollGuard(PollMutex);
390 if (ReqBuff.req == PipeData::Post)
391 {ReqBuff.Parms.theSem->Post();
392 continue;
393 }
394 pti = ReqBuff.Parms.Arg.ent;
395 if ((ptfd = abs(PollTab[pti].fd)) != ReqBuff.Parms.Arg.fd)
396 {auto fd = PollTab[pti].fd;
397 PollGuard.UnLock();
398 LogEvent(ReqBuff.req, fd, ReqBuff.Parms.Arg.fd);
399 continue;
400 }
401 if (!(piP = XrdLinkCtl::fd2PollInfo(ptfd)))
402 {PollGuard.UnLock();
403 LogEvent(ReqBuff.req, -1, ptfd);
404 continue;
405 }
406 if (ReqBuff.req == PipeData::EnFD)
407 {PollTab[pti].events = POLLIN | POLLRDNORM;
408 PollTab[pti].fd = ptfd;
409 piP->isEnabled = true; numEnabled++;
410 act = " enabled fd ";
411 }
412 else if (ReqBuff.req == PipeData::DiFD)
413 {PollTab[pti].fd = -ptfd;
414 act = " disabled fd ";
415 piP->isEnabled = false;
416 }
417 else if (ReqBuff.req == PipeData::RmFD)
418 {PollTab[pti].fd = -1;
419 doDetach(pti);
420 act = " detached fd ";
421 piP->isEnabled = false;
422 }
423 else {PollGuard.UnLock();
424 Log.Emsg("Poll", "Received an invalid poll pipe request");
425 continue;
426 }
427 PollGuard.UnLock();
428 TRACE(POLL, "Poller " <<PID <<act <<ReqBuff.Parms.Arg.fd
429 <<" entry " <<pti <<" now at " <<PollTNum);
430 }
431}
432
433/******************************************************************************/
434/* d q L i n k */
435/******************************************************************************/
436
437void XrdPollPoll::dqLink(XrdPollInfo *pInfo)
438{
439 XrdPollInfo *plp, *nlp;
440
441// Find matching link in the queue
442//
443 PollMutex.Lock();
444 pInfo->inQ = false;
445 plp = 0; nlp = PollQ;
446 while (nlp && (pInfo != nlp)) {plp=nlp; nlp = nlp->Next;}
447
448// If we found the link, remove it. Otherwise complain
449//
450 if (nlp) {if (plp) plp->Next = nlp->Next;
451 else PollQ = nlp->Next;
452 PollMutex.UnLock();
453 }
454 else {PollMutex.UnLock();
455 Log.Emsg("dqLink", "Link not found in Q", pInfo->Link.ID);
456 }
457}
458
459/******************************************************************************/
460/* L o g E v e n t */
461/******************************************************************************/
462
463void XrdPollPoll::LogEvent(int req, int pollfd, int cmdfd)
464{
465 const char *opn, *id1, *id2;
466 char buff[4096];
467 XrdLink *lp;
468
469 if (ReqBuff.req == PipeData::EnFD) opn = "enable";
470 else if (ReqBuff.req == PipeData::DiFD) opn = "disable";
471 else if (ReqBuff.req == PipeData::RmFD) opn = "detach";
472 else opn = "???";
473
474 if (pollfd < 0)
475 {sprintf(buff, "poll %d failed; FD %d", PID, cmdfd);
476 Log.Emsg("Poll", opn, buff, "does not map to a link");
477 return;
478 }
479
480 if ((lp = XrdLinkCtl::fd2link(pollfd))) id1 = lp->ID;
481 else id1 = "unknown";
482 if ((lp = XrdLinkCtl::fd2link(cmdfd))) id2 = lp->ID;
483 else id2 = "unknown";
484 snprintf(buff, sizeof(buff)-1,
485 "%d poll fd=%d (%s) not equal %s cmd fd=%d (%s).",
486 PID, pollfd, id1, opn, cmdfd, id2);
487
488 Log.Emsg("Poll", "cmd/poll mismatch:", buff);
489}
490
491/******************************************************************************/
492/* R e c o v e r */
493/******************************************************************************/
494// This must be called with PollMutex locked.
495void XrdPollPoll::Recover(int numleft)
496{
497 int i;
498 XrdPollInfo *piP;
499
500// Turn off any unaccounted links
501//
502 for (i = 1; i < PollTNum; i++)
503 if (PollTab[i].revents)
504 {if (!(piP = XrdLinkCtl::fd2PollInfo(PollTab[i].fd)))
505 PollTab[i].fd = -1;
506 else {piP->isEnabled = false;
507 PollTab[i].fd = -PollTab[i].fd;
508 Log.Emsg("Poll","Improper poll event for",piP->Link.ID);
509 }
510 }
511}
512
513/******************************************************************************/
514/* R e s t a r t */
515/******************************************************************************/
516// This must be called with the PollMutex unlocked
517void XrdPollPoll::Restart(int ecode)
518{
519 XrdPollInfo *pInfo;
520
521// Issue error message
522//
523 TRACE(POLL, PID <<'-' <<TID <<" Poll error " <<ecode);
524 Log.Emsg("Poll", errno, "poll");
525
526// For any outstanding link here, close the link and detach it
527//
528 PollMutex.Lock();
529 while((pInfo = PollQ))
530 {PollQ = pInfo->Next;
531 pInfo->PollEnt->fd = -1;
532 Finish(*pInfo, "Unexpected polling error");
533 Sched.Schedule((XrdJob *)&(pInfo->Link));
534 }
535 PollMutex.UnLock();
536}
#define write(a, b, c)
Definition XrdPosix.hh:115
#define TRACE(act, x)
Definition XrdTrace.hh:63
#define TRACEI(act, x)
Definition XrdTrace.hh:66
XrdJob * NextJob
Definition XrdJob.hh:46
static XrdLink * fd2link(int fd)
Definition XrdLinkCtl.hh:72
static XrdPollInfo * fd2PollInfo(int fd)
struct pollfd * PollEnt
XrdPollInfo * Next
XrdLink & Link
int Include(XrdPollInfo &pInfo)
XrdPollPoll(struct pollfd *pp, int numfd)
void doDetach(int pti)
void Start(XrdSysSemaphore *syncp, int &rc)
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
void Exclude(XrdPollInfo &pInfo)
int Enable(XrdPollInfo &pInfo)
int numInterrupts
Definition XrdPoll.hh:134
pthread_t TID
Definition XrdPoll.hh:83
int PID
Definition XrdPoll.hh:82
XrdSysMutex PollPipe
Definition XrdPoll.hh:115
int ReqFD
Definition XrdPoll.hh:118
int numEvents
Definition XrdPoll.hh:133
int getRequest()
Definition XrdPoll.cc:232
PipeData ReqBuff
Definition XrdPoll.hh:126
static char * Poll2Text(short events)
Definition XrdPoll.cc:272
static XrdPoll * newPoller(int pollid, int numfd)
Definition XrdPollE.icc:45
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition XrdPoll.cc:204
int numEnabled
Definition XrdPoll.hh:132
int CmdFD
Definition XrdPoll.hh:117
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysError Log
Definition XrdConfig.cc:113
XrdScheduler Sched
Definition XrdLinkCtl.cc:54
union XrdPoll::PipeData::@18 Parms