本系统经过多年应用并且持续不断改进,系统各部功能已基本完善,非常适合大型医疗集团,当然小型医院或科室就更适合了。

文件:590m.com/f/25127180-499031859-968efa(访问密码:551685)

以下内容无关:

-------------------------------------------分割线---------------------------------------------

Flink的API做了4层的封装,上两层TableAPI、SQL语法相对简单便于编写,面对小需求可以快速上手解决,本文参考官网及部分线上教程编写source端、sink端代码,分别读取socket、kafka及文本作为source,并将流数据输出写入Kafka、ES及MySQL,方便后续查看使用。

二、代码部分
说明:这里使用connect及DDL两种写法,connect满足Flink1.10及以前版本使用,目前官方文档均是以DDL写法作为介绍,建议1.10以后的版本使用DDL写法操作,通用性更强。

1.读取(Source)端写法
1.1 基础环境建立,方便演示并行度为1且不设置CK

//建立Stream环境,设置并行度为1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//建立Table环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
1.2 读取Socket端口数据,并使用TableAPI及SQL两种方式查询

//读取服务器9999端口数据,并转换为对应JavaBean
SingleOutputStreamOperator mapDS = env.socketTextStream(“hadoop102”, 9999)
.map(value -> {
String[] split = value.split(",");
return new WaterSensor(split[0]
, Long.parseLong(split[1])
, Integer.parseInt(split[2]));});
//创建表:将流转换成动态表。
Table table = tableEnv.fromDataStream(mapDS);
//对动态表进行查询,TableAPI方式
Table selectResult = table.where(("id").isEqual("ws001")).select(("id").isEqual("ws_001")).select(("id").isEqual("ws0​01")).select((“id”), $(“ts”), $(“vc”));
//对动态表镜像查询,SQL方式-未注册表
Table selectResult = tableEnv.sqlQuery("select * from " + table);
1.3 读取文本(FileSystem)数据,并使用TableAPI进行查询

//Flink1.10写法使用connect方式,读取txt文件并建立临时表
tableEnv.connect(new FileSystem().path(“input/sensor.txt”))
.withFormat(new Csv().fieldDelimiter(’,’).lineDelimiter("\n"))
.withSchema(new Schema().field(“id”, DataTypes.STRING())
.field(“ts”, DataTypes.BIGINT())
.field(“vc”,DataTypes.INT()))
.createTemporaryTable(“sensor”);

//转换成表对象,对表进行查询。SQL写法参考Socket段写法
Table table = tableEnv.from(“sensor”);
Table selectResult = table.groupBy(("id")).aggregate(("id")).aggregate(("id")).aggregate((“id”).count().as(“id_count”))select($(“id”), $(“id_count”));
1.4 消费Kafka数据,并使用TableAPI进行查询,分别用conncet及DDL写法

//Flink1.10写法使用connect方式,消费kafka对应主题并建立临时表
tableEnv.connect(new Kafka().version(“universal”)
.topic(“sensor”)
.startFromLatest()
.property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,“hadoop102:9092”)
.property(ConsumerConfig.GROUP_ID_CONFIG,“BD”))//消费者组
.withSchema(new Schema().field(“id”, DataTypes.STRING())
.field(“ts”, DataTypes.BIGINT())
.field(“vc”,DataTypes.INT()))
.withFormat(new Csv())
.createTemporaryTable(“sensor”);

//Flink1.10以后使用DDL写法
tableEnv.executeSql(“CREATE TABLE sensor (” +
" id STRING," +
" ts BIGINT," +
" vc INT" +
“) WITH (” +
" ‘connector’ = ‘kafka’," +
" ‘topic’ = ‘sensor’," +
" ‘properties.bootstrap.servers’ = ‘hadoop102:9092’," +
" ‘properties.group.id’ = ‘BD’," +
" ‘scan.startup.mode’ = ‘latest-offset’," +
" ‘format’ = ‘csv’" +
“)”);

//转换成表对象,对表进行查询。SQL写法参考Socket段写法
Table table = tableEnv.from(“sensor”);
Table selectResult = table.groupBy(("id")).aggregate(("id")).aggregate(("id")).aggregate((“id”).count().as(“id_count”))
.select($(“id”), $(“id_count”));
2.写入(Sink)端部分写法
2.1 写入文本文件

//创建表:创建输出表,connect写法
tableEnv.connect(new FileSystem().path(“out/sensor.txt”))
.withFormat(new Csv())
.withSchema(new Schema().field(“id”, DataTypes.STRING())
.field(“ts”, DataTypes.BIGINT())
.field(“vc”,DataTypes.INT()))
.createTemporaryTable(“sensor”);

//将数据写入到输出表中即实现sink写入,selectResult则是上面source侧查询出来的结果表
selectResult.executeInsert(“sensor”);
2.2 写入Kafka

//connect写法
tableEnv.connect(new Kafka().version(“universal”)
.topic(“sensor”)
.sinkPartitionerRoundRobin() //轮询写入
.property(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,“hadoop102:9092”))
.withSchema(new Schema().field(“id”, DataTypes.STRING())
.field(“ts”, DataTypes.BIGINT())
.field(“vc”,DataTypes.INT()))
.withFormat(new Json())
.createTemporaryTable(“sensor”);

//DDL写法
tableEnv.executeSql(“CREATE TABLE sensor (” +
" id STRING," +
" ts BIGINT," +
" vc INT" +
“) WITH (” +
" ‘connector’ = ‘kafka’," +
" ‘topic’ = ‘sensor’," +
" ‘properties.bootstrap.servers’ = ‘hadoop102:9092’," +
" ‘format’ = ‘json’" +
“)”);

//将数据写入到输出表中即实现sink写入,selectResult则是上面source侧查询出来的结果表
selectResult.executeInsert(“sensor”);
2.3 写入MySQL(JDBC方式,这里手动导入了mysql-connector-java-5.1.9.jar)

//DDL
tableEnv.executeSql(“CREATE TABLE sink_sensor (” +
" id STRING," +
" ts BIGINT," +
" vc INT," +
" PRIMARY KEY (id) NOT ENFORCED" +
“) WITH (” +
" ‘connector’ = ‘jdbc’," +
" ‘url’ = ‘jdbc:mysql://hadoop102:3306/test?useSSL=false’," +
" ‘table-name’ = ‘sink_test’," +
" ‘username’ = ‘root’," +
" ‘password’ = ‘123456’" +
“)”);

//将数据写入到输出表中即实现sink写入,selectResult则是上面source侧查询出来的结果表
selectResult.executeInsert(“sensor”);
2.4 写入ES

//connect写法
tableEnv.connect(new Elasticsearch()
.index(“sensor”)
.documentType("_doc")
.version(“7”)
.host(“localhost”,9200,“http”)
//设置为1,每行数据都写入是方便客户端输出展示,生产勿使用
.bulkFlushMaxActions(1))
.withSchema(new Schema()
.field(“id”, DataTypes.STRING())
.field(“ts”, DataTypes.BIGINT())
.field(“vc”,DataTypes.INT()))
.withFormat(new Json())
.inAppendMode()
.createTemporaryTable(“sensor”);
//DDL写法
tableEnv.executeSql(“CREATE TABLE sensor (” +
" id STRING," +
" ts BIGINT," +
" vc INT," +
" PRIMARY KEY (id) NOT ENFORCED" +
“) WITH (” +
" ‘connector’ = ‘elasticsearch-7’," +
" ‘hosts’ = ‘http://localhost:9200’," +
" ‘index’ = ‘users’," +
" ‘sink.bulk-flush.max-actions’ = ‘1’)"

医院挂号系统源码(含数据库)相关推荐

  1. 计算机毕业设计Java医院管理系统(系统+源码+mysql数据库+Lw文档)

    计算机毕业设计Java医院管理系统(系统+源码+mysql数据库+Lw文档) 计算机毕业设计Java医院管理系统(系统+源码+mysql数据库+Lw文档) 本源码技术栈: 项目架构:B/S架构 开发语 ...

  2. php医院挂号系统源码

    本系统主要给民营医院内部网络部和咨询师计算业绩做考核使用. 演示网址:http://gh.mediab.cn/ 欢迎试用医院网络预约系统,请按照以下四个场景按步骤操作,即可获得优质使用体验. 解决内线 ...

  3. 基于uniapp的医院挂号系统源码

    开发环境及工具: 大等于Jdk1.8,大于mysql5.5,idea(eclipse),HBuilder X 技术说明: springboot mybatis uniapp 代码注释齐全,没有多余代码 ...

  4. 响应式报名系统源码(含数据库脚本)

    报名系统 1 响应式布局–html5布局 微信.平板.电脑.手机等多终端覆盖.无需编程可快速嵌入到其它网站,可完美载入微信端,灵活对接企业微信公众号. 2 报名支付即时到账–支付接口 支付宝.微信扫描 ...

  5. 计算机毕业设计Java医院预约挂号系统(系统+源码+mysql数据库+Lw文档)

    计算机毕业设计Java医院预约挂号系统(系统+源码+mysql数据库+Lw文档) 计算机毕业设计Java医院预约挂号系统(系统+源码+mysql数据库+Lw文档) 本源码技术栈: 项目架构:B/S架构 ...

  6. java医院挂号代码_基于SSM开发的Java医院预约挂号系统 源码下载

    这是一个基于SSM开发的Java医院预约挂号系统,源码中附带主工程以及数据库文件. 目前已知Bug:因为时间预约信息是假数据,控制预约日历显示的代码在data/index.js,因为js写的有点bug ...

  7. 计算机毕业设计Java医院就诊预约(系统+源码+mysql数据库+Lw文档)

    计算机毕业设计Java医院就诊预约(系统+源码+mysql数据库+Lw文档) 计算机毕业设计Java医院就诊预约(系统+源码+mysql数据库+Lw文档) 本源码技术栈: 项目架构:B/S架构 开发语 ...

  8. java计算机毕业设计门诊预约挂号系统源码+系统+mysql数据库+lw文档

    java计算机毕业设计门诊预约挂号系统源码+系统+mysql数据库+lw文档 java计算机毕业设计门诊预约挂号系统源码+系统+mysql数据库+lw文档 本源码技术栈: 项目架构:B/S架构 开发语 ...

  9. 计算机毕业设计Java医院住院部管理(系统+源码+mysql数据库+Lw文档)

    计算机毕业设计Java医院住院部管理(系统+源码+mysql数据库+Lw文档) 计算机毕业设计Java医院住院部管理(系统+源码+mysql数据库+Lw文档) 本源码技术栈: 项目架构:B/S架构 开 ...

  10. 计算机毕业设计Java医院疫情防控管理系统(系统+源码+mysql数据库+Lw文档)

    计算机毕业设计Java医院疫情防控管理系统(系统+源码+mysql数据库+Lw文档) 计算机毕业设计Java医院疫情防控管理系统(系统+源码+mysql数据库+Lw文档) 本源码技术栈: 项目架构:B ...

最新文章

  1. 谷歌开源张量网络库TensorNetwork,GPU处理提升100倍!
  2. 替换openjdk的版本时遇到报错Transaction check error
  3. leetCode C++ 二分查找 35. 搜索插入位置 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。
  4. MySQL主从同步(复制)
  5. 【CV秋季划】生成对抗网络GAN有哪些研究和应用,如何循序渐进地学习好?
  6. 六西格玛dfss_向六西格玛质量水平进攻!
  7. 国内985副教授与行政人员一年能够拿到多少工资?
  8. 6.Linux性能诊断 --- 远程通信gRPC,kafka,docker
  9. iOS 模拟器调试web/h5代码
  10. detours介绍与使用
  11. 正则表达式之邮箱地址格式+非法字符+后缀长度的验证
  12. 一个让人不得不转的故事-《通宵达旦工资只有3200 博客网架构师艰难浪迹于北京》...
  13. Xbox360 十年祭:那些荣耀、激昂、内乱与未尽的未来
  14. java计算机毕业设计物流公司停车位管理源程序+mysql+系统+lw文档+远程调试
  15. Osmo Mobile 教学
  16. Lumen企业站内容管理实战 - 网站配置
  17. 五大常用项目管理工具软件-也支持敏捷开发
  18. 《微观经济学》 第九章(二)
  19. BUUCTFweb比赛做题记录
  20. 7-9 7-10 sdut-C语言实验- 排序

热门文章

  1. 精灵骑士二觉_精灵骑士二觉版本小百科,先睹为快
  2. Linux下安装各种常用软件
  3. 计算机9网络连接不上,本地连接连不上,教您电脑本地连接连不上怎么解决
  4. PowerVR开发工具和SDK 2020 Release 1发布啦!
  5. IDEA中出现java file outside of source root
  6. 义哥征途登录显示服务器维护中,征途单机版
  7. 初探 Redis 客户端 Lettuce:真香!
  8. pspice仿真过程中出现Less than 2 connections at node
  9. Oracle字符串操作[转:http://www.cnblogs.com/xd502djj/archive/2010/08/11/1797577.html]
  10. python wx模块下choice列表框值怎么更新,python的内置模块