How Akka Actors Run

2021-10-22 10:57:54 Bird's nest

I have recently been looking into performance problems in a Scala web framework: large numbers of Actors being created, long GC pauses, high CPU usage, time-consuming operations inside an Actor's receive, and so on. Suspecting that the Akka dispatcher was involved, I collected some background on Akka dispatchers and then traced through the source code to see how an Actor is actually executed.

Dispatcher

The Akka MessageDispatcher is what makes Akka actors run ("tick"); you could call it the engine of the machine. Every MessageDispatcher implements the ExecutionContext trait, which means it can be used to execute arbitrary code, for example Futures.

If an Actor has no additional configuration, the ActorSystem uses a default Dispatcher. The default Dispatcher can be tuned as well; by default it uses the executor specified by default-executor. If an ExecutionContext is passed in when the ActorSystem is created, that ExecutionContext is used as the default executor for all Dispatchers in this ActorSystem. Otherwise the default-executor is a fork-join-executor, which performs well in most cases.

You can look up a configured Dispatcher with the following code:

// for use with Futures, Scheduler, etc.
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

Setting the Dispatcher for an Actor

To give your Actor a non-default dispatcher, you need to do two things.
First, configure the dispatcher:

my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

Or configure it to use a thread-pool-executor:

my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
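
Both configurations size the pool the way their comments describe: ceil(available processors * factor), capped below by the min setting and above by the max setting. The arithmetic can be written as a small sketch. PoolSizeSketch and scaledPoolSize are hypothetical names for illustration, and the processor count is passed in explicitly here instead of being read from the runtime.

```scala
object PoolSizeSketch {
  // Hypothetical helper that mirrors the comments in the config above:
  // ceil(processors * factor), then clamped into the range [min, max].
  def scaledPoolSize(min: Int, factor: Double, max: Int, processors: Int): Int =
    math.min(math.max(math.ceil(processors * factor).toInt, min), max)
}
```

With the values above (min 2, factor 2.0, max 10), a machine with 3 processors would get 6 threads, while one with 8 processors would be capped at 10.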

Then you can create the Actor as usual and assign the dispatcher in the deployment configuration:

import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "myactor")

akka.actor.deployment {
  /myactor {
    dispatcher = my-dispatcher
  }
}

Alternatively, you can specify the dispatcher in code:

import akka.actor.Props
val myActor =
  context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")

Types of dispatchers

  • Dispatcher

    • Shareability: unlimited
    • Mailboxes: any type, one created per Actor
    • Use cases: the default dispatcher, bulkheading
    • Driven by: java.util.concurrent.ExecutorService
      Specify it with "executor", using "fork-join-executor", "thread-pool-executor", or the FQCN (fully qualified class name) of an akka.dispatch.ExecutorServiceConfigurator
  • PinnedDispatcher

    • Shareability: none
    • Mailboxes: any type, one created per Actor
    • Use cases: bulkheading
    • Driven by: any akka.dispatch.ThreadPoolExecutorConfigurator
      Defaults to a "thread-pool-executor"
  • BalancingDispatcher

    • Shareability: only between Actors of the same type
    • Mailboxes: any type, one created for all Actors
    • Use cases: work-sharing
    • Driven by: java.util.concurrent.ExecutorService
      Specify it with "executor", using "fork-join-executor", "thread-pool-executor", or the FQCN (fully qualified class name) of an akka.dispatch.ExecutorServiceConfigurator
  • CallingThreadDispatcher

    • Shareability: unlimited
    • Mailboxes: any type, one created per Actor per thread (on demand)
    • Use cases: testing only
    • Driven by: the calling thread (duh)

Mailbox

An Akka Mailbox holds the messages that are destined for an Actor. Normally each Actor has its own mailbox, but with a BalancingDispatcher all Actors that use the same BalancingDispatcher share a single mailbox instance.
Built-in mailbox types:

  • UnboundedMailbox - Default mailbox
  • SingleConsumerOnlyUnboundedMailbox
  • BoundedMailbox
  • NonBlockingBoundedMailbox
  • UnboundedPriorityMailbox
  • BoundedPriorityMailbox
  • UnboundedStablePriorityMailbox
  • BoundedStablePriorityMailbox
  • UnboundedControlAwareMailbox
  • BoundedControlAwareMailbox

How it works

We only look at the local (same JVM process) ActorRef, LocalActorRef. It defines the send (!) method:

override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)

actorCell implements the akka.actor.dungeon.Dispatch trait, which does the actual sending of the message:

def sendMessage(msg: Envelope): Unit =
  try {
    if (system.settings.SerializeAllMessages) {
      val unwrapped = (msg.message match {
        case DeadLetter(wrapped, _, _) ⇒ wrapped
        case other ⇒ other
      }).asInstanceOf[AnyRef]
      if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
        val s = SerializationExtension(system)
        s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
      }
    }
    dispatcher.dispatch(this, msg)
  } catch handleException

As you can see, the message is ultimately handed to dispatcher.dispatch for delivery.
Here is the concrete implementation, Dispatcher.dispatch:

protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
  val mbox = receiver.mailbox
  mbox.enqueue(receiver.self, invocation)
  registerForExecution(mbox, true, false)
}

After putting the message into the mailbox of the target actor, it calls the registerForExecution method.
The most important line of that method submits mbox for execution, which works because mbox implements both ForkJoinTask and Runnable. (If the submission fails, it may be attempted again.)

executorService execute mbox

In other words, mbox itself is put into the thread pool to be executed.
A mailbox is not processed all in one go; how much is processed is determined by the throughput parameter. Each run handles at most throughput messages, after which the mailbox is added back to the thread pool's waiting queue, unless all of its messages have been processed.
So throughput=1 is the "fairest" setting for actors: every actor gets an even share of execution.
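
The effect of throughput can be reproduced with a small sketch that uses only java.util.concurrent, not Akka's own classes. ThroughputSketch, SketchMailbox, runSketch and the message names are hypothetical, and whatever bookkeeping Akka's real mailbox does around scheduling is left out. A single pool thread is used so that the ordering is easy to follow.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors}
import scala.collection.mutable.ArrayBuffer

object ThroughputSketch {
  // Hypothetical stand-in for a mailbox: a message queue that is itself a Runnable.
  final class SketchMailbox(
      messages: Seq[String],
      throughput: Int,
      pool: ExecutorService,
      log: ArrayBuffer[String], // only written by the single pool thread
      done: CountDownLatch
  ) extends Runnable {
    private val queue = new ConcurrentLinkedQueue[String]()
    messages.foreach(m => queue.add(m))

    override def run(): Unit = {
      var processed = 0
      // Process at most `throughput` messages in this run.
      while (processed < throughput && !queue.isEmpty) {
        log += queue.poll()
        done.countDown()
        processed += 1
      }
      // Messages left over: go back to the end of the pool's waiting queue.
      if (!queue.isEmpty) pool.execute(this)
    }
  }

  // Runs two mailboxes with three messages each and returns the processing order.
  def runSketch(throughput: Int): List[String] = {
    val pool = Executors.newSingleThreadExecutor()
    val log = ArrayBuffer.empty[String]
    val done = new CountDownLatch(6) // 3 messages per mailbox
    val a = new SketchMailbox(Seq("A1", "A2", "A3"), throughput, pool, log, done)
    val b = new SketchMailbox(Seq("B1", "B2", "B3"), throughput, pool, log, done)
    // Enqueue both from inside the pool so that neither starts before both are queued.
    pool.execute(new Runnable {
      def run(): Unit = { pool.execute(a); pool.execute(b) }
    })
    done.await()
    pool.shutdown()
    log.toList
  }
}
```

With throughput = 1 the two mailboxes alternate (A1, B1, A2, B2, A3, B3); with throughput = 100 the first mailbox is drained before the second one gets a turn (A1, A2, A3, B1, B2, B3).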

A few conclusions can be drawn:

  • You can tune the size of the dispatcher's thread pool.
  • The concrete dispatcher implementation determines how actors are executed. For example, one could implement a priority queue so that higher-priority Actors are executed first.
  • If an Actor performs time-consuming operations such as I/O, it ties up threads in the pool and overall throughput drops. Such Actors should therefore be given a dedicated thread pool.
  • Will Akka interrupt a running Actor to run other actors and then come back to the first one? No. Once an Actor has been handed to the thread pool, a thread simply executes it. If you sleep inside the Actor, that pool thread sleeps. So you need a workaround, for example splitting a long piece of business logic into several parts, executing only one part at a time, and moving from part to part through state transitions.
  • If nothing else intervenes, a thread processes some of the messages in an actor's mailbox; if messages remain, the mailbox is put back into the thread pool, and this repeats until no pending messages are left.
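
The advice about splitting long business logic into steps can be illustrated without Akka, again on a single-threaded java.util.concurrent pool. StepwiseSketch, longJob and shortTask are hypothetical names. In an actor, handing over to the next step would presumably take the form of the actor sending itself a message; that mapping is my assumption and is not taken from the Akka source.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.collection.mutable.ArrayBuffer

object StepwiseSketch {
  // Returns the order in which a three-step job and one short task were executed.
  def run(splitIntoSteps: Boolean): List[String] = {
    val pool = Executors.newSingleThreadExecutor() // one thread keeps the ordering visible
    val log = ArrayBuffer.empty[String]            // only written by the pool thread
    val done = new CountDownLatch(4)               // 3 job steps + 1 short task

    // Hypothetical long job with three steps; `step` is its current state.
    def longJob(step: Int): Runnable = new Runnable {
      def run(): Unit = {
        log += s"job-step-$step"
        done.countDown()
        if (step < 3) {
          if (splitIntoSteps) pool.execute(longJob(step + 1)) // give the thread back between steps
          else longJob(step + 1).run()                        // keep the thread until the job is finished
        }
      }
    }
    val shortTask = new Runnable {
      def run(): Unit = { log += "short-task"; done.countDown() }
    }
    // Enqueue both from inside the pool so that their initial order is deterministic.
    pool.execute(new Runnable {
      def run(): Unit = { pool.execute(longJob(1)); pool.execute(shortTask) }
    })
    done.await()
    pool.shutdown()
    log.toList
  }
}
```

When the job is split, the short task runs right after the first step; when it is not, the short task has to wait until all three steps are done.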


Copyright notice
This article was written by [Bird's nest]. Please include the original link when reposting. Thank you.
https://chowdera.com/2021/10/20211009000611652e.html
