实例分析

前面我们看了AMQP说明文档, 对AMQP有了大体的了解, 本文从实例出发再过一遍AMQP的基本操作.

准备

环境
RabbitMQ server 3.7.16
RabbitMQ client 5.7.3

客户端代码使用的是RabbitMQ官网教程, 如下:

public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {boolean durable = true;channel.queueDeclare(QUEUE_NAME, durable, false, false, null);String message = String.join(" ", "dMessage.......");channel.exchangeDeclare("mind", "direct");channel.basicPublish("mind", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}

下面是用wireshark抓包结果

我们后面以编码No来具体分析

抓包分析


1-6是tcp创建连接的三次握手步骤, 这里不做过多分析


7-24是amqp创建连接的过程, 我们可以对照前面的博客中的说明文档来分析这里. 每次一端向另一端发送信息, 另一端在接收到后都会发送一个ack表示接收到了.


1 在tcp连接创建后, 客户端会向服务端发送协议版本信息, 这里是amqp的0.9.1版本, 服务端会校验版本是否接受, 如果不符合要求会返回错误信息, 这里只有正确信息, 后面我们自己实现客户端的时候可以实现一个错误例子.


2 服务端校验协议通过后, 会向客户端发送创建连接请求Connection.Start, 客户端在准备好后会返回一个Connection.Start-Ok. 接着服务端发送Connection.Tune与客户端进行参数调试, 调试的内容有Channel最大数量, Frame最大长度等. 客户端在调试后发送Connection.Tune-OK. 这个阶段就是对连接的参数调试.


3 参数调试之后, 客户端请求服务端打开连接Connection.Open, 服务端打开之后会返回Connection.Open-Ok. Connection打开成功后, 客户端请求打开通道Channel.Open, 服务端打开之后返回Channel.Open-Ok. 至此连接创建成功.


4 连接创建成功之后, 客户端进行队列和exchange的声明, Queue.Declare -> Queue.Declare-Ok, Exchange.Declare -> Exchange.Declare-Ok.


5 有了Exchange后, 客户端向Exchange发送信息, 我们可以看到发送的Exchange, 和发送的内容


6 发送内容结束后, 客户端关闭, 先关闭通道Channel, 然后关闭Connection.


7 最后是tcp关闭连接

代码分析

下面我们从代码层面分析这个过程, 下面是一个总体的时序图, 大家可以参考

创建tcp连接

代码很简单

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

我们重点看factory.newConnection(); 顺着方法我们很快找到了AutorecoveringConnection的init()方法

public void init() throws IOException, TimeoutException {this.delegate = this.cf.newConnection();this.addAutomaticRecoveryListener(delegate);}

在this.cf.newConnection()中重点看下

FrameHandler frameHandler = factory.create(addr, connectionName());RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);conn.start();metricsCollector.newConnection(conn);return conn;

大家debug代码可以看到factory是SocketFrameHandlerFactory的实例, 所以create中的代码如下:

public FrameHandler create(Address addr, String connectionName) throws IOException {String hostName = addr.getHost();int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), ssl);Socket socket = null;try {socket = createSocket(connectionName);configurator.configure(socket);socket.connect(new InetSocketAddress(hostName, portNumber),connectionTimeout);return create(socket);} catch (IOException ioe) {quietTrySocketClose(socket);throw ioe;}}

这里我们可以看到java网络的底层代码Socket,
socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout);
这句代码完成了tcp的连接的创建工作.
(准备看这里源码的时候就想着肯定有个地方在做这步操作, 但就是找不到, 最后一点一点debug找到的…)

创建Connection

在上一步的最后, 把socket对象封装到一个FrameHandler实例中, 从这里我们可以猜想, 后面所有消息的通信都跟这个FrameHandler分不开.
我们继续看, 返回之后

FrameHandler frameHandler = factory.create(addr, connectionName());
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
conn.start();

使用FrameHandler实例构造了一个Connection对象, 然后调用了start()方法, 实际调用的是父类AMQConnection方法, 这个也是整个连接过程的重点
这里代码比较长, 我们选择一些重要的一点一点看

initializeConsumerWorkService(); // 初始化工作线程
initializeHeartbeatSender(); // 初始化心跳线程
// Make sure that the first thing we do is to send the header,
// which should cause any socket errors to show up for us, rather
// than risking them pop out in the MainLoop
// 确保我们在最开始发送的信息头在发生错误的时候不会出现在MainLoop中
// 这个实体就是为了接收在发送给服务端版本后接收服务端的Connection.Start方法的
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =new AMQChannel.SimpleBlockingRpcContinuation();
// We enqueue an RPC continuation here without sending an RPC
// request, since the protocol specifies that after sending
// the version negotiation header, the client (connection
// initiator) is to wait for a connection.start method to
// arrive.
// 我们这里没有通过发送请求获取响应, 是因为服务端在接收到版本信息后会主动发送信息
_channel0.enqueueRpc(connStartBlocker);

enqueueRpc里面如下, 就是在循环中等待服务端信息接收成功的通知

private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {synchronized (_channelMutex) {boolean waitClearedInterruptStatus = false;while (_activeRpc != null) {try {_channelMutex.wait(); // 后面在接收到Connection.Start方法后会通知} catch (InterruptedException e) { //NOSONARwaitClearedInterruptStatus = true;// No Sonar: we re-interrupt the thread later}}if (waitClearedInterruptStatus) {Thread.currentThread().interrupt();}// 获取到通知后更新实体信息_activeRpc = rpcWrapperSupplier.get();}
}

_frameHandler.sendHeader(); // 发送版本信息, 对应抓包7

this._frameHandler.initialize(this); // 初始化, 主要是启动了一个MainLoop线程用于获取服务端信息

MainLoop线程的核心代码代码

Frame frame = _frameHandler.readFrame();
readFrame(frame);

_frameHandler.readFrame() 内部代码如下, 这里大家可以查看译文中的2.3.5 Frame Details 帧的细节部分, 对照客户端是如何构造的, Frame结构如下

public static Frame readFrom(DataInputStream is) throws IOException {int type;int channel;try {type = is.readUnsignedByte(); // 一个字节的类型信息} catch (SocketTimeoutException ste) {// System.err.println("Timed out waiting for a frame.");return null; // failed}if (type == 'A') { // 这里是处理, 如果服务端不支持客户端的版本, 会发送支持的版本信息, 开头是'A'/** Probably an AMQP.... header indicating a version* mismatch.*//** Otherwise meaningless, so try to read the version,* and throw an exception, whether we read the version* okay or not.*/protocolVersionMismatch(is); // 这里面会抛出异常}channel = is.readUnsignedShort(); // 两个个字节的channel编号int payloadSize = is.readInt(); // 4个字节的payload大小byte[] payload = new byte[payloadSize];is.readFully(payload); // 读取payloadSize大小的字节int frameEndMarker = is.readUnsignedByte(); // 一个字节的尾部if (frameEndMarker != AMQP.FRAME_END) {throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);}// 构造对象并返回 return new Frame(type, channel, payload);
}

上一步主要是对信息的封装, 下面是客户端对封装对象的处理

private void readFrame(Frame frame) throws IOException {if (frame != null) {_missedHeartbeats = 0;if (frame.type == AMQP.FRAME_HEARTBEAT) {// Ignore it: we've already just reset the heartbeat counter.} else {if (frame.channel == 0) { // the special channel 0通道是在创建连接过程中使用的_channel0.handleFrame(frame); // 这一步就是将Connection.Start内容放到了channel提前设置的实体中} else {if (isOpen()) {// If we're still _running, but not isOpen(), then we// must be quiescing, which means any inbound frames// for non-zero channels (and any inbound commands on// channel zero that aren't Connection.CloseOk) must// be discarded.ChannelManager cm = _channelManager;if (cm != null) {ChannelN channel;try {channel = cm.getChannel(frame.channel);} catch(UnknownChannelException e) {// this can happen if channel has been closed,// but there was e.g. an in-flight delivery.// just ignoring the frame to avoid closing the whole connectionLOGGER.info("Received a frame on an unknown channel, ignoring it");return;}channel.handleFrame(frame);}}}}} else {// Socket timeout waiting for a frame.// Maybe missed heartbeat.handleSocketTimeout();}
}

我们回到start()方法中, 获取Connection.Start方法, 然后设置一些服务单发过来的参数
connStart = (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();

然后按照是响应Start.Ok, Tune方法, 对应抓包9-16

do {Method method = (challenge == null)? new AMQP.Connection.StartOk.Builder().clientProperties(_clientProperties).mechanism(sm.getName()).response(response).build(): new AMQP.Connection.SecureOk.Builder().response(response).build();try {Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();if (serverResponse instanceof AMQP.Connection.Tune) {connTune = (AMQP.Connection.Tune) serverResponse;} else {challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();response = sm.handleChallenge(challenge, username, password);}} catch (ShutdownSignalException e) {Method shutdownMethod = e.getReason();if (shutdownMethod instanceof AMQP.Connection.Close) {AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {throw new AuthenticationFailureException(shutdownClose.getReplyText());}}throw new PossibleAuthenticationFailureException(e);}
} while (connTune == null);

获取到调试信息, 设置本地参数

int channelMax = negotiateChannelMax(this.requestedChannelMax,connTune.getChannelMax());
_channelManager = instantiateChannelManager(channelMax, threadFactory);int frameMax =negotiatedMaxValue(this.requestedFrameMax,connTune.getFrameMax());
this._frameMax = frameMax;int heartbeat =negotiatedMaxValue(this.requestedHeartbeat,connTune.getHeartbeat());

setHeartbeat(heartbeat); 启动心跳线程
发送调整完毕方法TuneOk, 并请求打开连接Open

_channel0.transmit(new AMQP.Connection.TuneOk.Builder().channelMax(channelMax).frameMax(frameMax).heartbeat(heartbeat).build());
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder().virtualHost(_virtualHost).build());

至此Connection的连接已经创建并打开

创建Channel

接下来是Channel的创建, 我们前面代码中使用的Channel是特殊的, 专门用于创建Connection, 下面创建的是为了后面发送队列消息使用的Channel.
Channel channel = connection.createChannel() // 入口
根据AMQP文档, 创建Channel需要客户端发送Channel.Open方法然后接收服务端的Channel.OpenOk, 我们从抓包中也可以观察到. 我们一步一步跟踪代码, 代码层级比较深, 这里给出调用逻辑, 从下到上(对, 没错, 这就是创建Channel报错日志截取了部分)

com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:295)
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:182)
com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:555)
com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:165)

privateRpc的代码我们看下

private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException{SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);rpc(m, k); // 发送Channel.Open 方法// At this point, the request method has been sent, and we// should wait for the reply to arrive.// 这里我们已经发送了请求, 我们应该等待响应// Calling getReply() on the continuation puts us to sleep// until the connection's reader-thread throws the reply over// the fence or the RPC times out (if enabled)// 调用getReply()方法会阻塞直到获取到结果或者超时if(_rpcTimeout == NO_RPC_TIMEOUT) {return k.getReply();} else {try {return k.getReply(_rpcTimeout);} catch (TimeoutException e) {throw wrapTimeoutException(m, e);}}
}

接收Channel.OpenOk方法是由MainLoop线程完成的, 方式类似之前获取Connection.Start方法.

消息发送

至此AMQP连接算是完全创建完毕, 下面就是消息队列相关. 首先是队列和Exchange的声明, 这里队列的声明其实没有什么用, 代码这么写就是为了看下声明过程

channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
channel.exchangeDeclare("mind", "direct");channel.basicPublish("mind", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

这里声明的方式非常简单大家跟着代码很容易明白, queue和Exchange的声明过程基本一样, 不同的是queue在声明前会校验下队列的合法性(长度). 它们获取响应结果的方式和Channel.OpenOk的获取方式一毛一样.
消息的发送过程也是发送一个AMQPCommand, 但是细节很多, 准备在后面实现客户端的部分再详细看.

关闭连接

程序执行结束, 执行try-with-resources部分, 自动执行close()方法, 执行顺序从下到上, 也就是先执行Channel的close(), 然后Connection的close(); 从抓包中也可以看到先发送的Channel.close方法, 再发送Connection.close方法. 代码细节的部分这里就不展开了, 会放到后面代码实现上.

总结

整体过了一遍主要流程, 后面我们会自己实现一个简单客户端加深下理解; 这个过程中除了了解了客户端的操作流程外, 对java的部分知识也学习了一下
try-with-resources 在关闭时, 执行关闭的顺序和声明顺序相反;
try-with-resources 也可以有catch和finally块, 它们是在try-with-resources声明关闭之后执行的.

java线程状态流转

客户端实现(待完成~)

今天我们的目标是实现rabbitmq客户端, 并使用该客户端发送消息到指定Exchange中.

tcp连接创建

超级简单

socket = new Socket();
socket.connect(new InetSocketAddress(host, port));
// 保存连接输入输出流
inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));

抓包

发送信息头

我们通过抓包和源码知道, 发送头就是发送了"AMQP0091"

private int major = 0;
private int minor = 9;
private int revision = 1;
outputStream.write("AMQP".getBytes());
outputStream.write(0);
outputStream.write(major);
outputStream.write(minor);
outputStream.write(revision);
outputStream.flush();

抓包结果

可以看到服务端已经认可协议, 并发送了Connection.Start方法过来.
如果我们发送的协议服务端不认识会怎么样, 我们把major改为2试试
抓包结果

自己看下我们发的内容如下

我们是发送了0291, 抓包是支持AMQP协议的, 所以这里应该是不认识了, 所以显示为unknown version, 但是让我不理解的是服务端返回的结果也是unknown version, 根据AMQP文档中的说明, 服务端这时应该返回支持的协议, 我们点开看下

的确是0091正常的协议, 但是抓包软件没有显示出来, 很奇怪~

Connection.StartOk

消息中间件 二 之AMQP实战(上)相关推荐

  1. 慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结

    慕课网<RabbitMQ消息中间件极速入门与实战>学习总结 时间:2018年09月05日星期三 说明:本文部分内容均来自慕课网.@慕课网:https://www.imooc.com 教学源 ...

  2. 【军事】“运筹帷幄”的利器:二三维一体化实战指挥辅助决策系统

    二三维一体化实战指挥辅助决策系统基于SuperMap GIS 二三维一体化技术,将声.光与电子信息技术融为一体,具有传统沙盘无法比拟的模型动态呈现.角度切换.图像缩放效果,可实现模型展示.数据分析.实 ...

  3. Ambari源码二次开发实战课程(持续更新中)

    如果大家想基于 ambari 开发一套自己的大数据平台,那么 ambari 二次开发的相关知识就必须具备.比如:ambari 整体编译,ambrai 汉化,ambari 页面开发,ambari api ...

  4. OnlyOffice验证(二)在Centos7上部署OnlyOffice编译结果

    在Centos7上部署OnlyOffice编译结果   此处将尝试将OnlyOffice验证(一)DocumentServer编译验证的结果部署到Centos7上.并且使用其它服务器现有的Rabbit ...

  5. 家用服务器中心,家用存储娱乐服务器 篇二:硬件实战篇

    家用存储文娱办事器 篇二:硬件实战篇' @3 U7 h0 z. x- t# N$ N6 w 1 S2 y7 S7 f: P% b  P5 ] 正如上文所说,各类纠结一番并正在期待新一代低功耗处置器的我 ...

  6. 韩顺平主讲织梦dedecms 5.7二次开发实战仿站模板制作视频教程

    个人看后感觉 最好的  dedecms 5.7二次开发视频教程 由于原文件太大,上传到cdsn不方便,就传到网盘了 下载地址 传智播客 PHP教程 韩顺平 dedecms 项目开发笔记.rar 下载地 ...

  7. 鸟哥的Linux私房菜(服务器)- 主机基本安全之二: Linux线上自动升級

    主機基本安全之二: Linux 線上自動升級 切換解析度為 800x600 最近更新日期:2006/09/10 本文已不再維護,更新文章請參考這裡 在現在的 Internet 上面,Cracker 實 ...

  8. VTK:二次方用法实战

    VTK:二次方用法实战 程序输出 程序完整源代码 程序输出 程序完整源代码 #include <vtkActor.h> #include <vtkDataSetMapper.h> ...

  9. mybatis源码阅读(二):mybatis初始化上

    转载自  mybatis源码阅读(二):mybatis初始化上 1.初始化入口 //Mybatis 通过SqlSessionFactory获取SqlSession, 然后才能通过SqlSession与 ...

最新文章

  1. [讨论]你的女朋友值多少钱?
  2. android检测本地是否安装,在本地测试模块的安装
  3. JAVA中文注解驱动,解决api接口返回的json里面出现中文乱码的问题
  4. windows opensshd 连接就close_基于Windows白名单执行Payload上线Metasploit 渗透红队笔记...
  5. 别看乐高模板了!利用计算机视觉技术,这个软件几秒就识别满地积木,还能激发灵感...
  6. NLPIR-KGB知识图谱引擎突破传统数据挖掘束缚
  7. 关于大型网站技术演进的思考(七)--存储的瓶颈(7)
  8. 超酷的实时颜色数据跟踪javascript类库 - Tracking.js
  9. Maven快速创建SpringMVC web(1)
  10. [原][歌曲]感动的歌曲排序
  11. [POI2009]SLO
  12. windows 10 安装 spark 环境(spark 2.2.1 + hadoop2.7)
  13. Linux 下安装JDK1.8
  14. Mac Idea批量删除空行
  15. 计算机软件丛书,开天辟地学电脑丛书——办公软件篇
  16. java中ofd文件转pdf_java ofd文件解析
  17. 学习计算机编程(IT、偏网站开发)的参考学习网址syk
  18. 来到广西的十年之吃喝
  19. 从零搭建KVM虚拟服务器
  20. 维度诅咒_维度的诅咒减去行话的诅咒

热门文章

  1. AndroidStudio中打开新项目提示:Unrecognized Android Studio (or Android Support plugin for IntelliJ IDEA)
  2. STM32串口如何代码实现更稳定的接收消息
  3. 奇迹世界无法聊天显示服务器繁忙,奇迹世界(SUN) 中文官方网站
  4. 练习:求列表(整数列表)平衡点
  5. 正则表达式说明(转)
  6. 回忆 08省赛 吉大(上)
  7. 小程序关联公众号后域名和服务器,公众号被封了,关联的小程序会怎么办?
  8. 微信小程序表单验证错误提示。
  9. sabaki加katago配置,加载sgf后同步,提示gtp引擎与当前棋盘状态同步失败
  10. 3Delight feats. OpenVDB