PS: HDFS对于MapReduce来说,HDFS就是一个就是一个客户端。

PS: 离线就是 写sql,sparkh还是写sql

1. MAPREDUCE原理篇(1)

Mapreduce是一个分布式运算程序的编程框架是用户开发“基于hadoop的数据分析应用”的核心框架;

Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;

PS: 上图为mapreduce的设计思想,以wordcount为例子,首先把一个任务分我 两个阶段。 第一map阶段负责业务逻辑,第二数据的合并(比如按a-h合并,然后第二个安装i-k)。中间还有还多负责的业务管理逻辑,由mr application master来管理,协调调度。PS: MapReduce程序都是依赖于HDFS,以流的形式读取

----------------------------------------------------------------------------------------------------------------

1.1 为什么要MAPREDUCE

(1)海量数据在单机上处理因为硬件资源限制,无法胜任

(2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度

(3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理

-----------------------------------------------------------

设想一个海量数据场景下的wordcount需求:

单机版:内存受限,磁盘受限,运算能力受限

分布式:

1、文件分布式存储(HDFS)

2、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)

3、运算程序如何分发

4、程序如何分配运算任务(切片)

5、两阶段的程序如何启动?如何协调?

6、整个程序运行过程中的监控?容错?重试?

  可见在程序由单机版扩成分布式时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

而mapreduce就是这样一个分布式程序的通用框架,其应对以上问题的整体结构如下:

1、MRAppMaster(mapreduce application master)

2、MapTask

3、ReduceTask

1.2 MAPREDUCE框架结构及核心运行机制

1.2.1 结构

一个完整的mapreduce程序在分布式运行时有三类实例进程:

1、MRAppMaster:负责整个程序的过程调度及状态协调

2、mapTask:负责map阶段的整个数据处理流程

3、ReduceTask:负责reduce阶段的整个数据处理流程

-----------------------------------------------------------------------------

PS:写程序代码

1.配置项目,common和hdfs如之前配置

2.配置mapreduce

3.配置yarn。PS:

yarn
Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。------------------YARN的基本思想是将JobTracker的两个主要功能(资源管理和作业调度/监控)分离,主要方法是创建一个全局的ResourceManager(RM)和若干个针对应用程序的ApplicationMaster(AM)。这里的应用程序是指传统的MapReduce作业或作业的DAG(有向无环图)。

-------------------------------------------------------------------

package cn.itcast.bigdata.mr.wcdemo;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*** KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,* 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable* * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text* * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable* * @author**/public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{/*** map阶段的业务逻辑就写在自定义的map()方法中* maptask会对每一行输入数据调用一次我们自定义的map()方法*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//将maptask传给我们的文本内容先转换成StringString line = value.toString();//根据空格将这一行切分成单词String[] words = line.split(" ");//将单词输出为<单词,1>for(String word:words){//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce taskcontext.write(new Text(word), new IntWritable(1));}}}

package cn.itcast.bigdata.mr.wcdemo;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** KEYIN, VALUEIN 对应  mapper输出的KEYOUT,VALUEOUT类型对应* * KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型* KEYOUT是单词* VLAUEOUT是总次数* @author**/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{/* 到这里的时候,数据已经按照一定的特性(规律如 统计a-h)  分组好了,所以入参的时候是一个同样的key,values是Itrater专门用来迭代累加;   然后再执行第二组 hello ,1,...指导统计完* <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>* <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>* <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>* 入参key,是一组相同单词kv对的key*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count=0;/*Iterator<IntWritable> iterator = values.iterator();while(iterator.hasNext()){count += iterator.next().get();}*/for(IntWritable value:values){count += value.get();}context.write(key, new IntWritable(count));}}

package cn.itcast.bigdata.mr.wcdemo;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 相当于一个yarn集群的客户端* 需要在此封装我们的mr程序的相关运行参数,指定jar包* 最后提交给yarn* @author**/
public class WordcountDriver {public static void main(String[] args) throws Exception {if (args == null || args.length == 0) {args = new String[2];args[0] = "hdfs://192.168.8.10:9000/wordcount/input/wordcount.txt";args[1] = "hdfs://192.168.8.10:9000/wordcount/output";}Configuration conf = new Configuration();//设置的没有用!  ??????      因为在linux下允许,所以这些注释了
//        conf.set("HADOOP_USER_NAME", "hadoop");
//        conf.set("dfs.permissions.enabled", "false");// ---因为linux和windows操作系统的环境变量不同,所以不能直接运行在windows上,现在打包到linux系统上,那么,就不用注释了(因为集群上已经配置过了)/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanager.hostname", "mini1");*/Job job = Job.getInstance(conf);/*job.setJar("/home/hadoop/wc.jar");*///指定本程序的jar包所在的本地路径job.setJarByClass(WordcountDriver.class);//指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(WordcountMapper.class);job.setReducerClass(WordcountReducer.class);//指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的输出结果所在目录FileOutputFormat.setOutputPath(job, new Path(args[1]));//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行/*job.submit();*/boolean res = job.waitForCompletion(true);System.exit(res?0:1);}
}

PS:打包文件

hadoop jar wordcount.jar cn.itcast.bigdata.mr.wcdemo.WordcountDriver /wordcount/input /wordcount/output1

PS: 红色命令  其实就是 调用 jar命令 ,只是把关联的jar链接上了

PS:在界面端也有一个专门界面。

PS:这个文件可以part-r-0000可以自定义文件的个数,现在把所有的文件都放在part-r-0000中。

PS:程序先启动的时候先启动Mr application master,然后再启动map task 、reducetask

------------------------------------------------------------------------------------WordCount程序运行流程分析------------------------------------------------

PS:上图应该这么看,首先待处理文件。当submit()的时候,会看到文件的规模,进行map()划分(启动maptask的多少,这个文件是job.split),然后job.split、wc.jar、job。xml提交到云端的YARN进行管理。然后yarn启动mr appmaster,启动相应的Node Manager进行管理。   执行map()任务,他是通过InputFormat组件以流的形式读入一定范围的数据进来,每一个任务执行完了,接着通过另一个组件outputController会生成分区且排序的结果文件,再进入reducetask()任务,  每传过来maptask可以根据分区号进行分区,也就是输出的part-r-0000X的变化的值。   在mapreduce线程中,执行业务逻辑,通过outputFormat输出,最后生成文件(读取多个的文件,输出会进行文件)。

PS:  上图为输出的应用结果,   key为单词字符,value是每个单词的个数

---------------------------------

-----------------------------------------MapReduce 应用编译执行

1.  PS:start-dfs.sh  //启动应用start-yarn.sh //启动yarn管理器

--------------------------------------------另一个程序练习

/*PS:  打印什么内容是 由key   和  输出  value对象的toString方法所决定的。
*/

PS:文件在hdfs上,用jar包进行编译执行。

--------------------------切片和并行度的概念------------

1.3.1 mapTask并行度的决定机制

一个job的map阶段并行度由客户端在提交job时决定

而客户端对map阶段并行度的规划的基本逻辑为:

将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理

Ps:由HDFS读取到 日志文件后,根据文件block的大小进行并行度的确定。通常大文件直接有文件块决定,决定时机是在 waitForCompletion这个函数执行时,会生成一个分块的文件。然后再mapreduce执行时,MRAppMaster就可以读取到信息进行调度操作了。


PS:上图为提交数据的完成的信息,里面有包含分块的信息。所有配置参数,这些参数可以让MRAppMaster读取数据

PS :客户端提交MR程序job的流程

-------------------------------------------------------------------------------------

按照不同号码分区

package cn.itcast.bigdata.mr.provinceflow;import java.util.HashMap;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*** K2  V2  对应的是map输出kv的类型,   * @author* 返回的是分区地址的hashcode*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>();static{proviceDict.put("136", 0);proviceDict.put("137", 1);proviceDict.put("138", 2);proviceDict.put("139", 3);}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {String prefix = key.toString().substring(0, 3);Integer provinceId = proviceDict.get(prefix);return provinceId==null?4:provinceId;}
}

package cn.itcast.bigdata.mr.provinceflow;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCount {static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();    //将一行内容转成stringString[] fields = line.split("\t");    //切分字段String phoneNbr = fields[1];    //取出手机号long upFlow = Long.parseLong(fields[fields.length-3]);    //取出上行流量下行流量long dFlow = Long.parseLong(fields[fields.length-2]);context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));}}static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{//<183323,bean1><183323,bean2><183323,bean3><183323,bean4>.......
        @Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {long sum_upFlow = 0;long sum_dFlow = 0;//遍历所有bean,将其中的上行流量,下行流量分别累加for(FlowBean bean: values){sum_upFlow += bean.getUpFlow();sum_dFlow += bean.getdFlow();}FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);context.write(key, resultBean);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanager.hostname", "mini1");*/Job job = Job.getInstance(conf);/*job.setJar("/home/hadoop/wc.jar");*///指定本程序的jar包所在的本地路径job.setJarByClass(FlowCount.class);//指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);//指定我们自定义的数据分区器job.setPartitionerClass(ProvincePartitioner.class);//同时指定相应“分区”数量的reducetaskjob.setNumReduceTasks(5);              //数量必须和分区相对应,可以为1  可以大于分区个数。单数少于1到分区数会报错。//指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的输出结果所在目录FileOutputFormat.setOutputPath(job, new Path(args[1]));//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行/*job.submit();*/boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

----------------------------作业:对以下数据进行排序,按照流量排序,这个是在之前生成数据的基础上做的

PS:不要想着程序一下子全部完成,也可以分步骤解决问题、

package cn.itcast.bigdata.mr.flowsum;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class FlowBean implements WritableComparable<FlowBean>{private long upFlow;private long dFlow;private long sumFlow;//反序列化时,需要反射调用空参构造函数,所以要显示定义一个public FlowBean(){}public FlowBean(long upFlow, long dFlow) {this.upFlow = upFlow;this.dFlow = dFlow;this.sumFlow = upFlow + dFlow;}public void set(long upFlow, long dFlow) {this.upFlow = upFlow;this.dFlow = dFlow;this.sumFlow = upFlow + dFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getdFlow() {return dFlow;}public void setdFlow(long dFlow) {this.dFlow = dFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dFlow);out.writeLong(sumFlow);}/*** 反序列化方法* 注意:反序列化的顺序跟序列化的顺序完全一致*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();dFlow = in.readLong();sumFlow = in.readLong();}@Overridepublic String toString() {    return upFlow + "\t" + dFlow + "\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {return this.sumFlow>o.getSumFlow()?-1:1;    //从大到小, 当前对象和要比较的对象比, 如果当前对象大, 返回-1, 交换他们的位置(自己的理解)
    }}

package cn.itcast.bigdata.mr.flowsum;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import cn.itcast.bigdata.mr.flowsum.FlowCount.FlowCountMapper;
import cn.itcast.bigdata.mr.flowsum.FlowCount.FlowCountReducer;/*** 13480253104 180 180 360 13502468823 7335 110349 117684 13560436666 1116 954* 2070* * @author* */
public class FlowCountSort {static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {FlowBean bean = new FlowBean();//!!!虽然这是一个对象,但是是序列化,所以不用担心数据操作Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到的是上一个统计程序的输出结果,已经是各手机号的总流量信息String line = value.toString();String[] fields = line.split("\t");String phoneNbr = fields[0];long upFlow = Long.parseLong(fields[1]);long dFlow = Long.parseLong(fields[2]);bean.set(upFlow, dFlow);v.set(phoneNbr);context.write(bean, v);}}/*** 根据key来掉, 传过来的是对象, 每个对象都是不一样的, 所以每个对象都调用一次reduce方法* @author: 张政* @date: 2016年4月11日 下午7:08:18* @package_name: day07.sample*/static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {// <bean(),phonenbr>
        @Overrideprotected void reduce(FlowBean bean, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(), bean);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanager.hostname", "mini1");*/Job job = Job.getInstance(conf);/*job.setJar("/home/hadoop/wc.jar");*///指定本程序的jar包所在的本地路径job.setJarByClass(FlowCountSort.class);//指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowCountSortMapper.class);job.setReducerClass(FlowCountSortReducer.class);//指定mapper输出数据的kv类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的输出结果所在目录
        Path outPath = new Path(args[1]);/*FileSystem fs = FileSystem.get(conf);if(fs.exists(outPath)){fs.delete(outPath, true);}*/FileOutputFormat.setOutputPath(job, outPath);//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行/*job.submit();*/boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

3.1.4 详细流程示意图        --------------这个图非常重要

------------------------Yarn拿到程序怎样启动,这是今天关心的问题---------------------------------

PS:

转载于:https://www.cnblogs.com/bee-home/p/7908169.html

day08 MapReduce相关推荐

  1. mapreduce理解_大数据

    map:对不同的数据进行同种操作 reduce:按keys 把数据规约到一起 看这篇文章请出去跑两圈,然后泡一壶茶,边喝茶,边看,看完你就对hadoop 与MapReduce的整体有所了解了. [前言 ...

  2. 2021年大数据Hadoop(二十二):MapReduce的自定义分组

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 MapReduce的自定义分组 需求 分析 实现 第一步: ...

  3. 第2节 mapreduce深入学习:4, 5

    第2节 mapreduce深入学习:4.mapreduce的序列化以及自定义排序 序列化(Serialization)是指把结构化对象转化为字节流. 反序列化(Deserialization)是序列化 ...

  4. 第一个MapReduce程序

    计算文件中每个单词的频数 wordcount 程序调用 wordmap 和 wordreduce 程序. 1 import org.apache.hadoop.conf.Configuration; ...

  5. hadoop程序MapReduce之SingletonTableJoin

    需求:单表关联问题.从文件中孩子和父母的关系挖掘出孙子和爷奶关系 样板:child-parent.txt xiaoming daxiong daxiong alice daxiong jack 输出: ...

  6. 初学Hadoop之图解MapReduce与WordCount示例分析

    Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS) ...

  7. MapReduce编程初体验

    需求:在给定的文本文件中统计输出每一个单词出现的总次数 第一步: 准备一个aaa.txt文本文档 第二步: 在文本文档中随便写入一些测试数据,这里我写入的是 hello,world,hadoop he ...

  8. [MaxCompute MapReduce实践]通过简单瘦身,解决Dataworks 10M文件限制问题

    用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业. 解决方案: 第一步:大于1 ...

  9. mapreduce作业reduce被大量kill掉

    之前有一段时间.我们的hadoop2.4集群压力非常大.导致提交的job出现大量的reduce被kill掉.同样的job执行时间比在hadoop0.20.203上面长了非常多.这个问题事实上是redu ...

最新文章

  1. UNIX再学习 -- 再识
  2. MySQL 表一列逗号分隔字段,按逗号切割
  3. win7连接sftp_SFTP远程连接服务器上传下载文件-vs2013项目实例
  4. js 请求接口获取不到登录cookie xhrFields 配置
  5. ABP理论之CSRF
  6. java-web-j2e学习建议路线
  7. 开发者应该关注的五项Web新兴技术:WebGL和SVG名列其中
  8. Spring学习14-源码下载地址
  9. 优质前端掘金小册推荐及优惠购买链接
  10. 用户故事与用户故事地图——互联网平台建设
  11. 在Mac下,快速搭建针对WP8应用的PhoneGap开发环境
  12. TYUT太原理工大学2022需求工程考试简答题
  13. 怎样一键比较2个CAD图纸文件的不同呢?
  14. flex布局实现无缝滚动
  15. python处理字体(动态字体库)
  16. zuul+SpringSession的session不一致问题
  17. MDCC 2014移动开发者大会 小礼品展商活动
  18. matlab仿真plant函数,(整理)matlab预测控制工具箱函数.
  19. 字符串的使用(JavaScript)
  20. AWS configure region 配置错误了怎么更正

热门文章

  1. x86服务器当虚拟化的存储,X86服务器虚拟化实施方案.doc
  2. java 与 乱码问题_透彻分析和解决一切javaWeb项目乱码问题
  3. WriteFile写UNICODE字符串 TxT显示乱码
  4. StackOverflow程序员推荐:每个程序员都应读的30本书
  5. 完全二叉树的结点个数
  6. 自己在windows下第一次安装pygame成功的经历
  7. python从入门到实践 第12章 武装飞船 之 调整飞船的速度程序实践
  8. linux下的X server:linux图形界面原理
  9. linux5.5 dvd安装教程,linux 5.5 yum的安装方法(ftp)
  10. Java Jsoup库 实现天气爬取(附第三方库加载方式)