一、DRPC简介和工作流程

1、 DRPC 简介

分布式RPC( distributed RPC,DRPC) 用于对storm上大量的函数调用进行并行计算。对于每一次函数调用,storm集群上运行的拓扑接收调用函数的参数信息作为输入流,并将计算结果作为输出流发射出去。

一句话概括:storm进行计算,根据客户端提交的请求参数,而返回storm计算的结果。

DRPC通过DRPC Server来实现,DRPC Server的整体工作过程如下:

接收到一个 RPC 调用请求;

发送请求到storm上的拓扑;

从storm上接收计算结果;

将计算结果返回给客户端;

注:在client客户端来看,一个DRPC调用看起来和普通的RPC调用没什么区别;

2、工作流程

DRPC的工作流程如下

client客户端给DRPC Server发送要被调用执行DRPC函数名称及参数。

storm上的topology通过DRPC Spout实现这一函数,从DRPC Server接收到函数调用流,DRPC Server为每次函数调用生成唯一的id;

storm上运行的Topology开始计算结果,最后通过一个 ReturnResults 的Bolt连接到DRPC服务器,发送指定id的计算结果给DRPC Server。

DRPC Server 通过使用之前为每个函数调用生成的id,将结果关联到对应的发起调用的client,将计算结果返回给client。

3、LinearDRPCTopologyBuilder类

LinearDRPCTopologyBuilder 类是官方封装好的,专门用于DRPC的builder,但是从 storm0.82 已经不推荐使用该类,原因是因为在trident已经有更好的封装;

Storm自带了一个称作 LinearDRPCTopologyBuilder 的topology builder, 它把实现DRPC的几乎所有步骤都自动化了。
包括:
1. 构建spout;
2. 向DRPC Server返回结果;
3. 为bolt提供函数用于对tuple进行处理;

二、官方DRPC入门案例

在官方给出的storm-starter-master工程,storm.starter包,有DRPC案例;

1、BasicDRPCTopology案例

用官方封装好的 LinearDRPCTopologyBuilder 类,比较简单。本地提交。

package storm.starter;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;/*** This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a* "!" to any string you send the DRPC function.* <p/>* See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of* Storm.*/
public class BasicDRPCTopology {public static class ExclaimBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);// 发射的第一列是请求id,第二列是数据。对收到的数据加上 "!" 之后返回到 DRPC Server,collector.emit(new Values(tuple.getValue(0), input + "!"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}}public static void main(String[] args) throws Exception {// LinearDRPCTopologyBuilder 类是官方封装好的,专门用于DRPC的builder。且该builder没有数据源;// 构造函数的入参,是函数名称 "exclamation" ,在storm上必须唯一;LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");// 设置bolt的实例为3个builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();conf.setDebug(false);// 如果没有参数,本地模式提交if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));// 循环,每次调用函数和传参数。这里是客户端代码for (String word : new String[]{ "hello", "goodbye" }) {System.err.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}// 运行程序后,会报错,注释这2行就不会报错
//      cluster.shutdown();
//      drpc.shutdown();}else {     // 有参数,分布式提交。分布式提交不用传 drpcconf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());}}
}

运行程序后,控制台打印日志

Result for "hello": hello!
Result for "goodbye": goodbye!

2、ManualDRPC案例

用TopologyBuilder 来实现 DRPC,实现bolt可以个性化,增加功能。本地提交。

package drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class ManualDRPC {public static class ExclamationBolt extends BaseBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("result", "return-info"));}@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String arg = tuple.getString(0);Object retInfo = tuple.getValue(1);collector.emit(new Values(arg + "!!!", retInfo));}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();// 构造 DRPCSpoutDRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();conf.setDebug(false);cluster.submitTopology("exclaim", conf, builder.createTopology());System.out.println(drpc.execute("exclamation", "aaa"));System.out.println(drpc.execute("exclamation", "bbb"));}
}

运行程序后,控制台打印日志

aaa!!!
bbb!!!

三、远程模式DRPC

1、远程DRPC简介

远程模式也就是分布式提交。远程模式的DRPC与本地模式的DRPC不同之处在于:远程模式DRPC不需要模拟DRPC Server,而是通过在真实的Storm集群中配置DRPC Server来完成,远程模式通过调用builder的createRemoteTopology方法来构建topology。

在实际的storm集群上运行DRPC也一样很简单,只需要完成以下步骤:

a、启动DRPC Server(s);

b、配置DRPC Server(s)地址;

c、向storm集群提交DRPC拓扑。

首先,通过storm脚本启动DRPC Server:

bin/storm drpc

然后,在storm集群中配置DRPC Server地址,这就是DRPCSpout读取函数调用请求的地方,这一步的配置可以通过storm.yaml文件或者拓扑的配置来完成。通过storm.yaml文件的配置方式如下(设置主机地址):

drpc.servers:- "drpc1.foo.com"- "drpc2.foo.com"

2、准备代码

(1)ManualDRPC,主程序类,包含bolt类。

package drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class ManualDRPC {public static class ExclamationBolt extends BaseBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("result", "return-info"));}@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String arg = tuple.getString(0);Object retInfo = tuple.getValue(1);// 第一列是数据,在传进来的字符串后面加上3个感叹号collector.emit(new Values(arg + "!!!", retInfo));}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();if (args.length > 0) {// 构造 DRPCSpout,分布式模式不需要第二个参数drpcDRPCSpout spout = new DRPCSpout("exclamation");builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");Config conf = new Config();conf.setDebug(false);try {StormSubmitter.submitTopology(args[0], conf, builder.createTopology());} catch (AlreadyAliveException | InvalidTopologyException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}} else {// 构造 DRPCSpout,本地模式需要第二个参数drpcDRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();conf.setDebug(false);cluster.submitTopology("exclaim", conf, builder.createTopology());}// 上传到storm集群做分布式提交,下面两行要注释
//    System.out.println(drpc.execute("exclamation", "aaa"));
//    System.out.println(drpc.execute("exclamation", "bbb"));}
}

(2)MyDRPCClient,客户端类

package drpc;import org.apache.thrift7.TException;import backtype.storm.generated.DRPCExecutionException;
import backtype.storm.utils.DRPCClient;public class MyDRPCClient {public static void main(String[] args) {// storm 集群的master主机地址DRPCClient client = new DRPCClient("192.168.178.131", 3772);try {String result = client.execute("exclamation", "hello");System.out.println(result);} catch (TException | DRPCExecutionException e) {e.printStackTrace();}}
}

(3)现在本地自己,运行ManualDRPC类,用的是本地模式;没问题再打包。

3、打包发布到storm集群

(1)项目右键,Run As -> maven install,生成安装包,在工程target目录下有2个jar包,长名称的包含依赖,这个才是可用的。

(2)用filezilla,把 storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar 包传送到 hadoop-senior 主机的指定目录(因人而异,不影响结果) /opt/datas/stormjars

4、配置DRPC

在 /opt/stormmodules/storm-0.9.0.1/conf 目录下的  storm.yaml 加上以下内容(主机地址可变)

 drpc.servers:- "hadoop-senior.ibeifeng.com"- "hadoop-senior02.ibeifeng.com"

5、启动storm集群

首先启动zookeeper,在集群的每一台主机的 zookeeper 安装的 bin 目录下运行命令

./zkServer.sh start

启动storm集群,storm的master主机,storm安装的bin目录下运行2个命令,第二个命令是启动监控界面

nohup ./storm nimbus &
nohup ./storm ui &

storm的slave主机,storm安装的bin目录下运行命令

nohup ./storm supervisor &

打开监控界面 http://hadoop-senior:8081/ ,可以看到集群已经启动

6、启动 DRPC 服务

在集群的每一台主机的 storm安装 bin 目录下运行命令:

nohup storm drpc &

可以在监控界面看到drpc已经启动

7、提交拓扑任务

进入上文保存storm案例jar包的目录 /opt/datas/stormjars ,运行命令,后面2个参数依次表示 类名 drpc.ManualDRPC,拓扑任务名为 testDRPC。类名必须包含包名,不然寻找不到具体的类。

storm jar ./storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar drpc.ManualDRPC testDRPC

可以在监控界面看到拓扑任务已经启动

8、从本地发起调用

从上文的 MyDRPCClient 客户端类,运行代码,发起调用,

package drpc;import org.apache.thrift7.TException;import backtype.storm.generated.DRPCExecutionException;
import backtype.storm.utils.DRPCClient;public class MyDRPCClient {public static void main(String[] args) {// storm 集群的master主机地址DRPCClient client = new DRPCClient("192.168.178.131", 3772);try {String result = client.execute("exclamation", "hello");System.out.println(result);} catch (TException | DRPCExecutionException e) {e.printStackTrace();}}
}

入参数是 "hello",可以看到控制台打印出 "hello!!!" ,说明调用拓扑任务成功

hello!!!

【Storm】DRPC精解和案例分析相关推荐

  1. EMD算法之Hilbert-Huang Transform原理详解和案例分析

    目录 Hilbert-Huang Transform 希尔伯特-黄变换 Section I 人物简介 Section II Hilbert-Huang的应用领域 Section III Hilbert ...

  2. [SV]SystemVerilog進程之fork join专题详解及案例分析

                SystemVerilog進程之fork...join专题详解及案例分析  目錄 一.fork-join 1.1.fork join example, 二.fork-join_ ...

  3. 清风数学建模学习笔记——系统(层次)聚类原理详解及案例分析

    系统聚类   系统聚类的合并算法通过计算两类数据点间的距离,对最为接近的两类数据点进行组合,并反复迭代这一过程,直到将所有数据点合成一类,并生成聚类谱系图.此外,系统聚类可以解决簇数 K 的取值问题, ...

  4. 清风数学建模学习笔记——主成分分析(PCA)原理详解及案例分析

    主成分分析   本文将介绍主成分分析(PCA),主成分分析是一种降维算法,它能将多个指标转换为少数几个主成分,这些主成分是原始变量的线性组合,且彼此之间互不相关,其能反映出原始数据的大部分信息. 一般 ...

  5. 一条标准SQL语句是怎么执行之“步步惊心”过程详解与案例分析

    SQL逻辑执行过程详解(限标准SQL) 表与数据 -- 1 创建 HR.Employees表 CREATE TABLE HR.Employees (empid INT NOT NULL IDENTIT ...

  6. python sql注入脚本_python辅助sql手工注入猜解数据库案例分析

    发现存在sql注入漏洞 简单一点可以直接用sqlmap工具暴库 但是如果想深入理解sql注入的原理,可以尝试手工注入,配合python脚本实现手工猜解数据库 首先hachbar开启 获取cms登录后的 ...

  7. RACI 职责分配矩阵 模型使用详解及案例分析

    一.RACI产生背景     RACI是项目管理中的人力资源管理方法.一个项目团 队的成员往往来自于不同背景的各个部门,这些成员受部门经理和项目经理的双重管辖.由于这些人往往是临时组织起来的,并且项目 ...

  8. RACI职责分配矩阵模型使用详解及案例分析

    原文链接:https://blog.csdn.net/buding_pmp/article/details/54881486,有增删. 一.RACI产生背景 RACI是项目管理中的人力资源管理方法.一 ...

  9. 偏最小二乘回归分析原理详解和案例分析实例

    偏最小二乘回归分析原理详解 背景 偏最小二乘回归分析 Partial least squares regression analysis 基本思想 建模步骤 步骤一:分别提取两变量组的第一对成分,并使 ...

最新文章

  1. R语言使用gt包和gtExtras包优雅地、漂亮地显示表格数据:使用gtExtras包添加一个图,显示表中某一列中的数字、并自定义表格数据显示的主题格式、并自定义数值数据的格式(例如百分比)
  2. 以太坊DApp开发环境搭建
  3. day29Struts 类型转换和自定义类型转换,input视图
  4. 2018学校计算机 远程教学工作总结,2018年远程教育工作总结范文
  5. 互联网架构设计漫谈 (5)-搞清SpringCloud
  6. 通过DMVS采集并存储SQL Server性能计数器数据
  7. leetcode哈希表解决异位词问题
  8. 【数据处理】奇异值分解(SVD) 数据降噪的python实现
  9. ubuntu下查看opencv版本
  10. Window上装Linux系统的便捷方法,简单又省事!
  11. 2018蓝桥杯校选复现3
  12. 如何打开.exe文件
  13. HDFS的设计目标是什么?
  14. 同花顺炒股指标定制-K线只有红绿2个颜色怎么行?
  15. 华章IT图书书讯(2011年第6期)
  16. HSV/HSB/HSL 色相、饱和度、亮度的色彩模型
  17. UFO-ViT:没有Softmax的高性能线性视觉Transformer
  18. 高德地图定位蓝点不显示问题
  19. HTML期末学生大作业-电影网站html+css+javascript
  20. 【LeetCode-1109】航班预订统计<Java版>

热门文章

  1. 最近几天小说站的观察
  2. 安装 timescaledb 使用navcat连接 创建 hypertable
  3. Halcon直线检测
  4. 交货单 增强 VL01N:LE_SHP_DELIVERY_PROC自动增加批次号
  5. pycharm是文字替换模式
  6. 华为HCIA鲲鹏云学习Linux指令|CSDN创作打卡
  7. Vuejs——(9)组件——props数据传递
  8. 材料硕士转行计算机经验分享(Java开发)
  9. 简单实现微信小程序 input 的双向绑定
  10. 一文读懂Back Pressure