当前位置:网站首页>Acceptor and tcpserver for source code analysis of Muduo Network Library

Acceptor and tcpserver for source code analysis of Muduo Network Library

2020-12-07 17:13:40 osc_ ohsup7nf

Acceptor

be used for accept One TCP Connect ,accept Notify when you accept success TCP The user's connection .Acceptor Mainly for TcpServer The use of , Its life span is controlled by the latter . One Acceptor It is equivalent to holding a server socket The descriptor , The socket Sure accept Multiple TCP Customer connection , This accept The operation is Acceptor Realized .

There are some packaged socket And address structure , Such as class InetAddress Express sockaddr_in Encapsulation , If you can pass ip Address and port The port generates a sockaddr_in; class Socket It encapsulates part of socket Socket operation , Such as Socket::bindAddress(InetAddress&) take socket And a sockaddr_in Address binding ,Socket::accept(InetAddress& peerAddr) Will a socket Allow connection to a client address peerAddr,Socket::listen() monitor socket,Socket::shutdownWrite() Achieve closure socket The writing of . The encapsulation of these classes can be seen in my analysis of this blog . Muduo Network library source code analysis of socket And the encapsulation of its related operations

Acceptor During construction, a socket The descriptor acceptSocket_( This is a Socket Type i.e socket Of RAII encapsulation ), And through a Channel( Register events and callback functions ) management acceptSocket_::fd member ( namely socket The descriptor ), Once it's time to socket To read is to have TCP Customer connection request , be Channel::handleEvent() Will call Acceptor::hanleRead() perform accept Accept one TCP Customer connection .Acceptor::handleRead() There will be new TCP Client connection and client address through callback function newConnectionCallback(connfd,peerAddr) Pass it on to TCP The user of the customer connection , Usually TcpServer class , The callback function here newConnectionCallback Is in Acceptor::setNewConnectionCallback(newConnectionCallback) designated ,TcpServer When constructing new One Acceptor after , The callback function is specified as TcpServer::newConnection(int sockfd, const InetAddress& peerAddr). It is worth noting that here is the idea of unifying the source of events , That is, through Channel and Poller Manage events .Acceptor::listen() The work is : start-up acceptSocket_::listen() monitor socket The descriptor , And pass Channel::enableReading() take socket The readable event of is registered to Poller In the event set of .

Acceptor.h

#ifndef MUDUO_NET_ACCEPTOR_H
#define MUDUO_NET_ACCEPTOR_H

#include <boost/function.hpp>
#include <boost/noncopyable.hpp>

#include <muduo/net/Channel.h>
#include <muduo/net/Socket.h>

namespace muduo
{
namespace net
{

class EventLoop;
class InetAddress;

///
/// Acceptor of incoming TCP connections.
///
// accept One TCP Connect 
class Acceptor : boost::noncopyable
{
 public:
     /*  Callback after the new connection is established  */
  typedef boost::function<void (int sockfd,
                                const InetAddress&)> NewConnectionCallback;

  Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
  ~Acceptor();

  /*  Set user task callback  */
  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  bool listenning() const { return listenning_; }
  void listen();            //  Start listening   

 private:
  void handleRead();        // listenfd -> Channel Callbacks to readable events on  

  EventLoop* loop_;         //  Where  EventLoop
  Socket acceptSocket_;     // listenfd 
  Channel acceptChannel_;   // listenfd  Corresponding  Channel
  NewConnectionCallback newConnectionCallback_;     //  Callback function to handle new connections ,accept  After the call 
  bool listenning_;         //  Is it  listen 
  int idleFd_;              // placeholder fd, be used for fd Full condition 
};

}
}

#endif  // MUDUO_NET_ACCEPTOR_H

Acceptor.cc

#include <muduo/net/Acceptor.h>

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <errno.h>
#include <fcntl.h>
//#include <sys/types.h>
//#include <sys/stat.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

/* Acceptor  Data members of include Socket、Channel etc. , Used to accept a connection  */
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    /*  establish  listenfd */
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
    /*  establish  listenfd  Corresponding  Channel */
    acceptChannel_(loop, acceptSocket_.fd()),
    listenning_(false),
    /*  Open the empty fd, For occupancy  */
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);     // Set up listenfd  Reuse addr
  acceptSocket_.setReusePort(reuseport);// Reuse port
  acceptSocket_.bindAddress(listenAddr);// binding ip and port
  acceptChannel_.setReadCallback(   // Set up  Channel  The readable callback function of is  handleRead()
      boost::bind(&Acceptor::handleRead, this));
}

Acceptor::~Acceptor()
{
  acceptChannel_.disableAll();
  acceptChannel_.remove();
  ::close(idleFd_);
}

/*  Constructors and listen() Execution creation TCP Traditional steps on the server side  socket bind listen */
void Acceptor::listen()
{   
  loop_->assertInLoopThread();
  listenning_ = true;       // Change the logo 
  acceptSocket_.listen();   //listen
  acceptChannel_.enableReading();//  Register to read Events , Call when there is a read event handleRead()
}

/*  When epoll Listen to the listenfd when , Start executing this callback function  */
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  /* accept  A connection  */
  int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    /*  Call back after accepting the connection  newConnectionCallback_
     *  Send back connfd, establish TcpConnection  Then assign the connection to other threads  */
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);
    }
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
      /*
       *  In this process fd Can't create for new connections after the upper limit is reached socket The descriptor 
       *  Since there is no socketfd To show the connection , It's impossible close it 
       *  The program continues to run , The next time epoll_wait Will go straight back to , because listenfd It's still readable 
       *  So the program gets stuck in  busy loop
       * 
       *  Handle fd  When it's full , In this way 
       *  It's to take an empty one first fd, Then when fd When it's full 
       *  First close this free file , Get a quota for a file descriptor 
       *  Call again accept Get new socket The descriptor of the connection 
       *  And then immediately close Tune it , This gracefully disconnects the client 
       *  Finally, open a free file again , hold " pit " Occupy , In case it happens again 
       */
    LOG_SYSERR << "in Acceptor::handleRead";
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of libev.
    if (errno == EMFILE)    //fd The maximum number of 
    {
      ::close(idleFd_);     // Turn off the footprint fd
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);// Accept this connection 
      ::close(idleFd_);     // Turn off the 
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);// Reopen this fd  placeholder 
    }
  }
}

TcpServer

management accept To obtain the TcpConnection, It's for direct use by users , The lifetime is controlled by the user . Users only need to set up the corresponding callback functions , And then call TcpServer::start() that will do .

Every TCP The client connection consists of a class TcpConenction management ( The receiving and sending of the message is executed ), and TcpServer Is to manage these TcpConnection,TcpConnection Class will be given in a later article .TcpServer hold boost::scoped_ptr< TcpConnection> The pointer to TcpConnectionPtr.

TcpServer Receive a by at construct time ip Address and port Composed of InetAddress Parameters , And send this address to Acceptor Used to receive the address TCP Connection request .TcpServer hold scoped_ptr< Acceptor> acceptor_ Used to receive TcpServer Listen on the port TCP Connection request , Be careful Accpetor Every time accept After the connection, the descriptor of the new connection should be added connfd And address peerAddr Back to the user , here TcpServer By means of accptor_->setNewConnectionCallback(bind(&TcpServer::newConnection,this,_1,_2)) take TcpServer::newConnection Pass to Acceptor,acceptor_ In an interview with TCP The client will call TcpServer::newConnection(connfd,peerAddr), and TcpSrever::newConnection() The main function of is for < connfd,peerAddr> Create a TcpConnection Management should TCP Customer connection , And to TcpConnection Register some callback functions , such as connectionCallback Mainly in the TcpServer Some of the connection handling functions specified by the user in the TcpSrever to TcpConnection Is called in , In addition, there are user specified message processing callbacks, etc., which are all through TcpServer Pass to TcpConnection To carry out in detail . Besides TcpServer::newConnection() It will also carry out TcpConnection::connectEstablished() This function will make this specific TcpConnection Connect the corresponding descriptor connfd Join in poll The event table for , That is, through a Channel Manage a specific TCP Customer connection .

TcpServer use map< string,TcpConnectionPtr> Manage all TCP Customer connection , among string By TcpServer Server address plus a int Constitute means TcpConnectionPtr Name .

TcpServer The callbacks specified by the user in the :

  • connectionCallback(), When TcpConnection Call ( from TcpConnection::connectEstablished() call connectionCallback() ) Used to perform user specified connection callbacks .

  • messageCallback(), When TcpConenction This function is executed when there are network messages Channel::handleEvent() -> TcpConnection::handleRead() -> messageCallback().

  • writeCompleteCallback(), Specified by the user when TCP The callback that is executed when the message on the connection is sent .

These functions are used by the user in TcpServer After being created, it is passed through TcpServer::set*Callback Series of functions registered . When Acceptor Accept a new TCP Execute on connection Acceptor::handleRead()->TcpServer::newConnection()->TcpConnection::set*Callback() This completes the pass for the specified function . How about execution ? This is going to be in TcpConenction Corresponding socket Read when the event is ready / Can be written by Channel::handEvent() Execute these user specified callbacks .

TcpServer::removeConnection() The main functions are from TcpServer Remove one from TcpConnection, But it can't be removed directly , And through the thread transfer function to complete .TcpServer::removeConenction() Will perform EventLoop::runInLoop(bind(&TcpServer::removeConnectionInLoop) -> EventLoop::runInLoop() -> TcpServer::removeConnectionInLoop().TcpServer::removeConenctionInLoop() Will a TcpConnection from TcpServer Remove , And to EventLoop Register callback EventLoop::runInLoop(bind(&TcpConenction::connectDestroyed)), And then execute TcpConnection::connectDestroyed().

TcpServer.h

#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H

#include <muduo/base/Atomic.h>
#include <muduo/base/Types.h>
#include <muduo/net/TcpConnection.h>

#include <map>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>

namespace muduo
{
namespace net
{

class Acceptor;
class EventLoop;
class EventLoopThreadPool;

///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
/* */
class TcpServer : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
  enum Option
  {
    kNoReusePort,
    kReusePort,
  };

  //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
  /*  Construct to accept a ip and port Composed of InetAddress Parameters , Used for construction  Acceptor(listenfd) */
  TcpServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg,
            Option option = kNoReusePort);
  ~TcpServer();  // force out-line dtor, for scoped_ptr members.

  const string& ipPort() const { return ipPort_; }
  const string& name() const { return name_; }
  EventLoop* getLoop() const { return loop_; }

  /// Set the number of threads for handling input.
  ///
  /// Always accepts new connection in loop's thread.
  /// Must be called before @c start
  /// @param numThreads
  /// - 0 means all I/O in loop's thread, no thread will created.
  ///   this is the default value.
  /// - 1 means all I/O in another thread.
  /// - N means a thread pool with N threads, new connections
  ///   are assigned on a round-robin basis.
  /*  Set the number of threads  */
  void setThreadNum(int numThreads);
  void setThreadInitCallback(const ThreadInitCallback& cb)
  { threadInitCallback_ = cb; }
  /// valid after calling start()
  boost::shared_ptr<EventLoopThreadPool> threadPool()
  { return threadPool_; }

  /// Starts the server if it's not listenning.
  ///
  /// It's harmless to call it multiple times.
  /// Thread safe.
  void start();

  /// Set connection callback.
  /// Not thread safe.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// Set message callback.
  /// Not thread safe.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// Set write complete callback.
  /// Not thread safe.
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

 private:
  /// Not thread safe, but in loop
  /*  When the new connection arrives ,Acceptor Callbacks  newConnection */
  void newConnection(int sockfd, const InetAddress& peerAddr);
  /// Thread safe.
  void removeConnection(const TcpConnectionPtr& conn);
  /// Not thread safe, but in loop
  void removeConnectionInLoop(const TcpConnectionPtr& conn);

  /* TcpConnection The name of the object to the one that points to it share_ptr,TcpServer use map To manage all the connections  */
  typedef std::map<string, TcpConnectionPtr> ConnectionMap;

  /*  To accept tcp Connected EventLoop, If threadnum by 1, It's the only one IO Threads  */
  EventLoop* loop_;     // the acceptor loop
  const string ipPort_; //ip port
  const string name_;   //server  Name 
  /*  Through internal  Acceptor  be responsible for  listenfd  The establishment of and  accept  Connect  */
  boost::scoped_ptr<Acceptor> acceptor_;    // avoid revealing Acceptor  Avoid exposure to users 
  boost::shared_ptr<EventLoopThreadPool> threadPool_;// Thread pool , Each thread runs one EventLoop

  ConnectionCallback connectionCallback_;   // When the connection is established and closed callback
  MessageCallback messageCallback_;         // When the news arrived callback
  WriteCompleteCallback writeCompleteCallback_; // When the message is written to the opposite buffer callback
  ThreadInitCallback threadInitCallback_;   //EventLoop The callback function of thread initialization 

  AtomicInt32 started_;
  // always in loop thread
  int nextConnId_;              // Next connected id, For giving tcp Connection construct name 
  ConnectionMap connections_;   // Use this map All connection management  
};

}
}

#endif  // MUDUO_NET_TCPSERVER_H

TcpServer.c

#include <muduo/net/TcpServer.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;

TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    /*  from InetAddress Get ip and port */
    ipPort_(listenAddr.toIpPort()),
    /* server  Of  name */
    name_(nameArg),   
    /*  Use the incoming listenAddr structure Acceptor */
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),

    threadPool_(new EventLoopThreadPool(loop, name_)),
    /*  Use the default callback function to handle connections and messages   initialization  */
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    /* id  from 1  Start  */
    nextConnId_(1)
{
    /*  take newConnection Pass to acceptor_,acceptor_ After execution accept This function will be called after  */
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}

TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

  for (ConnectionMap::iterator it(connections_.begin());
      it != connections_.end(); ++it)
  {
    TcpConnectionPtr conn(it->second);
    it->second.reset();
    conn->getLoop()->runInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
  }
}

/*  Set the number of threads , In this step, you can decide whether to use multi thread or single thread  */
void TcpServer::setThreadNum(int numThreads)
{
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);
}

/*
 * TcpServer  Startup process 
 * 1.  The thread that starts the thread pool 
 * 2.  Start listen
 * 3.  register listenfd Reading events 
 */
void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
      /*  Start a IO Threads  */
    threadPool_->start(threadInitCallback_);

    /*  The assertion is not listening  */
    assert(!acceptor_->listenning());
    /*  Start listen */
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}

/* 
 * Acceptor After accepting the connection   Call this callback function  
 *  by <connfd,peerAddr>  Create a TcpConnection object conn To manage the connection 
 *  Add it to  ConnectionMap
 *  Set it up  callback
 *  Call again conn->connectEstablished()
 *  register connfd And call back to the user provided ConnectionCallback
 */
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  /*  Take one from the thread pool loop  Threads  */
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[64];
  /*  structure tcp The name of the connection   Every TcpConnection  The object has a name  */
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  /* connid++ */
  ++nextConnId_;
  /* Link name format :servername + server.ip+server.port + connid */
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  /*  newly build TcpConnection  object  conn */
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));

  /* 
   *  Add it to  ConnectionMap 
   * key  It's connected name,value  For pointing to this object  shared_ptr 
   */
  connections_[connName] = conn;

  /*  Set it up  callback */
  /* TcpConnection  Call  */
  conn->setConnectionCallback(connectionCallback_);
  /*  Call when the message arrives  */
  conn->setMessageCallback(messageCallback_);
  /*  Called when all data is successfully written to the other kernel buffer  */
  conn->setWriteCompleteCallback(writeCompleteCallback_);

  /* TCP Callback function when the connection is closed , For internal use , Cannot be specified by the user   */
  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
  /* 
   *  stay loop Create in thread tcp The process of connecting 
   *  It's mainly about setting up tcp state , Register to read Events 
   *  And perform tcp  Set up a callback function connectionCallback_()
   */
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}

/*  from  TcpServer  Remove one from  TcpConnection */
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  // FIXME: unsafe
  // 
  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();
  /*  from  TcpServer  Delete this  TcpConnection */
  size_t n = connections_.erase(conn->name());
  (void)n;
  assert(n == 1);
  EventLoop* ioLoop = conn->getLoop();
  /*  stay  ioLoop  In the implementation of  connectDestroyed() */
  ioLoop->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
}

版权声明
本文为[osc_ ohsup7nf]所创,转载请带上原文链接,感谢
https://chowdera.com/2020/12/202012071713131360.html