分布式实时计算课程学习(2.2)——Source API
目录
- 2.2.1 文件Source
- 示例:
- 1.示例数据:
- 2.操作过程
- 3.最终代码
- 2.2.2 Socket Source
- 2.2.3 集合Source
- 示例:
- 1.操作过程
- 2.最终代码
- 2.2.4 Kafka Source(主要)
- 示例:
- 1.虚拟机环境启动zk、kafka:
- 2.在kafka上创建一个topic t1:
- 3.在realtime工程的pom.xml文件中添加如下依赖:
- 4.编写class:
2.2.1 文件Source
基于文件:
readTextFile(path)
读取文本文件,文件遵循TextInputFormat读取规则,逐行读取并返回。
示例:
1.示例数据:
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
2.操作过程
将数据文件拷贝到D:目录下,然后创建如下的class
首先还是和我之前写的分布式系列博客4.2.2中,flink的五大步骤
然后sorce这里我们就要改动了
可以看出这个api只需要给出文件路径
3.最终代码
package com.edu.neusoft.bigdata.flink.source;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> fileSource = env.readTextFile("你的文件路径");SingleOutputStreamOperator<String> stream = fileSource.map(new MapFunction<String, String>() {@Overridepublic String map(String line) throws Exception {String[] words = line.split(",");boolean f = Double.valueOf(words[2]) > 30;return line+"->"+f;}});stream .print().setParallelism(1);env.execute("FileSource");}
}
这里我做了一个简单的transform将温度大于30的进行输出
2.2.2 Socket Source
见此博客
2.2.3 集合Source
基于集合:
fromCollection(Collection)
通过Java中的Collection集合创建一个数据流,集合中的所有元素必须是相同类型的
示例:
1.操作过程
然后创建如下的class
还是五大步骤,只需要更改sorce部分,它需要的参数是集合。
2.最终代码
package com.edu.neusoft.bigdata.flink.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;public class CollectionSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> Source = env.fromCollection(Arrays.asList(1, 3, 5, 7, 9));
//方法一
// SingleOutputStreamOperator<Integer> stream = Source.map(new MapFunction<Integer, Integer>() {// @Override
// public Integer map(Integer n) throws Exception {// return n * n;
// }
// });
//方法二SingleOutputStreamOperator<Integer> stream = Source.map(n -> n * n);stream.print().setParallelism(1);env.execute("CollectionSource");}
}
这了简单做了一个transform,对每个数都进行平方,这里我用了两个方法,方法一是我们之前讲到过的map方法,方法二是拉姆达表达式,更简单方便(有点像Scala里的语法)。
2.2.4 Kafka Source(主要)
第三方Source对接:
addSource可以实现对接第三方数据源的数据
系统内置提供了一批Connectors
示例:
1.虚拟机环境启动zk、kafka:
$ zkServer.sh start
$ cd /usr/local/kafka
$ bin/kafka-server-start.sh config/server.properties
2.在kafka上创建一个topic t1:
3.在realtime工程的pom.xml文件中添加如下依赖:
<!-- Kafka Connector -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.11.3</version>
</dependency>
4.编写class:
然后创建如下的class
还是五大步骤,重点是写source,代码如下
package com.edu.neusoft.bigdata.flink.source;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//kafka配置项String topic = "t1";Properties prop = new Properties();prop.setProperty("bootstrap.servers","ubuntu:9092");//消费者组idprop.setProperty("group.id","flink_kafka_group1");//初始化consumerFlinkKafkaConsumer011<String> flinkKafkaConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);//默认的消费策略flinkKafkaConsumer.setStartFromGroupOffsets();DataStreamSource<String> kafkaSource = env.addSource(flinkKafkaConsumer);// SingleOutputStreamOperator<Integer> stream = Source.map(new MapFunction<Integer, Integer>() {// @Override
// public Integer map(Integer n) throws Exception {// return n * n;
// }
// });kafkaSource.print().setParallelism(1);env.execute("KafkaSource");}
}
这时我们打开kafka生产者这一端,可以看到控制台有数据产生,同时我们的kafkatool也有数据产生
分布式实时计算课程学习(2.2)——Source API相关推荐
- 分布式实时计算—Spark—Spark Core
原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...
- 分布式实时计算—实时计算相关问题及解决方案
原文作者:孟知之 原文地址:实时计算相关问题及解决方案 目录 1. 怎么处理 Spark structured streaming 慢速变化数据 join 的问题? 2. Kafka不稳定导致Spar ...
- 分布式实时计算—Storm—基础介绍
目录 一.概念 二.编程模型(spout->tuple->bolt) 三.Topology 运行 四.Storm Streaming Grouping 一.概念 Storm 是一个免费并开 ...
- 分布式实时计算—实时数据质量如何保障?
原文作者:阿里巴巴文娱技术 原文地址:算法基石-实时数据质量如何保障? 目录 一.现状分析 二.实时数据质量保障体系方案 三.线下质量 四.线上质量 五.质量效能 六.产品体验实时自动化保障 优酷视频 ...
- 分布式实时计算—从霍普金大学数据错误谈谈如何保证实时计算数据准确性
原文作者:实时流式计算 原文地址:从霍普金大学数据错误谈谈如何保证实时计算数据准确性 目录 一.Kafka 1.Produce端消息传递 1.Consumer端消息传递 3.精确一次 二.Flink ...
- 大数据实时计算Spark学习笔记(9)—— Spar SQL(1) 读取 json 文件
1 Spark SQL 编程方式:(1)SQL;(2) DataFrame API scala> case class Customer(id:Int,name:String,age:Int) ...
- 【大数据实时计算框架】Storm框架
一.大数据实时计算框架 1.什么是实时计算?流式计算? (一)什么是Storm? Storm为分布式实时计算提供了一组通用原语,可被用于"流处理"之中,实时处理消息并更新数据库.这 ...
- 日均百亿级日志处理:微博基于Flink的实时计算平台建设
来自:DBAplus社群 作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人. 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发.实时数据关联平台.实时算法特征数据计算.实 ...
- 大数据实时处理:百分点实时计算架构和算法
当今时代,数据不再昂贵,但从海量数据中获取价值变得昂贵,而要及时获取价值则更加昂贵,这正是大数据实时计算越来越流行的原因.以百 分点公司为例,在高峰期每秒钟会有近万HTTP请求发送到百分点服务器上,这 ...
最新文章
- 用Async函数简化异步代码
- 用JSP实现基于Web的RSS阅读器
- php学习之------[流程控制]
- datagrid 什么时候结束编辑_2020年中考结束后,什么时候出分?什么时候报志愿?...
- 36日期计算包含计算某月某日是星期几的公式
- subline text3设置中文亲测
- python解析css文件_Python格式化css文件的方法
- MySQL 入门教程
- Unable to open log device '/dev/log/main' : No such file or directory it ...
- 分布式系统工具箱 Spring Cloud 概览
- OpenJ_POJ C16B Robot Game 打表找规律
- Java笔记第五篇 文本编辑器初见面
- skyline 系列 3 -TerraBuilder的使用 、mpt的创建和发布
- python网络爬虫实战解析
- WiFi抓包图形化版本
- 钉钉如何群里定时发送文件_简单好用的钉钉群消息助手
- 《应用回归及分类》学习笔记1
- java对接云之家群聊机器人
- 下载mysql那个版本好_Mysql各个版本区别及官网下载
- 领域驱动设计整理——概念架构