当前位置:网站首页>Implementation of timer for source code analysis of Muduo Network Library

Implementation of timer for source code analysis of Muduo Network Library

2020-12-07 17:13:41 osc_ ylezri59

muduo The timer function consists of three class Realization ,TimerId、Timer and TimerQueue.

TimerId class

It uniquely identifies a Timer Timer .TimerId Class At the same time Timer* and sequence_, This sequence_ Is each Timer Object has a globally incremented sequence number int64_t sequence_, Using atomic counter (AtomicInt64) Generate .
It is mainly used to log off the timer , In this way, we can distinguish two addresses in the same order Timer object .

namespace muduo
{
namespace net
{

class Timer;

///
/// An opaque identifier, for canceling Timer.
///
/*  With a unique identification Timer, Mainly used to cancel Timer */
class TimerId : public muduo::copyable
{
 public:
  TimerId()
    : timer_(NULL),
      sequence_(0)
  {
  }

  TimerId(Timer* timer, int64_t seq)
    : timer_(timer),    //timer  The pointer to the timer 
      sequence_(seq)    //seq  The serial number of the timed task 
  {
  }

  // default copy-ctor, dtor and assignment are okay

  friend class TimerQueue;

 private:
  Timer* timer_;
  int64_t sequence_;
};

}
}

Timer class

Encapsulates some parameters of the timer , Including time out (expiration_)、 Timeout callback function (callback_)、 The time interval (interval_)、 Whether to repeat the timing (repeat_)、 The serial number of timer and other member variables , Most member functions return the values of these variables ,run() Used to call callback functions ,restart() To restart the timer .

Timer.h

namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
/*  Timer  */
class Timer : boost::noncopyable
{
 public:
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  { }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
  Timer(TimerCallback&& cb, Timestamp when, double interval)
    : callback_(std::move(cb)),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  { }
#endif

  void run() const
  {
    callback_();    // Execute timer callback function 
  }

  /*  Returns the timeout timestamp of the timer  */
  Timestamp expiration() const  { return expiration_; }
  /*  Whether it is periodically timed  */
  bool repeat() const { return repeat_; }
  /*  Return the serial number of this timer  */
  int64_t sequence() const { return sequence_; }

  /*  Restart timer  */
  void restart(Timestamp now);

  static int64_t numCreated() { return s_numCreated_.get(); }

 private:
  const TimerCallback callback_;    // Timeout callback function 
  Timestamp expiration_;            // Timeout time stamp 
  const double interval_;           // The time interval , If it's a one-time timer , The value is 0
  const bool repeat_;               // Is it repeated 
  const int64_t sequence_;          // The serial number of this timed task 

  static AtomicInt64 s_numCreated_; // Timer count , The current number of timers that have been created 
};
}
}

Timer.cc

#include <muduo/net/Timer.h>

using namespace muduo;
using namespace muduo::net;

AtomicInt64 Timer::s_numCreated_;

void Timer::restart(Timestamp now)
{
  if (repeat_)
  {
      // If you need to repeat , Then set the time to the next timeout 
    expiration_ = addTime(now, interval_);
  }
  else
  {
      // If you don't need to repeat , Set the timeout to an unavailable one  value
    expiration_ = Timestamp::invalid();
  }
}

TimerQueue class

Timer queue , Used to manage all timers , There are only two interfaces for this class : Add and log off timers , Respectively addTimer() and cancel().

TimerQueue The choice of data structure . You need to efficiently organize Timer, It can quickly find the due date according to the current time Timer, It's also important to be able to efficiently add and delete Timer. Finally, I chose set < pair<TimeStamp,Timer*> >, use pair by key The reason is that there may be more than one of the same Timestamp Timestamp timeout , And the search only returns one , So even if two Timer The timeout for is the same , Their addresses have to be different .

By giving timerfd A timeout time to achieve timeout timing , It has Channel, adopt Channel management timerfd, And then to EventLoop and Poller register timerfd Read Events , When timerfd When the readable event of is ready, it indicates that a timeout has arrived , And then call timerfdChannel_ Read event callback for handleRead(), adopt getExpired() Find out all the timeout Events , Then execute the corresponding timeout callback function Timer::run(). To reuse timers , After each treatment , It will check whether these timeouts need to be repeated , If you need to repeat , Add it to the timer set again .

timerfd How to achieve multiple timers timed out ? Save the timer every time set The container inserts a timer Timer When it comes to set The timeout of the header element of , If the newly inserted timeout is small , Update timerfd Time for , To ensure that timerfd Always be set The most recent timeout in . When timerfd Readable time , You need to traverse the container set, Because there may be more than one Timer It's overtime ( Even though tiemrfd Is the current minimum timeout ). The key here is to adopt timerfd Implement unified event source .

TimerQueue.h

namespace muduo
{
namespace net
{

class EventLoop;
class Timer;
class TimerId;

///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
/*  Timer queue  */
class TimerQueue : boost::noncopyable
{
 public:
  explicit TimerQueue(EventLoop* loop);
  ~TimerQueue();

  ///
  /// Schedules the callback to be run at given time,
  /// repeats if @c interval > 0.0.
  ///
  /// Must be thread safe. Usually be called from other threads.
  /*  Add a timer  */
  TimerId addTimer(const TimerCallback& cb,
                   Timestamp when,
                   double interval);
#ifdef __GXX_EXPERIMENTAL_CXX0X__
  TimerId addTimer(TimerCallback&& cb,
                   Timestamp when,
                   double interval);
#endif

  /*  Log off a timer  */
  void cancel(TimerId timerId);

 private:

  // FIXME: use unique_ptr<Timer> instead of raw pointers.
  // This requires heterogeneous comparison lookup (N3465) from C++14
  // so that we can find an T* in a set<unique_ptr<T>>.
  typedef std::pair<Timestamp, Timer*> Entry;   // For a timed task 
  typedef std::set<Entry> TimerList;            // Timed task set , use set, Yes key nothing value, And orderly 
  typedef std::pair<Timer*, int64_t> ActiveTimer; // Here's an explanation   
  typedef std::set<ActiveTimer> ActiveTimerSet;

  void addTimerInLoop(Timer* timer);    // Add a scheduled task 
  void cancelInLoop(TimerId timerId);   // Log off a timer 
  // called when timerfd alarms
  void handleRead();                    //timerfd  Can be read   The callback 
  // move out all expired timers
  std::vector<Entry> getExpired(Timestamp now); // Get all timeouts 
  /*  Reset timeout timer  */
  void reset(const std::vector<Entry>& expired, Timestamp now);

  bool insert(Timer* timer);    // Plug in the timer TimerList in 

  EventLoop* loop_;             //TimerQueue  Of  EventLoop
  const int timerfd_;           //  Inside  timerfd 
  Channel timerfdChannel_;      //timerfd  Corresponding Channel, To observe timerfd_  Upper readable event 
  // Timer list sorted by expiration
  TimerList timers_;            // All the scheduled tasks 

  // for cancel()
  // timers_  And  activeTimers_  They all have the same Timer  Address 
  // timers_  It's sorted by time-out ,activeTimers_  It's sorted by timer address 
  ActiveTimerSet activeTimers_;
  bool callingExpiredTimers_; /* atomic */// Is it in   Processing timer timeout callback 
  ActiveTimerSet cancelingTimers_; // Save the logged out timer 
};
}
}

TimerQueue.cc

TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
  timerfdChannel_.setReadCallback(
      boost::bind(&TimerQueue::handleRead, this));// Set up timerfd The readable event callback function is handleRead
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  timerfdChannel_.enableReading();  //timerfd  Register readable Events 
}

TimerQueue::~TimerQueue()
{
  timerfdChannel_.disableAll();
  timerfdChannel_.remove();
  ::close(timerfd_);
  // do not remove channel, since we're in EventLoop::dtor();
  for (TimerList::iterator it = timers_.begin();
      it != timers_.end(); ++it)
  {
    delete it->second; // Hand release Timer*
  }
}

/*  Add timing task , Return the unique ID of this timer  */
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
{
    /* new  A timer object  interval  Greater than 0 , It's a timer that needs to be repeated  */
  Timer* timer = new Timer(cb, when, interval);
  /* 
   * runInLoop  It means   If Ben IO If a thread wants to add a timer, it can be done directly by  addTimerInLoop  add to 
   *  If it is other threads to IO Adding timers to a thread needs to be done indirectly through  queueInLoop add to 
   */
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}

#ifdef __GXX_EXPERIMENTAL_CXX0X__
TimerId TimerQueue::addTimer(TimerCallback&& cb,
                             Timestamp when,
                             double interval)
{
    //  Right value semantics 
  Timer* timer = new Timer(std::move(cb), when, interval);
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}
#endif
/*  Log off a timer , By EventLoop::cancel(TimerId timerId) call  */
void TimerQueue::cancel(TimerId timerId)
{
  loop_->runInLoop(
      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
}

/* IO Threads add timers to themselves  */
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  bool earliestChanged = insert(timer); // If the timer currently inserted   Earlier than the timers in the queue   Then return to true 

  if (earliestChanged)      // The earliest timeout changed , You need to reset timerfd_ Timeout for 
  {
    resetTimerfd(timerfd_, timer->expiration()); //timerfd_  Reset timeout 
  }
}

void TimerQueue::cancelInLoop(TimerId timerId)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  ActiveTimer timer(timerId.timer_, timerId.sequence_);
  ActiveTimerSet::iterator it = activeTimers_.find(timer);  // Find the timer 
  if (it != activeTimers_.end()) //  eureka 
  {
      /*  from  timers_  and  activeTimers_  Delete */
    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
    assert(n == 1); (void)n;
    delete it->first; // FIXME: no delete please    // Manual  delete 
    activeTimers_.erase(it);
  }
  else if (callingExpiredTimers_) // Maybe we're dealing with 
  {
      /*  First of all   Insert the timer to be logged off  */
    cancelingTimers_.insert(timer);
  }
  assert(timers_.size() == activeTimers_.size());
}

/* timerfd  Callback function for readable events  */
void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);

  /*  Find all timeout Events  */
  std::vector<Entry> expired = getExpired(now);

  callingExpiredTimers_ = true;
  cancelingTimers_.clear();
  // safe to callback outside critical section
  for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    it->second->run();  // Execute callback of timeout timer 
  }
  callingExpiredTimers_ = false;

  reset(expired, now); // Reset timer , If you don't need to schedule again , Just delete , Otherwise, it's time again 
}

/*  Get the timer that timed out in the queue  */
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;   // The container that holds the timeout timer 
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); // Sentinels are worth 
  TimerList::iterator end = timers_.lower_bound(sentry);    // Returns the first... That did not time out Timer The iterator 
  assert(end == timers_.end() || now < end->first);         // No overtime or found 
  std::copy(timers_.begin(), end, back_inserter(expired));  // Copy the timeout timer to  expired  In the container 
  timers_.erase(timers_.begin(), end);                      // Set the timeout timer from timers_ Delete 

  for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    size_t n = activeTimers_.erase(timer);      //  The timer that will time out   from  activeTimers_  Delete 
    assert(n == 1); (void)n;
  }

  assert(timers_.size() == activeTimers_.size());   //  After deleting them all  size  It should be the same 
  return expired;   // Returns the part of the timer that timed out 
}

/*  After the timed task of timeout callback has been executed , Check if these timers need to be repeated  */
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
  Timestamp nextExpire;

  for (std::vector<Entry>::const_iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    if (it->second->repeat()    //  Need to repeat   and   There's no cancellation 
        && cancelingTimers_.find(timer) == cancelingTimers_.end())
    {
        /*  Change the timeout time of the timer to the next timeout time  */
      it->second->restart(now);
      insert(it->second);   // Re insert into timer container 
    }
    else
    {
      // FIXME move to a free list
      //  Delete without repetition 
      delete it->second; // FIXME: no delete please
    }
  }

  if (!timers_.empty())
  {
      /*  Gets the timestamp of the earliest timer in the current timer collection , As the next timeout */
    nextExpire = timers_.begin()->second->expiration();
  }

  if (nextExpire.valid())
  {
    resetTimerfd(timerfd_, nextExpire);     // Reset  timerfd_  Timeout for 
  }
}

/*  towards  set  Insert a new timer in  */
bool TimerQueue::insert(Timer* timer)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  bool earliestChanged = false;             //  The earliest timeout   Whether it has been changed 
  Timestamp when = timer->expiration();     // New insertion  timer  Timeout for 
  TimerList::iterator it = timers_.begin(); //  The earliest timed tasks today 
  if (it == timers_.end() || when < it->first)
  {
    earliestChanged = true; 
    // If timers_ It's empty , perhaps when  It's smaller than the earliest timed task at present , So the earliest timeout , It definitely needs to be changed 
  }
  {
      /*  towards  timers_  Insert timed tasks in  */
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  {
      /*  towards  activeTimers_  Insert timed tasks in  */
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }

  /*  After insertion , The number of container elements should be the same  */
  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;   // Return to modify flag , Indicates that the latest timeout has changed 
}

Timer interface

EventLoop There are four interfaces in the timer to use , Three are adding timers , All in turn call TimerQueue::addTimer(); There's also a logoff timer . as follows :

/*  At the time stamp  time  At all times ,0.0  Means not to repeat  */
TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{
  return timerQueue_->addTimer(cb, time, 0.0);
}

/*  Delay  delay  Time execution  */
TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{
  Timestamp time(addTime(Timestamp::now(), delay));
  return runAt(time, cb);
}

/*  Repeat timer , The time interval is  interval */
TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{
  Timestamp time(addTime(Timestamp::now(), interval));
  return timerQueue_->addTimer(cb, time, interval);
}

/*  Log off timer , Call directly  TimerQueue::cancel() */
void EventLoop::cancel(TimerId timerId)
{
  return timerQueue_->cancel(timerId);
}

timerfd Related operations of

timerfd yes Linux Provides a timer interface for the user program , Abstract the timer as a file descriptor , Timeout notification via readable event of file descriptor , The file becomes readable at the moment of timeout , So that you can perfectly blend into select/poll In the frame , Deal with... In a unified way I/O And timing events . At the same time, its time precision is compared with that of select/poll Of timeout Higher ,timeout The timing accuracy is only milliseconds .

It provides three timerfd C API:

#include <sys/timerfd.h> 
int timerfd_create(int clockid, int flags); 
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value,struct itimerspec *old_value); 
int timerfd_gettime(int fd, struct itimerspec *curr_value);

For usage and test examples, please refer to :
http://xiaorui.cc/2016/07/29/%E5%9F%BA%E4%BA%8Etimerfd-epoll%E5%BC%80%E5%8F%91%E7%9A%84io%E5%AE%9A%E6%97%B6%E5%99%A8/

So let's see TimerQueue in timerfd Related operations of .

/*  establish  timerfd */
int createTimerfd()
{
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerfd < 0)
  {
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return timerfd;
}

/*  Calculate the time difference between the timeout and the current time , And convert the parameters to  api  The type of acceptance   */
struct timespec howMuchTimeFromNow(Timestamp when)
{
    /*  Microseconds  =  Timeout microseconds  -  Microseconds at the current moment  */
  int64_t microseconds = when.microSecondsSinceEpoch()
                         - Timestamp::now().microSecondsSinceEpoch();
  if (microseconds < 100)
  {
    microseconds = 100;
  }
  struct timespec ts;   // convert to  struct timespec  Structure returns 
  ts.tv_sec = static_cast<time_t>(
      microseconds / Timestamp::kMicroSecondsPerSecond);
  ts.tv_nsec = static_cast<long>(
      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
  return ts;
}

/*  read timerfd, Avoid timer events from triggering all the time  */
void readTimerfd(int timerfd, Timestamp now)
{
  uint64_t howmany;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}

/*  Reset timer timeout  */
void resetTimerfd(int timerfd, Timestamp expiration)
{
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  bzero(&newValue, sizeof newValue);
  bzero(&oldValue, sizeof oldValue);
  newValue.it_value = howMuchTimeFromNow(expiration);
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); // After that time , A timed event is generated 
  if (ret)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}

That's all muduo The realization of timer , I'm writing it myself http server When the timer is implemented with the smallest heap , If you are interested, you can have a look at :
https://github.com/Tanswer/Xserver/blob/master/src/timer.h

版权声明
本文为[osc_ ylezri59]所创,转载请带上原文链接,感谢
https://chowdera.com/2020/12/20201207171313139v.html