性能测试

在48core,128G内存的物理服务器上测试协议解析效率:35K条/s, cpu使用率25%.

Build

执行mvn package . jdk1.6以上.

增加了业务处理API

业务层实现接口:BusinessHandlerInterface,或者继承AbstractBusinessHandler抽象类实现业务即可。 连接保活,消息重发,消息持久化,连接鉴权都已封装,不须要业务层再实现。

如何实现自己的Handler,比如按短短信计费

参考 CMPPChargingDemoTest 里的扩展位置

实体类说明

CMPP的连接端口

com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity 表示一个Tcp连接的发起端,或者接收端。用来记录连接的IP.port,以及CMPP协议的用户名,密码,业务处理的ChannelHandler集合等其它端口参数。包含三个子类:

com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服务监听端口,包含一个List属性。 一个服务端口包含多个CMPPServerChildEndpointEntity端口

com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服务接收端口,包含CMPP连接用户名,密码,以及协议版本等信息

com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客户端端口,包含CMPP连接用户名,密码,以及协议版本,以及服务端IP.port. 用于连接服务端

端口连接器接口

com.zx.sms.connect.manager.EndpointConnector 负责一个端口的打开,关闭,查看当前连接数,新增连接,移除连接。每个端口的实体类都对应一个EndpointConnector.当CMPP连接建立完成,将连接加入连接器管理,并给pipeLine上挂载业务处理的ChannelHandler.

com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 这个类的open()调用netty的ServerBootstrap.bind()开一个服务监听

com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用来收集CMPPServerChildEndpointEntity端口下的所有连接。它的open()方法为空.

com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 这个类open()调用netty的Bootstrap.connect()开始一个TCP连接

端口管理器

com.zx.sms.connect.manager.EndpointManager 该类是单例模式,管理所有端口,并负责所有端口的打开,关闭,以及端口信息保存,以及连接断线重连。

CMPP协议的连接登陆管理

com.zx.sms.session.cmpp.SessionLoginManager 这是一个netty的ChannelHandler实现,主要负责CMPP连接的建立。当CMPP连接建立完成后,会调用EndpointConnector.addChannel(channel)方法,把连接加入连接器管理,连接器负责给channel的pipeline上挂载业务处理的Handler,最后触发 SessionState.Connect事件,通知业务处理Handler连接已建立成功。

CMPP的连接状态管理器

com.zx.sms.session.cmpp.SessionStateManager 这是一个netty的ChannelHandler实现。负责每个连接上CMPP消息的存储,短信重发,流量窗口控制,过期短信的处理

CMPP协议解析器

CMPP20MessageCodecAggregator [2.0协议] CMPPMessageCodecAggregator [这是3.0协议] 聚合了CMPP主要消息协议的解析,编码,长短信拆分,合并处理。

短信持久化存储实现 StoredMapFactory

使用BDB的StoreMap实现消息持久化,防止系统意外丢失短信。

程序启动处理流程

程序启动类 new 一个CMPPEndpointEntity的实体类并设置IP,port,用户名,密码,业务处理的Handler等参数,

程序启动类 调用EndpointManager.addEndpointEntity(endpoint)方法,将端口加入管理器

程序启动类 调用EndpointManager.openAll()或者EndpointManager.openEndpoint()方法打开端口。

EndpointManager会调用EndpointEntity.buildConnector()创建一个端口连接器,并调用EndpointConnector.open()方法打开端口。

如果是CMPPClientEndpointEntity的话,就会向服务器发起TCP连接请求,如果是CMPPServerEndpointEntity则会在本机开启一个服务端口等客户端连接。

TCP连接建立完成后。netty会调用EndpointConnector.initPipeLine()方法初始化PipeLine,把CMPP协议解析器,SessionLoginManager加到PipeLine里去,然后netty触发ChannelActive事件。

在SessionLoginManager类里,客户端收到ChannelActive事件后会发送一个CMPPConnnect消息,请求建立CMPP连接.

同样在SessionLoginManager.channelRead()方法里,服务端会收到CMPPConnnect消息,开始对用户名,密码进行鉴权,并给客户端鉴权结果。

鉴权通过后,SessionLoginManager调用EndpointConnector.addChannel(channel)方法,把channel加入ArrayList,并给pipeLine上挂载SessionStateManager和业务处理的ChannelHandler,如心跳处理,日志记录,长短信合并拆分处理类。

EndpointConnector.addChannel(channel)完成后,SessionLoginManager调用ctx.fireUserEventTriggered()方法,触发 SessionState.Connect事件。

以上CMPP连接建立完成。

业务处理类收到SessionState.Connect事件,开始业务处理,如从MQ获取短信下发,或开启Consumer接收MQ推送的消息。

SessionStateManager会拦截所有read()和write()的消息,进行消息持久化,消息重发,流量控制。

增加同步调用api

smsgate自开发以来,一直使用netty的异步发送消息,但实际使用场景中同步发送消息的更方便,或者能方便的取到response。因此增加一个同步调用的api。即:发送消息后等接收到对应的响应后才返回。 使用方法如下:

//因为长短信要拆分,因此返回一个promiseList.每个拆分后的短信对应一个promise

List futures = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);

for(Promise future: futures){

//调用sync()方法,阻塞线程。等待接收response

future.sync();

//接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等

if(future.isSuccess()){

//打印收到的response消息

logger.info("response:{}",future.get());

}else{

打印错误原因

logger.error("response:{}",future.cause());

}

}

//或者不阻塞进程,不调用sync()方法。

List promises = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);

for(Promise promise: promises){

//接收到response后回调Listener方法

promise.addListener(new GenericFutureListener() {

@Override

public void operationComplete(Future future) throws Exception {

//接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等

if(future.isSuccess()){

//打印收到的response消息

logger.info("response:{}",future.get());

}else{

打印错误原因

logger.error("response:{}",future.cause());

}

}

});

}

CMPP Api使用举例

public class TestCMPPEndPoint {

private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class);

@Test

public void testCMPPEndpoint() throws Exception {

ResourceLeakDetector.setLevel(Level.ADVANCED);

final EndpointManager manager = EndpointManager.INS;

CMPPServerEndpointEntity server = new CMPPServerEndpointEntity();

server.setId("server");

server.setHost("127.0.0.1");

server.setPort(7890);

server.setValid(true);

//使用ssl加密数据流

server.setUseSSL(false);

CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity();

child.setId("child");

child.setChartset(Charset.forName("utf-8"));

child.setGroupName("test");

child.setUserName("901783");

child.setPassword("ICP001");

child.setValid(true);

child.setVersion((short)0x30);

child.setMaxChannels((short)4);

child.setRetryWaitTimeSec((short)30);

child.setMaxRetryCnt((short)3);

child.setReSendFailMsg(true);

//child.setWriteLimit(200);

//child.setReadLimit(200);

List serverhandlers = new ArrayList();

serverhandlers.add(new CMPPMessageReceiveHandler()); //在这个handler里接收短信

child.setBusinessHandlerSet(serverhandlers);

server.addchild(child);

manager.addEndpointEntity(server);

CMPPClientEndpointEntity client = new CMPPClientEndpointEntity();

client.setId("client");

client.setHost("127.0.0.1");

//client.setLocalhost("127.0.0.1");

//client.setLocalport(65521);

client.setPort(7890);

client.setChartset(Charset.forName("utf-8"));

client.setGroupName("test");

client.setUserName("901783");

client.setPassword("ICP001");

client.setMaxChannels((short)10);

client.setVersion((short)0x30);

client.setRetryWaitTimeSec((short)30);

client.setUseSSL(false);

//client.setWriteLimit(100);

client.setReSendFailMsg(true);

client.setSupportLongmsg(SupportLongMessage.BOTH);

List clienthandlers = new ArrayList();

clienthandlers.add( new CMPPSessionConnectedHandler(10000)); //在这个handler里发送短信

client.setBusinessHandlerSet(clienthandlers);

manager.addEndpointEntity(client);

manager.openEndpoint(server);

Thread.sleep(1000);

for(int i=0;i<=child.getMaxChannels()+1;i++)

manager.openEndpoint(client);

System.out.println("start.....");

//Thread.sleep(300000);

LockSupport.park();

EndpointManager.INS.close();

}

}

SMPP Api使用举例

public class TestSMPPEndPoint {

private static final Logger logger = LoggerFactory.getLogger(TestSMPPEndPoint.class);

@Test

public void testSMPPEndpoint() throws Exception {

final EndpointManager manager = EndpointManager.INS;

SMPPServerEndpointEntity server = new SMPPServerEndpointEntity();

server.setId("smppserver");

server.setHost("127.0.0.1");

server.setPort(2776);

server.setValid(true);

//使用ssl加密数据流

server.setUseSSL(false);

SMPPServerChildEndpointEntity child = new SMPPServerChildEndpointEntity();

child.setId("smppchild");

child.setSystemId("901782");

child.setPassword("ICP");

child.setValid(true);

child.setChannelType(ChannelType.DUPLEX);

child.setMaxChannels((short)3);

child.setRetryWaitTimeSec((short)30);

child.setMaxRetryCnt((short)3);

child.setReSendFailMsg(true);

child.setIdleTimeSec((short)15);

//child.setWriteLimit(200);

//child.setReadLimit(200);

List serverhandlers = new ArrayList();

serverhandlers.add(new SMPPSessionConnectedHandler(10000));

child.setBusinessHandlerSet(serverhandlers);

server.addchild(child);

SMPPClientEndpointEntity client = new SMPPClientEndpointEntity();

client.setId("smppclient");

client.setHost("127.0.0.1");

client.setPort(2776);

client.setSystemId("901782");

client.setPassword("ICP");

client.setChannelType(ChannelType.DUPLEX);

client.setMaxChannels((short)12);

client.setRetryWaitTimeSec((short)100);

client.setUseSSL(false);

client.setReSendFailMsg(true);

//client.setWriteLimit(200);

//client.setReadLimit(200);

client.setSupportLongmsg(SupportLongMessage.SEND); //接收长短信时不自动合并

List clienthandlers = new ArrayList();

clienthandlers.add( new SMPPMessageReceiveHandler());

client.setBusinessHandlerSet(clienthandlers);

manager.addEndpointEntity(server);

manager.addEndpointEntity(client);

manager.openAll();

manager.startConnectionCheckTask();

Thread.sleep(1000);

for(int i=0;i

manager.openEndpoint(client);

System.out.println("start.....");

LockSupport.park();

EndpointManager.INS.close();

}

}

SGIP Api使用举例

public class TestSgipEndPoint {

private static final Logger logger = LoggerFactory.getLogger(TestSgipEndPoint.class);

@Test

public void testsgipEndpoint() throws Exception {

ResourceLeakDetector.setLevel(Level.ADVANCED);

final EndpointManager manager = EndpointManager.INS;

SgipServerEndpointEntity server = new SgipServerEndpointEntity();

server.setId("sgipserver");

server.setHost("127.0.0.1");

server.setPort(8001);

server.setValid(true);

//使用ssl加密数据流

server.setUseSSL(false);

SgipServerChildEndpointEntity child = new SgipServerChildEndpointEntity();

child.setId("sgipchild");

child.setLoginName("333");

child.setLoginPassowrd("0555");

child.setValid(true);

child.setChannelType(ChannelType.DUPLEX);

child.setMaxChannels((short)3);

child.setRetryWaitTimeSec((short)30);

child.setMaxRetryCnt((short)3);

child.setReSendFailMsg(false);

child.setIdleTimeSec((short)30);

//child.setWriteLimit(200);

//child.setReadLimit(200);

child.setSupportLongmsg(SupportLongMessage.SEND); //接收长短信时不自动合并

List serverhandlers = new ArrayList();

serverhandlers.add(new SgipReportRequestMessageHandler());

serverhandlers.add(new SGIPMessageReceiveHandler());

child.setBusinessHandlerSet(serverhandlers);

server.addchild(child);

manager.addEndpointEntity(server);

SgipClientEndpointEntity client = new SgipClientEndpointEntity();

client.setId("sgipclient");

client.setHost("127.0.0.1");

client.setPort(8001);

client.setLoginName("333");

client.setLoginPassowrd("0555");

client.setChannelType(ChannelType.DUPLEX);

client.setMaxChannels((short)10);

client.setRetryWaitTimeSec((short)100);

client.setUseSSL(false);

client.setReSendFailMsg(true);

//client.setWriteLimit(200);

//client.setReadLimit(200);

List clienthandlers = new ArrayList();

clienthandlers.add(new SGIPSessionConnectedHandler(10000));

client.setBusinessHandlerSet(clienthandlers);

manager.addEndpointEntity(client);

manager.openAll();

Thread.sleep(1000);

for(int i=0;i

manager.openEndpoint(client);

System.out.println("start.....");

LockSupport.park();

EndpointManager.INS.close();

}

}

Demo 执行日志

11:31:52.842 [workGroup2] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@1d7059df

11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@75e134be

11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.handler.api.gate.SessionConnectedHandler@aa80b58

11:31:52.869 [workGroup1] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0xfdc7b81e, L:/127.0.0.1:11481 - R:/127.0.0.1:2776]

11:31:52.867 [workGroup2] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0x1fba3767, L:/127.0.0.1:2776 - R:/127.0.0.1:11481]

11:31:53.863 [busiWork-3] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:343, speed : 343/s

11:31:54.872 [busiWork-1] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:1381, speed : 1038/s

11:31:55.873 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:2704, speed : 1323/s

11:31:56.875 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:4010, speed : 1306/s

11:31:57.880 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:5416, speed : 1406/s

11:31:58.881 [busiWork-7] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:7442, speed : 2026/s

11:31:59.882 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:9581, speed : 2139/s

11:32:00.883 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:12865, speed : 3284/s

11:32:01.884 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:15937, speed : 3072/s

11:32:02.886 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:19489, speed : 3552/s

11:32:03.887 [busiWork-6] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:23065, speed : 3576/s

11:32:04.888 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:26337, speed : 3272/s

android 短信编解码方式,中移短信cmpp协议/smpp协议 netty实现编解码相关推荐

  1. 中移ML302模组通过MQTT协议接入oneNT平台

    @中移ML302模组通过MQTT协议接入oneNT平台 ML302 是中国移动最新推出的 LTE Cat.1 模块. 丰富的 Internet 协议.行业标准接口和功能,支持 Windows.Linu ...

  2. Android-插入短信及备份手机中的短信到SD卡

    短信数据库 只需要关注sms表 只需要关注4个字段 body:短信内容 address:短信的发件人或收件人号码 date:短信时间 type:1为收到,2为发送 读取系统短信,首先查询源码获得短信数 ...

  3. java跟onenet平台交互_中移物联OneNET平台HTTP协议接入

    HTTP协议接入 登录后进入开发者中心 登录成功,点击进入"开发者中心". 点击左侧菜单 这里我们点击箭头所指的"多协议接入" 可以看到这里可以创建基于MQTT ...

  4. 短信ui--短信设置界面之sim卡短信管理

    sim卡短信管理 1.前言           对于sim卡的短信管理,其功能包含了将存在手机上的短信保存到sim卡.将存储位置设置为sim卡时自动将短信保存到sim卡.将sim卡中的短信导入到电话中 ...

  5. 中移路由怎么调虚拟服务器,用手机怎么设置中移禹路由器?

    问:请问中国移动无线路由器禹路由器(zy366)支持用手机设置吗?电脑开不了机了,用手机怎么设置中移禹路由器呢?求教程! 答:中国移动(铁通)的中移禹路由器ZY-366支持用手机设置,在本文家用路由器 ...

  6. Android开发 亲测可用--多种方式获取手机短信验证码自动填入

    Android开发 静态注册.动态注册.短信中心库监控获取手机验证码,自动复制到剪切板或或填入输入框. 友情提醒初学者:这是广播接收器的类,写在xml中静态注册或写在启动类的Oncreate方法下动态 ...

  7. android发送短信的两种方式,发送长短信的两种方式,群发短信

    android 发送短信的方法 方法一:调用系统的短信APP,发送短信. Intent smsIntent = new Intent(Intent.ACTION_VIEW);smsIntent.set ...

  8. android短信iphone,不越狱教你向iPhone中导入短信_手机生活评测-中关村在线

    2.短信 微信只有备份通讯录的功能,第三方软件包括QQ同步助手等虽然支持通话记录.短信的备份,但是无法再Android和iOS之间互通,想要在不越狱的情况下完成短信的备份有些繁琐,接下来我们就为大家演 ...

  9. Android中读取短信信息

    Android中读取的短信文件有 /*** 所有的短信*/public static final String SMS_URI_ALL = "content://sms/";/** ...

最新文章

  1. Web前端与HTML5有什么区别,百分之八十的人分不清
  2. Sqlserver 数据库安全
  3. 【iVX 初级工程师培训教程 10篇文拿证】07 08 新闻页制作
  4. JAVA SSM框架+Redis 实现单点登录
  5. 95-080-048-源码-启动-启动standalonesession
  6. Android 上哪个更好:除以 2 还是位移 1?
  7. mysql删除十天前数据脚本_前几天手工删除测试数据库并重建的脚本
  8. 信用评分-(scorecard)记分卡开发流程,详细介绍分数校准原理calibration
  9. Java开发者必备十大学习网站
  10. HZOI20190903模拟36 字符,蛋糕,游戏
  11. 数据分析:逻辑树分析模型
  12. nginx02-RTFM
  13. 多功能时钟电路的设计框图_详解通用串行总线USB,工作原理、接口电路设计
  14. ROS-机器人操作系统(ROS)浅析----第三章
  15. 阿里云购买免费ssl证书
  16. 徐小湛概率论与数理统计课件_概率论与数理统计-徐小湛-视频教程70讲
  17. 对抗样本方向(Adversarial Examples)2018-2020年最新论文调研
  18. <口算练习机 方案开发原理图>口算练习机/口算宝/儿童数学宝/儿童计算器 LCD液晶显示驱动IC-VK1621B,提供技术支持
  19. java判断一个月连续打卡时间_java并发编程实战《五》死锁 挑战打卡60天
  20. JDBC向数据库插入一条数据控制台不报错但是插入没成功

热门文章

  1. 将类似html数据打印机,机器人和3D打印机的架构有哪些相似之处
  2. centos6.5装mysql好难_centos 6.5装mysql5.7
  3. mysql主备切换 自动_核电生产管理信息系统EAM完成首次备用环境切换演练
  4. python并发编程4-线程
  5. Python实现单例
  6. 使用exp导出导入,需要注意的问题。
  7. Visual Paradigm中文乱码
  8. jQuery 中json字符串与对象互转
  9. Selenium2+python自动化5-操作浏览器基本方法
  10. jcenter那些事儿