感慨:玩大数据java必须要精通,不然遇到眼花缭乱的框架以及插件拓展的时候就会一下子傻眼了。各种框架之间版本不同有现成的插件或者方案到还可以但是没有就需要自己扩展。目前我使用的是CDH6.3.2,flink使用的是1.6,Phoenix版本的是5.0.0这有在我的博客中提到过,hbase使用的是自带的2.x。这就遇到问题了目前有支持的比较好的是dlinky这个里面的插件,我现在需要做的是将dlink-connector-phoenix这个插件编译打包上传到flink的lib目录中使用sql-client进行测试。

问题:目前dlinky支持flink1.14不支持flink1.16所以需要扩展。经过我比对flink源码中的flink-connector-jdbc的写法。结合
dlinky中dlink-connector-phoenix-1.14的版本进行扩展

如果想要已经打包好的
https://download.csdn.net/download/u012228523/87853354

1、拉取dlinky的源码,
https://gitee.com/mirrors/Dlink.git
并且切换到0.7.3分支

2、按照官网来基本环境要求来
http://www.dlink.top/docs/next/deploy_guide/compiler

特殊说明:
mvn的仓库配置的是

  <mirrors><mirror><id>alimaven</id><mirrorOf>central</mirrorOf><name>nexus</name><url>https://maven.aliyun.com/repository/public/</url></mirror>  </mirrors>

3、将dlink-connectors中的dlink-connector-phoenix-1.14拷贝一份到同级目录下面。修改名称为dlink-connector-phoenix-1.16

修改dlink-connectors同级目录的pom.xml文件

新增代码如下截图:

4、修改dlink-connector-phoenix-1.16的pom.xml文件

一定是这样并且flink-table-common必须在最上面不然会有问题。版本1.16-SNAPSHOT是我编译flink的时候打的包。你也可以写1.16.1或者1.16.2如果编译不同过可以留言邮箱,我发出来。

5、编译过程中出现兼容问题,需要修改源码PhoenixDynamicTableSource.java这个类中由于dlinky0.7.3使用了
TableSchemaUtils这个工具类中的projectSchema方法.但是这个方法在flink1.16已经给删除了。于是可以将flink1.14中TableSchemaUtils的projectSchema方法写到PhoenixDynamicTableSource.java这个类中稍后贴出源码直接覆盖类就行

6、打包

mvn clean install --settings /Users/admin/Documents/softwares/repository-zi/settings-aliyun.xml  -DskipTests=true -P aliyun,prod,scala-2.12,web,fast,flink-1.16

8、编译完成后如果是想只使用flink只需要到入一下包到flink的lib目录下
dlink-connector-phoenix-1.16-0.7.3.jar,phoenix-5.0.0-cdh6.2.0-client.jar,phoenix-core-5.0.0-cdh6.2.0.jar

注意:一定不要与flink自带的hbase-connector包放一起,会冲突

9、如果将Phoenix添加到dlinky web中运行需要将phoenix-5.0.0-cdh6.2.0-client-dlinky.jar包中的servlet包删除
路径:javax/servlet。在重新打开

-- 解压包
jar xvf xxx.jar-- 打包
jar cvf xxx.jar <打包路径>
/***  Licensed to the Apache Software Foundation (ASF) under one or more*  contributor license agreements.  See the NOTICE file distributed with*  this work for additional information regarding copyright ownership.*  The ASF licenses this file to You under the Apache License, Version 2.0*  (the "License"); you may not use this file except in compliance with*  the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0**  Unless required by applicable law or agreed to in writing, software*  distributed under the License is distributed on an "AS IS" BASIS,*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.*  See the License for the specific language governing permissions and*  limitations under the License.**/package org.apache.flink.connector.phoenix.table;import org.apache.flink.connector.phoenix.dialect.JdbcDialect;
import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.phoenix.internal.options.JdbcReadOptions;
import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions;
import org.apache.flink.connector.phoenix.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Objects;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.types.utils.DataTypeUtils;
/*** PhoenixDynamicTableSource** @author gy* @since 2022/3/17 10:40**/
public class PhoenixDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown,SupportsLimitPushDown {private final PhoenixJdbcOptions options;private final JdbcReadOptions readOptions;private final JdbcLookupOptions lookupOptions;private TableSchema physicalSchema;private final String dialectName;private long limit = -1L;public PhoenixDynamicTableSource(PhoenixJdbcOptions options, JdbcReadOptions readOptions, JdbcLookupOptions lookupOptions, TableSchema physicalSchema) {this.options = options;this.readOptions = readOptions;this.lookupOptions = lookupOptions;this.physicalSchema = physicalSchema;this.dialectName = options.getDialect().dialectName();}@Overridepublic LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {// JDBC only support non-nested look up keysString[] keyNames = new String[context.getKeys().length];for (int i = 0; i < keyNames.length; i++) {int[] innerKeyArr = context.getKeys()[i];Preconditions.checkArgument(innerKeyArr.length == 1, "JDBC only support non-nested look up keys");keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];}final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();return TableFunctionProvider.of(new PhoenixRowDataLookupFunction(options,lookupOptions,physicalSchema.getFieldNames(),physicalSchema.getFieldDataTypes(),keyNames,rowType));}@Overridepublic ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {PhoenixJdbcRowDataInputFormat.Builder builder = PhoenixJdbcRowDataInputFormat.builder().setDrivername(this.options.getDriverName()).setDBUrl(this.options.getDbURL()).setUsername((String)this.options.getUsername().orElse((String) null)).setPassword((String)this.options.getPassword().orElse((String) null)).setAutoCommit(this.readOptions.getAutoCommit())//setting phoenix schema.setNamespaceMappingEnabled(this.options.getNamespaceMappingEnabled()).setMapSystemTablesToNamespace(this.options.getMapSystemTablesToNamespace());if (this.readOptions.getFetchSize() != 0) {builder.setFetchSize(this.readOptions.getFetchSize());}JdbcDialect dialect = this.options.getDialect();String query = dialect.getSelectFromStatement(this.options.getTableName(), this.physicalSchema.getFieldNames(), new String[0]);if (this.readOptions.getPartitionColumnName().isPresent()) {long lowerBound = (Long)this.readOptions.getPartitionLowerBound().get();long upperBound = (Long)this.readOptions.getPartitionUpperBound().get();int numPartitions = (Integer)this.readOptions.getNumPartitions().get();builder.setParametersProvider((new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)).ofBatchNum(numPartitions));query = query + " WHERE " + dialect.quoteIdentifier((String)this.readOptions.getPartitionColumnName().get()) + " BETWEEN ? AND ?";}if (this.limit >= 0L) {query = String.format("%s %s", query, dialect.getLimitClause(this.limit));}builder.setQuery(query);RowType rowType = (RowType)this.physicalSchema.toRowDataType().getLogicalType();builder.setRowConverter(dialect.getRowConverter(rowType));builder.setRowDataTypeInfo(runtimeProviderContext.createTypeInformation(this.physicalSchema.toRowDataType()));return InputFormatProvider.of(builder.build());}@Overridepublic ChangelogMode getChangelogMode() {return ChangelogMode.insertOnly();}@Overridepublic boolean supportsNestedProjection() {return false;}@Overridepublic void applyProjection(int[][] projectedFields) {this.physicalSchema = projectSchema(this.physicalSchema, projectedFields);}private boolean containsPhysicalColumnsOnly(TableSchema schema) {Preconditions.checkNotNull(schema);return schema.getTableColumns().stream().allMatch(TableColumn::isPhysical);}private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {Preconditions.checkArgument(containsPhysicalColumnsOnly(tableSchema),"Projection is only supported for physical columns.");TableSchema.Builder builder = TableSchema.builder();FieldsDataType fields =(FieldsDataType)DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);RowType topFields = (RowType) fields.getLogicalType();for (int i = 0; i < topFields.getFieldCount(); i++) {builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));}return builder.build();}public DynamicTableSource copy() {return new PhoenixDynamicTableSource(this.options, this.readOptions, this.lookupOptions, this.physicalSchema);}public String asSummaryString() {return "JDBC:" + this.dialectName;}public boolean equals(Object o) {if (this == o) {return true;} else if (!(o instanceof PhoenixDynamicTableSource)) {return false;} else {PhoenixDynamicTableSource that = (PhoenixDynamicTableSource)o;return Objects.equals(this.options, that.options)&& Objects.equals(this.physicalSchema, that.physicalSchema)&& Objects.equals(this.dialectName, that.dialectName)&& Objects.equals(this.limit, that.limit);}}public int hashCode() {return Objects.hash(new Object[]{this.options, this.readOptions, this.lookupOptions, this.physicalSchema, this.dialectName, this.limit});}public void applyLimit(long limit) {this.limit = limit;}
}

扩展dlink-connector-phoenix使其phoenix-5.0.0支持flink1.16相关推荐

  1. Phoenix 原理 以及 Phoenix在HBase中的应用

    一.前言 业务使用HBase已经有一段时间了,期间也反馈了很多问题,其中反馈最多的是HBase是否支持SQL查询和二级索引,由于HBase在这两块上目前暂不支持,导致业务在使用时无法更好的利用现有的经 ...

  2. 配置phoenix连接hbase_使用 Phoenix-4.11.0连接 Hbase 集群 ,并使用 JDBC 查询测试

    什么是 Phoenix ? Apache Phoenix 是运行在Hbase之上的高性能关系型数据库,通过Phoenix可以像使用jdbc访问关系型数据库一样访问hbase. Phoenix,操作的表 ...

  3. apache phoenix 入门_apache phoenix 入门

    apache phoenix 是一种HBase的SQL皮肤,或者说SQL工具,它弥补了原生的HBase不支持SQL的缺陷.用官网的话说,Phoenix让HBase这种NoSQL数据库又重归SQL行列 ...

  4. 有一个5 * 5的二维数组,保留主对角线上的元素,并使其他元素均为0,要求用函数和子函数完成

    <程序设计基础实训指导教程-c语言> ISBN 978-7-03-032846-5 p143 7.1.2 上级实训内容 [实现内容8]有一个5 * 5的二维数组,保留主对角线上的元素,并使 ...

  5. IBM发布Open Liberty 18.0.0.4,支持MicroProfile 2.1和反应性扩展框架

    IBM在2018年第四季度发布的Open Liberty 18.0.0.4提供了对MicroProfile 2.1.反应性扩展框架和连接池指标的全面支持.根据发布说明: Open Liberty现在对 ...

  6. 如何使‘CREATE TABLE AS SELECT’能支持ORDER BY ?

    如何使'CREATE TABLE AS SELECT'能支持ORDER BY ?         大家都知道,"CREATE TABLE AS SELECT"这个SQL命令并不支持 ...

  7. php log pecl,PHP日志扩展SeasLog-1.0.0正式版在PECL发布

    PHP日志扩展SeasLog-1.0.0正式版在PECL发布 发布于 2014-07-29 07:00:26 | 140 次阅读 | 评论: 0 | 来源: 网友投递 SeasLog PHP日志扩展S ...

  8. dev c++ 64位_RHEL7.8添加本地源以及扩展GCC,C++的32位和静态库支持

    初次安装系统后,需要配置一下yum本地源,这是因为,我们需要的软件系统默认安装过程中许多软件没有安装 1:配置本地yum源 配置本地yum源是通过本地映射光盘挂载到系统中,然后将yum的配置文件中的 ...

  9. 使你的C/C++代码支持Unicode

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! 本文摘自 ...

最新文章

  1. 记一次性能优化,单台4核8G机器支撑5万QPS
  2. 设置cookie和查找cookie的方法
  3. 基于Linux C的socketEthereal程序和Package分析 (一个)
  4. php表单服务器验证失败,php 表单验证代码(验证失败显示提示信息)
  5. WordPress 自定义插件初始化及卸载
  6. Linux命令之 mount -- 文件系统挂载
  7. 消息推送生命周期_一套完整的APP推送体系方案|附思维导图
  8. Python 进阶 之 socket模块
  9. 读书笔记:非营利组织的管理
  10. 推荐 9 个超赞的 JavaScript 库
  11. 《算法分析与设计(第5版)》——王晓东 - 学习记录 / 期末复习
  12. 电影下载、播放、制作、转换、各类问题全攻略
  13. 前端实现动画的7种方式
  14. 如何撤销Word文档的只读模式
  15. php7米酷cms,米酷CMS6.2修改版 支持PHP7 独家首发 - 百码云
  16. 【春招实习】贝壳金服电话一面
  17. 视频号怎么赚钱?4个赚钱小技巧,实现视频号流量变现!
  18. 使用IO完善快递管理系统
  19. java中国象棋棋盘放置棋子,Qt中国象棋二——棋盘与棋子的绘制
  20. 湍流 Spectrum 与 Cascade 的理解

热门文章

  1. C语言求1元2次方程的解,一元二次方程求解程序完整代码
  2. php转出的json数据美化,PHP输出日志,json美化
  3. python open函数参数_python中open函数的使用
  4. 基本函数input() print() map() filter() reduce()和lambda()算子-operater用法
  5. PANTONE潘通公布2020年度代表色!
  6. 快速构建页面APP程序员的人生计划
  7. 【git】番外~~佛主
  8. OpenStack-基本概念之neutron
  9. 梳理一下各大平台使用的sample rate convert算法
  10. 太厉害了,终于有人能把文件上传漏洞讲的明明白白了