Apache Camel建立基于消息的应用

该源码展示:

基于Maven开始使用Apache Camel

使用CamelRunner提升路由。

使用camel建立的基于消息应用

基于Maven开始使用Apache Camel

这个 camel-demo可以作为你的项目模板,你只需要重命名的Java包,并重新命名POM的组和artifactID以符合您的需要。

该项目打开如下:

camel-demo

+- bin

+- config

+- data

+- src

+- pom.xml

+- README.txt

Maven的配置:

xsi:schemaLocation='http://maven.apache.org/POM/4.0.0   http://maven.apache.org/maven-v4_0_0.xsd'>

4.0.0

deng.cameldemo

camel-demo

1.0.0-SNAPSHOT

jar

UTF-8

1.6.6

2.10.1

maven-compiler-plugin

2.3.2

1.6

1.6

maven-assembly-plugin

2.3

project

jar-with-dependencies

make-assembly

package

single

junit

junit-dep

4.10

test

org.hamcrest

hamcrest-library

1.2.1

test

org.slf4j

slf4j-api

${slf4j.version}

org.slf4j

slf4j-log4j12

${slf4j.version}

runtime

true

commons-lang

commons-lang

2.6

commons-io

commons-io

2.0.1

org.apache.camel

camel-core

${camel.version}

org.apache.camel

camel-spring

${camel.version}

org.apache.camel

camel-groovy

${camel.version}

org.apache.camel

camel-jackson

${camel.version}

org.apache.camel

camel-mina

${camel.version}

此的pom.xml的声明了一个基于Java的应用程序,它会产生jar包。它需要最少JDK6或更高版本。除了典型的JUnit和hamcrest的单元测试,还添加了SLF4J进行记录。加入的Apache的commons-lang/io的项目。

maven-assembly-plugin只有用于这个Demo演示目的,您可以更改或删除以便符合您自己的项目需要。

对于Camel依赖,除了camel-core,还有:

camel-spring – 将Camel的路由作为XML配置. 见案例的 camel-demo/config目录.

camel-jackson – 以JSON格式产生消息。

camel-mina – 通过TCP Socket跨网络发送数据。

camel-groovy – [可选] 增加动态脚本到路由中,适合调试和POC.

进入该项目,运行 mvn compile 检查是否有错误。

使用CamelRunner提升路由

下面使用路由来表达业务逻辑,以src/main/java/deng/cameldemo/HelloRoute.java为案例:

package deng.cameldemo;

import org.apache.camel.builder.RouteBuilder;

public class HelloRoute extends RouteBuilder {

@Override

public void configure() throws Exception {

from('timer://helloTimer?period=3000').

to('log:' + getClass().getName());

}

}

为了检查其如何运行,需要一个CamelContext ,编制类CamelRunner :

package deng.cameldemo;

import org.apache.camel.CamelContext;

import org.apache.camel.builder.RouteBuilder;

import org.apache.camel.impl.DefaultCamelContext;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.context.ConfigurableApplicationContext;

import org.springframework.context.support.FileSystemXmlApplicationContext;

/**

* A main program to start Camel and run as a server using RouteBuilder class names or

* Spring config files.

*

*

Usage:

*

* java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute

*

* or

*

* java -Dspring=true deng.cameldemo.CamelRunner /path/to/camel-spring.xml

*

* @author Zemian Deng

*/

public class CamelRunner {

public static void main(String[] args) throws Exception {

CamelRunner runner = new CamelRunner();

runner.run(args);

}

private static Logger logger = LoggerFactory.getLogger(CamelRunner.class);

public void run(String[] args) throws Exception {

if (Boolean.parseBoolean(System.getProperty('spring', 'false')))

runWithSpringConfig(args);

else

runWithCamelRoutes(args);

// Wait for user to hit CRTL+C to stop the service

synchronized(this) {

this.wait();

}

}

private void runWithSpringConfig(String[] args) {

final ConfigurableApplicationContext springContext = new FileSystemXmlApplicationContext(args);

// Register proper shutdown.

Runtime.getRuntime().addShutdownHook(new Thread() {

@Override

public void run() {

try {

springContext.close();

logger.info('Spring stopped.');

} catch (Exception e) {

logger.error('Failed to stop Spring.', e);

}

}

});

// Start spring

logger.info('Spring started.');

}

private void runWithCamelRoutes(String[] args) throws Exception {

final CamelContext camelContext = new DefaultCamelContext();

// Register proper shutdown.

Runtime.getRuntime().addShutdownHook(new Thread() {

@Override

public void run() {

try {

camelContext.stop();

logger.info('Camel stopped for {}', camelContext);

} catch (Exception e) {

logger.error('Failed to stop Camel.', e);

}

}

});

// Added RouteBuilder from args

for (String className : args) {

Class> cls = Class.forName(className);

if (RouteBuilder.class.isAssignableFrom(cls)) {

Object obj = cls.newInstance();

RouteBuilder routeBuilder = (RouteBuilder)obj;

camelContext.addRoutes(routeBuilder);

} else {

throw new RuntimeException('Unable to add Camel RouteBuilder ' + className);

}

}

// Start camel

camelContext.start();

logger.info('Camel started for {}', camelContext);

}

}

其中两个方法,一个是通过Spring的配置运行路由,一个是通过代码。

使用run-java这个SH批命令直接运行:

$ mvn package

$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute

你会看到程序加载HelloRoute到 DefaultCamelContext 并且开始为一个服务器,HelloRoute产生一个3秒计时的消息,发送到日志,打印到你的屏幕,使用 CTRL+C 中断。

下面配合Spring配置运行这个CamelRunner。

使用Spring的XML配置可以灵活指定路由。运行:

$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/hellocamel-spring.xml

config/hellocamel-spring.xml 相当于HelloRoute 代码,通过配置完成:

xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'

xsi:schemaLocation='

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>

使用camel建立的基于消息应用

为了路由监听camel-mina 提供的TCP端口,需要一个路由:

package deng.cameldemo;

import org.apache.camel.builder.RouteBuilder;

public class TcpMsgRoute extends RouteBuilder {

@Override

public void configure() throws Exception {

String port = System.getProperty('port', '12345');

from('mina:tcp://localhost:' + port + '?sync=false').

to('log:' + getClass().getName());

}

}

一切准备好了,运行:

$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.TcpMsgRoute -Dport=12345

输出:

15:21:41 main INFO org.apache.camel.impl.DefaultCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: camel-1) is starting

15:21:41 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.

15:21:42 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters

15:21:42 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@2ffad8fe

15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=true]

15:21:42 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics

15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1426 | Total 1 routes, of which 1 is started.

15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.505 seconds

15:21:42 main INFO deng.cameldemo.CamelRunner:93 | Camel started for CamelContext(camel-1)

服务器在12345端口等待用户客户端。

客户端代码,写一个TCP客户端:

package deng.cameldemo.client;

import java.io.FileReader;

import org.apache.camel.CamelContext;

import org.apache.camel.ProducerTemplate;

import org.apache.camel.impl.DefaultCamelContext;

import org.apache.commons.io.IOUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class TcpMsgSender {

public static void main(String[] args) throws Exception {

TcpMsgSender runner = new TcpMsgSender();

runner.run(args);

}

private static Logger logger = LoggerFactory.getLogger(TcpMsgSender.class);

public void run(String[] args) throws Exception {

String fileName = args.length > 0 ? args[0] : 'data/msg.txt';

String[] hostPort = (args.length > 1 ? args[1] : 'localhost:12345').split(':');

String host = hostPort[0];

String port = hostPort.length > 1 ? hostPort[1] : '12345';

logger.info('Sending tcp message {} to host={}, port={}', new Object[]{ fileName, host, port});

String text = IOUtils.toString(new FileReader(fileName));

logger.debug('File size={}', text.length());

CamelContext camelContext = new DefaultCamelContext();

ProducerTemplate producer = camelContext.createProducerTemplate();

producer.sendBody('mina:tcp://' + host + ':' + port + '?sync=false', text);

logger.info('Message sent.');

}

}

这个TcpMsgSender 将发送文本到服务器端点。运行:

$ bin/run-java deng.cameldemo.client.TcpMsgSender data/test-msg.json localhost:12345

输出:

15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:24 | Sending tcp message data/test-msg.json to host=localhost, port=12345

15:22:35 main DEBUG deng.cameldemo.client.TcpMsgSender:27 | File size=47

15:22:35 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters

15:22:35 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.

15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:32 | Message sent.

服务器端我们还是可以用Spring配置替代TcpMsgRoute :

xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'

xsi:schemaLocation='

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>

保存为 config/tcpmsgserver-spring.xml,运行:

$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-spring.xml

下面我们将产生JSON格式的文本。

输出你接受到文本,文本格式在data/test-msg.json:

{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }

使用myMsgProcessor用来进行JSON和Java对象之间转换,创建 config/tcpmsgserver-json-spring.xml :

xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'

xsi:schemaLocation='

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>

'myMsgProcessor'代码如下:

package deng.cameldemo;

import org.apache.camel.builder.RouteBuilder;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.Map;

public class MyMsgProcessor {

private static Logger logger = LoggerFactory.getLogger(MyMsgProcessor.class);

public void process(Map data) {

logger.info('We should slice and dice the data: ' + data);

}

}

再次运行:

$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-json-spring.xml

最后几行输出:

17:05:28 main INFO deng.cameldemo.CamelRunner:61 | Spring started.

17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.TcpMsgServer:96 | Exchange[ExchangePattern:InOnly, BodyType:String, Body:{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }]

17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.MyMsgProcessor:11 | We should slice and dice the data: {lastName=Deng, firstName=Zemian}

Camel会自动转换的数据格式,客户端只发送JSON格式的纯文本,当服务器接收到它,它使用Jackson库包将其转换成一个Java的Map对象。然后通过map象导入到我们的处理器Bean中。

同样的道理,当你将业务逻辑写成一个或多个处理器bean,这是一个好主意,因为这将限制你的POJO逻辑到尽可能小的单位。当你做到这一点,那么你就可以最大限度地提高处理器的可重用性。如果做成一个混合的更大的POJO,很多业务逻辑混合,它也将很难测试。一旦你养成良好习惯,你就可以使用Camel骆驼以一种更有效的方式解决很多领域问题。

java camel_Apache Camel建立基于消息的应用相关推荐

  1. java抢单功能_基于消息队列的高并发抢单功能实现方法与流程

    本发明涉及嵌入式软件中间件,具体涉及一种基于消息队列的高并发抢单功能实现方法. 背景技术: 中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不同的技术之间共享资源,管理计算资源和网络 ...

  2. java camel_Apache Camel到底是什么?

    小编典典 我说: Apache Camel是消息传递技术与路由的结合.它将消息传递的起点和终点连接在一起,从而允许将消息从不同的源传输到不同的目的地.例如:JMS-> JSON,HTTP-> ...

  3. camel mq_Camel:构建基于消息的应用程序

    camel mq 这是一篇长文章,包含三个单独的主题: Java的Apache Camel入门 使用CamelRunner改善路线的启动 使用Camel构建基于消息的应用程序 但是,由于我准备了包含所 ...

  4. Camel:构建基于消息的应用程序

    这是一篇长文章,包含三个单独的主题: Java的Apache Camel入门 使用CamelRunner改善路线的启动 使用Camel构建基于消息的应用程序 但是,由于我准备了包含所有这些材料的cam ...

  5. apache.camel_Apache Camel 2.18发布–包含内容

    apache.camel 本周发布了Apache Camel 2.18.0 . 此版本是重要版本,我将在此博客文章中重点介绍. Java 8 Camel 2.18是第一个需要Java 1.8的版本(例 ...

  6. apache camel_Apache Camel的性能调整思路

    apache camel 时不时地,我会以Camel速度较慢的观点来询问有关优化Camel应用程序的问题. 骆驼只是连接不同系统的粘合剂,路由引擎全部在内存中,并且不需要任何持久状态. 因此,在99% ...

  7. Java推送IOS通知消息

    Java推送IOS通知消息 公司需要做IOS消息推送,我负责后台代码的实现.写这篇文章也是将我踩坑得来的结果记录一下,分享一下. APN介绍 Apple 推送通知服务(APNs) 是远程通知功能的核心 ...

  8. 51 rtos系统 : MCUsystem 介绍 -- 基于消息队列

    一个基于消息循环的51操作系统,这个有点象VC的风格,大家看McuSystem.rar 看,这个不错的. ReadMe.txt 默认路径: D:\MCU51\ Version:MS1.01-2003. ...

  9. activiti异步执行_对基于消息队列的Activiti异步执行器进行基准测试

    activiti异步执行 一点历史 永无休止的一件事是,Activiti在某些非常大的规模的大型组织中的使用方式. 过去,这导致了各种优化和重构,其中包括异步执行器-替换旧的作业执行器. 对于未启动的 ...

最新文章

  1. vs如何设置对话框显示在最前面_【另存为】对话框的使用
  2. 最锋利的Visual Studio Web开发工具扩展:Web Essentials详解(转)
  3. UI基础 - UIScrollView
  4. java中类/对象的初始化顺序以及静态代码块的使用
  5. Handler造成Activity泄漏,用弱引用真的有用么?
  6. 微软算法100题26 左旋转字符串
  7. 微软发布 PowerToys 首个预览版,重启的 Windows 工具集
  8. 滴滴怒怼美团;阿里麻吉宝刷屏;B站、爱奇艺上市 | CSDN 极客头条
  9. 【前端基础进阶】JS原型、原型链、对象详解
  10. 2022考研数据结构_1 绪论
  11. Ansys命令流(按字母排列)
  12. 苹果电脑连接打印机操作
  13. IE8兼容html5视频播放
  14. mysql自定义函数的分号_Mysql自定义函数
  15. CCNP基础知识-交换技术
  16. FATAL Exited too quickly (process log may have details)
  17. 写一篇简单的TileMap入门教程
  18. 弹珠css3,使用CSS3实现的弹球小动画
  19. 课程设计 单项选择题标准化考试系统
  20. c3p0连接池使用完毕后连接返还

热门文章

  1. 折腾死人不偿命的电脑加固态和内存过程
  2. Windows Terminal 终极美
  3. 某公司办公室工作人员(网管)岗位职责
  4. iOS GCD(一)
  5. Proface触摸屏按钮互锁、弹出窗口、密码设置、报警设置案例
  6. [渝粤教育] 沈阳理工大学 复变函数与积分变换 参考 资料
  7. oracle split 以及 简单json解析存储过程
  8. 解决Ubuntu 18.04 LTS网络连接不稳定的问题
  9. atl offsetofclass
  10. selenium实战-同步网易云音乐歌单到qq音乐