by admin on October 20, 2018

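In the previous discussion we introduced the basic principles of Akka-Cluster and established a few key points about using it. First, building an Akka-Cluster has no direct connection to Actor programming: cluster construction happens at the ActorSystem level and can be purely a matter of configuration and deployment. Second, distributed Actor programming makes Actor message addresses transparent, so there is no need to consider whether the target runtime environment is distributed, and we can program in the usual Actor style.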

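Since distributed Actor programming needs no special handling for the cluster environment, what we have in front of us is simply a set of ready-to-use computing environments (the cluster nodes). Our distributed programming should therefore focus on making full use of them, that is, on how to allocate work sensibly across the cluster nodes to achieve the best computing throughput. We can assign load to cluster nodes manually and deliberately, or let an algorithm assign it automatically, as with the Routing discussed earlier.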

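We first demonstrate how to distribute load across the cluster by hand. The goal is simple: assign different functions to different cluster nodes. We start by classifying the cluster nodes by function, which we can do by setting node roles. We then need one class of node (possibly several nodes) to act as the front end, accepting tasks and dispatching them by task type. The nodes that do the actual computation are collectively called backend nodes, and the nodes that receive and dispatch tasks are called frontend nodes. The only part of the program that has to take the cluster into account is that the frontend must know the concrete address, that is the ActorRef, of the computing Actor on every backend node. To achieve this, each backend node registers its ActorRef with the dispatching Actor on the frontend when it joins the cluster, and the frontend Actor dispatches tasks from this list of registered ActorRefs. Suppose we use addition, subtraction, multiplication and division to stand for four different functions, deploy each function manually on its own cluster node, and use one frontend node to dispatch tasks by function. Here are the message types shared by all nodes: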

package clusterloadbalance.messages

object Messages {
  sealed trait MathOps
  case class Add(x: Int, y: Int) extends MathOps
  case class Sub(x: Int, y: Int) extends MathOps
  case class Mul(x: Int, y: Int) extends MathOps
  case class Div(x: Int, y: Int) extends MathOps

  sealed trait ClusterMsg
  case class RegisterBackendActor(role: String) extends ClusterMsg

}
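
The registration idea can be tried out without Akka at all. The following is a minimal sketch under stated assumptions: the message types are local copies of the ones above, a backend is modelled as a plain function rather than an ActorRef, and the names RoleRegistrySketch, Backend, roleOf and route are hypothetical.

```scala
// Local stand-ins for the shared message types, so the sketch runs without Akka.
sealed trait MathOps
final case class Add(x: Int, y: Int) extends MathOps
final case class Sub(x: Int, y: Int) extends MathOps
final case class Mul(x: Int, y: Int) extends MathOps
final case class Div(x: Int, y: Int) extends MathOps

object RoleRegistrySketch {
  // Assumption: a "backend" is just a function here; in the article it is an ActorRef.
  type Backend = MathOps => String

  // Map each operation to the role name of the node that should handle it.
  def roleOf(op: MathOps): String = op match {
    case _: Add => "adder"
    case _: Sub => "substractor"
    case _: Mul => "multiplier"
    case _: Div => "divider"
  }

  // Look the role up in the registry; Left reports a role nobody has registered yet.
  def route(registry: Map[String, Backend], op: MathOps): Either[String, String] =
    registry.get(roleOf(op)) match {
      case Some(backend) => Right(backend(op))
      case None          => Left(s"${roleOf(op)} not registered!")
    }
}
```

With only an "adder" registered, routing an Add reaches its backend while a Mul is reported as not registered, which is the same situation a frontend faces before every backend has joined the cluster.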

The backend that performs the computation is defined as follows:

package clusterloadbalance.backend

import akka.actor._
import clusterloadbalance.messages.Messages._

import scala.concurrent.duration._
import akka.cluster._
import akka.cluster.ClusterEvent._
import com.typesafe.config.ConfigFactory


object CalcFuctions {
  def propsFuncs = Props(new CalcFuctions)
  def propsSuper(role: String) = Props(new CalculatorSupervisor(role))
}

class CalcFuctions extends Actor {
  override def receive: Receive = {
    case Add(x,y) =>
      println(s"$x + $y carried out by ${self} with result=${x+y}")
    case Sub(x,y) =>
      println(s"$x - $y carried out by ${self} with result=${x - y}")
    case Mul(x,y) =>
      println(s"$x * $y carried out by ${self} with result=${x * y}")
    case Div(x,y) =>
        println(s"$x / $y carried out by ${self} with result=${x / y}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalculatorSupervisor(mathOps: String) extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,classOf[MemberUp])
    super.preStart()
  }

  override def postStop(): Unit = {
    // Braces are required: without them super.postStop() runs in the constructor instead of here.
    cluster.unsubscribe(self)
    super.postStop()
  }

  override def receive: Receive = {
    case MemberUp(m) =>
      if (m.hasRole("frontend")) {
        context.actorSelection(RootActorPath(m.address)+"/user/frontend") !
          RegisterBackendActor(mathOps)
      }
    case msg@ _ => calcActor.forward(msg)
  }

}

object Calculator {
  def create(role: String): Unit = {   //create instances of backend Calculator
    val config = ConfigFactory.parseString("Backend.akka.cluster.roles = [\""+role+"\"]")
      .withFallback(ConfigFactory.load()).getConfig("Backend")
    val calcSystem = ActorSystem("calcClusterSystem",config)
    val calcRef = calcSystem.actorOf(CalcFuctions.propsSuper(role),"calculator")
  }
}

Note that this computing Actor is supervised, so exceptions are handled automatically.

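For ease of presentation, I put all the functions in a single Actor, CalcFuctions. In a real application they should be split into four different Actors, since they would be deployed to different cluster nodes. In addition, a supervisor has been added to oversee CalcFuctions. This supervisor is also deployed on each of the four nodes, where it supervises the child Actor on its own node. All communication between cluster nodes now goes through this supervisor, so it is the supervisor that registers its ActorRef and the role it is responsible for with the frontend. The frontend is defined as follows: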

package clusterloadbalance.frontend

import akka.actor._
import clusterloadbalance.messages.Messages._
import com.typesafe.config.ConfigFactory


object CalcRouter {
  def props = Props(new CalcRouter)
}

class CalcRouter extends Actor {
  var nodes: Map[String,ActorRef] = Map()
  override def receive: Receive = {
    case RegisterBackendActor(role) =>
      nodes += (role -> sender())
      context.watch(sender())
    case add: Add => routeCommand("adder", add)
    case sub: Sub => routeCommand("substractor",sub)
    case mul: Mul => routeCommand("multiplier",mul)
    case div: Div => routeCommand("divider",div)

    case Terminated(ref) =>    //remove from register
      nodes = nodes.filter { case (_,r) => r != ref}

  }
  def routeCommand(role: String, ops: MathOps): Unit = {
    nodes.get(role) match {
      case Some(ref) => ref ! ops
      case None =>
        println(s"$role not registered!")
    }
  }
}

object FrontEnd {
  private var router: ActorRef = _
  def create = {  //must load this seed-node before any backends
    val calcSystem = ActorSystem("calcClusterSystem",ConfigFactory.load().getConfig("Frontend"))
    router = calcSystem.actorOf(CalcRouter.props,"frontend")
  }
  def getRouter = router
}

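There is only one frontend responsible for dispatching work. It is a seed node and must be started first; only then can the backend nodes register with it as they start up.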

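We test the program with the code below. Because Akka cluster nodes are represented by separate ActorSystem instances, there is no need for a multi-module project structure in this example: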

package clusterLoadBalance.demo
import clusterloadbalance.messages.Messages._
import clusterloadbalance.backend.Calculator
import clusterloadbalance.frontend.FrontEnd

object LoadBalancerDemo extends App {
  FrontEnd.create

  Calculator.create("adder")

  Calculator.create("substractor")

  Calculator.create("multiplier")

  Calculator.create("divider")

  Thread.sleep(2000)

  val router = FrontEnd.getRouter

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

}

The resources/application.conf file is defined as follows:

Frontend {
  akka {
    actor {
      provider = "cluster"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2551
      }
    }

    cluster {
      roles = [frontend]
      seed-nodes = [
        "akka.tcp://calcClusterSystem@127.0.0.1:2551"]

      auto-down-unreachable-after = 10s
    }
  }
}

Backend {
  akka{
    actor {
      provider = "cluster"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
    }

    cluster {
      roles = [backend]
      seed-nodes = [
        "akka.tcp://calcClusterSystem@127.0.0.1:2551"]

      auto-down-unreachable-after = 10s
    }
  }
}

Output:

...
3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1800460396] with result=21
10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1046049287] with result=13
8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-2008880936] with result=4
45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#367851617] with result=42
[WARN] [06/30/2017 09:12:39.465] [calcClusterSystem-akka.actor.default-dispatcher-20] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero

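The output shows that different functions were computed on different nodes, and that the ArithmeticException did not cause a Restart, which shows that the SupervisorStrategy took effect.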
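
The reason the division by zero ends in Resume rather than Restart is that the supervisor's own decider is consulted before the default one. Below is a minimal sketch of that composition. It assumes hypothetical stand-ins for the directives (Resume, Restart, Escalate) and a deliberately simplified fallback; Akka's real default decider has more cases than shown here.

```scala
object DeciderSketch {
  // Hypothetical stand-ins for Akka's supervision directives.
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Escalate extends Directive

  // Custom rule: an arithmetic failure keeps the actor instance and its state.
  val decider: PartialFunction[Throwable, Directive] = {
    case _: ArithmeticException => Resume
  }

  // Simplified fallback, assumed for illustration only.
  val fallback: PartialFunction[Throwable, Directive] = {
    case _: Exception => Restart
    case _            => Escalate
  }

  // The custom rule is tried first; anything it does not cover falls through.
  val combined: PartialFunction[Throwable, Directive] = decider.orElse(fallback)
}
```

Because ArithmeticException is itself an Exception, the order matters: with the two functions swapped, the fallback would match first and the actor would be restarted.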

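We can also distribute load with something like the Routing approach introduced earlier. The principle is the same: a Router on the frontend dispatches tasks to multiple Routees on the backend. The difference is that these Routees can live on different cluster nodes. In principle, a Routing algorithm decides automatically which Routee each task goes to. Even so, we can still use the ConsistentHashing-Router mode to pair tasks with target Routees: the operation types in the example above (add, subtract, multiply, divide) can serve as the hash key for this Router mode. Routers still come in two kinds, Router-Group and Router-Pool. A Router-Pool is more automated, since the Router creates and deploys its own Routees; a Router-Group Router instead connects, by path lookup, to Routees that have already been created and deployed. If we want to send tasks to a Routee on a specific cluster node, we must first create and deploy the Routees and then connect to them with a Router-Group. We reuse the example above with the ConsistentHashing-Router mode. First, adjust the message types: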

package clusterrouter.messages
import akka.routing.ConsistentHashingRouter._
object Messages {
  class MathOps(hashKey: String) extends Serializable with ConsistentHashable {
    override def consistentHashKey: Any = hashKey
  }
  case class Add(x: Int, y: Int) extends MathOps("adder")
  case class Sub(x: Int, y: Int) extends MathOps("substractor")
  case class Mul(x: Int, y: Int) extends MathOps("multiplier")
  case class Div(x: Int, y: Int) extends MathOps("divider")

}
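
Messages of this shape leave one ActorSystem and arrive in another, so they are serialized on the way. A quick way to check that such a type survives the trip is a round trip through Java serialization. This is a minimal sketch, assuming Java serialization is the serializer in use; KeyedOp, KeyedAdd and roundTrip are hypothetical names that only mirror the structure of the messages above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical message types: a Serializable base class carrying the hash key.
class KeyedOp(val hashKey: String) extends Serializable
final case class KeyedAdd(x: Int, y: Int) extends KeyedOp("adder")

object SerializationSketch {
  // Write the value to a byte array and read it back as a fresh object.
  def roundTrip[A <: AnyRef](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

If the base class did not extend Serializable, its hashKey field would not be written with the message, and since the class has no no-argument constructor, reading the message back would fail.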

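Note that the Router now operates in a cluster spanning multiple ActorSystems, so the message types must implement Serializable. Next, build the Calculator and deploy it: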

package clusterrouter.backend

import akka.actor._
import clusterrouter.messages.Messages._

import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory


object CalcFuctions {
  def propsFuncs = Props(new CalcFuctions)
  def propsSuper = Props(new CalculatorSupervisor)
}

class CalcFuctions extends Actor {
  override def receive: Receive = {
    case Add(x,y) =>
      println(s"$x + $y carried out by ${self} with result=${x+y}")
    case Sub(x,y) =>
      println(s"$x - $y carried out by ${self} with result=${x - y}")
    case Mul(x,y) =>
      println(s"$x * $y carried out by ${self} with result=${x * y}")
    case Div(x,y) =>
        println(s"$x / $y carried out by ${self} with result=${x / y}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalculatorSupervisor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")

  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }

}


object Calculator {
  def create(port: Int): Unit = {   //create instances of backend Calculator
    val config = ConfigFactory.parseString("akka.cluster.roles = [backend]")
        .withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port"))
      .withFallback(ConfigFactory.load())
    val calcSystem = ActorSystem("calcClusterSystem",config)
    val calcRef = calcSystem.actorOf(CalcFuctions.propsSuper,"calculator")
  }
}

The create function builds a backend node and then creates and deploys a CalculatorSupervisor on it.

The frontend is defined as follows:

package clusterrouter.frontend

import akka.actor._
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory


object CalcRouter {
  def props = Props(new CalcRouter)
}

class CalcRouter extends Actor {

  // This router is used both with lookup and deploy of routees. If you
  // have a router with only lookup of routees you can use Props.empty
  // instead of Props[CalculatorSupervisor].
  val calcRouter = context.actorOf(FromConfig.props(Props.empty),
    name = "calcRouter")

  override def receive: Receive = {
    case msg@ _ => calcRouter forward msg
  }
}

object FrontEnd {
  private var router: ActorRef = _
  def create = {  //must load this seed-node before any backends
    val calcSystem = ActorSystem("calcClusterSystem",ConfigFactory.load("hashing"))
    router = calcSystem.actorOf(CalcRouter.props,"frontend")
  }
  def getRouter = router
}

Note how calcRouter is built: FromConfig.props(Props.empty) means the Router only looks up existing Routees and does not deploy any. The hashing.conf file is as follows:

include "application"
akka.cluster.roles = [frontend]
akka.actor.deployment {
  /frontend/calcRouter {
    router = consistent-hashing-group
    routees.paths = ["/user/calculator"]
    cluster {
      enabled = on
      allow-local-routees = on
      use-role = backend
    }
  }
}

application.conf:

akka {
  actor {
    provider = cluster
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://calcClusterSystem@127.0.0.1:2551"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}

Test with the following code:

package clusterLoadBalance.demo
import clusterrouter.messages.Messages._
import clusterrouter.backend.Calculator
import clusterrouter.frontend.FrontEnd

object LoadBalancerDemo extends App {

  Calculator.create(2551)   //seed-node
  Calculator.create(0)      //backend node
  Calculator.create(0)
  Calculator.create(0)
  Calculator.create(0)
  Calculator.create(0)

  Thread.sleep(2000)

  FrontEnd.create


  Thread.sleep(2000)

  val router = FrontEnd.getRouter

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

  Thread.sleep(2000)

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

}

After running the two batches of operations, the output is:

8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1669565820] with result=4
3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=21
10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#904088130] with result=13
45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=42
[WARN] [07/02/2017 17:35:03.381] [calcClusterSystem-akka.actor.default-dispatcher-20] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero
[INFO] [07/02/2017 17:35:05.076] [calcClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://calcClusterSystem)] Cluster Node [akka.tcp://calcClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://calcClusterSystem@127.0.0.1:51304] to [Up]
10 + 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#904088130] with result=13
3 * 7 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=21
8 / 2 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#1669565820] with result=4
45 - 3 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1532934391] with result=42
[WARN] [07/02/2017 17:35:05.374] [calcClusterSystem-akka.actor.default-dispatcher-17] [akka://calcClusterSystem/user/calculator/calcFunction] / by zero

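Operations of the same type were dispatched to the same node in both batches, as the Actor addresses show. Two different keys can still land on the same node: here multiplication and subtraction share one.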
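
The behaviour in this output (a key always reaching the same node, while two keys may share one) follows from how a hash ring works. The sketch below is a generic consistent-hash ring in plain Scala and is not Akka's implementation; HashRingSketch, buildRing, nodeFor and the node names are hypothetical, and String.hashCode stands in for a proper hash function.

```scala
import scala.collection.immutable.SortedMap

object HashRingSketch {
  // Place every node on the ring at several virtual positions to smooth the distribution.
  def buildRing(nodes: Seq[String], virtualNodes: Int = 10): SortedMap[Int, String] = {
    val positions = for {
      node <- nodes
      i    <- 0 until virtualNodes
    } yield s"$node-$i".hashCode -> node
    SortedMap(positions: _*)
  }

  // A key belongs to the first position at or after its hash, wrapping around at the end.
  def nodeFor(ring: SortedMap[Int, String], key: String): String = {
    require(ring.nonEmpty, "ring must contain at least one node")
    val it = ring.iteratorFrom(key.hashCode)
    if (it.hasNext) it.next()._2 else ring.head._2
  }
}
```

Adding a node to such a ring only moves the keys that now fall on the new node's positions; every other key stays where it was, which is why this scheme suits a cluster whose membership changes.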

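The Adaptive-Group provided by Akka-Cluster is a more intelligent automatic Routing mode that dispatches tasks according to the actual load on each cluster node. The user only needs to define the adaptive-group configuration; the Router then follows cluster membership on its own, adding and removing Routees at the configured path as nodes join and leave. An Adaptive-Group Router can be set up in the configuration file: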

include "application"

akka.cluster.min-nr-of-members = 3

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

akka.actor.deployment {
  /frontend/adaptive/calcRouter {
    # Router type provided by metrics extension.
    router = cluster-metrics-adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    nr-of-instances = 100
    routees.paths = ["/user/calculator"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}
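
To give a feel for what routing by node load means, the sketch below turns a per-node capacity figure into weights and picks a node in proportion to them. It is a simplified illustration and not the metrics extension's code: CapacityWeightSketch, weights and pick are hypothetical names, and the capacity values (0.0 for a saturated node, 1.0 for an idle one) are assumed inputs rather than real measurements.

```scala
object CapacityWeightSketch {
  // Scale capacities so the least capable node gets weight 1 and the others proportionally more.
  def weights(capacity: Map[String, Double]): Map[String, Int] =
    if (capacity.isEmpty) Map.empty
    else {
      val divisor = math.max(0.01, capacity.values.min) // guard against a zero capacity
      capacity.map { case (node, c) => node -> math.round(c / divisor).toInt }
    }

  // Pick the node whose slice of the cumulative weight range contains the roll.
  def pick(weighted: Seq[(String, Int)], roll: Int): String = {
    val total = weighted.map(_._2).sum
    require(roll >= 0 && roll < total, s"roll must be in [0, $total)")
    val upperBounds = weighted.scanLeft(0)(_ + _._2).tail // running totals per node
    weighted.zip(upperBounds).collectFirst {
      case ((node, _), upper) if roll < upper => node
    }.get
  }
}
```

In use, the roll would come from something like scala.util.Random.nextInt(total), so a node with three times the spare capacity receives roughly three times as many messages.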

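The backend computing Actors are unchanged from the approach above. The construction of the frontend Router is unchanged as well, except that the configured target is now cluster-metrics-adaptive-group. This can be seen in the test code: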

package clusterLoadBalance.demo
import clusterrouter.messages.Messages._
import clusterrouter.backend.Calculator
import clusterrouter.frontend.CalcRouter
import com.typesafe.config.ConfigFactory
import scala.util.Random
import scala.concurrent.duration._
import akka.actor._
import akka.cluster._


class RouterRunner extends Actor {
  val jobs = List(Add,Sub,Mul,Div)
  import context.dispatcher

  val calcRouter = context.actorOf(CalcRouter.props,"adaptive")
  context.system.scheduler.schedule(3.seconds, 3.seconds, self, "DoRandomMath")

  override def receive: Receive = {
    case  _ => calcRouter ! anyMathJob
  }
  def anyMathJob: MathOps =
    jobs(Random.nextInt(4))(Random.nextInt(100), Random.nextInt(100))
}

object AdaptiveRouterDemo extends App {

  Calculator.create(2551)   //seed-node
  Calculator.create(0)      //backend node

  Thread.sleep(2000)


  val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
    withFallback(ConfigFactory.load("adaptive"))

  val calcSystem = ActorSystem("calcClusterSystem",config)

  //#registerOnUp
  Cluster(calcSystem) registerOnMemberUp {
    val _ = calcSystem.actorOf(Props[RouterRunner],"frontend")
  }
  //#registerOnUp

  //val _ = calcSystem.actorOf(Props[RouterRunner],"frontend")

}
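
The expression jobs(Random.nextInt(4))(...) in RouterRunner works because the companion object of a case class can be used as a function from its constructor arguments to an instance. A minimal sketch of that trick follows, with hypothetical stand-in types (Job, Plus, Minus, Times, Over) and an explicit function type on the list, which the original leaves to inference.

```scala
import scala.util.Random

// Hypothetical stand-ins for the article's message types.
sealed trait Job
final case class Plus(x: Int, y: Int) extends Job
final case class Minus(x: Int, y: Int) extends Job
final case class Times(x: Int, y: Int) extends Job
final case class Over(x: Int, y: Int) extends Job

object RandomJobSketch {
  // Each companion object is used as a function (Int, Int) => Job.
  val jobs: List[(Int, Int) => Job] = List(Plus, Minus, Times, Over)

  // Choose a constructor at random, then apply it to two random operands.
  def anyJob(rnd: Random): Job =
    jobs(rnd.nextInt(jobs.size))(rnd.nextInt(100), rnd.nextInt(100))
}
```

Passing the Random in as a parameter keeps the sketch easy to test: two generators created with the same seed produce the same job.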

The test output is as follows:

...
12 * 9 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=108
27 + 74 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=101
78 + 37 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=115
77 * 33 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=2541
32 - 19 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1512057504] with result=13
65 * 46 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=2990
68 - 99 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=-31
97 + 18 carried out by Actor[akka://calcClusterSystem/user/calculator/calcFunction#-1771471734] with result=115
...

The operations have been dispatched to different nodes.

The full source code for this discussion follows:

Example 1:

build.sbt:

name := "cluster-load-balance"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= {
  val akkaVersion = "2.5.3"
  Seq(
    "com.typesafe.akka"       %%  "akka-actor"   % akkaVersion,
    "com.typesafe.akka"       %%  "akka-cluster"   % akkaVersion
  )

}

application.conf:

Frontend {
  akka {
    actor {
      provider = "cluster"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2551
      }
    }

    cluster {
      roles = [frontend]
      seed-nodes = [
        "akka.tcp://calcClusterSystem@127.0.0.1:2551"]

      auto-down-unreachable-after = 10s
    }
  }
}

Backend {
  akka{
    actor {
      provider = "cluster"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
        port = 0
      }
    }

    cluster {
      roles = [backend]
      seed-nodes = [
        "akka.tcp://calcClusterSystem@127.0.0.1:2551"]

      auto-down-unreachable-after = 10s
    }
  }
}

messages/Messages.scala

package clusterloadbalance.messages

object Messages {
  sealed trait MathOps
  case class Add(x: Int, y: Int) extends MathOps
  case class Sub(x: Int, y: Int) extends MathOps
  case class Mul(x: Int, y: Int) extends MathOps
  case class Div(x: Int, y: Int) extends MathOps

  sealed trait ClusterMsg
  case class RegisterBackendActor(role: String) extends ClusterMsg

}

backend/Calculator.scala:

package clusterloadbalance.backend

import akka.actor._
import clusterloadbalance.messages.Messages._

import scala.concurrent.duration._
import akka.cluster._
import akka.cluster.ClusterEvent._
import com.typesafe.config.ConfigFactory


object CalcFuctions {
  def propsFuncs = Props(new CalcFuctions)
  def propsSuper(role: String) = Props(new CalculatorSupervisor(role))
}

class CalcFuctions extends Actor {
  override def receive: Receive = {
    case Add(x,y) =>
      println(s"$x + $y carried out by ${self} with result=${x+y}")
    case Sub(x,y) =>
      println(s"$x - $y carried out by ${self} with result=${x - y}")
    case Mul(x,y) =>
      println(s"$x * $y carried out by ${self} with result=${x * y}")
    case Div(x,y) =>
        println(s"$x / $y carried out by ${self} with result=${x / y}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalculatorSupervisor(mathOps: String) extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self,classOf[MemberUp])
    super.preStart()
  }

  override def postStop(): Unit = {
    // Braces are required: without them super.postStop() runs in the constructor instead of here.
    cluster.unsubscribe(self)
    super.postStop()
  }

  override def receive: Receive = {
    case MemberUp(m) =>
      if (m.hasRole("frontend")) {
        context.actorSelection(RootActorPath(m.address)+"/user/frontend") !
          RegisterBackendActor(mathOps)
      }
    case msg@ _ => calcActor.forward(msg)
  }

}


object Calculator {
  def create(role: String): Unit = {   //create instances of backend Calculator
    val config = ConfigFactory.parseString("Backend.akka.cluster.roles = ["+role+"]")
      .withFallback(ConfigFactory.load()).getConfig("Backend")
    val calcSystem = ActorSystem("calcClusterSystem",config)
    val calcRef = calcSystem.actorOf(CalcFuctions.propsSuper(role),"calculator")
  }
}

frontend/FrontEnd.scala:

package clusterloadbalance.frontend

import akka.actor._
import clusterloadbalance.messages.Messages._
import com.typesafe.config.ConfigFactory


object CalcRouter {
  def props = Props(new CalcRouter)
}

class CalcRouter extends Actor {
  var nodes: Map[String,ActorRef] = Map()
  override def receive: Receive = {
    case RegisterBackendActor(role) =>
      nodes += (role -> sender())
      context.watch(sender())
    case add: Add => routeCommand("adder", add)
    case sub: Sub => routeCommand("substractor",sub)
    case mul: Mul => routeCommand("multiplier",mul)
    case div: Div => routeCommand("divider",div)

    case Terminated(ref) =>    //remove from register
      nodes = nodes.filter { case (_,r) => r != ref}

  }
  def routeCommand(role: String, ops: MathOps): Unit = {
    nodes.get(role) match {
      case Some(ref) => ref ! ops
      case None =>
        println(s"$role not registered!")
    }
  }
}

object FrontEnd {
  private var router: ActorRef = _
  def create = {  //must load this seed-node before any backends
    val calcSystem = ActorSystem("calcClusterSystem",ConfigFactory.load().getConfig("Frontend"))
    router = calcSystem.actorOf(CalcRouter.props,"frontend")
  }
  def getRouter = router
}

loadBalanceDemo.scala

package clusterLoadBalance.demo
import clusterloadbalance.messages.Messages._
import clusterloadbalance.backend.Calculator
import clusterloadbalance.frontend.FrontEnd

object LoadBalancerDemo extends App {
  FrontEnd.create

  Calculator.create("adder")

  Calculator.create("substractor")

  Calculator.create("multiplier")

  Calculator.create("divider")

  Thread.sleep(2000)

  val router = FrontEnd.getRouter

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

}

Example 2:

build.sbt:

name := "cluster-router-demo"

version := "1.0"

scalaVersion := "2.11.9"

sbtVersion := "0.13.7"

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

val akkaVersion = "2.5.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-remote" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-metrics" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
  "com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion)

application.conf+hashing.conf+adaptive.conf:

akka {
  actor {
    provider = cluster
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://calcClusterSystem@127.0.0.1:2551"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    auto-down-unreachable-after = 10s
  }
}

include "application"
akka.cluster.roles = [frontend]
akka.actor.deployment {
  /frontend/calcRouter {
    router = consistent-hashing-group
    routees.paths = ["/user/calculator"]
    cluster {
      enabled = on
      allow-local-routees = on
      use-role = backend
    }
  }
}

include "application"

akka.cluster.min-nr-of-members = 3

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka.actor.deployment {
  /frontend/adaptive/calcRouter {
    # Router type provided by metrics extension.
    router = cluster-metrics-adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = heap
    # metrics-selector = load
    # metrics-selector = cpu
    metrics-selector = mix
    #
    nr-of-instances = 100
    routees.paths = ["/user/calculator"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = off
    }
  }
}

messages/Messages.scala:

package clusterrouter.messages
import akka.routing.ConsistentHashingRouter._
object Messages {
  class MathOps(hashKey: String) extends Serializable with ConsistentHashable {
    override def consistentHashKey: Any = hashKey
  }
  case class Add(x: Int, y: Int) extends MathOps("adder")
  case class Sub(x: Int, y: Int) extends MathOps("substractor")
  case class Mul(x: Int, y: Int) extends MathOps("multiplier")
  case class Div(x: Int, y: Int) extends MathOps("divider")

}

backend/Calculator.scala:

package clusterrouter.backend

import akka.actor._
import clusterrouter.messages.Messages._

import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory


object CalcFuctions {
  def propsFuncs = Props(new CalcFuctions)
  def propsSuper = Props(new CalculatorSupervisor)
}

class CalcFuctions extends Actor {
  override def receive: Receive = {
    case Add(x,y) =>
      println(s"$x + $y carried out by ${self} with result=${x+y}")
    case Sub(x,y) =>
      println(s"$x - $y carried out by ${self} with result=${x - y}")
    case Mul(x,y) =>
      println(s"$x * $y carried out by ${self} with result=${x * y}")
    case Div(x,y) =>
        println(s"$x / $y carried out by ${self} with result=${x / y}")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalculatorSupervisor extends Actor {
  def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds){
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(CalcFuctions.propsFuncs,"calcFunction")

  override def receive: Receive = {
    case msg@ _ => calcActor.forward(msg)
  }

}


object Calculator {
  def create(port: Int): Unit = {   //create instances of backend Calculator
    val config = ConfigFactory.parseString("akka.cluster.roles = [backend]")
        .withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port"))
      .withFallback(ConfigFactory.load("adaptive"))
    val calcSystem = ActorSystem("calcClusterSystem",config)
    val calcRef = calcSystem.actorOf(CalcFuctions.propsSuper,"calculator")
  }
}

frontend/FrontEnd.scala:

package clusterrouter.frontend

import akka.actor._
import akka.routing.FromConfig
import com.typesafe.config.ConfigFactory

object CalcRouter {
  def props = Props(new CalcRouter)
}

class CalcRouter extends Actor {

  // This router is used both with lookup and deploy of routees. If you
  // have a router with only lookup of routees you can use Props.empty
  // instead of Props[CalculatorSupervisor].
  val calcRouter = context.actorOf(FromConfig.props(Props.empty),
    name = "calcRouter")

  override def receive: Receive = {
    case msg@ _ => calcRouter forward msg
  }
}

object FrontEnd {
  private var router: ActorRef = _
  def create = {  //must load this seed-node before any backends
    val calcSystem = ActorSystem("calcClusterSystem",ConfigFactory.load("hashing"))
    router = calcSystem.actorOf(CalcRouter.props,"frontend")
  }
  def getRouter = router
}

loadBalancerDemo.scala:

package clusterrouter.demo
import clusterrouter.messages.Messages._
import clusterrouter.backend.Calculator
import clusterrouter.frontend.FrontEnd

object LoadBalancerDemo extends App {

  Calculator.create(2551)   //seed-node
  Calculator.create(0)      //backend node
  Calculator.create(0)
  Calculator.create(0)
  Calculator.create(0)
  Calculator.create(0)

  Thread.sleep(2000)

  FrontEnd.create


  Thread.sleep(2000)

  val router = FrontEnd.getRouter

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

  Thread.sleep(2000)

  router ! Add(10,3)
  router ! Mul(3,7)
  router ! Div(8,2)
  router ! Sub(45, 3)
  router ! Div(8,0)

}

adaptiveRouterDemo.scala:

package clusterrouter.demo
import clusterrouter.messages.Messages._
import clusterrouter.backend.Calculator
import clusterrouter.frontend.CalcRouter
import com.typesafe.config.ConfigFactory
import scala.util.Random
import scala.concurrent.duration._
import akka.actor._
import akka.cluster._


class RouterRunner extends Actor {
  val jobs = List(Add,Sub,Mul,Div)
  import context.dispatcher

  val calcRouter = context.actorOf(CalcRouter.props,"adaptive")
  context.system.scheduler.schedule(3.seconds, 3.seconds, self, "DoRandomMath")

  override def receive: Receive = {
    case  _ => calcRouter ! anyMathJob
  }
  def anyMathJob: MathOps =
    jobs(Random.nextInt(4))(Random.nextInt(100), Random.nextInt(100))
}

object AdaptiveRouterDemo extends App {

  Calculator.create(2551)   //seed-node
  Calculator.create(0)      //backend node

  Thread.sleep(2000)


  val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
    withFallback(ConfigFactory.load("adaptive"))

  val calcSystem = ActorSystem("calcClusterSystem",config)

  //#registerOnUp
  Cluster(calcSystem) registerOnMemberUp {
    val _ = calcSystem.actorOf(Props[RouterRunner],"frontend")
  }
  //#registerOnUp

  //val _ = calcSystem.actorOf(Props[RouterRunner],"frontend")

}