Our company's Hadoop cluster has some bottlenecks, and we need to optimize it without adding machines — in other words, make more reasonable use of existing cluster resources, for both storage and processing performance. So I set out to learn about Hadoop's RPC internals, HDFS, and YARN-related tuning. After studying, I finally understand which aspects can be optimized and which parameters can be adjusted; it was something of an epiphany. Most of the content of this article comes from *Hadoop 2.x HDFS Source Analysis*. I think the book is well written and you can learn a lot from it; if anything in this post is unclear, you can continue with the book, which is very detailed and very clear. This article is organized into three parts: RPC principles, HDFS communication protocols, and the implementation of Hadoop RPC.
1. RPC Principles
RPC (Remote Procedure Call Protocol) is a protocol for invoking services on a remote computer over the network. RPC assumes the existence of an underlying network transport protocol, such as TCP or UDP, and uses it to carry request and response messages between communicating programs. In the OSI communication model, RPC spans the transport and application layers. The main goal of RPC is to make it easier to build distributed (applications): it provides powerful remote-invocation capability without losing the semantic simplicity of a local call. To achieve this, an RPC framework must provide a transparent invocation mechanism so that users do not have to explicitly distinguish between local and remote calls.
RPC usually follows a client/server model: the requester is the client, and the service provider is the server. The client sends a call request with parameters to the server and waits for the server's response. On the server side, the service provider sleeps until a call request arrives; when one is received, the server executes the call, computes the result, and finally returns it to the client. The RPC architecture is shown in the figure below:
As the figure shows, RPC mainly consists of the following parts:
1、Communication module: the network communication module that transmits RPC requests and responses. It can be based on TCP or UDP; it passes request and response messages between the client and the server and, in general, does not process the message contents. The request–response protocol can be implemented in two ways, as shown in the figure below.
2、Client stub: to the caller, the client-side stub behaves like a local program, but under the hood it serializes the call parameters into a request and sends it to the server through the communication module. The stub then waits for the server's response, deserializes it, and returns the result to the requester.
3、Server stub: the server-side stub deserializes the call request and parameters sent by the client, invokes the corresponding service program based on the call information, then serializes the service program's response and sends it back to the client.
4、Request program: the requester calls the client stub as if it were a local method, then receives the response returned by the stub.
5、Service program: the server receives the call request from the stub, executes the corresponding logic, and returns the execution result.
1.2、Characteristics of Hadoop RPC
- Transparency: calling a program on a remote machine feels, to the user, just like calling a local method.
- High performance: the RPC Server can process requests from multiple Clients concurrently.
- Controllability: the JDK already ships an RPC framework, RMI, but that RPC framework is too heavyweight and offers too little control, so Hadoop implements its own custom RPC framework.
1.3、Basic steps of an RPC request
- The client program locally calls the system-generated client Stub program;
- The Stub serializes the function-call information, packages it into a message according to the requirements of the network communication module, and hands it to the communication module to send to the remote server;
- When the remote server receives the message, it passes it to the corresponding server-side Stub program;
- The server Stub unpacks and deserializes the message into the form required by the called procedure, and invokes the corresponding function;
- The called function executes with the received parameters and returns the result to the server Stub;
- The Stub packages the result into a message and, step by step through the network communication module, transmits it back to the client program.
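The stub mechanism above can be sketched with the JDK's `java.lang.reflect.Proxy`, which is also what Hadoop RPC uses under the hood. This is a toy model: `GreetingProtocol`, `RpcStubSketch`, and the in-process `serverDispatch` are invented for illustration, and the "serialization" and "network" are simulated with plain strings rather than real IO.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical protocol interface -- a stand-in for something like ClientProtocol.
interface GreetingProtocol {
    String echo(String message);
}

public class RpcStubSketch {
    // Builds a client-side stub. Instead of a real network round trip, the
    // invocation handler simulates serialize -> send -> server dispatch -> reply.
    public static GreetingProtocol getProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            // 1. "Serialize" the call: method name plus arguments.
            String request = method.getName() + "(" + args[0] + ")";
            // 2. Pretend to ship it over the wire and dispatch on the server.
            return serverDispatch(request);
        };
        return (GreetingProtocol) Proxy.newProxyInstance(
                GreetingProtocol.class.getClassLoader(),
                new Class<?>[]{GreetingProtocol.class},
                handler);
    }

    // Stand-in for the server-side stub: unpack the request, run the service.
    private static String serverDispatch(String request) {
        if (request.startsWith("echo(")) {
            String arg = request.substring(5, request.length() - 1);
            return "server saw: " + arg;
        }
        throw new IllegalArgumentException("unknown method: " + request);
    }

    public static void main(String[] args) {
        GreetingProtocol stub = getProxy();
        // To the caller this looks exactly like a local method call.
        System.out.println(stub.echo("hello"));
    }
}
```

Note how the caller never sees the request string: the dynamic proxy is what gives RPC its transparency.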
2. HDFS Communication Protocols
HDFS's communication protocols are the abstract interfaces for calls between HDFS nodes. They fall into two main groups: the Hadoop RPC interfaces and the streaming interfaces.
2.1、Hadoop RPC interfaces
1、ClientProtocol: this interface defines the communication between the client and the namenode. It is used for all of the client's operations on the file system; both reads and writes interact with this interface.
The interface defines operations that are initiated by the client and answered by the namenode, mainly covering HDFS file read/write operations and operations on the HDFS namespace, snapshots, and cache.
(Client–Namenode interaction.) For ordinary operations, the client calls getBlockLocations() to ask the Namenode for a file's block location information (i.e., information on all of the datanodes storing replicas of each block), and calls reportBadBlocks() to report corrupt blocks to the Namenode.
(Writing and appending data.) To write a file, the client first calls create() to create a new, empty file in the HDFS directory tree, then calls addBlock() to obtain the location information of the data block that will hold the file data. Using that location information, the client builds a data-flow pipeline with the datanodes and writes data. Appending is slightly different: the client first calls append() to open an existing file and obtain the location of its last writable (not yet full) block, then builds the data pipeline and appends data to those nodes. Once that block is full, just as in the create() path, the client calls addBlock() to obtain a new block. When the client has finished writing the entire file, it calls complete() to notify the Namenode. This operation commits all newly written blocks of the HDFS file: if every block has reached its minimum replica count, it returns true; otherwise it returns false and the client retries.
(Exceptions in the steps above.) If the client obtains a new block but cannot establish a connection to the datanodes for it, it calls abandonBlock() to give up that data block, and then calls addBlock() again to obtain a new one.
When the client is writing a data block and one of the replica datanodes fails, the client calls getAdditionalDatanode() to ask the Namenode for a new datanode to replace the failed one. The client then calls updateBlockForPipeline() to ask the Namenode to assign a new timestamp to the block; this makes the block replicas on the failed node stale, so they will be deleted later. Finally, the client uses the new timestamp to build a new data pipeline and resume writing data.
If the client itself fails during the write, a lease mechanism protects the file: for every file a client has open, the client must periodically call ClientProtocol.renewLease() to renew its lease. If the Namenode has not received a lease renewal from a client for a long time, it considers the client failed and triggers a lease recovery operation: it closes the file and synchronizes the state of the file's blocks across all datanodes, ensuring the file is correct and consistent in HDFS.
If the Namenode fails while data is being written, HA has to come into play.
2、ClientDatanodeProtocol: the interface between the client and a datanode, mainly used by the client to fetch datanode node information; real reads and writes go through the streaming interface. It defines two groups of methods: one group supports HDFS file read/write operations, for example getReplicaVisibleLength(), which returns the real data length of a block replica on a datanode, and getBlockLocalPathInfo(); the other group supports the datanode-management commands issued by DFSAdmin.
3、DatanodeProtocol: the communication interface between a datanode and the namenode, including the commands the namenode returns to datanodes through the methods of this interface. Through this interface a datanode registers with the namenode and reports block and cache information. The DataNode uses it to handshake with the Namenode, register, send heartbeats, and report full and incremental block lists; the NameNode carries its instructions in the heartbeat responses. The methods of this interface fall into three groups: datanode startup, heartbeats, and block read/write.
(Datanode startup.) At startup, a Datanode interacts with the NameNode four times: it calls versionRequest() to handshake with the NameNode, then calls registerDatanode() to register the current datanode, then calls blockReport() to report all data blocks stored on the datanode, and finally calls cacheReport() to report all cached data blocks.
The handshake mainly returns the namenode's namespace ID, cluster ID, HDFS version number, and so on. The datanode checks and compares these on receipt to decide whether it can work with this namenode and whether it may register.
The block report lets the namenode record block storage per datanode and build the mapping from data blocks to datanode nodes. blockReport() runs at startup and then at a configured interval. cacheReport() works exactly like blockReport(), except that it reports the data blocks currently cached on the datanode.
(Heartbeats.) A datanode periodically sends heartbeats to the namenode (dfs.heartbeat.interval=3s). If the namenode does not receive a heartbeat from a datanode for a long time, it considers that datanode dead. Each heartbeat carries the node's storage state, cache state, the number of connections currently writing file data, the number of threads used for reads and writes, and so on. With HA enabled, a datanode sends heartbeats to both namenodes simultaneously, but only the active namenode sends instructions back to the datanode.
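The heartbeat/liveness idea can be sketched in plain Java — a toy model, not Hadoop code. A `ScheduledExecutorService` stands in for the datanode's heartbeat loop, and a timestamp map stands in for the namenode's liveness tracking. The interval and staleness threshold are shortened for the demo, and the names `HeartbeatSketch`, `lastSeen`, and `isStale` are all invented for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatSketch {
    // Stand-in for dfs.heartbeat.interval (3 s in HDFS; shortened here).
    static final long INTERVAL_MS = 50;
    // Assume a node is considered stale after missing several intervals.
    static final long STALE_AFTER_MS = 10 * INTERVAL_MS;

    // "Namenode" side: last heartbeat timestamp per datanode.
    static final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    static boolean isStale(String datanodeId, long nowMs) {
        Long seen = lastSeen.get(datanodeId);
        return seen == null || nowMs - seen > STALE_AFTER_MS;
    }

    // Runs `beats` heartbeats from one simulated datanode, then reports liveness.
    public static boolean runDemo(int beats) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(beats);
        timer.scheduleAtFixedRate(() -> {
            lastSeen.put("dn-1", System.currentTimeMillis()); // the "heartbeat RPC"
            done.countDown();
        }, 0, INTERVAL_MS, TimeUnit.MILLISECONDS);
        boolean ok = done.await(5, TimeUnit.SECONDS);
        timer.shutdownNow();
        return ok && !isStale("dn-1", System.currentTimeMillis());
    }

    public static void main(String[] args) throws Exception {
        System.out.println("dn-1 alive: " + runDemo(3));
    }
}
```

In real HDFS the heartbeat response also carries the namenode's commands back to the datanode, which this sketch omits.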
(Block read/write.) A datanode reports corrupted data blocks to the namenode, and periodically reports newly received or deleted data blocks.
4、InterDatanodeProtocol: the interface between datanodes, mainly used for block recovery operations and for synchronizing the block-replica information stored on datanode nodes. It is chiefly used during lease recovery.
When a client opens a file for operation, it first obtains a lease on that file, and the lease must be renewed periodically; otherwise the namenode considers the client abnormal and triggers a lease recovery operation (synchronizing the state of the file's blocks on all datanodes and forcibly closing the file).
Lease recovery is not controlled by the namenode itself. Instead, the namenode chooses one datanode from the data pipeline as the recovery primary node and issues a recovery command to it; that data node then controls the lease recovery operation. In other words, the recovery primary node coordinates the entire lease recovery operation. The goal of lease recovery is to bring the block state (timestamp and block length) held by all the datanodes for the block into agreement.
5、NamenodeProtocol: the communication interface between the namenode and the Secondary Namenode (or, in an HA setup, the Standby Namenode); it mainly serves the namenode's HA mechanism.
2.2、Streaming interfaces
The streaming interfaces are HDFS interfaces implemented directly on TCP or HTTP. In HDFS, they include the TCP-based DataTransferProtocol, as well as the HTTP interface between the Active Namenode and the Standby Namenode in an HA architecture.
1、DataTransferProtocol: the TCP-based streaming interface for writing data to or reading data from a Datanode. Block transfers between an HDFS client and a datanode, and between datanodes, are implemented on top of the DataTransferProtocol interface.
2、The HTTP interface between the Active Namenode and the Standby Namenode:
The Namenode saves the file system's namespace in an fsimage file, writes modifications to the namespace into an editlog file, and periodically merges fsimage and editlog. This merge is performed by the Secondary Namenode or the Standby Namenode, and the merged result must then be synchronized back to the Active Namenode. The HTTP interface between the Active Namenode and the Standby Namenode is used to transfer these fsimage files.
3. The Implementation of Hadoop RPC
As a distributed storage system, Hadoop cannot do without communication and interaction between nodes, so Hadoop needs an inter-node communication mechanism. RPC (Remote Procedure Call Protocol) allows a local program to call a service provided by an application on a remote machine as if calling a local method. The Hadoop RPC mechanism is an IPC implementation built mainly on Java dynamic proxies, Java NIO, and basic technologies such as protobuf (it is not based on Java RMI).
The Hadoop RPC framework consists of three main classes: RPC, Client, and Server. The RPC class provides the external interface to the Hadoop RPC framework, the Client class implements the client-side functionality, and the Server class implements the server-side functionality.
3.1、The RPC class
A client program calls the waitForProxy() or getProxy() method provided by the RPC class to obtain a proxy object for the specified RPC protocol; the RPC client can then invoke methods on the proxy object to send RPC requests to the server.
On the server side, the server program calls RPC.Builder.build() to construct an RPC.Server object, then calls RPC.Server.start() to start the Server object listening for and responding to RPC requests.
3.2、The Client class
An HDFS client obtains a proxy object for the ClientProtocolPB protocol and invokes RPC methods on that proxy object; the proxy object then calls RPC.Client.call() to serialize the RPC request and send it to the server.
The flow of the Client sending a request and receiving a response is shown in the figure below. The Client class has only one entry point, the call() method: the proxy class invokes Client.call() to send the RPC request to the remote server, then waits for the remote server's response.
As the figure above shows, the flow can be divided into the following steps:
- Client.call() encapsulates the RPC request in a Call object, which holds the RPC call's completion flag, return value, and exception information. call() then creates a Connection object to manage the socket connection between the client and the server. Using the ConnectionId as the key, the new Connection object is stored in the Client.connections field; using the callId as the key, the constructed Call object is stored in the Connection.calls field.
- Client.call() calls Connection.setupIOstreams() to establish the socket connection to the server. setupIOstreams() also starts the connection thread, which listens on the socket and reads the response data the server sends back.
- Client.call() calls Connection.sendRpcRequest() to send the RPC request to the Server.
- Client.call() calls Call.wait() to wait on the Call object for the server to return the response.
- When the connection thread receives the response sent back by the Server, it uses the information in the response to find the corresponding Call object, sets the Call object's return-value field, and calls call.notify() to wake the thread blocked in Client.call() so it can read the Call object's return value.
In RPC.Client, sending requests and receiving responses are done by two separate threads: the sending thread is the one that calls Client.call(), while the receiving thread is the connection thread started by call().
The inner class Connection is a thread class. It builds the socket connection from the Client to the Server, sends RPC requests, and reads RPC response data. The Client maintains one communication connection per Server; the basic information and operations of a connection are encapsulated in the Connection class. The basic information mainly includes the connection's unique identifier (remoteId), the socket to the Server (socket), the network input stream (in), the network output stream (out), and the hash table holding pending RPC requests (calls).
The Call class encapsulates one RPC request. It contains five member variables: the unique identifier id, the function-call information param, the function's return value value, the error or exception information error, and the completion flag done. Because the Hadoop RPC Server processes client requests asynchronously, the order in which remote procedure calls complete has nothing to do with the order in which their results return; the Client matches responses to calls through the id. When the client sends a request, it fills in only the id and param variables; the remaining three (value, error, and done) are filled in according to the server's execution of the call.
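The Call object's wait/notify handshake described above can be sketched in a few lines of plain Java. This is a simplified model of the real org.apache.hadoop.ipc.Client.Call, with invented names (`CallSketch`, `complete`, `await`); the real class has more fields and uses Hadoop's own concurrency utilities.

```java
public class CallSketch {
    // Mirrors the five fields described above: id, param, value, error, done.
    static class Call {
        final int id;
        final String param;
        Object value;
        Exception error;
        boolean done;

        Call(int id, String param) { this.id = id; this.param = param; }

        // Called by the connection thread when the server's response arrives.
        synchronized void complete(Object value, Exception error) {
            this.value = value;
            this.error = error;
            this.done = true;
            notifyAll();                    // wake the thread blocked in await()
        }

        // Called by the Client.call() thread; blocks until the response lands.
        synchronized Object await() throws Exception {
            while (!done) wait();
            if (error != null) throw error;
            return value;
        }
    }

    public static Object demo() throws Exception {
        Call call = new Call(1, "getBlockLocations");
        // Stand-in for the connection thread that receives the response.
        Thread connectionThread = new Thread(() ->
                call.complete("response for call " + call.id, null));
        connectionThread.start();
        return call.await();                // the caller parks here
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The `while (!done) wait()` loop is the standard guard against spurious wakeups; it also makes the sketch correct regardless of whether the response arrives before or after the caller starts waiting.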
3.3、The Server class
To improve performance, the Server class uses many techniques to increase concurrency, including thread pools and the Reactor pattern built on Java NIO. To better understand the Server class's design, let's build it up step by step.
The processing flow of an RPC server is like that of any network server: 1. read the request; 2. deserialize the request; 3. process the request; 4. serialize the response; 5. send back the response.
3.3.1、Single-threaded Reactor pattern
The Reactor pattern is a design pattern widely used on servers; it is an event-driven design pattern. Its processing flow: the application registers an IO event with an intermediary; when the intermediary detects that the IO event has occurred, it notifies and wakes the application to handle it. The intermediary is actually a thread in an endless wait loop: it accepts registrations from all applications, checks in turn whether each registered IO event is ready, and notifies the corresponding application when one is.
A simple Reactor-based network server design is shown in the figure below. It mainly consists of the reactor, acceptor, and handler modules. The reactor listens for all IO events; when a new IO event arrives, the reactor wakes the module corresponding to that event. The acceptor responds to socket connection-request events: it accepts the incoming connection and then constructs a handler object. The handler registers an IO read event with the reactor, performs the corresponding business-logic processing, and finally sends back the response.
The main steps are as follows:
- The client sends a socket connection request to the server; the server-side reactor object is listening for this IO event. Because the acceptor object registered the socket connection-request IO event with the reactor, the reactor dispatches the request to the acceptor.
- The acceptor accepts the client's socket connection request and creates a handler object; the handler's constructor registers an IO read event with the reactor.
- After the connection is established, the client sends the RPC request over the socket. When the RPC request reaches the reactor, the reactor dispatches it to the corresponding handler object for processing.
- The handler reads the RPC request from the network, deserializes it and executes the request's logic, then serializes the response and sends it back to the client over the socket.
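The steps above can be sketched with the JDK's own NIO `Selector`. This is a minimal single-threaded reactor that accepts one connection, reads one "request", and "handles" it in place. The class and variable names are invented for the sketch; a real server would loop forever, write a response back, and handle partial reads.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Single-threaded Reactor: one Selector dispatches both accept and read events.
public class ReactorSketch {
    public static String demo() throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);   // acceptor registration
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

        // A client on another thread sends one "RPC request".
        Thread client = new Thread(() -> {
            try (SocketChannel ch = SocketChannel.open(
                    new InetSocketAddress("127.0.0.1", port))) {
                ch.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                ch.shutdownOutput();
            } catch (IOException ignored) { }
        });
        client.start();

        String request = null;
        while (request == null) {
            selector.select();                               // the wait loop
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isAcceptable()) {                    // reactor -> acceptor
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ); // "handler" registers read
                } else if (key.isReadable()) {               // reactor -> handler
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    ch.read(buf);
                    buf.flip();
                    request = StandardCharsets.UTF_8.decode(buf).toString();
                    ch.close();
                }
            }
            selector.selectedKeys().clear();
        }
        client.join();
        server.close();
        selector.close();
        return "handled: " + request;   // business logic would run here
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Note that accepting, reading, and "handling" all happen on the one selector thread — exactly the property that becomes a bottleneck in the next paragraph.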
Because this design has only one thread on the server side, the handler must be able to read the request, execute it, and send the response quickly; if any one of these steps blocks, the entire server logic blocks with it. So let's look at the multi-threaded Reactor server structure.
3.3.2、Multi-threaded Reactor pattern
On top of the basic Reactor model, the time-consuming request-reading part and the business-logic part are split off into two separate thread pools: a readers pool and a handlers pool. The readers pool contains several Reader threads that read RPC requests: each registers with the Reactor for the IO read events of RPC requests, reads RPC requests off the network, encapsulates each RPC request in a Call object, and finally puts the Call object into the shared message queue MQ. The handlers pool contains many handler threads, which continuously take RPC requests out of the shared message queue MQ, execute the business logic, and send the response back to the client. This guarantees that IO event monitoring and dispatching, RPC request reading, and responding all happen in different threads, which greatly improves the server's concurrency. The architecture is shown below:
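The reader-pool/queue/handler-pool structure above can be sketched with a `BlockingQueue` standing in for the shared MQ. The sockets are omitted: readers here just "deserialize" synthetic requests and enqueue them, and all names (`ReaderHandlerSketch`, `Call`, `demo`) are invented for illustration.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReaderHandlerSketch {
    // Stand-in for the Call object that readers enqueue and handlers drain.
    static class Call {
        final int id;
        Call(int id) { this.id = id; }
    }

    public static Set<String> demo(int numCalls) throws Exception {
        // The shared message queue MQ from the text (capacity is arbitrary here).
        BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>(100);
        Set<String> responses = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(numCalls);

        ExecutorService readers = Executors.newFixedThreadPool(2);
        ExecutorService handlers = Executors.newFixedThreadPool(3);

        // Handler threads: block on the queue, run the "business logic",
        // and record the response they would send back to the client.
        for (int i = 0; i < 3; i++) {
            handlers.submit(() -> {
                try {
                    while (true) {
                        Call call = callQueue.take();
                        responses.add("reply-" + call.id);
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // pool shutdown
                }
            });
        }

        // Reader threads: "read a request off the wire" and enqueue a Call.
        for (int id = 0; id < numCalls; id++) {
            final int callId = id;
            readers.submit(() -> callQueue.offer(new Call(callId)));
        }

        boolean ok = done.await(5, TimeUnit.SECONDS);
        readers.shutdownNow();
        handlers.shutdownNow();
        return ok ? responses : Collections.emptySet();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo(5).size()); // prints 5
    }
}
```

The queue decouples the two pools: a slow handler no longer stalls the threads reading requests off the network, which is exactly the point of this design.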
The figure above shows the multi-threaded Reactor version: IO event monitoring, RPC request reading, and processing can all proceed concurrently. But for an object like Hadoop's Namenode, which faces many simultaneous socket connection requests and RPC requests, a single Reactor can become a bottleneck when processing and dispatching these IO events, degrading server performance. On this basis, the design can be extended into a multiple-Reactor pattern.
3.3.3、Multiple Reactors, multiple threads
The structure of the multiple-Reactor, multi-threaded pattern is shown in the figure below:
Here the mainReactor listens for socket connection events, the readReactors listen for IO read events, and the respondSelector listens for IO write events. There can be multiple readReactors to spread the system load; different Reader threads register their IO read events on different readReactors according to some policy. After the acceptor establishes a socket connection, a reader thread is taken from the readers pool to start processing the RPC request. The Reader thread selects a readReactor according to some policy and registers the RPC request's IO read event on it. That readReactor then monitors the network for arriving RPC requests and triggers the Reader thread to read. When a handler has successfully processed an RPC request, it registers an RPC-response IO write event with the respondSelector; when the socket output stream can accept data, the sender class sends the response back to the client.
3.3.4、The design of the Server class
The design of the Server class, shown below, is essentially the multiple-Reactor, multi-threaded design pattern:
- Listener: similar to the mainReactor in the Reactor pattern. The Listener object holds a Selector object, acceptSelector, responsible for listening for socket requests from clients. When acceptSelector detects a connection request, the Listener object initializes the connection and then, by round robin, picks one reader thread from the readers pool to carry out the RPC request's read operation.
- Reader: the same as the Reader threads in the Reactor pattern, used for reading RPC requests. The Reader thread class holds a Selector object, readSelector, similar to the readReactor in the Reactor pattern, which monitors the network for readable RPC requests. When readSelector detects a readable RPC request, it wakes the Reader thread to read the request, encapsulate it in a Call object, and put that Call object into the CallQueue.
- Handler: similar to the Handler in the Reactor pattern, used to process RPC requests and send back responses. Handler objects continuously take RPC requests out of the CallQueue, execute the local function corresponding to each RPC request, and finally encapsulate the response and send it back to the client.
- Responder: used to send RPC responses. It registers write-response events on its internal respondSelector, which plays the same role as the respondSelector in the Reactor pattern. When respondSelector detects that the network is ready to accept the response, it notifies the Responder to send the remaining response back to the client.
The Server class's flow for processing an RPC request:
- The Listener thread's acceptSelector registers the OP_ACCEPT event on the ServerSocketChannel and creates the readers thread pool; at this point, each Reader's readSelector is not yet monitoring any Channel.
- The client sends a socket connection request, triggering the Listener's acceptSelector and waking the Listener thread.
- The Listener calls ServerSocketChannel.accept() to create a new SocketChannel.
- The Listener picks a thread from the readers pool and registers the OP_READ event on that Reader's readSelector.
- The client sends an RPC request datagram, triggering the Reader's selector and waking the Reader thread.
- The Reader reads the data from the SocketChannel, encapsulates it in a Call object, and puts it into the shared queue CallQueue.
- The threads in the handlers pool all block on the CallQueue. When a Call object is put in, one Handler thread is woken; based on the information in the Call object, it calls the callBlockingMethod() method of the BlockingServer object, then writes the response to the SocketChannel.
- If the handler finds that the response cannot be fully written to the SocketChannel, it registers an OP_WRITE event with the Responder's respondSelector; when the socket becomes writable again, the Responder is woken to finish writing the response.
An introduction to the Server class's inner classes:
- Listener: a thread class. The whole Server has only one Listener thread, which listens for socket connection requests from clients; for each newly arriving socket connection request, the Listener picks one Reader thread from the readers pool. Listener defines a Selector object responsible for monitoring the SelectionKey.OP_ACCEPT event.
- Reader: a thread class. Each Reader thread reads RPC requests from several client connections, and the Server holds multiple Reader threads in a readers pool; these read RPC requests concurrently, improving the speed at which the Server handles RPC requests. The Reader class defines its own readSelector field, which monitors the SelectionKey.OP_READ event.
- Connection: maintains the socket connection between the Server and a Client. The Reader thread calls its readAndProcess() method to read one RPC request from the IO stream.
- Handler: a thread class responsible for executing the local function corresponding to an RPC request and returning the result to the client. The Handler thread's main loop takes pending Call objects out of the shared CallQueue, then calls Server.call() to execute the local function corresponding to the RPC call.
- Responder: a thread class. One Server has only one Responder object. Responder contains a Selector object, responseSelector, that monitors the SelectionKey.OP_WRITE event. responseSelector loops, checking whether the network is ready to send data, and then triggers the Responder thread to send the unfinished response results to the client.
3.4、RPC-based optimizations
Once you understand how RPC works, the following optimizations naturally make sense.
- The number of Handler threads. In Hadoop, the ResourceManager and the NameNode are the RPC Servers of the YARN and HDFS subsystems, respectively. The number of their Handler threads is set by yarn.resourcemanager.resource-tracker.client.thread-count and dfs.namenode.service.handler.count, with default values of 50 and 10. When the cluster is large, these two parameter values greatly affect system performance.
- The maximum number of client retries. In a distributed environment, it is common for clients to retry connections after network failures or other problems, but too many retries can hurt applications with strict real-time requirements. The maximum number of client retries is set by ipc.client.connect.max.retries, with a default of 10; that is, the client keeps trying up to 10 times (with a 1-second gap between attempts).
- The maximum number of Calls per Handler thread. Set by ipc.server.handler.queue.size, default 100. In other words, by default, each Handler thread corresponds to a Call queue length of 100. For example, if there are 10 Handlers, the maximum length of the whole Call queue (the shared callQueue) is 100 × 10 = 1000.
- ipc.server.listen.queue.size controls the listen-queue length (the backlog) of the server socket; its default value is 128. The Linux kernel parameter net.core.somaxconn also defaults to 128. When a server such as the NameNode is busy, 128 is not enough, so the backlog needs to be increased; for example, a 3000-node cluster might set ipc.server.listen.queue.size to 32768. For this parameter to have the desired effect, the kernel parameter net.core.somaxconn must likewise be set to a value greater than or equal to 32768.
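The backlog interaction above is easy to see at the socket-API level: Java lets you request a backlog when binding, but the kernel silently caps the effective value at net.core.somaxconn, which is why both the Hadoop setting and the kernel setting must be raised together. A minimal sketch (class and method names invented; the port 0 means "pick any free port"):

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BacklogSketch {
    // Opens a listening socket with an explicit backlog, like the Server's
    // Listener does with ipc.server.listen.queue.size, and returns the port.
    public static int bindWithBacklog(int backlog) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        // The second argument is the requested backlog; the kernel may cap it
        // at net.core.somaxconn without reporting any error.
        server.bind(new InetSocketAddress("127.0.0.1", 0), backlog);
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        server.close();
        return port;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("bound on port " + bindWithBacklog(32768));
    }
}
```

Because the cap is silent, raising only the Hadoop parameter gives no error yet no benefit — connections beyond the kernel's limit are still dropped or retried under load.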
References: *Hadoop 2.x HDFS Source Analysis*; *Hadoop Technology Insider: In-depth Analysis of YARN Architecture and Implementation Principles*.