Scala 编写库:Akka框架基本要点介绍
问题导读
1.Akka是什么模型?
2.ActorSystem是什么?
3.Akka Cluster是否存在单点故障?
static/image/hrline/4.gif
Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。本文基本上是基于Akka的官方文档(版本是2.3.12),通过自己的理解,来阐述Akka提供的一些组件或概念,另外总结了Akka的一些使用场景。Actor维基百科这样定义Actor模型:在计算科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。
通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:
[*]提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
[*]提供了异步非阻塞的、高性能的事件驱动编程模型
[*]超级轻量级事件处理(每GB堆内存几百万Actor)
实现一个Actor,可以继承特质akka.actor.Actor,实现一个receive方法,应该在receive方法中定义一系列的case语句,基于标准Scala的模式匹配方法,来实现每一种消息的处理逻辑。
我们先看一下Akka中特质Actor的定义:
trait Actor {
import Actor._
type Receive = Actor.Receive
implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw ActorInitializationException(
s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
val c = contextStack.head
ActorCell.contextStack.set(null :: contextStack)
c
}
implicit final val self = context.self //MUST BE A VAL, TRUST ME
final def sender(): ActorRef = context.sender()
def receive: Actor.Receive // 这个是在子类中一定要实现的抽象方法
protected def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
protected def aroundPreStart(): Unit = preStart()
protected def aroundPostStop(): Unit = postStop()
protected def aroundPreRestart(reason: Throwable, message: Option): Unit = preRestart(reason, message)
protected def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
@throws(classOf) // when changing this you MUST also change UntypedActorDocTest
def preStart(): Unit = () // 启动Actor之前需要执行的操作,默认为空实现,可以重写该方法
@throws(classOf) // when changing this you MUST also change UntypedActorDocTest
def postStop(): Unit = () // 终止Actor之前需要执行的操作,默认为空实现,可以重写该方法
@throws(classOf) // when changing this you MUST also change UntypedActorDocTest
def preRestart(reason: Throwable, message: Option): Unit = { // 重启Actor之前需要执行的操作,默认终止该Actor所监督的所有子Actor,然后调用postStop()方法,可以重写该方法
context.children foreach { child ⇒
context.unwatch(child)
context.stop(child)
}
postStop()
}
@throws(classOf) // when changing this you MUST also change UntypedActorDocTest
def postRestart(reason: Throwable): Unit = {// 重启Actor之前需要执行的操作,默认执行preStart()的实现逻辑,可以重写该方法
preStart()
}
def unhandled(message: Any): Unit = {
message match {
case Terminated(dead) ⇒ throw new DeathPactException(dead)
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
}
}
}
上面特质中提供了几个Hook,具体说明可以看代码中注释,我们可以在继承该特质时重写Hook方法,实现自己的处理逻辑。
一个Actor是有生命周期(Lifecycle)的,如下图所示:
通过上图我们可以看到,一除了/system路径下面的Actor外,一个Actor初始时路径为空,调用ActorSystem的actorOf方法创建一个Actor实例,返回一个引用ActorRef,它包括一个UID和一个Path,标识了一个Actor,可以通过该引用向该Actor实例发送消息。ActorSystem在Akka中,一个ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,按照逻辑划分的每个应用对应一个ActorSystem实例。
一个ActorSystem是具有分层结构(Hierarchical Structure)的:一个Actor能够管理(Oversee)某个特定的函数,他可能希望将一个task分解为更小的多个子task,这样它就需要创建多个子Actor(Child Actors),并监督这些子Actor处理任务的进度等详细情况,实际上这个Actor创建了一个Supervisor来监督管理子Actor执行拆分后的多个子task,如果一个子Actor执行子task失败,那么就要向Supervisor发送一个消息说明处理子task失败。需要知道的是,一个Actor能且仅能有一个Supervisor,就是创建它的那个Actor。基于被监控任务的性质和失败的性质,一个Supervisor可以选择执行如下操作选择:
[*]重新开始(Resume)一个子Actor,保持它内部的状态
[*]重启一个子Actor,清除它内部的状态
[*]终止一个子Actor
[*]扩大失败的影响,从而使这个子Actor失败
将一个Actor以一个监督层次结构视图来看是非常重要的,因为它诠释了上面第4种操作选择的存在性,而且对前3种操作选择也有影响:重新开始(Resume)一个Actor,则该Actor的所有子Actor都继续工作;重启一个Actor,则该Actor的所有子Actor都被重新启动;终止一个Actor,则该Actor的所有子Actor都被终止。另外,一个Actor的preRestart方法的默认行为是终止所有子Actor,如果我们不想这样,可以在继承Actor的实现中重写preRestart方法的逻辑。
一个ActorSystem在创建过程中,至少启动3个Actor,如下图所示:
上图是一个类似树状层次结构,ActorSystem的Top-Level层次结构,与Actor关联起来,称为Actor路径(Actor Path),不同的路径代表了不同的监督范围(Supervision Scope)。下面说明ActorSystem的监督范围:
[*]“/”路径:通过根路径可以搜索到所有的Actor
[*]“/user”路径:用户创建的Top-Level Actor在该路径下面,通过调用ActorSystem.actorOf来实现Actor的创建
[*]“/system”路径:系统创建的Top-Level Actor在该路径下面
[*]“/deadLetters”路径:消息被发送到已经终止,或者不存在的Actor,这些Actor都在该路径下面
[*]“/temp”路径:被系统临时创建的Actor在该路径下面
[*]“/remote”路径:改路径下存在的Actor,它们的Supervisor都是远程Actor的引用
TypedActorTypedActor是Akka基于Active对象(Active Object)设计模式的一个实现,关于Active对象模式,可以看维基百科的定义:
Active对象模式解耦了在一个对象上执行方法和调用方法的逻辑,执行方法和调用方法分别在各自的线程执行上下文中。该模式的目标是通过使用异步方法调用和一个调度器来处理请求,从而实现并行计算处理,该模式由6个元素组成:
[*]一个Proxy对象,提供一个面向客户端的接口和一组公共的方法
[*]一个接口,定义了请求一个Active对象上的方法的集合
[*]一个来自客户端请求的列表
[*]一个调度器,确定下一次处理哪一个请求
[*]Active对象上方法的实现
[*]一个回掉或者变量,供客户端接收请求被处理后的结果
通过前面对Actor的了解,我们知道Actor更适用于在Akka的Actor系统之间来实现并行计算处理,而TypedActor适用于桥接Actor系统和非Actor系统。TypedActor是基于JDK的Proxy来实现的,与Actor不同的是,Actor一次处理一个消息,而TypedActor一次处理一个调用(Call)。关于更多关于TypedActor,可以查看Akka文档。ClusterAkka Cluster提供了一个容错(Fault-Tolerant)、去中心化(Decentralized)、基于P2P的集群服务,而且不会出现单点故障(SPOF, Single Point Of Failure)。Akka基于Gossip实现集群服务,而且支持服务自动失败检测。
关于Gossip协议的说明,维基百科说明如下所示:Gossip协议是点对点(Computer-to-Computer)通信协议的一种,它受社交网络种的流言传播的特点所启发。现在分布式系统常常使用Gossip协议来解决其他方式所无法解决的问题,或者是由于底层网络的超大特殊结构,或者是因为Gossip方案是解决这类问题最有效的一种方式。一个Akka集群由一组成员节点组成,每个成员节点通过hostname:port:uid来唯一标识,并且每个成员节点之间是解耦合的(Decoupled)。一个Akka应用程序是一个分布式应用程序,它具有一个Actor的集合S,而每个节点上可以启动这个Akka应用S的集合的的一部分Actor,而不必是全集S。如果一个新的成员节点需要加入到Akka集群,只需要在集群中任意一个成员节点上执行Join命令即可。
Akka集群中各个成员节点之间的状态关系,如下图所示:
Akka集群中任何一个成员节点都有可能成为集群的Leader,这是基于Gossip收敛(Convergence)过程得到的确定性结果,没有经过选举的过程。Leader只是一种角色,在各轮Gossip收敛过程中Leader是不断变化的。Leader的职责是使成员节点进入/离开集群。
一个成员节点开始于joining状态,一旦所有其节点都看到了该新加入Akka集群的节点,则Leader会设置这个节点的状态为up。
如果一个节点安全离开Akka集群,可预期地它的状态会变为leaving状态,当Leader看到该节点为leaving状态,会将其状态修改为exiting,然后当所有节点看到该节点状态为exiting,则Leader将该节点移除,状态修改为removed状态。
如果一个节点处于unreachable状态,基于Gossip协议Leader是无法执行任何操作收敛(Convergence)到该节点的,所以unreachable状态的节点的状态是必须被改变的,它必须变成reachable状态或者down状态。如果该节点想再次加入到Akka集群,它必须需要重新启动,并且重新加入集群(经由joining状态)。RemotingAkka Remoting的设计目标是基于P2P风格的网络通信,所以它存在如下限制:
[*]不支持NAT(Network Address Translation)
[*]不支持负载均衡器(Load Balancers)
Akka提供了种方式来使用Remoting功能:
[*]通过调用actorSelection方法搜索一个actor,该方法输入的参数的模式为:akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
[*]通过actorOf方法创建一个actor
下面看一下Remoting系统中故障恢复模型(Failure Recovery Model),如下图所示:
上图中,连接到一个远程系统的过程中,包括上面4种状态:在进行任何通信之前,系统处于Idle状态;当第一次一个消息尝试向远程系统发送,或者当远程系统连接过来,这时系统状态变为Active;当两个系统通信失败,连接丢失,这时系统变为Gated状态;当系统通信过程中,由于参与通信的系统的状态不一致导致系统无法恢复,这时远程系统变为Quarantined状态,只有重新启动系统才能恢复,重启后系统变为Active状态。PersistenceAkka的持久性能够使得有状态的Actor实例保存它的内部状态,在Actor重启后能够更快的进行恢复。需要强调的是,持久化的仅仅是Actor的内部状态,而不是Actor当前的状态,Actor内部状态的变化会被一追加的方式存到到指定的存储中,一旦追加完成存储状态,这些数据就不会被更新。有状态的Actor通过重放(Replay)持久化的状态来快速恢复,重建内部状态。
Akka Persistence的架构有如下几个要点:
[*]PersistentActor
它是一个持久的、有状态的Actor,能够将持久化消息到一个日志系统中。当一个PersistentActor重启的时候,它能够重放记录到日志系统中的消息,从而基于这些消息来恢复一个Actor的内部状态。
[*]PersistentView
持久化视图是一个持久的有状态的Actor,它接收被记录到一个PersistentActor中的消息,但是它本身并不记录消息到日志系统,而是通过复制一个PersistentActor的消息流,来更新自己内部状态。
[*]AtLeastOnceDelivery
提供了一个消息至少传递一次(At-Lease-Once)的语义,在发送者和接收者所在的JVM崩溃的时候,将消息传递到目的地。
[*]Journal
一个日志系统存储发送给一个PersistentActor的消息序列,可以在应用程序中控制是否一个PersistentActor将消息序列记录到日志中。日志系统是支持插件式的,默认情况下,消息被记录到本地文件系统中。Akka CamelAkka提供了一个模块,能够与Apache Camel整合。Apache Camel是一个实现了EIP(Enterprise Integration Patterns)的整合框架,支持通过各种各样的协议进行消息交换。所以Akka的Actor可以通过Scala或Java API与其它系统进行通信,协议比如HTTP、SOAP、TCP、FTP、SMTP、JMS。Akka适用场景Akka适用场景非常广泛,这里根据一些已有的使用案例来总结一下,Akka能够在哪些应用场景下投入生产环境:
[*]事务处理(Transaction Processing)
在线游戏系统、金融/银行系统、交易系统、投注系统、社交媒体系统、电信服务系统。
[*]后端服务(Service Backend)
任何行业的任何类型的应用都可以使用,比如提供REST、SOAP等风格的服务,类似于一个服务总线,Akka支持纵向&横向扩展,以及容错/高可用(HA)的特性。
[*]并行计算(Concurrency/Parallelism)
任何具有并发/并行计算需求的行业,基于JVM的应用都可以使用,如使用编程语言Scala、Java、Groovy、JRuby开发。
[*]仿真
Master/Slave架构风格的计算系统、计算网格系统、MapReduce系统。
[*]通信Hub(Communications Hub)
电信系统、Web媒体系统、手机媒体系统。
[*]复杂事件流处理(Complex Event Stream Processing)
Akka本身提供的Actor就适合处理基于事件驱动的应用,所以可以更加容易处理具有复杂事件流的应用。其它特性Akka还支持很多其它特性,如下所示:
[*]支持Future,可以同步或异步地获取发送消息的结果
[*]支持基于事件的Dispatcher,将多个Actor与一个线程池绑定
[*]支持消息路由,可以提供不同的消息路由策略,如Akka支持如下策略:RoundRobinRoutingLogic、RandomRoutingLogic、SmallestMailboxRoutingLogic、BroadcastRoutingLogic、ScatterGatherFirstCompletedRoutingLogic、TailChoppingRoutingLogic、ConsistentHashingRoutingLogic
[*]支持FSM,提供基于事件的状态转移
good{:soso_e100:}{:soso_e100:}
页:
[1]