akka actor的运行原理
问题导读1.如何配置Dispatcher?
2.Dispatcher的工作原理是什么?
static/image/hrline/4.gif
序言
最近在调研Scala web框架的性能时遇到一些问题, 比如生成巨多的Actor,GC时间过长,CPU使用率太高, 执行Actor的Receive是遇到耗时操作的问题等。怀疑Akka的调度器有些问题,特意整理了一些Akka调度器的背景知识,以及从源代码分析一下Actor是怎么执行地。
Dispatcher
Akka MessageDispatcher驱动Akka actor运行(tick),也可以说是这个机器的引擎。所有的MessageDispatcher都实现了ExecutionContext trait, 这意味着它们可以用来执行任何代码, 例如 Future.
如果对Actor不做额外配置的话,ActorSystem会使用一个缺省的Dispatcher。缺省的Dispatcher也可以进行参数调整,缺省它使用一个特定的default-executor。如果ActorSystem在创建时传入一个ExecutionContext,则此ExecutionContext 将作为此ActorSystem的所有Dispatcher的缺省executor。缺省的default-executor是fork-join-executor,在大部分情况下它的性能还是不错的。
可以通过下面的代码得到一个配置的Dispatcher:
// for use with Futures, Scheduler, etc.
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
为Actor设置Dispatcher
如果你希望为你的 Actor 设置非缺省的派发器,你需要做两件事:
首先要配置dispatcher:
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
或者配置使用thread-pool-executor
my-thread-pool-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "thread-pool-executor"
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
这样,你就可以配置Actor使用图个特定的disptacher:
import akka.actor.Props
val myActor = context.actorOf(Props, "myactor")
akka.actor.deployment {
/myactor {
dispatcher = my-dispatcher
}
}
或者用另外一种方式,可以在代码中指定dispatcher:
import akka.actor.Props
val myActor =
context.actorOf(Props.withDispatcher("my-dispatcher"), "myactor1")
dispatcher的类型
[*]Dispatcher
[*]可共享性: 无限制
[*]邮箱: 任何一种类型,为每一个Actor创建一个
[*]使用场景: 缺省派发器,Bulkheading
[*]底层使用: java.util.concurrent.ExecutorService
可以指定“executor”使用“fork-join-executor”, “thread-pool-executor” 或者 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
[*]PinnedDispatcher
[*]可共享性: 无
[*]邮箱: 任何一种类型,为每个Actor创建一个
[*]使用场景: Bulkheading
[*]底层使用: 任何 akka.dispatch.ThreadPoolExecutorConfigurator
缺省为一个 “thread-pool-executor”
[*]BalancingDispatcher
[*]可共享性: 仅对同一类型的Actor共享
[*]邮箱: 任何,为所有的Actor创建一个
[*]使用场景: Work-sharing
[*]底层使用: java.util.concurrent.ExecutorService
指定使用 “executor” 使用 “fork-join-executor”, “thread-pool-executor” 或 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
[*]CallingThreadDispatcher
[*]可共享性: 无限制
[*]邮箱: 任何,每Actor每线程创建一个(需要时)
[*]使用场景: 仅为测试使用
[*]底层使用: 调用的线程 (duh)
邮箱
kka Mailbox 保存发往某 Actor的消息. 通常每个 Actor 拥有自己的邮箱, 但是如果是使用 BalancingDispatcher 使用同一个 BalancingDispatcher 的所有Actor共享同一个邮箱实例.
内置的邮箱的类型:
[*]UnboundedMailbox - 缺省邮箱
[*]SingleConsumerOnlyUnboundedMailbox
[*]BoundedMailbox
[*]NonBlockingBoundedMailbox
[*]UnboundedPriorityMailbox
[*]BoundedPriorityMailbox
[*]UnboundedStablePriorityMailbox
[*]BoundedStablePriorityMailbox
[*]UnboundedControlAwareMailbox
[*]BoundedControlAwareMailbox
工作原理
我们只看本地(同一个JVM进程)的ActRef: LocalActorRef,它定义了send (!)方法:
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)
actorCell实现了akka.actor.dungeon.Dispatch trait。它实现了具体的message的发送:
def sendMessage(msg: Envelope): Unit =
try {
if (system.settings.SerializeAllMessages) {
val unwrapped = (msg.message match {
case DeadLetter(wrapped, _, _) ⇒ wrapped
case other ⇒ other
}).asInstanceOf
if (!unwrapped.isInstanceOf) {
val s = SerializationExtension(system)
s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
}
}
dispatcher.dispatch(this, msg)
} catch handleException
可以看到,还是交给dispatcher.dispatch进行消息的分发。
看具体的实现类Dispatcher.dispatch:
protected def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
}
将消息放在对应的actor的邮箱中后,就会调用registerForExecution方法。
这个方法最重要的一行就是执行mbox,因为mbox实现了ForkJoinTask和Runnable接口。 (如果执行失败,还可能执行一次)
executorService execute mbox
其实是将mbox放到线程池中执行。
mbox并不是一次全部执行完的,而是有throughput参数确定。每次只执行throughput个消息,执行完会加入到线程池等待队列中,除非全部执行完毕。
因此当throughput=1的时候对actor来说比较“公平”,这样actor能平均的执行。
由此得出几个结论:
[*]可以调解线程池的大小进行调优
[*]具体的dispatcher实现该如何执行actor。 比如我们可以实现一个优先级队列来执行优先级比较高的Actor。
[*]如果一个Actor执行比较耗时的操作,比如IO操作,就会影响线程池的执行,造成整体吞吐率下降。所以为这些耗时的Actor配置专门的线程池
[*]Akka会中断一个Actor而去执行别的actor吗,然后回来继续执行先前的Actor? 答案是不会。 因为一旦Actor交给线程池,线程就会去执行它。 如果你在Actor中sleep线程,会导致线程池中的此线程sleep。 所以你必须想一些办法,比如一个长的业务逻辑分成几个业务逻辑,每次只执行一个业务逻辑,通过状态变换分成多次执行。
[*]如果没有其它情况,线程会执行完actor 邮箱中的一部分消息,如果还有消息,会将此邮箱再放入线程池等待执行,直到没有待处理的消息为止。
参考资料
[*]Dispatchers
[*]tuning dispatchers in akka applications
[*]https://www.zybuluo.com/MiloXia/note/80283
[*]https://github.com/akka/akka/tree/master/akka-actor/src/main/scala/akka/dispatch
[*]http://www.gtan.com/akka_doc/scala/dispatchers.html
文章来源:鸟窝
赞!
页:
[1]