Storm DRPC 介绍
原文链接:http://blog.csdn.net/jmppok/article/details/16827837,转载请注明原文出处
问题导读:
1.DRPC的作用是什么?
2.DRPC工作流是怎样的?
3.DRPC分为几部分?
4.服务端有几部分组成?
1. DRPC介绍
Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理解为Storm是一个集群,DRPC提供了集群中处理功能的访问接口.
其实即使不通过DRPC,而是通过在Topoloye中的spout中建立一个TCP/HTTP监听来接收数据,在最后一个Bolt中将数据发送到指定位置也是可以的。这是后话,后面再进行介绍。而DPRC则是Storm提供的一套开发组建,使用DRPC可以极大的简化这一过程。
Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。
DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语spout,bolt, topology而成的一种模式(pattern)。本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑在一起。
2.DRPC工作流介绍
Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)。DRPC服务器协调
1) 接收一个RPC请求。
2) 发送请求到storm topology
3) 从storm topology接收结果。
4) 把结果发回给等待的客户端。
从客户端的角度来看一个DRPC调用跟一个普通的RPC调用没有任何区别。比如下面是客户端如何调用RPC: reach方法的,方法的参数是: http://twitter.com。
DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach","http://twitter.com");
DRPC的工作流大致是这样的:
客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。
这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。
DRPC包括服务端和客户端两部分
1)服务端
服务端由四部分组成:包括一个DRPC Server, 一个 DPRC Spout,一个Topology和一个ReturnResult。
在实际使用中,主要有三个步骤:
a.启动Storm中的DRPC Server;
首先,修改Storm/conf/storm.yaml中的drpc server地址;需要注意的是:必须修改所有Nimbus和supervisor上的配置文件,设置drpc server地址。否则在运行过程中可能无法返回结果。
然后,通过 storm drpc命令启动drpc server。
b.创建一个DRPC 的Topology,提交到storm中运行。
该Toplogy和普通的Topology稍有不同,可以通过两种方式创建:
创建方法一:直接使用 Storm 提供的LinearDRPCTopologyBuilder。 (不过该方法在0.82版本中显示为已过期,不建议使用)
LinearDRPCTopologyBuilder 可以很方便的创建一个DRPC 的Topology。
DRPC核心业务逻辑位于Bolt中,代码如下:
public static class ExclaimBolt extends BaseBasicBolt {public void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();conf.setDebug(true);if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf,builder.createLocalTopology(drpc));for (String word : new String[] { "hello", "goodbye" }) {System.err.println("Result for "" + word + "": "+ drpc.execute("exclamation", word));}cluster.shutdown();drpc.shutdown();} else {// conf.setNumWorkers(3);StormSubmitter.submitTopology("exclamation", conf,builder.createRemoteTopology());}
}
创建方法二:
直接使用 Storm 提供的通用TopologyBuilder。 不过需要自己手动加上开始的DRPCSpout和结束的ReturnResults。
其实Storm 提供的LinearDRPCTopologyBuilder也是通过这种封装而来的。
TopologyBuilder builder = new TopologyBuilder(); //开始的Spout
DRPCSpout drpcSpout = new DRPCSpout("exclamation");
builder.setSpout("drpc-input", drpcSpout,5);//真正处理的Bolt
builder.setBolt("cpp", new CppBolt(), 5).noneGrouping("drpc-input");//结束的ReturnResults
builder.setBolt("return", new ReturnResults(),5).noneGrouping("cpp");Config conf = new Config();
conf.setDebug(false);
conf.setMaxTaskParallelism(3);try { StormSubmitter.submitTopology("exclamation", conf,builder.createTopology());
} catch (Exception e)
{e.printStackTrace();
}
c.通过DRPCClient对Cluster进行访问
需要修改客户端配置文件 ~/.storm/storm.yaml,配置drpc server的地址。修改方法可storm服务端一样。
访问代码就很简单了:
DRPCClient client = new DRPCClient("10.100.211.232", 3772);
String result = client.execute("exclamation","test");
注意如果是本地模式,topology的提交和drpc的访问都有不同。
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); // 访问
for (String word : new String[] { "hello", "goodbye" }) { System.err.println("Result for "" + word + "": " + drpc.execute("exclamation", word));
} cluster.shutdown();
drpc.shutdown();
服务端完整代码:
package com.david.stormtest.drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;/*** Created by david on 3/24/17.*/
public class BasicDrpcTopology {public static class ExclaimBolt extends BaseBasicBolt {public void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}public void declareOutputFields(OutputFieldsDeclarer declarer) {//定义bolt输出字段declarer.declare(new Fields("id", "result"));}}public static void main(String[] args) throws Exception {//线性分布式rpc拓扑类LinearDRPCTopologyBuilder builder =new LinearDRPCTopologyBuilder("exclamation");//添加Bolt,设置其并行度为3builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();if (args == null || args.length == 0) {//如果没有传入参数,则使用本地分布式rpcLocalDRPC drpc = new LocalDRPC();//使用本地集群LocalCluster cluster = new LocalCluster();//提交拓扑结构cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));//发送制定字符串,接收drpc server端返回的结果;真实集群环境下,需要另写client端程序去获取drpc server端的响应for (String word : new String[]{"hello", "goodbye"}) {System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}cluster.shutdown();drpc.shutdown();} else {conf.setNumWorkers(3);StormSubmitter.submitTopology(BasicDrpcTopology.class.getSimpleName(),conf,builder.createRemoteTopology());}}}
2)客户端
客户端在上面服务端中已经介绍了。
客户端完整代码:
package com.david.stormtest.drpc;import backtype.storm.utils.DRPCClient;/*** Created by david on 3/24/17.*/
public class DrpcExclaim {public static void main(String[] args) throws Exception {DRPCClient client = new DRPCClient("hadoop1", 3772);for(String word : new String[]{"storm","is","amazing"}){System.out.println(client.execute("exclamation", word));}}}
程序将执行DRPC服务端,BasicDrpcTopology中的ExclaimBolt中的业务逻辑,将client端发送的消息,经过简单拼装(直接加上!符号),返回给DRPC的client,产生如下结果:
storm!
is!
amazing!
Storm DRPC 介绍相关推荐
- Storm DRPC 使用及访问C++ Bolt问题的解决方法
原创文章,欢迎转载,转载请注明出处:http://blog.csdn.net/jmppok/article/details/16840231 参考1: storm下运行C++程序(一) 参考2: St ...
- storm drpc
转 http://www.cnblogs.com/panfeng412/archive/2012/07/02/storm-common-patterns-of-distributed-rpc.html ...
- Storm DRPC 使用
来源:http://blog.csdn.net/jmppok/article/details/16839363 1. DRPC介绍 Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理 ...
- J storm战队成员_DOTA2J.Storm战队介绍-DOTA2ESL孟买站预选赛J.Storm战队介绍_牛游戏网攻略...
<DOTA2>ESL孟买站预选赛J.Storm战队介绍!由ESL主办ESL孟买站各大赛区预选赛热血开战!J.Storm战队作为本次预选赛直邀战队在比赛中会有怎样的表现呢?下面一起来看看小编 ...
- Storm DRPC环境搭建笔记
Storm DRPC环境搭建笔记 By Mickey.Pro 1. 安装系统 CentOS 6.3 64bit minimal http://www.osyunwei.com/archives/475 ...
- Twitter Storm: DRPC学习
学习途径 作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明 网址: http://xumingming.sinaapp.com/756/twitte ...
- storm DRPC问题
一.配置集群storm.yaml文件,配置drpc.server. 二.开启drpc服务,storm drpc. 三.编写DrpcTopology程序.如下: <span style=" ...
- storm DRPC例子
1,DRPC原理 客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数.实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函 数调用流.每个函数调用被DRPC服务器 ...
- J storm战队成员_DOTA2J.Storm战队介绍-DOTA2MD迪士尼Major预选赛J.Storm战队介绍_牛游戏网攻略...
<DOTA2>MDL巴黎Major预选赛J.Storm战队介绍,DOTA2 MDL巴黎迪士尼Major即将开战!本次比赛是2018-2019DOTA2职业巡回赛的第四个Major赛事,J. ...
最新文章
- 1.25亿用户以后,Netflix总结的系统高可用经验
- 物料编码原则外部分配还是内部分配
- Error creating bean with name ‘redisConnectionFactory‘ defined in class path resource
- 3pc在mysql的实现_面试官:了解分布式事务?讲讲你理解的2PC和3PC原理
- 未来数据领域的珠穆朗玛峰之中文自然语言处理
- wordpress插件feed count中文版
- 别再抱怨 TensorFlow2.0 辣鸡了,会了是“真香”
- 基于OpenCV与Dlib的行人计数开源实现
- Apache POI操作Excel的坑
- bzoj 1015: [JSOI2008]星球大战starwar
- [leetcode] 11.盛最多水的容器
- uboot之uboot中环境变量
- PLC系统调试的步骤
- [Java,IDEA]连接oracle的关于oracle.jdbc.driver.OracleDriver一直驱动加载失败的原因
- cad2017单段线_CAD将线段分成多段线的方法步骤
- 战神引擎修改文件的位置
- 计算机怎么显示各磁盘容量,win7系统打开计算机不显示磁盘容量的解决方法
- 【已解决】群晖套件中心无法添加第三方源
- AES加密解密SHA1、SHA加密MD5加密
- 10月更新!又一波新功能上线,升级后的EasyOps®简直神了