这篇主要介绍利用hdfs接口,使用java编程向hdfs写入数据。

一、模仿hadoop fs -put 和 -copyFromLoca命令,实现本地复制文件到hdfs:

View Code

二、写一个与hadoop fs -getmerge相对应的一个简单程序: putmerge 。
我们知道,getmerge命令是从hdfs上获取大量文件组合成一个文件放到本地文件系统中的命令。但是hadoop没有提供与这一过程相逆的命令。不幸的是我们会在处理apache日志过程中常用到这样的一个命令,比如有很多按日期分的apache日志。

我们想传到hdfs中使用MepReduce来处理的话,我们只能用笨办法先本地合成大文件,然后上传这个大文件到hdfs,这种方法很低效。我们接下来给出一个程序,利用hdfs提供的JavaAPI来编写一个上传多个文件的过程中合成一个大文件的程序:

 1 import java.io.IOException;
 2
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FSDataInputStream;
 5 import org.apache.hadoop.fs.FSDataOutputStream;
 6 import org.apache.hadoop.fs.FileStatus;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
 9
10 public class putMerge {
11     public static void main(String[] args) throws IOException {
12         Configuration conf = new Configuration();
13         FileSystem hdfs = FileSystem.get(conf);
14         FileSystem local = FileSystem.getLocal(conf);
15
16         Path inputDir = new Path(args[0]);
17         Path hdfsFile = new Path(args[1]);
18
19         try {
20             FileStatus[] inputFiles = local.listStatus(inputDir);
21             FSDataOutputStream out = hdfs.create(hdfsFile);
22
23             for (int i = 0; i < inputFiles.length; i++) {
24                 System.out.println(inputFiles[i].getPath().getName());
25                 FSDataInputStream in = local.open(inputFiles[i].getPath());
26                 byte buffer[] = new byte[256];
27                 int bytesRead = 0;
28                 while ((bytesRead = in.read(buffer)) > 0) {
29                     out.write(buffer, 0, bytesRead);
30                 }
31                 in.close();
32             }
33             out.close();
34         } catch (IOException e) {
35             e.printStackTrace();
36         }
38     }
39 }

将代码打包成putMerge.jar格式后放在个人常用的路径下

执行的时候采用:

[root@JueFan pconline]#hadoop jar putMerge.jar putMerge /home/juefan/*(本地目录名) /user/juefan/(HDFS文件系统目录)

三、有时候我们想合并hdfs中的文件,并存在hdfs里,又不想经过下载到local文件系统里这一过程,我们可以书写这样的程序,并且实现递归合并:

 1 import java.io.IOException;
 2
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FSDataInputStream;
 5 import org.apache.hadoop.fs.FSDataOutputStream;
 6 import org.apache.hadoop.fs.FileStatus;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
 9
10 public class filesmerge {
11     public static boolean isRecur = false;
12
13     public static void merge(Path inputDir, Path hdfsFile, FileSystem hdfs,FSDataOutputStream out) {
14         try {
15             FileStatus[] inputFiles = hdfs.listStatus(inputDir);
16             for (int i = 0; i < inputFiles.length; i++) {
17                 if (!hdfs.isFile(inputFiles[i].getPath())) {
18                     if (isRecur){
19                         merge(inputFiles[i].getPath(), hdfsFile, hdfs,out);
20                         return ;
21                     }
22                     else {
23                         System.out.println(inputFiles[i].getPath().getName()
24                                 + "is not file and not allow recursion, skip!");
25                         continue;
26                     }
27                 }
28                 System.out.println(inputFiles[i].getPath().getName());
29                 FSDataInputStream in = hdfs.open(inputFiles[i].getPath());
30                 byte buffer[] = new byte[256];
31                 int bytesRead = 0;
32                 while ((bytesRead = in.read(buffer)) > 0) {
33                     out.write(buffer, 0, bytesRead);
34                 }
35                 in.close();
36             }
37             out.close();
38         } catch (IOException e) {
39             e.printStackTrace();
40         }
41     }
42
43     public static void errorMessage(String str) {
44         System.out.println("Error Message: " + str);
45         System.exit(1);
46     }
47
48     public static void main(String[] args) throws IOException {
49         if (args.length == 0)
50             errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
51         if (args[0].matches("^-[rR]$")) {
52             isRecur = true;
53         }
54         if ((isRecur && args.length != 3) || ( !isRecur && args.length != 2)) {
55             errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
56         }
57
58         Configuration conf = new Configuration();
59         FileSystem hdfs = FileSystem.get(conf);
60
61         Path inputDir;
62         Path hdfsFile;
63         if(isRecur){
64             inputDir = new Path(args[1]);
65             hdfsFile = new Path(args[2]);
66         }
67         else{
68             inputDir = new Path(args[0]);
69             hdfsFile = new Path(args[1]);
70         }
71
72         if (!hdfs.exists(inputDir)) {
73             errorMessage("hdfsTargetDir not exist!");
74         }
75         if (hdfs.exists(hdfsFile)) {
76             errorMessage("hdfsFileName exist!");
77         }
78
79         FSDataOutputStream out = hdfs.create(hdfsFile);
80         merge(inputDir, hdfsFile, hdfs,out);
81         System.exit(0);
82     }
83 }

四、更不幸的是我们经常遇到的并非正常的文本文件,因为直接存储文本文件比较浪费空间,所以大部分服务器运维人员针对该类日志文件都是进行压缩打包存放的,所以我们有时候,或者说更多情况下需要的是对大量压缩包进行解压缩合并上传到hdfs的命令,为了方便我们同样只能自己搞生产了。

以下代码的主要作用是将HDFS内的zip格式文件转换成gzip格式
目的:hive在进行外部表数据读取的时候不能读取zip格式文件,能读取文本文件
先前的作法是:先从HDFS中把zip文件取到本地中,再将zip文件在本地中解压出来,通过加载数据的形式将本地的文件回到到HDFS中
其弊端主要有:1、可操作性弱,要来回折腾HDFS与本地上的文件   2、zip文件解压出来加载回HDFS后占用的空间较大
通过正面这种形式能够解决以上两个问题
  1 import java.io.File;
  2 import java.io.IOException;
  3 import java.util.zip.GZIPOutputStream;
  4 import java.util.zip.ZipEntry;
  5 import java.util.zip.ZipInputStream;
  6
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.FSDataInputStream;
  9 import org.apache.hadoop.fs.FSDataOutputStream;
 10 import org.apache.hadoop.fs.FileStatus;
 11 import org.apache.hadoop.fs.FileSystem;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.io.Text;
 14
 15 public class filesmerge {
 16     //判断是否递归执行
 17     public static boolean isRecur = false;
 18
 19     /**
 20      * @author JueFan
 21      * @param inputDir zip文件的存储地址
 22      * @param hdfsFile 解压结果的存储地址
 23      * @param hdfs 分布式文件系统数据流
 24      * @param pcgroupText 需要解压缩的文件关键名
 25      */
 26     public static void merge(Path inputDir, Path hdfsFile,
 27             FileSystem hdfs, Text pcgroupText) {
 28         try {
 29             //文件系统地址inputDir下的FileStatus
 30             FileStatus[] inputFiles = hdfs.listStatus(inputDir);
 31             for (int i = 0; i < inputFiles.length; i++) {
 32                 if (!hdfs.isFile(inputFiles[i].getPath())) {
 33                     if (isRecur){
 34                         merge(inputFiles[i].getPath(), hdfsFile, hdfs,pcgroupText);
 35                         return ;
 36                     }
 37                     else {
 38                         System.out.println(inputFiles[i].getPath().getName()
 39                                 + "is not file and not allow recursion, skip!");
 40                         continue;
 41                     }
 42                 }
 43                 //判断文件名是否在需要解压缩的关键名内
 44                 if(inputFiles[i].getPath().getName().contains(pcgroupText.toString()) == true){
 45                     //输出待解压的文件名
 46                     System.out.println(inputFiles[i].getPath().getName());
 47                     //将数据流指向待解压文件
 48                     FSDataInputStream in = hdfs.open(inputFiles[i].getPath());
 49                     /**
 50                      *数据的解压执行过程
 51                      */
 52                     ZipInputStream zipInputStream = null;
 53                     try{
 54                         zipInputStream = new ZipInputStream(in);
 55                         ZipEntry entry;
 56                         //解压后有多个文件一并解压出来并实现合并
 57                         //合并后的地址
 58                         FSDataOutputStream mergerout = hdfs.create(new Path(hdfsFile + File.separator +
 59                                 inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."))));
 60                         while((entry = zipInputStream.getNextEntry()) != null){
 61                             byte[] buffer1 = new byte[2048];
 62                             int nNumber;
 63                             while((nNumber = zipInputStream.read(buffer1,
 64                                     0, buffer1.length)) != -1)
 65                                 mergerout.write(buffer1, 0, nNumber);
 66                         }
 67                         mergerout.flush();
 68                         mergerout.close();
 69                         zipInputStream.close();
 70                     }catch(IOException e){
 71                         continue;
 72                     }
 73                     in.close();
 74                     /**
 75                      *将解压合并后的数据压缩成gzip格式
 76                      */
 77                     GZIPOutputStream gzipOutputStream = null;
 78                     try{
 79                         FSDataOutputStream outputStream = null;
 80                         outputStream = hdfs.create(new Path(hdfsFile + File.separator +
 81                                 inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")) + ".gz"));
 82                         FSDataInputStream inputStream = null;
 83                         gzipOutputStream = new GZIPOutputStream(outputStream);
 84                         inputStream = hdfs.open(new Path(hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."))));
 85                         byte[] buffer = new byte[1024];
 86                         int len;
 87                         while((len = inputStream.read(buffer)) > 0){
 88                             gzipOutputStream.write(buffer, 0, len);
 89                         }
 90                         inputStream.close();
 91                         gzipOutputStream.finish();
 92                         gzipOutputStream.flush();
 93                         outputStream.close();
 94                     }catch (Exception exception){
 95                         exception.printStackTrace();
 96                     }
 97                     gzipOutputStream.close();
 98                     //删除zip文件解压合并后的临时文件
 99                     String tempfiles = hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."));
100                     try{
101                         if(hdfs.exists(new Path(tempfiles))){
102                             hdfs.delete(new Path(tempfiles), true);
103                         }
104                     }catch(IOException ie){
105                         ie.printStackTrace();
106                     }
107                 }
108             }
109         }catch (IOException e) {
110             e.printStackTrace();
111         }
112     }
113
114     public static void errorMessage(String str) {
115         System.out.println("Error Message: " + str);
116         System.exit(1);
117     }
118
119     @SuppressWarnings("null")
120     public static void main(String[] args) throws IOException {
121         if (args.length == 0)
122             errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
123         if (args[0].matches("^-[rR]$")) {
124             isRecur = true;
125         }
126         if ((isRecur && args.length != 4) || ( !isRecur && args.length != 3)) {
127             errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
128         }
129
130         Configuration conf = new Configuration();
131         FileSystem hdfs = FileSystem.get(conf);
132
133         Path inputDir;
134         Path hdfsFile;
135         Text pcgroupText;
136         if(isRecur){
137             inputDir = new Path(args[1]);
138             hdfsFile = new Path(args[2]);
139             pcgroupText = new Text(args[3]);
140         }
141         else{
142             inputDir = new Path(args[0]);
143             hdfsFile = new Path(args[1]);
144             pcgroupText = new Text(args[2]);
145         }
146
147         if (!hdfs.exists(inputDir)) {
148             errorMessage("hdfsTargetDir not exist!");
149         }
150         if (hdfs.exists(hdfsFile)) {
151             errorMessage("hdfsFileName exist!");
152         }
153         merge(inputDir, hdfsFile, hdfs, pcgroupText);
154         System.exit(0);
155     }
156 }

具体的执行方式如下:

[root@JueFan pconline]# hadoop jar zip_to_gzip.jar filesmerge /zip/(待转换文件路径,在HDFS上) /user/juefan/pconline/(转换完成后的文件存储地址,也在HDFS上) pconline(待转换的文件名包含的字符) 

如果要实现递归的话,可以在filesmerge后面加上 -r  

本文的前三个要点转载自三江小渡的博客:http://blog.pureisle.net/archives/1701.html

该博客的第四个要点没有写出具体实现,本人就帮忙完善了

转载于:https://www.cnblogs.com/juefan/archive/2013/02/27/2935163.html

HDFS文件系统内的文件格式转换(zip格式转化成gzip格式)相关推荐

  1. 如何将png/jpg等图片格式转化成eps格式

    如何将png/jpg等图片格式转化成eps格式 在我们提交论文时,往往需要提交可编辑的图片供期刊编辑进行修改,可接受的格式有:Word.eps.ps.tiff.ppt和excel. eps格式是用的比 ...

  2. php pkcs 1格式的公钥,解说--2--微信支付RSA公钥PKCS1格式转化成PKCS8格式的公钥

    最近在开发一个功能:微信自动转账给个人用户(个人微信零钱.银行卡) 今天只讲RSA公钥PKCS1格式转化成PKCS8格式的公钥 先说说解决过程(一路心酸,一万个······): 1.昨晚开始转格式,未 ...

  3. java字符串数组转json_java中字符串String格式转化成json格式

    java字符串数组转json_java中字符串String格式转化成json格式 String s= Connection.deleteHost("10310");System.o ...

  4. pbmplus-图像文件格式转换包与PBM/PGM/PPM 格式图像文件

    pbmplus-图像文件格式转换包 PBMPLUS 是一个用于多种图像类型和可移植格式(portble formats)之间来回转换的工具包.官方网站主页介绍,和下载地址在这里.该工具的想法是如果你想 ...

  5. 讲讲如何将图片格式转化成base64格式的

    又到周五了,这几周遇到一个让我很头疼的问题,接口文档里写着: 尼玛!看着就头疼,这写的什么呀,完全看不懂呀! 于是乎,一番百度,似乎有了点头绪,下面讲讲: 我不是来讲概念的,直接切入正题,图片的bas ...

  6. java中字符串String格式转化成json格式

    一.将Sting转为Json 在开发中,经常是前端发送一串数据过来之后,是通过一个参数带着一串值过来,这一串值是String 格式的,但是里面的内容其实是json格式类型的,所以拿到值之后就要将该值转 ...

  7. 怎么把qlv格式转化成mp3格式 格式工厂

    1.搜索: 小白兔视频格式在线转换 2.上传你的视频(腾讯qlv,爱奇艺qsv.优酷kux)都可以. 3.转换好后,我们把转换的视频下载到电脑里,就可以看到视频已经是MP4格式了.

  8. 【JavaScript】VUE前端实现微信版录制音频wav格式转化成mp3格式和Base64

    一.前言 前端有个需求是要实现一个像微信一样,按住录音,松开发送语音,期间踩了不少坑,特地记录一下,主要用到两个库 js-audio-recorder :负责录制音频,支持的格式只有wav.pcm L ...

  9. .tex类型文件怎么阅读_有了这些神器,什么文件格式转换都能搞得定!

    大家好,我是小渔. 周一的时候,睿文老师写了一篇关于页码的干货文,为了方便大家学习,还录制了视频. 一般呢,在微信上发布的文章,小渔都需要同步到其他平台.然而在上传视频的时候,小渔却怎么也找不到文件. ...

最新文章

  1. CodeGen融合核心关系循环扩展
  2. 程序分析工具gprof介绍
  3. JavaScript装逼指南
  4. ionic开发ios app
  5. PostgreSQL 中的引号与大小写
  6. Laravel日志查看器 -- log-viewer扩展
  7. Webpack使用指南
  8. win7创建虚拟无线网络
  9. 数据库,部分函数依赖,传递函数依赖,完全函数依赖,三种范式的区别
  10. java中使用字符(字节)读取文本内容
  11. 更改数据库表中有数据的字段类型NUMERIC(18,2)为NUMERIC(18,6)
  12. 机器学习-数据科学库-day6
  13. Mybatis-第N篇配置log4j1、log4j2打印执行的sql语句
  14. delphi压缩后使用http协议base64上传下载6G超大文件
  15. 检测UDP端口是否畅通方法
  16. 前端js生成自定义内容的PDF及word文件的实现
  17. 台式机就是指什么的计算机,什么是台式机操作系统
  18. 如何知道计算机是否支持64位,查看CPU是否支持64位操作系统的简单方法
  19. 【学习】笔记本电脑重新安装系统win10
  20. GeoServer结合FWTools切片工具发布影像金字塔切片

热门文章

  1. java自动化测试语言高级之Applet 基础
  2. c# aspx转为html,asp.net(c#)网页跳转七种方法小结
  3. VIAVI MTS-6000A新一代电信级以太网测试解决方案
  4. 对于拓展欧几里德算法的理解
  5. http-server简单HTTP服务器配置
  6. 基于STM32的智能风扇的制作
  7. 【node】升级 Node 版本教程
  8. wxMEdit 新增德文翻译
  9. 蓝汛ChinaCache打破传输瓶颈,提高宽带利用率
  10. Django对接支付宝Alipay支付接口