java camel_Apache Camel建立基于消息的应用
Apache Camel建立基于消息的应用
该源码展示:
基于Maven开始使用Apache Camel
使用CamelRunner提升路由。
使用camel建立的基于消息应用
基于Maven开始使用Apache Camel
这个 camel-demo可以作为你的项目模板,你只需要重命名的Java包,并重新命名POM的组和artifactID以符合您的需要。
该项目打开如下:
camel-demo
+- bin
+- config
+- data
+- src
+- pom.xml
+- README.txt
Maven的配置:
xsi:schemaLocation='http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd'>
4.0.0
deng.cameldemo
camel-demo
1.0.0-SNAPSHOT
jar
UTF-8
1.6.6
2.10.1
maven-compiler-plugin
2.3.2
1.6
1.6
maven-assembly-plugin
2.3
project
jar-with-dependencies
make-assembly
package
single
junit
junit-dep
4.10
test
org.hamcrest
hamcrest-library
1.2.1
test
org.slf4j
slf4j-api
${slf4j.version}
org.slf4j
slf4j-log4j12
${slf4j.version}
runtime
true
commons-lang
commons-lang
2.6
commons-io
commons-io
2.0.1
org.apache.camel
camel-core
${camel.version}
org.apache.camel
camel-spring
${camel.version}
org.apache.camel
camel-groovy
${camel.version}
org.apache.camel
camel-jackson
${camel.version}
org.apache.camel
camel-mina
${camel.version}
此的pom.xml的声明了一个基于Java的应用程序,它会产生jar包。它需要最少JDK6或更高版本。除了典型的JUnit和hamcrest的单元测试,还添加了SLF4J进行记录。加入的Apache的commons-lang/io的项目。
maven-assembly-plugin只有用于这个Demo演示目的,您可以更改或删除以便符合您自己的项目需要。
对于Camel依赖,除了camel-core,还有:
camel-spring – 将Camel的路由作为XML配置. 见案例的 camel-demo/config目录.
camel-jackson – 以JSON格式产生消息。
camel-mina – 通过TCP Socket跨网络发送数据。
camel-groovy – [可选] 增加动态脚本到路由中,适合调试和POC.
进入该项目,运行 mvn compile 检查是否有错误。
使用CamelRunner提升路由
下面使用路由来表达业务逻辑,以src/main/java/deng/cameldemo/HelloRoute.java为案例:
package deng.cameldemo;
import org.apache.camel.builder.RouteBuilder;
public class HelloRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from('timer://helloTimer?period=3000').
to('log:' + getClass().getName());
}
}
为了检查其如何运行,需要一个CamelContext ,编制类CamelRunner :
package deng.cameldemo;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
/**
* A main program to start Camel and run as a server using RouteBuilder class names or
* Spring config files.
*
*
Usage:
*
* java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute
*
* or
*
* java -Dspring=true deng.cameldemo.CamelRunner /path/to/camel-spring.xml
*
* @author Zemian Deng
*/
public class CamelRunner {
public static void main(String[] args) throws Exception {
CamelRunner runner = new CamelRunner();
runner.run(args);
}
private static Logger logger = LoggerFactory.getLogger(CamelRunner.class);
public void run(String[] args) throws Exception {
if (Boolean.parseBoolean(System.getProperty('spring', 'false')))
runWithSpringConfig(args);
else
runWithCamelRoutes(args);
// Wait for user to hit CRTL+C to stop the service
synchronized(this) {
this.wait();
}
}
private void runWithSpringConfig(String[] args) {
final ConfigurableApplicationContext springContext = new FileSystemXmlApplicationContext(args);
// Register proper shutdown.
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
springContext.close();
logger.info('Spring stopped.');
} catch (Exception e) {
logger.error('Failed to stop Spring.', e);
}
}
});
// Start spring
logger.info('Spring started.');
}
private void runWithCamelRoutes(String[] args) throws Exception {
final CamelContext camelContext = new DefaultCamelContext();
// Register proper shutdown.
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
camelContext.stop();
logger.info('Camel stopped for {}', camelContext);
} catch (Exception e) {
logger.error('Failed to stop Camel.', e);
}
}
});
// Added RouteBuilder from args
for (String className : args) {
Class> cls = Class.forName(className);
if (RouteBuilder.class.isAssignableFrom(cls)) {
Object obj = cls.newInstance();
RouteBuilder routeBuilder = (RouteBuilder)obj;
camelContext.addRoutes(routeBuilder);
} else {
throw new RuntimeException('Unable to add Camel RouteBuilder ' + className);
}
}
// Start camel
camelContext.start();
logger.info('Camel started for {}', camelContext);
}
}
其中两个方法,一个是通过Spring的配置运行路由,一个是通过代码。
使用run-java这个SH批命令直接运行:
$ mvn package
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute
你会看到程序加载HelloRoute到 DefaultCamelContext 并且开始为一个服务器,HelloRoute产生一个3秒计时的消息,发送到日志,打印到你的屏幕,使用 CTRL+C 中断。
下面配合Spring配置运行这个CamelRunner。
使用Spring的XML配置可以灵活指定路由。运行:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/hellocamel-spring.xml
config/hellocamel-spring.xml 相当于HelloRoute 代码,通过配置完成:
xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
xsi:schemaLocation='
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>
使用camel建立的基于消息应用
为了路由监听camel-mina 提供的TCP端口,需要一个路由:
package deng.cameldemo;
import org.apache.camel.builder.RouteBuilder;
public class TcpMsgRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
String port = System.getProperty('port', '12345');
from('mina:tcp://localhost:' + port + '?sync=false').
to('log:' + getClass().getName());
}
}
一切准备好了,运行:
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.TcpMsgRoute -Dport=12345
输出:
15:21:41 main INFO org.apache.camel.impl.DefaultCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: camel-1) is starting
15:21:41 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:21:42 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:21:42 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@2ffad8fe
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=true]
15:21:42 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1426 | Total 1 routes, of which 1 is started.
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.505 seconds
15:21:42 main INFO deng.cameldemo.CamelRunner:93 | Camel started for CamelContext(camel-1)
服务器在12345端口等待用户客户端。
客户端代码,写一个TCP客户端:
package deng.cameldemo.client;
import java.io.FileReader;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TcpMsgSender {
public static void main(String[] args) throws Exception {
TcpMsgSender runner = new TcpMsgSender();
runner.run(args);
}
private static Logger logger = LoggerFactory.getLogger(TcpMsgSender.class);
public void run(String[] args) throws Exception {
String fileName = args.length > 0 ? args[0] : 'data/msg.txt';
String[] hostPort = (args.length > 1 ? args[1] : 'localhost:12345').split(':');
String host = hostPort[0];
String port = hostPort.length > 1 ? hostPort[1] : '12345';
logger.info('Sending tcp message {} to host={}, port={}', new Object[]{ fileName, host, port});
String text = IOUtils.toString(new FileReader(fileName));
logger.debug('File size={}', text.length());
CamelContext camelContext = new DefaultCamelContext();
ProducerTemplate producer = camelContext.createProducerTemplate();
producer.sendBody('mina:tcp://' + host + ':' + port + '?sync=false', text);
logger.info('Message sent.');
}
}
这个TcpMsgSender 将发送文本到服务器端点。运行:
$ bin/run-java deng.cameldemo.client.TcpMsgSender data/test-msg.json localhost:12345
输出:
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:24 | Sending tcp message data/test-msg.json to host=localhost, port=12345
15:22:35 main DEBUG deng.cameldemo.client.TcpMsgSender:27 | File size=47
15:22:35 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:22:35 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:32 | Message sent.
服务器端我们还是可以用Spring配置替代TcpMsgRoute :
xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
xsi:schemaLocation='
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>
保存为 config/tcpmsgserver-spring.xml,运行:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-spring.xml
下面我们将产生JSON格式的文本。
输出你接受到文本,文本格式在data/test-msg.json:
{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }
使用myMsgProcessor用来进行JSON和Java对象之间转换,创建 config/tcpmsgserver-json-spring.xml :
xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
xsi:schemaLocation='
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>
'myMsgProcessor'代码如下:
package deng.cameldemo;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class MyMsgProcessor {
private static Logger logger = LoggerFactory.getLogger(MyMsgProcessor.class);
public void process(Map data) {
logger.info('We should slice and dice the data: ' + data);
}
}
再次运行:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-json-spring.xml
最后几行输出:
17:05:28 main INFO deng.cameldemo.CamelRunner:61 | Spring started.
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.TcpMsgServer:96 | Exchange[ExchangePattern:InOnly, BodyType:String, Body:{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }]
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.MyMsgProcessor:11 | We should slice and dice the data: {lastName=Deng, firstName=Zemian}
Camel会自动转换的数据格式,客户端只发送JSON格式的纯文本,当服务器接收到它,它使用Jackson库包将其转换成一个Java的Map对象。然后通过map象导入到我们的处理器Bean中。
同样的道理,当你将业务逻辑写成一个或多个处理器bean,这是一个好主意,因为这将限制你的POJO逻辑到尽可能小的单位。当你做到这一点,那么你就可以最大限度地提高处理器的可重用性。如果做成一个混合的更大的POJO,很多业务逻辑混合,它也将很难测试。一旦你养成良好习惯,你就可以使用Camel骆驼以一种更有效的方式解决很多领域问题。
java camel_Apache Camel建立基于消息的应用相关推荐
- java抢单功能_基于消息队列的高并发抢单功能实现方法与流程
本发明涉及嵌入式软件中间件,具体涉及一种基于消息队列的高并发抢单功能实现方法. 背景技术: 中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不同的技术之间共享资源,管理计算资源和网络 ...
- java camel_Apache Camel到底是什么?
小编典典 我说: Apache Camel是消息传递技术与路由的结合.它将消息传递的起点和终点连接在一起,从而允许将消息从不同的源传输到不同的目的地.例如:JMS-> JSON,HTTP-> ...
- camel mq_Camel:构建基于消息的应用程序
camel mq 这是一篇长文章,包含三个单独的主题: Java的Apache Camel入门 使用CamelRunner改善路线的启动 使用Camel构建基于消息的应用程序 但是,由于我准备了包含所 ...
- Camel:构建基于消息的应用程序
这是一篇长文章,包含三个单独的主题: Java的Apache Camel入门 使用CamelRunner改善路线的启动 使用Camel构建基于消息的应用程序 但是,由于我准备了包含所有这些材料的cam ...
- apache.camel_Apache Camel 2.18发布–包含内容
apache.camel 本周发布了Apache Camel 2.18.0 . 此版本是重要版本,我将在此博客文章中重点介绍. Java 8 Camel 2.18是第一个需要Java 1.8的版本(例 ...
- apache camel_Apache Camel的性能调整思路
apache camel 时不时地,我会以Camel速度较慢的观点来询问有关优化Camel应用程序的问题. 骆驼只是连接不同系统的粘合剂,路由引擎全部在内存中,并且不需要任何持久状态. 因此,在99% ...
- Java推送IOS通知消息
Java推送IOS通知消息 公司需要做IOS消息推送,我负责后台代码的实现.写这篇文章也是将我踩坑得来的结果记录一下,分享一下. APN介绍 Apple 推送通知服务(APNs) 是远程通知功能的核心 ...
- 51 rtos系统 : MCUsystem 介绍 -- 基于消息队列
一个基于消息循环的51操作系统,这个有点象VC的风格,大家看McuSystem.rar 看,这个不错的. ReadMe.txt 默认路径: D:\MCU51\ Version:MS1.01-2003. ...
- activiti异步执行_对基于消息队列的Activiti异步执行器进行基准测试
activiti异步执行 一点历史 永无休止的一件事是,Activiti在某些非常大的规模的大型组织中的使用方式. 过去,这导致了各种优化和重构,其中包括异步执行器-替换旧的作业执行器. 对于未启动的 ...
最新文章
- vs如何设置对话框显示在最前面_【另存为】对话框的使用
- 最锋利的Visual Studio Web开发工具扩展:Web Essentials详解(转)
- UI基础 - UIScrollView
- java中类/对象的初始化顺序以及静态代码块的使用
- Handler造成Activity泄漏,用弱引用真的有用么?
- 微软算法100题26 左旋转字符串
- 微软发布 PowerToys 首个预览版,重启的 Windows 工具集
- 滴滴怒怼美团;阿里麻吉宝刷屏;B站、爱奇艺上市 | CSDN 极客头条
- 【前端基础进阶】JS原型、原型链、对象详解
- 2022考研数据结构_1 绪论
- Ansys命令流(按字母排列)
- 苹果电脑连接打印机操作
- IE8兼容html5视频播放
- mysql自定义函数的分号_Mysql自定义函数
- CCNP基础知识-交换技术
- FATAL Exited too quickly (process log may have details)
- 写一篇简单的TileMap入门教程
- 弹珠css3,使用CSS3实现的弹球小动画
- 课程设计 单项选择题标准化考试系统
- c3p0连接池使用完毕后连接返还