Storm之同步服务DRPC
一、概述
DRPC (Distributed RPC -- remote procedure call分布式远程过程调用)是一种同步服务实现的机制,在Storm中客户端提交数据请求之后,立刻取得计算结果并返回给客户端。同时充分利用Storm的计算能力实现高密度的并行实时计算。
二、架构
- DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
- DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)
- DRPC设计目的是为了充分利用Storm的计算能力实现高密度的并行实时计算。(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)
解释:客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使JoinResult的Bolt实现数据的聚合, ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。
三、实现方式
3.1、通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)
该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现
package com.lxk.storm.drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class BasicDRPCTopology {public static class ExclaimBolt extends BaseBasicBolt {@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);Config conf = new Config();if (args == null || args.length == 0) {LocalDRPC drpc = new LocalDRPC();LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));for (String word : new String[] { "hello", "goodbye" }) {System.err.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));}cluster.shutdown();drpc.shutdown();} else {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());}}
}
结果:
3.2、直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑
需要手动设定好开始的DRPCSpout以及结束的ReturnResults
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package com.lxk.storm.drpc;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;public class ManualDRPC {public static class ExclamationBolt extends BaseBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("result", "return-info"));}@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {String arg = tuple.getString(0);Object retInfo = tuple.getValue(1);collector.emit(new Values(arg + "!!!", retInfo));}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();DRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();cluster.submitTopology("exclaim", conf, builder.createTopology());System.err.println(drpc.execute("exclamation", "aaa"));System.err.println(drpc.execute("exclamation", "bbb"));}
}
结果:
四、Storm运行模式
4.1、本地模式
public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();LocalDRPC drpc = new LocalDRPC();DRPCSpout spout = new DRPCSpout("exclamation", drpc);builder.setSpout("drpc", spout);builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");LocalCluster cluster = new LocalCluster();Config conf = new Config();cluster.submitTopology("exclaim", conf, builder.createTopology());System.err.println(drpc.execute("exclamation", "aaa"));System.err.println(drpc.execute("exclamation", "bbb"));}
4.2、远程模式(集群模式)
配置
集群drpc
---------------------------------------------------
修改
$ vi conf/storm.yaml
drpc.servers:- "node03"分发配置storm.yaml文件给其他节点启动zk
主节点启动 nimbus,supervisor,drpc
从启动 supervisor
- 启动DRPC Server:bin/storm drpc &
- 通过StormSubmitter.submitTopology提交拓扑:StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
public static void main(String[] args) { DRPCClient client = new DRPCClient("node03", 3772);//通信端口 try {String result = client.execute("exclamation", "11,22");System.out.println(result);} catch (TException e) {e.printStackTrace();} catch (DRPCExecutionException e) {e.printStackTrace();}
总结:Drpc分布式远程调用帮我们
- 实现了drpcSpout用来向后发送数据,我们只需要传参即可。
- 实现了最后的JoinResult用来汇合结果,ReturnResult用来将结果返回客户端。从而达到实时的目的。
- 我们可以修改并行度,使集群的并行计算能力达到最优,主要实现并行计算。
Storm之同步服务DRPC相关推荐
- rsync同步服务实验讲解
rsync 同步服务 复制: 源所有数据 同步: 只传输变化数据 • 命令用法 – rsync [选项...] 源目录 目标目录 • 本地同步 – rsync [选项...] 本地目录1 本地目录2 ...
- Opera浏览器同步服务被黑,用户数据和存储密码泄露
8月26日晚,知名浏览器厂商Opera发布公告,表示其云同步服务遭遇黑客攻击,开启了浏览器同步功能的用户将受影响. Opera公司的一台用于存储用户同步数据的服务器被攻破,如果用户开启了跨平台数据同步 ...
- Rsync数据同步服务
Rsync数据同步服务 Rsync软件适用与unix/linux/windows等多种操作系统平台 Rsync是一款开源的,快速的,多功能的,可实现全量及增量的本地或远程数据同步备份的优秀工具,可以实 ...
- User profile synchronization service starting issues 用户配置文件同步服务启动问题
User profile synchronization service starting issues 用户配置文件同步服务启动问题 这里,尽管我删除并重建了user profile service ...
- Storm集群使用DRPC功能Version1.0.1
在Storm集群上开启DRPC功能, 基于Storm的1.0.1版本, 并且执行简单的例子测试. 1.DRPC概念 DRPC就是分布式远程过程调用. Storm里面引入DRPC主要是利用storm的实 ...
- ntpd时钟同步服务
原网址:http://blog.csdn.net/wzyzzu/article/details/46515129 ntpd时钟同步服务 目录 参考: CentOS配置时间同步NTP: http://w ...
- SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析
SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析 SDH通信网络时钟同步服务(NTP/PTP精密网络时钟源)精度分析 安徽京准公司提供原创资料!! 3) 从站时钟要从高一级设备或同一 ...
- 大数据开发平台-数据同步服务
什么是数据同步服务?顾名思义,就是在不同的系统之间同步数据.根据具体业务目的和应用场景的不同,各种数据同步服务框架的功能侧重点往往不尽相同,因而大家也会用各种大同小异的名称来称呼这类服务,比如数据传输 ...
- win2003能装mysql_win2003 安装2个mysql实例做主从同步服务配置
win2003 安装2个mysql实例做主从同步服务配置 2017年12月12日 | 萬仟网IT编程 | 我要评论 配置前的准备: 2台电脑,均安装windows2003 64位.均分三区c,d,e. ...
最新文章
- Python Logging模块实现运行的程序写入 日志
- liunx 下 sendmail 反病毒和防垃圾邮件
- html制作彩虹_制作彩虹
- 解决由于没有办理权限导致的403,权限不足
- 04-----赋值运算符
- 无法安装Visual Studio 2010 Service Pack 1
- maven中使用MySQL
- cocos 修改层级_管理节点层级和显示顺序
- 加性噪声(目前不理解)
- 目标:安全纯净互联网 软件升级报38期
- 明日之后手机正版服务器,明日之后能不能换区 明日之后手游渠道服怎么转换成网易官方服...
- 倒计时3天!云栖大会龙蜥操作系统峰会最新议程一览
- MAMP配置虚拟主机
- Alignment--本地blast使用详解1-数据库序列检索下载及比对
- 四、LockSupport与线程中断
- DMPQ2Iracac红光铱(Ir)配合物|Ir(dfbpy)2(bpy)+PF6-|Ir(dfbpy)2(pyq)+PF6-|Ir(dfbpy)2(quqo)+PF6-科研试剂
- win7 右键添加 显示\隐藏 系统文件+扩展名
- FW: 图说 WebAssembly
- Java开发工程师--面试题(珍藏版)
- 实践出真知:全网最强秒杀系统架构解密