Akka(1):Actor – 靠音信使得的运算器

by admin on 2018年12月20日

脚是Actor
wallet的概念,必须继承Actor以及override receive函数:

正文拟统计一些己以学node.js开发进程被遭遇的坑,并重现其来境况,分析出因,提议解决办法,以此一来进步协调node.js的学习水平,二来帮忙与本人平的稍白童鞋们在遭遇相同的题材时,能便捷的收获解决办法。当然,水平有限,片面和不当是在所难免的,欢迎各位路过的高手不吝指出。

 

用作一个node.js的小白新手,在学习过程中难免会遭逢各个各类的坑,有时是程序报错,却未晓得错在什么地方;有时是思量实现一个功力,却非清楚该怎么开,诸如此类,不胜枚举。这多少个坑对于大牛来说,可能一直未是问题,但是对于小白新手,每每遭逢那多少个题目,可能就是会师占用大量底日错开搜寻寻解决问题的艺术。

 

1.连接mongodb时,出现 Trying to open unclosed connection 错误

发出原因:在描绘及数据库交互的代码时,一般情况下,我们的思绪是这般的。首先创立一个连续,调用mongoose.connect('mongodb://localhost/dbname');然后于模型(MVC设计模式中之Model)文件中定义模型的细节,通过定义Schema并调用mongoose.model(dbname, schema);最终遵照是model生成一个具体的文档并调用相应的数据库操作,比如save(),当执行这多少个操作时,上述荒唐就出现了。出错的原委在调用mongoose.model(dbname, schema)每每,程序会打算新建一个针对mongodb的连天,而及时是mongodb不容许的,于是报错。
解决办法:创制一个数据库的连接,并当此外需要选择数据库的地点引用那连续。具体做法是,可以新建一个独立的文件,比如mongoose.js,代码如下

var mongoose = require(‘mongoose’);
mongoose.connect(‘mongodb://hostname/dbname’);
module.exports = mongoose;

然后以每个model文件被引用这连续

var mongoose = require(‘../mongoose.js’);

尽管能够放心的推行外各类数据库操作了。对于欲连接五只mongodb数据库的次,原理同。

(本文将不止更新)

PriorityMailbox需要继承UnboundedPriorityMailbox并且提供相比函数PriorityGenerator。ZipUp,UnZip和PutIn都是最优先的。然后于application.conf登记dispatcher的布:

曾看到一个有关Actor情势之观:认为Actor并无适合出现(concurrency)编程,更应是保安其中状态的运算工具。听起好像死无知,毕竟Actor形式本身就是是出现模式,如若非抱出现编程,岂不与Akka的注解意愿相左。再仔细研商了瞬间这意见的论述后就全认可了这种观点。在这里大家分析一下这种论述,先瞧下边就段Actor用法伪代码:

  def fuQuery(query: DBQuery): Future[FResult] = Future {
    val x = db.RunQuery(query)
    val y = getValue(x)
    computeResults(x,y)
  } 

  val r1 = fuQuery(query1)
  val r2 = fuQuery(query2)
  for {
    x <- r1
    y <- r2
  } yield combineValues(x,y)
prio-dispatcher {
  mailbox-type = "PriorityMailbox"
}

3、Behavior:简而言之就是对Mailbox里消息之反射措施。Mailbox中即存放了起外围流传的命令,怎样运算这些指令、发生什么结果尚且是由那么些指令的演算函数来规定。所以那些函数的效能就象征正Actor的表现情势。Actor的运算行为好通过become来替换默认的receive函数,用unbecome来还原默认行为。

 

  class PriorityMailbox(settings: ActorSystem.Settings, config: Config)
    extends UnboundedPriorityMailbox (
    PriorityGenerator {
      case Wallet.ZipUp => 0        
      case Wallet.UnZip => 0
      case Wallet.PutIn(_) => 0
      case Wallet.DrawOut(_) => 2
      case Wallet.CheckBalance => 4
      case PoisonPill => 4
      case otherwise => 4
     }
    )

立段代码中QueryActor没有外内部状态。通过Future传递总计结果可知兑现不死(non-blocking)运算。上边我们为此QueryActor来落实并作运算:

    class Wallet extends Actor {
      import Wallet._
      var balance: Double = 0
      var zipped: Boolean = true

      override def receive: Receive = {
        case ZipUp =>
          zipped = true
          println("Zipping up wallet.")
        case UnZip =>
          zipped = false
          println("Unzipping wallet.")
        case PutIn(amt) =>
          if (zipped) {         
            self ! UnZip         //无论如何都要把钱存入
            self ! PutIn(amt)
          }
          else {
            balance += amt
            println(s"$amt put-in wallet.")
          }

        case DrawOut(amt) =>
          if (zipped)  //如果钱包没有打开就算了
            println("Wallet zipped, Cannot draw out!")
          else
            if ((balance - amt) < 0)
              println(s"$amt is too much, not enough in wallet!")
            else {
            balance -= amt
            println(s"$amt drawn out of wallet.")
          }

        case CheckBalance => println(s"You have $balance in your wallet.")
      }
    }
Unzipping wallet.
10.5 put-in wallet.
20.3 put-in wallet.
100.0 put-in wallet.
Zipping up wallet.
Wallet zipped, Cannot draw out!
You have 130.8 in your wallet.

Process finished with exit code 0
prio-dispatcher {
  mailbox-type = "PriorityMailbox"
}

咱得以看到此Actor的里边状态分别是:var
balance, var zipped。下边是定制Mailbox定义:

 

1、ActorRef:Akka系统是一个树形层级式的社团,每个节点由一个Actor代表。每一个Actor在结构面临还得据此一个路线(ActorPath)来表示她当系统结构里的岗位。我们可另行用者途径来构建Actor,但每一遍构建都会面出新的ActorRef。所以ActorRef是唯一的,代表了某个路径指向地点及的一个运转时的Actor实例,大家不得不用ActorRef来向Actor发送音信

object Actor101 extends App {
  val system = ActorSystem("actor101-demo",ConfigFactory.load)
  val wallet = system.actorOf(Wallet.props.withDispatcher(
    "prio-dispatcher"),"mean-wallet")

  wallet ! Wallet.UnZip
  wallet ! Wallet.PutIn(10.50)
  wallet ! Wallet.PutIn(20.30)
  wallet ! Wallet.DrawOut(10.00)
  wallet ! Wallet.ZipUp
  wallet ! Wallet.PutIn(100.00)
  wallet ! Wallet.CheckBalance

  Thread.sleep(1000)
  system.terminate()

}

好了,回到正题:从效果上Actor是由于实例引用(ActorRef),音讯邮箱(Mailbox),内部状态(State),运算行为(Behavior),子类下属(Child-Actor),监管政策(Supervision/Monitoring)几部分构成。Actor的大体构造由ActorRef、Actor
Instance(runtime实例)、Mailbox、dispatcher(运算器)组成。大家以本篇先介绍一下ActorRef,Mailbox,State和Behavior。

 

import akka.actor._
import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedPriorityMailbox
import com.typesafe.config._

  object Wallet {
    sealed trait WalletMsg
    case object ZipUp extends WalletMsg    //锁钱包
    case object UnZip extends WalletMsg    //开钱包
    case class PutIn(amt: Double) extends WalletMsg   //存入
    case class DrawOut(amt: Double) extends WalletMsg //取出
    case object CheckBalance extends WalletMsg  //查看余额

    def props = Props(new Wallet)
  }


  class PriorityMailbox(settings: ActorSystem.Settings, config: Config)
    extends UnboundedPriorityMailbox (
    PriorityGenerator {
      case Wallet.ZipUp => 0
      case Wallet.UnZip => 0
      case Wallet.PutIn(_) => 0
      case Wallet.DrawOut(_) => 2
      case Wallet.CheckBalance => 4
      case PoisonPill => 4
      case otherwise => 4
     }
    )

    class Wallet extends Actor {
      import Wallet._
      var balance: Double = 0
      var zipped: Boolean = true

      override def receive: Receive = {
        case ZipUp =>
          zipped = true
          println("Zipping up wallet.")
        case UnZip =>
          zipped = false
          println("Unzipping wallet.")
        case PutIn(amt) =>
          if (zipped) {
            self ! UnZip         //无论如何都要把钱存入
            self ! PutIn(amt)
          }
          else {
            balance += amt
            println(s"$amt put-in wallet.")
          }

        case DrawOut(amt) =>
          if (zipped)  //如果钱包没有打开就算了
            println("Wallet zipped, Cannot draw out!")
          else
            if ((balance - amt) < 0)
              println(s"$amt is too much, not enough in wallet!")
            else {
            balance -= amt
            println(s"$amt drawn out of wallet.")
          }

        case CheckBalance => println(s"You have $balance in your wallet.")
      }
    }

object Actor101 extends App {
  val system = ActorSystem("actor101-demo",ConfigFactory.load)
  val wallet = system.actorOf(Wallet.props.withDispatcher(
    "prio-dispatcher"),"mean-wallet")

  wallet ! Wallet.UnZip
  wallet ! Wallet.PutIn(10.50)
  wallet ! Wallet.PutIn(20.30)
  wallet ! Wallet.DrawOut(10.00)
  wallet ! Wallet.ZipUp
  wallet ! Wallet.PutIn(100.00)
  wallet ! Wallet.CheckBalance

  Thread.sleep(1000)
  system.terminate()

}

 

由要解析application.conf里之配置,所以使用了ActorSystem(name,
config)模式。构建Actor时用.withDispatcher把application.conf里之dispatcher配置prio-dispatcher传入。

运算的结果如下:

 

下边我们尽管因而个例证来示范Actor:模拟一个吝啬人的钱管,他总是会管付出放在最下的职位。如此大家得以用音讯优先排序信箱UnboundedPriorityMailbox来贯彻。依据Akka程序规范格式,大家事先管每个Actor所用处理的新闻和Props构建放在她的伴生对象里:

 

下的代码可以就此来试运行Actor
wallet:

下是此次示范的完全代码:

 

 

每当那么些事例里r1和r2就实在是互运算的。从之案例被我的定论是硬着头皮将Akka
Actor使用以急需爱戴其中状态的运中。如果为了实现non-blocking只需要拿程序分布及不同之线程里运行的言辞就该直接用Future,这样自然之基本上。但以Future是了不可以保障内部状态的。

 class QueryActor extends Actor {
    override def receive: Receive = {
      case GetResult(query) => 
        val x = db.RunQuery(query)
        val y = getValue(x)
        sender() ! computeResult(x,y)
    }
  }

  val result: Future[Any] = QueryActor ? GetResult(...)

哲学原理, 

 

  object Wallet {
    sealed trait WalletMsg
    case object ZipUp extends WalletMsg    //锁钱包
    case object UnZip extends WalletMsg    //开钱包
    case class PutIn(amt: Double) extends WalletMsg   //存入
    case class DrawOut(amt: Double) extends WalletMsg //取出 
    case object CheckBalance extends WalletMsg  //查看余额

    def props = Props(new Wallet)   
  }

4、State:Actor内部状态,由同样组变量值表示。当前里边状态即行为函数最终一涂鸦运算所生的变量值

 

乍眼看r1跟r2貌似能实现互动运算,但并非遗忘Actor运算环境是单线程的,而Actor信箱又是按序的(Ordered),所以顿时片单运算只可以按照顺序运行,最多也就是是会当旁一个线程里异步进行而已,r1运算始终会阻塞r2的运转。如此还不若直接使用Future,能重新好的贯彻并发程序的相互运算。同样的求而就此Future来实现的讲话可以用底的伪代码:

2、Mailbox:可以说成是一个运算指令队列(command
queque)。Actor从外表接收的音信如故先期寄存于Mailbox里之。系统默认Mailbox中最好数量的信是仍日顺序排列的,但用户可以依据实际需要定制Mailbox,比如简单容量信箱、按音信优先排序信箱等。

Actor101.scala:

application.conf:

  val r1 = QueryActor ! request1
  val r2 = QueryActor ! request2
  for {
    x <- r1
    y <- r2
  } yield combineValues(x,y)

 
Akka是由各个角色跟法力的Actor组成的,工作之显要原理是把同桩好的盘算任务分割成小环节,再根据各环节的渴求构建相应效率的Actor,然后拿各环节的演算托付给相应的Actor去单独完成。Akka是个工具库(Tools-Library),不是一个软件架构(Software-Framework),我们不需要遵照Akka的框架格式去编写程序,而是向来以需要构建Actor去异步运算一宗完整的职能,这样吃用户在无形中中自然之兑现了大半线程并发软件编程(concurrent
programming)。按这样的描述,Actor就是一模一样栽据信息使(Message-driven)的运算器,大家可直接调用它来运算一段子先后。音信让情势之补是得实现低度的涣散耦合(loosely-coupling),因为系统部件之间未用软件接口,而是通过新闻来举行系统并的。音信使得情势协理了每个Actor的独门运算环境,又可以当运转时依照需巧的指向网Actor举行增减,伸缩自如,甚至可以于运转时(runtime)对系布局开展调配。Akka的这个强烈的风味都是因此音讯使得来促成之。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图