XRootD
XrdSys::IOEvents::Poller Class Reference (abstract)

#include <XrdSysIOEvents.hh>


Classes

struct  PipeData

Public Types

enum  CreateOpts { optTOM }

Public Member Functions

 Poller (int cFD, int rFD)
virtual ~Poller ()
 Destructor. Stop() is effectively called when this object is deleted.
void Stop ()

Static Public Member Functions

static Poller * Create (int &eNum, const char **eTxt=0, int crOpts=0)

Protected Member Functions

virtual void Begin (XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
void CbkTMO ()
bool CbkXeq (Channel *cP, int events, int eNum, const char *eTxt)
 CPP_ATOMIC_TYPE (bool) wakePend
virtual void Exclude (Channel *cP, bool &isLocked, bool dover=1)=0
int GetFault (Channel *cP)
int GetPollEnt (Channel *cP)
int GetRequest ()
virtual bool Include (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
bool Init (Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
void LockChannel (Channel *cP)
virtual bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
int Poll2Enum (short events)
int SendCmd (PipeData &cmd)
void SetPollEnt (Channel *cP, int ptEnt)
virtual void Shutdown ()=0
bool TmoAdd (Channel *cP, int tmoSet)
void TmoDel (Channel *cP)
int TmoGet ()
void UnLockChannel (Channel *cP)

Protected Attributes

Channel * attBase
bool chDead
int cmdFD
int pipeBlen
char * pipeBuff
struct pollfd pipePoll
pthread_t pollTid
PipeData reqBuff
int reqFD
Channel * tmoBase
unsigned char tmoMask

Static Protected Attributes

static time_t maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
static pid_t parentPID = getpid()

Friends

class BootStrap
class Channel

Detailed Description

Defines the poller object interface. A poller fields and dispatches event callbacks. An actual instance of a poller object is obtained by calling the Create() method. You cannot create an instance of this class with new or an in-place declaration because it is abstract. Any number of these objects may be created. Each creation spawns a polling thread.

Definition at line 371 of file XrdSysIOEvents.hh.
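The Create() error-reporting convention (a null return plus an errno-style code and an optional text pointer) can be illustrated with a small stand-in. This is only a sketch: DemoPoller and its simulateFailure parameter are hypothetical and are not part of XRootD; only the shape of the eNum and eTxt out-parameters follows the description of Create().

```cpp
#include <cassert>
#include <cerrno>
#include <cstring>

// Hypothetical stand-in used for illustration only; NOT the XRootD class.
class DemoPoller {
public:
    // Mirrors the documented convention:
    //   success: non-null pointer, eNum == 0, *eTxt == "" (when eTxt is given)
    //   failure: null pointer, eNum == errno-style code, *eTxt names the
    //            failing operation (when eTxt is given)
    static DemoPoller *Create(int &eNum, const char **eTxt = nullptr,
                              bool simulateFailure = false) {
        if (simulateFailure) {            // assumption: pretend pipe creation failed
            eNum = EMFILE;
            if (eTxt) *eTxt = "creating poll pipe";
            return nullptr;
        }
        eNum = 0;
        if (eTxt) *eTxt = "";
        return new DemoPoller();
    }

private:
    DemoPoller() {}                       // instances come only from Create()
};
```

A caller would check the returned pointer first and consult eNum and eTxt only when it is null.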

Member Enumeration Documentation

◆ CreateOpts

Create a specialized instance of a poller object, initialize it, and start the polling process. You must call Create() to obtain a specialized poller.

Parameters
eNum    Place where errno is placed upon failure.
eTxt    Place where a pointer to the description of the failing operation is to be set. If null, no description is returned.
crOpts  Poller options (see static const optxxx): optTOM - Timeout resumption after a timeout event must be manually re-enabled. By default, event timeouts are automatically re-enabled after successful callbacks.
Returns
!0  Poller successfully created and started. eNum contains zero. eTxt, if not null, points to an empty string. The returned value is a pointer to the Poller object.
0   Poller could not be created. eNum contains the associated errno value. eTxt, if not null, points to a description of the failing operation.
Enumerator
optTOM 

Definition at line 398 of file XrdSysIOEvents.hh.

Constructor & Destructor Documentation

◆ Poller()

XrdSys::IOEvents::Poller::Poller ( int cFD,
int rFD )

Constructor

Parameters
cFD  The file descriptor used to send commands to the poll thread.
rFD  The file descriptor used to receive commands in the poll thread.

Definition at line 571 of file XrdSysIOEvents.cc.

572{
573
574// Now initialize local class members
575//
576 attBase = 0;
577 tmoBase = 0;
578 cmdFD = cFD;
579 reqFD = rFD;
580 wakePend = false;
581 pipeBuff = 0;
582 pipeBlen = 0;
583 pipePoll.fd = rFD;
584 pipePoll.events = POLLIN | POLLRDNORM;
585 tmoMask = 255;
586}

References attBase, cmdFD, pipeBlen, pipeBuff, pipePoll, reqFD, tmoBase, and tmoMask.

Referenced by XrdSys::IOEvents::PollE::PollE(), XrdSys::IOEvents::PollerErr1::PollerErr1(), XrdSys::IOEvents::PollerInit::PollerInit(), XrdSys::IOEvents::PollerWait::PollerWait(), XrdSys::IOEvents::PollKQ::PollKQ(), XrdSys::IOEvents::PollPoll::PollPoll(), and XrdSys::IOEvents::PollPort::PollPort().


◆ ~Poller()

virtual XrdSys::IOEvents::Poller::~Poller ( )
inline virtual

Destructor. Stop() is effectively called when this object is deleted.

Definition at line 430 of file XrdSysIOEvents.hh.

430{}

Member Function Documentation

◆ Begin()

virtual void XrdSys::IOEvents::Poller::Begin ( XrdSysSemaphore * syncp,
int & rc,
const char ** eTxt )
protected pure virtual

Start the polling event loop. An implementation must be supplied. Begin() is called via the internal BootStrap class from a new thread.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by XrdSys::IOEvents::BootStrap::Start().


◆ CbkTMO()

void XrdSys::IOEvents::Poller::CbkTMO ( )
protected

Definition at line 614 of file XrdSysIOEvents.cc.

615{
616 Channel *cP;
617
618// Process each element in the timeout queue, calling the callback function
619// if the timeout has passed. As this method can be called with a lock on the
620// channel mutex, we need to drop it prior to calling the callback.
621//
622 toMutex.Lock();
623 while((cP = tmoBase) && cP->deadLine <= time(0))
624 {int dlType = cP->dlType;
625 toMutex.UnLock();
626 CbkXeq(cP, dlType, 0, 0);
627 toMutex.Lock();
628 }
629 toMutex.UnLock();
630}

References CbkXeq(), Channel, and tmoBase.

Referenced by XrdSys::IOEvents::PollE::Begin(), XrdSys::IOEvents::PollKQ::Begin(), XrdSys::IOEvents::PollPoll::Begin(), XrdSys::IOEvents::PollPort::Begin(), and TmoGet().
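The comment in CbkTMO() describes a common pattern: hold the queue lock only while inspecting the queue, and release it while the callback runs. A minimal sketch of that pattern follows, assuming a hypothetical TimeoutQueue built on std::multimap instead of the intrusive channel list used by the real code. Unlike the listing above, where CbkXeq() removes the channel from the queue, this sketch erases the entry itself before invoking the callback.

```cpp
#include <cassert>
#include <ctime>
#include <functional>
#include <map>
#include <mutex>

// Hypothetical queue of (deadline -> channel id); not the XRootD structure.
struct TimeoutQueue {
    std::mutex mtx;
    std::multimap<std::time_t, int> byDeadline;

    void Add(std::time_t deadline, int id) {
        std::lock_guard<std::mutex> guard(mtx);
        byDeadline.emplace(deadline, id);
    }

    // Invokes cb for every entry whose deadline is <= now and returns how
    // many callbacks were made. The lock is dropped around each callback so
    // the callback may safely call Add() again.
    int FireExpired(std::time_t now, const std::function<void(int)> &cb) {
        int fired = 0;
        std::unique_lock<std::mutex> lock(mtx);
        while (!byDeadline.empty() && byDeadline.begin()->first <= now) {
            int id = byDeadline.begin()->second;
            byDeadline.erase(byDeadline.begin());
            lock.unlock();          // never call out while holding the lock
            cb(id);
            lock.lock();            // re-check the head after regaining it
            ++fired;
        }
        return fired;
    }
};
```

Re-reading the head of the queue after each relock matters because the callback, or another thread, may have changed the queue in the meantime.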


◆ CbkXeq()

bool XrdSys::IOEvents::Poller::CbkXeq ( Channel * cP,
int events,
int eNum,
const char * eTxt )
protected

Definition at line 636 of file XrdSysIOEvents.cc.

638{
639 XrdSysMutexHelper cbkMHelp(cP->chMutex);
640 char oldEvents;
641 bool cbok, retval, isRead, isWrite, isLocked = true;
642
643// Perform any required tracing
644//
645 if (TRACING)
646 {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
647 (cP->chPoller == &pollInit ? "init" :
648 (cP->chPoller == &pollWait ? "wait" : "err")));
649 DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
650 <<" chev=" <<static_cast<int>(cP->chEvents)
651 <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
652 <<" callback " <<(cP->chCB ? "present" : "missing")
653 <<" poller=" <<cbtype);
654 }
655
656// Remove this from the timeout queue if there and reset the deadlines based
657// on the event we are reflecting. This separates read and write deadlines
658//
659 if (cP->inTOQ)
660 {TmoDel(cP);
661 cP->dlType |= (events & CallBack::ValidEvents) << 4;
662 isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
663 if (isRead) cP->rdDL = maxTime;
664 isWrite = events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut);
665 if (isWrite) cP->wrDL = maxTime;
666 } else {
667 cP->dlType &= CallBack::ValidEvents;
668 isRead = isWrite = false;
669 }
670
671// Verify that there is a callback here and the channel is ready. If not,
672// disable this channel for the events being refelcted unless the event is a
673// fatal error. In this case we need to abandon the channel since error events
674// may continue to be generated as we can't always disable them.
675//
676 if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
677 {if (eNum)
678 {cP->chPoller = &pollErr1; cP->chFault = eNum;
679 cP->inPSet = 0;
680 return false;
681 }
682 oldEvents = cP->chEvents;
683 cP->chEvents = 0;
684 retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
685 TRACE_MOD(CbkXeq,cP->chFD,0);
686 if (!isLocked) cP->chMutex.Lock();
687 cP->chEvents = oldEvents;
688 return true;
689 }
690
691// Resolve the problem where we get an error event but the channel wants them
692// presented as a read or write event. If neither is possible then defer the
693// error until the channel is enabled again.
694//
695 if (eNum)
696 {if (cP->chEvents & Channel::errorEvents)
697 {cP->chPoller = &pollErr1; cP->chFault = eNum;
698 cP->chStat = Channel::isCBMode;
699 chDead = false;
700 cbkMHelp.UnLock();
701 cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
702 if (chDead) return true;
703 cbkMHelp.Lock(&(cP->chMutex));
704 cP->inPSet = 0;
705 return false;
706 }
707 if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
708 else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
709 else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
710 return false;
711 }
712 }
713
714// Indicate that we are in callback mode then drop the channel lock and effect
715// the callback. This allows the callback to freely manage locks.
716//
717 cP->chStat = Channel::isCBMode;
718 chDead = false;
719 // Detach() may be called after unlocking the channel and would zero the
720 // callback pointer and argument. So keep a copy.
721 CallBack *cb = cP->chCB;
722 void *cba = cP->chCBA;
723 cbkMHelp.UnLock();
724 IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
725 cbok = cb->Event(cP,cba, events);
726 IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
727
728// If channel destroyed by the callback, bail really fast. Otherwise, regain
729// the channel lock.
730//
731 if (chDead) return true;
732 cbkMHelp.Lock(&(cP->chMutex));
733
734// If the channel is being destroyed; then another thread must have done so.
735// Tell it the callback has finished and just return.
736//
737 if (cP->chStat != Channel::isCBMode)
738 {if (cP->chStat == Channel::isDead)
739 ((XrdSysSemaphore *)cP->chCBA)->Post();
740 return true;
741 }
742 cP->chStat = Channel::isClear;
743
744// Handle enable or disable here. If we keep the channel enabled then reset
745// the timeout if it hasn't been handled via a call from the callback.
746//
747 if (!cbok) Detach(cP,isLocked,false);
748 else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
749 TmoAdd(cP, 0);
750
751// All done. While the mutex should not have been unlocked, we relock it if
752// it has to keep the mutex helper from croaking.
753//
754 if (!isLocked) cP->chMutex.Lock();
755 return true;
756}

References BOOLNAME, CbkXeq(), chDead, DO_TRACE, XrdSys::IOEvents::Channel::errorEvents, XrdSys::IOEvents::CallBack::Event(), XrdSys::IOEvents::CallBack::Fatal(), IF_TRACE, XrdSysMutex::Lock(), XrdSysMutexHelper::Lock(), maxTime, Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollInit, XrdSys::IOEvents::pollWait, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REVENTS, TmoAdd(), TmoDel(), TRACE_MOD, TRACING, XrdSysMutexHelper::UnLock(), XrdSys::IOEvents::CallBack::ValidEvents, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.

Referenced by CbkTMO(), and CbkXeq().
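The listing of CbkXeq() copies the callback pointer and its argument before unlocking the channel, because Detach() may clear them once the lock is released. The sketch below isolates that step. DemoChannel and Dispatch are hypothetical names, and the function-pointer callback is a simplification of the CallBack object used by the real code.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical channel with a callback slot guarded by a mutex.
struct DemoChannel {
    std::mutex mtx;
    bool (*cb)(void *arg, int events) = nullptr;
    void *cbArg = nullptr;

    // Clears the callback slot; may be called from inside the callback.
    void Detach() {
        std::lock_guard<std::mutex> guard(mtx);
        cb = nullptr;
        cbArg = nullptr;
    }
};

// Returns false when no callback is registered; otherwise returns the
// callback's own result.
bool Dispatch(DemoChannel &ch, int events) {
    std::unique_lock<std::mutex> lock(ch.mtx);
    if (!ch.cb) return false;
    auto cb = ch.cb;        // snapshot taken while the lock is held
    void *arg = ch.cbArg;
    lock.unlock();          // the callback runs unlocked and may Detach()
    return cb(arg, events); // uses the copies, not the (possibly zeroed) slot
}
```

Without the local copies, a Detach() racing with the dispatch could leave the caller dereferencing a null pointer.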


◆ CPP_ATOMIC_TYPE()

XrdSys::IOEvents::Poller::CPP_ATOMIC_TYPE ( bool )
protected

◆ Create()

XrdSys::IOEvents::Poller * XrdSys::IOEvents::Poller::Create ( int & eNum,
const char ** eTxt = 0,
int crOpts = 0 )
static

Definition at line 762 of file XrdSysIOEvents.cc.

765{
766 int fildes[2];
767 struct pollArg pArg;
768 pthread_t tid;
769
770// Create a pipe used to break the poll wait loop
771//
772 if (XrdSysFD_Pipe(fildes))
773 {eNum = errno;
774 if (eTxt) *eTxt = "creating poll pipe";
775 return 0;
776 }
777
778// Create an actual implementation of a poller
779//
780 if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
781 {close(fildes[0]);
782 close(fildes[1]);
783 return 0;
784 }
785
786// Now start a thread to handle this poller object
787//
788 if ((eNum = XrdSysThread::Run(&tid, XrdSys::IOEvents::BootStrap::Start,
789 (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
790 {if (eTxt) *eTxt = "creating poller thread"; return 0;}
791
792// Now wait for the thread to finish initializing before we allow use
793// Note that the bootstrap takes ownership of the semaphore and will delete it
794// once the thread positing the semaphore actually ends. This is to avoid
795// semaphore bugs present in certain (e.g. Linux) kernels.
796//
797 pArg.pollSync->Wait();
798
799// Check if all went well
800//
801 if (pArg.retCode)
802 {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
803 eNum = pArg.retCode;
804 delete pArg.pollP;
805 return 0;
806 }
807
808// Set creation options in the new poller
809//
810 if (crOpts & optTOM)
811 pArg.pollP->tmoMask = ~(CallBack::ReadTimeOut|CallBack::WriteTimeOut);
812
813// All done
814//
815 eNum = 0;
816 if (eTxt) *eTxt = "";
817 return pArg.pollP;
818}

References close, optTOM, XrdSys::IOEvents::pollArg::pollP, XrdSys::IOEvents::pollArg::pollSync, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::pollArg::retCode, XrdSys::IOEvents::pollArg::retMsg, XrdSysThread::Run(), XrdSys::IOEvents::BootStrap::Start(), tmoMask, XrdSysSemaphore::Wait(), XrdSys::IOEvents::CallBack::WriteTimeOut, and XRDSYSTHREAD_BIND.

Referenced by XrdCl::PollerBuiltIn::Start().


◆ Exclude()

virtual void XrdSys::IOEvents::Poller::Exclude ( Channel * cP,
bool & isLocked,
bool dover = 1 )
protected pure virtual

Remove a channel from the poll set. An implementation must be supplied. The channel is locked when this method is called; if the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.


◆ GetFault()

int XrdSys::IOEvents::Poller::GetFault ( Channel * cP)
inline protected

Definition at line 437 of file XrdSysIOEvents.hh.

437{return cP->chFault;}

References Channel.

Referenced by XrdSys::IOEvents::PollerErr1::Include(), and XrdSys::IOEvents::PollerErr1::Modify().


◆ GetPollEnt()

int XrdSys::IOEvents::Poller::GetPollEnt ( Channel * cP)
inline protected

Definition at line 438 of file XrdSysIOEvents.hh.

438{return cP->pollEnt;}

References Channel.

Referenced by XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPoll::Include(), XrdSys::IOEvents::PollKQ::Modify(), and XrdSys::IOEvents::PollPoll::Modify().


◆ GetRequest()

int XrdSys::IOEvents::Poller::GetRequest ( )
protected

Definition at line 870 of file XrdSysIOEvents.cc.

871{
872 ssize_t rlen;
873 int rc;
874
875// See if we are to resume a read or start a fresh one
876//
877 if (!pipeBlen)
878 {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
879
880// Wait for the next request. Some OS's (like Linux) don't support non-blocking
881// pipes. So, we must front the read with a poll.
882//
883 do {rc = poll(&pipePoll, 1, 0);}
884 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
885 if (rc < 1) return 0;
886
887// Now we can put up a read without a delay. Normally a full command will be
888// present. Under some heavy conditions, this may not be the case.
889//
890 do {rlen = read(reqFD, pipeBuff, pipeBlen);}
891 while(rlen < 0 && errno == EINTR);
892 if (rlen <= 0)
893 {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
894 return 0;
895 }
896
897// Check if all the data has arrived. If not all the data is present, defer
898// this request until more data arrives.
899//
900 if (!(pipeBlen -= rlen)) return 1;
901 pipeBuff += rlen;
902 return 0;
903}

References pipeBlen, pipeBuff, pipePoll, read, reqBuff, reqFD, and XrdSysE2T().
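GetRequest() combines two ideas: a zero-timeout poll() in front of the read so the call never blocks, and a cursor (pipeBuff, pipeBlen) that lets a partially received record be completed on a later call. The following sketch reproduces both with plain POSIX calls. DemoCmd and CmdReader are hypothetical stand-ins for PipeData and the poller members.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <poll.h>
#include <unistd.h>

// Hypothetical fixed-size command record (stands in for PipeData).
struct DemoCmd { int req; int fd; };

class CmdReader {
public:
    explicit CmdReader(int fd) : rfd(fd), cur(nullptr), left(0) {}

    // Returns true once a complete DemoCmd is available in 'cmd'. Returns
    // false when no data is ready or only part of a record has arrived.
    bool Next() {
        // Start a fresh record unless we are resuming a partial one.
        if (left == 0) { cur = reinterpret_cast<char *>(&cmd); left = sizeof(cmd); }

        // Zero-timeout poll: only read when data is already waiting.
        struct pollfd pfd = {rfd, POLLIN, 0};
        int rc;
        do { rc = poll(&pfd, 1, 0); }
        while (rc < 0 && (errno == EAGAIN || errno == EINTR));
        if (rc < 1) return false;

        ssize_t n;
        do { n = read(rfd, cur, left); } while (n < 0 && errno == EINTR);
        if (n <= 0) return false;

        // Advance the cursor; the record is complete when nothing is left.
        left -= static_cast<std::size_t>(n);
        if (left == 0) return true;
        cur += n;
        return false;
    }

    DemoCmd cmd{};

private:
    int rfd;
    char *cur;
    std::size_t left;
};
```

Because the cursor state lives in the object, a caller can simply invoke Next() again on the next wake-up until it returns true.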


◆ Include()

virtual bool XrdSys::IOEvents::Poller::Include ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protected pure virtual

Add a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called; if the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.

Referenced by Init().


◆ Init()

bool XrdSys::IOEvents::Poller::Init ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLockd )
protected

Definition at line 909 of file XrdSysIOEvents.cc.

911{
912// The channel must be locked upon entry!
913//
914 bool retval;
915
916
917// If we are already in progress then simply update the shadow events and
918// resuppress all current events.
919//
920 if (cP->chPoller == &pollWait)
921 {cP->reMod = cP->chEvents;
922 cP->chEvents = 0;
923 IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
924 return true;
925 }
926
927// Trace this entry
928//
929 IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
930
931// If no events are enabled at this point, just return
932//
933 if (!(cP->chEvents)) return true;
934
935// Refuse to enable a channel without a callback function
936//
937 if (!(cP->chCB))
938 {eNum = EDESTADDRREQ;
939 if (eTxt) *eTxt = "enabling without a callback";
940 return false;
941 }
942
943// So, now we can include the channel in the poll set. We will include it
944// with no events enabled to prevent callbacks prior to completion here.
945//
946 cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
947 retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
948 IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
949 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
950
951// Determine what future poller to use. If we can use the regular poller then
952// set the correct event mask for the channel. Note that we could have lost
953// control but the correct events will be reflected in the "reMod" member.
954//
955 if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
956 else {cP->chPoller = cP->chPollXQ;
957 cP->inPSet = 1;
958 if (cP->reMod)
959 {cP->chEvents = cP->reMod;
960 retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
961 TRACE_MOD(Init,cP->chFD,int(cP->reMod));
962 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
963 } else {
964 TRACE_NOD(Init,cP->chFD,0);
965 }
966 }
967
968// All done
969//
970 cP->reMod = 0;
971 return retval;
972}

References BOOLNAME, IF_TRACE, Include(), Init(), XrdSysMutex::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollWait, TRACE_LOK, TRACE_MOD, and TRACE_NOD.

Referenced by Init(), XrdSys::IOEvents::PollerInit::Modify(), and XrdSys::IOEvents::PollerWait::Modify().


◆ LockChannel()

void XrdSys::IOEvents::Poller::LockChannel ( Channel * cP)
inline protected

Definition at line 441 of file XrdSysIOEvents.hh.

441{cP->chMutex.Lock();}

References Channel, and XrdSysMutex::Lock().


◆ Modify()

virtual bool XrdSys::IOEvents::Poller::Modify ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protected pure virtual

Modify the event status of a channel. An implementation must be supplied. The channel is locked when this method is called; if the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.

Referenced by CbkXeq(), and Init().


◆ Poll2Enum()

int XrdSys::IOEvents::Poller::Poll2Enum ( short events)
protected

Definition at line 978 of file XrdSysIOEvents.cc.

979{
980 if (events & POLLERR) return EPIPE;
981
982 if (events & POLLHUP) return ECONNRESET;
983
984 if (events & POLLNVAL) return EBADF;
985
986 return EOPNOTSUPP;
987}
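The mapping in Poll2Enum() is order dependent: when several error bits are set, the first test that matches wins. The sketch below restates it as a free function so the precedence can be exercised in isolation. PollToErrno is a hypothetical name; the constants are the standard ones from poll.h and errno.h.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>

// Free-function restatement of the mapping shown above.
// Precedence: POLLERR, then POLLHUP, then POLLNVAL, else EOPNOTSUPP.
int PollToErrno(short revents) {
    if (revents & POLLERR)  return EPIPE;
    if (revents & POLLHUP)  return ECONNRESET;
    if (revents & POLLNVAL) return EBADF;
    return EOPNOTSUPP;
}
```

For example, a descriptor reporting both POLLERR and POLLHUP is translated to EPIPE, not ECONNRESET.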

◆ SendCmd()

int XrdSys::IOEvents::Poller::SendCmd ( PipeData & cmd)
protected

Definition at line 993 of file XrdSysIOEvents.cc.

994{
995 int wlen;
996
997// Pipe writes are atomic so we don't need locks. Some commands require
998// confirmation. We handle that here based on the command. Note that pipes
999// guarantee that all of the data will be written or we will block.
1000//
1001 if (cmd.req >= PipeData::Post)
1002 {XrdSysSemaphore mySem(0);
1003 cmd.theSem = &mySem;
1004 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1005 while (wlen < 0 && errno == EINTR);
1006 if (wlen > 0) mySem.Wait();
1007 } else {
1008 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1009 while (wlen < 0 && errno == EINTR);
1010 }
1011
1012// All done
1013//
1014 return (wlen >= 0 ? 0 : errno);
1015}

References cmdFD, XrdSys::IOEvents::Poller::PipeData::Post, XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::Poller::PipeData::theSem, XrdSysSemaphore::Wait(), and write.

Referenced by XrdSys::IOEvents::PollE::Exclude(), XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPort::Exclude(), XrdSys::IOEvents::PollPoll::Include(), XrdSys::IOEvents::PollPoll::Modify(), and Stop().
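The core of SendCmd() is a write that is retried on EINTR and relies on small pipe writes being atomic. POSIX guarantees atomicity for pipe writes of at most PIPE_BUF bytes, and PIPE_BUF is at least 512. A minimal sketch of that send path follows, without the semaphore handshake used for commands that need confirmation. DemoCmd and SendRecord are hypothetical names.

```cpp
#include <cassert>
#include <cerrno>
#include <unistd.h>

// Hypothetical fixed-size command record (stands in for PipeData).
struct DemoCmd { int req; int fd; };

// POSIX requires PIPE_BUF >= 512, so a record this small is written atomically.
static_assert(sizeof(DemoCmd) <= 512, "record must fit in one atomic pipe write");

// Writes one record to the pipe, retrying when interrupted by a signal.
// Returns 0 on success or the errno value of the failed write.
int SendRecord(int wfd, const DemoCmd &cmd) {
    ssize_t n;
    do { n = write(wfd, &cmd, sizeof(cmd)); }
    while (n < 0 && errno == EINTR);
    return n >= 0 ? 0 : errno;
}
```

Because each record is written atomically, several threads can send commands on the same pipe without an additional lock, which matches the comment in the listing above.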


◆ SetPollEnt()

void XrdSys::IOEvents::Poller::SetPollEnt ( Channel * cP,
int ptEnt )
protected

Definition at line 1021 of file XrdSysIOEvents.cc.

1022{
1023 cP->pollEnt = pe;
1024}

Referenced by XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Include(), and XrdSys::IOEvents::PollKQ::Modify().


◆ Shutdown()

virtual void XrdSys::IOEvents::Poller::Shutdown ( )
protected pure virtual

Shut down the poller. An implementation must be supplied. The shutdown method must release any allocated storage and close private file descriptors. The polling thread will have already been terminated and the cross-thread pipe closed. Warning: the derived destructor must call Stop() and do nothing else!

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by Stop().
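The warning about derived destructors follows from how C++ destroys objects: by the time the base class destructor runs, the derived part no longer exists, so a virtual Shutdown() call from there cannot reach the derived implementation. The sketch below shows the arrangement with hypothetical BasePoller and DemoPoll classes; the stopped flag plays the role of the cmdFD == -1 check in Stop().

```cpp
#include <cassert>
#include <string>

class BasePoller {
public:
    virtual ~BasePoller() {}        // must not call Shutdown(): it is pure here

    void Stop() {
        if (stopped) return;        // a second call is a no-op
        stopped = true;
        Shutdown();                 // dispatches to the derived override
    }

protected:
    virtual void Shutdown() = 0;

private:
    bool stopped = false;
};

class DemoPoll : public BasePoller {
public:
    explicit DemoPoll(std::string &logRef) : log(logRef) {}
    ~DemoPoll() override { Stop(); }   // derived destructor only calls Stop()

protected:
    void Shutdown() override { log += "shutdown;"; }

private:
    std::string &log;               // records calls for demonstration
};
```

Calling Stop() explicitly and then destroying the object still runs Shutdown() exactly once.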


◆ Stop()

void XrdSys::IOEvents::Poller::Stop ( )

Stop a poller object. Active callbacks are completed and pending callbacks are discarded, after which the poller event thread exits. Subsequently, each associated channel is disabled and removed from the poller object. If the channel is enabled for a StopEvent, the stop callback is invoked. However, any attempt to use the channel methods that require an active poller will return an error.

Since a stopped poller cannot be restarted, the only thing left is to delete it. This also applies to all the associated channels since they no longer have an active poller.

Definition at line 1030 of file XrdSysIOEvents.cc.

1031{
1032 PipeData cmdbuff;
1033 CallBack *theCB;
1034 Channel *cP;
1035 void *cbArg;
1036 int doCB;
1037
1038// Initialize the PipeData structure
1039//
1040 memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1041 cmdbuff.req = PipeData::Stop;
1042
1043// Lock all of this
1044//
1045 adMutex.Lock();
1046
1047// If we are already shutdown then we are done
1048//
1049 if (cmdFD == -1) {adMutex.UnLock(); return;}
1050
1051// First we must stop the poller thread in an orderly fashion.
1052//
1053 adMutex.UnLock();
1054 SendCmd(cmdbuff);
1055 adMutex.Lock();
1056
1057// Close the pipe communication mechanism
1058//
1059 close(cmdFD); cmdFD = -1;
1060 close(reqFD); reqFD = -1;
1061
1062// Run through cleaning up the channels. While there should not be any other
1063// operations happening on this poller, we take the conservative approach.
1064//
1065 while((cP = attBase))
1066 {REMOVE(attBase, attList, cP);
1067 adMutex.UnLock();
1068 cP->chMutex.Lock();
1069 doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1070 if (cP->inTOQ) TmoDel(cP);
1071 cP->Reset(&pollErr1, cP->chFD, EIDRM);
1072 cP->chPollXQ = &pollErr1;
1073 if (doCB)
1074 {cP->chStat = Channel::isClear;
1075 theCB = cP->chCB; cbArg = cP->chCBA;
1076 cP->chMutex.UnLock();
1077 theCB->Stop(cP, cbArg);
1078 } else cP->chMutex.UnLock();
1079 adMutex.Lock();
1080 }
1081
1082// Now invoke the poller specific shutdown
1083//
1084 Shutdown();
1085 adMutex.UnLock();
1086}

References attBase, Channel, close, cmdFD, XrdSysMutex::Lock(), XrdSys::IOEvents::pollErr1, REMOVE, XrdSys::IOEvents::Poller::PipeData::req, reqFD, SendCmd(), Shutdown(), XrdSys::IOEvents::CallBack::Stop(), XrdSys::IOEvents::Poller::PipeData::Stop, XrdSys::IOEvents::Channel::stopEvent, TmoDel(), and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::~PollE(), XrdSys::IOEvents::PollKQ::~PollKQ(), XrdSys::IOEvents::PollPoll::~PollPoll(), XrdSys::IOEvents::PollPort::~PollPort(), and XrdCl::PollerBuiltIn::Stop().


◆ TmoAdd()

bool XrdSys::IOEvents::Poller::TmoAdd ( Channel * cP,
int tmoSet )
protected

Definition at line 1092 of file XrdSysIOEvents.cc.

1093{
1094 XrdSysMutexHelper mHelper(toMutex);
1095 time_t tNow;
1096 Channel *ncP;
1097 bool setRTO, setWTO;
1098
1099// Do some tracing
1100//
1101 IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1102 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1103
1104// Remove element from timeout queue if it is there
1105//
1106 if (cP->inTOQ)
1107 {REMOVE(tmoBase, tmoList, cP);
1108 cP->inTOQ = 0;
1109 }
1110
1111// Determine which timeouts need to be reset
1112//
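1113 tmoSet|= cP->dlType >> 4;
1114 setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut);
1115 setWTO = (tmoSet&tmoMask) & (CallBack::ReadyToWrite|CallBack::WriteTimeOut);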
1116
1117// Reset the required deadlines
1118//
1119 tNow = time(0);
1120 if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1121 cP->rdDL = cP->chRTO + tNow;
1122 if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1123 cP->wrDL = cP->chWTO + tNow;
1124
1125// Calculate the closest enabled deadline
1126//
1127 if (cP->rdDL < cP->wrDL)
1128 {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1129 } else {
1130 cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1131 if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1132 }
1133 IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1134 <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1135
1136// If no timeout really applies, we are done
1137//
1138 if (cP->deadLine == maxTime) return false;
1139
1140// Add the channel to the timeout queue in correct deadline position.
1141//
1142 if ((ncP = tmoBase))
1143 {do {if (cP->deadLine < ncP->deadLine) break;
1144 ncP = ncP->tmoList.next;
1145 } while(ncP != tmoBase);
1146 INSERT(tmoList, ncP, cP);
1147 if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1148 } else tmoBase = cP;
1149 cP->inTOQ = 1;
1150
1151// Indicate to the caller whether or not a wakeup is required
1152//
1153 return (tmoBase == cP);
1154}

References BOOLNAME, Channel, IF_TRACE, INSERT, maxTime, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REMOVE, REVENTS, STATUSOF, TmoAdd(), tmoBase, tmoMask, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.

Referenced by CbkXeq(), and TmoAdd().
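The last part of TmoAdd() keeps the timeout queue ordered by deadline and tells the caller whether the new entry became the head, in which case the polling thread needs a wake-up to shorten its wait. A minimal sketch of that insertion rule follows, using std::list in place of the intrusive circular list. DemoChan and DeadlineQueue are hypothetical names.

```cpp
#include <cassert>
#include <ctime>
#include <list>

// Hypothetical channel record carrying only what the queue needs.
struct DemoChan { int id; std::time_t deadLine; };

class DeadlineQueue {
public:
    // Inserts c before the first entry with a strictly later deadline, so
    // entries with equal deadlines keep their arrival order. Returns true
    // when c became the head, i.e. the earliest deadline changed.
    bool Add(const DemoChan &c) {
        auto pos = q.begin();
        while (pos != q.end() && !(c.deadLine < pos->deadLine)) ++pos;
        bool isHead = (pos == q.begin());
        q.insert(pos, c);
        return isHead;
    }

    // Returns the entry with the earliest deadline, or null when empty.
    const DemoChan *Head() const { return q.empty() ? nullptr : &q.front(); }

private:
    std::list<DemoChan> q;
};
```

Only a true return requires interrupting the poll wait; any other insertion is picked up when the current head expires.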


◆ TmoDel()

void XrdSys::IOEvents::Poller::TmoDel ( Channel * cP)
protected

Definition at line 1160 of file XrdSysIOEvents.cc.

1161{
1162
1163// Do some tracing
1164//
1165 IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1166 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1167
1168// Get the timeout queue lock and remove the channel from the queue
1169//
1170 toMutex.Lock();
1171 REMOVE(tmoBase, tmoList, cP);
1172 cP->inTOQ = 0;
1173 toMutex.UnLock();
1174}

References BOOLNAME, IF_TRACE, REMOVE, STATUSOF, tmoBase, and TmoDel().

Referenced by CbkXeq(), Stop(), and TmoDel().


◆ TmoGet()

int XrdSys::IOEvents::Poller::TmoGet ( )
protected

Definition at line 1180 of file XrdSysIOEvents.cc.

1181{
1182 int wtval;
1183
1184// Lock the timeout queue
1185//
1186 toMutex.Lock();
1187
1188// Calculate wait time. If the deadline passed, invoke the timeout callback.
1189// we will need to drop the timeout lock as we don't have the channel lock.
1190//
1191 do {if (!tmoBase) {wtval = -1; break;}
1192 wtval = (tmoBase->deadLine - time(0)) * 1000;
1193 if (wtval > 0) break;
1194 toMutex.UnLock();
1195 CbkTMO();
1196 toMutex.Lock();
1197 } while(1);
1198
1199// Return the value
1200//
1201 CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1202 toMutex.UnLock();
1203 return wtval;
1204}

References CbkTMO(), CPP_ATOMIC_STORE, and tmoBase.

Referenced by XrdSys::IOEvents::PollE::Begin(), XrdSys::IOEvents::PollKQ::Begin(), XrdSys::IOEvents::PollPoll::Begin(), and XrdSys::IOEvents::PollPort::BegTO().
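TmoGet() turns the nearest deadline into a wait time in milliseconds, with -1 meaning that no deadline is queued. The sketch below shows only that arithmetic. WaitMillis is a hypothetical helper, and it differs from the listing in two ways that are assumptions of this example: it returns 0 for a deadline that is already due instead of running the timeout callbacks, and it clamps very distant deadlines to INT_MAX to avoid integer overflow.

```cpp
#include <cassert>
#include <climits>
#include <ctime>

// Returns the poll timeout in milliseconds:
//   -1       no deadline queued (wait indefinitely)
//    0       deadline already due
//   > 0      milliseconds until the deadline, clamped to INT_MAX
int WaitMillis(bool haveDeadline, std::time_t deadline, std::time_t now) {
    if (!haveDeadline) return -1;
    if (deadline <= now) return 0;
    std::time_t secs = deadline - now;
    if (secs > INT_MAX / 1000) return INT_MAX;   // avoid overflow on multiply
    return static_cast<int>(secs * 1000);
}
```

A polling loop could pass the result directly as the timeout argument of poll(), which treats a negative value as an unbounded wait.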


◆ UnLockChannel()

void XrdSys::IOEvents::Poller::UnLockChannel ( Channel * cP)
inline protected

Definition at line 448 of file XrdSysIOEvents.hh.

448{cP->chMutex.UnLock();}

References Channel, and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::Exclude(), XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPort::Exclude(), XrdSys::IOEvents::PollPoll::Include(), and XrdSys::IOEvents::PollPoll::Modify().


◆ BootStrap

friend class BootStrap
friend

Definition at line 373 of file XrdSysIOEvents.hh.

References BootStrap.

Referenced by BootStrap.

◆ Channel

Member Data Documentation

◆ attBase

Channel* XrdSys::IOEvents::Poller::attBase
protected

Definition at line 488 of file XrdSysIOEvents.hh.

Referenced by Poller(), and Stop().

◆ chDead

bool XrdSys::IOEvents::Poller::chDead
protected

Definition at line 511 of file XrdSysIOEvents.hh.

Referenced by CbkXeq(), and XrdSys::IOEvents::Channel::Delete().

◆ cmdFD

int XrdSys::IOEvents::Poller::cmdFD
protected

Definition at line 494 of file XrdSysIOEvents.hh.

Referenced by Poller(), SendCmd(), and Stop().

◆ maxTime

time_t XrdSys::IOEvents::Poller::maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
static protected

Definition at line 513 of file XrdSysIOEvents.hh.

Referenced by CbkXeq(), XrdSys::IOEvents::Channel::Enable(), and TmoAdd().
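maxTime serves as the "no deadline" sentinel: TmoAdd() returns early when the computed deadline equals it, and CbkXeq() resets rdDL and wrDL to it. The initializer picks the largest positive value for a 32-bit or 64-bit signed time_t. The sketch below restates that selection and compares it with std::numeric_limits, assuming time_t is a signed integer of one of those two widths, as it is on common Linux and macOS platforms. DemoMaxTime and HasDeadline are hypothetical names.

```cpp
#include <cassert>
#include <ctime>
#include <limits>

// Same selection as the documented initializer, written as a function.
constexpr std::time_t DemoMaxTime() {
    return sizeof(std::time_t) == 8
               ? static_cast<std::time_t>(0x7fffffffffffffffLL)
               : static_cast<std::time_t>(0x7fffffff);
}

// A deadline equal to the sentinel means that no timeout applies.
constexpr bool HasDeadline(std::time_t deadline) {
    return deadline != DemoMaxTime();
}
```

Using the maximum value as the sentinel keeps comparisons such as rdDL < wrDL meaningful, since an unset deadline always sorts last.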

◆ parentPID

pid_t XrdSys::IOEvents::Poller::parentPID = getpid()
static protected

◆ pipeBlen

int XrdSys::IOEvents::Poller::pipeBlen
protected

Definition at line 508 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pipeBuff

char* XrdSys::IOEvents::Poller::pipeBuff
protected

Definition at line 507 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pipePoll

struct pollfd XrdSys::IOEvents::Poller::pipePoll
protected

Definition at line 493 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pollTid

pthread_t XrdSys::IOEvents::Poller::pollTid
protected

◆ reqBuff

PipeData XrdSys::IOEvents::Poller::reqBuff
protected

Definition at line 506 of file XrdSysIOEvents.hh.

Referenced by GetRequest().

◆ reqFD

int XrdSys::IOEvents::Poller::reqFD
protected

Definition at line 495 of file XrdSysIOEvents.hh.

Referenced by Poller(), XrdSys::IOEvents::PollKQ::PollKQ(), GetRequest(), and Stop().

◆ tmoBase

Channel* XrdSys::IOEvents::Poller::tmoBase
protected

Definition at line 489 of file XrdSysIOEvents.hh.

Referenced by Poller(), CbkTMO(), TmoAdd(), TmoDel(), and TmoGet().

◆ tmoMask

unsigned char XrdSys::IOEvents::Poller::tmoMask
protected

Definition at line 509 of file XrdSysIOEvents.hh.

Referenced by Poller(), Create(), and TmoAdd().


The documentation for this class was generated from the following files: