目录

  • 2.2.1 文件Source
    • 示例:
      • 1.示例数据:
      • 2.操作过程
      • 3.最终代码
  • 2.2.2 Socket Source
  • 2.2.3 集合Source
    • 示例:
      • 1.操作过程
      • 2.最终代码
  • 2.2.4 Kafka Source(主要)
    • 示例:
      • 1.虚拟机环境启动zk、kafka:
      • 2.在kafka上创建一个topic t1:
      • 3.在realtime工程的pom.xml文件中添加如下依赖:
      • 4.编写class:

2.2.1 文件Source

基于文件:
readTextFile(path)
读取文本文件,文件遵循TextInputFormat读取规则,逐行读取并返回。

示例:

1.示例数据:

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1

2.操作过程

将数据文件拷贝到D:目录下,然后创建如下的class

首先还是和我之前写的分布式系列博客4.2.2中,flink的五大步骤

然后sorce这里我们就要改动了

可以看出这个api只需要给出文件路径

3.最终代码

package com.edu.neusoft.bigdata.flink.source;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> fileSource = env.readTextFile("你的文件路径");SingleOutputStreamOperator<String> stream = fileSource.map(new MapFunction<String, String>() {@Overridepublic String map(String line) throws Exception {String[] words = line.split(",");boolean f = Double.valueOf(words[2]) > 30;return line+"->"+f;}});stream .print().setParallelism(1);env.execute("FileSource");}
}

这里我做了一个简单的transform将温度大于30的进行输出

2.2.2 Socket Source

见此博客

2.2.3 集合Source

基于集合:
fromCollection(Collection)
通过Java中的Collection集合创建一个数据流,集合中的所有元素必须是相同类型的

示例:

1.操作过程

然后创建如下的class

还是五大步骤,只需要更改sorce部分,它需要的参数是集合。

2.最终代码

package com.edu.neusoft.bigdata.flink.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Arrays;public class CollectionSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> Source = env.fromCollection(Arrays.asList(1, 3, 5, 7, 9));
//方法一
//        SingleOutputStreamOperator<Integer> stream = Source.map(new MapFunction<Integer, Integer>() {//            @Override
//            public Integer map(Integer n) throws Exception {//                return n * n;
//            }
//        });
//方法二SingleOutputStreamOperator<Integer> stream = Source.map(n -> n * n);stream.print().setParallelism(1);env.execute("CollectionSource");}
}

这了简单做了一个transform,对每个数都进行平方,这里我用了两个方法,方法一是我们之前讲到过的map方法,方法二是拉姆达表达式,更简单方便(有点像Scala里的语法)。

2.2.4 Kafka Source(主要)

第三方Source对接:
addSource可以实现对接第三方数据源的数据
系统内置提供了一批Connectors

示例:

1.虚拟机环境启动zk、kafka:

$ zkServer.sh start
$ cd /usr/local/kafka
$ bin/kafka-server-start.sh config/server.properties

2.在kafka上创建一个topic t1:

3.在realtime工程的pom.xml文件中添加如下依赖:

<!-- Kafka Connector -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.11.3</version>
</dependency>

4.编写class:

然后创建如下的class

还是五大步骤,重点是写source,代码如下

package com.edu.neusoft.bigdata.flink.source;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//kafka配置项String topic = "t1";Properties prop = new Properties();prop.setProperty("bootstrap.servers","ubuntu:9092");//消费者组idprop.setProperty("group.id","flink_kafka_group1");//初始化consumerFlinkKafkaConsumer011<String> flinkKafkaConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);//默认的消费策略flinkKafkaConsumer.setStartFromGroupOffsets();DataStreamSource<String> kafkaSource = env.addSource(flinkKafkaConsumer);//        SingleOutputStreamOperator<Integer> stream = Source.map(new MapFunction<Integer, Integer>() {//            @Override
//            public Integer map(Integer n) throws Exception {//                return n * n;
//            }
//        });kafkaSource.print().setParallelism(1);env.execute("KafkaSource");}
}

这时我们打开kafka生产者这一端,可以看到控制台有数据产生,同时我们的kafkatool也有数据产生

分布式实时计算课程学习(2.2)——Source API相关推荐

  1. 分布式实时计算—Spark—Spark Core

    原文作者:bingoabin 原文地址:Spark Core 目录 一.Spark Core 1. 主要功能 2. Spark Core子框架 3. Spark架构 4. Spark计算模型 二.组件 ...

  2. 分布式实时计算—实时计算相关问题及解决方案

    原文作者:孟知之 原文地址:实时计算相关问题及解决方案 目录 1. 怎么处理 Spark structured streaming 慢速变化数据 join 的问题? 2. Kafka不稳定导致Spar ...

  3. 分布式实时计算—Storm—基础介绍

    目录 一.概念 二.编程模型(spout->tuple->bolt) 三.Topology 运行 四.Storm Streaming Grouping 一.概念 Storm 是一个免费并开 ...

  4. 分布式实时计算—实时数据质量如何保障?

    原文作者:阿里巴巴文娱技术 原文地址:算法基石-实时数据质量如何保障? 目录 一.现状分析 二.实时数据质量保障体系方案 三.线下质量 四.线上质量 五.质量效能 六.产品体验实时自动化保障 优酷视频 ...

  5. 分布式实时计算—从霍普金大学数据错误谈谈如何保证实时计算数据准确性

    原文作者:实时流式计算 原文地址:从霍普金大学数据错误谈谈如何保证实时计算数据准确性 目录 一.Kafka 1.Produce端消息传递 1.Consumer端消息传递 3.精确一次 二.Flink ...

  6. 大数据实时计算Spark学习笔记(9)—— Spar SQL(1) 读取 json 文件

    1 Spark SQL 编程方式:(1)SQL;(2) DataFrame API scala> case class Customer(id:Int,name:String,age:Int) ...

  7. 【大数据实时计算框架】Storm框架

    一.大数据实时计算框架 1.什么是实时计算?流式计算? (一)什么是Storm? Storm为分布式实时计算提供了一组通用原语,可被用于"流处理"之中,实时处理消息并更新数据库.这 ...

  8. 日均百亿级日志处理:微博基于Flink的实时计算平台建设

    来自:DBAplus社群 作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人. 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发.实时数据关联平台.实时算法特征数据计算.实 ...

  9. 大数据实时处理:百分点实时计算架构和算法

    当今时代,数据不再昂贵,但从海量数据中获取价值变得昂贵,而要及时获取价值则更加昂贵,这正是大数据实时计算越来越流行的原因.以百 分点公司为例,在高峰期每秒钟会有近万HTTP请求发送到百分点服务器上,这 ...

最新文章

  1. 用Async函数简化异步代码
  2. 用JSP实现基于Web的RSS阅读器
  3. php学习之------[流程控制]
  4. datagrid 什么时候结束编辑_2020年中考结束后,什么时候出分?什么时候报志愿?...
  5. 36日期计算包含计算某月某日是星期几的公式
  6. subline text3设置中文亲测
  7. python解析css文件_Python格式化css文件的方法
  8. MySQL 入门教程
  9. Unable to open log device '/dev/log/main' : No such file or directory it ...
  10. 分布式系统工具箱 Spring Cloud 概览
  11. OpenJ_POJ C16B Robot Game 打表找规律
  12. Java笔记第五篇 文本编辑器初见面
  13. skyline 系列 3 -TerraBuilder的使用 、mpt的创建和发布
  14. python网络爬虫实战解析
  15. WiFi抓包图形化版本
  16. 钉钉如何群里定时发送文件_简单好用的钉钉群消息助手
  17. 《应用回归及分类》学习笔记1
  18. java对接云之家群聊机器人
  19. 下载mysql那个版本好_Mysql各个版本区别及官网下载
  20. 领域驱动设计整理——概念架构

热门文章

  1. 博学谷前端 CSS字体样式属性
  2. 小程序之自定义扫码界面和扫码api学习
  3. Spire.OCR for .NET Patch
  4. 文章双标题自动生成插件(火车头采集器文章自动生成双标题)
  5. visio第一次作业
  6. 基于C#和Sql Server的网上书店管理系统
  7. 找人接电话的常用套语
  8. import xx_rc 问题记录
  9. webstorm,phpstorm无法打开设置的问题
  10. 推荐一个格式化json数据的谷歌插件JSONView:谷歌浏览器中JSONVue扩展程序插件jsonview的下载与安装