6. Akka 的 Actor 实现
● Actor 是非常轻量的计算单元
● 5000 万 / 秒消息转发能力(单机、单核、本地)
● 250 万 Actors / GB 内存(每个空 Actor 约 400 多字节)
● Actor 位置透明,本身即具分布能力
● 按地址创建和查找 - 本地或远程节点
● 访问本地或远程节点仅在于地址 (Path) 不同
● 可以跨节点迁移
● Actor 是按层级实现督导 (supervision) 的
● Actor 按树状组织成层级
● 父 Actor 监控子 Actor 的状态,可以在出状况时停止、重启、恢复它。
7. Akka 2.3.X (3 月 5 日发布 )
● 分片集群 (Sharding Cluster) – Entity Actors
● 按 Entity 的 ID 分片,按需自动在相应的节点创建
● 消息按 ID 发送,由 Resolver 根据 ID 自动定位到
Actor 所在的 region( 节点中 ) 并由 region 发送给
Actor
● 持久化 (Persistence) – 状态快照或事件历史
● LevelDB (开发、测试)
● HBase
8. 分片 - IdExtractor / ShardResolver
type EntryId = String
type ShardId = String
type Msg = Any
type IdExtractor = PartialFunction[Msg, (EntryId, Msg)]
type ShardResolver = Msg => ShardId
9. 分片 - IdExtractor / shardResolver
sealed trait Command extends Msg with Serializable {
def sessionId: String
}
// cluster 按 sessionId 与 actor 一一对应,按需即时创建或定位转发
lazy val idExtractor: ShardRegion.IdExtractor = {
case cmd: Command => (cmd.sessionId, cmd)
}
// cluster 依据 sessionId ,按一定规则,将 actor 分片到 Region
// 比如 100 个 regions , cluster 会在每个节点分配若干个 Regions
lazy val shardResolver: ShardRegion.ShardResolver = {
case cmd: Command =>
(math.abs(cmd.sessionId.hashCode) % 100).toString
}
10. 分片 – 带 EntryId 的消息
sealed trait Command extends Msg with Serializable {
def sessionId: String
}
case class CreateSession(sessionId: String) extends Command
case class Connecting(sessionId: String, query: Uri.Query, origins: Seq[HttpOrigin],
transportConn: ActorRef, transport: Transport) extends Command
// called by connection
case class OnFrame(sessionId: String, frame: TextFrame) extends Command
// called by business logic
case class SendMessage(sessionId: String, endpoint: String, msg: String) extends Command
case class SendJson(sessionId: String, endpoint: String, json: String) extends Command
11. 持久化 – 会改变状态的消息
sealed trait Event extends Serializable
case class Connected(sessionId: String, query: Uri.Query,
origins: Seq[HttpOrigin],
transportConn: ActorRef,
transport: Transport) extends Event
case class UpdatePackets(packets: Seq[Packet]) extends Event
12. 持久化 – persist / recover
class ClusterConnectionActive(val namespaceMediator: ActorRef,
val broadcastMediator: ActorRef) extends
ConnectionActive with EventsourcedProcessor {
override def receiveRecover: Receive = {
case event: Event => updateState(event) // 重演持久化的消息历史以恢复状态
}
// 只持久化会改变状态的消息
override def receiveCommand: Receive = {
case connected: Connected =>
persist(connected)(updateState(_))
case packets: UpdatePackets =>
persist(packets)(updateState(_))
case _ => // 处理其它消息
}
def updateState(event: Event) = {
event match {
case x: Connected =>
connectionContext.foreach(_.bindTransport(x.transport))
case x: UpdatePackets =>
pendingPackets = immutable.Queue(x.packets: _*)
}
}