Storm是一个分布式的实时的流式计算框架。

Storm运行有两种模式,分别是local与remote。

Storm的local就是单进程模式(运行在单一的JVM),local模式storm(注:下文将用storm-local来表示)整合Springboot,这与我们平常的开发方式没有多大区别。storm-local又被叫做测试模式。

Storm的remote是跨进程模式,spout与bolt运行在多个JVM中,也就是说,相互之间的消息通讯要通过远程调用的方式(跨进程)。local模式的storm(注:下文将用storm-remote来表示)与storm的开发方式差别,其实也不大(想想,也不可能很大,否则,一个不易于测试的东西,谁还想用),不过,理解起来就有很大区别了,一不小心,死都不知是怎么死的。

不管是那种方式,Storm都是必须通过Spring的factory(下文称为Spring BeanFacotry)才能获得目标bean。因此,我们设计了一个公共的工具,使可以在storm的两种运行模式下都可直接获得Spring BeanFacotry。

这个工具类比较关键,先贴出来。

package com.banling.stormspr.config;import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;/**取得Spring的上下文(也就是Factory,使通过Factory可以得到目标JavaBean)* * @author Ban**/
public class SpringContext implements ApplicationContextAware {private  static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// TODO Auto-generated method stubsynchronized(this) {if(SpringContext.applicationContext == null){SpringContext.applicationContext  = applicationContext;}}}public static ApplicationContext getApplicationContext() {return applicationContext;}}

同时,注意,必须在SpringBoot启动后并由SpringBoot初始化这个工具类SpringContext。如何做到,下文介绍。

一、storm-local整合Springboot

在架构上,是这样的:

程序启动时,组件的启动顺序是这样的,先启动SpringBoot,然后在SpringBoot中启动Strom:

程序启动后,接着就是开始Storm运算,这时,程序运行是这样的:

下面列一个简单的storm-local例子,就是计算1到n的总数,为了扩展storm的知识点(测试),同时还做了些多余的逻辑,应用字段分组。

Storm-local源码请参考:https://github.com/banat020/storm-spr-local

1.1)数据输入

1.1.1)我们设计一个数据接口NumInService,用于获得源数据输入。这个数据接口的实例(接口的实现,请查看github源)将由Spring负责初始化并管理。

package com.banling.stormspr.service;public interface NumInService {int getNum();}

1.1.2)Storm获取数据输入

package com.banling.stormspr.numcount.node;import java.util.HashMap;
import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;import com.banling.stormspr.config.SpringContext;
import com.banling.stormspr.service.NumInService;/**数据输入* @author Ban**/
public class NumInSpout extends BaseRichSpout {/*** */private static final long serialVersionUID = 1L;private SpoutOutputCollector collector;@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {// TODO Auto-generated method stubthis.collector=collector;}@Overridepublic void nextTuple() {// TODO Auto-generated method stubNumInService numInService=(NumInService)SpringContext.getApplicationContext().getBean("numInService");int curNum=numInService.getNum();if(curNum==-1) {//计算已经结束return;}int groupFlag=curNum%10;//用于分组Values values = new Values(curNum,groupFlag);collector.emit(values);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("inNum","groupFlag"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Map<String, Object> map=new HashMap<String, Object>();//仅仅是测试,因此不要跑得太快了,设置为每50ms发送一次数据map.put("topology.sleep.spout.wait.strategy.time.ms", 50);return map;}
}

注意NumInService numInService=(NumInService)SpringContext.getApplicationContext().getBean("numInService"),就是Spout通过Spring BeanFacotry获得NumInService(数据输入接口)的一个实例。

1.2)数据分组计算与汇总计算

1.2.1)设计数据运算接口NumCountService,这个接口的实例也是由Spring BeanFactory管理。

package com.banling.stormspr.service;public interface NumCountService {int count(int in,int curNum);
}

1.2.2)对输入的数据进行分组运算GroupCountBolt

package com.banling.stormspr.numcount.node;import java.util.Map;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.banling.stormspr.config.SpringContext;
import com.banling.stormspr.service.NumCountService;/**演示分组* @author Ban**/
public class GroupCountBolt extends BaseRichBolt {/*** */private static final long serialVersionUID = 1L;private static final Logger LOGGER = LoggerFactory.getLogger(GroupCountBolt.class);private OutputCollector collector;private int groupCount=0;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector=collector;}@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubNumCountService numCountService=(NumCountService)SpringContext.getApplicationContext().getBean("numCountService");int inNum=input.getIntegerByField("inNum");int groupFlag=input.getIntegerByField("groupFlag");//用于分组的字段groupCount=numCountService.count(inNum,groupCount);LOGGER.info(" This Thread is {}, In data is {},GroupFlag is {}, and the curCount value is {}",Thread.currentThread().getName(),inNum,groupFlag,groupCount);Values values=new Values(groupCount,inNum);collector.emit(values);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("GroupCount","inNum"));}}

这句代码NumCountService numCountService=(NumCountService)SpringContext.getApplicationContext().getBean("numCountService"),是Bolt通过Spring BeanFacory获得目标Bean。

1.2.3)对分组的运行结果进行汇总运算TotalBolt

package com.banling.stormspr.numcount.node;import java.util.Map;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.banling.stormspr.config.SpringContext;
import com.banling.stormspr.service.NumCountService;/**设计这个Bolt为了演示:经过前面的分组计算后,是否有序得到输入的数据。* * @author Ban**/
public class TotalBolt extends BaseRichBolt {/*** */private static final long serialVersionUID = 1L;private static final Logger LOGGER = LoggerFactory.getLogger(TotalBolt.class);private OutputCollector collector;private int totalCount=0;@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {// TODO Auto-generated method stubthis.collector=collector;}@Overridepublic void execute(Tuple input) {// TODO Auto-generated method stubNumCountService numCountService=(NumCountService)SpringContext.getApplicationContext().getBean("numCountService");int inNum=input.getIntegerByField("inNum");totalCount=numCountService.count(totalCount, inNum);LOGGER.info(" The inNum is {}, the Total is {} for now ", inNum,totalCount);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}}

这句代码NumCountService numCountService=(NumCountService)SpringContext.getApplicationContext().getBean("numCountService"),是Bolt通过Spring BeanFacory获得目标Bean。

到此,storm的业务运算已开发完成,但是还剩下与Springboot如何整合的事还没有做。

1.3)Storm以local模式与Springboot整合

1.3.1)Storm的配置

package com.banling.stormspr.numcount;import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.banling.stormspr.numcount.node.GroupCountBolt;
import com.banling.stormspr.numcount.node.NumInSpout;
import com.banling.stormspr.numcount.node.TotalBolt;public class LocalStorm {private static final Logger LOGGER = LoggerFactory.getLogger(LocalStorm.class);public static void startStorm() {final String NUM_IN_SPOUT="NumInSpout";final String HALF_COUNT_BOLT="HalfCountBolt";final String TOTAL_COUNT_BOLT="TatalCountBolt";TopologyBuilder builder = new TopologyBuilder();builder.setSpout(NUM_IN_SPOUT, new NumInSpout());//如果是多线程,那么,HalfCountBolt处理数据总体是无序的。//不过,可以实现分组有序处理,就像本例子一样;但是,这样一个计算节点TotalBolt接收到的数据就是无序的了。builder.setBolt(HALF_COUNT_BOLT, new GroupCountBolt(),10).fieldsGrouping(NUM_IN_SPOUT, new Fields("groupFlag"));builder.setBolt(TOTAL_COUNT_BOLT, new TotalBolt()).localOrShuffleGrouping(HALF_COUNT_BOLT);Config config=StormLocalConfig.config();//local模式LocalCluster cluster = new LocalCluster();LOGGER.info(" Storm Run In Local Way ... ");cluster.submitTopology("LOCAL_TOPOLOGY", config, builder.createTopology());}
}

1.3.2)Springboot的启动入口

package com.banling.stormspr;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;@SpringBootApplication
@Import(com.banling.stormspr.config.SpringContext.class)
public class StormSprApplication {private static final Logger LOGGER = LoggerFactory.getLogger(StormSprApplication.class);public synchronized  static void run(String... args) {LOGGER.info("SpringBoot is starting");SpringApplication springApplication = new SpringApplication(StormSprApplication.class);//忽略Spring启动信息日志springApplication.setLogStartupInfo(false);springApplication.run(args);LOGGER.info("SpringBoot lauched");}}

@Import(com.banling.stormspr.config.SpringContext.class)表示,在启动Springboot后,同时初始化SpringContext。

1.3.3)通过Springboot的CommandLineRunner启动Storm。

package com.banling.stormspr;import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import com.banling.stormspr.numcount.LocalStorm;@Component
public class StartStormCommandLineRunner implements CommandLineRunner {@Overridepublic void run(String... args) throws Exception {// TODO Auto-generated method stubLocalStorm.startStorm();}}

1.4)测试

整合完成,我们查看下测试结果:

同时,可以看到,打印的顺序与数据的输入顺序不一样,也就是说,运算的顺序与数据的输入顺序是不一样的。

主要原因是:业务运算节点是被我设计为多线程并发执行,因此不保存数据的运算与输入的顺序一致,总是,可以绝对保证最终运算结果的正确。

二、storm-remote整合Springboot

在架构与启动顺序上,与storm-local的整合模式很不一样。

storm-remote构架,springboot运行在storm容器(平台)中,就像是springboot运行在tomcat中一样回事:

storm-remote模式中组件的启动顺序是,先启动storm,接着在storm中启动SpringBoot。这个与Local模式的启动顺序完全相反。

程序启动后(或者说是提交运程作业后),接着开始storm运算:

storm-remote模式整合SpringBoot示例,业务运算与storm-local一样,不一样的地方是,什么时候初始化Spring BeanFactory。

我们应该注意到,Storm的每个节点都是实现了Serializable接口,在生产环境中(也就是remote模式)每个节点可能会在不同的进程(不同的JVM)中被反序列化。Storm-remote整合SpringBoot时,是SpringBoot在Storm容器中被初始化并被管理,每个进程(这个进程就是Storm)有自己的Spring BeanFactory。

综上所述,我们已经知道,storm-remote中,第一个节点都是独立的了,不可能通过一个JVM(再次说明,一个JVM就是一个进程)共享数据,我们必须在每个节点初始化时,同时初始化一个Spring BeanFactory;因此,相对于storm-local的整合SpringBoot的方式,我们只要着重改造初始化SpringBoot的时机即可。

Storm-remot源码请参考:https://github.com/banat020/storm-spr-remote

2.1)改造SpringBoot的启动程序与Main程序。

2.1.1)SpringBoot的启动程序

package com.banling.stormspr;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;import java.util.concurrent.atomic.AtomicBoolean;@SpringBootApplication
@Import(com.banling.stormspr.config.SpringContext.class)
public class StormSprApplication {private static final Logger LOGGER = LoggerFactory.getLogger(StormSprApplication.class);private static AtomicBoolean flag=new AtomicBoolean(false);public synchronized  static void run(String... args) {if(flag.compareAndSet(false, true)) {LOGGER.info("SpringBoot is starting");SpringApplication springApplication = new SpringApplication(StormSprApplication.class);//忽略Spring启动信息日志springApplication.setLogStartupInfo(false);springApplication.run(args);LOGGER.info("SpringBoot launched");}}}

2.1.2)Main程序

package com.banling.stormspr;import com.banling.stormspr.numcount.RemoteStorm;public class StartStorm {public static void main(String[] args) {RemoteStorm.start();}
}

2.2)(mop.xml文件)改造资源引用与打包方式。

2.2.1)修改原

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.0.6.RELEASE</version>

<relativePath/>

</parent>

<dependencyManagement>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-dependencies</artifactId>

<version>2.0.6.RELEASE</version>

<type>pom</type>

<scope>import</scope>

</dependency>

</dependencies>

</dependencyManagement>

2.2.2)在<plugins>节点中增加下列plugin

<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.6</version><configuration><archive><manifest><addClasspath>true</addClasspath><classpathPrefix>lib/</classpathPrefix><mainClass>com.banling.stormspr.StartStorm</mainClass></manifest></archive></configuration></plugin><!-- 用Shade打包 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.0.6.RELEASE</version></dependency></dependencies><configuration><keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope><createDependencyReducedPom>true</createDependencyReducedPom><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"><resource>META-INF/spring.factories</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.banling.stormspr.StartStorm</mainClass></transformer></transformers></configuration></execution></executions></plugin>

2.3)在所有运算结点中增加初始化SpringBoot

也就是,在Spout中的open方法中,在Bolt中的prepare方法中,初始化SpringBoot:

StormSprApplication.run();

2.4)测试效果

2.4.1)向生产环境提交作业:

$STORM_HOME/bin/storm jar storm-spr-remote-0.0.1-SNAPSHOT.jar com.banling.stormspr.StartStorm

在supervisor机器上,在目录$STORM_HOME/logs可查看执行日志。

Storm整合Springboot相关推荐

  1. 使用Gradle整合SpringBoot+Vue.js-开发调试与打包

    为什么80%的码农都做不了架构师?>>>    非常感谢两位作者: kevinz分享的文章<springboot+gradle+vue+webpack 组合使用> 首席卖 ...

  2. 微信公众号授权步骤详细步骤介绍和整合springboot开发(java版)

    文章有不当之处,欢迎指正,如果喜欢微信阅读,你也可以关注我的微信公众号:好好学java,获取优质学习资源. 一.微信公众号授权步骤 首先到微信公众平台注册账号,可以看到有四种类型(服务号,订阅号,小程 ...

  3. spring日志报错提醒_使用爬虫框架htmlunit整合springboot出现的一个不兼容问题

    使用爬虫框架htmlunit整合springboot不兼容的一个问题 本来使用htmlunit爬虫爬取数据非常正常好用,之前一直是直接java程序或者整合Javaswing界面,都没有问题,但是后来整 ...

  4. java kafka client_Kafka Java Client基本使用及整合SpringBoot

    kafka-clients 添加依赖 org.apache.kafka kafka-clients 2.5.0 消费者 Consumer 代码上总体可以分为三部分:消费者的配置消费者的配置在 org. ...

  5. [RabbitMQ]整合SpringBoot

    整合SpringBoot 创建项目 引入依赖 <dependencies><!--RabbitMQ 依赖--><dependency><groupId> ...

  6. mall整合SpringBoot+MyBatis搭建基本骨架

    本文主要讲解mall整合SpringBoot+MyBatis搭建基本骨架,以商品品牌为例实现基本的CRUD操作及通过PageHelper实现分页查询. mysql数据库环境搭建 下载并安装mysql5 ...

  7. Shiro 整合 SpringBoot

    Shiro 整合 SpringBoot shiro主要有三大功能模块 Subject:主体,一般指用户. SecurityManager:安全管理器,管理所有Subject,可以配合内部安全组件.(类 ...

  8. JWT认证原理、整合springboot实战应用

    JWT认证原理.整合springboot实战应用 1.什么是JWT 2.JWT能做什么 3.与传统的session认证做对比 4.JWT结构 5.JWT的封装方法 1.什么是JWT JWT(Json ...

  9. 基于 SpringBoot2.0+优雅整合 SpringBoot+Mybatis

    SpringBoot 整合 Mybatis 有两种常用的方式,一种就是我们常见的 xml 的方式 ,还有一种是全注解的方式.我觉得这两者没有谁比谁好,在 SQL 语句不太长的情况下,我觉得全注解的方式 ...

最新文章

  1. 神经网络 | Hopfield神经网络(附python源代码)
  2. 程序员解决20年前的加密问题
  3. pythonsql注入_python使用mysql,sql注入问题
  4. Django1.9开发博客06- 模板继承
  5. 数据结构与算法简单总结()
  6. ssm注解配置连接mysql_基于注解和配置类的SSM(Spring+SpringMVC+Mybatis)项目详细配置...
  7. csgo fps不稳定和服务器,win10玩csgofps不稳定怎么办
  8. Web服务器配置管理
  9. 爽一把手写Bundle Adjustment
  10. 电脑Mac地址更改后有什么害处?怎么改回原来的?
  11. 索尼展示基于MicroLED技术的16K显示屏:约780吋
  12. 你知道一本书封面的著、编著、编、主编分别是什么意思吗?
  13. H5页面嵌套在APP中的坑
  14. vue3.x 重复点击路由报错
  15. Python之Flask框架(一)
  16. 22 个鲜为人知的 CSS 高招让你技高一筹
  17. Chrome主题下载网站安装简单教程
  18. oracle表数据如何导出成dbf,怎么将EXCEL导成DBF?《dbf导出excel数据》
  19. win10安装VMware死机
  20. 淘宝U站排名揭秘-看淘宝优站排名规则解析U站排名优化策略

热门文章

  1. linux中ss命令
  2. https中文乱码问题
  3. Mac OS下maven安装与配置
  4. 正则表达式中匹配开头关键词和结尾关键词
  5. 经济学目的与概论——《可以量化的经…
  6. oracle判断是否相同,oracle 判断字符串是否相等
  7. 网页上传不了文件 服务器错误,网页为何总是出现错误?
  8. 2012-2020蓝桥C++ B组蓝桥杯省赛真题(第一题)
  9. debian启动mysql_debian 修改启动服务器
  10. pytorch train.py与test.py代码流程