Spark 把RDD数据保存到hdfs单个文件中,而不是目录
相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录)
rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")
把分区设置成1个 结果是Spark仍然是新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中
问题:如何让spark将Rdd结果输出到一个文件而不是目录中呢?
Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:
package com.ys.penspark.util;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;/*** @ClassName: HdfsOperate* @Description:* @Author: Administrator* @Date: 2017/6/28*/
public class HdfsOperate implements Serializable {private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class);private static Configuration conf = new Configuration();private static BufferedWriter writer = null;//在hdfs的目标位置新建一个文件,得到一个输出流public static void openHdfsFile(String path) throws Exception {FileSystem fs = FileSystem.get(URI.create(path),conf);writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))));if(null!=writer){logger.info("[HdfsOperate]>> initialize writer succeed!");}}//往hdfs文件中写入数据public static void writeString(String line) {try {writer.write(line + "\n");}catch(Exception e){logger.error("[HdfsOperate]>> writer a line error:" , e);}}//关闭hdfs输出流public static void closeHdfsFile() {try {if (null != writer) {writer.close();logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");}else{logger.error("[HdfsOperate]>> closeHdfsFile writer is null");}}catch(Exception e){logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e);}}}
先将spark的Rdd重新分区,再将每个分区的数据collectPartitions按行写入hdfs文件中
package com.ys.penspark.util;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;/*** @ClassName: FeatureExtractor* @Description:* @Author: mashiwei* @Date: 2017/6/28*/
public class FeatureExtractor implements Serializable{private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);public void extractFeature(Dataset<Row> s, int repartitionNum,String out) throws Exception {StringBuffer sb = new StringBuffer();for (int i = 0; i<= s.schema().fieldNames().length-1;i++) {sb.append(s.schema().fieldNames()[i]);if (i == s.schema().fieldNames().length-1){break;}sb.append(",");}s.show();JavaRDD<String> rddx = s.toJavaRDD().map(new ExtractFeatureMap()).repartition(repartitionNum);//写入hdfs文件位置
// String destinationPath = "/kettle/penspark/data.txt" ;//创建Hdfs文件,打开Hdfs输出流HdfsOperate.openHdfsFile(out);HdfsOperate.writeString(sb.toString());//分块读取RDD数据并保存到hdfs//如果直接用collect()函数获取List<String>,可能因数据量过大超过内存空间而失败for (int i = 0; i < repartitionNum; i++) {int[] index = new int[1];index[0] = i;
// List<String>[] featureList = rddx.collectPartitions(index);
// List<String> strs = rddx.collect();List<String>[] featureList = rddx.collectPartitions(index);if (featureList.length != 1) {logger.error("[FeatureExtractor]>> featureList.length is not 1!");}for (String str : featureList[0]) {//写一行到Hdfs文件logger.info("-----"+str);HdfsOperate.writeString(str);}}//关闭Hdfs输出流HdfsOperate.closeHdfsFile();}class ExtractFeatureMap implements Function<Row, String> {@Overridepublic String call(Row line) throws Exception {try {StringBuffer sb = new StringBuffer();int len = line.length();for (int i = 0; i<= len-1; i++){sb.append(line.get(i).toString());if (i == len-1){break;}sb.append(",");}return sb.toString();} catch (Exception e) {logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e);}return null;}}public static void main(String[] args) {// SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
// JavaSparkContext sc= new JavaSparkContext(conf);StructType Schemafinal = new StructType();Map<String,String> options = new HashMap<String,String>();LinkedList<StructField> obj = new LinkedList<StructField>();StructField structField = new StructField("name", DataTypes.StringType, true, Metadata.empty());StructField structField1 = new StructField("age", DataTypes.StringType, true, Metadata.empty());
// StructField structField2 = new StructField("字段2", DataTypes.StringType, true, Metadata.empty());
// StructField structField3 = new StructField("字段3", DataTypes.StringType, true, Metadata.empty());obj.add(structField);obj.add(structField1);
// obj.add(structField2);
// obj.add(structField3);Schemafinal = new StructType(obj.toArray(new StructField[obj.size()]));SparkConf conf = new SparkConf().setAppName("Example App").setMaster("local[*]");options.put("delimiter",",");options.put("header","true");JavaSparkContext sc = new JavaSparkContext(conf);@SuppressWarnings("deprecation")SQLContext sqlContext = new SQLContext(sc);SparkSession spark = SparkSession.builder().appName("Pentaho Logic as Spark").config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", "file:///C:/tmp/").getOrCreate();Dataset<Row> tempdf = spark.read().format("com.databricks.spark.csv").options(options).schema(Schemafinal).option("header", true).load("file:///"+"C:\\Users\\Administrator\\Desktop\\测试\\功能开发\\excel.txt");tempdf.show();FeatureExtractor fx = new FeatureExtractor();try {
// fx.extractFeature(sc,5);fx.extractFeature(tempdf,2,"/kettle/tempData.txt");} catch (Exception e) {e.printStackTrace();}}
}
数据
name,age
zs, 44
li, 22
ww, 18
转载于:https://www.cnblogs.com/xiaoma0529/p/7090912.html
Spark 把RDD数据保存到hdfs单个文件中,而不是目录相关推荐
- Scrapy中将数据保存到Excel和MySQL中
目录标题 1. Excel 1.1 openpyxl 1.1.1 代码说明 1.1.2 注意 1.2 pandas 1.2.1 代码说明 1.2.2 常见错误 1.3 openpyxl和pandas对 ...
- 简单的Http请求数据保存到Hdfs
使用okhttp工具集来开发:(如果文件已经存在会报错) package com.etl;import java.io.IOException;import org.apache.commons.la ...
- Python网络爬虫:爬取CSDN热搜数据 并保存到本地文件中
hello,大家好,我是wangzirui32,今天我们来学习如何爬取CSDN热搜数据,并保存到Excel表格中. 开始学习吧! 学习目录 1. 数据包抓取 2. 编写代码 1. 数据包抓取 打开CS ...
- 把数据保存到cook_将用户信息保存到Cookie中
/** * 把用户保存到Cookie * * @param request * @param response * @param member */ private void rememberPwdA ...
- Pandas的学习(读取mongodb数据库集合到DataFrame,将DataFrame类型数据保存到mongodb数据库中)
1.读取mongodb数据库集合到DataFrame import pymongo import pandas as pdclient = pymongo.MongoClient("数据库连 ...
- html 保存xlsx,HTML SaveXLSX按钮防止将数据保存到SlickGrid的XLSX文件中
我在网页上有一个SlickGrid,我正在尝试添加一个按钮来调用函数CreateXLSX().当我编辑Main.jade的代码,我输入:HTML SaveXLSX按钮防止将数据保存到SlickGrid ...
- C++读取txt数据为二维数组 将数据保存到txt文本中
C++读取txt数据为二维数组 保存txt文本数据 C++文件读写操作有:ofstream,ifstream,fstream: #include <fstream> ofstr ...
- vue 将数据保存到vuex中
在项目中遇到这样一个问题,就是在登入的时候同时需要从后台获取到左边的导航,但是如果使用h5的localStorage来保存导航信息,会出现app加载进去之后localStorage才保存进浏览器,在m ...
- Python中用pandas将numpy中的数组数据保存到csv文件
Python中用pandas将numpy中的数组数据保存到csv文件 本博客转载自:[1]https://blog.csdn.net/grey_csdn/article/details/7018587 ...
最新文章
- MySQL5.6多实例部署
- 如何让关键词进入百度相关搜索列表?
- textaligncenter仍然不居中_戊唑醇和己唑醇都是杀菌剂,有啥不同?真正懂的人不多...
- 康托展开与逆展开(原理+模板)
- c语言笔记之数组和指针(初学者)
- 奇偶数判断(信息学奥赛一本通-T1041)
- NLP之路-Deep Learning in NLP (一)词向量和语言模型
- CentOS7配置默认网关
- epplus word html,EPPlus简介
- Ubuntu最佳字体推荐
- 《大型多人在线游戏开发》读书笔记
- Android上Excel编辑器,若风excel文件编辑器
- 平面设计完全手册_什么是平面设计,做平面设计都要了解哪些基础知识点?
- python 批量替换当前.txt文本内容
- 软件测试人员必知H5/小程序测试点
- TableView的使用
- JavaScript实现垃圾分类小游戏教程,附源码!
- 心理压力大胃肠容易变弱 注意几点可缓解
- oracle 杀掉spid,oracle 存储过程 sid spid 如果sid被杀掉了,spid是不也自动停止了?...
- 小程序自定义tabbar,子页面也显示tabbar