Integrating SpringBoot with Flink (Schneider PLC IoT Data Collection)

Prerequisite: Kafka installed in a Linux environment.

Background:

A Schneider PLC (TM200C16R) runs a data-acquisition program and is connected to the local network. A SpringBoot application subscribes to the MQTT topic, forwards each message to Kafka, and Flink consumes the Kafka stream and persists it to a MySQL database.

Wireshark capture (screenshot):

MQTTBox test subscription (screenshot):

Known parameters:

Server IP: 139.220.193.14

Port: 1883

Application account: admin@tenlink

Application password: Tenlink@123

IoT device account: 202303171001

IoT device password: 03171001

Topics:

202303171001/p (publish topic: the device sends, the application receives)

202303171001/s (subscribe topic: the application sends, the device receives)

Subscribing to MQTT (prerequisite: Kafka must already be running and the plc_thoroughfare topic must exist; a topic-creation sketch follows below)
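
If the plc_thoroughfare topic has not been created yet, it can be set up programmatically. A minimal sketch, assuming the Kafka AdminClient from the kafka-clients jar (pulled in transitively by spring-kafka) and the same broker address as the yaml below; one partition and a replication factor of 1 are assumptions for a single-node test cluster:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicBootstrap {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:9092"); // same broker as the yaml config below
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 (single-node assumption)
            NewTopic topic = new NewTopic("plc_thoroughfare", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}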

  • maven pom

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
  • yaml config

spring:
  kafka:
    bootstrap-servers: ip:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

## custom properties
kafka:
  topics:
    # kafka topic
    plc1: plc_thoroughfare
plc:
  broker: tcp://139.220.193.14:1883
  subscribe-topic: 202303171001/p
  username: admin@tenlink
  password: Tenlink@123
  client-id: subscribe_client
  • Subscribe to MQTT and forward the payload to the Kafka topic

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * PLC message subscription
 */
@Component
public class SubscribeSample {

    private static final Logger log = LoggerFactory.getLogger(SubscribeSample.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Value("${kafka.topics.plc1}")
    private String plc1;
    @Value("${plc.broker}")
    private String broker;
    @Value("${plc.subscribe-topic}")
    private String subscribeTopic;
    @Value("${plc.username}")
    private String username;
    @Value("${plc.password}")
    private String password;
    @Value("${plc.client-id}")
    private String clientId;

    @PostConstruct
    public void plcGather() {
        int qos = 0;
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                MqttClient client = null;
                try {
                    client = new MqttClient(broker, clientId, new MemoryPersistence());
                    // connection options
                    MqttConnectOptions options = new MqttConnectOptions();
                    options.setUserName(username);
                    options.setPassword(password.toCharArray());
                    options.setConnectionTimeout(60);
                    options.setKeepAliveInterval(60);
                    // register the callback
                    client.setCallback(new MqttCallback() {
                        public void connectionLost(Throwable cause) {
                            System.out.println("connectionLost: " + cause.getMessage());
                        }

                        public void messageArrived(String topic, MqttMessage message) {
                            String data = new String(message.getPayload());
                            kafkaTemplate.send(plc1, data).addCallback(success -> {
                                // topic the message landed on
                                String kafkaTopic = success.getRecordMetadata().topic();
                                // partition the message landed on
//                                int partition = success.getRecordMetadata().partition();
                                // offset within the partition
//                                long offset = success.getRecordMetadata().offset();
                                log.info("MQTT message {} forwarded to Kafka topic -> {}", data, kafkaTopic);
                            }, failure -> {
                                throw new RuntimeException("failed to send message: " + failure.getMessage());
                            });
                        }

                        public void deliveryComplete(IMqttDeliveryToken token) {
                            log.info("deliveryComplete---------{}", token.isComplete());
                        }
                    });
                    client.connect(options);
                    client.subscribe(subscribeTopic, qos);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
    }
}
  • Message collection test (the screenshot below shows a success: the payload has been forwarded to the Kafka topic; a publish sketch for testing without the device follows)
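
Without the physical PLC on hand, the same path can be exercised by publishing a test payload to the device's publish topic with the Paho client already on the classpath. A minimal sketch using the credentials listed above; the JSON payload is made up for illustration, the real frame format comes from the PLC acquisition program:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class PublishTest {

    public static void main(String[] args) throws Exception {
        MqttClient client = new MqttClient("tcp://139.220.193.14:1883", "publish_test_client", new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin@tenlink");
        options.setPassword("Tenlink@123".toCharArray());
        client.connect(options);
        // hypothetical sample payload
        client.publish("202303171001/p", new MqttMessage("{\"test\":1}".getBytes()));
        client.disconnect();
    }
}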

Flink consumes the Kafka data

  • maven pom

<!-- utility libraries: start -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.26</version>
</dependency>
<!-- utility libraries: end -->
<!-- flink dependencies: start -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- flink kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- flink json -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- flink jdbc (mysql) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<!-- flink dependencies: end -->
<!-- spring data jpa -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
  • yaml config

# service port
server:
  port: 8222
spring:
  kafka:
    bootstrap-servers: ip:9092
    consumer:
      group-id: kafka
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/ceshi?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: root
    druid:
      initial-size: 5 # number of physical connections created at initialization
      min-idle: 5 # minimum pool size
      maxActive: 20 # maximum pool size
      maxWait: 60000 # max wait time when acquiring a connection, in milliseconds
      timeBetweenEvictionRunsMillis: 60000 # interval between idle-connection eviction checks, in milliseconds
      minEvictableIdleTimeMillis: 300000 # minimum time a connection stays in the pool, in milliseconds
      validationQuery: SELECT 1 # SQL used to validate connections
      testWhileIdle: true # validate on borrow only if idle longer than timeBetweenEvictionRunsMillis
      testOnBorrow: false # validate on every borrow (true hurts performance)
      testOnReturn: false # validate on every return (true hurts performance)
      poolPreparedStatements: true # enable PSCache and set its size per connection
      maxPoolPreparedStatementPerConnectionSize: 20 # must be > 0 to enable PSCache; a value > 0 flips poolPreparedStatements to true. Druid does not suffer Oracle's PSCache memory bloat, so this can be raised, e.g. to 100
      filters: stat,wall,slf4j # monitoring/statistics filters; removing them disables SQL stats on the console; 'wall' is the firewall filter
      # enable mergeSql and slow-SQL logging via connectProperties
      connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
  jpa:
    hibernate:
      ddl-auto: none
    show-sql: true
    repositories:
      packages: com.hzh.demo.domain.*

# custom properties
customer:
  # flink-related settings
  flink:
    # feature switch
    plc-status: true
    plc-topic: plc_thoroughfare
# scheduled task that periodically purges stale data
task:
  plc-time: 0 0/1 * * * ? # runs every minute
  • Table structure

-- plc_test definition
CREATE TABLE `plc_test` (
  `pkid` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'primary key id',
  `json_str` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'JSON payload',
  `create_time` bigint NOT NULL COMMENT 'creation time (epoch millis)',
  PRIMARY KEY (`pkid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='PLC data storage test table';
  • Startup class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableJpaRepositories(basePackages = "repository basePackages") // placeholder: your repository package
@EntityScan("entity basePackages") // placeholder: your entity package
@EnableScheduling
public class PLCStorageApplication {

    public static void main(String[] args) {
        SpringApplication.run(PLCStorageApplication.class, args);
    }
}
  • Entity class

import lombok.Builder;
import lombok.Data;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;

/**
 * PLC receiving entity
 */
@Table(name = "plc_test")
@Data
@Builder
@Entity
public class PLCDomain implements Serializable {

    private static final long serialVersionUID = 4122384962907036649L;

    @Id
    @Column(name = "pkid")
    public String id;
    @Column(name = "json_str")
    public String jsonStr;
    @Column(name = "create_time")
    private Long createTime;

    public PLCDomain(String id, String jsonStr, Long createTime) {
        this.id = id;
        this.jsonStr = jsonStr;
        this.createTime = createTime;
    }

    public PLCDomain() {
    }
}
  • JPA repository interface

import com.hzh.demo.domain.PLCDomain;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface PLCRepository extends JpaRepository<PLCDomain, String> {
}
  • Utility class exposing the application context (ApplicationContextAware): because of bean loading order, Flink operators cannot use Spring bean injection, so this utility wraps the context for static lookups

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.stereotype.Component;

@Component
public class ApplicationContextProvider implements ApplicationContextAware {

    /**
     * The context instance.
     */
    private static ApplicationContext applicationContext;

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextProvider.applicationContext = applicationContext;
    }

    /**
     * Get a bean by name.
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * Get a bean by class.
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * Get a bean by name and class.
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

    /**
     * Resolve a localized message for the current locale.
     */
    public static String getMessage(String code, Object[] args) {
        return getApplicationContext().getMessage(code, args, LocaleContextHolder.getLocale());
    }

    /**
     * Resolve a localized message, falling back to a default.
     */
    public static String getMessage(String code, Object[] args, String defaultMessage) {
        return getApplicationContext().getMessage(code, args, defaultMessage, LocaleContextHolder.getLocale());
    }
}
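
Usage is a one-line static lookup wherever Spring injection is unavailable, exactly as the MySQL sink below does:

PLCRepository repository = ApplicationContextProvider.getBean(PLCRepository.class);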
  • Flink third-party output (MySQL sink)

import com.hzh.demo.config.ApplicationContextProvider;
import com.hzh.demo.domain.PLCDomain;
import com.hzh.demo.repository.PLCRepository;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Writes each record to MySQL
 */
@Component
@ConditionalOnProperty(name = "customer.flink.plc-status")
public class MysqlSink implements SinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(MysqlSink.class);

    @Override
    public void invoke(String value, Context context) throws Exception {
        long currentTime = context.currentProcessingTime();
        PLCDomain build = PLCDomain.builder()
                .id(UUID.randomUUID().toString().replaceAll("-", ""))
                .jsonStr(value)
                .createTime(currentTime)
                .build();
        // look the repository up through the static context holder (see the utility class above)
        PLCRepository repository = ApplicationContextProvider.getBean(PLCRepository.class);
        repository.save(build);
        log.info("Persisted record: {}", build);
        SinkFunction.super.invoke(value, context);
    }
}
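
The repository is resolved inside invoke() rather than injected because Flink serializes the sink and re-instantiates it at task startup, so Spring-injected fields would be lost. A minimal alternative sketch (an assumption, not part of the original post) that resolves the bean once per task by extending RichSinkFunction:

import com.hzh.demo.config.ApplicationContextProvider;
import com.hzh.demo.domain.PLCDomain;
import com.hzh.demo.repository.PLCRepository;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.UUID;

public class MysqlRichSink extends RichSinkFunction<String> {

    // transient: skipped during sink serialization, re-resolved in open()
    private transient PLCRepository repository;

    @Override
    public void open(Configuration parameters) {
        // called once per task instance, after deserialization
        repository = ApplicationContextProvider.getBean(PLCRepository.class);
    }

    @Override
    public void invoke(String value, Context context) {
        // same persistence logic as MysqlSink.invoke(), using the cached repository
        repository.save(PLCDomain.builder()
                .id(UUID.randomUUID().toString().replaceAll("-", ""))
                .jsonStr(value)
                .createTime(context.currentProcessingTime())
                .build());
    }
}

Either variant relies on the Flink job running in the same JVM as the Spring context, which holds here because env.execute() below starts a local environment inside the SpringBoot process.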
  • Flink subscribes to the Kafka topic and continuously reads data

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Properties;

/**
 * Consumes data from the Kafka topic
 */
@Component
@ConditionalOnProperty(name = "customer.flink.plc-status")
public class FlinkReceivingPLC {

    private static final Logger log = LoggerFactory.getLogger(FlinkReceivingPLC.class);

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String kafkaServer;
    @Value("${customer.flink.plc-topic}")
    private String topic;
    @Value("${spring.kafka.consumer.group-id:kafka}")
    private String groupId;
    @Value("${spring.kafka.consumer.key-deserializer:org.apache.kafka.common.serialization.StringDeserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer:org.apache.kafka.common.serialization.StringDeserializer}")
    private String valueDeserializer;

    /**
     * Builds and starts the streaming job.
     */
    @PostConstruct
    public void execute() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // global parallelism
        env.setParallelism(1);

        Properties properties = new Properties();
        // Kafka broker IPs or hostnames, comma-separated
        properties.setProperty("bootstrap.servers", kafkaServer);
        // Kafka consumer group.id
        properties.setProperty("group.id", groupId);
        // note: Kafka property keys use dots ("key.deserializer"), not hyphens
        properties.setProperty("key.deserializer", keyDeserializer);
        properties.setProperty("value.deserializer", valueDeserializer);

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
        stream.print().setParallelism(1);
        stream
                // group by message content
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                // optional processing step (sketched below)
//                .process(new MyKeyedProcessFunction())
                // third-party output: persist to MySQL
                .addSink(new MysqlSink());

        // run the job on its own thread so @PostConstruct can return
        new Thread(() -> {
            try {
                env.execute("PLCPersistenceJob");
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }).start();
    }
}
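
The commented-out MyKeyedProcessFunction is never shown in the post. If a transformation step is wanted between keyBy and the sink, a minimal hypothetical sketch could look like this (the pass-through body is an illustration only):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical processing step: forwards every record unchanged.
 */
public class MyKeyedProcessFunction extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // parsing/filtering of the PLC payload would go here
        out.collect(value);
    }
}

If enabled, the sink should be attached to the stream returned by process() rather than directly to the keyed stream.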
  • Stale-data cleanup mechanism (to make testing convenient, the cleanup runs frequently and data expires quickly)

import com.hzh.demo.repository.PLCRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Scheduled task configuration
 */
@Component
@Configuration
public class QuartzConfig {

    private static final Logger log = LoggerFactory.getLogger(QuartzConfig.class);

    @Autowired
    private PLCRepository plcRepository;

    /**
     * Data cleanup mechanism.
     */
    @Scheduled(cron = "${task.plc-time}")
    private void PLCCleaningMechanism() {
        log.info("Running cleanup task: {}", "PLCCleaningMechanism");
        long currentTimeMillis = System.currentTimeMillis();
        Optional.of(this.plcRepository.findAll()).ifPresent(list -> {
            list.forEach(plc -> {
                Long createTime = plc.getCreateTime();
                // records older than one minute count as stale
                if ((currentTimeMillis - createTime) > (1000 * 60 * 1)) {
                    this.plcRepository.delete(plc);
                    log.info("Stale record purged: {}", plc);
                }
            });
        });
    }
}
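
findAll() followed by row-by-row deletes is fine for a demo, but the same purge can be a single bulk statement. A minimal sketch (an assumption, using Spring Data JPA's derived delete queries; the repository and method names are hypothetical):

import com.hzh.demo.domain.PLCDomain;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

@Repository
public interface PLCBulkRepository extends JpaRepository<PLCDomain, String> {

    // derived delete query: removes every row whose create_time is below the threshold
    @Transactional
    long deleteByCreateTimeLessThan(Long threshold);
}

The scheduled method would then collapse to plcBulkRepository.deleteByCreateTimeLessThan(System.currentTimeMillis() - 1000 * 60).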
  • Test results

  • Data persisted to MySQL
