22.Flink-Advanced Features-New Features-Async I/O-Overview

22.1.Principle

22.1.1.Why asynchronous I/O is needed

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html

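Async I/O is a much-requested feature that Alibaba contributed to the community; it was introduced in Flink 1.2. Its main purpose is to keep network latency from becoming the system bottleneck when a job interacts with external systems.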

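Stream processing jobs often need to interact with external systems. The usual approach is, for example, to send the database a query for user a and then wait for the result; until that result comes back, the program cannot send the query for user b. This is synchronous access: the operator spends most of its time waiting for responses rather than processing records.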
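The difference between the two access patterns can be sketched with plain JDK classes. This is only an illustration of the idea, not Flink code: `slowLookup` is a hypothetical stand-in for a remote query that takes about 100 ms, and the class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class SyncVsAsyncLookup {

    // Hypothetical stand-in for a remote call: sleeps to simulate network latency.
    static String slowLookup(String user) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "profile-of-" + user;
    }

    // Synchronous access: the query for user b is not sent until user a's result is back.
    static List<String> lookupSync(List<String> users) {
        List<String> results = new ArrayList<>();
        for (String user : users) {
            results.add(slowLookup(user));
        }
        return results;
    }

    // Asynchronous access: all queries are issued first, then the results are collected,
    // so the waiting times overlap instead of adding up.
    static List<String> lookupAsync(List<String> users) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, users.size()));
        try {
            List<CompletableFuture<String>> futures = users.stream()
                    .map(user -> CompletableFuture.supplyAsync(() -> slowLookup(user), pool))
                    .collect(Collectors.toList());
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("a", "b", "c", "d");
        long t0 = System.nanoTime();
        List<String> sync = lookupSync(users);
        long t1 = System.nanoTime();
        List<String> async = lookupAsync(users);
        long t2 = System.nanoTime();
        System.out.println("sync:  " + sync + " in " + (t1 - t0) / 1_000_000 + " ms");
        System.out.println("async: " + async + " in " + (t2 - t1) / 1_000_000 + " ms");
    }
}
```

Both methods return the same results in the same order; on a typical machine the synchronous version takes roughly the sum of all latencies, while the asynchronous version takes roughly the latency of a single request.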

22.2.API

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept((String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
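The last two arguments of `unorderedWait` in the example above are the timeout (1000 ms) and the capacity (100). According to the Flink documentation, the capacity limits how many asynchronous requests may be in progress at the same time; once it is exhausted, the operator applies backpressure. The following plain-JDK sketch mimics that idea with a `Semaphore`. It is an analogy under that assumption, not Flink's actual implementation, and `BoundedInFlight` and `run` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedInFlight {

    // Issues `total` simulated requests while allowing at most `capacity` (> 0) in flight.
    // Returns the highest number of requests that were in progress at the same time.
    static int run(int total, int capacity) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, total));
        Semaphore slots = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[total];
        try {
            for (int i = 0; i < total; i++) {
                // Blocks the producer when all slots are taken: this is the backpressure.
                slots.acquireUninterruptibly();
                int now = inFlight.incrementAndGet();
                maxInFlight.accumulateAndGet(now, Math::max);
                futures[i] = CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(20); // simulated request latency
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, pool).whenComplete((result, error) -> {
                    // Free the slot only after the request is no longer counted as in flight.
                    inFlight.decrementAndGet();
                    slots.release();
                });
            }
            CompletableFuture.allOf(futures).join();
        } finally {
            pool.shutdown();
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + run(20, 3)); // never more than 3
    }
}
```

A larger capacity allows more overlap between requests but also more pending work and memory; the timeout decides how long a single request may stay pending before it is treated as failed.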

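Note: to use async I/O, the client must meet one of the following requirements:
1. The client supports asynchronous requests, for example Vert.x.
2. If the client does not support them, a thread pool can be used to simulate asynchronous requests.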

-- MySQL table queried by the demo code that follows (database: bigdata)
DROP TABLE IF EXISTS `t_category`;
CREATE TABLE `t_category` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t_category
-- ----------------------------
INSERT INTO `t_category` VALUES ('1', '手机');   -- mobile phones
INSERT INTO `t_category` VALUES ('2', '电脑');   -- computers
INSERT INTO `t_category` VALUES ('3', '服装');   -- clothing
INSERT INTO `t_category` VALUES ('4', '化妆品'); -- cosmetics
INSERT INTO `t_category` VALUES ('5', '食品');   -- food

// Flink job: enrich category ids with their names through async I/O
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.*;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Prerequisites for using async I/O:
 * 1. The database (or key/value store) provides a client that supports asynchronous requests.
 * 2. Without such a client, a synchronous client can be run on a thread pool to act as an asynchronous one.
 */
public class ASyncIODemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source
        // The source emits records that carry only an id: [1, 2, 3, 4, 5]
        DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() {
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<CategoryInfo> ctx) throws Exception {
                Integer[] ids = {1, 2, 3, 4, 5};
                for (Integer id : ids) {
                    if (!flag) {
                        break; // stop emitting once the source has been cancelled
                    }
                    ctx.collect(new CategoryInfo(id, null));
                }
            }

            @Override
            public void cancel() {
                this.flag = false;
            }
        });

        // 3. Transformation
        // Approach 1: async I/O with the asynchronous client provided by Vert.x
        // unorderedWait: results are emitted as they complete, not in input order
        // (timeout of 1000 seconds, at most 10 requests in flight)
        SingleOutputStreamOperator<CategoryInfo> result1 = AsyncDataStream
                .unorderedWait(categoryDS, new ASyncIOFunction1(), 1000, TimeUnit.SECONDS, 10);

        // Approach 2: synchronous MySQL client plus a thread pool to simulate async I/O
        SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream
                .unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);

        // 4. Sink
        result1.print("Approach 1: Vert.x async client \n");
        result2.print("Approach 2: sync MySQL client + thread pool \n");

        // 5. execute
        env.execute();
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class CategoryInfo {
    private Integer id;
    private String name;
}

// MySQL's own (synchronous) client; to use it asynchronously, wrap it with Vert.x or a thread pool
class MysqlSyncClient {
    private static Connection connection;
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://localhost:3306/bigdata";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    static {
        init();
    }

    private static void init() {
        try {
            Class.forName(JDBC_DRIVER);
        } catch (ClassNotFoundException e) {
            System.out.println("Driver not found!" + e.getMessage());
        }
        try {
            connection = DriverManager.getConnection(URL, USER, PASSWORD);
        } catch (SQLException e) {
            System.out.println("init connection failed!" + e.getMessage());
        }
    }

    public void close() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            System.out.println("close connection failed!" + e.getMessage());
        }
    }

    // Blocking lookup: fills in the category name for the given id
    public CategoryInfo query(CategoryInfo category) {
        String sql = "select id,name from t_category where id = ?";
        // try-with-resources closes the statement and result set after each query
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setInt(1, category.getId());
            try (ResultSet rs = statement.executeQuery()) {
                if (rs.next()) {
                    category.setName(rs.getString("name"));
                }
            }
        } catch (SQLException e) {
            System.out.println("query failed!" + e.getMessage());
        }
        return category;
    }
}

/**
 * Approach 1: async I/O with the asynchronous client provided by Vert.x
 */
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
    private transient Vertx vertx;
    private transient SQLClient mySQLClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        JsonObject mySQLClientConfig = new JsonObject();
        mySQLClientConfig
                .put("driver_class", "com.mysql.jdbc.Driver")
                .put("url", "jdbc:mysql://localhost:3306/bigdata")
                .put("user", "root")
                .put("password", "root")
                .put("max_pool_size", 20);

        VertxOptions options = new VertxOptions();
        options.setEventLoopPoolSize(10);
        options.setWorkerPoolSize(20);
        vertx = Vertx.vertx(options);
        // Create the asynchronous client from the configuration above
        mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);
    }

    // Send the request through the asynchronous client
    @Override
    public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>() {
            @Override
            public void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult) {
                if (sqlConnectionAsyncResult.failed()) {
                    // Report the failure instead of leaving the record pending until the timeout
                    resultFuture.completeExceptionally(sqlConnectionAsyncResult.cause());
                    return;
                }
                SQLConnection connection = sqlConnectionAsyncResult.result();
                connection.query("select id,name from t_category where id = " + input.getId(),
                        new Handler<AsyncResult<io.vertx.ext.sql.ResultSet>>() {
                    @Override
                    public void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) {
                        connection.close(); // return the connection to the pool
                        if (resultSetAsyncResult.failed()) {
                            resultFuture.completeExceptionally(resultSetAsyncResult.cause());
                            return;
                        }
                        List<JsonObject> rows = resultSetAsyncResult.result().getRows();
                        if (rows.isEmpty()) {
                            // No matching row: use the same fallback as timeout()
                            input.setName("未知"); // "unknown"
                            resultFuture.complete(Collections.singleton(input));
                            return;
                        }
                        // id is the primary key, so there is at most one row; complete exactly once
                        JsonObject row = rows.get(0);
                        resultFuture.complete(Collections.singletonList(
                                new CategoryInfo(row.getInteger("id"), row.getString("name"))));
                    }
                });
            }
        });
    }

    @Override
    public void close() throws Exception {
        if (mySQLClient != null) {
            mySQLClient.close();
        }
        if (vertx != null) {
            vertx.close();
        }
    }

    @Override
    public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        System.out.println("async call time out!");
        input.setName("未知"); // "unknown"
        resultFuture.complete(Collections.singleton(input));
    }
}

/**
 * Approach 2: synchronous calls plus a thread pool to simulate async I/O
 */
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> {
    private transient MysqlSyncClient client;
    private transient ExecutorService executorService; // thread pool

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new MysqlSyncClient();
        executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Send the request asynchronously by running the blocking query on the thread pool
    @Override
    public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                resultFuture.complete(Collections.singletonList(client.query(input)));
            }
        });
    }

    @Override
    public void close() throws Exception {
        // Stop the worker threads. The JDBC connection in MysqlSyncClient is static and
        // shared by all instances in the same JVM, so it is not closed here.
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    @Override
    public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception {
        System.out.println("async call time out!");
        input.setName("未知"); // "unknown"
        resultFuture.complete(Collections.singleton(input));
    }
}
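
Both functions above override `timeout()` to emit a fallback record. Without such an override, Flink's default behavior on a timeout is to throw an exception, which fails the job. The fallback works because a `ResultFuture` is completed by the first `complete` call and later calls are ignored. A minimal plain-JDK sketch of that "first completion wins" race follows; it uses `CompletableFuture` rather than Flink classes, and `TimeoutFallback`, `lookupWithTimeout`, and the value "phone" are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TimeoutFallback {

    // Returns the lookup result, or `fallback` if the lookup takes longer than timeoutMs.
    static String lookupWithTimeout(long lookupDelayMs, long timeoutMs, String fallback) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            // Simulated lookup: delivers its value after lookupDelayMs.
            timer.schedule(() -> { result.complete("phone"); }, lookupDelayMs, TimeUnit.MILLISECONDS);
            // Timeout handler: whichever complete() call runs first wins, the later one is a no-op.
            timer.schedule(() -> { result.complete(fallback); }, timeoutMs, TimeUnit.MILLISECONDS);
            return result.join();
        } finally {
            timer.shutdownNow(); // cancel whichever task has not run yet
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupWithTimeout(10, 500, "unknown"));  // lookup finishes first
        System.out.println(lookupWithTimeout(500, 10, "unknown"));  // timeout fires first
    }
}
```

Emitting a placeholder keeps the stream flowing, at the cost of hiding lookup failures from downstream operators; whether that trade-off is acceptable depends on the job.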
