注意:需要提前创建hive接收信息的表--test0617

否则报错:

Caused by: NoSuchObjectException(message:rt1.cr_kafka_t_test0707 table not found)

配置文件

localAgent.sources = skafka
localAgent.sinks = shive
localAgent.channels = k2h

#k2h shive
localAgent.sinks.shive.channel = k2h
#skafka k2h
localAgent.sources.skafka.channels = k2h

#describe the source
localAgent.sources.skafka.type = org.apache.flume.source.kafka.KafkaSource
localAgent.sources.skafka.zookeeperConnect = localhost:2181
localAgent.sources.skafka.batchSize = 1000
localAgent.sources.skafka.batchDurationMillis = 500
localAgent.sources.skafka.kafka.bootstrap.servers = localhost:9092
localAgent.sources.skafka.kafka.topics = mytopic0617
localAgent.sources.skafka.kafka.consumer.group.id = kafka007
localAgent.sources.skafka.kafka.consumer.auto.offset.reset = earliest

#use a channel which buffers events in memory
localAgent.channels.k2h.type = memory
localAgent.channels.k2h.capacity = 1000
localAgent.channels.k2h.transactionCapacity = 1000

#sinks type  hive
localAgent.sinks.shive.type = hive
localAgent.sinks.shive.channel = k2h
localAgent.sinks.shive.hive.metastore = thrift://localhost:9083
localAgent.sinks.shive.hive.database = database1
localAgent.sinks.shive.hive.table = test0617
localAgent.sinks.shive.hive.partition = %Y-%m-%d
localAgent.sinks.shive.batchSize = 500
localAgent.sinks.shive.useLocalTimeStamp = true
localAgent.sinks.shive.round = true
localAgent.sinks.shive.roundValue = 24
localAgent.sinks.shive.roundUnit = hour
localAgent.sinks.shive.serializer = JSON

启动异常:

org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 1000 full, consider committing more frequently, increasing capacity or increasing thread count

解决,入上配置中标红部分修改之后,异常解决。

需要启动hdfs、hive、mysql、zk、kafka、flume,注意启动顺序。

flume读取kafka写hive相关推荐

  1. 大数据开发超高频面试题!大厂面试必看!包含Hadoop、zookeeper、Hive、flume、kafka、Hbase、flink、spark、数仓等

    大数据开发面试题 包含Hadoop.zookeeper.Hive.flume.kafka.Hbase.flink.spark.数仓等高频面试题. 数据来自原博主爬虫获取! 文章目录 大数据开发面试题 ...

  2. 通过Flume简单实现Kafka与Hive对接(Json格式)

    将以下存储在kafka的topic中的JSON格式字符串,对接存储到Hive的表中 {"id":1,"name":"小李"} {" ...

  3. Flume下读取kafka数据后再打把数据输出到kafka,利用拦截器解决topic覆盖问题

    1:如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体表现为,Kafka Source可以正常从指 ...

  4. Flume和Kafka的区别与联系

    同样是流式数据采集框架, flume一般用于日志采集,可以定制很多数据源,减少开发量,基本架构是一个flume进程agent(source.拦截器.选择器.channel<Memory Chan ...

  5. flume消费kafka数据太慢_kafka补充01

    为什么高吞吐? •写数据 –1.页缓存技术 •kafka写出数据时先将数据写到操作系统的pageCache上,由操作系统自己决定什么时候将数据写到磁盘上 –2.磁盘顺序写 •磁盘顺序写的性能会比随机写 ...

  6. Kafka09:【案例】Flume集成Kafka

    一.Flume集成Kafka 在实际工作中flume和kafka会深度结合使用 1:flume采集数据,将数据实时写入kafka 2:flume从kafka中消费数据,保存到hdfs,做数据备份 下面 ...

  7. flume 对接 kafka 报错: Error while fetching metadata with correlation id 35 {=INVALID_TOPIC_EXCEPTION}

    flume 对接 kafka 报错:Error while fetching metadata with correlation id 35 : {=INVALID_TOPIC_EXCEPTION} ...

  8. Kafka实战-Flume到Kafka

    1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面 ...

  9. 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    <!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...

最新文章

  1. MySQL半同步安装以及参数
  2. Equinox P2的学习
  3. Wannafly挑战赛3
  4. 系列笔记 | 深度学习连载(4):优化技巧(上)
  5. 关于WinForm控件在asp.net中应用的问题。
  6. Linux_CentOS 7下Nginx服务器的安装配置
  7. linux 更新目录时间戳,修改linux 文件的时间戳
  8. Android studio 导入 Android 系统源码
  9. PHP Smarty 学习手册
  10. 宾馆管理系统(做了ppt忘记发博客了)
  11. 机器学习----深刻理解高斯过程回归
  12. xor命令,运算优先级,Matlab计算规则(从左至右),舍入误差,取整函数、求余函数和符号函数,mod和rem的区别
  13. 在浪潮服务器NF8460M4上用u盘安装centos8.5报设置基础软件仓库时出错
  14. Qt界面无法切换输入法的解决方法
  15. 人工神经网络的算法原理,人工神经网络算法优点
  16. linux软件管理及软件仓库
  17. 零基础学日语资料附经验分享
  18. 发那科机器人回原位置先上升_机器人小知识 | 发那科机器人位置信息修改介绍...
  19. 1.1[潜心创作]冒险游戏(MineCraft)不喜勿喷
  20. pdf以文件流的形式导出乱码问题解决

热门文章

  1. 【太遗憾】这些杀毒软件你竟不了解?
  2. 关于谷歌浏览器最新版出现的视频下载按钮禁掉解决方案
  3. 转贴:网友line写的火鸟字幕合并器教程
  4. window系统热键冲突检测
  5. vue.js下拉菜单渲染数据
  6. XML+XSL 实例
  7. java触发器时间_Java 时间触发器
  8. PHP使用phpqrcode生成动态二维码
  9. 国际大会演讲ppt_大会演讲,透明公正
  10. e:可以解包多种存档花样的小工具