XRootD
XrdSys::IOEvents::Poller Class Reference (abstract)

#include <XrdSysIOEvents.hh>


Classes

struct  PipeData
 

Public Types

enum  CreateOpts { optTOM }
 

Public Member Functions

 Poller (int cFD, int rFD)
 
virtual ~Poller ()
 Destructor. Stop() is effectively called when this object is deleted.
 
void Stop ()
 

Static Public Member Functions

static Poller * Create (int &eNum, const char **eTxt=0, int crOpts=0)
 

Protected Member Functions

virtual void Begin (XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
 
void CbkTMO ()
 
bool CbkXeq (Channel *cP, int events, int eNum, const char *eTxt)
 
 CPP_ATOMIC_TYPE (bool) wakePend
 
virtual void Exclude (Channel *cP, bool &isLocked, bool dover=1)=0
 
int GetFault (Channel *cP)
 
int GetPollEnt (Channel *cP)
 
int GetRequest ()
 
virtual bool Include (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
 
bool Init (Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
 
void LockChannel (Channel *cP)
 
virtual bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
 
int Poll2Enum (short events)
 
int SendCmd (PipeData &cmd)
 
void SetPollEnt (Channel *cP, int ptEnt)
 
virtual void Shutdown ()=0
 
bool TmoAdd (Channel *cP, int tmoSet)
 
void TmoDel (Channel *cP)
 
int TmoGet ()
 
void UnLockChannel (Channel *cP)
 

Protected Attributes

Channel * attBase
 
bool chDead
 
int cmdFD
 
int pipeBlen
 
char * pipeBuff
 
struct pollfd pipePoll
 
pthread_t pollTid
 
PipeData reqBuff
 
int reqFD
 
Channel * tmoBase
 
unsigned char tmoMask
 

Static Protected Attributes

static time_t maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
 
static pid_t parentPID = getpid()
 

Friends

class BootStrap
 
class Channel
 

Detailed Description

Defines the poller object interface. A poller fields and dispatches event callbacks. An actual instance of a poller object is obtained by calling the Create() method. You cannot create an instance of this object using new or an in-place declaration because the class is abstract. Any number of these objects may be created. Each creation spawns a polling thread.

Definition at line 371 of file XrdSysIOEvents.hh.

Member Enumeration Documentation

◆ CreateOpts

Create a specialized instance of a poller object, initialize it, and start the polling process. You must call Create() to obtain a specialized poller.

Parameters
eNum  Place where errno is placed upon failure.
eTxt  Place where a pointer to the description of the failing operation is to be set. If null, no description is returned.
crOpts  Poller options (see static const optxxx): optTOM - Timeout resumption after a timeout event must be manually re-enabled. By default, event timeouts are automatically re-enabled after successful callbacks.
Returns
!0  Poller successfully created and started. eNum contains zero. eTxt, if not null, contains an empty string. The returned value is a pointer to the Poller object.
0  Poller could not be created. eNum contains the associated errno value. eTxt, if not null, contains the failing operation.
Enumerator
optTOM 

Definition at line 398 of file XrdSysIOEvents.hh.

Constructor & Destructor Documentation

◆ Poller()

XrdSys::IOEvents::Poller::Poller ( int  cFD,
int  rFD 
)

Constructor

Parameters
cFD  The file descriptor used to send commands to the poll thread.
rFD  The file descriptor used to receive commands in the poll thread.

Definition at line 572 of file XrdSysIOEvents.cc.

573 {
574 
575 // Now initialize local class members
576 //
577  attBase = 0;
578  tmoBase = 0;
579  cmdFD = cFD;
580  reqFD = rFD;
581  wakePend = false;
582  pipeBuff = 0;
583  pipeBlen = 0;
584  pipePoll.fd = rFD;
585  pipePoll.events = POLLIN | POLLRDNORM;
586  tmoMask = 255;
587 }

◆ ~Poller()

virtual XrdSys::IOEvents::Poller::~Poller ( )
inline virtual

Destructor. Stop() is effectively called when this object is deleted.

Definition at line 430 of file XrdSysIOEvents.hh.

430 {}

Member Function Documentation

◆ Begin()

virtual void XrdSys::IOEvents::Poller::Begin ( XrdSysSemaphore *  syncp,
int &  rc,
const char **  eTxt 
)
protected pure virtual

Start the polling event loop. An implementation must be supplied. Begin() is called via the internal BootStrap class from a new thread.

Implemented in XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, and XrdSys::IOEvents::PollE.

Referenced by XrdSys::IOEvents::BootStrap::Start().


◆ CbkTMO()

void XrdSys::IOEvents::Poller::CbkTMO ( )
protected

Definition at line 615 of file XrdSysIOEvents.cc.

616 {
617  Channel *cP;
618 
619 // Process each element in the timeout queue, calling the callback function
620 // if the timeout has passed. As this method can be called with a lock on the
621 // channel mutex, we need to drop it prior to calling the callback.
622 //
623  toMutex.Lock();
624  while((cP = tmoBase) && cP->deadLine <= time(0))
625  {int dlType = cP->dlType;
626  toMutex.UnLock();
627  CbkXeq(cP, dlType, 0, 0);
628  toMutex.Lock();
629  }
630  toMutex.UnLock();
631 }
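
The unlock, call, relock pattern used by CbkTMO() can be illustrated without any XRootD types. The following is a minimal sketch rather than the library's implementation: `Expiry`, `fireExpired`, and the use of `std::deque` and `std::mutex` are assumptions made for the example. Unlike the listing above, the sketch removes the head entry itself instead of relying on the callback path to do so.

```cpp
#include <ctime>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical timeout entry: a deadline plus the action to run when it passes.
struct Expiry { std::time_t deadline; std::function<void()> onTimeout; };

// Fire every entry whose deadline has passed. `queue` must be sorted by
// ascending deadline. The lock is released while a callback runs so that the
// callback may itself add or remove entries without deadlocking.
// Returns the number of entries that were fired.
int fireExpired(std::deque<Expiry> &queue, std::mutex &lock, std::time_t now)
{
    int fired = 0;
    std::unique_lock<std::mutex> guard(lock);
    while (!queue.empty() && queue.front().deadline <= now)
    {
        // Take ownership of the callback and drop the entry under the lock.
        std::function<void()> cb = std::move(queue.front().onTimeout);
        queue.pop_front();
        guard.unlock();          // never call out while holding the lock
        if (cb) cb();
        ++fired;
        guard.lock();            // re-acquire before inspecting the queue again
    }
    return fired;
}
```

Because the queue is re-examined from the head after each callback, entries added or removed by a callback are handled on the next loop iteration.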

◆ CbkXeq()

bool XrdSys::IOEvents::Poller::CbkXeq ( Channel *  cP,
int  events,
int  eNum,
const char *  eTxt 
)
protected

Definition at line 637 of file XrdSysIOEvents.cc.

639 {
640  XrdSysMutexHelper cbkMHelp(cP->chMutex);
641  char oldEvents;
642  bool cbok, retval, isRead, isWrite, isLocked = true;
643 
644 // Perform any required tracing
645 //
646  if (TRACING)
647  {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
648  (cP->chPoller == &pollInit ? "init" :
649  (cP->chPoller == &pollWait ? "wait" : "err")));
650  DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
651  <<" chev=" <<static_cast<int>(cP->chEvents)
652  <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
653  <<" callback " <<(cP->chCB ? "present" : "missing")
654  <<" poller=" <<cbtype);
655  }
656 
657 // Remove this from the timeout queue if there and reset the deadlines based
658 // on the event we are reflecting. This separates read and write deadlines
659 //
660  if (cP->inTOQ)
661  {TmoDel(cP);
662  cP->dlType |= (events & CallBack::ValidEvents) << 4;
663  isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
664  if (isRead) cP->rdDL = maxTime;
665  isWrite= events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut);
666  if (isWrite) cP->wrDL = maxTime;
667  } else {
668  cP->dlType &= CallBack::ValidEvents;
669  isRead = isWrite = false;
670  }
671 
672 // Verify that there is a callback here and the channel is ready. If not,
673 // disable this channel for the events being reflected unless the event is a
674 // fatal error. In this case we need to abandon the channel since error events
675 // may continue to be generated as we can't always disable them.
676 //
677  if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
678  {if (eNum)
679  {cP->chPoller = &pollErr1; cP->chFault = eNum;
680  cP->inPSet = 0;
681  return false;
682  }
683  oldEvents = cP->chEvents;
684  cP->chEvents = 0;
685  retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
686  TRACE_MOD(CbkXeq,cP->chFD,0);
687  if (!isLocked) cP->chMutex.Lock();
688  cP->chEvents = oldEvents;
689  return true;
690  }
691 
692 // Resolve the problem where we get an error event but the channel wants them
693 // presented as a read or write event. If neither is possible then defer the
694 // error until the channel is enabled again.
695 //
696  if (eNum)
697  {if (cP->chEvents & Channel::errorEvents)
698  {cP->chPoller = &pollErr1; cP->chFault = eNum;
699  cP->chStat = Channel::isCBMode;
700  chDead = false;
701  cbkMHelp.UnLock();
702  cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
703  if (chDead) return true;
704  cbkMHelp.Lock(&(cP->chMutex));
705  cP->inPSet = 0;
706  return false;
707  }
708  if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
709  else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
710  else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
711  return false;
712  }
713  }
714 
715 // Indicate that we are in callback mode then drop the channel lock and effect
716 // the callback. This allows the callback to freely manage locks.
717 //
718  cP->chStat = Channel::isCBMode;
719  chDead = false;
720  // Detach() may be called after unlocking the channel and would zero the
721  // callback pointer and argument. So keep a copy.
722  CallBack *cb = cP->chCB;
723  void *cba = cP->chCBA;
724  cbkMHelp.UnLock();
725  IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
726  cbok = cb->Event(cP,cba, events);
727  IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
728 
729 // If channel destroyed by the callback, bail really fast. Otherwise, regain
730 // the channel lock.
731 //
732  if (chDead) return true;
733  cbkMHelp.Lock(&(cP->chMutex));
734 
735 // If the channel is being destroyed; then another thread must have done so.
736 // Tell it the callback has finished and just return.
737 //
738  if (cP->chStat != Channel::isCBMode)
739  {if (cP->chStat == Channel::isDead)
740  {XrdSysSemaphore *theSem = (XrdSysSemaphore *)cP->chCBA;
741  // channel will be destroyed shortly after post, unlock mutex before
742  cbkMHelp.UnLock();
743  theSem->Post();
744  }
745  return true;
746  }
747  cP->chStat = Channel::isClear;
748 
749 // Handle enable or disable here. If we keep the channel enabled then reset
750 // the timeout if it hasn't been handled via a call from the callback.
751 //
752  if (!cbok) Detach(cP,isLocked,false);
753  else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
754  TmoAdd(cP, 0);
755 
756 // All done. While the mutex should not have been unlocked, we relock it if
757 // it has to keep the mutex helper from croaking.
758 //
759  if (!isLocked) cP->chMutex.Lock();
760  return true;
761 }

References BOOLNAME, DO_TRACE, XrdSys::IOEvents::Channel::errorEvents, XrdSys::IOEvents::CallBack::Event(), XrdSys::IOEvents::CallBack::Fatal(), IF_TRACE, XrdSysMutex::Lock(), XrdSysMutexHelper::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollInit, XrdSys::IOEvents::pollWait, XrdSysSemaphore::Post(), XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REVENTS, TRACE_MOD, TRACING, XrdSysMutexHelper::UnLock(), XrdSys::IOEvents::CallBack::ValidEvents, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.
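
CbkXeq() records the reflected events in the upper four bits of dlType, and TmoAdd() later shifts them back down and filters them through tmoMask to decide which deadlines to re-arm. The sketch below illustrates that bookkeeping in isolation. The numeric event values are assumptions (this page does not list them), and `stashEvents`, `rearm`, and `Rearm` are hypothetical names introduced only for the example.

```cpp
#include <cstdint>

// ASSUMED event values; only the names appear in the listing above.
enum : std::uint8_t {
    ReadyToRead  = 0x01, ReadTimeOut  = 0x02,
    ReadyToWrite = 0x04, WriteTimeOut = 0x08,
    ValidEvents  = 0x0f
};

// Save the events being reflected in the high nibble of dlType,
// as CbkXeq() does when the channel is in the timeout queue.
std::uint8_t stashEvents(std::uint8_t dlType, int events)
{
    return static_cast<std::uint8_t>(dlType | ((events & ValidEvents) << 4));
}

struct Rearm { bool read; bool write; };

// Decide which deadlines should be reset, as TmoAdd() does: merge the saved
// events into tmoSet, then let tmoMask suppress the ones that must not re-arm.
Rearm rearm(std::uint8_t dlType, int tmoSet, std::uint8_t tmoMask)
{
    tmoSet |= dlType >> 4;
    int effective = tmoSet & tmoMask;
    return { (effective & (ReadyToRead  | ReadTimeOut))  != 0,
             (effective & (ReadyToWrite | WriteTimeOut)) != 0 };
}
```

With the default mask of 255 a read timeout re-arms the read deadline. With the optTOM mask, `~(ReadTimeOut | WriteTimeOut)`, timeout events no longer re-arm anything, while ordinary ready events still do.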


◆ CPP_ATOMIC_TYPE()

XrdSys::IOEvents::Poller::CPP_ATOMIC_TYPE ( bool  )
protected

◆ Create()

XrdSys::IOEvents::Poller * XrdSys::IOEvents::Poller::Create ( int &  eNum,
const char **  eTxt = 0,
int  crOpts = 0 
)
static

Definition at line 767 of file XrdSysIOEvents.cc.

770 {
771  int fildes[2];
772  struct pollArg pArg;
773  pthread_t tid;
774 
775 // Create a pipe used to break the poll wait loop
776 //
777  if (XrdSysFD_Pipe(fildes))
778  {eNum = errno;
779  if (eTxt) *eTxt = "creating poll pipe";
780  return 0;
781  }
782 
783 // Create an actual implementation of a poller
784 //
785  if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
786  {close(fildes[0]);
787  close(fildes[1]);
788  return 0;
789  }
790 
791 // Now start a thread to handle this poller object
792 //
793  if ((eNum = XrdSysThread::Run(&tid, BootStrap::Start,
794  (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
795  {if (eTxt) *eTxt = "creating poller thread"; return 0;}
796 
797 // Now wait for the thread to finish initializing before we allow use
798 // Note that the bootstrap takes ownership of the semaphore and will delete it
799 // once the thread posting the semaphore actually ends. This is to avoid
800 // semaphore bugs present in certain (e.g. Linux) kernels.
801 //
802  pArg.pollSync->Wait();
803 
804 // Check if all went well
805 //
806  if (pArg.retCode)
807  {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
808  eNum = pArg.retCode;
809  delete pArg.pollP;
810  return 0;
811  }
812 
813 // Set creation options in the new poller
814 //
815  if (crOpts & optTOM)
816  pArg.pollP->tmoMask = ~(CallBack::ReadTimeOut|CallBack::WriteTimeOut);
817 
818 // All done
819 //
820  eNum = 0;
821  if (eTxt) *eTxt = "";
822  return pArg.pollP;
823 }

References close, XrdSys::IOEvents::pollArg::pollP, XrdSys::IOEvents::pollArg::pollSync, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::pollArg::retCode, XrdSys::IOEvents::pollArg::retMsg, XrdSysThread::Run(), XrdSys::IOEvents::BootStrap::Start(), tmoMask, XrdSysSemaphore::Wait(), XrdSys::IOEvents::CallBack::WriteTimeOut, and XRDSYSTHREAD_BIND.
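
Create() begins by making a pipe "used to break the poll wait loop". The idea is that the polling thread watches the read end of the pipe, so any other thread can interrupt a long wait by writing to the other end. A minimal sketch of the wait side follows. `waitOrWake` is a hypothetical name, and a real poll loop would watch its other descriptors in the same call.

```cpp
#include <cerrno>
#include <poll.h>
#include <unistd.h>

// Hypothetical wait step of a poll loop. wakeFD is the read end of a pipe
// whose write end is held by other threads. Returns true when a wakeup byte
// was consumed and false when the timeout expired or poll() failed.
bool waitOrWake(int wakeFD, int timeoutMs)
{
    struct pollfd pfd = { wakeFD, POLLIN, 0 };
    int rc;
    do { rc = ::poll(&pfd, 1, timeoutMs); } while (rc < 0 && errno == EINTR);
    if (rc < 1 || !(pfd.revents & POLLIN)) return false;

    // Drain one byte so the next wait blocks again.
    char byte;
    ssize_t n;
    do { n = ::read(wakeFD, &byte, 1); } while (n < 0 && errno == EINTR);
    return n == 1;
}
```

A thread that wants to wake the loop simply writes one byte to the write end of the same pipe.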


◆ Exclude()

virtual void XrdSys::IOEvents::Poller::Exclude ( Channel *  cP,
bool &  isLocked,
bool  dover = 1 
)
protected pure virtual

Remove a channel from the poll set. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, and XrdSys::IOEvents::PollerErr1.

◆ GetFault()

int XrdSys::IOEvents::Poller::GetFault ( Channel *  cP)
inline protected

Definition at line 437 of file XrdSysIOEvents.hh.

437 {return cP->chFault;}

Referenced by XrdSys::IOEvents::PollerErr1::Include(), and XrdSys::IOEvents::PollerErr1::Modify().


◆ GetPollEnt()

int XrdSys::IOEvents::Poller::GetPollEnt ( Channel *  cP)
inline protected

Definition at line 438 of file XrdSysIOEvents.hh.

438 {return cP->pollEnt;}

◆ GetRequest()

int XrdSys::IOEvents::Poller::GetRequest ( )
protected

Definition at line 875 of file XrdSysIOEvents.cc.

876 {
877  ssize_t rlen;
878  int rc;
879 
880 // See if we are to resume a read or start a fresh one
881 //
882  if (!pipeBlen)
883  {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
884 
885 // Wait for the next request. Some OS's (like Linux) don't support non-blocking
886 // pipes. So, we must front the read with a poll.
887 //
888  do {rc = poll(&pipePoll, 1, 0);}
889  while(rc < 0 && (errno == EAGAIN || errno == EINTR));
890  if (rc < 1) return 0;
891 
892 // Now we can put up a read without a delay. Normally a full command will be
893 // present. Under some heavy conditions, this may not be the case.
894 //
895  do {rlen = read(reqFD, pipeBuff, pipeBlen);}
896  while(rlen < 0 && errno == EINTR);
897  if (rlen <= 0)
898  {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
899  return 0;
900  }
901 
902 // Check if all the data has arrived. If not all the data is present, defer
903 // this request until more data arrives.
904 //
905  if (!(pipeBlen -= rlen)) return 1;
906  pipeBuff += rlen;
907  return 0;
908 }

References read(), and XrdSysE2T().
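
GetRequest() tolerates a command arriving in pieces: it remembers how many bytes are still missing and resumes the read on the next call. The sketch below shows that technique on its own. `Command` and `CommandReader` are hypothetical stand-ins for PipeData and the pipeBuff/pipeBlen members, not the library's types.

```cpp
#include <cerrno>
#include <cstddef>
#include <poll.h>
#include <unistd.h>

// Hypothetical fixed-size command; stands in for PipeData.
struct Command { int req; int fd; };

// Hypothetical reader that assembles one Command across several reads.
struct CommandReader
{
    int     fd;              // read end of the pipe
    Command buf{};           // assembly buffer
    char   *next = nullptr;  // where the next bytes go
    size_t  left = 0;        // bytes still missing; 0 means "start fresh"

    explicit CommandReader(int readFD) : fd(readFD) {}

    // Returns true once a whole Command is in buf, false otherwise.
    bool fetch()
    {
        if (left == 0)
        {
            next = reinterpret_cast<char *>(&buf);
            left = sizeof(buf);
        }

        // Zero-timeout poll first, so the read below never blocks.
        struct pollfd pfd = { fd, POLLIN, 0 };
        int rc;
        do { rc = ::poll(&pfd, 1, 0); }
        while (rc < 0 && (errno == EAGAIN || errno == EINTR));
        if (rc < 1) return false;

        ssize_t n;
        do { n = ::read(fd, next, left); } while (n < 0 && errno == EINTR);
        if (n <= 0) return false;

        // Keep the partial data and wait for the rest on a later call.
        left -= static_cast<size_t>(n);
        next += n;
        return left == 0;
    }
};
```

A caller would invoke `fetch()` each time the pipe becomes readable and act on `buf` only when it returns true.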


◆ Include()

virtual bool XrdSys::IOEvents::Poller::Include ( Channel *  cP,
int &  eNum,
const char **  eTxt,
bool &  isLocked 
)
protected pure virtual

Add a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, and XrdSys::IOEvents::PollerErr1.

Referenced by Init().


◆ Init()

bool XrdSys::IOEvents::Poller::Init ( Channel *  cP,
int &  eNum,
const char **  eTxt,
bool &  isLockd 
)
protected

Definition at line 914 of file XrdSysIOEvents.cc.

916 {
917 // The channel must be locked upon entry!
918 //
919  bool retval;
920 
921 
922 // If we are already in progress then simply update the shadow events and
923 // resuppress all current events.
924 //
925  if (cP->chPoller == &pollWait)
926  {cP->reMod = cP->chEvents;
927  cP->chEvents = 0;
928  IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
929  return true;
930  }
931 
932 // Trace this entry
933 //
934  IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
935 
936 // If no events are enabled at this point, just return
937 //
938  if (!(cP->chEvents)) return true;
939 
940 // Refuse to enable a channel without a callback function
941 //
942  if (!(cP->chCB))
943  {eNum = EDESTADDRREQ;
944  if (eTxt) *eTxt = "enabling without a callback";
945  return false;
946  }
947 
948 // So, now we can include the channel in the poll set. We will include it
949 // with no events enabled to prevent callbacks prior to completion here.
950 //
951  cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
952  retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
953  IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
954  if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
955 
956 // Determine what future poller to use. If we can use the regular poller then
957 // set the correct event mask for the channel. Note that we could have lost
958 // control but the correct events will be reflected in the "reMod" member.
959 //
960  if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
961  else {cP->chPoller = cP->chPollXQ;
962  cP->inPSet = 1;
963  if (cP->reMod)
964  {cP->chEvents = cP->reMod;
965  retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
966  TRACE_MOD(Init,cP->chFD,int(cP->reMod));
967  if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
968  } else {
969  TRACE_NOD(Init,cP->chFD,0);
970  }
971  }
972 
973 // All done
974 //
975  cP->reMod = 0;
976  return retval;
977 }

References BOOLNAME, IF_TRACE, Include(), XrdSysMutex::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollWait, TRACE_LOK, TRACE_MOD, and TRACE_NOD.

Referenced by XrdSys::IOEvents::PollerInit::Modify(), and XrdSys::IOEvents::PollerWait::Modify().


◆ LockChannel()

void XrdSys::IOEvents::Poller::LockChannel ( Channel *  cP)
inline protected

Definition at line 441 of file XrdSysIOEvents.hh.

441 {cP->chMutex.Lock();}

References XrdSysMutex::Lock().


◆ Modify()

virtual bool XrdSys::IOEvents::Poller::Modify ( Channel *  cP,
int &  eNum,
const char **  eTxt,
bool &  isLocked 
)
protected pure virtual

Modify the event status of a channel. An implementation must be supplied. The channel is locked when this method is called. If the method sends a command to the poller thread, it must unlock the channel and set isLocked to false.

Implemented in XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, and XrdSys::IOEvents::PollerErr1.

Referenced by CbkXeq(), and Init().


◆ Poll2Enum()

int XrdSys::IOEvents::Poller::Poll2Enum ( short  events)
protected

Definition at line 983 of file XrdSysIOEvents.cc.

984 {
985  if (events & POLLERR) return EPIPE;
986 
987  if (events & POLLHUP) return ECONNRESET;
988 
989  if (events & POLLNVAL) return EBADF;
990 
991  return EOPNOTSUPP;
992 }
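
The mapping tests the poll flags in a fixed priority order, so when several error bits are set the first match decides the result. A self-contained sketch of the same mapping as a free function, where `pollToErrno` is a hypothetical name used only for illustration:

```cpp
#include <cerrno>
#include <poll.h>

// Hypothetical free-function version of the mapping shown above.
// Flags are tested in priority order, so POLLERR wins over POLLHUP
// when both bits are set.
int pollToErrno(short events)
{
    if (events & POLLERR)  return EPIPE;
    if (events & POLLHUP)  return ECONNRESET;
    if (events & POLLNVAL) return EBADF;
    return EOPNOTSUPP;   // no recognised error bit was set
}
```

Such a function would typically be applied to the `revents` field of a `struct pollfd` after `poll()` reports an error condition.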

◆ SendCmd()

int XrdSys::IOEvents::Poller::SendCmd ( PipeData &  cmd)
protected

Definition at line 998 of file XrdSysIOEvents.cc.

999 {
1000  int wlen;
1001 
1002 // Pipe writes are atomic so we don't need locks. Some commands require
1003 // confirmation. We handle that here based on the command. Note that pipes
1004 // guarantee that all of the data will be written or we will block.
1005 //
1006  if (cmd.req >= PipeData::Post)
1007  {XrdSysSemaphore mySem(0);
1008  cmd.theSem = &mySem;
1009  do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1010  while (wlen < 0 && errno == EINTR);
1011  if (wlen > 0) mySem.Wait();
1012  cmd.theSem = nullptr;
1013  } else {
1014  do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1015  while (wlen < 0 && errno == EINTR);
1016  }
1017 
1018 // All done
1019 //
1020  return (wlen >= 0 ? 0 : errno);
1021 }

References XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::Poller::PipeData::theSem, XrdSysSemaphore::Wait(), and write.
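
The write loop in SendCmd() relies on two properties: a write interrupted by a signal can simply be retried, and POSIX guarantees that a pipe write of at most PIPE_BUF bytes is atomic, so small fixed-size records from different threads do not interleave. A minimal sketch of the retry part, where `writeRecord` is a hypothetical helper rather than an XRootD function:

```cpp
#include <cerrno>
#include <cstddef>
#include <unistd.h>

// Hypothetical helper: write one fixed-size record, retrying on EINTR.
// Returns 0 on success or the errno value on failure, which is the same
// convention SendCmd() uses for its return value.
int writeRecord(int fd, const void *rec, size_t len)
{
    ssize_t n;
    do { n = ::write(fd, rec, len); } while (n < 0 && errno == EINTR);
    return n >= 0 ? 0 : errno;
}
```

The sketch omits the confirmation step, in which the sender waits on a semaphore that the poller thread posts once it has processed the command.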


◆ SetPollEnt()

void XrdSys::IOEvents::Poller::SetPollEnt ( Channel *  cP,
int  ptEnt 
)
protected

Definition at line 1027 of file XrdSysIOEvents.cc.

1028 {
1029  cP->pollEnt = pe;
1030 }

◆ Shutdown()

virtual void XrdSys::IOEvents::Poller::Shutdown ( )
protected pure virtual

Shut down the poller. An implementation must be supplied. The shutdown method must release any allocated storage and close private file descriptors. The polling thread will have already been terminated and the cross-thread pipe closed. Warning: the derived destructor must call Stop() and do nothing else!

Implemented in XrdSys::IOEvents::PollPort, XrdSys::IOEvents::PollPoll, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollerInit, and XrdSys::IOEvents::PollerErr1.

◆ Stop()

void XrdSys::IOEvents::Poller::Stop ( )

Stop a poller object. Active callbacks are completed and pending callbacks are discarded, after which the poller event thread exits. Subsequently, each associated channel is disabled and removed from the poller object. If the channel is enabled for a StopEvent, the stop callback is invoked. However, any attempt to use the channel methods that require an active poller will return an error.

Since a stopped poller cannot be restarted, the only thing left is to delete it. This also applies to all the associated channels since they no longer have an active poller.

Definition at line 1036 of file XrdSysIOEvents.cc.

1037 {
1038  PipeData cmdbuff;
1039  CallBack *theCB;
1040  Channel *cP;
1041  void *cbArg;
1042  int doCB;
1043 
1044 // Initialize the PipeData structure
1045 //
1046  memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1047  cmdbuff.req = PipeData::Stop;
1048 
1049 // Lock all of this
1050 //
1051  adMutex.Lock();
1052 
1053 // If we are already shutdown then we are done
1054 //
1055  if (cmdFD == -1) {adMutex.UnLock(); return;}
1056 
1057 // First we must stop the poller thread in an orderly fashion.
1058 //
1059  adMutex.UnLock();
1060  SendCmd(cmdbuff);
1061  adMutex.Lock();
1062 
1063 // Close the pipe communication mechanism
1064 //
1065  close(cmdFD); cmdFD = -1;
1066  close(reqFD); reqFD = -1;
1067 
1068 // Run through cleaning up the channels. While there should not be any other
1069 // operations happening on this poller, we take the conservative approach.
1070 //
1071  while((cP = attBase))
1072  {REMOVE(attBase, attList, cP);
1073  adMutex.UnLock();
1074  cP->chMutex.Lock();
1075  doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1076  if (cP->inTOQ) TmoDel(cP);
1077  cP->Reset(&pollErr1, cP->chFD, EIDRM);
1078  cP->chPollXQ = &pollErr1;
1079  if (doCB)
1080  {cP->chStat = Channel::isClear;
1081  theCB = cP->chCB; cbArg = cP->chCBA;
1082  cP->chMutex.UnLock();
1083  theCB->Stop(cP, cbArg);
1084  } else cP->chMutex.UnLock();
1085  adMutex.Lock();
1086  }
1087 
1088 // Now invoke the poller specific shutdown
1089 //
1090  Shutdown();
1091  adMutex.UnLock();
1092 }

References close, XrdSysMutex::Lock(), XrdSys::IOEvents::pollErr1, REMOVE, XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::CallBack::Stop(), XrdSys::IOEvents::Channel::stopEvent, and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::~PollE(), XrdSys::IOEvents::PollKQ::~PollKQ(), XrdSys::IOEvents::PollPoll::~PollPoll(), XrdSys::IOEvents::PollPort::~PollPort(), and XrdCl::PollerBuiltIn::Stop().
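
Stop() is reached both from user code and from the derived destructors, so it has to be safe to call more than once. The listing achieves this by treating `cmdFD == -1` as the "already stopped" marker. The sketch below isolates that guard. `PipePair` is a hypothetical class, and it uses `std::mutex` in place of the XRootD mutex type.

```cpp
#include <mutex>
#include <unistd.h>

// Hypothetical owner of the two pipe descriptors. stop() may be called any
// number of times; only the first call closes anything.
class PipePair
{
public:
    PipePair(int cmdFD, int reqFD) : cmd(cmdFD), req(reqFD) {}

    // Returns true if this call performed the shutdown.
    bool stop()
    {
        std::lock_guard<std::mutex> guard(lock);
        if (cmd == -1) return false;   // already stopped, nothing to do
        ::close(cmd); cmd = -1;
        ::close(req); req = -1;
        return true;
    }

    bool stopped()
    {
        std::lock_guard<std::mutex> guard(lock);
        return cmd == -1;
    }

private:
    std::mutex lock;
    int cmd;   // write end; -1 once stopped
    int req;   // read end; -1 once stopped
};
```

Resetting the descriptors to -1 immediately after closing them also prevents an accidental double close of a descriptor number that the process may have reused.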


◆ TmoAdd()

bool XrdSys::IOEvents::Poller::TmoAdd ( Channel *  cP,
int  tmoSet 
)
protected

Definition at line 1098 of file XrdSysIOEvents.cc.

1099 {
1100  XrdSysMutexHelper mHelper(toMutex);
1101  time_t tNow;
1102  Channel *ncP;
1103  bool setRTO, setWTO;
1104 
1105 // Do some tracing
1106 //
1107  IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1108  <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1109 
1110 // Remove element from timeout queue if it is there
1111 //
1112  if (cP->inTOQ)
1113  {REMOVE(tmoBase, tmoList, cP);
1114  cP->inTOQ = 0;
1115  }
1116 
1117 // Determine which timeouts need to be reset
1118 //
1119  tmoSet|= cP->dlType >> 4;
1120  setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut);
1121  setWTO = (tmoSet&tmoMask) & (CallBack::ReadyToWrite|CallBack::WriteTimeOut);
1122 
1123 // Reset the required deadlines
1124 //
1125  tNow = time(0);
1126  if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1127  cP->rdDL = cP->chRTO + tNow;
1128  if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1129  cP->wrDL = cP->chWTO + tNow;
1130 
1131 // Calculate the closest enabled deadline
1132 //
1133  if (cP->rdDL < cP->wrDL)
1134  {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1135  } else {
1136  cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1137  if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1138  }
1139  IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1140  <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1141 
1142 // If no timeout really applies, we are done
1143 //
1144  if (cP->deadLine == maxTime) return false;
1145 
1146 // Add the channel to the timeout queue in correct deadline position.
1147 //
1148  if ((ncP = tmoBase))
1149  {do {if (cP->deadLine < ncP->deadLine) break;
1150  ncP = ncP->tmoList.next;
1151  } while(ncP != tmoBase);
1152  INSERT(tmoList, ncP, cP);
1153  if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1154  } else tmoBase = cP;
1155  cP->inTOQ = 1;
1156 
1157 // Indicate to the caller whether or not a wakeup is required
1158 //
1159  return (tmoBase == cP);
1160 }

References BOOLNAME, Xrd::dec, Xrd::hex, IF_TRACE, INSERT, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REMOVE, REVENTS, STATUSOF, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.
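
The final part of TmoAdd() keeps the timeout queue ordered by deadline and tells the caller whether the new entry became the head, in which case the poller thread's current wait is too long and it needs a wakeup. The following sketch shows the same rule with a `std::list` instead of the intrusive circular list used by the library. `TimedEntry` and `addDeadline` are hypothetical names.

```cpp
#include <ctime>
#include <list>

// Hypothetical queue entry; the real code links Channel objects directly.
struct TimedEntry { int id; std::time_t deadline; };

// Insert in ascending deadline order, behind any entries that share the same
// deadline. Returns true when the new entry became the head of the queue,
// meaning the poller should be woken to shorten its wait.
bool addDeadline(std::list<TimedEntry> &queue, const TimedEntry &e)
{
    auto pos = queue.begin();
    while (pos != queue.end() && pos->deadline <= e.deadline) ++pos;
    auto it = queue.insert(pos, e);
    return it == queue.begin();
}
```

An entry that ties with the current head is placed behind it and does not trigger a wakeup, which matches the strict `<` comparison in the listing.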

◆ TmoDel()

void XrdSys::IOEvents::Poller::TmoDel ( Channel *  cP)
protected

Definition at line 1166 of file XrdSysIOEvents.cc.

1167 {
1168 
1169 // Do some tracing
1170 //
1171  IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1172  <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1173 
1174 // Get the timeout queue lock and remove the channel from the queue
1175 //
1176  toMutex.Lock();
1177  REMOVE(tmoBase, tmoList, cP);
1178  cP->inTOQ = 0;
1179  toMutex.UnLock();
1180 }

References BOOLNAME, Xrd::dec, Xrd::hex, IF_TRACE, REMOVE, and STATUSOF.

◆ TmoGet()

int XrdSys::IOEvents::Poller::TmoGet ( )
protected

Definition at line 1186 of file XrdSysIOEvents.cc.

1187 {
1188  int wtval;
1189 
1190 // Lock the timeout queue
1191 //
1192  toMutex.Lock();
1193 
1194 // Calculate wait time. If the deadline passed, invoke the timeout callback.
1195 // we will need to drop the timeout lock as we don't have the channel lock.
1196 //
1197  do {if (!tmoBase) {wtval = -1; break;}
1198  wtval = (tmoBase->deadLine - time(0)) * 1000;
1199  if (wtval > 0) break;
1200  toMutex.UnLock();
1201  CbkTMO();
1202  toMutex.Lock();
1203  } while(1);
1204 
1205 // Return the value
1206 //
1207  CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1208  toMutex.UnLock();
1209  return wtval;
1210 }

References CPP_ATOMIC_STORE.

Referenced by XrdSys::IOEvents::PollPort::BegTO().
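
The value returned by TmoGet() is meant to be used as a poll timeout: -1 means "no deadline, wait indefinitely", and a positive value is the number of milliseconds until the earliest deadline. The arithmetic can be sketched as a pure function. `waitMillis` is a hypothetical helper that takes the current time as a parameter so it can be tested without a clock.

```cpp
#include <ctime>

// Hypothetical helper mirroring the wait-time arithmetic above.
// hasDeadline == false -> -1, i.e. block indefinitely in poll().
// Otherwise the number of milliseconds until the deadline; a result <= 0
// means the deadline has already passed and timeouts should be dispatched
// before waiting.
long waitMillis(bool hasDeadline, std::time_t deadline, std::time_t now)
{
    if (!hasDeadline) return -1;
    return static_cast<long>(deadline - now) * 1000L;
}
```

Because deadlines are kept in whole seconds, the resulting timeout is always a multiple of 1000 milliseconds.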


◆ UnLockChannel()

void XrdSys::IOEvents::Poller::UnLockChannel ( Channel *  cP)
inline protected

Definition at line 448 of file XrdSysIOEvents.hh.

448 {cP->chMutex.UnLock();}

References XrdSysMutex::UnLock().


Friends And Related Function Documentation

◆ BootStrap

friend class BootStrap
friend

Definition at line 373 of file XrdSysIOEvents.hh.

◆ Channel

friend class Channel
friend

Definition at line 374 of file XrdSysIOEvents.hh.

Member Data Documentation

◆ attBase

Channel* XrdSys::IOEvents::Poller::attBase
protected

Definition at line 488 of file XrdSysIOEvents.hh.

◆ chDead

bool XrdSys::IOEvents::Poller::chDead
protected

Definition at line 511 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::Channel::Delete().

◆ cmdFD

int XrdSys::IOEvents::Poller::cmdFD
protected

Definition at line 494 of file XrdSysIOEvents.hh.

◆ maxTime

time_t XrdSys::IOEvents::Poller::maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
static protected

Definition at line 513 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::Channel::Enable().

◆ parentPID

pid_t XrdSys::IOEvents::Poller::parentPID = getpid()
staticprotected

Definition at line 515 of file XrdSysIOEvents.hh.

◆ pipeBlen

int XrdSys::IOEvents::Poller::pipeBlen
protected

Definition at line 508 of file XrdSysIOEvents.hh.

◆ pipeBuff

char* XrdSys::IOEvents::Poller::pipeBuff
protected

Definition at line 507 of file XrdSysIOEvents.hh.

◆ pipePoll

struct pollfd XrdSys::IOEvents::Poller::pipePoll
protected

Definition at line 491 of file XrdSysIOEvents.hh.

◆ pollTid

pthread_t XrdSys::IOEvents::Poller::pollTid
protected

◆ reqBuff

PipeData XrdSys::IOEvents::Poller::reqBuff
protected

Definition at line 506 of file XrdSysIOEvents.hh.

◆ reqFD

int XrdSys::IOEvents::Poller::reqFD
protected

Definition at line 495 of file XrdSysIOEvents.hh.

Referenced by XrdSys::IOEvents::PollKQ::PollKQ().

◆ tmoBase

Channel* XrdSys::IOEvents::Poller::tmoBase
protected

Definition at line 489 of file XrdSysIOEvents.hh.

◆ tmoMask

unsigned char XrdSys::IOEvents::Poller::tmoMask
protected

Definition at line 509 of file XrdSysIOEvents.hh.

Referenced by Create().


The documentation for this class was generated from the following files: