分享

akka actor的运行原理

水电费 2015-6-25 20:11:17 发表于 原理型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 44726
问题导读
1.如何配置Dispatcher?
2.Dispatcher的工作原理是什么?  
                              









序言

最近在调研Scala web框架的性能时遇到一些问题, 比如生成巨多的Actor,GC时间过长,CPU使用率太高, 执行Actor的Receive是遇到耗时操作的问题等。怀疑Akka的调度器有些问题,特意整理了一些Akka调度器的背景知识,以及从源代码分析一下Actor是怎么执行地。


Dispatcher

Akka MessageDispatcher驱动Akka actor运行(tick),也可以说是这个机器的引擎。所有的MessageDispatcher都实现了ExecutionContext trait, 这意味着它们可以用来执行任何代码, 例如 Future.
如果对Actor不做额外配置的话,ActorSystem会使用一个缺省的Dispatcher。缺省的Dispatcher也可以进行参数调整,缺省它使用一个特定的default-executor。如果ActorSystem在创建时传入一个ExecutionContext,则此ExecutionContext 将作为此ActorSystem的所有Dispatcher的缺省executor。缺省的default-executor是fork-join-executor,在大部分情况下它的性能还是不错的。

可以通过下面的代码得到一个配置的Dispatcher:

[mw_shl_code=shell,true]
// for use with Futures, Scheduler, etc.
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")[/mw_shl_code]

为Actor设置Dispatcher

如果你希望为你的 Actor 设置非缺省的派发器,你需要做两件事:
首先要配置dispatcher:

[mw_shl_code=shell,true]
my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}[/mw_shl_code]


或者配置使用thread-pool-executor


[mw_shl_code=shell,true]my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}[/mw_shl_code]

这样,你就可以配置Actor使用图个特定的disptacher:

[mw_shl_code=scala,true]
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "myactor")[/mw_shl_code]

[mw_shl_code=scala,true]
akka.actor.deployment {
  /myactor {
    dispatcher = my-dispatcher
  }
}[/mw_shl_code]


或者用另外一种方式,可以在代码中指定dispatcher:

[mw_shl_code=shell,true]
import akka.actor.Props
val myActor =
  context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")[/mw_shl_code]

dispatcher的类型

  • Dispatcher
    • 可共享性: 无限制
    • 邮箱: 任何一种类型,为每一个Actor创建一个
    • 使用场景: 缺省派发器,Bulkheading
    • 底层使用: java.util.concurrent.ExecutorService
         可以指定“executor”使用“fork-join-executor”, “thread-pool-executor” 或者 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
  • PinnedDispatcher
    • 可共享性: 无
    • 邮箱: 任何一种类型,为每个Actor创建一个
    • 使用场景: Bulkheading
    • 底层使用: 任何 akka.dispatch.ThreadPoolExecutorConfigurator
         缺省为一个 “thread-pool-executor”
  • BalancingDispatcher
    • 可共享性: 仅对同一类型的Actor共享
    • 邮箱: 任何,为所有的Actor创建一个
    • 使用场景: Work-sharing
    • 底层使用: java.util.concurrent.ExecutorService
         指定使用 “executor” 使用 “fork-join-executor”, “thread-pool-executor” 或 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
  • CallingThreadDispatcher
    • 可共享性: 无限制
    • 邮箱: 任何,每Actor每线程创建一个(需要时)
    • 使用场景: 仅为测试使用
    • 底层使用: 调用的线程 (duh)
邮箱

kka Mailbox 保存发往某 Actor的消息. 通常每个 Actor 拥有自己的邮箱, 但是如果是使用 BalancingDispatcher 使用同一个 BalancingDispatcher 的所有Actor共享同一个邮箱实例.
内置的邮箱的类型:
  • UnboundedMailbox - 缺省邮箱
  • SingleConsumerOnlyUnboundedMailbox
  • BoundedMailbox
  • NonBlockingBoundedMailbox
  • UnboundedPriorityMailbox
  • BoundedPriorityMailbox
  • UnboundedStablePriorityMailbox
  • BoundedStablePriorityMailbox
  • UnboundedControlAwareMailbox
  • BoundedControlAwareMailbox
工作原理

我们只看本地(同一个JVM进程)的ActRef: LocalActorRef,它定义了send (!)方法:
[mw_shl_code=shell,true]
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)[/mw_shl_code]



actorCell实现了akka.actor.dungeon.Dispatch trait。它实现了具体的message的发送:


[mw_shl_code=scala,true]def sendMessage(msg: Envelope): Unit =
  try {
    if (system.settings.SerializeAllMessages) {
      val unwrapped = (msg.message match {
        case DeadLetter(wrapped, _, _) ⇒ wrapped
        case other                     ⇒ other
      }).asInstanceOf[AnyRef]
      if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
        val s = SerializationExtension(system)
        s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
      }
    }
    dispatcher.dispatch(this, msg)
  } catch handleException[/mw_shl_code]

可以看到,还是交给dispatcher.dispatch进行消息的分发。

看具体的实现类Dispatcher.dispatch:

[mw_shl_code=scala,true]
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
  val mbox = receiver.mailbox
  mbox.enqueue(receiver.self, invocation)
  registerForExecution(mbox, true, false)
}[/mw_shl_code]


将消息放在对应的actor的邮箱中后,就会调用registerForExecution方法。

这个方法最重要的一行就是执行mbox,因为mbox实现了ForkJoinTask和Runnable接口。 (如果执行失败,还可能执行一次)

[mw_shl_code=shell,true]
executorService execute mbox[/mw_shl_code]


其实是将mbox放到线程池中执行。
mbox并不是一次全部执行完的,而是有throughput参数确定。每次只执行throughput个消息,执行完会加入到线程池等待队列中,除非全部执行完毕。
因此当throughput=1的时候对actor来说比较“公平”,这样actor能平均的执行。

由此得出几个结论:
  • 可以调解线程池的大小进行调优
  • 具体的dispatcher实现该如何执行actor。 比如我们可以实现一个优先级队列来执行优先级比较高的Actor。
  • 如果一个Actor执行比较耗时的操作,比如IO操作,就会影响线程池的执行,造成整体吞吐率下降。所以为这些耗时的Actor配置专门的线程池
  • Akka会中断一个Actor而去执行别的actor吗,然后回来继续执行先前的Actor? 答案是不会。 因为一旦Actor交给线程池,线程就会去执行它。 如果你在Actor中sleep线程,会导致线程池中的此线程sleep。 所以你必须想一些办法,比如一个长的业务逻辑分成几个业务逻辑,每次只执行一个业务逻辑,通过状态变换分成多次执行。
  • 如果没有其它情况,线程会执行完actor 邮箱中的一部分消息,如果还有消息,会将此邮箱再放入线程池等待执行,直到没有待处理的消息为止。


参考资料


文章来源:鸟窝

         

已有(1)人评论

跳转到指定楼层
小南3707 发表于 2015-6-26 09:13:15
赞!   
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条