作者 | Jaskaran S. Puri

译者 | 火火酱 责编 | 徐威龙

封图| CSDN 下载于视觉中国

电子商务市场中典型的一天是这样的:每分钟发生1万个事件流,并且要选择合适的工具对其进行处理。

本文将帮助你重新创建一个场景,其中有大量的数据流入,你不仅需要存储这些数据,还需要对其进行一些实时分析。

这只是系统设计(System Design)的例子之一,你必须在其中开发高度可用且可扩展的数据管道。虽然在电子商务这样的用例中,可能有n个需要考虑的其他因素,但在本文中,我们会将其分成3个主要部分:

1.  提取(Ingestion)

2.  处理(Processing)

3.  输出(Output)

简而言之,几乎所有系统设计都从这个角度进行分析的,同样,这也是最容易出问题的地方。

提取层(INGESTION LAYER)

在介绍我们的工具之前,先退一步,看看这里要解决的是什么样的用例或问题。要想了解输入或提取层(Input or Ingestion layer),首先要先了解一下输出层(Output layer)。一般来讲,输出层可以通过两种方式实现:

1.  批处理(Batch Processing):如果你只进行一次分析,或者只是要更新一下每日报告,又或者只是在团队中进行随机演示,那么你可以选择成批地提取数据。这意味着可能要从你的DB数据库中取出小部分数据转储,并对其进行分析。

2.  实时处理(Real-Time Processing):也被称为流数据(Streaming Data),这种方式在进行十分重要的数据分析的情况下经常使用。这在B2C场景中最为常见。

批处理的好处是,它减少了构建实时管道的开销,而且你永远不用处理完整的数据集。尽管这在B2C环境中并不适用,尤其是在电子商务环境中,你必须推荐新产品、跟踪用户行为或设计实时仪表板。

现在,在了解了输出层的实时特性之后,我们就要选择相应的提取工具(Ingestion Tools)了。当然,要想从各种用例中获取数据,有很多工具可供选择。但是根据流行程度、社区实力和在各种用例中的实现情况来看,Kafka和Spark Streaming是很好的选择。

(Kafka:https://kafka.apache.org/

(Spark Streaming:https://spark.apache.org/streaming/

同样,要重视了解业务需求,以便确定执行同一工作的几个不同工具。在电子商务这样的场景中,我们知道需要实时输出数据,但是怎样才算实时呢?

1-2秒就算相当实时了!这点没错,但对于电子商务网站来说却并非如此,因为用户并不会等待一秒之后再执行下次点击。这就引出了延迟(latency)的概念。这是我们用来选择提取工具的标准。这两个工具之间有很多不同之处,但Kafka能够提供毫秒级的延迟!

处理层(PROCESSING LAYER)

在此用例中,我们将分别讨论Spark和Kafka的处理机制。我们将会看到spark如何处理底层硬件本不应该持有的数据。另一方面,我们也将看到使用Kafka来消费数据是多么的容易,以及其是如何处理百万级规模的数据。

我将使用以下来自Kaggle的数据集,该数据集行数超过1亿。

https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store

(Kaggle:https://kaggle.com/

除非你有一台非常高端的计算机,否则不可能将整个数据集加载到本地机器的内存中,甚至不可能将其拆分为多批进行处理,当然,除非你对每批传入都执行处理,这就是为什么我们要使用Spark。

(Spark:https://spark.apache.org/)

基础结构

设置spark有其复杂性,因此,为了加快速度,我们将在Databricks上启动一个spark集群,使你能够在AWS S3(数据驻留的地方)的数据支持下快速启动集群。

(Databricks:http://databricks.com/try-databricks

(AWS S3:https://aws.amazon.com/s3/)

Spark遵循典型的Master-Slave架构,该体系概括而言即主服务器(Master Server)负责所有的作业调度及一些其他工作,从服务器(Slave Server)负责执行实际操作或将数据保存在内存中。

 Spark 架构, 1 主节点(Master Node) + 2 工作/从节点(Worker/Slave Nodes)

当我们在数据上实现Spark时,会再对其进行详细讨论。就目前而言,我在数据块上构建了一个1个工作节点(Worker Node)+ 1个主节点(Master Node)集群,总共有2个核和8 GB内存,尽管完整的配置应该是4个核和16 GB内存。

之所以是2核8GB内存是因为我们所有的spark操作都只在工作节点上进行,而我们只有一个工作节点。内核数量(也就是2)将在这里扮演一个十分关键的角色,因为所有的并行化工作都将在这里发生。

在8GB内存上读取14GB数据

内存中只能存储小部分数据,因此spark所做的是:它只在你要对数据执行某些操作时,才将数据加载到内存中。例如,下面这行代码将并行读取我们的数据集,即利用2个内核来读取我们的数据。

ecomm_df = sparkSession.read.csv("/mnt/%s/ecomm.csv" % MOUNT_NAME, header=True
我们将以大约112个小块的形式提取14 GB的文件,由于我们有2个内核,所以每次取2个小块,每个小块128MB。
尽管如此,spark不会在你提交该命令时立即开始读取文件,因为还有另一个延迟计算(lazy evaluation)的概念,这使得它不能按照传统的python方式进行读取!但是我们仍然可以通过快速转换为RDD来检查该文件的分区/块数量。
ecomm_df.rdd.getNumPartitions()
OUTPUT: 110 #Number of partitions(延迟计算:https://bit.ly/2xxt2Br)

这与我们计算的十分接近。查看以下链接,了解我是如何从14 GB的文件大小中计算出112个分区的。

(链接地址:https://bit.ly/2IR77Yh

现在,在不涉及太多技术信息的情况下,我们来快速浏览一下数据:

# A SAMPLE RECORD OF OUR DATARow(event_time='2019-11-01 00:00:00 UTC', event_type='view', product_id='1003461', category_id='2053013555631882655', category_code='electronics.smartphone', brand='xiaomi', price='489.07', user_id='520088904', user_session='4d3b30da-a5e4-49df-b1a8-ba5943f1dd33')

筛选出那些仅购买了小米智能手机的人,然后执行LEFT JOIN。看看每个命令是如何被分成110个任务和2个始终并行运行的任务的。

按照品牌,分析有百分之多少的用户仅查看、添加到购物车还是购买了特定的商品,

现在你已经了解了spark的功能,这是一种可以在有限的资源集上训练/分析几乎任何大小的数据的可扩展的方法。

模拟数据的实时提取

我们之前已经讨论了不少关于Kafka的问题,所以就不在这里进行深入探讨了,让我们看看在真实场景中摄取这类数据的Kafka管道是什么样子的!

无论何时,当我们要讨论的是1亿数量级事件时,可扩展性就是要优先考虑的重中之重,对分区(partitions)和消费者组(consumer groups)的理解也是如此。在如此大的提取量下,这两个要素可以破坏我们的系统。看看下面这个架构,大致了解一下Kafka系统。

  你的邮箱/信箱就是此模型在现实生活中的副本。

1.  邮递员:这个人是生产者,他的工作是挑选数据并将其放入你的邮箱中。

2.  邮箱/信箱:这是你的代理商,如果没有人来收信的话,信就会在这里不断堆积。

3.  你的地址:这是你的主题(topic),邮递员是如何知道要把数据发送到哪里的呢?

4.  你:你就是消费者,你有责任收集这些数据并对其进行进一步处理。

这是对Kafka数据流机制的非常简单的解释,能够帮助我们进一步理解本文,而分区和消费者组的概念也能够帮助你更好地理解代码片段。

  主题容纳着数据,可以被分成n个分区以供并行使用。

在此种规模下,你需要并行处理数据。为此,可以将传入的数据分成不同的分区,此时,我们可以设置消费者组,这意味着有多个消费者希望从同一个源读取数据。

参照上面的体系结构,两个消费者从同一个源读取数据,因此,会在同一时间读取更多的数据,但读取的数据却不相同。如果消费者1(Consumer 1)已经读取了第1行和第2行,那么消费者2(Consumer 2)将永远不会看到这些数据,因为这种分离在分区级上就已经发生了。

下面是我使用分区和消费者组大规模提取此类数据的一个实现:

# 4 Partitions Made# Topic Name : ecomm_test./kafka_2.11-2.3.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic ecomm_test# Send data to ecomm_test topicproducer.send(topic='ecomm_test', value=line)# Start 2 consumers and assign it to the group "ecommGroup"consumer = KafkaConsumer('ecomm_test', group_id='ecommGroup')consumer = KafkaConsumer('ecomm_test', group_id='ecommGroup')# Output of how consumer 1 reads data, only reading from 2 partitions i.e. 0 & 1ConsumerRecord(topic=u'ecomm_test', partition=1, value='2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387\n')ConsumerRecord(topic=u'ecomm_test', partition=0, value='2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f\n')# Output of how consumer 2 reads data, only reading from 2 partitions i.e. 2 & 3ConsumerRecord(topic=u'ecomm_test', partition=3, value='2019-11-01 00:00:05 UTC,view,4600658,2053013563944993659,appliances.kitchen.dishwasher,samsung,411.83,526595547,aab33a9a-29c3-4d50-84c1-8a2bc9256104\n')ConsumerRecord(topic=u'ecomm_test', partition=2, value='2019-11-01 00:00:01 UTC,view,1306421,2053013558920217191,computers.notebook,hp,514.56,514028527,df8184cc-3694-4549-8c8c-6b5171877376\n')KISS生产架构:输出

我们只需要确保自己符合以下概念即可:

1.  KISS:保持简单愚蠢:保持架构尽可能的简单。

2.  微服务:解耦组件以避免一系列的故障。

3.  CAP定理:一致性、可用性、分区容错性。选择两个对你来说最重要的。

最后,我们将介绍可以在生产系统中实现的最终架构,尽管它们还涉及许多其他组件(如可用区、存储系统和故障转移计划)但本文只是对生产中最终处理层的概述。

   具有数据流的完整架构图

如你所见,图中描述得非常明白,没有针对所有用例的正确的架构/系统设计。你只需要根据给定资源构建出可行的方案。

原文:https://towardsdatascience.com/knowing-pyspark-and-kafka-a-100-million-events-use-case-5910159d08d7

推荐阅读:还不知道 AWS 是什么?这 11 个重点带你认识 AWS !
数据库连接池的原理没你想得这么复杂
为什么程序员如此“嫌弃”主干开发模式?
智能合约编写之 Solidity 的设计模式
2020年,5种将死的编程语言
我去,同事居然用明文存储密码!!!
真香,朕在看了!

从提取层、处理层、基础结构入手,带你了解Spark和Kafka!相关推荐

  1. AI神经网络-CNN基本结构分析( Conv层、Pooling层、FCN层/softmax层)

    <link href="https://csdnimg.cn/public/favicon.ico" rel="SHORTCUT ICON"> &l ...

  2. AI:IPPR的数学表示-CNN基本结构分析( Conv层、Pooling层、FCN层/softmax层)

    类似于SVM,CNN为代表的DNN方法的边缘参数随着多类和高精度的要求必然增长.比如向量机方法,使用可以映射到无穷维的高斯核,即使进行两类分类,在大数据集上得到高精度,即保持准确率和高精度的双指标,支 ...

  3. 狠补基础-数学+算法角度讲解卷积层,激活函数,池化层,Dropout层,BN层,全链接层

    狠补基础-数学+算法角度讲解卷积层,激活函数,池化层,Dropout层,BN层,全链接层 在这篇文章中您将会从数学和算法两个角度去重新温习一下卷积层,激活函数,池化层,Dropout层,BN层,全链接 ...

  4. php m层,M层?G层?LG层?终于搞清楚这些电梯楼层数了!

    崛地而起的高楼大厦.无处不在的电梯,已然成为城市的重要符号.不知道大家在乘坐电梯的时候有没有遇到P层.G层.M层这样的楼层按钮呢? 习惯了简单数字指示楼层的我们,被这种字母指示来的措不及防.那么这些字 ...

  5. [转]JAVA中Action层, Service层 ,modle层 和 Dao层的功能区分

    首先这是现在最基本的分层方式,结合了SSH架构.modle层就是对应的数据库表的实体类.Dao层是使用了Hibernate连接数据库.操作数据库(增删改查).Service层:引用对应的Dao数据库操 ...

  6. java 框架 Dao层 Mapper层 controller层 service层 model层 entity层 简介

    目录 简介 entity层 mapper层 service层 controller层 简介 SSM是sping+springMVC+mybatis集成的框架. MVC即model view contr ...

  7. android调频收音机代码,android 收音机 FM 驱动 hal层 框架层以及应用层代码

    [实例简介] android 收音机 FM 驱动 hal层 框架层以及应用层代码 方法一 不需要framework部分 1.fm放到 \hardware\rk2x 2.FmRadio 放到 packa ...

  8. DL之BP:利用乘法层/加法层(forward+backward)算法结合计算图(CG)求解反向求导应用题

    DL之BP:利用乘法层/加法层(forward+backward)算法结合计算图(CG)求解反向求导应用题 导读 计算图中层的实现(加法层/乘法层),其实非常简单,使用这些层可以进行复杂的导数计算.可 ...

  9. DL之DNN:基于神经网络(从1层~50层)DNN算法实现对非线性数据集点进行绘制决策边界

    DL之DNN:基于神经网络(从1层~50层)DNN算法实现对非线性数据集点进行绘制决策边界 目录 输出结果 设计代码 输出结果 设计代码 首先查看数据集 import numpy as np from ...

最新文章

  1. python详细安装教程3.8-手把手教你安装Python3.8环境
  2. About mac80211
  3. Android编译系统分析二:mm编译单个模块
  4. Android(二)——汉化版Eclipse的中英文切换
  5. 一个JSON字符串和文件处理的命令行神器jq,windows和linux都可用
  6. 分享10道常考Java面试题及答案
  7. Elasticsearch 5.2.x 使用 Head 插件连接不上集群
  8. 微软发布5月补丁星期二:3个0day,1个蠕虫
  9. malloc,free 与new delete的区别
  10. 如何利用极致业务基础平台构建一个通用企业ERP之十三盘点单设计
  11. WPF制作简易串口调试助手(上位机部分)
  12. 倾斜摄影数据OSGB进入到ArcGIS平台相关问题小结
  13. js防止双击事件触发单击事件
  14. UEFI开发与调试---ImageHandle和ControllerHandle
  15. Android ADB 环境变量配置
  16. 中国多接收器电感耦合等离子体质谱仪市场行业产销需求与投资预测分析报告2022-2028年
  17. 我对锤子ROM 功能的看法——功能篇
  18. 阅读笔记04——魔鬼搭讪学
  19. 名帖07 赵宧光 篆书《篆书四箴册》
  20. Elasticsearch 如何实现时间差查询?

热门文章

  1. 凡事,预则立,不预则废
  2. wsl安装ubuntu
  3. ie8兼容性视图对ajax,ie8的兼容性视图设置在哪
  4. [python3 实用教程]使用ctypes模块调用运行c代码
  5. 03.生成微博授权URL接口
  6. 华为云12·12直播EI专场即将开始,满足电商行业全场景搜索需求
  7. 计算几何 快速排斥和跨立实验 判断两线段相交
  8. SteamVR 场景重置
  9. oracle数据库设计实例
  10. 【小程序】语音识别 (无需转码、无需后台)