设计管道
假设我们有一个简单的场景:事件正在流向Kafka,我们希望使用管道中的事件,进行一些转换并将结果写入BigQuery表,以使数据可用于分析。

可以在作业开始之前创建BigQuery表,或者Beam本身可以创建它。

代码看起来很简单:

EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()

                。作为(EventsProcessingOptions。类);

管道 p = 管道。创造(选项);
PCollection tableRows =

              //阅读kafka主题p。apply(“kafka-topic-read”,kafkaReader)。申请(“海边的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())。通过(记录 - > 记录。getKV。()的getValue()))//将值转换为JsonNode。申请(“字符串到JSON” ,ParseJsons。的(JsonNode。类))//创建TableRow。申请(“建设-表行”,帕尔多。的(新 EventsRowFn()))//将表格行保存到BigQuery。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()。到(tableSpec)。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND);

少了什么东西?
在现实世界中,可能会发生错误,在大多数情况下,我们将需要处理它们。

在上面的管道中,当我们尝试将事件从Kafka解析为JsonNode,转换期间以及BigQuery插入阶段时,可能会发生错误。

错误处理计划
对于每个错误,我们将在不同的BigQuery表中创建一行,其中包含更多信息,例如来自Kafka的origin事件。

一旦发生错误,我们就可以分析错误记录并全面了解它。

然后,我们可以修复管道代码,重置/更改Kafka使用者组偏移,并再次使用固定代码重播事件。

我们还可以修复事件本身(例如,在JSON解析错误中)并将其重新发送到Kafka。

处理转换错误
让我们快速浏览一下我们的转换函数:

@ProcessElement
public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
TableRow convertedRow = new TableRow();
insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);
insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);
//更多转变来
背景。输出(输出);
}
private void insertLong(JsonNode value,String key,TableRow convertedRow){

    String  valueToInsert  =  value。asText();如果(valueToInsert  !=  空 &&  !valueToInsert。的isEmpty()){long  longValue  =  Long。parseLong(valueToInsert);convertedRow。set(key,longValue);}

}
private void insertFloat(JsonNode value,String key,TableRow convertedRow){

    String  valueToInsert  =  getStringValue(value);if(valueToInsert  !=  null){float  floatValue  =  Float。parseFloat(valueToInsert);convertedRow。set(key,floatValue);}

}
是的,我们可能在解析过程中失败,因为我们将字符串解析为Float / Long,并且这对无法转换的数据失败。

我们需要从主函数输出中排除失败的数据并将这些数据发送到管道中的不同路径,然后我们将它们保存到BigQuery中的错误表中。

怎么样?让我们使用标签
当我们在ParDo 函数末尾输出一个元素时 ,我们可以在一个标签内输出它。然后我们可以获取所有标记为特定名称的元素,并对它们执行一些处理。

这里我们将使用两个标签,一个是MAIN标签,它包含所有成功的记录,另一个包含所有错误和一些上下文,例如 DEADLETTER_OUT。

该主标记必须与ParDo 函数本身的OUTPUT类型相同,并且 所有其他标记可以是不同类型。

现在,我们的 ParDo 函数将如下所示(注意标记添加):

@ProcessElement
public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
public static final TupleTag < JsonNode > MAIN_OUT = new TupleTag < JsonNode >(){};
public static final TupleTag < BigQueryProcessError > DEADLETTER_OUT = new TupleTag < BigQueryProcessError >(){};
TableRow convertedRow = new TableRow();
尝试 {

  insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);//更多转变来背景。输出(输出);

} catch(例外 e){

  记录器。误差(“失败变换” + ë。的getMessage(),ê);背景。输出(DEADLETTER_OUT,新 BigQueryProcessError(convertedRow。的toString(),ê。的getMessage(),ERROR_TYPE。BQ_PROCESS,originEvent));

}
}
我们如何通过标签处理元素?让我们改变管道,并进行拆分。该 MAIN 元素将大量查询表和 DEADLETTER_OUT 内容将被发送到错误表。

EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()

                。作为(EventsProcessingOptions。类);

管道 p = 管道。创造(选项);
PCollectionTuple tableRows =

              //阅读kafka主题p。apply(“kafka-topic-read”,kafkaReader)。申请(“海边的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())。通过(记录 - > 记录。getKV。()的getValue()))//将值转换为JsonNode。申请(“字符串到JSON” ,ParseJsons。的(JsonNode。类))//创建TableRow。申请(“建设-表行”,帕尔多。的(新 EventsRowFn())。withOutputTags(MAIN_OUT,TupleTagList。的(DEADLETTER_OUT)));//将MAIN标签保存到BQtableRows。得到(MAIN_OUT)。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()。到(tableSpec)。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND);//将DEADLETTER_OUT保存到BQ错误表tableRows。得到(DEADLETTER_OUT)。申请(“BQ-进程的错误提取物”,帕尔多。的(新 BigQueryProcessErrorExtracFn()))。申请(“BQ-进程的错误写”,BigQueryIO。writeTableRows()。to(errTableSpec)。withJsonSchema(errSchema)。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_IF_NEEDED)。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND));

p。run();
处理BigQuery插入错误
为了在BigQuery插入期间处理错误,我们必须使用BiqQueryIO API。

让我们放大写入阶段。并稍微改变一下:

WriteResult writeResult = tableRowToInsertCollection

    。申请(“BQ-写”,BigQueryIO。写()//指定将返回失败的行及其错误。withExtendedErrorInfo()。到(tableSpec)。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)。withWriteDisposition(BigQueryIO。写。writeDisposition会。WRITE_APPEND)//指定处理失败插入的策略。。withFailedInsertRetryPolicy(InsertRetryPolicy。retryTransientErrors()));

//将失败的行及其错误写入错误表
写结果

    。getFailedInsertsWithErr()。申请(窗口。到(FixedWindows。的(持续时间。standardMinutes(5))))。申请(“BQ-插入错误提取物”,帕尔多。的(新 BigQueryInsertErrorExtractFn(tableRowToInsertView))。withSideInputs(tableRowToInsertView))。申请(“BQ-插入错误写”,BigQueryIO。writeTableRows()。to(errTableSpec)。withJsonSchema(errSchema)。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_IF_NEEDED)。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND));

在上面的代码片段中,我们从BigQueryIO获取失败的TableRows及其错误。现在我们可以将它们转换为另一个 TableRow 并将它们写入错误表。在这种情况下,我们让作业在需要时创建表。

Apache Beam和BigQuery的错误处理(Java SDK)相关推荐

  1. Apache Beam 是什么,它为什么比其他选择更受欢迎?

    1. 概述 在本教程中,我们将介绍 Apache Beam 并探讨其基本概念.我们将首先演示使用 Apache Beam 的用例和好处,然后介绍基本概念和术语.之后,我们将通过一个简单的例子来说明 A ...

  2. apache beam java api_Apache Beam的基本概念

    不多说,直接上干货! Apache Beam的基本概念 在使用Apache Beam构建数据处理程序,首先需要使用Beam SDK中的类创建一个Driver程序,在Driver程序中创建一个满足我们数 ...

  3. Apache Beam构建流处理任务

    最近做的一个项目需要用到Google云平台的Dataflow来进行数据处理,因此研究了一下相关的文档,了解到Dataflow是基于Apache beam来进行流程的编排.Beam支持多种不同的Runn ...

  4. Apache Beam是什么?

    Apache Beam 的前世今生 1月10日,Apache软件基金会宣布,Apache Beam成功孵化,成为该基金会的一个新的顶级项目,基于Apache V2许可证开源. 2003年,谷歌发布了著 ...

  5. Apache Beam 使用指南(一)

    Apache Beam 一.概述 更多 Apache Beam 代码案例:https://github.com/xiye50070/Apache-Beam-Model.git Apache Beam是 ...

  6. Apache Beam 架构原理及应用实践

    导读:大家好,很荣幸跟大家分享 Apache Beam 架构原理及应用实践.讲这门课之前大家可以想想,从进入 IT 行业以来,不停的搬运数据,不管职务为前端,还是后台服务器端开发.随着这两年科技的发展 ...

  7. Apache Beam指南

    Apache Beam 标签(空格分隔): Hadoop 1. What is Beam ? 前世今生: 诞生背景: 分布式数据处理发展迅猛 –> 新的分布式数据处理技术越来越多 –> H ...

  8. Apache Beam WordCount案例编写

    在pom.xml 中 导入maven依赖 <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi=" ...

  9. Apache Beam实战指南 | 玩转KafkaIO与Flink

    AI前线导读:本文是 Apache Beam实战指南系列文章 的第二篇内容,将重点介绍 Apache Beam与Flink的关系,对Beam框架中的KafkaIO和Flink源码进行剖析,并结合应用示 ...

最新文章

  1. 激光雷达(LiDAR)点云数据知多少?
  2. fedora14 an mysql_Fedora 14下 MySQL 更改密码
  3. mac qt android开发环境搭建,Mac 下 PyQt5 的开发环境搭建
  4. SQLite命令行程序说明
  5. Python有自动完成功能的IDE 么?
  6. JAVA集合(笔记)
  7. CSS实现树形结构 + js加载数据
  8. 面试后要请你吃饭_面试问同事请吃饭唯独不叫你咋办?小伙说这是好机会,当场被录取...
  9. 设计模式在各编程语言类库及框架上的应用
  10. spring-bean(xml方式管理)
  11. 麒麟系统安装打印机共享_银河麒麟 惠普打印机驱动怎么安装
  12. win7QQ安装包可能被非法改动导致安装失败怎么办
  13. window10运行不了1stopt_1stopt win10版下载
  14. 计算机数据链路层教案,4.1数据链路层作用教案(计算机网络技术基础教案).doc...
  15. 缩放指数型线性单元(SELU)
  16. linux crc工具,Windows和Linux下使用MD5、SHA1、CRC32校验备份文件的完整性
  17. 百度AI的2020:迎合时代节拍,扛起智能大旗
  18. 服务器和交换机物理连接_二层、三层及四层交换机的区别 | 小知识
  19. 【Autojs教程】03-Autojs 控件学习 | 淘宝关注店铺取消实战
  20. 基于OpenCv的视频流处理方法

热门文章

  1. Jquery中attr与prop的区别
  2. 36)PHP,搜寻数据库信息在html中显示(晋级1)
  3. mysql补充(3)优化sql语句查询常用的30种方法
  4. 改变 PropertyGrid 控件的编辑风格(2)——编辑多行文本
  5. 【报错信息】Google Play 上架报错 ( Your app contains ads that do not comply with our Families ad | 退出亲子同乐计划 )
  6. 【错误记录】执行 Python 程序报错 ( NameError: name ‘reload‘ is not defined )
  7. 【MATLAB】进阶绘图 ( Boxplot 箱线图 | boxplot 函数 | Error Bar 误差条线图 | errorbar 函数 )
  8. 【OpenGL】十四、OpenGL 绘制三角形 ( 绘制 GL_TRIANGLE_STRIP 三角形 | GL_TRIANGLE_STRIP 三角形绘制分析 )
  9. 【计算机网络】应用层 : 网络应用模型 ( 应用层概述 | 客户端 / 服务器 模型 | P2P 模型 )
  10. 【计算机网络】传输层 : TCP 拥塞控制 ( 慢开始 | 拥塞避免 | 快重传 | 快恢复 )