MongoDB kernel source code implementation, performance tuning, and O&M best practices series: command processing module source code implementation (1)

2020-11-09 22:32:15 Yang Yaya - focusing on MongoDB and high-performance middleware

About the author

      Former technical expert at Didi, currently in charge of the MongoDB document database at OPPO, responsible for kernel development and operations of OPPO's MongoDB document database, which runs at tens of millions of peak TPS and at the hundred-billion scale. Long focused on research and development in distributed caching, high-performance servers, databases, and middleware. Continues to share the series 《MongoDB kernel source code design, performance optimization, and O&M best practices》. GitHub account: https://github.com/y123456yz

  1. Background

      In <<transport_layer network transport layer module source code implementation>>, I shared the implementation of low-level network I/O processing in the MongoDB kernel, including socket initialization, reading a complete MongoDB message, and sending data fetched from the DB back to the client. MongoDB supports insert, delete, update, query, aggregation, cluster operations, and more. Each operation corresponds to one command in the kernel implementation, and each command has a different function. How the MongoDB kernel processes commands at the source code level is the focus of this article.

      In addition, MongoDB provides the mongostat tool for monitoring the operation statistics of the current cluster. The statistics monitored by mongostat are shown in the figure below:

      Of these, the four statistics insert, delete, update, and query are easy to understand: they count insert, delete, update, and query operations respectively. However, command and getmore are harder to understand. What does command count? What does getmore count?

      By working through this article, we will understand the specific meaning of these six statistics and determine which operations contribute to each of them.

      The command processing module is divided into mongos operation commands, mongod operation commands, and MongoDB cluster-internal commands, defined as follows:

  • mongos operation commands: commands that clients use to access the cluster through mongos.
  • mongod operation commands: commands that clients use to access the cluster through mongod replica sets and the cfg server.
  • MongoDB cluster-internal commands: commands exchanged between the mongos, mongod, and mongo-cfg instances of the cluster.

      The core code of the command processing module is implemented as follows:

     The 《command processing module source code implementation》 articles focus on the core code of the command processing module, that is, the command processing source files in the screenshot above.

2. Recap: connecting with <<transport_layer network transport layer module source code implementation>>

      In <<transport_layer network transport layer module source code implementation>>, we analyzed the service_state_machine state machine scheduling submodule. The dealTask task in that submodule performs MongoDB's internal business logic processing. Its core implementation is as follows:

//dealTask processing
void ServiceStateMachine::_processMessage(ThreadGuard guard) {
    ......
    //Process the command; the data fetched from the DB is returned through dbresponse
    DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);
    ......
}

      The _sep above is the service entry point implementation of the mongod or mongos instance. In the code below, the _sep member is initialized to the ServiceEntryPointMongod or ServiceEntryPointMongos class implementation. The core code that initializes the _sep member of the SSM state machine is as follows:

//mongos instance startup initialization
static ExitCode runMongosServer() {
    ......
    //The sep of a mongos instance is ServiceEntryPointMongos
    auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());
    getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));
    ......
}

//mongod instance startup initialization
ExitCode _initAndListen(int listenPort) {
    ......
    //The sep of a mongod instance is ServiceEntryPointMongod
    serviceContext->setServiceEntryPoint(
        stdx::make_unique<ServiceEntryPointMongod>(serviceContext));
    ......
}

//SSM state machine initialization
ServiceStateMachine::ServiceStateMachine(...)
    : _state{State::Created},
      //The service entry point of the mongod or mongos instance is assigned to the _sep member
      _sep{svcContext->getServiceEntryPoint()},
      ......
}

      Through these core interfaces, the service entry points of the mongos and mongod instances are connected to the state machine SSM (ServiceStateMachine), and ultimately to the command processing module described below.

      dealTask performs the internal logic processing for one MongoDB request, which is implemented by the _sep->handleRequest() interface. Because the mongos and mongod service entry points are implemented by the ServiceEntryPointMongos and ServiceEntryPointMongod classes, dealTask resolves to one of the following interfaces:

  • mongos instance: ServiceEntryPointMongos::handleRequest(...)
  • mongod instance: ServiceEntryPointMongod::handleRequest(...)

      Both interfaces take OperationContext and Message as input parameters, which correspond to the operation context and the raw request data respectively. The parsing of Message is analyzed below; the OperationContext service context implementation will be analyzed in later chapters.

      The mongod and mongos service entry point classes both inherit from the ServiceEntryPointImpl class of the network transport module, as shown in the figure below:

      Tips: why do the mongos and mongod service entry point classes inherit from the network transport module's service entry point class?

      The reason is that each request corresponds to one connection session, and each session corresponds to exactly one SSM state machine. The SSM state machine information for all client requests is stored in the ServiceEntryPointImpl._sessions member, and the command processing module runs as the dealTask task of the SSM state machine. Through this inheritance relationship, the ServiceEntryPointMongod and ServiceEntryPointMongos subclasses are associated with the state machine and its task processing, and can also obtain the session connection information for the current request.
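
      The dispatch described in this section can be sketched in a few lines. This is only an illustration under simplifying assumptions: Message and DbResponse are reduced to hypothetical string holders, OperationContext is omitted, and the class bodies are placeholders rather than the real MongoDB implementations. It shows how a state machine that holds a base-class pointer reaches the mongod or mongos handleRequest through virtual dispatch.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for the real MongoDB types; only the shape matters.
struct Message { std::string payload; };
struct DbResponse { std::string text; };

// Simplified base interface: the state machine only ever sees this type.
class ServiceEntryPoint {
public:
    virtual ~ServiceEntryPoint() = default;
    virtual DbResponse handleRequest(const Message& m) = 0;
};

class ServiceEntryPointMongod : public ServiceEntryPoint {
public:
    DbResponse handleRequest(const Message& m) override {
        return DbResponse{"mongod handled: " + m.payload};
    }
};

class ServiceEntryPointMongos : public ServiceEntryPoint {
public:
    DbResponse handleRequest(const Message& m) override {
        return DbResponse{"mongos handled: " + m.payload};
    }
};

// Simplified state machine holding a non-owning pointer chosen at startup.
class ServiceStateMachine {
public:
    explicit ServiceStateMachine(ServiceEntryPoint* sep) : _sep(sep) {
        assert(_sep != nullptr);
    }
    // Counterpart of dealTask: virtual dispatch selects mongod or mongos.
    DbResponse processMessage(const Message& in) {
        return _sep->handleRequest(in);
    }

private:
    ServiceEntryPoint* _sep;
};
```

      Constructing the state machine with a ServiceEntryPointMongod and calling processMessage runs the mongod branch; constructing it with a ServiceEntryPointMongos runs the mongos branch, and the state machine code itself does not change.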

3. MongoDB protocol analysis

      In 《transport_layer network transport layer module source code implementation 2》, the data send/receive submodule has already received a complete MongoDB message. A MongoDB message consists of a header plus a body whose format depends on the opCode, as shown in the figure below:

      The fields in the figure above are described in the following table:

      opCode can take many values. In earlier versions, OP_INSERT, OP_DELETE, OP_UPDATE, and OP_QUERY were used for insert, delete, update, and query requests. Starting with version 3.6, MongoDB uses OP_MSG as the default opCode. It is an extensible message format designed to subsume the functionality of the other opcodes, and the read and write request protocols of newer versions all use this opcode. This article analyzes the protocol for the OP_MSG opcode as an example; the analysis for other opcodes is similar. The OP_MSG request format is as follows:

OP_MSG {
    //MongoDB message header
    MsgHeader header;
    //Bit flags, indicating for example whether the message carries a checksum and whether a response is needed
    uint32 flagBits;           // message flags
    //Message content; the contents of commands such as find and write are stored here in BSON format
    Sections[] sections;       // data sections
    //Message CRC checksum
    optional<uint32> checksum; // optional CRC-32C checksum
}
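
      As a small illustration of how flagBits can be checked, the sketch below assumes the bit assignments given in the public OP_MSG wire protocol description (checksumPresent at bit 0, moreToCome at bit 1, exhaustAllowed at bit 16); those positions are not stated in this article. The low 16 bits are treated as required, so an unknown bit set there makes the message invalid, while unknown bits in the high 16 bits are ignored.

```cpp
#include <cassert>
#include <cstdint>

// Bit positions assumed from the OP_MSG wire protocol description.
constexpr std::uint32_t kChecksumPresent = 1u << 0;
constexpr std::uint32_t kMoreToCome = 1u << 1;
constexpr std::uint32_t kExhaustAllowed = 1u << 16;

constexpr std::uint32_t kAllSupportedFlags =
    kChecksumPresent | kMoreToCome | kExhaustAllowed;

// The low two bytes are required flags, the high two bytes are optional.
constexpr std::uint32_t kRequiredFlagMask = 0xffffu;

// True when a required-range bit is set that this parser does not know.
bool containsUnknownRequiredFlags(std::uint32_t flags) {
    return (flags & ~kAllSupportedFlags & kRequiredFlagMask) != 0;
}
```

      With these definitions, a message that sets only bit 0 and bit 1 passes the check, a message that sets bit 2 is rejected, and a message that sets an unknown bit in the optional range, such as bit 20, is still accepted.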

      Each OP_MSG field is described in the following table:

      A complete OP_MSG request has the following format:

      Apart from the common header, the client's command request is actually stored in the sections field, which holds the original request data in BSON format. BSON is a data format developed by 10gen. It is currently used mainly in MongoDB, where it is the data storage format. BSON is based on the JSON format; the main reasons for building on JSON are JSON's generality and its schemaless nature. Compared with JSON, BSON has the following characteristics:

  • Lightweight
  • Traversable (easy to traverse)
  • Efficient (high performance)

      This article does not aim to analyze the BSON format; details of the BSON implementation will be shared in later chapters. More design details of the BSON specification can be found at: http://bsonspec.org/

      Summary: a complete MongoDB message consists of header + body. The header has a fixed length of 16 bytes, and the body length equals messageLength - 16. The header is parsed by the two source files message.cpp and message.h, and the body of OP_MSG requests is parsed by the two source files op_msg.cpp and op_msg.h.

3. Source code implementation of MongoDB message common header parsing and encapsulation

      The header is parsed by message.cpp and message.h in the src/mongo/util/net directory. These files are mainly responsible for parsing and encapsulating the common header and the body. The core message header code therefore falls into the following two categories:

  • Parsing and encapsulating the message header contents (implemented in the MSGHEADER namespace)
  • Parsing and encapsulating the header and body contents (implemented in the MsgData namespace)

3.1 Core code for MongoDB message header parsing and encapsulation

      The MongoDB message header is parsed by namespace MSGHEADER {...}. Its main members and interfaces are implemented as follows:

namespace MSGHEADER {
//Fields of the header
struct Layout {
    //Total message length, including header length and body length
    int32_t messageLength;
    //requestID: the id of this request
    int32_t requestID;
    //Parsed by getResponseToMsgId
    int32_t responseTo;
    //Operation type: OP_UPDATE, OP_INSERT, OP_QUERY, OP_DELETE, OP_MSG, etc.
    int32_t opCode;
};

//ConstView implements header data parsing
class ConstView {
public:
    ......
    //Constructor
    ConstView(const char* data) : _data(data) {}
    //Get the address of _data
    const char* view2ptr() const {
        return data().view();
    }
    //Called from TransportLayerASIO::ASIOSourceTicket::_headerCallback
    //Parse the messageLength field of the header
    int32_t getMessageLength() const {
        return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));
    }
    //Parse the requestID field of the header
    int32_t getRequestMsgId() const {
        return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));
    }
    //Parse the responseTo field of the header
    int32_t getResponseToMsgId() const {
        return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));
    }
    //Parse the opCode field of the header
    int32_t getOpCode() const {
        return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));
    }

protected:
    //Start address of the MongoDB message data
    const view_type& data() const {
        return _data;
    }
private:
    //Data
    view_type _data;
};

//View fills in the header data
class View : public ConstView {
public:
    ......
    //Constructor
    View(char* data) : ConstView(data) {}
    //Start address of the header
    char* view2ptr() {
        return data().view();
    }
    //The following four interfaces fill in the header
    //Fill in the messageLength field of the header
    void setMessageLength(int32_t value) {
        data().write(tagLittleEndian(value), offsetof(Layout, messageLength));
    }
    //Fill in the requestID field of the header
    void setRequestMsgId(int32_t value) {
        data().write(tagLittleEndian(value), offsetof(Layout, requestID));
    }
    //Fill in the responseTo field of the header
    void setResponseToMsgId(int32_t value) {
        data().write(tagLittleEndian(value), offsetof(Layout, responseTo));
    }
    //Fill in the opCode field of the header
    void setOpCode(int32_t value) {
        data().write(tagLittleEndian(value), offsetof(Layout, opCode));
    }
private:
    //Points to the start address of the header
    view_type data() const {
        return const_cast<char*>(ConstView::view2ptr());
    }
};
}

      As the header parsing and filling classes above show, header parsing is implemented by MSGHEADER::ConstView and header filling by MSGHEADER::View. The implementation uses offsetof to compute each field's byte offset, which quickly locates the corresponding header field.
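
      To make the offsetof technique concrete, here is a minimal standalone sketch. It is not the MongoDB implementation: HeaderView is a hypothetical class, and the little-endian encoding is written out by hand instead of using the DataView and LittleEndian helpers seen above. It writes and reads header fields in a caller-owned 16-byte buffer by locating each field through its offset in Layout.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Same four-field layout as the header described above.
struct Layout {
    std::int32_t messageLength;
    std::int32_t requestID;
    std::int32_t responseTo;
    std::int32_t opCode;
};

// Hypothetical, simplified view over a caller-owned 16-byte buffer.
class HeaderView {
public:
    explicit HeaderView(char* data) : _data(data) {
        assert(_data != nullptr);
    }

    void setMessageLength(std::int32_t v) { write(offsetof(Layout, messageLength), v); }
    void setOpCode(std::int32_t v) { write(offsetof(Layout, opCode), v); }
    std::int32_t getMessageLength() const { return read(offsetof(Layout, messageLength)); }
    std::int32_t getOpCode() const { return read(offsetof(Layout, opCode)); }

private:
    // Store v at the given byte offset, least significant byte first.
    void write(std::size_t offset, std::int32_t v) {
        const std::uint32_t u = static_cast<std::uint32_t>(v);
        for (std::size_t i = 0; i < 4; ++i)
            _data[offset + i] = static_cast<char>((u >> (8 * i)) & 0xffu);
    }
    // Reassemble a little-endian int32 from the given byte offset.
    std::int32_t read(std::size_t offset) const {
        std::uint32_t u = 0;
        for (std::size_t i = 0; i < 4; ++i)
            u |= static_cast<std::uint32_t>(
                     static_cast<unsigned char>(_data[offset + i]))
                 << (8 * i);
        return static_cast<std::int32_t>(u);
    }

    char* _data;
};
```

      Writing opCode 2013 (the OP_MSG value in the wire protocol) through the view places the bytes 0xDD 0x07 at offsets 12 and 13 of the buffer, and reading it back through getOpCode() returns 2013 regardless of the host byte order.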

3.2 Core code for MongoDB message header + body parsing and encapsulation

      The MSGHEADER namespace is responsible only for the header. The namespace MsgData{...} is more complete than MSGHEADER: besides header parsing and encapsulation, it also maintains the start address of the body data, encapsulates the body data, and checks the data length. The core code of the MsgData namespace is as follows:

namespace MsgData {
struct Layout {
    //The data consists of: the header part
    MSGHEADER::Layout header;
    //The data consists of: the body part; data is a placeholder for the start of the body
    char data[4];
};

//Parses the header fields and the start address of the body
class ConstView {
public:
    //Constructor
    ConstView(const char* storage) : _storage(storage) {}
    //Get the start address of the data
    const char* view2ptr() const {
        return storage().view();
    }

    //The following four interfaces delegate to the MSGHEADER header field parsing above
    //Parse the messageLength field of the header
    int32_t getLen() const {
        return header().getMessageLength();
    }
    //Parse the requestID field of the header
    int32_t getId() const {
        return header().getRequestMsgId();
    }
    //Parse the responseTo field of the header
    int32_t getResponseToMsgId() const {
        return header().getResponseToMsgId();
    }
    //Get the opCode field of the network message
    NetworkOp getNetworkOp() const {
        return NetworkOp(header().getOpCode());
    }
    //Points to the start address of the body
    const char* data() const {
        return storage().view(offsetof(Layout, data));
    }
    //messageLength check and opCode check
    bool valid() const {
        if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))
            return false;
        if (getNetworkOp() < 0 || getNetworkOp() > 30000)
            return false;
        return true;
    }
    ......
protected:
    //Get _storage
    const ConstDataView& storage() const {
        return _storage;
    }
    //Points to the start address of the header
    MSGHEADER::ConstView header() const {
        return storage().view(offsetof(Layout, header));
    }
private:
    //The MongoDB message is stored here
    ConstDataView _storage;
};

//Fills in the data, including header and body
class View : public ConstView {
public:
    //Constructor
    View(char* storage) : ConstView(storage) {}
    ......
    //Get the start address of the message
    char* view2ptr() {
        return storage().view();
    }

    //The following four interfaces delegate to the MSGHEADER header field setters above
    //and complete the msg header assignment
    //Fill in the messageLength field of the header
    void setLen(int value) {
        return header().setMessageLength(value);
    }
    //Fill in the requestID field of the header
    void setId(int32_t value) {
        return header().setRequestMsgId(value);
    }
    //Fill in the responseTo field of the header
    void setResponseToMsgId(int32_t value) {
        return header().setResponseToMsgId(value);
    }
    //Fill in the opCode field of the header
    void setOperation(int value) {
        return header().setOpCode(value);
    }

    using ConstView::data;
    //Points to data
    char* data() {
        return storage().view(offsetof(Layout, data));
    }
private:
    //The start address of the message
    DataView storage() const {
        return const_cast<char*>(ConstView::view2ptr());
    }
    //Points to the header
    MSGHEADER::View header() const {
        return storage().view(offsetof(Layout, header));
    }
};

......
//Value is the Layout above; subtract 4 because of the 4-byte placeholder data, so this is the header length
const int MsgDataHeaderSize = sizeof(Value) - 4;

//Length of the data part after removing the header
inline int ConstView::dataLen() const {
    return getLen() - MsgDataHeaderSize;
}
}  // namespace MsgData

        The interfaces of the MsgData namespace are similar to those of the MSGHEADER namespace described above. MsgData not only parses and assembles the header, but also provides the pointer to the start of the body data, the message length check, the opCode check, and data filling. Header parsing and construction in MsgData rely on MSGHEADER underneath.
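
        The arithmetic behind the 4-byte placeholder and the length checks can be reproduced in isolation. The sketch below uses hypothetical names (HeaderLayout, MsgLayout, kMsgDataHeaderSize) and assumes a value of 16 MB plus 16 KB for the BSONObjMaxInternalSize limit, which is not given in this article. It shows why sizeof(Layout) - 4 yields the 16-byte header size, where the body starts, and how the body length and the validity check follow from messageLength and opCode.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

struct HeaderLayout {
    std::int32_t messageLength;
    std::int32_t requestID;
    std::int32_t responseTo;
    std::int32_t opCode;
};

struct MsgLayout {
    HeaderLayout header;
    char data[4];  // placeholder marking where the body starts
};

static_assert(sizeof(MsgLayout) == 20, "16-byte header plus 4 placeholder bytes");

// Header size: the whole layout minus the 4 placeholder bytes.
constexpr int kMsgDataHeaderSize = static_cast<int>(sizeof(MsgLayout)) - 4;
// The body begins at the offset of the placeholder member.
constexpr std::size_t kBodyOffset = offsetof(MsgLayout, data);

// Assumed value for illustration: 16 MB plus 16 KB of internal headroom.
constexpr std::int32_t kBSONObjMaxInternalSize = 16 * 1024 * 1024 + 16 * 1024;

// Body length: total message length minus the header.
int dataLen(std::int32_t messageLength) {
    return messageLength - kMsgDataHeaderSize;
}

// Mirrors the two range checks performed by valid() above.
bool valid(std::int32_t messageLength, std::int32_t opCode) {
    if (messageLength <= 0 || messageLength > 4 * kBSONObjMaxInternalSize)
        return false;
    if (opCode < 0 || opCode > 30000)
        return false;
    return true;
}
```

        For a 42-byte message, dataLen returns 26, and the body pointer is simply the message start plus kBodyOffset, which is 16.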

3.3 Message/DbMessage core code implementation

      In 《transport_layer network transport layer module source code implementation 2》, the MongoDB message received from the underlying ASIO library is stored in a Message structure and finally saved in the ServiceStateMachine._inMessage member.

      From section 2 we know that the service entry point interface handleRequest(...) of the mongod and mongos instances takes a Message as an input parameter, so the received Message data is processed through this interface. The main interfaces of the Message class are implemented as follows:

//The DbMessage._msg member is of this type
class Message {
public:
    //Message initialization
    explicit Message(SharedBuffer data) : _buf(std::move(data)) {}
    //Header data
    MsgData::View header() const {
        verify(!empty());
        return _buf.get();
    }
    //Get the op field of the network message
    NetworkOp operation() const {
        return header().getNetworkOp();
    }
    //Whether _buf is empty
    bool empty() const {
        return !_buf;
    }
    //Get the total message length, messageLength
    int size() const {
        if (_buf) {
            return MsgData::ConstView(_buf.get()).getLen();
        }
        return 0;
    }
    //Body length
    int dataSize() const {
        return size() - sizeof(MSGHEADER::Value);
    }
    //Reset _buf
    void reset() {
        _buf = {};
    }
    // use to set first buffer if empty
    //_buf takes over the buf space directly
    void setData(SharedBuffer buf) {
        verify(empty());
        _buf = std::move(buf);
    }
    //Copy msgtxt into _buf
    void setData(int operation, const char* msgtxt) {
        setData(operation, msgtxt, strlen(msgtxt) + 1);
    }
    //Build a complete MongoDB message from operation and msgdata
    void setData(int operation, const char* msgdata, size_t len) {
        verify(empty());
        size_t dataLen = len + sizeof(MsgData::Value) - 4;
        _buf = SharedBuffer::allocate(dataLen);
        MsgData::View d = _buf.get();
        if (len)
            memcpy(d.data(), msgdata, len);
        d.setLen(dataLen);
        d.setOperation(operation);
    }
    ......
    //Get the pointer held by _buf
    const char* buf() const {
        return _buf.get();
    }

private:
    //Buffer that stores the received data
    SharedBuffer _buf;
};

       Message is the class that most directly handles sending and receiving MongoDB messages; it mainly encapsulates one complete MongoDB message. Further parsing of the body that follows the header is done in the DbMessage class, which contains a member _msg of type Message. In practice, the Message is assigned to DbMessage._msg in the handleRequest(...) service entry point of the instance, and subsequent processing of the message body is done by the DbMessage interfaces. The relationship between the DbMessage and Message classes is as follows:

class DbMessage {
    ......
    //Contains a Message member variable
    const Message& _msg;
    //Start address of the namespace string inside the message
    const char* _nsStart;
    //End address of the message
    const char* _theEnd;
};

DbMessage::DbMessage(const Message& msg) : _msg(msg),
  _nsStart(NULL), _mark(NULL), _nsLen(0) {
    //End address of the data of one MongoDB message
    _theEnd = _msg.singleData().data() + _msg.singleData().dataLen();
    //Start address of the body data; [_nextjsobj, _theEnd) holds the body of one MongoDB message
    _nextjsobj = _msg.singleData().data();
    ......
}

      DbMessage._msg is of type Message. In the constructor above, _nextjsobj and _theEnd are set to the start and end addresses of the message data returned by singleData().data(), that is, the body bytes that follow the 16-byte header; the header itself remains available through _msg.

      Note: DbMessage is the class that earlier MongoDB versions (version < 3.6) use to parse and encapsulate the message body. It handles operations whose opCode lies in the range [dbUpdate, dbDelete]. In newer MongoDB versions (version >= 3.6), body parsing and encapsulation are done by the OpMsgRequest{} struct in the op_msg.h and op_msg.cpp source files.
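
      The split between the two parsing paths can be illustrated with a short sketch. The numeric opcode values below are taken from the public MongoDB wire protocol description rather than from this article, and BodyParser and parserFor are hypothetical names introduced only for this example; the range test mirrors the [dbUpdate, dbDelete] interval mentioned above.

```cpp
#include <cassert>
#include <cstdint>

// Opcode values assumed from the MongoDB wire protocol description.
enum NetworkOp : std::int32_t {
    opReply = 1,
    dbUpdate = 2001,
    dbInsert = 2002,
    dbQuery = 2004,
    dbGetMore = 2005,
    dbDelete = 2006,
    dbKillCursors = 2007,
    dbCommand = 2010,
    dbCommandReply = 2011,
    dbCompressed = 2012,
    dbMsg = 2013
};

// Legacy opcodes in [dbUpdate, dbDelete] carry a namespace string in the body.
bool messageShouldHaveNs(NetworkOp op) {
    return op >= dbUpdate && op <= dbDelete;
}

// Hypothetical classification of which class parses the body.
enum class BodyParser { DbMessage, OpMsgRequest, Other };

BodyParser parserFor(NetworkOp op) {
    if (op == dbMsg)
        return BodyParser::OpMsgRequest;
    if (messageShouldHaveNs(op))
        return BodyParser::DbMessage;
    return BodyParser::Other;  // neither of the two paths described above
}
```

      Under these assumptions, an OP_MSG request is routed to the OpMsgRequest path, while a legacy insert, update, query, getMore, or delete request falls into the DbMessage path.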

3.4 Core code for OpMsg message parsing and encapsulation

        Starting with version 3.6, MongoDB uses OP_MSG as the default opCode. It is an extensible message format designed to subsume the functionality of the other opcodes, and the read and write request protocols of newer versions all use this opcode. Parsing and encapsulation of the body of an OP_MSG message are done by the OpMsg interfaces. OpMsg::parse(Message) parses the body content from the Message. Its core code is as follows:

struct OpMsg {
    ......
    //Assigned during parsing, see OpMsg::parse
    //The various commands (insert, update, find, etc.) are stored in this body
    BSONObj body;
    //The usage of sequences has not been analyzed yet; skipped for now
    std::vector<DocumentSequence> sequences; // See OpMsg::parse
};

//Parse the OpMsg information from message
OpMsg OpMsg::parse(const Message& message) try {
    //message must not be empty, and opCode must be dbMsg
    invariant(!message.empty());
    invariant(message.operation() == dbMsg);
    //Get flagBits
    const uint32_t flags = OpMsg::flags(message);
    //flagBits validity check: within bits 0-15, only bit 0 and bit 1 may be set
    uassert(ErrorCodes::IllegalOpMsgFlag,
            str::stream() << "Message contains illegal flags value: Ob"
                          << std::bitset<32>(flags).to_string(),
            !containsUnknownRequiredFlags(flags));

    //The checksum is 4 bytes
    constexpr int kCrc32Size = 4;
    //Determine whether the message carries a checksum
    const bool haveChecksum = flags & kChecksumPresent;
    //If the checksum is present, the last 4 bytes of the message are the checksum
    const int checksumSize = haveChecksum ? kCrc32Size : 0;
    //Contents of the sections field
    BufReader sectionsBuf(message.singleData().data() + sizeof(flags),
                          message.dataSize() - sizeof(flags) - checksumSize);

    //No body has been seen yet
    bool haveBody = false;
    OpMsg msg;
    //Parse the command request data in sections
    while (!sectionsBuf.atEof()) {
        //BufReader::read reads the kind, which is one byte
        const auto sectionKind = sectionsBuf.read<Section>();
        //kind 0 is the body of the command request, stored as BSON
        switch (sectionKind) {
            //If the first byte of the section is 0, it is the body
            case Section::kBody: {
                //Only one body is allowed
                uassert(40430, "Multiple body sections in message", !haveBody);
                haveBody = true;
                //The BSON of the command request is stored here
                msg.body = sectionsBuf.read<Validated<BSONObj>>();
                break;
            }

            //DocSequence has not been analyzed yet and is rarely used; skipped for now.
            //It will be revisited after the mainstream functions of this series have been analyzed
            case Section::kDocSequence: {
                  ......
            }
        }
    }
    //An OP_MSG message must have a body
    uassert(40587, "OP_MSG messages must have a body", haveBody);
    //Further checks on body and sequences
    for (const auto& docSeq : msg.sequences) {
        ......
    }
    return msg;
}
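
        The control flow of this parsing loop can be reproduced without any MongoDB types. The following sketch is a simplified approximation, not the real implementation: ParsedOpMsg and parseOpMsgBody are hypothetical names, BSON documents are not validated but only skipped by their 4-byte length prefix, and document sequence sections (kind 1) are merely counted. It relies on the wire protocol facts that bit 0 of flagBits marks a trailing 4-byte checksum and that both a BSON document and a kind-1 section start with an int32 size that includes the size field itself.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

struct ParsedOpMsg {
    std::uint32_t flags = 0;
    std::vector<unsigned char> body;  // raw BSON bytes of the kind-0 section
    int docSequenceCount = 0;         // number of kind-1 sections seen
};

// Reassemble a little-endian uint32 from four raw bytes.
std::uint32_t readU32LE(const unsigned char* p) {
    return static_cast<std::uint32_t>(p[0]) |
           (static_cast<std::uint32_t>(p[1]) << 8) |
           (static_cast<std::uint32_t>(p[2]) << 16) |
           (static_cast<std::uint32_t>(p[3]) << 24);
}

// data holds everything after the 16-byte header: flagBits, sections, checksum.
ParsedOpMsg parseOpMsgBody(const std::vector<unsigned char>& data) {
    if (data.size() < 4)
        throw std::runtime_error("missing flagBits");
    ParsedOpMsg out;
    out.flags = readU32LE(data.data());
    // Bit 0 set means the last 4 bytes are a checksum, not section data.
    const std::size_t checksumSize = (out.flags & 1u) ? 4 : 0;
    if (data.size() < 4 + checksumSize)
        throw std::runtime_error("message too short for checksum");

    std::size_t pos = 4;
    const std::size_t end = data.size() - checksumSize;
    bool haveBody = false;
    while (pos < end) {
        const unsigned char kind = data[pos++];
        if (end - pos < 4)
            throw std::runtime_error("truncated section");
        // Both section kinds begin with an int32 size that includes itself.
        const std::uint32_t len = readU32LE(data.data() + pos);
        if (len < 4 || len > end - pos)
            throw std::runtime_error("bad section length");
        if (kind == 0) {
            if (haveBody)
                throw std::runtime_error("Multiple body sections in message");
            haveBody = true;
            out.body.assign(data.data() + pos, data.data() + pos + len);
        } else if (kind == 1) {
            ++out.docSequenceCount;
        } else {
            throw std::runtime_error("unknown section kind");
        }
        pos += len;
    }
    if (!haveBody)
        throw std::runtime_error("OP_MSG messages must have a body");
    return out;
}
```

        Feeding it four zero bytes of flagBits, a kind byte of 0, and the five-byte empty BSON document yields a body of five bytes; omitting the body section raises an error, matching the "must have a body" check above.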

        The OpMsg class is inherited by the OpMsgRequest class. The core interfaces of OpMsgRequest extract the database name and the command name from OpMsg.body. The OpMsgRequest code is implemented as follows:

//Used during protocol parsing, see runCommands
struct OpMsgRequest : public OpMsg {
    ......
    //Constructor
    explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}
    //opMsgRequestFromAnyProtocol->OpMsgRequest::parse
    //Parse the member information required by OpMsg from message
    static OpMsgRequest parse(const Message& message) {
        //OpMsg::parse
        return OpMsgRequest(OpMsg::parse(message));
    }
    //Fill an OpMsgRequest from db, body, and extraFields
    static OpMsgRequest fromDBAndBody(...) {
        OpMsgRequest request;
        request.body = ([&] {
            //Fill request.body
            ......
        }());
        return request;
    }
    //Get the db name from body
    StringData getDatabase() const {
        if (auto elem = body["$db"])
            return elem.checkAndGetStringData();
        uasserted(40571, "OP_MSG requests require a $db argument");
    }
    //Command information such as find and insert: the name of the first element of body is the command name
    StringData getCommandName() const {
        return body.firstElementFieldName();
    }
};

       OpMsgRequest obtains the OpMsg information, and thus the body content, through OpMsg::parse(message). The getDatabase() and getCommandName() interfaces then extract the database name and the command name from the body. Through these interfaces, the command name (find, write, update, etc.) and the target database are obtained.
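
       Why the first element's field name is enough to identify the command becomes clear from the BSON byte layout defined at bsonspec.org: a document is an int32 total length, followed by elements (a type byte, a NUL-terminated field name, then the value), followed by a terminating 0x00. The sketch below is a hypothetical standalone helper operating on raw bytes, not the BSONObj method used above, and it reads only the first field name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Returns the field name of the first element of a raw BSON document,
// or an empty string when the document is empty or malformed.
std::string firstElementFieldName(const std::vector<unsigned char>& bson) {
    // Bytes 0..3 hold the length; byte 4 is the first element's type byte,
    // or 0x00 when the document has no elements.
    if (bson.size() < 5 || bson[4] == 0x00)
        return "";
    const std::size_t nameStart = 5;
    std::size_t i = nameStart;
    while (i < bson.size() && bson[i] != 0x00)
        ++i;
    if (i == bson.size())
        return "";  // no NUL terminator found: malformed input
    return std::string(reinterpret_cast<const char*>(bson.data()) + nameStart,
                       i - nameStart);
}
```

       For the 21-byte encoding of {find: "users"}, the helper returns "find", which is the name the command lookup would be keyed on.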

      Besides parsing OP_MSG messages, the OpMsg module is also responsible for assembling and filling OP_MSG messages. The interface functions of the module are as follows:

4. Core code implementation of the mongod instance service entry point

      The mongod service entry point class ServiceEntryPointMongod inherits from ServiceEntryPointImpl. Message parsing, command parsing, and command execution for the mongod instance are all handled by this class. The core interfaces of ServiceEntryPointMongod can be divided into three submodules: opCode parsing and callback handling, command parsing and lookup, and command execution.

4.1 opCode parsing and callback handling

      opCode parsing and callback handling are implemented by the ServiceEntryPointMongod::handleRequest(...) interface. The core code is as follows:

// Entry point where mongod handles a client request.
// Called from the service state machine (SSM): ServiceStateMachine::_processMessage
DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {
    // Get the opCode; clients of version 3.6 use OP_MSG by default
    NetworkOp op = m.operation();
    ......
    // Construct a DbMessage from the message
    DbMessage dbmsg(m);
    // Get the client associated with the operation context
    Client& c = *opCtx->getClient();
    ......
    // Get the namespace (database.collection). Only legacy requests whose opCode lies in the
    // dbUpdate..dbDelete range carry the namespace directly in the DbMessage
    const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;
    const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();
    ....
    // CurOp::debug: initialize opDebug, used for slow-log records
    OpDebug& debug = currentOp.debug();
    // Slow-log threshold in milliseconds
    long long logThresholdMs = serverGlobalParams.slowMS;
    // Log every operation when the log verbosity is at debug level 1 or higher;
    // otherwise only operations slower than the threshold are logged (see below)
    bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));
    DbResponse dbresponse;
    if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {
        // Newer versions use op == dbMsg, so requests take this branch.
        // Data is fetched from the DB and returned through dbresponse
        dbresponse = runCommands(opCtx, m);
    } else if (op == dbQuery) {
        ......
        // Queries from early MongoDB versions take this branch
        dbresponse = receivedQuery(opCtx, nsString, c, m);
    } else if (op == dbGetMore) {
        // getMore requests from early MongoDB versions take this branch
        dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);
    } else {
        ......
        // Inserts, updates, and deletes from early versions are handled here
        if (op == dbInsert) {
            receivedInsert(opCtx, nsString, m);  // Insert entry point (newer versions: CmdInsert::runImpl)
        } else if (op == dbUpdate) {
            receivedUpdate(opCtx, nsString, m);  // Update entry point
        } else if (op == dbDelete) {
            receivedDelete(opCtx, nsString, m);  // Delete entry point
        }
    }
    // Record the execution time, i.e. the internal processing time
    debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());
    ......
    // Slow logging
    if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) {
        Locker::LockerInfo lockerInfo;
        // OperationContext::lockState  LockerImpl<>::getLockerInfo
        opCtx->lockState()->getLockerInfo(&lockerInfo);

        // OpDebug::report: write the slow-log entry to the log file
        log() << debug.report(&c, currentOp, lockerInfo.stats);
    }
    // Record the various statistics
    recordCurOpMetrics(opCtx);
}

      The handleRequest() interface of mongod mainly does the following:

  • Gets the opCode from the Message. In earlier versions each operation had its own opCode; for example, insert, delete, update, and query used dbInsert, dbDelete, dbUpdate, and dbQuery respectively. Starting with MongoDB 3.6, requests use OP_MSG as the opCode by default, so this article only analyzes the handling for opCode=OP_MSG.
  • Gets the Client information for this operation.
  • For legacy requests, constructs a DbMessage from the Message and parses out the database.collection (namespace) information.
  • Executes the callback that matches the opCode. For OP_MSG this is runCommands(...), and the resulting data is returned through dbresponse.
  • After the db layer returns, decides whether to write a slow log: if the execution time exceeds the threshold, a slow-log entry is recorded.
  • Records the various statistics held in debug.
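      The dispatch and slow-log steps above can be sketched in isolation. This is a simplified illustration that follows the branch structure of the listing; selectHandler and shouldWriteSlowLog are hypothetical helper names introduced here, and the numeric opcode values are the ones defined by the MongoDB wire protocol.

```cpp
#include <cstdint>
#include <string>

// Opcode values as defined by the MongoDB wire protocol.
enum NetworkOp : std::int32_t {
    dbUpdate = 2001,
    dbInsert = 2002,
    dbQuery = 2004,
    dbGetMore = 2005,
    dbDelete = 2006,
    dbCommand = 2010,
    dbMsg = 2013,
};

// Hypothetical helper: returns the name of the handler that the branch
// structure of handleRequest() would select. isCommand marks an OP_QUERY
// request that actually carries a command.
std::string selectHandler(NetworkOp op, bool isCommand) {
    if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {
        return "runCommands";
    }
    if (op == dbQuery) return "receivedQuery";
    if (op == dbGetMore) return "receivedGetMore";
    if (op == dbInsert) return "receivedInsert";
    if (op == dbUpdate) return "receivedUpdate";
    if (op == dbDelete) return "receivedDelete";
    return "unsupported";
}

// Hypothetical helper mirroring the slow-log condition: log when logging is
// forced, or when a sampled operation ran longer than the threshold. The
// threshold is in milliseconds, the execution time in microseconds.
bool shouldWriteSlowLog(bool shouldLogOpDebug,
                        bool shouldSample,
                        long long executionTimeMicros,
                        long long logThresholdMs) {
    return shouldLogOpDebug ||
        (shouldSample && executionTimeMicros > logThresholdMs * 1000LL);
}
```

      Note the unit conversion in the slow-log test: with a 100 ms threshold, a sampled operation is only logged once it takes more than 100000 microseconds.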

4.2 Command parsing and lookup

      As the analysis above shows, handleRequest() ends up calling runCommands(...). The core code of that interface is as follows:

// Execute the command carried by the message
DbResponse runCommands(OperationContext* opCtx, const Message& message) {
    // Get the ReplyBuilder for this message; in 3.6 it is OpMsgReplyBuilder by default.
    // The response data is built by this class
    auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));
    [&] {
        OpMsgRequest request;
        try {  // Parse.
            // Protocol parsing: build the OpMsgRequest from the message
            request = rpc::opMsgRequestFromAnyProtocol(message);
        } catch (const DBException& ex) {
            // Parse failure handling; do not try to execute
            ......
            return;
        }
        try {  // Execute.
            // Set up the operation context (CurOp) for this command
            curOpCommandSetup(opCtx, request);
            // The command pointer starts out as null
            Command* c = nullptr;
            // Look up the command by the name from OpMsgRequest::getCommandName
            if (!(c = Command::findCommand(request.getCommandName()))) {
                // No such command: exception handling follows
                ......
            }
            // Execute the command; the data is returned through replyBuilder.get()
            execCommandDatabase(opCtx, c, request, replyBuilder.get());
        } catch (const DBException& ex) {
            // Execution failure handling
            ......
        }
    }();
    // OpMsgReplyBuilder::done serializes the data
    auto response = replyBuilder->done();
    // Record the response length
    CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();
    // Return the response
    return DbResponse{std::move(response)};
}

      The runCommands(...) interface parses the OpMsg information out of the message, finds the command that corresponds to it, and then runs the processing for that command. Its main steps are:

  • Gets the replyBuilder that matches the opCode. For OP_MSG the builder is OpMsgReplyBuilder.
  • Parses the OpMsgRequest data out of the message. OpMsgRequest contains the actual command request in BSON form.
  • Initializes opCtx.
  • Obtains the command name (such as "find" or "update") through request.getCommandName().
  • Calls Command::findCommand(command name) to look the command up in the global CommandMap table. If the command is not found it is not supported; if it is found it is supported.
  • Calls execCommandDatabase(...) to execute the command and obtain its result.
  • Builds the response from the command's execution result and returns it.
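      The lookup step can be illustrated with a small registry sketch. It is only a model of the idea of a global name-to-command table: ToyCommand, commandRegistry, and registerCommand are hypothetical names introduced for illustration, and the real Command class carries far more state.

```cpp
#include <map>
#include <string>

// Hypothetical minimal command type used only for this sketch.
struct ToyCommand {
    std::string name;
};

// Global name -> command table, playing the role of the CommandMap.
std::map<std::string, ToyCommand*>& commandRegistry() {
    static std::map<std::string, ToyCommand*> registry;
    return registry;
}

// Adds a command to the table under its own name.
void registerCommand(ToyCommand* cmd) {
    commandRegistry()[cmd->name] = cmd;
}

// Returns the registered command, or nullptr when the name is unknown.
// The caller treats nullptr as "command not supported".
ToyCommand* findCommand(const std::string& name) {
    auto it = commandRegistry().find(name);
    return it == commandRegistry().end() ? nullptr : it->second;
}
```

      Because the table is keyed by the command name string, supporting a new command in this model only requires registering another entry; the dispatch code itself does not change.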

4.3 Command execution

void execCommandDatabase(...) {
    ......
    // Get the database name
    const auto dbname = request.getDatabase().toString();
    ......
    // Map that stores the top-level field names parsed from the BSON body
    StringMap<int> topLevelFields;
    // Walk the elements of the body
    for (auto&& element : request.body) {
        // Get the field name of this BSON element
        StringData fieldName = element.fieldNameStringData();
        // If a field name is repeated, raise an exception
        ......
    }
    // If this is a help request, return the help text
    if (Command::isHelpRequest(helpField)) {
        // Generate the help response
        Command::generateHelpResponse(opCtx, replyBuilder, *command);
        return;
    }
    // Authorization check: verify permission to execute the command
    uassertStatusOK(Command::checkAuthorization(command, opCtx, request));
    ......

    // Count the command execution; db.serverStatus().metrics.commands exposes these statistics
    command->incrementCommandsExecuted();
    // The actual command execution happens here
    retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);
    // Count failed command executions
    if (!retval) {
        command->incrementCommandsFailed();
    }
    ......
}
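      The duplicate-field check in the loop above is elided in the listing. One plausible way to implement it with a name-to-count map, in the spirit of topLevelFields, is sketched below. firstDuplicateField is a hypothetical helper, std::unordered_map stands in for StringMap, and returning the offending name stands in for raising an exception.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper: returns the first field name that appears more than
// once, or an empty string when every top-level field name is unique.
std::string firstDuplicateField(const std::vector<std::string>& fieldNames) {
    std::unordered_map<std::string, int> topLevelFields;
    for (const auto& name : fieldNames) {
        // Post-increment yields the previous count, so a non-zero value
        // means this name has been seen before.
        if (topLevelFields[name]++ != 0) {
            return name;
        }
    }
    return std::string();
}
```

      A request body with the fields find, $db, find would be rejected by such a check, since "find" occurs twice at the top level.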

      execCommandDatabase(...) finally calls runCommandImpl(...) to do the actual processing for the command. The core code of that interface is as follows:

bool runCommandImpl(...) {
    // Get the command request body
    BSONObj cmd = request.body;
    // Get the database name from the request
    const std::string db = request.getDatabase().toString();
    // ReadConcern check
    Status rcStatus = waitForReadConcern(
        opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd));
    // If the ReadConcern check fails, handle the error and return
    if (!rcStatus.isOK()) {
        // Error handling
        ......
        return false;
    }
    if (!command->supportsWriteConcern(cmd)) {
        // The command does not support writeConcern but the request specifies one: reject it
        if (commandSpecifiesWriteConcern(cmd)) {
            // Error handling: "Command does not support writeConcern"
            ......
            return false;
        }
        // Call Command::publicRun to run the command
        result = command->publicRun(opCtx, request, inPlaceReplyBob);
    } else {
        // Extract the WriteConcernOptions
        auto wcResult = extractWriteConcern(opCtx, cmd, db);
        // If extraction fails, handle the error and return
        if (!wcResult.isOK()) {
            // Error handling
            ......
            return false;
        }
        ......
        // Call Command::publicRun to run the command
        result = command->publicRun(opCtx, request, inPlaceReplyBob);
        ......
    }
    ......
}

      runCommandImpl(...) finally invokes command->publicRun(...) on the command passed in as a parameter, i.e. the common publicRun interface of the command module.
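      Before publicRun is reached, runCommandImpl has to decide how to treat writeConcern. The sketch below condenses that decision into one function. It assumes the two publicRun calls in the listing belong to mutually exclusive branches (command does not support writeConcern versus command supports it); WriteConcernDecision and decideWriteConcern are hypothetical names.

```cpp
// Hypothetical summary of the three outcomes of the writeConcern check.
enum class WriteConcernDecision {
    RunWithoutWriteConcern,  // command does not support it and none was sent
    RejectRequest,           // "Command does not support writeConcern"
    RunWithWriteConcern,     // command supports it: extract options, then run
};

// Hypothetical helper that mirrors the branch structure of runCommandImpl.
WriteConcernDecision decideWriteConcern(bool commandSupportsWriteConcern,
                                        bool requestSpecifiesWriteConcern) {
    if (!commandSupportsWriteConcern) {
        return requestSpecifiesWriteConcern
            ? WriteConcernDecision::RejectRequest
            : WriteConcernDecision::RunWithoutWriteConcern;
    }
    return WriteConcernDecision::RunWithWriteConcern;
}
```

      Only the combination "command does not support writeConcern, request specifies one" is an error; every other combination ends in a publicRun call.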

4.4 Summary

      The mongod service entry point first parses the opCode from the message. For clients of version 3.6 the default opCode is OP_MSG, which corresponds to the OpMsgRequest information. After the command name string is parsed out of the raw message data, the global map table is searched to determine whether the command is supported. If it is, the command is executed; if not, an exception is reported and the request returns.

5. Core code implementation of the mongos instance service entry point

      The core code flow of the mongos service entry point is almost the same as that of mongod. Message parsing, OP_MSG opcode handling, and command lookup in a mongos instance are similar to the mongod processing described in the previous chapter, so this chapter does not analyze them in detail. The mongos instance service entry flow is as follows:

      ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)->runCommand(...)->execCommandClient(...)

      The core code of the last two interfaces is as follows:

void runCommand(...) {
    ......
    // Get the command name from the request
    auto const commandName = request.getCommandName();
    // Look it up in the global map table
    auto const command = Command::findCommand(commandName);
    // No such command: report that the command is not supported
    if (!command) {
        ......
        return;
    }
    ......
    // Execute the command
    execCommandClient(opCtx, command, request, builder);
    ......
}

void execCommandClient(...)
{
    ......
    // Authorization check: does the client have permission to run this command?
    // If not, report the error
    Status status = Command::checkAuthorization(c, opCtx, request);
    if (!status.isOK()) {
        Command::appendCommandStatus(result, status);
        return;
    }
    // Increment the execution count of this command; mongos (the proxy) keeps these statistics too
    c->incrementCommandsExecuted();
    // If this command should be counted in the command statistics, increment the counter
    if (c->shouldAffectCommandCounter()) {
        globalOpCounters.gotCommand();
    }
    ......
    // Some commands do not support the writeConcern option
    bool supportsWriteConcern = c->supportsWriteConcern(request.body);
    // The command does not support writeConcern but the request carries it:
    // error "Command does not support writeConcern"
    if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {
        ......
        return;
    }
    // Run the command through its common publicRun interface, Command::publicRun
    ok = c->publicRun(opCtx, request, result);
    ......
}
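      The two counters touched in execCommandClient can be modeled as follows. This is a sketch under stated assumptions: ToyCommandStats, ToyGlobalOpCounters, and recordExecution are hypothetical names, and the affectsCommandCounter flag stands in for shouldAffectCommandCounter(). The point it illustrates is that the per-command counter always increases, while the process-wide command counter only increases for commands that opt in.

```cpp
// Hypothetical per-command statistics.
struct ToyCommandStats {
    long long executed = 0;
    bool affectsCommandCounter = true;  // stand-in for shouldAffectCommandCounter()
};

// Hypothetical process-wide counter, playing the role of globalOpCounters.
struct ToyGlobalOpCounters {
    long long command = 0;
};

// Records one execution: the per-command counter is always incremented,
// the global counter only when the command opts in.
void recordExecution(ToyCommandStats& stats, ToyGlobalOpCounters& global) {
    ++stats.executed;
    if (stats.affectsCommandCounter) {
        ++global.command;
    }
}
```

      In this model, a command that sets affectsCommandCounter to false still shows up in its own per-command statistics but does not contribute to the global command count.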
      Tips: a small difference between the service entry core code of mongos and mongod instances

  • In a mongod instance, opCode parsing, OpMsg parsing, command lookup, and the corresponding command invocation are all handled together by class ServiceEntryPointMongod{...}.
  • In a mongos instance, opCode parsing is implemented by class ServiceEntryPointMongos{...}, while OpMsg parsing, command lookup, and the corresponding command invocation are handled by class Strategy{...}.

6. Summary

      Summary of MongoDB message parsing and assembly:

  • A complete MongoDB message consists of a common message header plus a body.
  • The content of the body depends on the opCode in the message header.
  • For version 3.6, the default opCode of client requests is OP_MSG. The body for this opcode consists of flagBits + sections + checksum; the sections hold the actual command request, stored in BSON format.
  • Packing and parsing of the header and body are implemented by class Message {...}.
  • In the legacy protocol (MongoDB version < 3.6), parsing of the command name, database name, and collection name in the body is implemented by class DbMessage {...}.
  • In the newer protocol (MongoDB version >= 3.6), parsing of the command name, database name, and collection name in the body is implemented by struct OpMsgRequest{...} and struct OpMsg {...}.
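      The header-plus-body layout summarized above can be made concrete with a small parser. This is an illustrative sketch rather than the Message class itself: it assumes the standard 16-byte wire-protocol header (messageLength, requestID, responseTo, opCode, each a little-endian 32-bit integer) and, for OP_MSG, a 32-bit flagBits field immediately after the header. ToyMsgHeader, readLE32, parseHeader, and parseFlagBits are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// The four header fields, in wire order.
struct ToyMsgHeader {
    std::int32_t messageLength;
    std::int32_t requestID;
    std::int32_t responseTo;
    std::int32_t opCode;
};

// Reads a little-endian 32-bit value starting at the given offset.
std::uint32_t readLE32(const std::vector<std::uint8_t>& buf, std::size_t offset) {
    return static_cast<std::uint32_t>(buf[offset]) |
        (static_cast<std::uint32_t>(buf[offset + 1]) << 8) |
        (static_cast<std::uint32_t>(buf[offset + 2]) << 16) |
        (static_cast<std::uint32_t>(buf[offset + 3]) << 24);
}

// Parses the 16-byte header; the caller must supply at least 16 bytes.
ToyMsgHeader parseHeader(const std::vector<std::uint8_t>& buf) {
    assert(buf.size() >= 16);
    ToyMsgHeader header;
    header.messageLength = static_cast<std::int32_t>(readLE32(buf, 0));
    header.requestID = static_cast<std::int32_t>(readLE32(buf, 4));
    header.responseTo = static_cast<std::int32_t>(readLE32(buf, 8));
    header.opCode = static_cast<std::int32_t>(readLE32(buf, 12));
    return header;
}

// For OP_MSG, the flagBits are the 4 bytes right after the header;
// the sections (and the optional checksum) follow them.
std::uint32_t parseFlagBits(const std::vector<std::uint8_t>& buf) {
    assert(buf.size() >= 20);
    return readLE32(buf, 16);
}
```

      A receiver would typically read the header first, use messageLength to read the rest of the message, and then branch on opCode to choose the body parser.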

      The service entry processing flows of mongos and mongod instances are similar, with only small differences. The overall flow is as follows:

  • Parse the opCode out of the message and execute the callback that corresponds to that opcode.
  • Parse the OpMsg request information out of the message. The command information of a MongoDB message is stored in its body, and the body is stored in BSON format.
  • Parse the command name string (such as "insert" or "update") out of the body.
  • Look the command up in the global _commands map table to see whether it is supported. If it is supported, execute it; if not, report an error directly.
  • Once the corresponding command has been found, execute the command's run interface.
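      The last step, a shared entry point that forwards to command-specific logic, can be sketched with a small class hierarchy. This is an assumption-laden illustration: it models publicRun as a thin non-virtual wrapper around a virtual run method, which is one common way to structure such a module; ToyCommand and ToyPingCommand are hypothetical classes and the signatures are simplified.

```cpp
#include <string>

// Hypothetical base class: one shared entry point, one overridable hook.
class ToyCommand {
public:
    virtual ~ToyCommand() = default;

    // Common entry point used by the dispatcher for every command.
    // Shared pre- or post-processing would live here.
    bool publicRun(const std::string& dbname, std::string& reply) {
        return run(dbname, reply);
    }

private:
    // Command-specific logic, implemented by each concrete command.
    virtual bool run(const std::string& dbname, std::string& reply) = 0;
};

// Hypothetical concrete command that only fills in a reply string.
class ToyPingCommand : public ToyCommand {
private:
    bool run(const std::string& dbname, std::string& reply) override {
        reply = "pong from " + dbname;
        return true;
    }
};
```

      With this split, the dispatcher only needs a ToyCommand pointer from the lookup table and never has to know which concrete command it is running.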

      The graphical summary is as follows:

      Note: the protocol parsing and encapsulation described in Chapter 3 strictly belongs to the network processing module. It is covered here, under the command processing module, only because that makes the analysis of command handling easier to follow.

      Tips: later articles will continue to share the execution details of different commands.

7. Remaining problems

      The statistics raised in Chapter 1 will be explained once the core code of the command module has been analyzed. "mongodb command processing module source code implementation 2" continues the analysis. Stay tuned.

Copyright notice
This article was created by [Yang Yaya - focus on mongodb and high performance Middleware]. Please include a link to the original when reposting. Thank you.