
一、模仿 hadoop fs -put 和 -copyFromLocal 命令,实现将本地文件复制到 HDFS:

二、写一个与 hadoop fs -getmerge 相对应的简单程序:putMerge。


 1 import java.io.IOException;
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FSDataInputStream;
 5 import org.apache.hadoop.fs.FSDataOutputStream;
 6 import org.apache.hadoop.fs.FileStatus;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
10 public class putMerge {
11     public static void main(String[] args) throws IOException {
12         Configuration conf = new Configuration();
13         FileSystem hdfs = FileSystem.get(conf);
14         FileSystem local = FileSystem.getLocal(conf);
16         Path inputDir = new Path(args[0]);
17         Path hdfsFile = new Path(args[1]);
19         try {
20             FileStatus[] inputFiles = local.listStatus(inputDir);
21             FSDataOutputStream out = hdfs.create(hdfsFile);
23             for (int i = 0; i < inputFiles.length; i++) {
24                 System.out.println(inputFiles[i].getPath().getName());
25                 FSDataInputStream in = local.open(inputFiles[i].getPath());
26                 byte buffer[] = new byte[256];
27                 int bytesRead = 0;
28                 while ((bytesRead = in.read(buffer)) > 0) {
29                     out.write(buffer, 0, bytesRead);
30                 }
31                 in.close();
32             }
33             out.close();
34         } catch (IOException e) {
35             e.printStackTrace();
36         }
38     }
39 }



[root@JueFan pconline]# hadoop jar putMerge.jar putMerge /home/juefan/(本地目录名,不要加 *,否则会被 shell 展开成多个参数) /user/juefan/merged.txt(HDFS 上的目标文件)


 1 import java.io.IOException;
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FSDataInputStream;
 5 import org.apache.hadoop.fs.FSDataOutputStream;
 6 import org.apache.hadoop.fs.FileStatus;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
10 public class filesmerge {
11     public static boolean isRecur = false;
13     public static void merge(Path inputDir, Path hdfsFile, FileSystem hdfs,FSDataOutputStream out) {
14         try {
15             FileStatus[] inputFiles = hdfs.listStatus(inputDir);
16             for (int i = 0; i < inputFiles.length; i++) {
17                 if (!hdfs.isFile(inputFiles[i].getPath())) {
18                     if (isRecur){
19                         merge(inputFiles[i].getPath(), hdfsFile, hdfs,out);
20                         return ;
21                     }
22                     else {
23                         System.out.println(inputFiles[i].getPath().getName()
24                                 + "is not file and not allow recursion, skip!");
25                         continue;
26                     }
27                 }
28                 System.out.println(inputFiles[i].getPath().getName());
29                 FSDataInputStream in = hdfs.open(inputFiles[i].getPath());
30                 byte buffer[] = new byte[256];
31                 int bytesRead = 0;
32                 while ((bytesRead = in.read(buffer)) > 0) {
33                     out.write(buffer, 0, bytesRead);
34                 }
35                 in.close();
36             }
37             out.close();
38         } catch (IOException e) {
39             e.printStackTrace();
40         }
41     }
43     public static void errorMessage(String str) {
44         System.out.println("Error Message: " + str);
45         System.exit(1);
46     }
48     public static void main(String[] args) throws IOException {
49         if (args.length == 0)
50             errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
51         if (args[0].matches("^-[rR]$")) {
52             isRecur = true;
53         }
54         if ((isRecur && args.length != 3) || ( !isRecur && args.length != 2)) {
55             errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
56         }
58         Configuration conf = new Configuration();
59         FileSystem hdfs = FileSystem.get(conf);
61         Path inputDir;
62         Path hdfsFile;
63         if(isRecur){
64             inputDir = new Path(args[1]);
65             hdfsFile = new Path(args[2]);
66         }
67         else{
68             inputDir = new Path(args[0]);
69             hdfsFile = new Path(args[1]);
70         }
72         if (!hdfs.exists(inputDir)) {
73             errorMessage("hdfsTargetDir not exist!");
74         }
75         if (hdfs.exists(hdfsFile)) {
76             errorMessage("hdfsFileName exist!");
77         }
79         FSDataOutputStream out = hdfs.create(hdfsFile);
80         merge(inputDir, hdfsFile, hdfs,out);
81         System.exit(0);
82     }
83 }


其弊端主要有:1、可操作性弱,要来回折腾HDFS与本地上的文件   2、zip文件解压出来加载回HDFS后占用的空间较大
  1 import java.io.File;
  2 import java.io.IOException;
  3 import java.util.zip.GZIPOutputStream;
  4 import java.util.zip.ZipEntry;
  5 import java.util.zip.ZipInputStream;
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.FSDataInputStream;
  9 import org.apache.hadoop.fs.FSDataOutputStream;
 10 import org.apache.hadoop.fs.FileStatus;
 11 import org.apache.hadoop.fs.FileSystem;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.io.Text;
 15 public class filesmerge {
 16     //判断是否递归执行
 17     public static boolean isRecur = false;
 19     /**
 20      * @author JueFan
 21      * @param inputDir zip文件的存储地址
 22      * @param hdfsFile 解压结果的存储地址
 23      * @param hdfs 分布式文件系统数据流
 24      * @param pcgroupText 需要解压缩的文件关键名
 25      */
 26     public static void merge(Path inputDir, Path hdfsFile,
 27             FileSystem hdfs, Text pcgroupText) {
 28         try {
 29             //文件系统地址inputDir下的FileStatus
 30             FileStatus[] inputFiles = hdfs.listStatus(inputDir);
 31             for (int i = 0; i < inputFiles.length; i++) {
 32                 if (!hdfs.isFile(inputFiles[i].getPath())) {
 33                     if (isRecur){
 34                         merge(inputFiles[i].getPath(), hdfsFile, hdfs,pcgroupText);
 35                         return ;
 36                     }
 37                     else {
 38                         System.out.println(inputFiles[i].getPath().getName()
 39                                 + "is not file and not allow recursion, skip!");
 40                         continue;
 41                     }
 42                 }
 43                 //判断文件名是否在需要解压缩的关键名内
 44                 if(inputFiles[i].getPath().getName().contains(pcgroupText.toString()) == true){
 45                     //输出待解压的文件名
 46                     System.out.println(inputFiles[i].getPath().getName());
 47                     //将数据流指向待解压文件
 48                     FSDataInputStream in = hdfs.open(inputFiles[i].getPath());
 49                     /**
 50                      *数据的解压执行过程
 51                      */
 52                     ZipInputStream zipInputStream = null;
 53                     try{
 54                         zipInputStream = new ZipInputStream(in);
 55                         ZipEntry entry;
 56                         //解压后有多个文件一并解压出来并实现合并
 57                         //合并后的地址
 58                         FSDataOutputStream mergerout = hdfs.create(new Path(hdfsFile + File.separator +
 59                                 inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."))));
 60                         while((entry = zipInputStream.getNextEntry()) != null){
 61                             byte[] buffer1 = new byte[2048];
 62                             int nNumber;
 63                             while((nNumber = zipInputStream.read(buffer1,
 64                                     0, buffer1.length)) != -1)
 65                                 mergerout.write(buffer1, 0, nNumber);
 66                         }
 67                         mergerout.flush();
 68                         mergerout.close();
 69                         zipInputStream.close();
 70                     }catch(IOException e){
 71                         continue;
 72                     }
 73                     in.close();
 74                     /**
 75                      *将解压合并后的数据压缩成gzip格式
 76                      */
 77                     GZIPOutputStream gzipOutputStream = null;
 78                     try{
 79                         FSDataOutputStream outputStream = null;
 80                         outputStream = hdfs.create(new Path(hdfsFile + File.separator +
 81                                 inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")) + ".gz"));
 82                         FSDataInputStream inputStream = null;
 83                         gzipOutputStream = new GZIPOutputStream(outputStream);
 84                         inputStream = hdfs.open(new Path(hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."))));
 85                         byte[] buffer = new byte[1024];
 86                         int len;
 87                         while((len = inputStream.read(buffer)) > 0){
 88                             gzipOutputStream.write(buffer, 0, len);
 89                         }
 90                         inputStream.close();
 91                         gzipOutputStream.finish();
 92                         gzipOutputStream.flush();
 93                         outputStream.close();
 94                     }catch (Exception exception){
 95                         exception.printStackTrace();
 96                     }
 97                     gzipOutputStream.close();
 98                     //删除zip文件解压合并后的临时文件
 99                     String tempfiles = hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."));
100                     try{
101                         if(hdfs.exists(new Path(tempfiles))){
102                             hdfs.delete(new Path(tempfiles), true);
103                         }
104                     }catch(IOException ie){
105                         ie.printStackTrace();
106                     }
107                 }
108             }
109         }catch (IOException e) {
110             e.printStackTrace();
111         }
112     }
114     public static void errorMessage(String str) {
115         System.out.println("Error Message: " + str);
116         System.exit(1);
117     }
119     @SuppressWarnings("null")
120     public static void main(String[] args) throws IOException {
121         if (args.length == 0)
122             errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
123         if (args[0].matches("^-[rR]$")) {
124             isRecur = true;
125         }
126         if ((isRecur && args.length != 4) || ( !isRecur && args.length != 3)) {
127             errorMessage("filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
128         }
130         Configuration conf = new Configuration();
131         FileSystem hdfs = FileSystem.get(conf);
133         Path inputDir;
134         Path hdfsFile;
135         Text pcgroupText;
136         if(isRecur){
137             inputDir = new Path(args[1]);
138             hdfsFile = new Path(args[2]);
139             pcgroupText = new Text(args[3]);
140         }
141         else{
142             inputDir = new Path(args[0]);
143             hdfsFile = new Path(args[1]);
144             pcgroupText = new Text(args[2]);
145         }
147         if (!hdfs.exists(inputDir)) {
148             errorMessage("hdfsTargetDir not exist!");
149         }
150         if (hdfs.exists(hdfsFile)) {
151             errorMessage("hdfsFileName exist!");
152         }
153         merge(inputDir, hdfsFile, hdfs, pcgroupText);
154         System.exit(0);
155     }
156 }


[root@JueFan pconline]# hadoop jar zip_to_gzip.jar filesmerge /zip/(待转换文件路径,在HDFS上) /user/juefan/pconline/(转换完成后的文件存储地址,也在HDFS上) pconline(待转换的文件名包含的字符) 

如果要实现递归的话,可以在filesmerge后面加上 -r  





