Flink-高级特性-新特性-异步IO-了解

原理

API

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html

注意: 如果要使用异步IO, 对应Client有一定要求:

1.该Client要支持发送异步请求,如vertx

2.如果Client不支持可以使用线程池来模拟异步请求

代码演示

DROP TABLE IF EXISTS `t_category`;
CREATE TABLE `t_category` (`id` int(11) NOT NULL,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ----------------------------
-- Records of t_category
-- ----------------------------
INSERT INTO `t_category` VALUES ('1', '手机');
INSERT INTO `t_category` VALUES ('2', '电脑');
INSERT INTO `t_category` VALUES ('3', '服装');
INSERT INTO `t_category` VALUES ('4', '化妆品');
INSERT INTO `t_category` VALUES ('5', '食品');
package cn.itcast.feature;import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.*;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 使用异步io的先决条件* 1.数据库(或key/value存储)提供支持异步请求的client。* 2.没有异步请求客户端的话也可以将同步客户端丢到线程池中执行作为异步客户端。*/
public class ASyncIODemo {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//数据源中只有id//DataStreamSource[1,2,3,4,5]DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() {private Boolean flag = true;@Overridepublic void run(SourceContext<CategoryInfo> ctx) throws Exception {Integer[] ids = {1, 2, 3, 4, 5};for (Integer id : ids) {ctx.collect(new CategoryInfo(id, null));}}@Overridepublic void cancel() {this.flag = false;}});//3.Transformation//方式一:Java-vertx中提供的异步client实现异步IO//unorderedWait无序等待SingleOutputStreamOperator<CategoryInfo> result1 = AsyncDataStream.unorderedWait(categoryDS, new ASyncIOFunction1(), 1000, TimeUnit.SECONDS, 10);//方式二:MySQL中同步client+线程池模拟异步IO//unorderedWait无序等待SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream.unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);//4.Sinkresult1.print("方式一:Java-vertx中提供的异步client实现异步IO \n");result2.print("方式二:MySQL中同步client+线程池模拟异步IO \n");//5.executeenv.execute();}
}@Data
@NoArgsConstructor
@AllArgsConstructor
class CategoryInfo {private Integer id;private String name;
}//MySQL本身的客户端-需要把它变成支持异步的客户端:使用vertx或线程池
class MysqlSyncClient {private static transient Connection connection;private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";private static final String URL = "jdbc:mysql://localhost:3306/bigdata";private static final String USER = "root";private static final String PASSWORD = "root";static {init();}private static void init() {try {Class.forName(JDBC_DRIVER);} catch (ClassNotFoundException e) {System.out.println("Driver not found!" + e.getMessage());}try {connection = DriverManager.getConnection(URL, USER, PASSWORD);} catch (SQLException e) {System.out.println("init connection failed!" + e.getMessage());}}public void close() {try {if (connection != null) {connection.close();}} catch (SQLException e) {System.out.println("close connection failed!" + e.getMessage());}}public CategoryInfo query(CategoryInfo category) {try {String sql = "select id,name from t_category where id = "+ category.getId();Statement statement = connection.createStatement();ResultSet rs = statement.executeQuery(sql);if (rs != null && rs.next()) {category.setName(rs.getString("name"));}} catch (SQLException e) {System.out.println("query failed!" + e.getMessage());}return category;}
}/*** 方式一:Java-vertx中提供的异步client实现异步IO*/
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {private transient SQLClient mySQLClient;@Overridepublic void open(Configuration parameters) throws Exception {JsonObject mySQLClientConfig = new JsonObject();mySQLClientConfig.put("driver_class", "com.mysql.jdbc.Driver").put("url", "jdbc:mysql://localhost:3306/bigdata").put("user", "root").put("password", "root").put("max_pool_size", 20);VertxOptions options = new VertxOptions();options.setEventLoopPoolSize(10);options.setWorkerPoolSize(20);Vertx vertx = Vertx.vertx(options);//根据上面的配置参数获取异步请求客户端mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);}//使用异步客户端发送异步请求@Overridepublic void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>() {@Overridepublic void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult) {if (sqlConnectionAsyncResult.failed()) {return;}SQLConnection connection = sqlConnectionAsyncResult.result();connection.query("select id,name from t_category where id = " +input.getId(), new Handler<AsyncResult<io.vertx.ext.sql.ResultSet>>() {@Overridepublic void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) {if (resultSetAsyncResult.succeeded()) {List<JsonObject> rows = resultSetAsyncResult.result().getRows();for (JsonObject jsonObject : rows) {CategoryInfo categoryInfo = new CategoryInfo(jsonObject.getInteger("id"), jsonObject.getString("name"));resultFuture.complete(Collections.singletonList(categoryInfo));}}}});}});}@Overridepublic void close() throws Exception {mySQLClient.close();}@Overridepublic void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {System.out.println("async call time out!");input.setName("未知");resultFuture.complete(Collections.singleton(input));}
}/*** 方式二:同步调用+线程池模拟异步IO*/
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {private transient MysqlSyncClient client;private ExecutorService executorService;//线程池@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);client = new MysqlSyncClient();executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}//异步发送请求@Overridepublic void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {executorService.execute(new Runnable() {@Overridepublic void run() {resultFuture.complete(Collections.singletonList((CategoryInfo) client.query(input)));}});}@Overridepublic void close() throws Exception {}@Overridepublic void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {System.out.println("async call time out!");input.setName("未知");resultFuture.complete(Collections.singleton(input));}
}

23-Flink-高级特性-新特性-异步IO-了解相关推荐

  1. # 22.Flink-高级特性-新特性-异步IO\原理

    22.Flink-高级特性-新特性-异步IO-了解 22.1.原理 22.1.1.异步IO操作的需求 https://nightlies.apache.org/flink/flink-docs-rel ...

  2. 23.Flink-高级特性-新特性-Streaming Flie Sink\介绍\代码演示\Flink-高级特性-新特性-FlinkSQL整合Hive\添加依赖和jar包和配置

    23.Flink-高级特性-新特性-Streaming Flie Sink 23.1.介绍 23.2.代码演示 24.Flink-高级特性-新特性-FlinkSQL整合Hive 24.1.介绍 24. ...

  3. Flink 1.14 新特性预览

    简介: 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理,内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日线上 Flink Meetup 分享的<Fl ...

  4. 【Flink】Flink Flink 1.14 新特性预览

    1.概述 转载:Flink 1.14 新特性预览 简介: 一文了解 Flink 1.14 版本新特性及最新进展 本文由社区志愿者陈政羽整理,内容源自阿里巴巴技术专家宋辛童 (五藏) 在 8 月 7 日 ...

  5. 技术前沿资讯-Apache Flink 1.14 新特性介绍

    一.简介 1.14 新版本原本规划有 35 个比较重要的新特性以及优化工作,目前已经有 26 个工作完成:5 个任务不确定是否能准时完成:另外 4 个特性由于时间或者本身设计上的原因,会放到后续版本完 ...

  6. 总结HTMLT5高级的新特性

    音频与视频 HTML5提供了相关标签支持在网页中实现音频与视频的播放. 音频标签 音频标签支持的文件格式有:WAV.MP3.ogg. 音频标签的简单使用方法: <audio src=" ...

  7. JavaScript高级 ES7-ES13 新特性

    1. ES7 1. Array Includes 在ES7之前,如果我们想判断一个数组中是否包含某个元素,需要通过 indexOf 获取结果,并且判断是否为 -1 在ES7中,我们可以通过includ ...

  8. Java高级--JDK8新特性

    1.lambda表达式 lambda表达式:特殊的匿名内部类,语法更简洁: lambda表达式允许把函数作为一个方法的函数(函数作为方法的参数传递),将代码像数据一样传递: 语法: <函数式接口 ...

  9. Java高级:新特性:lambda 函数式接口 方法引用 StreamAPI Optional类

    package com.atguigu.java1;import org.junit.Test;import java.util.Comparator;/*** Lambda表达式使用举例:** @a ...

最新文章

  1. linux 脚本 字符串函数调用函数调用,shell自定义函数及参数调用解析
  2. 使用logrotate管理nginx日志文件
  3. 『干货』分享你最喜欢的技巧和提示(Xcode,objective-c,swift,c...等等)
  4. 我的Go+语言初体验——【一、go+环境WIN10_100%成功安装(附-视频)】
  5. iOS中POST请求
  6. 一个不知名的网站复制来的: java怎样连接到SQL server 2008
  7. python进程与线程_Python进程与线程知识
  8. Symbian系统手机软件
  9. 纯CSS实现的炫酷HOVER效果
  10. 温度对二极管伏安特性的影响
  11. android视频录制旋转,android – 录制的视频在上传到互联网后旋转90度
  12. android 8.0图标适配
  13. C语言入门 -- Simple Simon 简单的西蒙游戏(2021/1/7)
  14. Grafana 短信报警
  15. 计算机系统xp和w7,对比分析老电脑装xp还是win7纯净版好
  16. Qt-利用fmod库显示声音波形
  17. TOPSIS和熵权法的应用(Matlab实现,包括数据预处理)
  18. linux 文件添加标签,SELinux——有趣的标签
  19. matlab上机绘图实验心得,matlab实验心得总结
  20. 云目录(DaaS )快速入门

热门文章

  1. 最简单却又极具扩展性的Java表达式引擎,自创编程语言必备
  2. 一个程序员的中秋节碎碎念
  3. 如何设计和生成游戏的激活码
  4. 计算机无法识别鼠标怎么设置,win10电脑不识别鼠标怎么处理
  5. dnf服务器炸团门票怎么找回,dnf超级福利!自助门票补偿系统上线,打团掉线献祭已成过去式...
  6. eclipse的swt插件的控件
  7. Linux I2C驱动(OMAP3630 ) omap_i2c_probe
  8. MySQL | 触发器
  9. vista下载_在受保护模式下使用IE7在Vista下下载可听内容
  10. 2.13 强大的自定义变换工具 [Ps教程]