Hadoop 图处理

1.1 实验内容

本课程将基于hadoop平台实现Giraph 分布式系统中的图处理。

1.2 课程来源

本课程基于 图灵教育 的 《Hadoop应用架构》 第5章制作,真诚感谢 图灵教育 对实验楼的授权。

为了保证可以在实验楼环境中完成本次实验,我们在原书内容基础上补充了一系列的实验指导,比如实验截图,代码注释,帮助您更好的实战。

如果您对于实验有疑惑或者建议可以随时在讨论区中提问,与同学们一起探讨。

1.3. 实验知识点

  • Hadoop文件存储
  • 块同步并行模型
  • Giraph
  • Maven编译jar

1.4 实验环境

  • Hadoop 2.6.1
  • Xfce 终端

1.5 适合人群

本课程属于中等难度级别,适合具有 Hadoop,java 基础的用户,如果对 giraph 了解能够更好的上手本课程。

1.6 代码获取

你可以通过下面命令将代码下载到实验楼环境中,作为参照对比进行学习。

$ git clone https://github.com/hadooparchitecturebook/hadoop-arch-book.git

二、实验原理

2.1 分布式系统中的图处理

为了在类似 Hadoop 之类的系统中执行这类数据处理,我们先从 MapReduce 开始。问题是 MapReduce 只能提供一层合并,这表明我们不得不像剥洋葱一样来处理图数据。对于不剥洋葱的人来说,这很新鲜,因为剥洋葱和削苹果完全不同。洋葱只有剥掉很多层之后才能看到核心。另外,洋葱会让人流眼泪。洋葱的刺激性让剥洋葱的过程更加不愉快,使用 MapReduce 进行图数据处理也是这样的。MapReduce 有时可能会让你想哭。图 2-1 中的图为一个类似于“剥洋葱”的使用案例。中点为最开始的人,而每个增加的圈都是另一个 MapReduce 任务,每层中相连的人就是这样计算的。

图 2-1:“剥洋葱”

2.2 在 MapReduce 中,图处理如同剥洋葱

意识到每一条路径都需要重新将整个图读写到磁盘,你会更加痛苦。

幸好,Google 的智者再一次决定打破这个规则,改变 MapReduce 框架中 mapper 之间不能通信的情况。这种无共享的思路对于 Hadoop 这样的分布式系统来说非常重要,因为它们依赖于同步点和故障恢复策略。聪明人是怎么解决这个问题的呢?总之他们找到了另外的方法应对同步点和恢复策略,而不会受到 mapper 的限制。

2.3 块同步并行模型

那么,我们怎样才能保持同步处理而又打破“mapper 之间不能通信”的规则呢?来自哈佛大学的英国计算机科学家 Leslie Valiant 提供了一个解决方案,他在 20 世纪 90 年代创建了块同步并行模型(Bulk Synchronous Parallel,BSP)。BSP 模型正是 Google 图处理解决方案 Pregel 的核心。

BSP 的理念非常复杂,同时又很简单,说穿了就是在一个 superstep 之内执行分布式的数据处理任务。这种分布式数据处理进程能够互相发送消息,但是直到下一个 superstep 开始之前都不能处理消息。这些 superstep 将作为我们需要的同步点发挥作用。所有的分布式处理任务都完成,并且当前 superstep 的消息也已经发送完毕后,才会到达下一个 superstep。接下来通常会有一个单线程程序,决定整个数据处理是否需要继续并进入新的 superstep。与工作任务线程相比,它需要执行的任务非常少,所以可以接受该处理在单线程中运行,而不会成为瓶颈。

2.4 BSP举例

不可否认,确定分布式数据处理的简短定义花费了数年的研究。我们将通过一个 BSP 案例来佐证和分析相关概念。科学家可以使用图处理的方法,对疾病在一个社区内的传播建立模型。本案例中,我们通过僵尸来阐明这一概念——僵尸原本是人类,被别的僵尸咬过之后变为僵尸。

让我们来制作一张新的图,并把这个图命名为僵尸咬人。如图2-4,开始时图中含有一个僵尸和一群人。数据处理开始时规则是这样的:僵尸可以咬每一个跟其共享同一个边的人。当一个点被咬之后,这个点上的人转变为僵尸,而且开始咬与其相连接的其他人。一旦咬人,僵尸周围的人就变成了僵尸,所以这个僵尸不会再继续咬周围的点。有无数的僵尸电影能够告诉我们,僵尸不咬其他僵尸。

图 2-4:僵尸咬人图的 superstep 示意图

本章将介绍两个图数据处理工具(Giraph 与 GraphX),并演示该案例的实现。但是在这之前你需要明白,BSP 并不是唯一的解决方法。我们在深入讨论 Spark 时就了解到,使用 Spark 能够在很大程度上降低 MapReduce 那种剥洋葱式方法带来的缺陷。MapReduce 使得每个洋葱分层之间的 I/O 读写操作频繁,而 Spark 大大缓解了这些问题,至少在数据能够存储在内存时效果显著。但是我们也已经说过,BSP 模型非常独特,只会在分布式数据处理进程之间发送消息。使用剥洋葱的方法则会重新发送所有的消息。

在后面两节,我们将深入讨论目前两个最常用的 Hadoop 图数据处理框架。首先要介绍的是 LinkedIn 创建,由 Facebook 用于图数据搜索的 Giraph。Giraph 是一种更为成熟稳定的系统,宣称能处理多达一万亿的边。

第二个工具是较为年轻的 GraphX,它是 Apache Spark 项目的一部分。Spark GraphX 的创建基础主要为 GraphLab(http://dato.com/products/create/open_source.html),这是一个早期的开源图数据处理项目,在 Spark 通用 DAG 执行引擎的基础上扩展而成。尽管 GraphX 仍然是一种很年轻的工具,而且不如 Giraph 灵活稳定,但它的使用方法简单,而且能够与 Spark 的所有其他组件兼容,所以 GraphX 的前景很广阔。

2.5 Giraph

Giraph 是 Google Pregel 的一种开源实现。从开始创建以来,Giraph 就被用于图数据处理。我们已经注意到这与 Spark 的 GraphX 有所不同,后者包含一种基于 Spark DAG 引擎的 Pregel API 实现。

为了简单了解一 Giraph,我们会去掉很多细节内容,并将重点放在 Giraph 项目的三个主要阶段(如图 2-5 所示)。

(1) 数据的输入和分片。

(2) 使用 BSP 进行图的批处理。

(3) 将图回写磁盘。

图 2-5:Giraph 程序的三个主要阶段

三、代码介绍

我们在这里没有讨论 Giraph 的其他细节。我们的目的是为你提供足够多的内容,让你能够决定在架构中添加哪种工具。

下面我们来详细了解一下这些阶段,还有自定义这些阶段所需要的代码,以及僵尸咬人的问题。

3.1 数据的输入和分片

MapReduce 与 Spark 都支持 InputFormat,Giraph 的输入格式 VertexInputFormat 则与之类似。两种情况的输入格式都能够对数据分片,并作为 Reader 获得一条记录或一个点。在这个实现中,我们将保留默认的分片逻辑,只重写 Reader。因此,ZombieTextVertex-InputFormat 很简单,如下所示。

public class ZombieTextVertexInputFormat extendsTextVertexInputFormat<LongWritable, Text, LongWritable> {@Overridepublic TextVertexReader createVertexReader(InputSplit split,TaskAttemptContext context)throws IOException {return new ZombieTextReader();}
}

下面我们需要一个 VertexReaderVertexReader 与普通的 MapReduce RecordReader 相比,主要差别在于后者会返还一个键与 Writable 类型的值,而前者返还一个 Vertex 对象。

那么,Vertex 对象又是什么呢?它主要由三部分组成。

  • 顶点 ID

    这种 ID 能区分图中每一个点。

  • 顶点值

    这是一个对象,包含点的信息。本例中,它将存储人或僵尸的状态,以及他或她变成僵尸的阶段。我们使用字符串“Human”表示人还未变成僵尸,使用“Zombie.2”表示在第二个 superstep 被咬。

  • 边由两部分组成:源的顶点 ID 与一个包含若干信息的对象。后者包含的信息代表边的方向与 / 或边的类型(比如:边代表亲戚关系、距离还是体重?)

我们已经了解了点,下面来看一下在源文件中如何描述点。

{vertexId}|{Type}|{comma-separated vertexId of "bitable" people}
2|Human|4,6

这是一个 ID 为 2 的点,目前代表人类,而且与点 4 和点 6 之间均相连着一条有方向的边。现在来看一下这条边以及将其转成 Vertex 对象的代码。

public class ZombieTextReader extends TextVertexReader {@Overridepublic boolean nextVertex() throws IOException, InterruptedException {return getRecordReader().nextKeyValue();}@Overridepublic Vertex<LongWritable, Text, LongWritable> getCurrentVertex()throws IOException, InterruptedException {Text line = getRecordReader().getCurrentValue();String[] majorParts = line.toString().split("\\|");LongWritable id = new LongWritable(Long.parseLong(majorParts[0]));Text value = new Text(majorParts[1]);ArrayList<Edge<LongWritable, LongWritable>> edgeIdList =new ArrayList<Edge<LongWritable, LongWritable>>();if (majorParts.length > 2) {String[] edgeIds = majorParts[2].split(",");for (String edgeId: edgeIds) {DefaultEdge<LongWritable, LongWritable> edge =new DefaultEdge<LongWritable, LongWritable>();LongWritable longEdgeId = new LongWritable(Long.parseLong(edgeId));edge.setTargetVertexId(longEdgeId);edge.setValue(longEdgeId); // dummy valueedgeIdList.add(edge);}}Vertex<LongWritable, Text, LongWritable> vertex = getConf().createVertex();vertex.initialize(id, value, edgeIdList);return vertex;}
}

这段代码包含的内容很多,下面进行分解。

  • VertexReader 继承了 TextVertexReader,因此能够一行行地读取文本文件。注意,想读取任何其他类型的 Hadoop 文件,你需要改变父类的 Reader 类名。

  • nextVertex() 是一种很有趣的方法。查看父类的方法,你会发现这其实使用了通用的 RecordReader 来尝试读取下一行文件,并返回剩下的文件。

  • 使用方法 getCurrentVertex() 解析文件并创建、填充一个 Vertex 对象。

所以,在使用这种方法时,可以将得到的 Vertex 对象分区存储到集群中不同的分布式 Worker 中。默认的分区逻辑为基本的散列分区,你也可以对其修改。这已经超出了本例的讲解范围,我们只是在此提醒你注意控制分区。如果了解图的模式,你应该能分辨出这种情况:图分布到了较少的分布式任务中,网络使用可能不充分,性能也会有所降低。

一旦数据加载到内存或者存到磁盘中(并开启 Giraph 中新增的 spill-to-disk 功能),我们就能够开始使用 BSP 处理数据了,如 5.4.2 节所述。

在开始下一节之前,你需要注意一点:这只是一个 VertexInputFormat 的例子。Giraph 中存在更多高级的选择,如通过不同的 Reader 在点和边上读取数据,以及更为高级的分区策略,但是这些都不属于本书的讨论范围。

3.2 使用BSP批处理图

在 Giraph 中,对于新手来说 BSP 执行模式是最难懂的。为了让这个概念更容易理解,我们将重点放在三个计算阶段:顶点、主线程与工作线程。后面将呈现这三个阶段的代码,我们来看一下图 5-7。

图 3-7:BSP 执行模式的三个计算阶段:顶点计算阶段、主线程计算阶段、工作线程计算阶段

从图中可以看到,每一个 BSP 都从一个主线程计算阶段开始,然后是每个分布式 JVM 的工作线程计算阶段,最后是相应的 JVM 本地内存或本地磁盘中每个点的顶点计算阶段。

这些顶点计算可能会处理消息,然后这些消息被发送到接收点,但是直到下一个 BSP 通过,接收点才能接收到这些消息。

我们从最简单的计算阶段(即主线程计算)开始。

public class ZombieMasterCompute extends DefaultMasterCompute {@Overridepublic void compute() {LongWritable zombies = getAggregatedValue("zombie.count");System.out.println("Superstep "+String.valueOf(getSuperstep())+" - zombies:" + zombies);System.out.println("Superstep "+String.valueOf(getSuperstep())+" - getTotalNumEdges():" + getTotalNumEdges());System.out.println("Superstep "+String.valueOf(getSuperstep())+" - getTotalNumVertices():" +getTotalNumVertices());}@Overridepublic void initialize()throws InstantiationException, IllegalAccessException {registerAggregator("zombie.count", LongSumAggregator.class);}
}

下面我们在 ZombieMasterCompute 类中深入研究这两个方法。我们先来看一下 initialize() 方法——在真正开始之前可以这么称呼它。重要的是我们在这里注册了一个 Aggregator 类。

Aggregator 类与 MapReduce 中的高级计数器相似,但是更像 Spark 中的 accumulator。Giraph 中存在很多 aggregator可供选择,如后面列表所示。用户也可以创建自定义的 aggregator

以下为一些 Giraph aggregator 的例子。

  • 求和(Sum

  • 求平均(Avg

  • 求最大值(Max

  • 求最小值(Min

  • 文本方式追加(TextAppend

  • 和 / 或布尔运算(And/Or

ZombieMasterCompute 中的第二个方法是 compute(),这种方法会在每个 BSP 开始的时候运行。在此我们只打印协助调试过程的信息。

下面讲一下后面的一些代码,即用于工作线程计算阶段的 ZombieWorkerContext。这些代码将在应用与每个 superstep 的前后执行,并且可以用于一些高级操作。比如,在一个 superstep 开始时放置合并的数据,使其能够访问顶点计算阶段。在这里,我们只使用 System.out.println(),所以我们能看到在数据处理过程中不同方法被调用的时间。

public class ZombieWorkerContext extends WorkerContext {@Overridepublic void preApplication() {System.out.println("PreApplication # of Zombies: " +getAggregatedValue("zombie.count"));}@Overridepublic void postApplication() {System.out.println("PostApplication # of Zombies: " +getAggregatedValue("zombie.count"));}@Overridepublic void preSuperstep() {System.out.println("PreSuperstep # of Zombies: " +getAggregatedValue("zombie.count"));}@Overridepublic void postSuperstep() {System.out.println("PostSuperstep # of Zombies: " +getAggregatedValue("zombie.count"));}
}

最后是最复杂的顶点计算阶段。

public class ZombieComputationextends BasicComputation<LongWritable,Text, LongWritable, LongWritable> {private static final Logger LOG = Logger.getLogger(ZombieComputation.class);Text zombieText = new Text("Zombie");LongWritable longIncrement = new LongWritable(1);@Overridepublic void compute(Vertex<LongWritable, Text, LongWritable> vertex,Iterable<LongWritable> messages) throws IOException {Context context = getContext();long superstep = getSuperstep();if (superstep == 0) {if (vertex.getValue().toString().equals("Zombie")) {zombieText.set("Zombie." + superstep);vertex.setValue(zombieText);LongWritable newMessage = new LongWritable();newMessage.set(superstep+1);aggregate("zombie.count",longIncrement );for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {this.sendMessage(edge.getTargetVertexId(), newMessage);}}} else {if (vertex.getValue().toString().equals("Human")) {Iterator<LongWritable> it = messages.iterator();if (it.hasNext()) {zombieText.set("Zombie." + superstep);vertex.setValue(zombieText);aggregate("zombie.count",longIncrement );LongWritable newMessage = new LongWritable();newMessage.set(superstep+1);for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {this.sendMessage(edge.getTargetVertexId(), newMessage);}} else {vertex.voteToHalt();}} else {vertex.voteToHalt();}}}
}

本代码中存在许多具体的逻辑,我们会深入地研究每个操作,但是首先要明白 compute() 方法如何调用。每个点均会调用 compute() 方法,而且在最后一个 superstep 结束时,所有消息的迭代器会将消息发送到点上。

逻辑流程如下所示。

  • 如果这是第一个 superstep,而且我是一个僵尸,那么我周围的每一个人都要被咬。

  • 如果在第一个 superstep 之后,而且我是一个接受咬人消息的人,那么我自己会变成僵尸,还会去咬我周围的每一个人。

  • 如果我是一个僵尸,而且被咬了,那么不会有任何改变。

同时需要注意的是,我们提出在两个位置停止:僵尸被咬或者人类未被咬。这样做的原因在于有以下两种最终状态。

  • 我们可以让每个人都被咬然后变成僵尸,而在那一点上我们需要停止数据处理。

  • 我们可以使僵尸与人类之间没有直接相连的边。所以这些人类永远都不会变成僵尸。

3.3 将图回写磁盘

图中已经产生了很多僵尸,现在应该将结果写回磁盘了。我们使用 VertexOutputFormat 完成写回数据。这里不会涉及细节内容,只要注意这与 InputFormat 相反即可。

public class ZombieTextVertexOutputFormatextends TextVertexOutputFormat<LongWritable, Text, LongWritable> {@Overridepublic TextVertexWriter createVertexWriter(TaskAttemptContext context)throws IOException, InterruptedException {return new ZombieRecordTextWriter();}public class ZombieRecordTextWriter extends TextVertexWriter {Text newKey = new Text();Text newValue = new Text();public void writeVertex(Vertex<LongWritable, Text, LongWritable> vertex)throws IOException, InterruptedException {Iterable<Edge<LongWritable, LongWritable>> edges = vertex.getEdges();StringBuilder strBuilder = new StringBuilder();boolean isFirst = true;for (Edge<LongWritable, LongWritable> edge : edges) {if (isFirst) {isFirst = false;} else {strBuilder.append(",");}strBuilder.append(edge.getValue());}newKey.set(vertex.getId().get() + "|" + vertex.getValue() + "|"+ strBuilder.toString());getRecordWriter().write(newKey, newValue);}}
}

3.4 整体流程控制

现在,与 MapReduce 中的情况类似,我们需要将一切都设定好,而且在主函数中进行配置。代码如下。

public class ZombieBiteJob implements Tool {private static final Logger LOG = Logger.getLogger(ZombieBiteJob.class);private Configuration conf;@Overridepublic void setConf(Configuration conf) {this.conf = conf;}@Overridepublic Configuration getConf() {return conf;}@Overridepublic int run(String[] args) throws Exception {if (args.length != 3) {throw new IllegalArgumentException("Syntax error: Must have 3 arguments " +" <numbersOfWorkers> <inputLocaiton> <outputLocation>");}int numberOfWorkers = Integer.parseInt(args[0]);String inputLocation = args[1];String outputLocation = args[2];GiraphJob job = new GiraphJob(getConf(), getClass().getName());GiraphConfiguration gconf = job.getConfiguration();gconf.setWorkerConfiguration(numberOfWorkers, numberOfWorkers, 100.0f);GiraphFileInputFormat.addVertexInputPath(gconf,new Path(inputLocation));FileOutputFormat.setOutputPath(job.getInternalJob(),new Path(outputLocation));gconf.setComputationClass(ZombieComputation.class);gconf.setMasterComputeClass(ZombieMasterCompute.class);gconf.setVertexInputFormatClass(ZombieTextVertexInputFormat.class);gconf.setVertexOutputFormatClass(ZombieTextVertexOutputFormat.class);gconf.setWorkerContextClass(ZombieWorkerContext.class);boolean verbose = true;if (job.run(verbose)) {return 0;} else {return -1;}}public static void main(String[] args) throws Exception {int ret = ToolRunner.run(new ZombieBiteJob(), args);if (ret == 0) {System.out.println("Ended Good");} else {System.out.println("Ended with Failure");}System.exit(ret);}
}

3.5 何时选用Giraph

Giraph 的功能非常强大,但是就像我们从概念和需要的代码中看到的,它会挑战你的心理承受能力。不过,如果需要进行图数据处理,保证确切的服务等级协议(Service-Level Agreement,SLA),而且需要成熟的解决方案,那么可以选择 Giraph。

但是要注意,目前并不是所有主流 Hadoop 厂商发行的版本都支持 Giraph。这并不是说 Giraph 不能用于你所选择的发行版本,只是可能需要与 Hadoop 厂商进一步沟通,看一下是否能获得这类支持。你至少可以为自己的发行版本编译 Giraph 对应的 Jar 包。

四、实验步骤

本次实验是在 Hadoop 2.6.1 下进行,请确保 Hadoop 安装没有问题。

本课程的实验环境已配置该版本的 Hadoop 。请按照以下步骤启动。

  1. 使用 su -l hadoop 命令切换到 hadoop 用户。
  2. 使用 start-dfs.sh 和 start-yarn.sh 命令打开服务。

请注意:

  1. 本课程的实验环境中,Hadoop 的安装目录位于 /opt/hadoop-2.6.1
  2. hadoop 用户的密码为 hadoop 。
  3. 进行下一步操作之前,请确认使用 jps 命令可以查看到 NameNode、SecondaryNameNode、NodeManager、ResourceManager 和 DataNode 已启动。
  4. 初次使用 Hadoop 时,需要对 NameNode 进行格式化 hadoop namenode -format

4.1 导入 maven 项目

双击桌面上的图标打开 eclipse,点击菜单栏的 File -> Import 。

选中 Import 对话框中的 Maven -> Existing Maven Projects 选项,再点击 Next 按钮。

在终端中切换到 hadoop 用户,修改文件名 hadoop-arch-book 为 hadoop-arch-book-master。

hadoop@907b3dc56edb:/home/shiyanlou$ sudo mv hadoop-arch-book/ hadoop-arch-book-master

使用 chmod 命令修改项目目录和文件的权限,避免导入项目时提示权限不够的错误。

hadoop@907b3dc56edb:/home/shiyanlou$ sudo chmod 777 -R hadoop-arch-book-master/

回到 eclipse,点击 Import Maven Projects 对话框中的 Browse 按钮。在弹出的文件选择对话框中选中 /home/shiyanlou/hadoop-arch-book-master/ch05-graph-processing/giraph 目录,点击 确定 按钮。等待 Analysing 过程结束后,点击 Finish 按钮完成项目导入。

项目导入后,由于通过 pom.xml 文件下载的依赖项较多,需要一定的时间。请耐心等待完成。

以下为 pom.xml 文件中的内容:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.hadooparchitecturebook</groupId><artifactId>examples.giraph</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>giraph.1.0.0.zombie.bite</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!-- cdh5 versions --><cdh5.version>5.3.1</cdh5.version><cdh5.hadoop.version>2.5.0-cdh${cdh5.version}</cdh5.hadoop.version><giraph.version>1.1.0-hadoop2</giraph.version></properties><profiles><profile><id>cdh5</id><activation><activeByDefault>true</activeByDefault></activation><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><cdh.version>${cdh5.version}</cdh.version><cdh.hadoop.version>${cdh5.hadoop.version}</cdh.hadoop.version></properties></profile></profiles><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${cdh.hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${cdh.hadoop.version}</version></dependency><dependency><groupId>org.apache.giraph</groupId><artifactId>giraph-core</artifactId><version>${giraph.version}</version></dependency><dependency><groupId>org.apache.giraph</groupId><artifactId>giraph-examples</artifactId><version>${giraph.version}</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-buffer</artifactId><version>4.0.26.Final</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-handler</artifactId><version>4.0.26.Final</version></dependency><dependency><groupId>org.jodah</groupId><artifactId>typetools</artifactId><version>0.2.1</version></dependency><dependency><groupId>com.facebook.nifty</groupId><artifactId>nifty-client</artifactId><version>0.14.1</version></dependency><dependency><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.json</groupId><artifactId>json</artifactId><version>20140107</version></dependency><dependency><groupId>it.unimi.dsi</groupId><artifactId>fastutil</artifactId><version>6.6.3</version></dependency><dependency><groupId>net.iharder</groupId><artifactId>base64</artifactId><version>2.3.8</version></dependency></dependencies><repositories><repository><id>maven-hadoop</id><name>Hadoop Releases</name><url>https://repository.cloudera.com/content/repositories/releases/</url></repository><repository><id>maven</id><name>Maven repository</name><url>http://repo1.maven.org/maven2</url></repository></repositories><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.2</version><configuration><shadedArtifactAttached>false</shadedArtifactAttached><outputFile>target/GiraphCode.jar</outputFile><artifactSet><includes><include>*:*</include><include>org.apache.giraph:giraph-core</include><include>org.apache.giraph:giraph-examples</include></includes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.spark*</exclude><exclude>org.apache.hadoop*</exclude><exclude>org.spark*</exclude></excludes></artifactSet><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

4.2 项目编译

依赖加载完成后,在左侧的文件目录窗口中找到 pom.xml 文件。右键点击该文件,在菜单中选择 Run As -> Maven install 选项。

注意:项目可能会提示含有错误,可以忽略这些错误直接进行编译。

等待依赖下载。

等待 Build 过程完成,生成 jar 包。

在项目的 target 目录中,右键点击 GiraphCode.jar 包并选择 Properties 选项,可以查看其所在的位置。

如果编译未成功或后续步骤有错误,则建议直接使用本实验打好的 jar 包。下载方式为:

wget http://labfile.oss.aliyuncs.com/courses/781/GiraphCode.jar

使用sudo mkdir /App命令创建目录后,再使用 mv 命令移动 GiraphCode.jar 至 App 目录下。

hadoop@907b3dc56edb:/home/shiyanlou$ sudo mv hadoop-arch-book-master/ch05-graph-processing/giraph/target/GiraphCode.jar  /App/

在 /App 目录下创建和准备数据 input.txt:

2|Human|4,6
1|Zombie|2,3,8,9,10
3|Human|4,5,7
4|Human|2,3
5|Human|3
6|Human|2
7|Human|3
8|Human|9,11
9|Human|8
10|Human|
11|Human|8

用 vi 命令编辑 input.txt。

根据 input.txt 这个数据集,我们可以形象的用下图表示:

执行程序:

hadoop@907b3dc56edb:/App$ /opt/hadoop-2.6.1/bin/hadoop jar GiraphCode.jar com.hadooparchitecturebook.zombie.giraph.ZombieBiteJob -Dmapred.job.tracker=yarn 2 input.txt out

注意: 该过程易出错,如有问题请参照下一小节中的描述进行解决。同时,运算过程耗时较长,请耐心等待并注意延长实验剩余时间。

查看输出结果:

4.3 可能遇到的问题

4.3.1 内存不足

在执行上述 GiraphCode.jar 程序的时候有可能会遇到 虚拟内存不足,程序被迫中断。

解决办法如下:

停止 Hadoop 进程。

hadoop@907b3dc56edb:/opt/hadoop-2.6.1/bin$ stop-all.sh

调整 yarn-site.xml 参数。

hadoop@907b3dc56edb:/opt/hadoop-2.6.1/etc/hadoop$ sudo vi yarn-site.xml

添加以下参数:

<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>10000</value>
</property><property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>3000</value>
</property><property>
<name>mapreduce.reduce.memory.mb</name>
<value>3000</value>
</property>

重启 Hadoop 进程。

hadoop@907b3dc56edb:/opt/hadoop-2.6.1/bin$ start-all.sh

4.3.2 Giraph程序报错

1) 如果出现以下报错信息:

Exception in thread "main" java.lang.IllegalArgumentException: checkLocalJobRunnerConfiguration: When using LocalJobRunner, must have only one worker since only 1 task at a time!

请将任务参数中的 2 修改为 1 即可。例如:

hadoop jar GiraphCode.jar com.hadooparchitecturebook.zombie.giraph.ZombieBiteJob -Dmapred.job.tracker=yarn 1 input.txt out

2) 如果出现以下报错信息:

Exception in thread "main" java.lang.IllegalArgumentException: checkLocalJobRunnerConfiguration: When using LocalJobRunner, you cannot run in split master / worker mode since there is only 1 task at a time!

则可以在 /examples.giraph/src/main/java/com/hadooparchitecturebook/zombie/giraph/ZombieBiteJob.java 文件的第 41 行添加以下代码:

gconf.setBoolean("giraph.SplitMasterWorker", false);

保存后,重新编译即可(Run as -> Maven install)。

3) 如果出现 /App 目录下的 input.txt 无法被读取的情况,即出现以下错误信息时:

Exception in thread "main" org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): Cannot create directory /tmp/hadoop-yarn/staging/hadoop/.staging. Name node is in safe mode.

请通过下面的命令来关闭 NamoNode 的安全模式。

hdfs dfsadmin -safemode leave

五、实验总结

本课程首先讲解分布式系统中的图处理理论,接着讲解 MapReduce 模型处理图处理的不足,及块同步并行模型,最后深入讲解BSPgiraph,并基于 hadoop 平台实现 Giraph 分布式系统中的图处理,对深入学习图处理有很大帮助。

六、扩展阅读

本课程基于 图灵教育 的 《hadoop应用架构》 第5章制作,再次感谢 图灵教育 对实验楼的授权。

如果学完本课程,对书籍其他内容感兴趣欢迎点击以下链接购买书籍:

  • 立即购买《hadoop应用架构》

Hadoop 图处理相关推荐

  1. 【大数据入门】Hadoop技术原理与应用之基于Hadoop的数据仓库Hive

    基于Hadoop的数据仓库Hive 文章目录 基于Hadoop的数据仓库Hive @[toc] 6.1 概述 6.1.1 数据仓库概念 6.1.2 传统数据仓库面临的挑战 6.1.3 Hive简介 6 ...

  2. 大数据学习笔记第1课 Hadoop基础理论与集群搭建

    大数据学习笔记第1课 Hadoop基础理论与集群搭建 一.环境准备 二.下载JDK 三.安装JDK 四.下载hadoop 五.安装hadoop集群 六.打通3台服务器的免密登录 七.hadoop集群配 ...

  3. 40个大数据学习资源,个个是干货,最后7个太给力

    "数据是驱动商业向前发展的核心,更是人类社会的未来." 尽管将马云的这句话断章取义地拿出来说会显得唐突,但这话所表达的意思却显而易见.今天给大家分享的这40个教程,送给对未来抱有远 ...

  4. scala spark 数据对比_IT大牛耗时三个月总结出大数据领域学习路线,网友评论:炸锅了...

    大数据不是某个专业或一门编程语言,实际上它是一系列技术的组合运用. 有人通过下方的等式给出了大数据的定义. 大数据 = 编程技巧 + 数据结构和算法 + 分析能力 + 数据库技能 + 数学 + 机器学 ...

  5. 面试数据库知识点总结

    文章目录 1 数据库相关知识总结 1.2 MySQL数据库 1.2.1 索引有哪些? 1.2.2 MyISAM 和 InnoDB有什么不同? 1.2.3 MySQL 有哪些锁? 1.2.4 有哪些隔离 ...

  6. hortonworks_具有在IBM POWER8上运行的Hortonworks Data Platform(HDP)的SAS软件

    Hadoop的SAS / ACCESS接口 Hadoop的SAS / ACCESS接口提供了访问SAS本机中Hadoop中存储的数据集的功能. 通过SAS / ACCESS到Hadoop: LIBNA ...

  7. Spark 应用开发程序

    目 录 1 绪论 1 1.1 项目背景 1 1.1.1 Hadoop的发展 1 1.1.2 Spark的兴起 2 1.2 项目意义 2 2 Hadoop云计算环境的搭建 3 2.1 准备工作 3 2. ...

  8. 数据开发岗面试绝地求生

    如果说,学习知识就像遍历一个巨大的图一样,那么必然有深度优先遍历和广度优先遍历,本文大概属于广度优先遍历,相关知识点点到即止.文末的链接里的文章是更为详细的内容,代表着这些知识点的子结点. 我主要找J ...

  9. 一卡通系统服务器配置,一卡通服务器配置要求

    一卡通服务器配置要求 内容精选 换一换 弹性云服务器可以根据业务需求和伸缩策略,自动调整计算资源.您可以根据自身需要自定义服务器配置,灵活地选择所需的内存.CPU.带宽等配置,帮助您打造可靠.安全.灵 ...

最新文章

  1. OpenCV+python:膨胀和腐蚀
  2. Spark Stream整合flum和kafka,数据存储在HBASE上,分析后存入数据库
  3. springboot中下面哪一个作为jpa默认实现_35个超高频SpringBoot知识点(附解析),别怪我没给你机会收藏...
  4. 线性共轭梯度法python_python实现的共轭梯度法
  5. Java魔法堂:URI、URL(含URL Protocol Handler)和URN
  6. CSS3 Transform、Transition和Animation属性总结
  7. docker初级操作
  8. python数据结构-串
  9. BZOJ4011:[HNOI2015]落忆枫音(DP,拓扑排序)
  10. 【转】Linux 的启动流程
  11. DLL load failed: 找不到指定模块\Failed to load the native TensorFlow runtime解决方法
  12. 如何去除暴风影音2009的广告
  13. linux 核显驱动程序,在Ubuntu系统上安装英特尔核显驱动安装器的方法
  14. 微信无法打开xlsx文件_微信打不开文件怎么办显示excel丢失或损坏
  15. 阿里天池:小样本商标检测(baseline0.50)
  16. 输入一个字符串,内有数字和非数字字符,例如: A123x456 17960? 302tab5876 将其中连续的数字作为一个整数,依次存放到一数组a中。例如,123存放在a[0],456放在a[1]中
  17. .net core word转pdf_Enolsoft PDF to Word with OCR for Mac(PDF转Word软件)
  18. google新搜索网站
  19. 西门子S7通信协议中TSAP的确认
  20. 【独立篇】React UI组件库

热门文章

  1. 使用 Acrobat 给 PDF 重新编排页码
  2. gnuplot安装,及error:terminal type set to 'unknown'的解决
  3. 绝地求生 无限复活服务器,绝地求生无限复活玩法说明 绝地求生无限复活分数计算规则/吃鸡攻略...
  4. php mysql安装包_apache php mysql集成开发环境
  5. 有人用Python写了个自动亏钱脚本,还能微信实时通知!
  6. mysql版本升级对数据的影响_MySQL数据库升级的一些坑
  7. Vue+Element之SpingBoot书本管理系统
  8. Oracle语法:merge into using
  9. 世界四大会计行控制中国审计业:中国大企业和银行已无商业秘密
  10. 再出售44万股股份,英伟达CEO黄仁勋打着什么算盘?