spark读取oracle写入kafka,spark读取kafka文件写入hive
1.将hdfs-site,core-site.hive-site文件拷贝到resources目录下
image.png
2.添加maven依赖
org.apache.spark
spark-streaming_2.11
2.1.1
org.apache.spark
spark-streaming-kafka-0-10_2.11
2.1.1
org.apache.hive
hive-jdbc
1.2.1
org.apache.hive
hive-service
1.2.1
org.apache.spark
spark-hive_2.11
2.1.1
mysql
mysql-connector-java
5.1.27
3.编写代码
object KafkaDemo {
def main(args: Array[String]): Unit = {
//1.创建 SparkConf 并初始化 SSC
val sparkConf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("KafkaSparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(20))
//2.定义 kafka 参数
val brokers = "s201:9092"
val consumerGroup = "spark"
//3.将 kafka 参数映射为 map
val kafkaParams = Map[String, String](
"bootstrap.servers" -> brokers,
"group.id" -> consumerGroup,
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
//要监听的Topic,可以同时监听多个
val topics = Array("student")
//4.通过 KafkaUtil 创建 kafkaDSteam
val dstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
dstream.foreachRDD(rdd => {
//获取到分区和偏移量信息
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val events: RDD[Some[String]] = rdd.map(x => {
val data = x.value()
Some(data)
})
val warehouseLocation = "spark-warehouse"
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.enableHiveSupport()
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("user.name", "hadoop")
.config("HADOOP_USER_NAME", "hive")
.getOrCreate()
import spark.sql
//配置hive支持动态分区
sql("set hive.exec.dynamic.partition=true")
//配置hive动态分区为非严格模式
sql("set hive.exec.dynamic.partition.mode=nonstrict")
//如果将数据转换为Seq(xxxx),然后倒入隐式转换import session.implicalit._ 是否能实现呢,答案是否定的。
//构建row
val dataRow = events.map(line => {
val temp = line.get.split("###")
Row(temp(0), temp(1), temp(2))
})
//"deviceid","code","time","info","sdkversion","appversion"
//确定字段的类别
val structType = StructType(Array(
StructField("name", StringType, true),
StructField("age", StringType, true),
StructField("major", StringType, true)
))
//构建df
val df = spark.createDataFrame(dataRow, structType)
val unit = df.createOrReplaceTempView("jk_device_info")
val frame = sql("insert into myhive.student select * from jk_device_info")
})
//6.启动 SparkStreaming
ssc.start()
ssc.awaitTermination()
}
}
启动hadoop,zookeeper,kafka
/opt/module/hadoop-2.7.2/sbin/start-dfs.sh
/opt/module/hadoop-2.7.2/sbin/start-yarn.sh
zk.sh start
#! /bin/bash
case $1 in
"start"){
for i in s201 s202 s203
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
done
};;
"stop"){
for i in s201 s202 s203
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
done
};;
"status"){
for i in s201 s202 s203
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
done
};;
esac
kf.sh start
#! /bin/bash
case $1 in
"start"){
for i in s201 s202 s203
do
echo " --------启动 $i Kafka-------"
# 用于KafkaManager监控
ssh $i "export JMX_PORT=9988 && /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/modu
le/kafka/config/server.properties " done
};;
"stop"){
for i in s201 s202 s203
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
done
};;
esac
kafka发送消息
bin/kafka-console-producer.sh --broker-list s201:9092 --topic student
xiekai###24###ningdu
hive中查看是否是否插入
xiekain 24 ningdu
image.png
插入成功
注意,运行是可能会报HDFS的权限问题,所以需要加入运行时参数
image.png
spark读取oracle写入kafka,spark读取kafka文件写入hive相关推荐
- spark2读取oracle工具类,spark读写Oracle、hive的艰辛之路(一)
前两天工作需求,要通过给的几个Oracle的视图把数据入到hive库中,很遗憾,使用的华为云平台的集区环境中并没有sqoop1,当然也并没有sqoop2,所以,想到的解决方案是使用spark读取Ora ...
- python读取oracle数据到hvie parquet_关于sparksql操作hive,读取本地csv文件并以parquet的形式装入hive中...
说明:spark版本:2.2.0 hive版本:1.2.1 需求: 有本地csv格式的一个文件,格式为${当天日期}visit.txt,例如20180707visit.txt,现在需要将其通过spar ...
- c语言 读程序写入结构体,C语言文件写入结构体的乱码解决方法
C语言中用 fwrite 向文件写入结构体时出现乱码的解决方法,由于fwrite是用于二进制文件的,所以打开直接写入的文件是会出现乱码,这时需要一个中间文件来存储临时数据,就是先将数据存储在一个临时文 ...
- ENVI_IDL: 创建HDF5文件并写入数据(以将Geotiff文件写入HDF文件为例) + 详细解析
目录 1. 学习内容 2. 知识储备 3. 编程 1. 学习内容 文如标题:就是自己创建HDF文件并将数据写入其中 ------------------------------------------ ...
- hadoop 写入mysql_使用MapReducer将文件写入mysql 数据库
自定义类 package DBOutFormat; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib. ...
- Python文件(一):文件类型、文件的打开,读取写入,关闭、文件备份、文件和文件夹的操作
一.文件 文件是存储在存储器上的一组数据序列,可以包含任何数据内容. 文件是数据的抽象和集合. 二.文件类型 文本文件:长字符串 二进制是信息按照非字符但有特定格式形成的文件,文件内部数据的组织格式与 ...
- 使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例
文章目录 一. 题目 题目和数据 二. pom依赖 三.建表语句 四. 连接kafka配置类 五. 自定义分区类 六. 读取数据并发送数据 七. 消费数据,把数据存储到mysql 一. 题目 题目和数 ...
- Spark SQL读取Oracle的number类型的数据时精度丢失问题
Spark SQL读取Oracle的number类型的数据时精度丢失问题 在程序开发中,使用到了sparkSQL读取Oracle数据库,发现当sparkSQL读取Oracle的number类型字段时, ...
- spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成
Spark 编程模型 在Spark 中, 我们通过对分布式数据集的操作来表达计算意图 ,这些计算会自动在集群上 井行执行 这样的数据集被称为弹性分布式数据集 Resilient Distributed ...
- maven依赖 spark sql_使用Kafka+Spark+Cassandra构建实时处理引擎
大数据技术与架构点击右侧关注,大数据开发领域最强公众号! 暴走大数据点击右侧关注,暴走大数据! Apache Kafka 是一个可扩展,高性能,低延迟的平台,允许我们像消息系统一样读取和写入数据.我们 ...
最新文章
- lvm 逻辑卷 和 网络管理
- iphone微信美颜插件_Cydia插件推荐
- mysql分区字段创建索引_MySQL分区字段列有必要再单独建索引吗?
- linux下设置mysql不区分大小写
- PPT 下载 | 神策数据朱德康:用户中台建设实践解析
- ajax如何将数据写入文本框,ajax 从数据库读到文本框
- ACL 2020 | 多编码器是否能够捕获篇章级信息?
- c语言两个字符串比较,将两个字符串s1和s2比较,如果s1s2,数组编程:将2个字符串s1和s2比较。若s1s2输出1;若s1=s2,输出0;若s1s2,输出-1(不能用strcmp函数)...
- Java加密与解密的艺术~SM4实现
- ubuntu下wps不能输入中文
- vue3.0中使用Element-plus默认英文组件修改为中文
- 程序员常用编程工具: VS Code,那些你不得不知道的小知识!
- map.addoverlay php,覆盖物 - 百度地图开发文档 - php中文网手册
- 他人笑我太疯癫 我笑他人看不穿
- DataGrid 绑定定制的列
- MTK如何烧录IMEI码(俗称串号)
- 2.2 matlab矩阵变换(对角阵、三角阵、矩阵的转置、矩阵的旋转、矩阵的翻转和矩阵求逆)
- 基于Element UI Calendar实现日程提醒功能
- 人生进度条百分之20_1分钟get技能:缺了“进度条”,你注定和80%的失败者一样实现不了人生目标...
- 深入浅出Zookeeper集群搭建
热门文章
- 2021年低压电工试题及解析及低压电工模拟考试题
- 使用ceilometer与gnocchi执行gnocchi metric list相关命令时遇到AttributeError: _Environ instance has no attribute ‘
- Canny算子与霍夫变换检测圆与直线
- CiA402电机控制代码
- 视频文件太大怎样压缩?试试这个方法可以压缩视频文件
- SpringBoot中就绪探针和存活探针
- OC有符号16进制整形转10进制整形
- javascript 10进制和64进制的转换
- android默认安装位置,修改Android手机的软件默认安装位置
- 怎么把动图放到word里_WORD中如何插入动态图片