相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录)

rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")

把分区设置成1个 结果是Spark仍然是新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中

问题:如何让spark将Rdd结果输出到一个文件而不是目录中呢?

Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:

package com.ys.penspark.util;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;/*** @ClassName: HdfsOperate* @Description:* @Author: Administrator* @Date: 2017/6/28*/
public class HdfsOperate implements Serializable {private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class);private static Configuration conf = new Configuration();private static BufferedWriter writer = null;//在hdfs的目标位置新建一个文件,得到一个输出流public static void openHdfsFile(String path) throws Exception {FileSystem fs = FileSystem.get(URI.create(path),conf);writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))));if(null!=writer){logger.info("[HdfsOperate]>> initialize writer succeed!");}}//往hdfs文件中写入数据public static void writeString(String line) {try {writer.write(line + "\n");}catch(Exception e){logger.error("[HdfsOperate]>> writer a line error:"  ,  e);}}//关闭hdfs输出流public static void closeHdfsFile() {try {if (null != writer) {writer.close();logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");}else{logger.error("[HdfsOperate]>> closeHdfsFile writer is null");}}catch(Exception e){logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e);}}}

  先将spark的Rdd重新分区,再将每个分区的数据collectPartitions按行写入hdfs文件中

package com.ys.penspark.util;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;/*** @ClassName: FeatureExtractor* @Description:* @Author: mashiwei* @Date: 2017/6/28*/
public class FeatureExtractor implements Serializable{private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);public void extractFeature(Dataset<Row> s, int repartitionNum,String out) throws Exception {StringBuffer sb = new StringBuffer();for (int i = 0; i<= s.schema().fieldNames().length-1;i++) {sb.append(s.schema().fieldNames()[i]);if (i == s.schema().fieldNames().length-1){break;}sb.append(",");}s.show();JavaRDD<String> rddx = s.toJavaRDD().map(new ExtractFeatureMap()).repartition(repartitionNum);//写入hdfs文件位置
//        String destinationPath = "/kettle/penspark/data.txt" ;//创建Hdfs文件,打开Hdfs输出流HdfsOperate.openHdfsFile(out);HdfsOperate.writeString(sb.toString());//分块读取RDD数据并保存到hdfs//如果直接用collect()函数获取List<String>,可能因数据量过大超过内存空间而失败for (int i = 0; i < repartitionNum; i++) {int[] index = new int[1];index[0] = i;
//            List<String>[] featureList = rddx.collectPartitions(index);
//            List<String> strs = rddx.collect();List<String>[] featureList = rddx.collectPartitions(index);if (featureList.length != 1) {logger.error("[FeatureExtractor]>> featureList.length is not 1!");}for (String str : featureList[0]) {//写一行到Hdfs文件logger.info("-----"+str);HdfsOperate.writeString(str);}}//关闭Hdfs输出流HdfsOperate.closeHdfsFile();}class ExtractFeatureMap implements Function<Row, String> {@Overridepublic String call(Row line) throws Exception {try {StringBuffer sb = new StringBuffer();int len = line.length();for (int i = 0; i<= len-1; i++){sb.append(line.get(i).toString());if (i == len-1){break;}sb.append(",");}return sb.toString();} catch (Exception e) {logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e);}return null;}}public static void main(String[] args) {//        SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
//        JavaSparkContext sc= new JavaSparkContext(conf);StructType Schemafinal = new StructType();Map<String,String> options = new HashMap<String,String>();LinkedList<StructField> obj = new LinkedList<StructField>();StructField structField = new StructField("name", DataTypes.StringType, true, Metadata.empty());StructField structField1 = new StructField("age", DataTypes.StringType, true, Metadata.empty());
//        StructField structField2 = new StructField("字段2", DataTypes.StringType, true, Metadata.empty());
//        StructField structField3 = new StructField("字段3", DataTypes.StringType, true, Metadata.empty());obj.add(structField);obj.add(structField1);
//        obj.add(structField2);
//        obj.add(structField3);Schemafinal = new StructType(obj.toArray(new StructField[obj.size()]));SparkConf conf = new SparkConf().setAppName("Example App").setMaster("local[*]");options.put("delimiter",",");options.put("header","true");JavaSparkContext sc = new JavaSparkContext(conf);@SuppressWarnings("deprecation")SQLContext sqlContext = new SQLContext(sc);SparkSession spark = SparkSession.builder().appName("Pentaho Logic as Spark").config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", "file:///C:/tmp/").getOrCreate();Dataset<Row> tempdf = spark.read().format("com.databricks.spark.csv").options(options).schema(Schemafinal).option("header", true).load("file:///"+"C:\\Users\\Administrator\\Desktop\\测试\\功能开发\\excel.txt");tempdf.show();FeatureExtractor fx = new FeatureExtractor();try {
//            fx.extractFeature(sc,5);fx.extractFeature(tempdf,2,"/kettle/tempData.txt");} catch (Exception e) {e.printStackTrace();}}
}

  数据

name,age
zs, 44
li, 22
ww, 18

  

转载于:https://www.cnblogs.com/xiaoma0529/p/7090912.html

Spark 把RDD数据保存到hdfs单个文件中,而不是目录相关推荐

  1. Scrapy中将数据保存到Excel和MySQL中

    目录标题 1. Excel 1.1 openpyxl 1.1.1 代码说明 1.1.2 注意 1.2 pandas 1.2.1 代码说明 1.2.2 常见错误 1.3 openpyxl和pandas对 ...

  2. 简单的Http请求数据保存到Hdfs

    使用okhttp工具集来开发:(如果文件已经存在会报错) package com.etl;import java.io.IOException;import org.apache.commons.la ...

  3. Python网络爬虫:爬取CSDN热搜数据 并保存到本地文件中

    hello,大家好,我是wangzirui32,今天我们来学习如何爬取CSDN热搜数据,并保存到Excel表格中. 开始学习吧! 学习目录 1. 数据包抓取 2. 编写代码 1. 数据包抓取 打开CS ...

  4. 把数据保存到cook_将用户信息保存到Cookie中

    /** * 把用户保存到Cookie * * @param request * @param response * @param member */ private void rememberPwdA ...

  5. Pandas的学习(读取mongodb数据库集合到DataFrame,将DataFrame类型数据保存到mongodb数据库中)

    1.读取mongodb数据库集合到DataFrame import pymongo import pandas as pdclient = pymongo.MongoClient("数据库连 ...

  6. html 保存xlsx,HTML SaveXLSX按钮防止将数据保存到SlickGrid的XLSX文件中

    我在网页上有一个SlickGrid,我正在尝试添加一个按钮来调用函数CreateXLSX().当我编辑Main.jade的代码,我输入:HTML SaveXLSX按钮防止将数据保存到SlickGrid ...

  7. C++读取txt数据为二维数组 将数据保存到txt文本中

      C++读取txt数据为二维数组 保存txt文本数据     C++文件读写操作有:ofstream,ifstream,fstream: #include <fstream> ofstr ...

  8. vue 将数据保存到vuex中

    在项目中遇到这样一个问题,就是在登入的时候同时需要从后台获取到左边的导航,但是如果使用h5的localStorage来保存导航信息,会出现app加载进去之后localStorage才保存进浏览器,在m ...

  9. Python中用pandas将numpy中的数组数据保存到csv文件

    Python中用pandas将numpy中的数组数据保存到csv文件 本博客转载自:[1]https://blog.csdn.net/grey_csdn/article/details/7018587 ...

最新文章

  1. MySQL5.6多实例部署
  2. 如何让关键词进入百度相关搜索列表?
  3. textaligncenter仍然不居中_戊唑醇和己唑醇都是杀菌剂,有啥不同?真正懂的人不多...
  4. 康托展开与逆展开(原理+模板)
  5. c语言笔记之数组和指针(初学者)
  6. 奇偶数判断(信息学奥赛一本通-T1041)
  7. NLP之路-Deep Learning in NLP (一)词向量和语言模型
  8. CentOS7配置默认网关
  9. epplus word html,EPPlus简介
  10. Ubuntu最佳字体推荐
  11. 《大型多人在线游戏开发》读书笔记
  12. Android上Excel编辑器,若风excel文件编辑器
  13. 平面设计完全手册_什么是平面设计,做平面设计都要了解哪些基础知识点?
  14. python 批量替换当前.txt文本内容
  15. 软件测试人员必知H5/小程序测试点
  16. TableView的使用
  17. JavaScript实现垃圾分类小游戏教程,附源码!
  18. 心理压力大胃肠容易变弱 注意几点可缓解
  19. oracle 杀掉spid,oracle 存储过程 sid spid 如果sid被杀掉了,spid是不也自动停止了?...
  20. 小程序自定义tabbar,子页面也显示tabbar

热门文章

  1. Spring MVC 解决日期类型动态绑定问题
  2. ISA Server 2006的CARP与NLB的构建
  3. 修复错误配置fstab文件导致系统无法正常启动
  4. DM368启动串口打印分析
  5. 第二届中国云计算应用论坛圆满落幕
  6. 给定二叉树先序、中序遍历序列,求后序遍历
  7. js正则验证手机号码有效性
  8. yii2分页的基本使用及其配置详解
  9. 从Android访问PC端的port (reverse port forwarding)
  10. 初试 webfont