本系列是学习SpringBoot整合RabbitMQ的练手,包含服务安装,RabbitMQ整合SpringBoot2.x,消息可靠性投递实现等三篇博客。

  学习路径:https://www.imooc.com/learn/1042 RabbitMQ消息中间件极速入门与实战

  项目源码:https://github.com/ZbLeaning/Boot-RabbitMQ


设计一个消息可靠性投递方案,服务结构如下:

组成:

  Sender+Confirm Listener :组成消息的生产者

  MQ Broker:消息的消费者,包含具体的MQ服务

  BIZ DB:业务数据数据库

  MSG DB:消息日志记录数据库(0:发送中、1:发送成功、2:发送失败)

思路:

  以最常见的创建订单业务来举例,假设订单创建成功后需要去发短信通知用户

  1、先完成订单业务数据的存储,并记录这条操作日志(发送中)

  2、生产者发送一条消息到消费者(异步)

  3、消费者成功消费后给给Confirm listener发送应答

  4、监听收到消息确认成功后,对消息日志表操作,修改之前的日志状态(发送成功)

  5、在消费端返回应答的过程中,可能发生网络异常导致生产者未收到应答消息,因此需要一个定时任务去捞取状态是发送中并已经超时的消息集合

  6、将捞取到的日志对应的消息,进行重发

  7、定时任务判断设置的消息最大重投次数,大于最大重投次数就判断消息发送失败,更新日志记录状态(发送失败)


项目搭建

  Durid数据源配置文件

//druid.properties
##下面为连接池的补充设置,应用到上面所有数据源中
#初始化大小,最小,最大
druid.initialSize=5
druid.minIdle=10
druid.maxActive=300
#配置获取连接等待超时的时间
druid.maxWait=60000
#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
druid.timeBetweenEvictionRunsMillis=60000
#配置一个连接在池中最小生存的时间,单位是毫秒
druid.minEvictableIdleTimeMillis=300000
druid.validationQuery=SELECT 1 FROM DUAL
druid.testWhileIdle=true
druid.testOnBorrow=false
druid.testOnReturn=false
#打开PSCache,并且指定每个连接上PSCache的大小
druid.poolPreparedStatements=true
druid.maxPoolPreparedStatementPerConnectionSize=20
#配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
druid.filters=stat,wall,log4j
#通过connectProperties属性来打开mergeSql功能;慢SQL记录
druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
#合并多个DruidDataSource的监控数据
druid.useGlobalDataSourceStat=true

  添加相应的数据源配置类、定时任务配置类、常量类

package com.imooc.mq.config.database;import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component;/*** @Title: DruidDataSourceSettings* @Description: Druid数据源读取配置* @date 2019/1/2214:31*/
@Component
@ConfigurationProperties(prefix = "spring.datasource")
@PropertySource("classpath:druid.properties")
public class DruidDataSourceSettings {private String driverClassName;private String url;private String username;private String password;@Value("${druid.initialSize}")private int initialSize;@Value("${druid.minIdle}")private int minIdle;@Value("${druid.maxActive}")private int maxActive;@Value("${druid.timeBetweenEvictionRunsMillis}")private long timeBetweenEvictionRunsMillis;@Value("${druid.minEvictableIdleTimeMillis}")private long minEvictableIdleTimeMillis;@Value("${druid.validationQuery}")private String validationQuery;@Value("${druid.testWhileIdle}")private boolean testWhileIdle;@Value("${druid.testOnBorrow}")private boolean testOnBorrow;@Value("${druid.testOnReturn}")private boolean testOnReturn;@Value("${druid.poolPreparedStatements}")private boolean poolPreparedStatements;@Value("${druid.maxPoolPreparedStatementPerConnectionSize}")private int maxPoolPreparedStatementPerConnectionSize;@Value("${druid.filters}")private String filters;@Value("${druid.connectionProperties}")private String connectionProperties;@Beanpublic static PropertySourcesPlaceholderConfigurer properdtyConfigure(){return new PropertySourcesPlaceholderConfigurer();}public String getDriverClassName() {return driverClassName;}public void setDriverClassName(String driverClassName) {this.driverClassName = driverClassName;}public String getUrl() {return url;}public void setUrl(String url) {this.url = url;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public int getInitialSize() {return initialSize;}public void setInitialSize(int initialSize) {this.initialSize = initialSize;}public int getMinIdle() {return minIdle;}public void setMinIdle(int minIdle) {this.minIdle = minIdle;}public int getMaxActive() {return maxActive;}public void setMaxActive(int maxActive) {this.maxActive = maxActive;}public long getTimeBetweenEvictionRunsMillis() {return timeBetweenEvictionRunsMillis;}public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;}public long getMinEvictableIdleTimeMillis() {return minEvictableIdleTimeMillis;}public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;}public String getValidationQuery() {return validationQuery;}public void setValidationQuery(String validationQuery) {this.validationQuery = validationQuery;}public boolean isTestWhileIdle() {return testWhileIdle;}public void setTestWhileIdle(boolean testWhileIdle) {this.testWhileIdle = testWhileIdle;}public boolean isTestOnBorrow() {return testOnBorrow;}public void setTestOnBorrow(boolean testOnBorrow) {this.testOnBorrow = testOnBorrow;}public boolean isTestOnReturn() {return testOnReturn;}public void setTestOnReturn(boolean testOnReturn) {this.testOnReturn = testOnReturn;}public boolean isPoolPreparedStatements() {return poolPreparedStatements;}public void setPoolPreparedStatements(boolean poolPreparedStatements) {this.poolPreparedStatements = poolPreparedStatements;}public int getMaxPoolPreparedStatementPerConnectionSize() {return maxPoolPreparedStatementPerConnectionSize;}public void setMaxPoolPreparedStatementPerConnectionSize(int maxPoolPreparedStatementPerConnectionSize) {this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;}public String getFilters() {return filters;}public void setFilters(String filters) {this.filters = filters;}public String getConnectionProperties() {return connectionProperties;}public void setConnectionProperties(String connectionProperties) {this.connectionProperties = connectionProperties;}
}

package com.imooc.mq.config.database;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;import javax.sql.DataSource;
import java.sql.SQLException;import com.alibaba.druid.pool.DruidDataSource;
/*** @Title: DruidDataSourceConfig* @Description: Druid数据源初始化** EnableTransactionManagement 开启事务* @date 2019/1/2214:35*/@Configuration
@EnableTransactionManagement
public class DruidDataSourceConfig {private static Logger logger = LoggerFactory.getLogger(com.imooc.mq.config.database.DruidDataSourceConfig.class);//注入数据源配置信息
    @Autowiredprivate DruidDataSourceSettings druidSettings;public static String DRIVER_CLASSNAME;@Beanpublic static PropertySourcesPlaceholderConfigurer propertyConfigure() {return new PropertySourcesPlaceholderConfigurer();}/*** 创建DataSource* @return* @throws SQLException*/@Beanpublic DataSource dataSource() throws SQLException {DruidDataSource ds = new DruidDataSource();ds.setDriverClassName(druidSettings.getDriverClassName());DRIVER_CLASSNAME = druidSettings.getDriverClassName();ds.setUrl(druidSettings.getUrl());ds.setUsername(druidSettings.getUsername());ds.setPassword(druidSettings.getPassword());ds.setInitialSize(druidSettings.getInitialSize());ds.setMinIdle(druidSettings.getMinIdle());ds.setMaxActive(druidSettings.getMaxActive());ds.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());ds.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());ds.setValidationQuery(druidSettings.getValidationQuery());ds.setTestWhileIdle(druidSettings.isTestWhileIdle());ds.setTestOnBorrow(druidSettings.isTestOnBorrow());ds.setTestOnReturn(druidSettings.isTestOnReturn());ds.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());ds.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxPoolPreparedStatementPerConnectionSize());ds.setFilters(druidSettings.getFilters());ds.setConnectionProperties(druidSettings.getConnectionProperties());logger.info(" druid datasource config : {} ", ds);return ds;}/*** 开启事务* @return* @throws Exception*/@Beanpublic PlatformTransactionManager transactionManager() throws Exception {DataSourceTransactionManager txManager = new DataSourceTransactionManager();txManager.setDataSource(dataSource());return txManager;}
}

package com.imooc.mq.config.database;import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;import javax.sql.DataSource;/*** @Title: MybatisDataSourceConfig* @Description: 整合mybatis和Druid* @date 2019/1/2214:39*/
@Configuration
public class MybatisDataSourceConfig {@Autowiredprivate DataSource dataSource;@Bean(name="sqlSessionFactory")public SqlSessionFactory sqlSessionFactoryBean() {SqlSessionFactoryBean bean = new SqlSessionFactoryBean();bean.setDataSource(dataSource);// 添加XML目录ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();try {bean.setMapperLocations(resolver.getResources("classpath:mapping/*.xml"));SqlSessionFactory sqlSessionFactory = bean.getObject();sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);return sqlSessionFactory;} catch (Exception e) {throw new RuntimeException(e);}}@Beanpublic SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {return new SqlSessionTemplate(sqlSessionFactory);}
}

package com.imooc.mq.config.database;import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*** @Title: MybatisMapperScanerConfig* @Description: 扫码Mybatis* @AutoConfigureAfter(MybatisDataSourceConfig.class) 先加载数据源类,再加载该类* @date 2019/1/2214:43*/
@Configuration
@AutoConfigureAfter(MybatisDataSourceConfig.class)
public class MybatisMapperScanerConfig {@Beanpublic MapperScannerConfigurer mapperScannerConfigurer() {MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();mapperScannerConfigurer.setSqlSessionFactoryBeanName("sqlSessionFactory");mapperScannerConfigurer.setBasePackage("com.imooc.mq.mapper");return mapperScannerConfigurer;}
}

package com.imooc.mq.config.task;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/*** @Title: TaskSchedulerConfig* @Description: 定时任务配置* @date 2019/1/2214:46*/
@Configuration
@EnableScheduling //启动定时任务
public class TaskSchedulerConfig  implements SchedulingConfigurer {@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {taskRegistrar.setScheduler(taskScheduler());}/*** 定时任务线程池* @return*/@Bean(destroyMethod = "shutdown")public Executor taskScheduler(){return Executors.newScheduledThreadPool(100);}
}

package com.imooc.mq.constant;/*** @Title: Constans* @Description: 常量* @date 2019/1/2214:50*/
public class Constans {/*** 发送中*/public static final String ORDER_SENDING = "0";/*** 发送成功*/public static final String ORDER_SEND_SUCCESS = "1";/*** 发送失败*/public static final String ORDER_SEND_FAILURE = "2";/*** 分钟超时单位:min*/public static final int ORDER_TIMEOUT = 1;
}


相应的mapper接口和mapper.xml文件配置

package com.imooc.mq.mapper;import com.imooc.mq.entity.BrokerMessageLog;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;import java.util.Date;
import java.util.List;/*** @Title: BrokerMessageLogMapper* @Description: 消息记录接口* @date 2019/1/2214:45*/
@Repository
public interface BrokerMessageLogMapper {/*** 查询消息状态为0(发送中) 且已经超时的消息集合* @return*/List<BrokerMessageLog> query4StatusAndTimeoutMessage();/*** 重新发送统计count发送次数 +1* @param messageId* @param updateTime*/void update4ReSend(@Param("messageId")String messageId, @Param("updateTime") Date updateTime);/*** 更新最终消息发送结果 成功 or 失败* @param messageId* @param status* @param updateTime*/void changeBrokerMessageLogStatus(@Param("messageId")String messageId, @Param("status")String status, @Param("updateTime")Date updateTime);int insertSelective(BrokerMessageLog record);
}
------------------------------------------------------------------
package com.imooc.mq.mapper;import com.imooc.mq.entity.Order;
import org.springframework.stereotype.Repository;/*** @Title: OrderMapper* @Description: 订单接口* @date 2019/1/2214:45*/
@Repository
public interface OrderMapper {int insert(Order record);int deleteByPrimaryKey(Integer id);int insertSelective(Order record);Order selectByPrimaryKey(Integer id);int updateByPrimaryKeySelective(Order record);int updateByPrimaryKey(Order record);
}

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.imooc.mq.mapper.BrokerMessageLogMapper" ><resultMap id="BaseResultMap" type="com.imooc.mq.entity.BrokerMessageLog" ><id column="message_id" property="messageId" jdbcType="VARCHAR" /><result column="message" property="message" jdbcType="VARCHAR" /><result column="try_count" property="tryCount" jdbcType="INTEGER" /><result column="status" property="status" jdbcType="VARCHAR" /><result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" /><result column="create_time" property="createTime" jdbcType="TIMESTAMP" /><result column="update_time" property="updateTime" jdbcType="TIMESTAMP" /></resultMap><sql id="Base_Column_List" >message_id, message, try_count, status, next_retry, create_time, update_time</sql><select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >select<include refid="Base_Column_List" />from broker_message_logwhere message_id = #{messageId,jdbcType=VARCHAR}</select><delete id="deleteByPrimaryKey" parameterType="java.lang.String" >delete from broker_message_logwhere message_id = #{messageId,jdbcType=VARCHAR}</delete><insert id="insert" parameterType="com.imooc.mq.entity.BrokerMessageLog" >insert into broker_message_log (message_id, message, try_count,status, next_retry, create_time,update_time)values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR}, #{tryCount,jdbcType=INTEGER},#{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},#{updateTime,jdbcType=TIMESTAMP})</insert><insert id="insertSelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >insert into broker_message_log<trim prefix="(" suffix=")" suffixOverrides="," ><if test="messageId != null" >message_id,</if><if test="message != null" >message,</if><if test="tryCount != null" >try_count,</if><if test="status != null" >status,</if><if test="nextRetry != null" >next_retry,</if><if test="createTime != null" >create_time,</if><if test="updateTime != null" >update_time,</if></trim><trim prefix="values (" suffix=")" suffixOverrides="," ><if test="messageId != null" >#{messageId,jdbcType=VARCHAR},</if><if test="message != null" >#{message,jdbcType=VARCHAR},</if><if test="tryCount != null" >#{tryCount,jdbcType=INTEGER},</if><if test="status != null" >#{status,jdbcType=VARCHAR},</if><if test="nextRetry != null" >#{nextRetry,jdbcType=TIMESTAMP},</if><if test="createTime != null" >#{createTime,jdbcType=TIMESTAMP},</if><if test="updateTime != null" >#{updateTime,jdbcType=TIMESTAMP},</if></trim></insert><update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >update broker_message_log<set ><if test="message != null" >message = #{message,jdbcType=VARCHAR},</if><if test="tryCount != null" >try_count = #{tryCount,jdbcType=INTEGER},</if><if test="status != null" >status = #{status,jdbcType=VARCHAR},</if><if test="nextRetry != null" >next_retry = #{nextRetry,jdbcType=TIMESTAMP},</if><if test="createTime != null" >create_time = #{createTime,jdbcType=TIMESTAMP},</if><if test="updateTime != null" >update_time = #{updateTime,jdbcType=TIMESTAMP},</if></set>where message_id = #{messageId,jdbcType=VARCHAR}</update><update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.BrokerMessageLog" >update broker_message_logset message = #{message,jdbcType=VARCHAR},try_count = #{tryCount,jdbcType=INTEGER},status = #{status,jdbcType=VARCHAR},next_retry = #{nextRetry,jdbcType=TIMESTAMP},create_time = #{createTime,jdbcType=TIMESTAMP},update_time = #{updateTime,jdbcType=TIMESTAMP}where message_id = #{messageId,jdbcType=VARCHAR}</update><select id="query4StatusAndTimeoutMessage" resultMap="BaseResultMap"><![CDATA[select message_id, message, try_count, status, next_retry, create_time, update_timefrom broker_message_log bmlwhere status = '0'and next_retry <= sysdate()]]></select><update id="update4ReSend" >update broker_message_log bmlset bml.try_count = bml.try_count + 1,bml.update_time = #{updateTime, jdbcType=TIMESTAMP}where bml.message_id = #{messageId,jdbcType=VARCHAR}</update><update id="changeBrokerMessageLogStatus" >update broker_message_log bmlset bml.status = #{status,jdbcType=VARCHAR},bml.update_time = #{updateTime, jdbcType=TIMESTAMP}where bml.message_id = #{messageId,jdbcType=VARCHAR}</update></mapper>
-------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.imooc.mq.mapper.OrderMapper" ><resultMap id="BaseResultMap" type="com.imooc.mq.entity.Order" ><id column="id" property="id" jdbcType="INTEGER" /><result column="name" property="name" jdbcType="VARCHAR" /><result column="message_id" property="messageId" jdbcType="VARCHAR" /></resultMap><sql id="Example_Where_Clause" ><where ><foreach collection="oredCriteria" item="criteria" separator="or" ><if test="criteria.valid" ><trim prefix="(" suffix=")" prefixOverrides="and" ><foreach collection="criteria.criteria" item="criterion" ><choose ><when test="criterion.noValue" >and ${criterion.condition}</when><when test="criterion.singleValue" >and ${criterion.condition} #{criterion.value}</when><when test="criterion.betweenValue" >and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}</when><when test="criterion.listValue" >and ${criterion.condition}<foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >#{listItem}</foreach></when></choose></foreach></trim></if></foreach></where></sql><sql id="Update_By_Example_Where_Clause" ><where ><foreach collection="example.oredCriteria" item="criteria" separator="or" ><if test="criteria.valid" ><trim prefix="(" suffix=")" prefixOverrides="and" ><foreach collection="criteria.criteria" item="criterion" ><choose ><when test="criterion.noValue" >and ${criterion.condition}</when><when test="criterion.singleValue" >and ${criterion.condition} #{criterion.value}</when><when test="criterion.betweenValue" >and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}</when><when test="criterion.listValue" >and ${criterion.condition}<foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >#{listItem}</foreach></when></choose></foreach></trim></if></foreach></where></sql><sql id="Base_Column_List" >id, name, message_id</sql><select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >select<include refid="Base_Column_List" />from t_orderwhere id = #{id,jdbcType=INTEGER}</select><delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >delete from t_orderwhere id = #{id,jdbcType=INTEGER}</delete><insert id="insert" parameterType="com.imooc.mq.entity.Order" >insert into t_order (id, name, message_id)values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{messageId,jdbcType=VARCHAR})</insert><insert id="insertSelective" parameterType="com.imooc.mq.entity.Order" >insert into t_order<trim prefix="(" suffix=")" suffixOverrides="," ><if test="id != null" >id,</if><if test="name != null" >name,</if><if test="messageId != null" >message_id,</if></trim><trim prefix="values (" suffix=")" suffixOverrides="," ><if test="id != null" >#{id,jdbcType=INTEGER},</if><if test="name != null" >#{name,jdbcType=VARCHAR},</if><if test="messageId != null" >#{messageId,jdbcType=VARCHAR},</if></trim></insert><update id="updateByExampleSelective" parameterType="map" >update t_order<set ><if test="record.id != null" >id = #{record.id,jdbcType=INTEGER},</if><if test="record.name != null" >name = #{record.name,jdbcType=VARCHAR},</if><if test="record.messageId != null" >message_id = #{record.messageId,jdbcType=VARCHAR},</if></set><if test="_parameter != null" ><include refid="Update_By_Example_Where_Clause" /></if></update><update id="updateByExample" parameterType="map" >update t_orderset id = #{record.id,jdbcType=INTEGER},name = #{record.name,jdbcType=VARCHAR},message_id = #{record.messageId,jdbcType=VARCHAR}<if test="_parameter != null" ><include refid="Update_By_Example_Where_Clause" /></if></update><update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.Order" >update t_order<set ><if test="name != null" >name = #{name,jdbcType=VARCHAR},</if><if test="messageId != null" >message_id = #{messageId,jdbcType=VARCHAR},</if></set>where id = #{id,jdbcType=INTEGER}</update><update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.Order" >update t_orderset name = #{name,jdbcType=VARCHAR},message_id = #{messageId,jdbcType=VARCHAR}where id = #{id,jdbcType=INTEGER}</update>
</mapper>

package com.imooc.mq.entity;import java.util.Date;/*** @Title: BrokerMessageLog* @Description: 消息记录* @date 2019/1/2214:29*/
public class BrokerMessageLog {private String messageId;private String message;private Integer tryCount;private String status;private Date nextRetry;private Date createTime;private Date updateTime;public BrokerMessageLog() {}public BrokerMessageLog(String messageId, String message, Integer tryCount, String status, Date nextRetry, Date createTime, Date updateTime) {this.messageId = messageId;this.message = message;this.tryCount = tryCount;this.status = status;this.nextRetry = nextRetry;this.createTime = createTime;this.updateTime = updateTime;}public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public Integer getTryCount() {return tryCount;}public void setTryCount(Integer tryCount) {this.tryCount = tryCount;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public Date getNextRetry() {return nextRetry;}public void setNextRetry(Date nextRetry) {this.nextRetry = nextRetry;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public Date getUpdateTime() {return updateTime;}public void setUpdateTime(Date updateTime) {this.updateTime = updateTime;}
}
--------------------------------------------------------------
package com.imooc.mq.entity;import java.io.Serializable;/*** @Title: Order* @Description: 订单* @date 2019/1/2210:18*/
public class Order implements Serializable {private String id;private String name;//存储消息发送的唯一标识private String messageId;public Order() {}public Order(String id, String name, String messageId) {this.id = id;this.name = name;this.messageId = messageId;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMessageId() {return messageId;}public void setMessageId(String messageId) {this.messageId = messageId;}}


现在开始按照设计思路写实现代码:

  1、首先我们把最核心了生产者写好,生产者组成有基本的消息投递,和监听

package com.imooc.mq.producer;import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;/*** @Title: RabbitOrderSender* @Description: 消息发送* @date 2019/1/2214:52*/
@Component
public class RabbitOrderSender {private static Logger logger = LoggerFactory.getLogger(RabbitOrderSender.class);@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;/*** Broker应答后,会调用该方法区获取应答结果*/final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info("correlationData:"+correlationData);String messageId = correlationData.getId();if (ack){//如果返回成功,则进行更新brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constans.ORDER_SEND_SUCCESS,new Date());}else {//失败进行操作:根据具体失败原因选择重试或补偿等手段logger.error("异常处理"+cause);}}};/*** 发送消息方法调用: 构建自定义对象消息* @param order* @throws Exception*/public void sendOrder(Order order) throws Exception {// 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
        rabbitTemplate.setConfirmCallback(confirmCallback);//消息唯一IDCorrelationData correlationData = new CorrelationData(order.getMessageId());rabbitTemplate.convertAndSend("order-exchange1", "order.ABC", order, correlationData);}
}

  2、将定时任务逻辑写好

package com.imooc.mq.task;import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.BrokerMessageLog;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import com.imooc.mq.producer.RabbitOrderSender;
import com.imooc.mq.utils.FastJsonConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.List;/*** @Title: RetryMessageTasker* @Description: 定时任务* @date 2019/1/2215:45*/
@Component
public class RetryMessageTasker {private static Logger logger = LoggerFactory.getLogger(RetryMessageTasker.class);@Autowiredprivate RabbitOrderSender rabbitOrderSender;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;/*** 定时任务*/@Scheduled(initialDelay = 5000, fixedDelay = 10000)public void reSend(){logger.info("-----------定时任务开始-----------");//抽取消息状态为0且已经超时的消息集合List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage();list.forEach(messageLog -> {//投递三次以上的消息if(messageLog.getTryCount() >= 3){//更新失败的消息brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), Constans.ORDER_SEND_FAILURE, new Date());} else {// 重试投递消息,将重试次数递增brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(),  new Date());Order reSendOrder = FastJsonConvertUtil.convertJSONToObject(messageLog.getMessage(), Order.class);try {rabbitOrderSender.sendOrder(reSendOrder);} catch (Exception e) {e.printStackTrace();logger.error("-----------异常处理-----------");}}});}}

  3、写好消费者的逻辑,直接用上一篇中的消费者代码,修改对应的exchange、queue、路由key就好

package com.imooc.mq.consumer;import com.imooc.mq.entity.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.Map;/*** @Title: OrderReceiver* @Description: 消费* @date 2019/1/2211:03*/
@Component
public class OrderReceiver {/*** @RabbitListener 消息监听,可配置交换机、队列、路由key* 该注解会创建队列和交互机 并建立绑定关系* @RabbitHandler 标识此方法如果有消息过来,消费者要调用这个方法* @Payload 消息体* @Headers 消息头* @param order*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue1",declare = "true"),exchange = @Exchange(name = "order-exchange1",declare = "true",type = "topic"),key = "order.ABC"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers,Channel channel) throws Exception{//消费者操作System.out.println("------收到消息,开始消费------");System.out.println("订单ID:"+order.getId());Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);//现在是手动确认消息 ACK channel.basicAck(deliveryTag,false);}
}

  4、业务逻辑

package com.imooc.mq.service;import com.imooc.mq.constant.Constans;
import com.imooc.mq.entity.BrokerMessageLog;
import com.imooc.mq.entity.Order;
import com.imooc.mq.mapper.BrokerMessageLogMapper;
import com.imooc.mq.mapper.OrderMapper;
import com.imooc.mq.producer.RabbitOrderSender;
import com.imooc.mq.utils.DateUtils;
import com.imooc.mq.utils.FastJsonConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;/*** @Title: OrderService* @Description: 业务实现* @date 2019/1/2215:41*/
@Service
public class OrderService {private static Logger logger = LoggerFactory.getLogger(OrderService.class);@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate BrokerMessageLogMapper brokerMessageLogMapper;@Autowiredprivate RabbitOrderSender rabbitOrderSender;public void createOrder(Order order)  {try {// 使用当前时间当做订单创建时间(为了模拟一下简化)Date orderTime = new Date();// 插入业务数据
            orderMapper.insert(order);// 插入消息记录表数据BrokerMessageLog brokerMessageLog = new BrokerMessageLog();// 消息唯一ID
            brokerMessageLog.setMessageId(order.getMessageId());// 保存消息整体 转为JSON 格式存储入库
            brokerMessageLog.setMessage(FastJsonConvertUtil.convertObjectToJSON(order));// 设置消息状态为0 表示发送中brokerMessageLog.setStatus("0");// 设置消息未确认超时时间窗口为 一分钟
            brokerMessageLog.setNextRetry(DateUtils.addMinutes(orderTime, Constans.ORDER_TIMEOUT));brokerMessageLog.setCreateTime(new Date());brokerMessageLog.setUpdateTime(new Date());brokerMessageLogMapper.insertSelective(brokerMessageLog);// 发送消息
            rabbitOrderSender.sendOrder(order);} catch (Exception e) {logger.error("订单业务异常{}",e);}}
}

  5、测试

 /*** 测试订单创建*/@Testpublic void createOrder(){Order order = new Order();order.setId("201901228");order.setName("测试订单");order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());try {orderService.createOrder(order);} catch (Exception e) {e.printStackTrace();}}

  先启动消费者服务、再启动生产者服务让定时任务跑起来,最后启动测试方法。消息被消费成功后,日志记录状态被修改为1。测试消息重投的话需要制造一些异常情况,比如修改消费者端跟exchange,生产者找不到该交互机,拿不到回调,就会重试投递。

转载于:https://www.cnblogs.com/zhangbLearn/p/10304976.html

SpringBoot整合RabbitMQ-消息可靠性投递相关推荐

  1. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  2. RabbitMQ消息可靠性投递及分布式事务最终一致性实现

    RabbitMQ消息可靠性投递就是保证消息生产者能够将消息百分百投递到RabbitMQ服务器,并在传递过程中不丢失.然而在生产环境中由于网络中断.网络不稳定等原因导致消息在投递过程中丢失,这或许会造成 ...

  3. SpringBoot整合RabbitMQ消息队列

    RabbitMQ 一.RabbitMQ介绍 1.1 现存问题 服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用Sprin ...

  4. 【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战

    目录 一.绪论 二.生产者 2.1事务机制 2.2confirm模式 串行模式 批量模式 异步模式 三.消费者 3.1手动ACK 一.绪论 上篇文章介绍了rabbitmq的基本知识.交换机类型实战&l ...

  5. 腻害,高人都是这样玩SpringBoot整合RabbitMQ

    一.认识 RabbitMQ RabbitMQ 简介以 AMQP 协议: (1)RabbitMQ 是开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ 底层是用了 ...

  6. SpringBoot使用RabbitMQ消息队列

    RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的 ...

  7. springboot 与rabbitmq集成+生产者投递确认+消费者手动确认+TTL+死信队列+延时队列

    1.生产者的消息可靠性投递机制 1)springboot yml文件配置 spring:rabbitmq:host: 10.0.23.83username: lifwepassword: 123456 ...

  8. Springboot——整合Rabbitmq之Confirm和Return详解

    文章目录 前言 为什么会有Confirm Springboot 整合 Mq 实现 Confirm 监听机制 依赖引入 增加配置文件,设定连接信息 配置队列.交换机,以及对其进行绑定 编写mq消息发送服 ...

  9. SpringBoot整合RabbitMQ(包含生产者和消费者)

    生产者 创建一个SpringBoot项目springboot-producer,作为RabbitMQ的生产者. 在pom文件中引入相关的依赖坐标 <dependency><group ...

最新文章

  1. 1000亿个整数,请找出其中最大的100个
  2. java数据结构二叉树遍历_java数据结构 之 二叉树的遍历(1)
  3. 程序员眼中的UML(2)--克服用例图的恐惧
  4. 请问一个表的update能同时触发两个触发器吗?
  5. 人声处理_10款免费的人声处理工具
  6. 这就是数据分析之数据集成
  7. 遍历删除List中的元素,会报错? 用iterator.remove() 完美解决
  8. java队列类_用Java编写一个队列类
  9. 用TreeWalk提高网速及其在vista中的安装方法
  10. java list取补集_Java 2 个 List 集合数据求并、补集操作
  11. 基于SSM的知识库管理系统
  12. 通过公网访问二级路由器
  13. 苹果x用安兔兔测试html5,安兔兔跑分23万,苹果iPhone X怎么样?
  14. 【愚公系列】2021年11月 攻防世界-进阶题-MISC-025(Miscellaneous-200)
  15. 1.27 Watermelon
  16. Vista Beta下载
  17. 如何删除Chrome地址栏记录?
  18. java 读取文件 报错 java.io.FileNotFoundException
  19. 壹职行帮助学生做好职业规划
  20. 1579 字符串类(II)

热门文章

  1. Cookie 学习案例之三天免登录
  2. Linux 系统安全 - 近期发现的 polkit pkexec 本地提权漏洞(CVE-2021-4034)修复方案
  3. Python 技术篇-1行代码实现语音识别,speech库快速实现简单的语音对话
  4. vue项目设置img标签的默认图片
  5. LinkedHashMap 实现缓存(LRU、FIFO、weakhashMap)
  6. 嵌入式 Jlink中flash.csv和*.jflash文件分析
  7. CTFshow 文件上传 web153
  8. sdut 2136 数据结构实验之二叉树的建立与遍历
  9. ACE Lock类介绍
  10. Dogleg“狗腿”最优化算法