本章包括:

  • 什么是事件总线
  • 如何在事件总线上拥有点对点通信【point-to-point】、请求-应答通信【request-reply】、发布/订阅通信【publish/subscribe】
  • 用于在网络上进行verticleverticle通信【verticle-to-verticle】的分布式事件总线

前一章我们介绍了Vert.x中的verticles。Vert.x应用程序由一个或多个verticles组成,每个verticle构成一个处理异步事件的单元【unit】。通常会根据功能【functional】和技术【technical 】方面的考虑来对各个verticle进行专门化处理,例如,一个verticle用于暴露HTTP API,而另一个verticle用于处理数据存储。此设计还鼓励出于可伸缩性目的而部署给定verticle的多个实例。

我们还没有讨论的是verticles之间如何相互通信。例如,如果较大的Vert.x应用程序要执行任何有用的操作,则HTTP API Verticle需要与数据存储Verticle通信。

事件总线的作用是连接verticle并确保它们能够协作。在构建响应式应用程序时,这一点很重要——事件总线提供了一种方法,可以透明地在进程内部和网络上的几个节点上分发事件处理工作。

3.1 什么是事件总线

事件总线是一种以异步方式发送和接收消息的方法。消息可以发送到目标地址【destination 】,亦可以从目标地址【destination 】检索。目标地址【destination 】只是一个形式自由的字符串,比如incoming.purchase.orders或incoming-purchase-orders这种字符串,尽管我们更倾向于首选前一种带圆点的格式。

一个消息包含:

  • 消息体【body】
  • 消息头【header】<可选的>,用于存储元数据
  • 过期时间戳【expiration timestamp】,如果消息还未被处理,那么过期时间之后将被丢弃

消息体通常使用Vert.x JSON表示进行编码。使用JSON的好处是,它是一种序列化格式,可以轻松地通过网络传输,而且所有编程语言都能解释它。也可以使用Java原语【primitive 】和字符串类型,特别是当用于编写verticle的JVM语言具有直接绑定时。最后,但并非最重要的是,可以注册自定义编码器/解码器(codec),以支持更专门的消息体序列化形式。例如,您可以编写一个编解码器,将Java对象转换为自己的二进制编码。然而,这样做很少有用,而且JSON和字符串数据覆盖了大多数Vert.x应用程序的需求。

事件总线允许在verticles之间解耦。一个verticle类不需要访问另一个verticle类——所需要的只是就目标名称【destination name】和数据表示【data represention】达成一致。另一个好处是由于Vert.x是多语言的,事件总线允许用不同语言编写的Verticle相互通信,而不需要任何复杂的语言互操作层【language interoperability layer】,无论是在同一个JVM进程内通信还是跨网络通信。

事件总线的一个有趣特性是:它可以扩展到应用程序进程之外。在您将在本章中看到,事件总线也可以在集群的分布式成员之间工作。在本书的后面,您将看到如何将事件总线扩展到嵌入式或外部消息代理、远程客户端以及在Web浏览器中运行的JavaScript应用程序。

事件总线上的通信遵循以下三种模式:

  • 点对点消息传递【Point-to-point messaging】
  • 请求-应答消息传递【Request-reply messaging】
  • 发布/订阅消息传递【Publish/subscribe messaging】

3.1.1 IS THE EVENT BUS JUST ANOTHER MESSAGE BROKER?

熟悉面向消息的中间件的读者会发现事件总线和消息代理之间有着明显的相似之处。毕竟,事件总线展示了熟悉的消息传递模式,例如发布/订阅模式,该模式在集成分布式【distributed 】和异构【heterogeneous 】的应用程序时很流行。

答案是否定的,Vert.x事件总线不是Apache ActiveMQ、RabbitMQ、ZeroMQ或Apache Kafka的替代方案。更详细的解释是,它是用于应用程序内部verticleverticle通信的事件总线,而不是用于应用程序到应用程序之间通信的消息总线。正如您将在本书后面看到的,Vert.x可以与消息代理集成,但事件总线不能替代这种类型的中间件。具体地说,事件总线不做以下事情:

  • 支持消息确认【message acknowledgments】
  • 支持消息优先级【message priorities】
  • 支持消息持久性【message durability】以从崩溃中恢复
  • 提供路由规则【routing rules】
  • 提供转换规则【transformation rules】(模式适应【schema adaptation】,分散/聚集【scatter/gather】等)

事件总线仅承载易失性事件,这些事件由verticle异步处理。

并非所有事件都是一样的,虽然有些事件可能会丢失,但有些可能不会。 在我们编写响应式应用程序的过程中,您将看到在哪里将数据复制或消息代理(例如Apache Kafka)与事件总线结合使用。

事件总线是一种简单、快速的事件传递器【event conveyor】,我们可以利用它进行大多数verticle之间的通信交互,同时使用成本更高的中间件来处理不能丢失的事件。

TIP 熟悉消息传递模式的读者可能希望略过接下来的三个小节,甚至跳过它们。

3.1.2 POINT-TO-POINT MESSAGING

消息由生产者发送到目标地址【Destination 】,如图3.1中的 a.b.c。目标地址名称【Destination Name】是形式自由的字符串,但是Vert.x社区中的约定是使用分隔点。例如,我们可以使用datastore.new-purchase-orders发送存储在数据库中的新采购订单。

使用点对点消息传递【POINT-TO-POINT MESSAGING】,可能有多个消费者中的一个选择消息并处理它。图3.1用消息M1、M2和M3阐述了这一点。

消息以轮询【round-robin】的方式在消费者之间分发,因此它们以相同的比例分割消息处理。这就是为什么在图3.1中,第一个消费者处理M1和M3,而第二个消费者处理M2。请注意,没有公平机制来将更少的消息分发给重负载的消费者。

3.1.3 REQUEST-REPLY MESSAGING

在Vert.x中,请求-应答消息传递通信模式【request-reply messaging communication pattern】是点对点消息传递【point-to-point messaging】的变体。当以点对点消息传递【point-to-point messaging】方式发送消息时,可以注册一个应答处理程序【reply handler】。当您这样做时,事件总线将生成一个临时目标名称【Destination Name】,专门用于期望得到应答的请求消息生产者和最终将接收并处理消息的消费者之间的通信。

此消息传递模式可以很好地模拟远程过程调用,但是响应是以异步方式发送的,因此不需要一直等待直到响应返回。例如,HTTP API Verticle可以向数Verticle发送请求以获取一些数据,数据存储Verticle最终返回一个应答消息。

请求-应答消息传递模式如图3.2所示。当一个消息期望得到应答时,应答目标地址【reply destination】由事件总线生成,并在消息到达使用者之前附加到消息上。如果您愿意,您可以通过事件总线消息API检查应答目标地址名称,但是您很少需要知道目标,因为您只需在消息对象上调用应答方法。当然,需要对消息使用者进行编程,以便在使用此模式时提供应答。

3.1.4 PUBLISH/SUBSCRIBE MESSAGING

在发布/订阅通信【publish/subscribe communications】中,生产者和消费者之间的解耦更加严重。当消息被发送到目标地址【destination】时,所有订阅者都会接收到该消息,如图3.3所示。消息M1、M2和M3分别由不同的生产者发送,所有订阅者都将接收到消息,这与点对点消息传递【point-to-point messaging】的情况不同(见图3.1)。无法在事件总线上为发布/订阅通信【publish/subscribe communications】指定应答处理程序。


当您不确定有多少个Verticle和处理程序【handler】对特定事件感兴趣时,发布/订阅非常有用。如果您需要消息消费者对消息生产者作出反馈,请使用请求-应答模式【request-reply】。否则,选择点对点【point-to-point】与发布/订阅【publish/subscribe】则只是一个功能需求,主要是所有使用者应该处理一个事件还是只有一个使用者应该处理。

3.2 The event bus in an example

让我们使用事件总线,看看如何在独立的Verticle之间进行通信。我们将使用的示例涉及几个温度传感器【temperature sensors】。当然,我们不会使用任何硬件。相反,我们将使用伪随机数让温度演变。我们还将发布一个简单的web界面,实时更新温度及其平均值。

Web界面的屏幕截图如图3.4所示。 它显示来自四个传感器的温度,并保持其平均温度值为最新状态。 Web界面和服务器之间的通信将使用服务器发送的事件进行,这是大多数Web浏览器支持的一种简单而有效的协议[2]。

图3.5给出了应用程序体系结构的概述。图中显示了两个并发的事件通信,分别用顺序序列[1、2、3](正在发送温度更新)和[a,b,c,d](正在请求温度平均计算)进行注释。

该应用程序由四个Verticle组成:

  1. HeatSensor会以非固定速率生成温度测量值,并将其发布给sensor.updates目的地址的订阅者。 每个Verticle都有一个唯一的传感器标识符。
  2. Listener 监视新的温度测量值并使用SLF4J记录它们
  3. SensorData会记录每个传感器的最新观测值。 它还支持请求-响应通信【request-response communications】:将消息发送到sensor.average,会触发一次基于最新温度测量值的平均温度计算,并将结果作为响应发送获取
  4. HttpServer公开HTTP服务器并提供Web界面。 每当观察到新的温度测量值时,它将新值推送到其客户端,并定期请求当前平均温度值,并更新所有连接的客户端。

3.2.1 HEAT SENSOR VERTICLE

以下清单显示了HeatSensor verticle类的实现。

public class HeatSensor extends AbstractVerticle {private final Random random = new Random();private final String sensorId = UUID.randomUUID().toString(); ① 使用UUID生成传感器标识private double temperature = 21.0;@Overridepublic void start() {scheduleNextUpdate();}private void scheduleNextUpdate() {vertx.setTimer(random.nextInt(5000) + 1000, this::update); ② 更新被安排在1到6秒之间的随机延迟。}private void update(long timerId) {temperature = temperature + (delta() / 10);JsonObject payload = new JsonObject().put("id", sensorId).put("temp", temperature);vertx.eventBus().publish("sensor.updates", payload);    ③ 将消息发送给订阅者scheduleNextUpdate();   ④ 我们安排/调度下一次更新}private double delta() {            ⑤ 这将计算一个随机的正或负值,以略微修改当前温度。if (random.nextInt() > 0) {return random.nextGaussian();} else {return -random.nextGaussian();}}
}

Listing 3.1 Heatsensor verticle implementation

HeatSensor verticle类不使用任何实际的温度模型,而是使用随机的增量或减量。 因此,如果您运行足够长的时间,它可能会报告荒谬的值,但这在我们探索响应式应用程序的旅途中并不是很重要。

通过Vertx上下文和eventBus()方法可以访问到事件总线。 由于此 verticle发布的值将用于什么,因此我们使用publish方法将它们发送给sensor.updates目标地址【destination】上的订阅者。 我们还使用JSON对数据进行编码,这是Vert.x的惯用做法。

现在让我们看一下消费温度更新消息的订阅者verticle

3.2.2 LISTENER VERTICLE

以下清单显示了Listener verticle类的实现。

public class Listener extends AbstractVerticle {private final Logger logger = LoggerFactory.getLogger(Listener.class);private final DecimalFormat format = new DecimalFormat("#.##");   ① 我们不需要完整的double值,因此我们将所有温度格式化为保留2位小数的字符串表示形式。@Overridepublic void start() {EventBus bus = vertx.eventBus();bus.<JsonObject>consumer("sensor.updates", msg -> {  ② consumer允许订阅消息,并且注册一个回调用于处理所有事件总线的消息JsonObject body = msg.body();  ③ 消息的有效载荷存在于body中String id = body.getString("id");String temperature = format.format(body.getDouble("temp"));logger.info("{} reports a temperature ~{}C", id, temperature);  ④ 简单打印下日志});}
}

Listing 3.2 Listener verticle implementation

Listener verticle类的目的是记录所有的温度测量值,因此它所做的就是监听从sensor.updates目标地址【destination】j接收到的消息。由于HeatSensor类中的发射器使用发布/订阅模式【publish/subscribe pattern】,因此Listener 并不是能够接收此类消息的唯一verticle

在本例中,我们没有利用消息头【message headers】,但是可以将它们用于存储不属于消息体【message body】的任何元数据。一个常见的消息头【headers】是“action”,帮助消息接收者知道消息是关于什么的。例如,给定一个database.operations目标地址【destination】,我们可以使用“action”消息头【headers】来指定增(存储新条目)、删(删除先前存储的条目)、改(更新条目)、查(查询数据库)。

现在让我们看看另一个消费温度更新消息的订阅者verticle

3.2.3 SENSOR DATA VERTICLE

以下清单显示了SensorData verticle类的实现。

public class SensorData extends AbstractVerticle {private final HashMap<String, Double> lastValues = new HashMap<>(); ① 我们通过其唯一标识符存储每个传感器的最新测量值。@Overridepublic void start() {   ② start方法仅声明两个事件总线目标地址处理程序【destination handlers】。EventBus bus = vertx.eventBus();bus.consumer("sensor.updates", this::update);bus.consumer("sensor.average", this::average);}private void update(Message<JsonObject> message) {  ③ 当接收到新的温度度量值时,我们从JSON主体中提取数据。JsonObject json = message.body();lastValues.put(json.getString("id"), json.getDouble("temp"));}private void average(Message<JsonObject> message) {  ④  average请求所传入消息没有被使用,因此它可以只包含一个空的JSON文档。double avg = lastValues.values().stream().collect(Collectors.averagingDouble(Double::doubleValue));JsonObject json = new JsonObject().put("average", avg);message.reply(json);                               ⑤ reply方法用于回复消息}
}

Listing 3.3 Sensordata verticle implementation

SensorData类具有两个事件总线处理程序【event-bus handlers】:

  • 一个用于传感器更新:它更新HashMap中的条目
  • 另一个用于平均温度计算请求:它计算平均值并响应消息发送者

下一个 verticle是HTTP服务器。

3.2.4 HTTP SERVER VERTICLE

HTTP服务器很有趣,因为它通过事件总线从SensorData verticle请求平均温度,并且它实现了SSE协议【server-sent events protocol】来消费温度的更新。

让我们从这个verticle实现的主干开始。

SERVER IMPLEMENTATION

public class HttpServer extends AbstractVerticle {@Overridepublic void start() {vertx.createHttpServer().requestHandler(this::handler).listen(config().getInteger("port", 8080));   ① HTTP服务器端口配置为默认值8080。}private void handler(HttpServerRequest request) {if ("/".equals(request.path())) {request.response().sendFile("index.html");   ② sendFile方法允许将任何本地文件的内容流式传输到客户端,连接将会自动关闭。} else if ("/sse".equals(request.path())) {    ③ SSE将使用/sse资源,并且为该请求提供了方法实现sse(request);} else {                                       ④ 任何其他请求报404request.response().setStatusCode(404);}}private void sse(HttpServerRequest request) {HttpServerResponse response = request.response();response.putHeader("Content-Type", "text/event-stream").putHeader("Cache-Control", "no-cache").setChunked(true);MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("sensor.updates");consumer.handler(msg -> {response.write("event: update\n");response.write("data: " + msg.body().encode() + "\n\n");});TimeoutStream ticks = vertx.periodicStream(1000);ticks.handler(id -> {vertx.eventBus().<JsonObject>request("sensor.average", "", reply -> {if (reply.succeeded()) {response.write("event: average\n");response.write("data: " + reply.result().body().encode() + "\n\n");}});});response.endHandler(v -> {consumer.unregister();ticks.cancel();});}
}

Listing 3.4 Prologue of the HTTP server verticle implementation

该处理程序【handelr】处理如下三种场景:

  • 向浏览器提供wen应用程序
  • 提供SSE资源
  • 当请求其他资源时返回404

TIP 根据请求的资源路径和HTTP方法手动派发【manually dispatching】自定义操作非常繁琐。 稍后您将看到,vertx-web模块提供了一个更好的路由器API,可以方便地声明处理程序。

THE WEB APPLICATION

现在,我们来看看由HTTP服务器提供服务的客户端应用程序。 该Web应用程序可以放在一个HTML文档中,如下面的清单所示(我删除了不相关的HTML部分,例如页眉和页脚)。

<head><meta charset="UTF-8"><title>Chapter 3</title>
</head>
<body>
<div id="avg"></div>
<div id="main"></div><script language="JavaScript">const sse = new EventSource("/sse")             ① EventSource 用于处理SSEconst main = document.getElementById("main")const avg = document.getElementById("avg")sse.addEventListener("update", (evt) => {      ② 该回调监听update类型的消息const data = JSON.parse(evt.data)            ③ 响应数据为纯文本,并且由于服务器将发送JSON,因此我们需要对其进行解析。let div = document.getElementById(data.id);if (div === null) {div = document.createElement("div")        ④ 如果传感器没有用于显示其数据的div,我们将创建div.setAttribute("id", data.id)main.appendChild(div)                      ⑤ 更新div}div.innerHTML = `<strong>${data.temp.toFixed(2)}</strong> (<em>${data.id}</em>)`})sse.addEventListener("average", (evt) => {     ⑥ 该回调监听average类型的消息  const data = JSON.parse(evt.data)avg.innerText = `Average = ${data.average.toFixed(2)}`})</script></body>
</html>

Listing 3.5 Web application code

前面清单中的JavaScript代码处理SSE【server-sent events】,并作出响应以更新显示的内容。我们本可以使用许多流行的JavaScript框架之一,但有时回到最基本的东西也不错。

SUPPORTING SERVER-SENT EVENTS

现在让我们关注SSE【server-sent events】是如何工作的,以及如何使用Vert.x轻松实现它们。

SSE【server-sent events】是服务器将事件推送到客户端的一种非常简单但有效的协议。协议是基于文本的,每个事件是一个带有事件类型【event type】和一些数据【some data】的块【block 】:

event: foo
data: bar

每个块事件【block event】由一个空行分隔,所以两个连续的事件看起来像这样:

event: foo
data: abcevent: bar
data: 123

Vert.x 中实现SSE是一件很简单的事

  private void sse(HttpServerRequest request) {HttpServerResponse response = request.response();response.putHeader("Content-Type", "text/event-stream")  ① 为SSE设置MIME类型为text/event-stream.putHeader("Cache-Control", "no-cache")    ② 由于这是实时流,因此我们需要防止浏览器和代理对其进行缓存。.setChunked(true);MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("sensor.updates");    ③ 我们在没有处理程序的情况下调用consume,因为我们需要一个对象来在客户端断开连接时取消订阅。consumer.handler(msg -> {response.write("event: update\n");    ④ 发送事件块只是发送文本。response.write("data: " + msg.body().encode() + "\n\n");});TimeoutStream ticks = vertx.periodicStream(1000);   ⑤ 我们每秒更新一次平均值,因此我们需要一个定期计时器。 由于需要取消它,因此我们也使用不带处理程序的形式来获取对象。ticks.handler(id -> {vertx.eventBus().<JsonObject>request("sensor.average", "", reply -> {  ⑥ 请求发送一条期望响应的消息。 该回复是一个异步对象,因为它可能已失败。if (reply.succeeded()) {response.write("event: average\n");response.write("data: " + reply.result().body().encode() + "\n\n");}});});response.endHandler(v -> {   ⑦ 当客户端断开连接(或刷新页面)时,我们需要注销事件总线消息使用者,并取消计算平均值的定期任务。consumer.unregister();ticks.cancel();});}

Listing 3.6 Supporting server-sent events

清单3.6提供了sse方法的实现,该方法处理对/sse资源的HTTP请求。它为每个温度更新的HTTP请求声明一个消费者,并推送新的事件。它还声明了一个定期任务查询SensorData verticle ,以请求-答复的方式维护平均值。

因为这两个处理程序【handler】是针对HTTP请求的,所以我们需要能够在连接断开时停止它们。这可能是由于网页浏览器选项【browser tab】卡关闭了,或者只是页面重新加载【page reloads,F5】。为此,我们获得流对象,并为每个对象声明一个处理程序【handler】,就像我们使用接受回调的形式一样。在下一章中,您将看到如何处理流对象,以及它们何时有用。

我们还可以使用命令行工具,比HTTPie或者curl,来查看应用程序的事件流,如下面的清单所示。

$ http http://localhost:8080/sse --stream     ❶ --stream标志允许将响应流传输到控制台,而不是等待服务器结束连接。
HTTP/1.1 200 OK
Cache-Control: no-cache
Content-Type: text/event-stream
Transfer-Encoding: chunkedevent: average                                ❷ 每一个SSE事件都有一个类型
data: {"average":21.132465880152044}          ❸ 因为JSON只是文本,所以它可以像事件数据一样传输。event: update
data: {"id":"3fa8321d-7600-42d3-b114-9fb6cdab7ecd","temp":21.043921061475107}event: update
data: {"id":"8626e13f-9114-4f7d-acc3-bd60b00f3028","temp":21.47111113365458}event: average
data: {"average":21.123126848463464}

Listing 3.7 Stream of SSE events using HTTPie

3.2.5 BOOTSTRAPPING THE APPLICATION

现在我们已经准备好所有的verticle,我们可以将它们组装为一个Vert.x应用程序。 以下清单展示了一个用于引导应用程序的主类【main class】。 它部署了四个传感器verticle,并为其他verticle部署了一个实例。

public class Main {public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.deployVerticle("chapter3.HeatSensor", new ➥ DeploymentOptions().setInstances(4));          ❶ 我们启动4个传感器verticlevertx.deployVerticle("chapter3.Listener");        ❷ 我们正在使用deployVerticle的变体,该变体使用反射来实例化verticle类。vertx.deployVerticle("chapter3.SensorData");vertx.deployVerticle("chapter3.HttpServer");}
}

Listing 3.8 Main class to bootstrap the application

运行此类的main方法,使我们可以通过web浏览器连接到http://localhost:8080/。当您这样做时,您应该会看到一个类似于图3.4的图形界面,其中包含持续的实时更新。控制台日志还将显示温度更新。

3.3 Clustering and the distributed event bus

到目前为止,我们对事件总线的使用都是本地的:所有通信【communications 】都在同一JVM进程内进行。更有趣的是使用Vert.x集群并从分布式事件总线中获益。

3.3.1 CLUSTERING IN VERT.X

Vert.x应用程序可以在群集模式下运行,在群集模式下,一组Vert.x应用程序节点可以通过网络一起工作。 它们可能是同一应用程序的节点实例,并具有相同的已部署的verticle集,但这不是必需的。 一些节点可以具有一组verticle,而其他节点可以具有另一组verticle

图3.6展示了Vert.x群集的概述。 集群管理器【cluster manager】确保节点可以通过事件总线交换消息,从而支持以下功能:

  • 组成员关系与发现【Group membership and discovery】使得我们可以发现新节点,维护当前节点列表以及检测节点何时消失。
  • 共享数据【Shared data】使得我们可以在群集范围内维护映射【maps】和计数器【counters 】,以便所有节点共享相同的值。 分布式锁对于节点之间的某种形式的协作【coordination 】很有用。
  • 订阅者拓扑【Subscriber topology】使得我们可以了解每个节点所感兴趣的事件总线目标地址【destinations 】。这对于在分布式事件总线上有效地调度消息很有用。 如果一个节点在a.b.c目标地址【destination】上没有消费者,则从该目标地址【destination】向该节点发送事件则没有任何意义

有几种基于Hazelcast、Infinispan、Apache Ignite和Apache ZooKeeper的Vert.x集群管理器实现。历史上Hazelcast是用于Vert.x的集群管理器,然后添加了其他引擎。它们都支持相同的Vert.x集群抽象,用于成员关系【membership】、共享数据【Shared data】和事件总线消息传递【event-bus message passing】。它们在功能上都是等价的,因此您必须根据自己的需求和约束条件选择一个。如果您不知道该选择哪一个,我建议您使用Hazelcast,这是一个很好的默认选择。


最后,如图3.6所示,节点之间的事件总线通信是通过使用自定义协议的TCP直连进行的。当节点向目标地址【destination】发送消息时,它将使用集群管理器【cluster manager 】检查订阅者拓扑【subscriber topology】,并将消息分发给具有该目标地址【destination】订阅者的节点。

What cluster manager should you use?

对于应该使用哪个集群管理器的问题,没有一个最优答案。这取决于您是否需要与一个库进行特殊集成,还取决于您需要部署哪种类型的环境。例如,如果您需要在代码中使用Infinispan API,而不仅仅是将Infinispan作为Vert.x的集群管理器引擎,则应该使用Infinispan来满足这两个需求。

您还应该考虑您的部署环境。如果您部署到一个正在使用Apache ZooKeeper的环境中,也许将其作为Vert.x集群管理器来使用也会是一个不错的选择。

默认情况下,某些集群管理器使用多播通信【 multicast communications】进行节点发现,这在某些网络上可能是禁用的,特别是在像Kubernetes这样的容器化环境中。在这种情况下,您将需要配置集群管理器以在这些环境中工作。

如前所述,如果有疑问,请选择Hazelcast,并检查项目文档以获取特定的网络配置,比如部署到Kubernetes时。稍后您总是可以更改为另一个集群管理器实现。

3.3.2 FROM EVENT BUS TO DISTRIBUTED EVENT BUS

让我们回到本章前面开发的热传感器应用程序。 迁移到分布式事件总线对于这些verticles是透明的。

我们将准备两个主类【main class】,它们分别部署不同的verticle,如图3.7所示:

  1. 4个HeatSensor实例,一个监听808端口的HttpServer 实例
  2. 4个HeatSensor实例,1个Listener实例,一个SensorData实例,一个监听8081端口的HttpServer 实例(这样就可以再一个宿主机运行、测试了)

我们的目标是表明:在集群模式下启动每个部署的一个实例,verticles节点之间的通信就像在同一个JVM进程中运行一样。通过web浏览器连接到任意一个实例,都可以看到8个传感器数据的相同视图。同样,第二个实例上的Listner verticle将从第一个实例获取温度更新。

我们将使用Infinispan作为集群管理器,但你也可以使用其他的管理器。假设你的项目是用Gradle构建的,你需要添加vertx-infinispan作为一个依赖项:

implementation("io.vertx:vertx-infinispan:version")

下面的清单展示了主类FirstInstance的实现,我们可以使用它来启动一个未部署所有应用程序verticle的节点。

public class FirstInstance {private static final Logger logger = LoggerFactory.getLogger(FirstInstance.class);public static void main(String[] args) {Vertx.clusteredVertx(new VertxOptions(), ar -> {   ① 启动一个群集模式的Vert.x应用程序是异步操作。if (ar.succeeded()) {logger.info("First instance has been started");Vertx vertx = ar.result();                     ② 成功后,我们将检出Vertx实例。vertx.deployVerticle("chapter3.HeatSensor", new DeploymentOptions().setInstances(4));vertx.deployVerticle("chapter3.HttpServer");} else {logger.error("Could not start", ar.cause());  ③ 失败的潜在原因可能是缺少集群管理器库。}});}
}

Listing 3.9 Code of the main class for the first instance

如您所见,以集群模式启动应用程序需要调用clusteredVertx方法。 剩下的只是经典的Verticle部署。

第二个实例的main方法的代码非常相似,如下面的清单所示。

public class SecondInstance {private static final Logger logger = LoggerFactory.getLogger(SecondInstance.class);public static void main(String[] args) {Vertx.clusteredVertx(new VertxOptions(), ar -> {if (ar.succeeded()) {logger.info("Second instance has been started");Vertx vertx = ar.result();vertx.deployVerticle("chapter3.HeatSensor", new DeploymentOptions().setInstances(4));vertx.deployVerticle("chapter3.Listener");vertx.deployVerticle("chapter3.SensorData");JsonObject conf = new JsonObject().put("port", 8081);  ① 我们使用不同的端口,因此您可以在同一主机上启动两个实例。vertx.deployVerticle("chapter3.HttpServer", new DeploymentOptions().setConfig(conf));} else {logger.error("Could not start", ar.cause());}});}
}

Listing 3.10 Code of the main class for the second instance

两个主类【main class】都可以在同一宿主机上运行,并且两个实例将彼此发现。 和以前一样,您可以从IDE中启动它们,也可以通过在两个不同的终端中运行如下命令:

  • gradle run -PmainClass=chapter3.cluster.FirstInstance
  • gradle run -PmainClass= chapter3.cluster.SecondInstance

TIP:如果您正在使用IPv6,并且遇到了问题,可以在JVM参数中添加-Djava.net.preferIPv4Stack=true标志。

默认情况下,Vert Infinispan集群管理器被配置为使用网络广播【network broadcast】执行发现,这样当两个实例在同一台机器上运行时就会发现对方。当然您还可以在同一个网络上使用两台机器。

WARNING网络广播【Network broadcast】很少在云环境和许多数据中心中起作用。 在这些情况下,需要将集群管理器配置为使用其他发现和组成员关系协议【discovery and group membership protocols】。 对于Infinispan,该文档的详细信息位于https://infinispan.org/documentation/。

图3.8显示了运行的应用程序,其中一个浏览器通过端口8080连接到实例,而另一个浏览器通过端口8081连接到第二个实例,并且我们在后台看到了来自Listener Verticle的日志。 可以看到,两个实例都显示来自八个传感器的事件,并且
第一个实例更新了其平均温度,以便它可以与第二个实例上的SensorData Verticle交互。


分布式事件总线是一个有趣的工具,因为它对Verticle是透明的。

TIP:

事件总线API具有localConsumer方法,在集群模式下,该方法用于声明仅在本地工作【work locally 】的消息处理程序【message handlers】。例如,目标地址【destination】a.b.c的使用者将不会接收从集群中的另一个实例发送到该目标地址【destination】的消息。

下一章讨论异步数据【asynchronous data】和事件流【event streams】。

总结

  • 事件总线是verticles 之间进行通信的首选方式,并且它使用异步消息传递。
  • 事件总线既实现发布/订阅【publish/subscribe】(一对多)通信又实现点对点【point-to-point】(多对一)通信。
  • 事件总线看起来像传统的消息代理,但它不提供持久性【durability】保证,因此只能用于瞬态数据【transient data】。
  • 集群允许网络实例以透明的方式通过分布式事件总线通信,并在多个应用程序实例之间扩展工作负载

Vert.x实战 事件总线:Vert.x应用程序的主干相关推荐

  1. Vert.x(vertx) 事件总线(EventBus)与 远程服务调用

    Event Bus(事件总线) 是Vert.x的神经系统,负责应用系统消息的传递.Vert.x各模块(Verticle)之间的相互调用就是通过Event Bus实现的,因此各Verticle之间是高度 ...

  2. vert.x笔记:4.vert.x中调用spring服务

    evenbus事件总线介绍: 在介绍怎么在vert.x中集成spring服务前,我们要先简单介绍一下什么是vert.x的事件总线. eventbus是vert.x的神经总线,每个vert.x实例维护了 ...

  3. Vert.x实战 异步数据和事件流

    本章包括: 为什么流[streams]是事件[eventing]之上的一个有用的抽象 什么是背压[back-pressure],为什么它是异步生产者和消费者的基础 如何从流[streams]中解析协议 ...

  4. 第三章: 事件总线:Vert.x 应用程序的支柱

    本章涵盖了 事件总线是什么 如何通过事件总线进行点对点.请求-回复 和 发布/订阅 通信 用于通过网络进行verticle到verticle通信的分布式事件总线 上一章介绍了verticles. 一个 ...

  5. Vert.x实战 Verticles:Vert.x的基本处理单元

    本章包括: verticles是什么 如何写[write].配置[configure]以及部署[deploy]一个verticles Vert.x的线程模型[threading model] 如何混合 ...

  6. AndroidEventBus(事件总线)了解+实战体验

    转载请注明出处:http://blog.csdn.net/woshizisezise/article/details/51225186 大家好,今天咱们来说说本人最近使用到的一个新的开源工具类,也就是 ...

  7. java eventbus 原理_事件总线,事件驱动(RxJava,EventBus)与广播(BroadcastReceiver)(2)

    > 事件总线与广播 事件总线通过注解或者反射的方式自动维护和调用. otto和AndroidEventBus采用的是注解,EventBus则是规定方法名基础上采用反射. 从调度策略角度,Even ...

  8. 200代码写一套属于自己的事件总线(EventBus)库

    理论千万篇,不如实战来一篇. 源码 https://github.com/harvie1208/EventBus 关键词:观察者模式.反射.自定义注解.线程调度 手写200行代码,一步一步实现Even ...

  9. Android事件总线还能怎么玩?

    作者简介:何红辉,Android工程师,现任职于友盟. 顾名思义,AndroidEventBus是一个Android平台的事件总线框架,它简化了Activity.Fragment.Service等组件 ...

最新文章

  1. RabbitMQ (五) 订阅者模式之分发模式 ( fanout )
  2. 数据库Sqlite3
  3. RDLC报表下载的权限问题
  4. 易被销售员忽略的销售细节
  5. python运行代码不成功_python的运行时易犯错误,你中招了没?快来学习了
  6. php取excel中的值,在Php Excel中使用列名获取单元格值
  7. ssh无密码登录设置方法以及出现问题 ECDSA host key 和IP地址对应的key不同的解决...
  8. Unity 在windows10上资源默认下载的路径
  9. kotlin基本语法--kotlin官网文档学习
  10. 「干货」Linux 应急响应日志分析命令「详细总结」
  11. R语言使用mad函数、median函数、mean函数计算向量数据的中位数绝对偏差、中位数、均值
  12. 研究遭质疑!Jeff Dean回应:我们本就不是为得到新SOTA,成本计算也搞错了...
  13. DITHER 抖动算法
  14. 《真三国无双5》全人研究完整版
  15. 国产三维CAD华天软件STNOVATION 几何造型内核CRUX IV 解析
  16. 语音计算机音乐学猫叫,“杨钰莹学猫叫”上热搜,坤音四子自带流量……山东卫视春晚喜提全国收视第一...
  17. Mac电脑如何查看本机网卡mac地址
  18. 国外10款开源的HTML 5小游戏
  19. 十个关于商业智能商业智能BI的观点,你认同几个?
  20. 自动控制原理笔记-频率响应法-频率特性的概念

热门文章

  1. Qt 实现excel加法
  2. Git系列:常用操作一指禅
  3. window10更新完后,移动热点无法打开(显示我们无法设置移动热点)
  4. MBR30100CT-ASEMI插件肖特基二极管MBR30100CT
  5. 00后计算机学霸,这些00后学霸逆天了!超一本线135分,12岁少女考上985!
  6. 软件工程师,不懂点设计模式怎么行
  7. 前端和后端交互的一些原规范问题
  8. OpenCV中的图像处理 —— 图像梯度+Canny边缘检测+图像金字塔
  9. 直播教程-矩阵小程序流量主广告怎么赚钱
  10. 代码质量与安全 | 免费的静态分析工具好吗?