akka系统是一个分布式的消息驱动系统。akka应用由一群负责不同运算工作的actor组成,每个actor都是被动等待外界的某种消息来驱动自己的作业。所以,通俗点描述:akka应用就是一群actor相互之间发送消息的系统,每个actor接收到消息后开始自己负责的工作。对于akka-typed来说,typed-actor只能接收指定类型的消息,所以actor之间的消息交流需要按照消息类型来进行,即需要协议来规范消息交流机制。想想看,如果用户需要一个actor做某件事,他必须用这个actor明白的消息类型来发送消息,这就是一种交流协议。

所谓消息交流方式包括单向和双向两类。如果涉及两个actor之间的消息交换,消息发送方式可以是单向和双向的。但如果是从外界向一个actor发送消息,那么肯定只能是单向的发送方式了,因为消息发送两端只有一端是actor。

典型的单向消息发送fire-and-forget如下:

import akka.actor.typed._

import scaladsl._objectPrinter {case classPrintMe(message: String)//只接收PrintMe类型message

def apply(): Behavior[PrintMe] =Behaviors.receive {case (context, PrintMe(message)) =>context.log.info(message)

Behaviors.same

}

}objectFireAndGo extends App {//system就是一个root-actor

val system: ActorRef[Printer.PrintMe] = ActorSystem(Printer(), "fire-and-forget-sample")

val printer: ActorRef[Printer.PrintMe]=system//单向消息发送,printMe类型的消息

printer ! Printer.PrintMe("hello")

printer! Printer.PrintMe("world!")

system.asInstanceOf[ActorSystem[Printer.PrintMe]].terminate()

}

当然,在现实中通常我们要求actor去进行某些运算然后返回运算结果。这就涉及到actor之间双向信息交换了。第一种情况:两个actor之间的消息是任意无序的,这是一种典型的无顺序request-response模式。就是说一个response不一定是按照request的接收顺序返回的,只是它们之间能够交流而已。不过,在akka-typed中这种模式最基本的要求就是发送的消息类型必须符合接收方actor的类型。

好了,我们先对这个模式做个示范。所有actor的定义可以先从它的消息类型开始。对每个参加双向交流的actor来说,可以从request和response两种消息来反映它的功能:

objectFrontEnd {sealedtrait FrontMessagescase classSayHi(who: String) extends FrontMessages

}objectBackEnd {//先从这个actor的回应消息开始

sealedtrait Responsecase classHowAreU(msg: String) extends Responsecase objectUnknown extends Response//可接收消息类型

sealedtrait BackMessages//这个replyTo应该是一个能处理Reponse类型消息的actor

case classMakeHello(who: String, replyTo: ActorRef[Response]) extends BackMessages

}

这个FrontEnd接收SayHi消息后开始工作,不过目前还没有定义返回的消息类型。BackEnd接到MakeHello类型消息后返回response类型消息。从这个角度来讲,返回的对方actor必须能够处理Response类型的消息。

我们试试实现这个FrontEnd actor:

objectFrontEnd {sealedtrait FrontMessagescase classSayHi(who: String) extends FrontMessages

def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages]={

Behaviors.receive { (ctx,msg)=>msg match {case SayHi(who) =>ctx.log.info("requested to say hi to {}", who)

backEnd! BackEnd.MakeHello(who, ???)

}

}

}

MakeHello需要一个replyTo,应该是什么呢?不过它一定是可以处理Response类型消息的actor。但我们知道这个replyTo就是FrontEnd,不过FrontEnd只能处理FrontMessages类型消息,应该怎么办呢?可不可以把replyTo直接写成FrontEnd呢?虽然可以这么做,但这个MakeHello消息就只能跟FrontEnd绑死了。如果其它的actor也需要用到这个MakeHello的话就需要另外定义一个了。所以,最好的解决方案就是用某种类型转换方式来实现。如下:

import akka.actor.typed._

import scaladsl._objectFrontEnd {sealedtrait FrontMessagescase classSayHi(who: String) extends FrontMessagescase classWrappedBackEndResonse(res: BackEnd.Response) extends FrontMessages

def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages]={

Behaviors.setup[FrontMessages] { ctx=>

//ctx.messageAdapter(ref => WrappedBackEndResonse(ref))

val backEndRef: ActorRef[BackEnd.Response] =ctx.messageAdapter(WrappedBackEndResonse)

Behaviors.receive { (ctx, msg)=>msg match {case SayHi(who) =>ctx.log.info("requested to say hi to {}", who)

backEnd!BackEnd.MakeHello(who, backEndRef)

Behaviors.same//messageAdapter将BackEnd.Response转换成WrappedBackEndResponse

case WrappedBackEndResonse(msg) =>msg match {case BackEnd.HowAreU(msg) =>ctx.log.info(msg)

Behaviors.samecase BackEnd.Unknown =>ctx.log.info("Unable to say hello")

Behaviors.same

}

}

}

}

}

}

首先,我们用ctx.mesageAdapter产生了ActorRef[BackEnd.Response],正是我们需要提供给MakeHello消息的replyTo。看看这个messageAdapter函数:

def messageAdapter[U: ClassTag](f: U => T): ActorRef[U]

如果我们进行类型替换U -> BackEnd.Response, T -> FrontMessage 那么:

val backEndRef: ActorRef[BackEnd.Response] =ctx.messageAdapter((response: BackEnd.Response)=> WrappedBackEndResonse(response))

实际上这个messageAdapter函数在本地ActorContext范围内登记了一个从BackEnd.Response类型到FrontMessages的转换。把接收到的BackEnd.Response立即转换成WrappedBackEndResponse(response)。

还有一种两个actor之间的双向交流模式是 1:1 request-response,即一对一模式。一对一的意思是发送方发送消息后等待回应消息。这就意味着收信方需要在完成运算任务后立即向发信方发送回应,否则造成发信方的超时异常。无法避免的是,这种模式依然会涉及消息类型的转换,如下:

objectFrontEnd {sealedtrait FrontMessagescase classSayHi(who: String) extends FrontMessagescase classWrappedBackEndResonse(res: BackEnd.Response) extends FrontMessagescase classErrorResponse(errmsg: String) extends FrontMessages

def apply(backEnd: ActorRef[BackEnd.BackMessages]): Behavior[FrontMessages]={

Behaviors.setup[FrontMessages] { ctx=>

//ask需要超时上限

import scala.concurrent.duration._

import scala.util._implicit val timeOut: Timeout = 3.seconds

Behaviors.receive[FrontMessages] { (ctx, msg)=>msg match {case SayHi(who) =>ctx.log.info("requested to say hi to {}", who)

ctx.ask(backEnd,(backEndRef: ActorRef[BackEnd.Response])=>BackEnd.MakeHello(who,backEndRef) ){case Success(backResponse) =>WrappedBackEndResonse(backResponse)case Failure(err) =>ErrorResponse(err.getLocalizedMessage)

}

Behaviors.samecase WrappedBackEndResonse(msg) =>msg match {case BackEnd.HowAreU(msg) =>ctx.log.info(msg)

Behaviors.samecase BackEnd.Unknown =>ctx.log.info("Unable to say hello")

Behaviors.same

}case ErrorResponse(errmsg) =>ctx.log.info("ask error: {}",errmsg)

Behaviors.same

}

}

}

}

}

似乎类型转换是在ask里实现的,看看这个函数:

def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] =>Req)(

mapResponse: Try[Res]=> T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit

req -> BackEnd.BackMessages, res -> BackEnd.Response, T -> FrontMessages。现在ask可以写成下面这样:

ctx.ask[BackEnd.BackMessages,BackEnd.Response](backEnd,

(backEndRef: ActorRef[BackEnd.Response])=>BackEnd.MakeHello(who,backEndRef) ){case Success(backResponse:BackEnd.Response) =>WrappedBackEndResonse(backResponse)case Failure(err) =>ErrorResponse(err.getLocalizedMessage)

}

这样看起来更明白点,也就是说ask把接收的BackEnd.Response转换成了FrontEnd处理的消息类型WrappedBackEndRespnse,也就是FrontMessages

还有一种ask模式是在actor之外进行的,如下:

objectAskDemo extends App {

import akka.actor.typed.scaladsl.AskPattern._

import scala.concurrent._

import scala.concurrent.duration._

import akka.util._

import scala.util._implicit val system: ActorSystem[BackEnd.BackMessages] = ActorSystem(BackEnd(), "front-app")//asking someone requires a timeout if the timeout hits without response//the ask is failed with a TimeoutException

implicit val timeout: Timeout = 3.seconds

val result: Future[BackEnd.Response]=system.asInstanceOf[ActorRef[BackEnd.BackMessages]]

.ask[BackEnd.Response]((ref: ActorRef[BackEnd.Response]) =>BackEnd.MakeHello("John", ref))//the response callback will be executed on this execution context

implicit val ec =system.executionContext

result.onComplete {case Success(res) =>res match {case BackEnd.HowAreU(msg) =>println(msg)case BackEnd.Unknown =>println("Unable to say hello")

}case Failure(ex) =>println(s"error: ${ex.getMessage}")

}

system.terminate()

}

这个ask是在akka.actor.typed.scaladsl.AskPattern包里。函数款式如下:

def ask[Res](replyTo: ActorRef[Res] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res]

向ask传入一个函数ActorRef[BackEnd.Response] => BackEnd.BackMessages,然后返回Future[BackEnd.Response]。这个模式中接收回复方是在ActorContext之外,不存在消息截获机制,所以不涉及消息类型的转换。

另一种单actor双向消息交换模式,即自己ask自己。在ActorContext内向自己发送消息并提供回应消息的接收,如pipeToSelf:

objectPipeFutureTo {

trait CustomerDataAccess {

def update(value: Customer): Future[Done]

}

finalcase classCustomer(id: String, version: Long, name: String, address: String)objectCustomerRepository {sealedtrait Command

finalcase classUpdate(value: Customer, replyTo: ActorRef[UpdateResult]) extends Commandsealedtrait UpdateResult

finalcase classUpdateSuccess(id: String) extends UpdateResult

finalcase classUpdateFailure(id: String, reason: String) extends UpdateResultprivate final case classWrappedUpdateResult(result: UpdateResult, replyTo: ActorRef[UpdateResult])

extends Commandprivate val MaxOperationsInProgress = 10def apply(dataAccess: CustomerDataAccess): Behavior[Command]={

Behaviors.setup[Command] { ctx=>

implicit val dispatcher = ctx.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher"))

next(dataAccess, operationsInProgress= 0)

}

}private def next(dataAccess: CustomerDataAccess, operationsInProgress: Int)(implicit ec: ExecutionContextExecutor): Behavior[Command] ={

Behaviors.receive { (context, command)=>command match {case Update(value, replyTo) =>

if (operationsInProgress ==MaxOperationsInProgress) {

replyTo! UpdateFailure(value.id, s"Max $MaxOperationsInProgress concurrent operations supported")

Behaviors.same

}else{

val futureResult=dataAccess.update(value)

context.pipeToSelf(futureResult) {//map the Future value to a message, handled by this actor

case Success(_) =>WrappedUpdateResult(UpdateSuccess(value.id), replyTo)case Failure(e) =>WrappedUpdateResult(UpdateFailure(value.id, e.getMessage), replyTo)

}//increase operationsInProgress counter

next(dataAccess, operationsInProgress + 1)

}case WrappedUpdateResult(result, replyTo) =>

//send result to original requestor

replyTo !result//decrease operationsInProgress counter

next(dataAccess, operationsInProgress - 1)

}

}

}

}

}

akka typed mysql_akka-typed(2) - typed-actor交流方式和交流协议相关推荐

  1. Akka 学习(四)Remote Actor

    目录 一 介绍 1.1 Remote Actor 1.2 适用场景 1.3 踩坑点 二 实战 2.1 需求 2.2 Java 版本 2.2.1 效果图 2.2.2 实体类 2.2.3 服务端Actor ...

  2. 计算机对文字信息交流方式案例,《信息交流的方式》题本梳理_教师资格面试初中信息技术...

    <信息交流的方式>题本梳理_教师资格面试初中信息技术,中公讲师为大家进行录制教师资格面试备考系列视频,希望对各位考生有所帮助.以下为<信息交流的方式>题本梳理_教师资格面试初中 ...

  3. 深度:语音技术革命正在改变人类的交流方式

    来源:资本实验室 语言是我们日常交流的主要方式,可以让我们快速了解对方的意图,并做出适当的反应.但对很多语言障碍者而言,说话这一看似简单的行为依旧是难以想象的困难. 语言是件极其复杂的事情,不仅仅是文 ...

  4. 面对面交流的好处_我们的交流方式是如何被网络社交媒体一步步改变的

    随着互联网时代的到来,使得信息的传播越来越快,世界似乎也变得越来越小. 在以前我们的父辈与他人之间的互动能力极为有限,很大程度上仅限于他们认识的人.有时候一个对他人的问候都需要等上好几天对方才能收到. ...

  5. 即时通讯应用战争开打,到底谁能最终定义我们的交流方式?

    题图:风靡亚洲的Line 北京时间4月4日消息,据科技网站TechRadar报道,对业界来说,即时通讯应用是一个巨大的市场,除了专门发力该领域的公司,专注搜索的谷歌和专注社交的Facebook最近几年 ...

  6. 「镁客·请讲」小i机器人朱频频:会话AI将成为主流人机交流方式,积累和深度学习是关键...

    消费级服务机器人技术还没达到一个理想的状态,但积累很重要. 和小i机器人创始人&CEO朱频频的见面是在由中国服务机器人应用及推广联盟主办的2017国际服务机器人产业高峰论坛上.因为人很多,所以 ...

  7. 微信公众号与用户互动的8种交流方式

    说到微信公众号是现在大家在熟悉不过的了,每个人都会关注那么几个公众号,而现在公众号不管是企业.机构.个人都在运用的一个平台,企业用来传播自己的品牌以及产品的营销,机构.个人注册公众号也是这样一个理,在 ...

  8. matlab交流电路仿真,直流交流变换电路MATLAB仿真实训教案

    实训一.交流-直流变换电路的MATLAB仿真研究 一.MATLAB 介绍 MATLAB 是一种科学计算软件.由于它使用方便.输入便捷.运算高效.适应科技人员的思维方式,并且有绘图功能,有用户自行扩展的 ...

  9. web前端交流群小程序交流群uni-app交流群vue交流群react交流群

    初心: 前端行业缺乏团结,缺乏有力的互相支撑,前端工程师的话语权其实非常低, 有赖于越来越多的从业者意识到这一点,遂相约几个好友创建此群,望能互助 技术栈: web,前端,小程序,uni-app,vu ...

最新文章

  1. ocx js php,JS实现OCX控件的事件响应示例_javascript技巧
  2. 基于稀疏表示的人脸识别 (SRC,LASRC,RASL,MRR)
  3. Spring里Bean类的运行时小写之谜
  4. 躲开职业生涯的“甜蜜陷阱”
  5. matlab集群搭建问题
  6. 有人说学了C语言,两天就能学会Java,两个星期就可以找工作?
  7. list copy中status列的状态的意义。
  8. 【学习笔记】计算机导论之计算机硬件
  9. 第一次 Zul'grub
  10. php格式转换rar,如何在PHP中创建压缩的RAR文件?
  11. Deciding the Number of Clusterings
  12. 小程序学习笔记(5)-目录结构介绍
  13. 编译器vc6 新手使用教程(C、C++)
  14. Excel实用技巧辞典 01
  15. 人工智能学习-高等数学
  16. 浪潮存储通过ISCSI映射至Linux服务器、多路径配置方法
  17. 乒乓球比赛赛程_这家律所再次摘得业余乒乓球赛事冠军,为何结缘乒乓?
  18. 浅谈vue的生命周期
  19. 阿里云IOT入门教程(三)阿里云IOT Studio自建手机App控制Wemos D1 Mini( ESP8266 )板载灯亮灭
  20. 统一用户认证和单点登录和授权的原理与流程

热门文章

  1. Daily English Dictation Number Four
  2. 自媒体人怎么写出爆文?这些技巧让你轻松10W+
  3. QT点击信号怎么获取到是哪个对象点击的
  4. MySql函数, 实现Oracle中的to_data和to_char函数
  5. Using UVAtlas (Direct3D 9) 翻译
  6. Gazebo结合ROS仿真时,如何编写机器人的URDF
  7. 文献引文分析利器HistCite使用教程(附精简易用免安装Pro版本下载)
  8. Processing小游戏制作 01-弹跳小球
  9. 瞄准企业团险,决胜数字中台
  10. 使用TCL脚本读取配置文件