发送消息的一方称为生产者,负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

Producer 发送同一类消息且发送逻辑一致,这类消息被称为 Producer group。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

它在整个 RocketMQ 的生产和消费体系中扮演的角色如下图所示:

生产者: 一个逻辑概念,在使用生产者实例的时候需要指定一个组名。一个生产者组可以生产多个 Topic 消息。

生产者实例:一个生产者组部署了多个进程,每个进程都可以成为一个生产者实例。

Topic:主题名字,表示一类消息的集合,一个 Topic 由若干 Queue 组成。每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

RocketMQ 客户端中的生产者有两个独立的生产者实现类:

org.apache.rocketmq.client.producer.DefaultMQProducer
org.apache.rocketmq.client.producer.TransactionMQProducer

其中 DefaultMQProducer 主要负责生产普通消息、顺序消息、单向消息、批量消息和延迟消息,TransactionMQProducer 主要负责生产事务消息。

消息结构和消息类型

RocketMQ 消息定义相关的代码位于 org.apache.rocketmq.common.message.Message 下,Producer 发送的消息定义为 Message 类,其基本字段为:

public class Message implements Serializable {private static final long serialVersionUID = 8445773977080406428L;private String topic;private int flag;private Map<String, String> properties;private byte[] body;private String transactionId;public Message() {}public Message(String topic, byte[] body) {this(topic, "", "", 0, body, true);}public String getTopic() {return topic;}public void setTopic(String topic) {this.topic = topic;}public int getFlag() {return flag;}public void setFlag(int flag) {this.flag = flag;}public byte[] getBody() {return body;}public void setBody(byte[] body) {this.body = body;}public Map<String, String> getProperties() {return properties;}void setProperties(Map<String, String> properties) {this.properties = properties;}public String getTransactionId() {return transactionId;}public void setTransactionId(String transactionId) {this.transactionId = transactionId;}@Overridepublic String toString() {return "Message{" +"topic='" + topic + '\'' +", flag=" + flag +", properties=" + properties +", body=" + Arrays.toString(body) +", transactionId='" + transactionId + '\'' +'}';}
}

各个字段的含义为:

topic:主题名字,可以通过 RocketMQ COnsole 创建。Message 都有 Topic 这一属性,Producer 发送指定 Topic 的消息,Consumer 订阅 Topic 下的消息。通过 Topic 字段,Producer 会获取消息投递的路由信息,决定发送给哪个 Broker。

flag:网络通信层标记,目前用到的地方很少。

properties:消息扩展信息,该字段是 HashMap 类型的,用来存储 Message 的其余各项参数,比如 tag、key 等关键的消息属性。RocketMQ 预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。当然属性的数量也是有限的,消息序列化之后的大小不能超过预设的最大消息大小。系统内置属性定义于 org.apache.rocketmq.common.message.MessageConst,内置属性有:

public class MessageConst {public static final String PROPERTY_KEYS = "KEYS";public static final String PROPERTY_TAGS = "TAGS";public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";public static final String PROPERTY_BUYER_ID = "BUYER_ID";public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";public static final String PROPERTY_MSG_REGION = "MSG_REGION";public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";public static final String KEY_SEPARATOR = " ";public static final HashSet<String> STRING_HASH_SET = new HashSet<String>();static {STRING_HASH_SET.add(PROPERTY_TRACE_SWITCH);STRING_HASH_SET.add(PROPERTY_MSG_REGION);STRING_HASH_SET.add(PROPERTY_KEYS);STRING_HASH_SET.add(PROPERTY_TAGS);STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK);STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL);STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC);STRING_HASH_SET.add(PROPERTY_REAL_TOPIC);STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID);STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);STRING_HASH_SET.add(PROPERTY_PRODUCER_GROUP);STRING_HASH_SET.add(PROPERTY_MIN_OFFSET);STRING_HASH_SET.add(PROPERTY_MAX_OFFSET);STRING_HASH_SET.add(PROPERTY_BUYER_ID);STRING_HASH_SET.add(PROPERTY_ORIGIN_MESSAGE_ID);STRING_HASH_SET.add(PROPERTY_TRANSFER_FLAG);STRING_HASH_SET.add(PROPERTY_CORRECTION_FLAG);STRING_HASH_SET.add(PROPERTY_MQ2_FLAG);STRING_HASH_SET.add(PROPERTY_RECONSUME_TIME);STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);}
}

body:消息体,字节数组。需要注意生产者是用什么编码,消费者也必须使用相同的编码去解码,否则会产生乱码。

transactionId:RocketMQ 4.3.0 引入的事务消息相关的事务编号。

对于一些重要属性,Message 类还提供了一组 set 接口来进行设置,如:

public class Message implements Serializable {private static final long serialVersionUID = 8445773977080406428L;public Message() {}public void setKeys(String keys) {this.putProperty(MessageConst.PROPERTY_KEYS, keys);}void putProperty(final String name, final String value) {if (null == this.properties) {this.properties = new HashMap<String, String>();}this.properties.put(name, value);}public void putUserProperty(final String name, final String value) {if (MessageConst.STRING_HASH_SET.contains(name)) {throw new RuntimeException(String.format("The Property<%s> is used by system, input another please", name));}if (value == null || value.trim().isEmpty()|| name == null || name.trim().isEmpty()) {throw new IllegalArgumentException("The name or value of property can not be null or blank string!");}this.putProperty(name, value);}public String getTags() {return this.getProperty(MessageConst.PROPERTY_TAGS);}public void setTags(String tags) {this.putProperty(MessageConst.PROPERTY_TAGS, tags);}public String getKeys() {return this.getProperty(MessageConst.PROPERTY_KEYS);}public void setKeys(Collection<String> keyCollection) {String keys = String.join(MessageConst.KEY_SEPARATOR, keyCollection);this.setKeys(keys);}public int getDelayTimeLevel() {String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);if (t != null) {return Integer.parseInt(t);}return 0;}public void setDelayTimeLevel(int level) {this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));}public boolean isWaitStoreMsgOK() {String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);if (null == result) {return true;}return Boolean.parseBoolean(result);}public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));}public void setInstanceId(String instanceId) {this.putProperty(MessageConst.PROPERTY_INSTANCE_ID, instanceId);}public String getBuyerId() {return getProperty(MessageConst.PROPERTY_BUYER_ID);}public void setBuyerId(String buyerId) {putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);}@Overridepublic String toString() {return "Message{" +"topic='" + topic + '\'' +", flag=" + flag +", properties=" + properties +", body=" + Arrays.toString(body) +", transactionId='" + transactionId + '\'' +'}';}
}

setKeys():设置消息的 key,多个 key 可以通过 MessageConst.KEY_SEPARATOR(空格)分隔或者直接使用另外一个重载方法。如果 broker 中 messageIndexEnable == true,则会根据 key 创建消息的 Hash 索引,帮助用户进行快速查询。

setTags():消息过滤的标记,用户可以订阅某个 Topic 的某个 Tag,这样 Broker 只会把订阅了 topic-tag 的消息发送给消费者。

setDelyTimeLevel():设置消息延迟处理级别,不同级别对应不同延迟时间。

setWaitStoreMsgOK():设置是否需要等待数据落地才认为消息发送成功的标记。

putUserProperty():如果还有其他扩展信息,可以存放到这里。其内部是一个 HashMap,重复调用会覆盖旧值。

对于发送方来说,上述 Message 的定义已足够使用。但对于 RocketMQ 的整个处理流程来说,还需要更多的字段信息用以记录一些必要内容,比如消息的 id、创建时间、存储时间等等。于是白尅使用另外一个实体去定义消息对象,即 MessageExt,该实体继承自 Message:

public class MessageExt extends Message {private static final long serialVersionUID = 5720810158625748049L;private String brokerName;private int queueId;private int storeSize;private long queueOffset;private int sysFlag;private long bornTimestamp;private SocketAddress bornHost;private long storeTimestamp;private SocketAddress storeHost;private String msgId;private long commitLogOffset;private int bodyCRC;private int reconsumeTimes;private long preparedTransactionOffset;public MessageExt() {}public MessageExt(int queueId, long bornTimestamp, SocketAddress bornHost, long storeTimestamp,SocketAddress storeHost, String msgId) {this.queueId = queueId;this.bornTimestamp = bornTimestamp;this.bornHost = bornHost;this.storeTimestamp = storeTimestamp;this.storeHost = storeHost;this.msgId = msgId;}public static TopicFilterType parseTopicFilterType(final int sysFlag) {if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) {return TopicFilterType.MULTI_TAG;}return TopicFilterType.SINGLE_TAG;}public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;InetAddress address = inetSocketAddress.getAddress();if (address instanceof Inet4Address) {byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);} else {byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 16);}byteBuffer.putInt(inetSocketAddress.getPort());byteBuffer.flip();return byteBuffer;}public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) {InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;InetAddress address = inetSocketAddress.getAddress();ByteBuffer byteBuffer;if (address instanceof Inet4Address) {byteBuffer = ByteBuffer.allocate(4 + 4);} else {byteBuffer = ByteBuffer.allocate(16 + 4);}return socketAddress2ByteBuffer(socketAddress, byteBuffer);}public ByteBuffer getBornHostBytes() {return socketAddress2ByteBuffer(this.bornHost);}public ByteBuffer getBornHostBytes(ByteBuffer byteBuffer) {return socketAddress2ByteBuffer(this.bornHost, byteBuffer);}public ByteBuffer getStoreHostBytes() {return socketAddress2ByteBuffer(this.storeHost);}public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {return socketAddress2ByteBuffer(this.storeHost, byteBuffer);}public String getBrokerName() {return brokerName;}public void setBrokerName(String brokerName) {this.brokerName = brokerName;}public int getQueueId() {return queueId;}public void setQueueId(int queueId) {this.queueId = queueId;}public long getBornTimestamp() {return bornTimestamp;}public void setBornTimestamp(long bornTimestamp) {this.bornTimestamp = bornTimestamp;}public SocketAddress getBornHost() {return bornHost;}public void setBornHost(SocketAddress bornHost) {this.bornHost = bornHost;}public String getBornHostString() {if (null != this.bornHost) {InetAddress inetAddress = ((InetSocketAddress) this.bornHost).getAddress();return null != inetAddress ? inetAddress.getHostAddress() : null;}return null;}public String getBornHostNameString() {if (null != this.bornHost) {if (bornHost instanceof InetSocketAddress) {// without reverse dns lookupreturn ((InetSocketAddress) bornHost).getHostString();}InetAddress inetAddress = ((InetSocketAddress) this.bornHost).getAddress();return null != inetAddress ? inetAddress.getHostName() : null;}return null;}public long getStoreTimestamp() {return storeTimestamp;}public void setStoreTimestamp(long storeTimestamp) {this.storeTimestamp = storeTimestamp;}public SocketAddress getStoreHost() {return storeHost;}public void setStoreHost(SocketAddress storeHost) {this.storeHost = storeHost;}public String getMsgId() {return msgId;}public void setMsgId(String msgId) {this.msgId = msgId;}public int getSysFlag() {return sysFlag;}public void setSysFlag(int sysFlag) {this.sysFlag = sysFlag;}public void setStoreHostAddressV6Flag() { this.sysFlag = this.sysFlag | MessageSysFlag.STOREHOSTADDRESS_V6_FLAG; }public void setBornHostV6Flag() { this.sysFlag = this.sysFlag | MessageSysFlag.BORNHOST_V6_FLAG; }public int getBodyCRC() {return bodyCRC;}public void setBodyCRC(int bodyCRC) {this.bodyCRC = bodyCRC;}public long getQueueOffset() {return queueOffset;}public void setQueueOffset(long queueOffset) {this.queueOffset = queueOffset;}public long getCommitLogOffset() {return commitLogOffset;}public void setCommitLogOffset(long physicOffset) {this.commitLogOffset = physicOffset;}public int getStoreSize() {return storeSize;}public void setStoreSize(int storeSize) {this.storeSize = storeSize;}public int getReconsumeTimes() {return reconsumeTimes;}public void setReconsumeTimes(int reconsumeTimes) {this.reconsumeTimes = reconsumeTimes;}public long getPreparedTransactionOffset() {return preparedTransactionOffset;}public void setPreparedTransactionOffset(long preparedTransactionOffset) {this.preparedTransactionOffset = preparedTransactionOffset;}@Overridepublic String toString() {return "MessageExt [brokerName=" + brokerName + ", queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset+ ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost+ ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId+ ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="+ reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset+ ", toString()=" + super.toString() + "]";}
}

queueId:记录 MessageQueue 编号,消息会被发送到 Topic 下的 MessageQueue

storeSize:记录消息在 Broker 存盘中大小

queueOffset:记录在 ConsumeQueue 中的偏移

sysFlag:记录一些系统标志的开状态态,org.apache.rocketmq.common.sysflag.MessageSysFlag 中定义了系统标识

bornTimestamp:记录消息创建时间,在 Producer 发送消息时设置

storeHost:记录存储该消息的 Broker 地址

msgId:消息 Id

commitLogOffset:记录在 Broker 中存储便宜

bodyCRC:消息内容 CRC 校验值

reconsumeTimes:记录消息重试消费次数

preparedTransactionOffset:事务消息相关字段

Message 还有一个名为 MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX 的属性:

public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";

在消息发送时由 Producer 生成创建。msgId 则是消息在 Broker 端进行存储时通过MessageDecoder.createMessageId 方法生成的:

public class MessageDecoder {
//    public final static int MSG_ID_LENGTH = 8 + 8;public final static Charset CHARSET_UTF8 = StandardCharsets.UTF_8;public final static int MESSAGE_MAGIC_CODE_POSTION = 4;public final static int MESSAGE_FLAG_POSTION = 16;public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;//    public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;public final static int MESSAGE_MAGIC_CODE = -626843481;public static final char NAME_VALUE_SEPARATOR = 1;public static final char PROPERTY_SEPARATOR = 2;public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;public static final int QUEUE_OFFSET_POSITION = 4 + 4 + 4 + 4 + 4;public static final int SYSFLAG_POSITION = 4 + 4 + 4 + 4 + 4 + 8 + 8;
//    public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
//        + 4 // 2 MAGICCODE
//        + 4 // 3 BODYCRC
//        + 4 // 4 QUEUEID
//        + 4 // 5 FLAG
//        + 8 // 6 QUEUEOFFSET
//        + 8 // 7 PHYSICALOFFSET
//        + 4 // 8 SYSFLAG
//        + 8 // 9 BORNTIMESTAMP
//        + 8 // 10 BORNHOST
//        + 8 // 11 STORETIMESTAMP
//        + 8 // 12 STOREHOSTADDRESS
//        + 4 // 13 RECONSUMETIMES
//        + 8; // 14 Prepared Transaction Offsetpublic static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {input.flip();int msgIDLength = addr.limit() == 8 ? 16 : 28;input.limit(msgIDLength);input.put(addr);input.putLong(offset);return UtilAll.bytes2string(input.array());}public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) {InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;int msgIDLength = inetSocketAddress.getAddress() instanceof Inet4Address ? 16 : 28;ByteBuffer byteBuffer = ByteBuffer.allocate(msgIDLength);byteBuffer.put(inetSocketAddress.getAddress().getAddress());byteBuffer.putInt(inetSocketAddress.getPort());byteBuffer.putLong(transactionIdhashCode);byteBuffer.flip();return UtilAll.bytes2string(byteBuffer.array());}public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {byte[] bytes = UtilAll.string2bytes(msgId);ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);// address(ip+port)byte[] ip = new byte[msgId.length() == 32 ? 4 : 16];byteBuffer.get(ip);int port = byteBuffer.getInt();SocketAddress address = new InetSocketAddress(InetAddress.getByAddress(ip), port);// offsetlong offset = byteBuffer.getLong();return new MessageId(address, offset);}/*** Just decode properties from msg buffer.** @param byteBuffer msg commit log buffer.*/public static Map<String, String> decodeProperties(ByteBuffer byteBuffer) {int sysFlag = byteBuffer.getInt(SYSFLAG_POSITION);int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;int bodySizePosition = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCODE+ 4 // 3 BODYCRC+ 4 // 4 QUEUEID+ 4 // 5 FLAG+ 8 // 6 QUEUEOFFSET+ 8 // 7 PHYSICALOFFSET+ 4 // 8 SYSFLAG+ 8 // 9 BORNTIMESTAMP+ bornhostLength // 10 BORNHOST+ 8 // 11 STORETIMESTAMP+ storehostAddressLength // 12 STOREHOSTADDRESS+ 4 // 13 RECONSUMETIMES+ 8; // 14 Prepared Transaction Offsetint topicLengthPosition = bodySizePosition + 4 + byteBuffer.getInt(bodySizePosition);byte topicLength = byteBuffer.get(topicLengthPosition);short propertiesLength = byteBuffer.getShort(topicLengthPosition + 1 + topicLength);byteBuffer.position(topicLengthPosition + 1 + topicLength + 2);if (propertiesLength > 0) {byte[] properties = new byte[propertiesLength];byteBuffer.get(properties);String propertiesString = new String(properties, CHARSET_UTF8);Map<String, String> map = string2messageProperties(propertiesString);return map;}return null;}public static MessageExt decode(ByteBuffer byteBuffer) {return decode(byteBuffer, true, true, false);}public static MessageExt clientDecode(ByteBuffer byteBuffer, final boolean readBody) {return decode(byteBuffer, readBody, true, true);}public static MessageExt decode(ByteBuffer byteBuffer, final boolean readBody) {return decode(byteBuffer, readBody, true, false);}public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {byte[] body = messageExt.getBody();byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);byte topicLen = (byte) topics.length;String properties = messageProperties2String(messageExt.getProperties());byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);short propertiesLength = (short) propertiesBytes.length;int sysFlag = messageExt.getSysFlag();int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;byte[] newBody = messageExt.getBody();if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {Compressor compressor = CompressorFactory.getCompressor(MessageSysFlag.getCompressionType(sysFlag));newBody = compressor.compress(body, 5);}int bodyLength = newBody.length;int storeSize = messageExt.getStoreSize();ByteBuffer byteBuffer;if (storeSize > 0) {byteBuffer = ByteBuffer.allocate(storeSize);} else {storeSize = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCODE+ 4 // 3 BODYCRC+ 4 // 4 QUEUEID+ 4 // 5 FLAG+ 8 // 6 QUEUEOFFSET+ 8 // 7 PHYSICALOFFSET+ 4 // 8 SYSFLAG+ 8 // 9 BORNTIMESTAMP+ bornhostLength // 10 BORNHOST+ 8 // 11 STORETIMESTAMP+ storehostAddressLength // 12 STOREHOSTADDRESS+ 4 // 13 RECONSUMETIMES+ 8 // 14 Prepared Transaction Offset+ 4 + bodyLength // 14 BODY+ 1 + topicLen // 15 TOPIC+ 2 + propertiesLength // 16 propertiesLength+ 0;byteBuffer = ByteBuffer.allocate(storeSize);}// 1 TOTALSIZEbyteBuffer.putInt(storeSize);// 2 MAGICCODEbyteBuffer.putInt(MESSAGE_MAGIC_CODE);// 3 BODYCRCint bodyCRC = messageExt.getBodyCRC();byteBuffer.putInt(bodyCRC);// 4 QUEUEIDint queueId = messageExt.getQueueId();byteBuffer.putInt(queueId);// 5 FLAGint flag = messageExt.getFlag();byteBuffer.putInt(flag);// 6 QUEUEOFFSETlong queueOffset = messageExt.getQueueOffset();byteBuffer.putLong(queueOffset);// 7 PHYSICALOFFSETlong physicOffset = messageExt.getCommitLogOffset();byteBuffer.putLong(physicOffset);// 8 SYSFLAGbyteBuffer.putInt(sysFlag);// 9 BORNTIMESTAMPlong bornTimeStamp = messageExt.getBornTimestamp();byteBuffer.putLong(bornTimeStamp);// 10 BORNHOSTInetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();byteBuffer.put(bornHost.getAddress().getAddress());byteBuffer.putInt(bornHost.getPort());// 11 STORETIMESTAMPlong storeTimestamp = messageExt.getStoreTimestamp();byteBuffer.putLong(storeTimestamp);// 12 STOREHOSTInetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();byteBuffer.put(serverHost.getAddress().getAddress());byteBuffer.putInt(serverHost.getPort());// 13 RECONSUMETIMESint reconsumeTimes = messageExt.getReconsumeTimes();byteBuffer.putInt(reconsumeTimes);// 14 Prepared Transaction Offsetlong preparedTransactionOffset = messageExt.getPreparedTransactionOffset();byteBuffer.putLong(preparedTransactionOffset);// 15 BODYbyteBuffer.putInt(bodyLength);byteBuffer.put(newBody);// 16 TOPICbyteBuffer.put(topicLen);byteBuffer.put(topics);// 17 propertiesbyteBuffer.putShort(propertiesLength);byteBuffer.put(propertiesBytes);return byteBuffer.array();}public static MessageExt decode(ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {return decode(byteBuffer, readBody, deCompressBody, false);}public static MessageExt decode(ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) {try {MessageExt msgExt;if (isClient) {msgExt = new MessageClientExt();} else {msgExt = new MessageExt();}// 1 TOTALSIZEint storeSize = byteBuffer.getInt();msgExt.setStoreSize(storeSize);// 2 MAGICCODEbyteBuffer.getInt();// 3 BODYCRCint bodyCRC = byteBuffer.getInt();msgExt.setBodyCRC(bodyCRC);// 4 QUEUEIDint queueId = byteBuffer.getInt();msgExt.setQueueId(queueId);// 5 FLAGint flag = byteBuffer.getInt();msgExt.setFlag(flag);// 6 QUEUEOFFSETlong queueOffset = byteBuffer.getLong();msgExt.setQueueOffset(queueOffset);// 7 PHYSICALOFFSETlong physicOffset = byteBuffer.getLong();msgExt.setCommitLogOffset(physicOffset);// 8 SYSFLAGint sysFlag = byteBuffer.getInt();msgExt.setSysFlag(sysFlag);// 9 BORNTIMESTAMPlong bornTimeStamp = byteBuffer.getLong();msgExt.setBornTimestamp(bornTimeStamp);// 10 BORNHOSTint bornhostIPLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 : 16;byte[] bornHost = new byte[bornhostIPLength];byteBuffer.get(bornHost, 0, bornhostIPLength);int port = byteBuffer.getInt();msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));// 11 STORETIMESTAMPlong storeTimestamp = byteBuffer.getLong();msgExt.setStoreTimestamp(storeTimestamp);// 12 STOREHOSTint storehostIPLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;byte[] storeHost = new byte[storehostIPLength];byteBuffer.get(storeHost, 0, storehostIPLength);port = byteBuffer.getInt();msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));// 13 RECONSUMETIMESint reconsumeTimes = byteBuffer.getInt();msgExt.setReconsumeTimes(reconsumeTimes);// 14 Prepared Transaction Offsetlong preparedTransactionOffset = byteBuffer.getLong();msgExt.setPreparedTransactionOffset(preparedTransactionOffset);// 15 BODYint bodyLen = byteBuffer.getInt();if (bodyLen > 0) {if (readBody) {byte[] body = new byte[bodyLen];byteBuffer.get(body);// uncompress bodyif (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {Compressor compressor = CompressorFactory.getCompressor(MessageSysFlag.getCompressionType(sysFlag));body = compressor.decompress(body);}msgExt.setBody(body);} else {byteBuffer.position(byteBuffer.position() + bodyLen);}}// 16 TOPICbyte topicLen = byteBuffer.get();byte[] topic = new byte[(int) topicLen];byteBuffer.get(topic);msgExt.setTopic(new String(topic, CHARSET_UTF8));// 17 propertiesshort propertiesLength = byteBuffer.getShort();if (propertiesLength > 0) {byte[] properties = new byte[propertiesLength];byteBuffer.get(properties);String propertiesString = new String(properties, CHARSET_UTF8);Map<String, String> map = string2messageProperties(propertiesString);msgExt.setProperties(map);}int msgIDLength = storehostIPLength + 4 + 8;ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());msgExt.setMsgId(msgId);if (isClient) {((MessageClientExt) msgExt).setOffsetMsgId(msgId);}return msgExt;} catch (Exception e) {byteBuffer.position(byteBuffer.limit());}return null;}public static List<MessageExt> decodes(ByteBuffer byteBuffer) {return decodes(byteBuffer, true);}public static List<MessageExt> decodes(ByteBuffer byteBuffer, final boolean readBody) {List<MessageExt> msgExts = new ArrayList<MessageExt>();while (byteBuffer.hasRemaining()) {MessageExt msgExt = clientDecode(byteBuffer, readBody);if (null != msgExt) {msgExts.add(msgExt);} else {break;}}return msgExts;}public static String messageProperties2String(Map<String, String> properties) {if (properties == null) {return "";}int len = 0;for (final Map.Entry<String, String> entry : properties.entrySet()) {final String name = entry.getKey();final String value = entry.getValue();if (value == null) {continue;}if (name != null) {len += name.length();}len += value.length();len += 2; // separator}StringBuilder sb = new StringBuilder(len);if (properties != null) {for (final Map.Entry<String, String> entry : properties.entrySet()) {final String name = entry.getKey();final String value = entry.getValue();if (value == null) {continue;}sb.append(name);sb.append(NAME_VALUE_SEPARATOR);sb.append(value);sb.append(PROPERTY_SEPARATOR);}if (sb.length() > 0) {sb.deleteCharAt(sb.length() - 1);}}return sb.toString();}public static Map<String, String> string2messageProperties(final String properties) {Map<String, String> map = new HashMap<String, String>();if (properties != null) {int len = properties.length();int index = 0;while (index < len) {int newIndex = properties.indexOf(PROPERTY_SEPARATOR, index);if (newIndex < 0) {newIndex = len;}if (newIndex - index >= 3) {int kvSepIndex = properties.indexOf(NAME_VALUE_SEPARATOR, index);if (kvSepIndex > index && kvSepIndex < newIndex - 1) {String k = properties.substring(index, kvSepIndex);String v = properties.substring(kvSepIndex + 1, newIndex);map.put(k, v);}}index = newIndex + 1;}}return map;}public static byte[] encodeMessage(Message message) {//only need flag, body, propertiesbyte[] body = message.getBody();int bodyLen = body.length;String properties = messageProperties2String(message.getProperties());byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);//note properties length must not more than Short.MAXint propsLen = propertiesBytes.length;if (propsLen > Short.MAX_VALUE)throw new RuntimeException(String.format("Properties size of message exceeded, properties size: {}, maxSize: {}.", propsLen, Short.MAX_VALUE));short propertiesLength = (short) propsLen;int sysFlag = message.getFlag();int storeSize = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCOD+ 4 // 3 BODYCRC+ 4 // 4 FLAG+ 4 + bodyLen // 4 BODY+ 2 + propertiesLength;ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);// 1 TOTALSIZEbyteBuffer.putInt(storeSize);// 2 MAGICCODEbyteBuffer.putInt(0);// 3 BODYCRCbyteBuffer.putInt(0);// 4 FLAGint flag = message.getFlag();byteBuffer.putInt(flag);// 5 BODYbyteBuffer.putInt(bodyLen);byteBuffer.put(body);// 6 propertiesbyteBuffer.putShort(propertiesLength);byteBuffer.put(propertiesBytes);return byteBuffer.array();}public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception {Message message = new Message();// 1 TOTALSIZEbyteBuffer.getInt();// 2 MAGICCODEbyteBuffer.getInt();// 3 BODYCRCbyteBuffer.getInt();// 4 FLAGint flag = byteBuffer.getInt();message.setFlag(flag);// 5 BODYint bodyLen = byteBuffer.getInt();byte[] body = new byte[bodyLen];byteBuffer.get(body);message.setBody(body);// 6 propertiesshort propertiesLen = byteBuffer.getShort();byte[] propertiesBytes = new byte[propertiesLen];byteBuffer.get(propertiesBytes);message.setProperties(string2messageProperties(new String(propertiesBytes, CHARSET_UTF8)));return message;}public static byte[] encodeMessages(List<Message> messages) {//TO DO refactor, accumulate in one buffer, avoid copiesList<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());int allSize = 0;for (Message message : messages) {byte[] tmp = encodeMessage(message);encodedMessages.add(tmp);allSize += tmp.length;}byte[] allBytes = new byte[allSize];int pos = 0;for (byte[] bytes : encodedMessages) {System.arraycopy(bytes, 0, allBytes, pos, bytes.length);pos += bytes.length;}return allBytes;}public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception {//TO DO add a callback for processing,  avoid creating listsList<Message> msgs = new ArrayList<Message>();while (byteBuffer.hasRemaining()) {Message msg = decodeMessage(byteBuffer);msgs.add(msg);}return msgs;}
}

这个 MsgId 是在 Broker 生成的,Producer 在发送消息时没有该信息,Consumer 在消费消息时则能获取到该值。

RocketMQ 支持普通消息、分区有序消息、全局有序消息、延迟消息和事务消息。

普通消息:普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序,但是生产者和消费者都是并行进行的,单机性能可达十万级别的 tps。

分区顺序消息:分区有序消息又称普通有序消息,与 Kafka 的分区类似,把一个 Topic 消息分为多个分区“保存”起来,在一个分区内的消息就是传统的队列,遵循 FIFO(先进先出)原则,所以是有序的,于是消费者通过同一个消息队列(Topic 分区,称作 Message Queue) 收到的消息也是有顺序的,但是不同消息队列收到的消息则可能是无顺序的。

全局顺序消息:全局有序消息又称严格顺序消息,消费者收到的所有消息均是有顺序的。可以理解为把一个 Topic 的分区数设置为 1,那么该 Topic 中的消息就是单分区,所有的消息都遵循 FIFO(先进先出)队列,所以是有序的。

事务消息:RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败时消费者才能消费消息。RocketMQ 通过发送 Half 消息、处理本地事务、提交消息或者回滚消息可以优雅地实现分布式事务。

延迟消息:消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。也可以理解为消费者要在一定时间之后,或者指定某个时间点才可以消费消息。在没有延迟消息时,基本的做法是给予定时计划任务调度,定时发送消息。在 RocketMQ 中只需要在发送消息时设置延迟级别即可实现。 broker 中有配置项 messageDelayLevel:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

总共 18 个 level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1 <= leve l<= maxLevel,消息延迟特定时间,例如 level == 1,表示延迟 1s
  • level > maxLevel,则 level == maxLevel,例如 level == 20,表示延迟 2h

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel - 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。

需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。

RocketMQ 消息结构和消息类型相关推荐

  1. 企业微信 接收消息服务器,接收消息与事件

    [TOC] 关于接收消息 为了能够让自建应用和企业微信进行双向通信,企业可以在应用的管理后台开启接收消息模式. 开启接收消息模式的企业,需要提供可用的接收消息服务器URL. 开启接收消息模式后,用户在 ...

  2. 【RocketMQ】玩转各种类型的消息

    顺序消息 消息有序指的是可以按照消息的发送顺序来消费(FIFO).RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序. 顺序消费的原理解析,在默认的情况下消息发送会采取Round R ...

  3. 消息中间件:RocketMQ 介绍(特性、术语、原理、优缺点、消息顺序、消息重复)

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 消息中间件的作用 1. 应用解耦 2. 异步处理 比如用户注册场景,注册主流程完成以后,需要调用邮件 ...

  4. rocketmq 重复消费_消息队列 RocketMQ

    引言 本文整理了RocketMQ的相关知识,方便以后查阅. 功能介绍 简单来说,消息队列就是基础数据结构课程里"先进先出"的一种数据结构,但是如果要消除单点故障,保证消息传输的可靠 ...

  5. RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?

    文章目录 一.前言 二.消息轨迹 1.消息轨迹的引入目的 2.如何使用消息轨迹 1)使用案例 2)消息轨迹内容 3) RocketMQ-Console中查看消息轨迹 3.消息轨迹实现原理 1)消息轨迹 ...

  6. RocketMQ同步消息、异步消息、单向消息详解

    一.RocketMQ 支持 3 种消息发送方式 : 1.同步消息(sync message ) producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送 ...

  7. 【RocketMQ工作原理】消息的存储

    RocketMQ中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的store目录中. abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失.若在没有 ...

  8. HTTP学习笔记:HTTP的消息结构

    Request 消息结构: Response消息结构: 请求方法: 一个URL地址用于描述一个网络上的资源,而HTTP中的GET, POST, PUT, DELETE就对应着对这个资源的查,改,增,删 ...

  9. 云栖发布|阿里云消息队列 RocketMQ 5.0:消息、事件、流融合处理平台

    简介:RocketMQ5.0 的发布标志着阿里云消息正式从消息领域正式迈向了"消息.事件.流"场景大融合的新局面. 引言:从"消息"到"消息.事件.流 ...

最新文章

  1. linux每天进步一点点-7月22日
  2. C++:迭代器(STL迭代器)iterator详解
  3. const对象,NULL和nullptr,C++中创建对象数组
  4. leetcode sql
  5. linux qt lgl,Linux下QT、cannot find -lGL、
  6. 安卓uc斗鱼html5,斗鱼HTML5播放器
  7. MissileDatcom 导弹气动计算
  8. 5.5G产业再提速!高通5GAdvanced-ready芯片商用终端下半年面世
  9. MySQL 乱七八糟的可重复读隔离级别实现
  10. 叶公好龙——存在与逻辑
  11. 积分竟然比微分早了1300年!一文讲清积分的历史
  12. stm32正常运行流程图_STM32单片机学习笔记(超详细整理143个问题,学习必看)...
  13. 刷题记录-NPUCTF2020(web部分)
  14. iOS 设计模式 浅析MVC、MVP、MVVM
  15. 乐教乐学各关的解(3-10)
  16. OpenGL 视差贴图 Parallax Mapping
  17. 服务器和kad正在连接,P2P连不上kad网络怎么解决?P2P连不上kad网络的处理方法教程详解...
  18. Messenger更改系统语言以后无法登陆,提示“初始设置被修改”
  19. 基于java毕设springboot餐厅预约订座系统 毕业设计毕设源码毕业设计论文开题报告参考(2)后台功能
  20. --legacy-peer-deps 有什么用

热门文章

  1. RFID电子标签对比传统条形码有哪些优势
  2. Required Long parameter is not present,SpringMVC的参数传递问题
  3. 中兴路由器 ZXR10 6800启用pptp支持
  4. 【Gaze】Generating Image Descriptions via Sequential Cross-Modal Alignment Guided by Human Gaze
  5. 【回忆一下】我的第一个课程设计(还记得你的第一个课设大作业吗?)
  6. 听说你的JWT库用起来特别扭,推荐这款贼好用的!
  7. Python3源码编译和使用静态链接库lib动态链接库dll详细介绍
  8. 推进自动化意味着人类需要接受终身学习
  9. 旁路流量检测--流量镜像
  10. 企业订货系统,手机订货系统整理了一下内容介绍和功能名称