1.将hdfs-site,core-site.hive-site文件拷贝到resources目录下

image.png

2.添加maven依赖

org.apache.spark

spark-streaming_2.11

2.1.1

org.apache.spark

spark-streaming-kafka-0-10_2.11

2.1.1

org.apache.hive

hive-jdbc

1.2.1

org.apache.hive

hive-service

1.2.1

org.apache.spark

spark-hive_2.11

2.1.1

mysql

mysql-connector-java

5.1.27

3.编写代码

object KafkaDemo {

def main(args: Array[String]): Unit = {

//1.创建 SparkConf 并初始化 SSC

val sparkConf: SparkConf = new SparkConf()

.setMaster("local[*]")

.setAppName("KafkaSparkStreaming")

val ssc = new StreamingContext(sparkConf, Seconds(20))

//2.定义 kafka 参数

val brokers = "s201:9092"

val consumerGroup = "spark"

//3.将 kafka 参数映射为 map

val kafkaParams = Map[String, String](

"bootstrap.servers" -> brokers,

"group.id" -> consumerGroup,

"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",

"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"

)

//要监听的Topic,可以同时监听多个

val topics = Array("student")

//4.通过 KafkaUtil 创建 kafkaDSteam

val dstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,

ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

dstream.foreachRDD(rdd => {

//获取到分区和偏移量信息

val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

val events: RDD[Some[String]] = rdd.map(x => {

val data = x.value()

Some(data)

})

val warehouseLocation = "spark-warehouse"

val spark = SparkSession

.builder()

.appName("Spark Hive Example")

.enableHiveSupport()

.config("spark.sql.warehouse.dir", warehouseLocation)

.config("user.name", "hadoop")

.config("HADOOP_USER_NAME", "hive")

.getOrCreate()

import spark.sql

//配置hive支持动态分区

sql("set hive.exec.dynamic.partition=true")

//配置hive动态分区为非严格模式

sql("set hive.exec.dynamic.partition.mode=nonstrict")

//如果将数据转换为Seq(xxxx),然后倒入隐式转换import session.implicalit._ 是否能实现呢,答案是否定的。

//构建row

val dataRow = events.map(line => {

val temp = line.get.split("###")

Row(temp(0), temp(1), temp(2))

})

//"deviceid","code","time","info","sdkversion","appversion"

//确定字段的类别

val structType = StructType(Array(

StructField("name", StringType, true),

StructField("age", StringType, true),

StructField("major", StringType, true)

))

//构建df

val df = spark.createDataFrame(dataRow, structType)

val unit = df.createOrReplaceTempView("jk_device_info")

val frame = sql("insert into myhive.student select * from jk_device_info")

})

//6.启动 SparkStreaming

ssc.start()

ssc.awaitTermination()

}

}

启动hadoop,zookeeper,kafka

/opt/module/hadoop-2.7.2/sbin/start-dfs.sh

/opt/module/hadoop-2.7.2/sbin/start-yarn.sh

zk.sh start

#! /bin/bash

case $1 in

"start"){

for i in s201 s202 s203

do

ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"

done

};;

"stop"){

for i in s201 s202 s203

do

ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"

done

};;

"status"){

for i in s201 s202 s203

do

ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"

done

};;

esac

kf.sh start

#! /bin/bash

case $1 in

"start"){

for i in s201 s202 s203

do

echo " --------启动 $i Kafka-------"

# 用于KafkaManager监控

ssh $i "export JMX_PORT=9988 && /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/modu

le/kafka/config/server.properties " done

};;

"stop"){

for i in s201 s202 s203

do

echo " --------停止 $i Kafka-------"

ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"

done

};;

esac

kafka发送消息

bin/kafka-console-producer.sh --broker-list s201:9092 --topic student

xiekai###24###ningdu

hive中查看是否是否插入

xiekain 24 ningdu

image.png

插入成功

注意,运行是可能会报HDFS的权限问题,所以需要加入运行时参数

image.png

spark读取oracle写入kafka,spark读取kafka文件写入hive相关推荐

  1. spark2读取oracle工具类,spark读写Oracle、hive的艰辛之路(一)

    前两天工作需求,要通过给的几个Oracle的视图把数据入到hive库中,很遗憾,使用的华为云平台的集区环境中并没有sqoop1,当然也并没有sqoop2,所以,想到的解决方案是使用spark读取Ora ...

  2. python读取oracle数据到hvie parquet_关于sparksql操作hive,读取本地csv文件并以parquet的形式装入hive中...

    说明:spark版本:2.2.0 hive版本:1.2.1 需求: 有本地csv格式的一个文件,格式为${当天日期}visit.txt,例如20180707visit.txt,现在需要将其通过spar ...

  3. c语言 读程序写入结构体,C语言文件写入结构体的乱码解决方法

    C语言中用 fwrite 向文件写入结构体时出现乱码的解决方法,由于fwrite是用于二进制文件的,所以打开直接写入的文件是会出现乱码,这时需要一个中间文件来存储临时数据,就是先将数据存储在一个临时文 ...

  4. ENVI_IDL: 创建HDF5文件并写入数据(以将Geotiff文件写入HDF文件为例) + 详细解析

    目录 1. 学习内容 2. 知识储备 3. 编程 1. 学习内容 文如标题:就是自己创建HDF文件并将数据写入其中 ------------------------------------------ ...

  5. hadoop 写入mysql_使用MapReducer将文件写入mysql 数据库

    自定义类 package DBOutFormat; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib. ...

  6. Python文件(一):文件类型、文件的打开,读取写入,关闭、文件备份、文件和文件夹的操作

    一.文件 文件是存储在存储器上的一组数据序列,可以包含任何数据内容. 文件是数据的抽象和集合. 二.文件类型 文本文件:长字符串 二进制是信息按照非字符但有特定格式形成的文件,文件内部数据的组织格式与 ...

  7. 使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例

    文章目录 一. 题目 题目和数据 二. pom依赖 三.建表语句 四. 连接kafka配置类 五. 自定义分区类 六. 读取数据并发送数据 七. 消费数据,把数据存储到mysql 一. 题目 题目和数 ...

  8. Spark SQL读取Oracle的number类型的数据时精度丢失问题

    Spark SQL读取Oracle的number类型的数据时精度丢失问题 在程序开发中,使用到了sparkSQL读取Oracle数据库,发现当sparkSQL读取Oracle的number类型字段时, ...

  9. spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成

    Spark 编程模型 在Spark 中, 我们通过对分布式数据集的操作来表达计算意图 ,这些计算会自动在集群上 井行执行 这样的数据集被称为弹性分布式数据集 Resilient Distributed ...

  10. maven依赖 spark sql_使用Kafka+Spark+Cassandra构建实时处理引擎

    大数据技术与架构点击右侧关注,大数据开发领域最强公众号! 暴走大数据点击右侧关注,暴走大数据! Apache Kafka 是一个可扩展,高性能,低延迟的平台,允许我们像消息系统一样读取和写入数据.我们 ...

最新文章

  1. lvm 逻辑卷 和 网络管理
  2. iphone微信美颜插件_Cydia插件推荐
  3. mysql分区字段创建索引_MySQL分区字段列有必要再单独建索引吗?
  4. linux下设置mysql不区分大小写
  5. PPT 下载 | 神策数据朱德康:用户中台建设实践解析
  6. ajax如何将数据写入文本框,ajax 从数据库读到文本框
  7. ACL 2020 | 多编码器是否能够捕获篇章级信息?
  8. c语言两个字符串比较,将两个字符串s1和s2比较,如果s1s2,数组编程:将2个字符串s1和s2比较。若s1s2输出1;若s1=s2,输出0;若s1s2,输出-1(不能用strcmp函数)...
  9. Java加密与解密的艺术~SM4实现
  10. ubuntu下wps不能输入中文
  11. vue3.0中使用Element-plus默认英文组件修改为中文
  12. 程序员常用编程工具: VS Code,那些你不得不知道的小知识!
  13. map.addoverlay php,覆盖物 - 百度地图开发文档 - php中文网手册
  14. 他人笑我太疯癫 我笑他人看不穿
  15. DataGrid 绑定定制的列
  16. MTK如何烧录IMEI码(俗称串号)
  17. 2.2 matlab矩阵变换(对角阵、三角阵、矩阵的转置、矩阵的旋转、矩阵的翻转和矩阵求逆)
  18. 基于Element UI Calendar实现日程提醒功能
  19. 人生进度条百分之20_1分钟get技能:缺了“进度条”,你注定和80%的失败者一样实现不了人生目标...
  20. 深入浅出Zookeeper集群搭建

热门文章

  1. 2021年低压电工试题及解析及低压电工模拟考试题
  2. 使用ceilometer与gnocchi执行gnocchi metric list相关命令时遇到AttributeError: _Environ instance has no attribute ‘
  3. Canny算子与霍夫变换检测圆与直线
  4. CiA402电机控制代码
  5. 视频文件太大怎样压缩?试试这个方法可以压缩视频文件
  6. SpringBoot中就绪探针和存活探针
  7. OC有符号16进制整形转10进制整形
  8. javascript 10进制和64进制的转换
  9. android默认安装位置,修改Android手机的软件默认安装位置
  10. 怎么把动图放到word里_WORD中如何插入动态图片