关于持久化到Redis的消息格式,主要是说在Broker上把消息持久化的过程中,需要存储哪些类型的消息,因为我们的消息是分topic的,而每个topic又有若干个queue组成,而我们的topic和queue由于redis存储结构的原因,我们需要将它们分区对应存储一下,而不能像关系型数据库那样灵活,所以要额外设计几个数据结构来存储它们。

一 Topic字典

二 Topic对应的Queue字典

三 Queue里的消息

四 某个客户端对应某个Queue的消费进度

以上四个结构是我们要说的,它们会在推消息,拉消息,删消息时用到,下面一一介绍一下,讲的不好不对的地方,欢迎大家为大叔留言。

一 Topic字典

主要存储每个topic,它是一个set集合,redis的我集合类型之一,每个key是唯一的LindMq_Topic,值value就是我们客户端传来的具体topic的名字,这主要是在删除过期的消息时用的,主是作用是遍历所有的topic消息类型,这样我们在删除消息时,就可以把所有注册的topic都找到了,最后把过期的删除,默认消息存活周期是一天。

删除过期的消息代码如下

 var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY);foreach (var topic in topicList){var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic);foreach (var queue in queueList){var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd");RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey);}}

二 Topic对应的Queue字典

我们知道,为了加大redis的并发量和吞吐量,我们会把大数据键值对设计成多个键,这就像是一个集群环境的sharing,就是将大数据进行分片,而我们的分片规则是采用按对象取模的方式,模数可以自己设置,比较我设置8,那说明我的队列(分片)最多可以被分为8个,这个大家可以去做测试,挺有意思的,比随机数来个直接!而这一次redis里的键就是某个topic,而值就是我们的topic加上队列索引,例如你的topic是zzl,那么队列里的键可能就是zzl0,zzl1,zzl2...

三 Queue里的消息

我们的生产者将消息发送到broker里,然后于broker将消息持久化到具体的存储介质里,当然这里我们用的是Redis,在存储在redis里时,我们的具体队列的键是有后缀的,这主要用于消息的回收,因为我们打算1天回收一次消息,所以我们的消息后缀是个日期变量,当然精确到天就可以了,它可以是这样键名LindMQ_order_Paid4_20161202,每个队列都有自己的后缀,我们在清除消息时也就有了方法了。我们的队列存储结构是比较特殊的sortedSet ,就是可排序的集合,它有权重的概念,我们刚好可以使用这个特性来记录客户端的消费进度,因为我们的权重值在一个redis键/值对里是唯一的。

下面代码选自Push入队列的代码片断,分享给大家

       //存储当前Topic
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic);//要存储到哪个队列body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT);var dataKey = body.Topic + body.QueueId;RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey);//记录偏移var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey));body.QueueOffset = offset + 1;//存储消息
            RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd(GetRedisDataKey(dataKey),Utils.SerializeMemoryHelper.SerializeToJson(body),score: body.QueueOffset);

四 某个客户端对应某个Queue的消费进度

消费进度是一个很麻烦的问题,生产者的消息是可以被多个消费者消费的,所以不能使用.net那种简单的Queue机制,出队列后就消失了,这是不靠谱的,万一消失失败了,也会造成消息的丢失!下面我们主要看一下消费进度的存储,它是一个Hash集合,其中redis的键名是LindMQ_ConsumerOffset,而value是一个hash对象,hash里的key是当前队列名+消费者IP地址的hashcode值,hash里的value是这个消费者(客户端)的消费进度(Queue里的权重,Queue的存储结构是一个sortedSet)。

客户端消费的测试代码

            #region Client-LindMQvar consumer = new ConsumerSetting{BrokenName = "test",BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406),Callback = new Dictionary<string, Action<MessageBody>>() { {"zzl",(o)=>{Console.WriteLine(o.ToString());Thread.Sleep(1000);}},{"zhz",(o)=>{Console.WriteLine(o.ToString());Thread.Sleep(2000);}}}};var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer });consumerClient.Start();#endregion

客户端消费的测试结果

好了,到这里我们的LindMQ里数据存储结构的内容就讲完了,主要使用了redis里的set,sortedSet,hash等数据结构,在设计过程中,使用了分片(Sharing)的概念,当然也是借鉴了mongodb和redis集群的设计理念,同时借鉴了方雪华老兄的EQueue设计理念,在这里和他们说一声:谢谢!

感谢各位对Lind的支持!

本文转自博客园张占岭(仓储大叔)的博客,原文链接:Lind.DDD.LindMQ~关于持久化到Redis的消息格式,如需转载请自行联系原博主。

Lind.DDD.LindMQ~关于持久化到Redis的消息格式相关推荐

  1. Lind.DDD.LindMQ的一些想法

    很久就想写一套属于自己的消息队列组件,前段时候看了汤雪华同学的EQueue,感觉还是不错的,他也是看了rabbitMQ之后写的Equeue,在设计上与前者有类似的地方,而大叔这次准备写一个LindMQ ...

  2. Lind.DDD敏捷领域驱动框架~介绍

    最近觉得自己的框架过于复杂,在实现开发使用中有些不爽,自己的朋友们也经常和我说,框架太麻烦了,要引用的类库太多:之前架构之所以这样设计,完全出于对职责分离和代码附复用的考虑,主要参考了微软的DDD大作 ...

  3. Redis学习笔记~Redis事务机制与Lind.DDD.Repositories.Redis事务机制的实现

    回到目录 Redis本身支持事务,这就是SQL数据库有Transaction一样,而Redis的驱动也支持事务,这在ServiceStack.Redis就有所体现,它也是目前最受业界认可的Redis驱 ...

  4. Lind.DDD敏捷领域驱动框架~Lind.DDD各层介绍

    回到目录 Lind.DDD项目主要面向敏捷,快速开发,领域驱动等,对于它的分层也是能合并的合并,比之前大叔的框架分层更粗糙一些,或者说更大胆一些,在开发人员使用上,可能会感觉更方便了,更益使用了,这就 ...

  5. Lind.DDD.RedisClient~对StackExchange.Redis调用者的封装及多路复用技术

    两雄争霸 使用StackExchange.Redis的原因是因为它开源,免费,而对于商业化的ServiceStack.Redis,它将一步步被前者取代,开源将是一种趋势,商业化也值得被我们尊重,毕竟人 ...

  6. Lind.DDD.API核心技术分享

    关于Lind.DDD框架里API框架的技术点说明 讲解:张占岭 花名:仓储大叔 主要框架:Lind.DDD 目录 关于Lind.DDD.Authorization 关于授权的原理 关于ApiValid ...

  7. Lind.DDD.LindAspects方法拦截的介绍

    回到目录 什么是LindAspects 之前写了关于Aspects的文章<Lind.DDD.Aspects通过Plugins实现方法的动态拦截~Lind里的AOP>,今天主要在设计思想上进 ...

  8. Lind.DDD.Caching分布式数据集缓存介绍

    戏说当年 大叔原创的分布式数据集缓存在之前的企业级框架里介绍过,大家可以关注<我心中的核心组件(可插拔的AOP)~第二回 缓存拦截器>,而今天主要对Lind.DDD.Caching进行更全 ...

  9. Lind.DDD.Domain领域模型介绍

    Lind.DDD.Domain位于Lind.DDD核心项目中,它主要面向领域实体而设计,由一个IEntity的标识接口,EntityBase基类和N个Entity实体类组成,其中IEntity主要用来 ...

最新文章

  1. 【Java】环形链表 ( 给定一个链表,判断链表中是否有环)
  2. oracle从备份归档日志的方法集中回收
  3. Python 微信机器人:调用电脑摄像头时时监控功能实现演示,调用电脑摄像头进行拍照并保存
  4. PHP7.0 Window10 Redis安装教程
  5. 面试官:String的最大长度是多少?
  6. pytorch 正向与反向传播的过程 获取模型的梯度(gradient),并绘制梯度的直方图
  7. ubuntu 安装 LAMP
  8. asp.net如何解决传递中文参数乱码问题
  9. bootstrap treetable 树形网格,动态扩展,连数据库
  10. 拼音字母搜索匹配汉字
  11. python 移动文件 覆盖_python 剪切移动文件的实现代码
  12. 基于经纬度做航线图可视化
  13. Speex编解码手册
  14. 5G协议栈用户面模块
  15. python免费全套教程400集视频-如何入门 Python 爬虫?400集免费教程视频带你从0-1全面掌握...
  16. 2020年度十大高薪岗位出炉!程序员霸榜!
  17. linux虚拟机usb网卡驱动,【Vbox】centos虚拟机安装usb网卡驱动
  18. iTween.MoveTo用法
  19. Python 机器学习工具库
  20. 一度智信电商:店铺转化率太低?

热门文章

  1. js实现复选框的全选、取消全选、反选
  2. php 进行电子商务网站开发
  3. 【金三银四】软件测试简历项目经验怎么写,没有项目经验?
  4. 自定义MySQL安装
  5. 医学文献王与word连用
  6. 关于z-index的详细解释
  7. 游戏一直服务器维护,游戏服务器显示维护中
  8. 连续内存分配与非连续内存分配
  9. layui弹出层(确定、取消)
  10. camunda 应用