HDFS文件系统内的文件格式转换(zip格式转化成gzip格式)
这篇主要介绍利用hdfs接口,使用java编程向hdfs写入数据。
一、模仿hadoop fs -put 和 -copyFromLoca命令,实现本地复制文件到hdfs:
![](/assets/blank.gif)
![](/assets/blank.gif)
二、写一个与hadoop fs -getmerge相对应的一个简单程序: putmerge 。
我们知道,getmerge命令是从hdfs上获取大量文件组合成一个文件放到本地文件系统中的命令。但是hadoop没有提供与这一过程相逆的命令。不幸的是我们会在处理apache日志过程中常用到这样的一个命令,比如有很多按日期分的apache日志。
我们想传到hdfs中使用MepReduce来处理的话,我们只能用笨办法先本地合成大文件,然后上传这个大文件到hdfs,这种方法很低效。我们接下来给出一个程序,利用hdfs提供的JavaAPI来编写一个上传多个文件的过程中合成一个大文件的程序:
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FSDataInputStream; 5 import org.apache.hadoop.fs.FSDataOutputStream; 6 import org.apache.hadoop.fs.FileStatus; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 10 public class putMerge { 11 public static void main(String[] args) throws IOException { 12 Configuration conf = new Configuration(); 13 FileSystem hdfs = FileSystem.get(conf); 14 FileSystem local = FileSystem.getLocal(conf); 15 16 Path inputDir = new Path(args[0]); 17 Path hdfsFile = new Path(args[1]); 18 19 try { 20 FileStatus[] inputFiles = local.listStatus(inputDir); 21 FSDataOutputStream out = hdfs.create(hdfsFile); 22 23 for (int i = 0; i < inputFiles.length; i++) { 24 System.out.println(inputFiles[i].getPath().getName()); 25 FSDataInputStream in = local.open(inputFiles[i].getPath()); 26 byte buffer[] = new byte[256]; 27 int bytesRead = 0; 28 while ((bytesRead = in.read(buffer)) > 0) { 29 out.write(buffer, 0, bytesRead); 30 } 31 in.close(); 32 } 33 out.close(); 34 } catch (IOException e) { 35 e.printStackTrace(); 36 } 38 } 39 }
将代码打包成putMerge.jar格式后放在个人常用的路径下
执行的时候采用:
三、有时候我们想合并hdfs中的文件,并存在hdfs里,又不想经过下载到local文件系统里这一过程,我们可以书写这样的程序,并且实现递归合并:
1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FSDataInputStream; 5 import org.apache.hadoop.fs.FSDataOutputStream; 6 import org.apache.hadoop.fs.FileStatus; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 10 public class filesmerge { 11 public static boolean isRecur = false; 12 13 public static void merge(Path inputDir, Path hdfsFile, FileSystem hdfs,FSDataOutputStream out) { 14 try { 15 FileStatus[] inputFiles = hdfs.listStatus(inputDir); 16 for (int i = 0; i < inputFiles.length; i++) { 17 if (!hdfs.isFile(inputFiles[i].getPath())) { 18 if (isRecur){ 19 merge(inputFiles[i].getPath(), hdfsFile, hdfs,out); 20 return ; 21 } 22 else { 23 System.out.println(inputFiles[i].getPath().getName() 24 + "is not file and not allow recursion, skip!"); 25 continue; 26 } 27 } 28 System.out.println(inputFiles[i].getPath().getName()); 29 FSDataInputStream in = hdfs.open(inputFiles[i].getPath()); 30 byte buffer[] = new byte[256]; 31 int bytesRead = 0; 32 while ((bytesRead = in.read(buffer)) > 0) { 33 out.write(buffer, 0, bytesRead); 34 } 35 in.close(); 36 } 37 out.close(); 38 } catch (IOException e) { 39 e.printStackTrace(); 40 } 41 } 42 43 public static void errorMessage(String str) { 44 System.out.println("Error Message: " + str); 45 System.exit(1); 46 } 47 48 public static void main(String[] args) throws IOException { 49 if (args.length == 0) 50 errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); 51 if (args[0].matches("^-[rR]$")) { 52 isRecur = true; 53 } 54 if ((isRecur && args.length != 3) || ( !isRecur && args.length != 2)) { 55 errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); 56 } 57 58 Configuration conf = new Configuration(); 59 FileSystem hdfs = FileSystem.get(conf); 60 61 Path inputDir; 62 Path hdfsFile; 63 if(isRecur){ 64 inputDir = new Path(args[1]); 65 hdfsFile = new Path(args[2]); 66 } 67 else{ 68 inputDir = new Path(args[0]); 69 hdfsFile = new Path(args[1]); 70 } 71 72 if (!hdfs.exists(inputDir)) { 73 errorMessage("hdfsTargetDir not exist!"); 74 } 75 if (hdfs.exists(hdfsFile)) { 76 errorMessage("hdfsFileName exist!"); 77 } 78 79 FSDataOutputStream out = hdfs.create(hdfsFile); 80 merge(inputDir, hdfsFile, hdfs,out); 81 System.exit(0); 82 } 83 }
四、更不幸的是我们经常遇到的并非正常的文本文件,因为直接存储文本文件比较浪费空间,所以大部分服务器运维人员针对该类日志文件都是进行压缩打包存放的,所以我们有时候,或者说更多情况下需要的是对大量压缩包进行解压缩合并上传到hdfs的命令,为了方便我们同样只能自己搞生产了。
1 import java.io.File; 2 import java.io.IOException; 3 import java.util.zip.GZIPOutputStream; 4 import java.util.zip.ZipEntry; 5 import java.util.zip.ZipInputStream; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FSDataInputStream; 9 import org.apache.hadoop.fs.FSDataOutputStream; 10 import org.apache.hadoop.fs.FileStatus; 11 import org.apache.hadoop.fs.FileSystem; 12 import org.apache.hadoop.fs.Path; 13 import org.apache.hadoop.io.Text; 14 15 public class filesmerge { 16 //判断是否递归执行 17 public static boolean isRecur = false; 18 19 /** 20 * @author JueFan 21 * @param inputDir zip文件的存储地址 22 * @param hdfsFile 解压结果的存储地址 23 * @param hdfs 分布式文件系统数据流 24 * @param pcgroupText 需要解压缩的文件关键名 25 */ 26 public static void merge(Path inputDir, Path hdfsFile, 27 FileSystem hdfs, Text pcgroupText) { 28 try { 29 //文件系统地址inputDir下的FileStatus 30 FileStatus[] inputFiles = hdfs.listStatus(inputDir); 31 for (int i = 0; i < inputFiles.length; i++) { 32 if (!hdfs.isFile(inputFiles[i].getPath())) { 33 if (isRecur){ 34 merge(inputFiles[i].getPath(), hdfsFile, hdfs,pcgroupText); 35 return ; 36 } 37 else { 38 System.out.println(inputFiles[i].getPath().getName() 39 + "is not file and not allow recursion, skip!"); 40 continue; 41 } 42 } 43 //判断文件名是否在需要解压缩的关键名内 44 if(inputFiles[i].getPath().getName().contains(pcgroupText.toString()) == true){ 45 //输出待解压的文件名 46 System.out.println(inputFiles[i].getPath().getName()); 47 //将数据流指向待解压文件 48 FSDataInputStream in = hdfs.open(inputFiles[i].getPath()); 49 /** 50 *数据的解压执行过程 51 */ 52 ZipInputStream zipInputStream = null; 53 try{ 54 zipInputStream = new ZipInputStream(in); 55 ZipEntry entry; 56 //解压后有多个文件一并解压出来并实现合并 57 //合并后的地址 58 FSDataOutputStream mergerout = hdfs.create(new Path(hdfsFile + File.separator + 59 inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")))); 60 while((entry = zipInputStream.getNextEntry()) != null){ 61 byte[] buffer1 = new byte[2048]; 62 int nNumber; 63 while((nNumber = zipInputStream.read(buffer1, 64 0, buffer1.length)) != -1) 65 mergerout.write(buffer1, 0, nNumber); 66 } 67 mergerout.flush(); 68 mergerout.close(); 69 zipInputStream.close(); 70 }catch(IOException e){ 71 continue; 72 } 73 in.close(); 74 /** 75 *将解压合并后的数据压缩成gzip格式 76 */ 77 GZIPOutputStream gzipOutputStream = null; 78 try{ 79 FSDataOutputStream outputStream = null; 80 outputStream = hdfs.create(new Path(hdfsFile + File.separator + 81 inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")) + ".gz")); 82 FSDataInputStream inputStream = null; 83 gzipOutputStream = new GZIPOutputStream(outputStream); 84 inputStream = hdfs.open(new Path(hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")))); 85 byte[] buffer = new byte[1024]; 86 int len; 87 while((len = inputStream.read(buffer)) > 0){ 88 gzipOutputStream.write(buffer, 0, len); 89 } 90 inputStream.close(); 91 gzipOutputStream.finish(); 92 gzipOutputStream.flush(); 93 outputStream.close(); 94 }catch (Exception exception){ 95 exception.printStackTrace(); 96 } 97 gzipOutputStream.close(); 98 //删除zip文件解压合并后的临时文件 99 String tempfiles = hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")); 100 try{ 101 if(hdfs.exists(new Path(tempfiles))){ 102 hdfs.delete(new Path(tempfiles), true); 103 } 104 }catch(IOException ie){ 105 ie.printStackTrace(); 106 } 107 } 108 } 109 }catch (IOException e) { 110 e.printStackTrace(); 111 } 112 } 113 114 public static void errorMessage(String str) { 115 System.out.println("Error Message: " + str); 116 System.exit(1); 117 } 118 119 @SuppressWarnings("null") 120 public static void main(String[] args) throws IOException { 121 if (args.length == 0) 122 errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); 123 if (args[0].matches("^-[rR]$")) { 124 isRecur = true; 125 } 126 if ((isRecur && args.length != 4) || ( !isRecur && args.length != 3)) { 127 errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>"); 128 } 129 130 Configuration conf = new Configuration(); 131 FileSystem hdfs = FileSystem.get(conf); 132 133 Path inputDir; 134 Path hdfsFile; 135 Text pcgroupText; 136 if(isRecur){ 137 inputDir = new Path(args[1]); 138 hdfsFile = new Path(args[2]); 139 pcgroupText = new Text(args[3]); 140 } 141 else{ 142 inputDir = new Path(args[0]); 143 hdfsFile = new Path(args[1]); 144 pcgroupText = new Text(args[2]); 145 } 146 147 if (!hdfs.exists(inputDir)) { 148 errorMessage("hdfsTargetDir not exist!"); 149 } 150 if (hdfs.exists(hdfsFile)) { 151 errorMessage("hdfsFileName exist!"); 152 } 153 merge(inputDir, hdfsFile, hdfs, pcgroupText); 154 System.exit(0); 155 } 156 }
具体的执行方式如下:
如果要实现递归的话,可以在filesmerge后面加上 -r
本文的前三个要点转载自三江小渡的博客:http://blog.pureisle.net/archives/1701.html
该博客的第四个要点没有写出具体实现,本人就帮忙完善了
转载于:https://www.cnblogs.com/juefan/archive/2013/02/27/2935163.html
HDFS文件系统内的文件格式转换(zip格式转化成gzip格式)相关推荐
- 如何将png/jpg等图片格式转化成eps格式
如何将png/jpg等图片格式转化成eps格式 在我们提交论文时,往往需要提交可编辑的图片供期刊编辑进行修改,可接受的格式有:Word.eps.ps.tiff.ppt和excel. eps格式是用的比 ...
- php pkcs 1格式的公钥,解说--2--微信支付RSA公钥PKCS1格式转化成PKCS8格式的公钥
最近在开发一个功能:微信自动转账给个人用户(个人微信零钱.银行卡) 今天只讲RSA公钥PKCS1格式转化成PKCS8格式的公钥 先说说解决过程(一路心酸,一万个······): 1.昨晚开始转格式,未 ...
- java字符串数组转json_java中字符串String格式转化成json格式
java字符串数组转json_java中字符串String格式转化成json格式 String s= Connection.deleteHost("10310");System.o ...
- pbmplus-图像文件格式转换包与PBM/PGM/PPM 格式图像文件
pbmplus-图像文件格式转换包 PBMPLUS 是一个用于多种图像类型和可移植格式(portble formats)之间来回转换的工具包.官方网站主页介绍,和下载地址在这里.该工具的想法是如果你想 ...
- 讲讲如何将图片格式转化成base64格式的
又到周五了,这几周遇到一个让我很头疼的问题,接口文档里写着: 尼玛!看着就头疼,这写的什么呀,完全看不懂呀! 于是乎,一番百度,似乎有了点头绪,下面讲讲: 我不是来讲概念的,直接切入正题,图片的bas ...
- java中字符串String格式转化成json格式
一.将Sting转为Json 在开发中,经常是前端发送一串数据过来之后,是通过一个参数带着一串值过来,这一串值是String 格式的,但是里面的内容其实是json格式类型的,所以拿到值之后就要将该值转 ...
- 怎么把qlv格式转化成mp3格式 格式工厂
1.搜索: 小白兔视频格式在线转换 2.上传你的视频(腾讯qlv,爱奇艺qsv.优酷kux)都可以. 3.转换好后,我们把转换的视频下载到电脑里,就可以看到视频已经是MP4格式了.
- 【JavaScript】VUE前端实现微信版录制音频wav格式转化成mp3格式和Base64
一.前言 前端有个需求是要实现一个像微信一样,按住录音,松开发送语音,期间踩了不少坑,特地记录一下,主要用到两个库 js-audio-recorder :负责录制音频,支持的格式只有wav.pcm L ...
- .tex类型文件怎么阅读_有了这些神器,什么文件格式转换都能搞得定!
大家好,我是小渔. 周一的时候,睿文老师写了一篇关于页码的干货文,为了方便大家学习,还录制了视频. 一般呢,在微信上发布的文章,小渔都需要同步到其他平台.然而在上传视频的时候,小渔却怎么也找不到文件. ...
最新文章
- CodeGen融合核心关系循环扩展
- 程序分析工具gprof介绍
- JavaScript装逼指南
- ionic开发ios app
- PostgreSQL 中的引号与大小写
- Laravel日志查看器 -- log-viewer扩展
- Webpack使用指南
- win7创建虚拟无线网络
- 数据库,部分函数依赖,传递函数依赖,完全函数依赖,三种范式的区别
- java中使用字符(字节)读取文本内容
- 更改数据库表中有数据的字段类型NUMERIC(18,2)为NUMERIC(18,6)
- 机器学习-数据科学库-day6
- Mybatis-第N篇配置log4j1、log4j2打印执行的sql语句
- delphi压缩后使用http协议base64上传下载6G超大文件
- 检测UDP端口是否畅通方法
- 前端js生成自定义内容的PDF及word文件的实现
- 台式机就是指什么的计算机,什么是台式机操作系统
- 如何知道计算机是否支持64位,查看CPU是否支持64位操作系统的简单方法
- 【学习】笔记本电脑重新安装系统win10
- GeoServer结合FWTools切片工具发布影像金字塔切片