Disclaimer: This blog series is compiled from SGG's video tutorials and is well suited for anyone getting started.

Versions used:

  1. Flink 1.11.2
  2. Kafka 2.4.0
  3. Hive 3.1.2
  4. Hadoop 3.1.3

1 Hive

Install Hive, using MySQL as the metastore database.
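Before Hive can start against MySQL, the metastore schema needs to be initialized once. A minimal sketch, assuming MySQL already runs on hadoop102 with the root/123456 credentials configured below, and that the MySQL Connector/J driver jar has been copied into $HIVE_HOME/lib:

# Initialize the metastore schema in MySQL (run once).
# createDatabaseIfNotExist=true in hive-site.xml creates the metastore database on demand.
$HIVE_HOME/bin/schematool -dbType mysql -initSchema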

1.1 hive-site.xml configuration (version 3.1.2)

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/metastore?createDatabaseIfNotExist=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.cj.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>username to use against metastore database</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
        <description>password to use against metastore database</description>
    </property>
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
        <description>location of default database for the warehouse</description>
    </property>
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>192.168.1.122</value>
    </property>
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://localhost:9083</value> <!-- IP of the machine running the metastore -->
    </property>
</configuration>

2 Flink (version 1.10.2)

2.1 Configure conf/sql-client-hive.yaml

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.

# See the Table API & SQL documentation for details about supported properties.

#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

#tables: [] # empty list
# A typical table source definition looks like:
# - name: ...
#   type: source-table
#   connector: ...
#   format: ...
#   schema: ...

# A typical view definition looks like:
# - name: ...
#   type: view
#   query: "SELECT ..."

# A typical temporal table definition looks like:
# - name: ...
#   type: temporal-table
#   history-table: ...
#   time-attribute: ...
#   primary-key: ...

#==============================================================================
# User-defined functions
#==============================================================================

# Define scalar, aggregate, or table functions here.

#functions: [] # empty list
# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...

#==============================================================================
# Catalogs
#==============================================================================

# Define catalogs here.

catalogs:
  - name: myhive                           # any name you like
    type: hive
    hive-conf-dir: /opt/module/hive/conf   # directory that contains hive-site.xml
#    default-database: ...

#==============================================================================
# Modules
#==============================================================================

# Define modules here.

#modules: # note the following modules will be of the order they are specified
#  - name: core
#    type: core

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'blink' (used by default) or 'old'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  current-catalog: myhive
  # current database of the current catalog (default database of the catalog by default)
  current-database: hive
  # controls how table programs are restarted in case of a failure
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback

#==============================================================================
# Configuration options
#==============================================================================

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" web page.

# A configuration can look like:
# configuration:
#   table.exec.spill-compression.enabled: true
#   table.exec.spill-compression.block-size: 128kb
#   table.optimizer.join-reorder-enabled: true

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0
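One thing to note: current-catalog: myhive together with current-database: hive drops the client straight into a Hive database named hive, so that database has to exist before the client starts. If it does not, create it once from the Hive CLI:

-- run in the Hive CLI; needed because of current-database: hive above
create database if not exists hive;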

2.2 Configure the JAR packages

 

/flink-1.10.2/lib/

// Flink's Hive connector. Contains the flink-hadoop-compatibility and flink-orc jars.
flink-connector-hive_2.11-1.10.2.jar

// Hadoop dependencies
// You can pick a pre-built Hadoop uber jar provided by Flink, alternatively
// you can use your own hadoop jars. Either way, make sure it's compatible with your Hadoop
// cluster and the Hive version you're using.
flink-shaded-hadoop-2-uber-2.7.5-8.0.jar

// Hive dependencies
hive-exec-3.1.2.jar
hive-metastore-3.1.2.jar
libfb303-0.9.3.jar

// Kafka dependencies
flink-sql-connector-kafka_2.11-1.11.2.jar

The three Hive JARs (hive-exec, hive-metastore, libfb303) all ship with Hive and can be found under ${HIVE_HOME}/lib. The two Flink JARs can be located on Aliyun's Maven mirror by their GAV coordinates (the groupId is org.apache.flink for both) and downloaded manually.

Note: the jars under lib/ must be distributed to every other Flink machine in the cluster, for example as sketched below.
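One way to push them out, assuming the remaining Flink nodes are hadoop103 and hadoop104 (both hostnames are assumptions here; substitute your own, or use a distribution script such as xsync if you have one):

# Copy every jar under lib/ to the other Flink nodes (hostnames are placeholders).
for host in hadoop103 hadoop104; do
  scp /opt/module/flink/lib/*.jar atguigu@$host:/opt/module/flink/lib/
done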

3 Startup

3.1 Start the Hadoop cluster

Omitted here...

3.2 Start the Hive metastore

hive --service metastore &
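The trailing & only backgrounds the process within the current session; to keep it alive after logout you can nohup it instead, and either way it is worth confirming the metastore is up (it listens on port 9083, as set in hive.metastore.uris). A sketch:

# Alternative start that survives logout, plus a quick check.
nohup hive --service metastore > /tmp/metastore.log 2>&1 &
jps    # the metastore shows up as a RunJar process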

3.3 Start Flink

$FLINK_HOME/bin/start-cluster.sh
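A quick sanity check that the standalone cluster actually came up:

jps    # expect StandaloneSessionClusterEntrypoint (master) and TaskManagerRunner (workers)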

3.4 Start the Flink SQL Client

atguigu@hadoop102:/opt/module/flink$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml -l lib/
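Once the client prompt appears, it is worth confirming that the myhive catalog defined in sql-client-hive.yaml was actually loaded, for example:

-- run inside the SQL Client
show catalogs;        -- should list default_catalog and myhive
use catalog myhive;
show databases;       -- the hive database set as current-database should appear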

3.5 Create a Hive table in the Flink SQL Client, with Kafka as the data source

CREATE TABLE student (
  id INT,
  name STRING,
  password STRING,
  age INT,
  ts BIGINT,
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- event time (ts is in milliseconds)
  WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- watermark
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',  -- Kafka connector version; must be 'universal' rather than '2.4.0', otherwise an error is thrown
  'connector.topic' = 'student',  -- topic to consume
  'connector.startup-mode' = 'latest-offset',  -- starting offset
  'connector.properties.zookeeper.connect' = 'hadoop000:2181',
  'connector.properties.bootstrap.servers' = 'hadoop000:9092',
  'connector.properties.group.id' = 'student_1',
  'format.type' = 'json',
  'format.derive-schema' = 'true',  -- derive the JSON parsing schema from the table schema
  'update-mode' = 'append'
);
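Because the current catalog is myhive, this DDL is persisted in the Hive metastore rather than kept only for the session; this is precisely how Hive ends up managing the Kafka table's metadata. A sketch of verifying it from the Hive CLI (the table is visible there as metadata but is not queryable by Hive itself):

-- run in the Hive CLI
use hive;
show tables;                 -- the student table should be listed
describe formatted student;  -- its table parameters carry the Flink connector options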

3.6 Start Kafka and send data

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list hadoop000:9092 --topic student
{"id":12, "name":"kevin", "password":"wong", "age":22, "ts":1603769073}

3.7 Query the table through the Flink SQL Client

select * from student;
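With result-mode: table from the YAML, rows show up in the interactive table view as messages arrive. Since the DDL also declares an event-time attribute and a watermark, windowed aggregations work as well; a small illustrative query, not part of the original walkthrough:

-- count events per 10-second tumbling event-time window
select tumble_start(eventTime, interval '10' second) as window_start, count(*) as cnt
from student
group by tumble(eventTime, interval '10' second);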

Reference: https://blog.csdn.net/hll19950830/article/details/109308055

Troubleshooting reference:

java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

This usually means the Kafka connector jar was not picked up: check that flink-sql-connector-kafka_2.11-1.11.2.jar sits under lib/ on every node and that the SQL Client was started with -l lib/ as in step 3.4.

https://blog.csdn.net/qq_31866793/article/details/107487858
