文章目录

  • 前言
  • 1 Pulsar安装部署
    • 1.1 Pulsar集群搭建
    • 1.2 安装前准备
    • 1.3 安装Pulsar
    • 1.4 安装Pulsar 连接器(可选)
    • 1.5 部署ZooKeeper集群
    • 1.6 配置集群信息
    • 1.7 部署BookKeeper集群
  • 2 Pulsar可视化
    • 2.1 Pulsar Manager安装
    • 2.2 Pulsar Manager使用
      • 2.2.1 初始化账号
      • 2.2.2 添加environment
  • 3 使用Pulsar发布、订阅消息(Java)
    • 3.1 安装Pulsar Java客户端
    • 3.2 构造Client
    • 3.3 构造生产者
    • 3.4 构造消费者

前言

最近apache pulsar出镜率挺高的,这里新开一篇文档记录一下pulsar的学习之路
前面有介绍Apache Pulsar基本理论,这里再开一篇介绍一个Pulsar的基本部署与可视化,还有一个小demo


1 Pulsar安装部署

1.1 Pulsar集群搭建

温馨提示

  1. 单集群的Pulsar实例可以满足绝大多数学习、开发、验证需求,如果没有特殊的需要,建议使用单集群Pulsar实例,多集群的安装部署参考多集群部署。
  2. 在部署Pulsar过程中,如需要使用所有内置的Pulsar IO 连接器,需要下载apache-pulsar-io-connectors(apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz),并安装到Pulsar的connectors目录下。

Pulsar集群的部署步骤如下:

  • 部署ZooKeeper集群(可选)
  • 初始化集群元信息
  • 部署BookKeeper集群
  • 部署一个或多个Pulsar broker

1.2 安装前准备

如果你已经有一个ZooKeeper集群,并且愿意重用该集群,则无需准备安装ZooKeeper集群的资源,也无需部署ZooKeeper集群

  1. 至少6台Linux机器或虚拟机
  • 三台机器用于部署ZooKeeper集群,Pulsar仅会定期使用ZooKeeper进行协调和配置任务,业务操作不依赖ZooKeeper集群,部署时可以使用性能规格较低的机器。
  • 三台部署BookKeeper集群和Pulsar broker。Puslar集群实际承载业务,建议使用性能规格更高的机器,比如计算能力更强的CPU、10Gbps NIC、SSD硬盘或高性能存储。
  1. 覆盖所有节点的DNS名称,如果没有DNS服务器可以通过hosts文件实现。
  2. 所有的机器需要安装Java 8或更高版本


本文安装部署Pulsar集群的节点信息

节点 规格 部署组件 主机名地址
Pulsar-zk-01 4vCPU,8G内存 ZooKeeper集群 host06
Pulsar-zk-02 4vCPU,8G内存 ZooKeeper集群 host03
Pulsar-zk-03 4vCPU,8G内存 ZooKeeper集群 host04
Pulsar-bk-01 8vCPU,16G内存 BookKeeper集群 host02
Pulsar-bk-02 8vCPU,16G内存 BookKeeper集群 host01
Pulsar-bk-03 8vCPU,16G内存 BookKeeper集群 host05

1.3 安装Pulsar

集群中的每个节点都需要安装Pulsar二进制包,包括ZooKeeper和BookKeeper节点。

  • 获取Pulsar安装包(apache-pulsar-2.8.0-bin.tar.gz)
  • 将软件包拷贝到/opt目录下解压,并将解压的目录重命名为pulsarCluster
tar -zxvf apache-pulsar-2.1.1-incubating-bin.tar.gz
mv apache-pulsar-2.1.1-incubating pulsarCluster·

*确保/opt目录有足够的磁盘空间,或使用其他目录安装
Pulsar的目录结构如下表:

目录 内容
bin Pulsar的命令行工具
conf Pulsar的配置文件
data 存储ZooKeeper和BookKeeper数据
lib Pulsar使用的第三方库
logs 日志存储路径
examples Pulsar提供的样例

1.4 安装Pulsar 连接器(可选)

从2.1.0-inclubating版本开始,Pulsar单独发布了包含所有内置连接器的二进制包,如果想使用这些内置的连接器,可以参考下面的步骤安装,如果不需要可以直接跳过。

  • 获取Pulsar IO Connectors软件包(apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz)
  • 解压软件包,并将connector目录拷贝到Pulsar安装目录(/opt/puslarCluster)
tar -zxvf apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz
cd apache-pulsar-io-connectors-2.1.1-incubating
cp -r connectors/ /opt/pulsarCluster

1.5 部署ZooKeeper集群

  • 获取ZooKeeper安装包
  • 将安装包拷贝到三个节点的/opt目录解压
tar -zxvf zookeeper-3.4.12.tar.gz
  • 修改 conf/zookeeper.conf配置,增加ZooKeeper集群节点信息
server.1=host06:2888:3888
server.2=host03:2888:3888
server.3=host04:2888:3888cd /opt/pulsarCluster
echo “server.1=host06:2888:3888” >> conf/zookeeper.conf
echo “server.1=host03:2888:3888” >> conf/zookeeper.conf
echo “server.1=host04:2888:3888” >> conf/zookeeper.conf
  • 配置ZooKeeper myid信息
cd /opt/pulsarCluster
mkdir -p data/zookeeper
echo 1 > data/zookeeper/myid
  • 以守护进程启动ZooKeeper
cd /opt/pulsarCluster/bin
./pulsar-daemon start zookeeper

1.6 配置集群信息

部署完ZooKeeper集群后,需要将一些Pulsar集群的元信息写入ZooKeeper集群的每个节点,由于数据在ZooKeeper集群内部会互相同步,因此只需要将元信息写入ZooKeeper的一个节点。可以在ZooKeeper集群的任意节点通过pulsar工具的initialize-cluster-metadata方法配置数据,配置命令只需执行一次,否则ZooKeeper会报节点已经存在的错误。命令的一个简单样例如下:

$ ./pulsar initialize-cluster-metadata \--cluster pulsar-cluster-zk-1 \--zookeeper host06:2181 \--configuration-store host06:2181 \--web-service-url http://pulsar.cluster.com:8080 \--web-service-url-tls https://pulsar.cluster.com:8443 \--broker-service-url pulsar://pulsar.cluster.com:6650 \--broker-service-url-tls pulsar+ssl://pulsar.cluster.com:6651

在本文的安装部署过程中,Pulsar集群的名称为pulsar-cluster,统一域名pulsar.cluster.com。命令参数的具体含义如下:

Flag Description
–cluster 集群名称
–zookeeper ZooKeeper集群连接参数,仅需要包含集群中的一个节点即可
–configuration-store Pulsar实例的配置存储集群(ZooKeeper),和-zookeeper参数一样只需要包含集群中的一个节点即可
–web-service-url 集群Web服务的URL+端口,URL必须是一个i标准的DNS名称,默认端口8080,不建议修改。
–web-service-url-tls 集群Web提供TLS服务的URL+端口,端口默认8443,不建议修改。
–broker-service-url 集群brokers服务URL,URL中DNS的名称和Web服务保持一致,URL使用pulsar替代http/http,端口默认6650,不建议修改。
–broker-service-url-tls 集群brokers提供TLS服务的URL,默认端口6551,不建议修改。

1.7 部署BookKeeper集群

  • 配置BookKeeper集群

Pulsar集群中所有持久数据的存储都由BookKeeper负责,因此如果想使用Pulsar需要部署一个BookKeeper集群,建议部署一个包含3个bookie节点的BookKeeper集群。BookKeeper集群的配置使用conf/bookkeeper.conf文件,BookKeeper最终的配置是配置ZooKeeper集群的地址,一个具体的配置例子如下

zkServers=host06:2181,host03:2181,host04:2181

Pulsar从2.1.0版本开始引入了有状态函数,如果想使用该功能,还需要在conf/bookkeeper.conf文件中增加如下配置:

extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
  • 启动BooKeeper集群
  • 后台进程启动
cd /opt/pulsarCluster/bin
./pulsar-daemon start bookie
  • 前台进程启动
cd /opt/pulsarCluster/bin
./bookkeeper bookie
  • 检查BookKeeper集群状态
cd /opt/pulsarCluster/bin./bookkeeper shell bookiesanity

如果BookKeeper成功运行,输出的最后一行结果为

org.apache.bookkeeper.bookie.BookieShell - Bookie sanity test succeeded

2 Pulsar可视化

Apache Pulsar Manager 是一个基于网页的 GUI 管理和监控工具,帮助 Pulsar 管理员和用户管理和监控 Tenant、Namespace、Topic、Subscription、Broker 和 Cluster 等,并支持动态配置多种环境。

2.1 Pulsar Manager安装

最简单的方式是docker安装:

$ yum install -y docker
$ systemctl start docker
$ systemctl enable docker
  • 下载镜像并安装Pulsar Manager
$ docker pull apachepulsar/pulsar-manager:v0.2.0
$ docker run -dit \-p 9527:9527 -p 7750:7750 \-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \apachepulsar/pulsar-manager:v0.2.0

2.2 Pulsar Manager使用

2.2.1 初始化账号

需要使用curl命令添加一个账户

$ CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)

$ curl \-H 'X-XSRF-TOKEN: $CSRF_TOKEN' \-H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \-H "Content-Type: application/json" \-X PUT http://localhost:7750/pulsar-manager/users/superuser \-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'


调用完成之后,可以使用用户名admin,密码apachepulsar登录pulsar manager
使用浏览器http://hostname:9527

2.2.2 添加environment

如需管理多集群,只需要配置 serviceURL,即能在多个环境间自由切换

  • 管理 Tenant

支持对 Tenant 进行增加、修改和删除等操作

  • 管理 Namespace

支持对 Namespace 进行增加、删除和修改 policy 等操作。

  • 管理 Subscription

支持对 Subscription 进行 skip、expire、clear 和 reset 等操作。

  • 管理 Cluster

支持对 Cluster 进行浏览和配置等操作。

  • 管理 Broker

支持对 Broker 进行浏览、健康检查和配置查询等操作

  • 监控 Topic 和 Subscription



3 使用Pulsar发布、订阅消息(Java)

3.1 安装Pulsar Java客户端

如果使用Maven,在工程的pom文件增加如下配置:

<!-- 在<properties> 块中增加版本号信息 -->
<pulsar.version>2.1.1-incubating</pulsar.version><!--增加依赖 -->
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>${pulsar.version}</version>
</dependency>

3.2 构造Client

  • Pular连接 URLs
  • 本地Pulsar
pulsar://localhost:6650
  • 生成环境Pulsar集群(使用域名
pulsar://pulsar.cluster.com:6650
  • 开启TLS的条件下,Pulsar集群连接URLs
pulsar+ssl://pulsar.cluster.com:6651
  • 配置Pulsar客户端

客户端的配置主要包括:Pulsar集群信息配置、鉴权信息配置、TLS配置、线程数连接数配置等。具体的配置可以参考Pulsar Client配置。
基于本文搭建的集群,只保留最简单的配置信息,Pulsar Client的构造如下:

  private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";PulsarClient client = PulsarClient.builder().serviceUrl(SERVER_URL).enableTcpNoDelay(true).build();

3.3 构造生产者

一个完整的生产者样例如下:

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class ProducerDemm {private static final Logger log = LoggerFactory.getLogger(ProducerDemm.class);private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";public static void main(String[] args) throws Exception {// 构造Pulsar ClientPulsarClient client = PulsarClient.builder().serviceUrl(SERVER_URL).enableTcpNoDelay(true).build();// 构造生产者Producer<String> producer = client.newProducer(Schema.STRING).producerName("my-producer").topic("persistent://public/default/my-topic").batchingMaxMessages(1024).batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).enableBatching(true).blockIfQueueFull(true).maxPendingMessages(512).sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(true).create();// 同步发送消息MessageId messageId = producer.send("Hello World");log.info("message id is {}",messageId);CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");// 阻塞线程,直到返回结果log.info("async message id is {}",asyncMessageId.get());// 配置发送的消息元信息,同步发送producer.newMessage().key("my-message-key").value("my-message").property("my-key", "my-value").property("my-other-key", "my-other-value").send();producer.newMessage().key("my-async-message-key").value("my-async-message").property("my-async-key", "my-async-value").property("my-async-other-key", "my-async-other-value").sendAsync();// 关闭producer的方式有两种:同步和异步// producer.closeAsync();producer.close();// 关闭licent的方式有两种,同步和异步// client.close();client.closeAsync();}
}

3.4 构造消费者

  • 单订阅
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.TimeUnit;public class MyConsumer {private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";public static void main(String[] args) throws Exception {// 构造Pulsar ClientPulsarClient client = PulsarClient.builder().serviceUrl(SERVER_URL).enableTcpNoDelay(true).build();Consumer consumer = client.newConsumer().consumerName("my-consumer").topic("persistent://public/default/my-topic").subscriptionName("my-subscription").ackTimeout(10, TimeUnit.SECONDS).maxTotalReceiverQueueSizeAcrossPartitions(10).subscriptionType(SubscriptionType.Exclusive).subscribe();do {// 接收消息有两种方式:异步和同步// CompletableFuture<Message<String>> message = consumer.receiveAsync();Message message = consumer.receive();log.info("get message from pulsar cluster,{}", message);} while (true);}
  • 多订阅
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;public class MultiConsumer {private static final Logger log = LoggerFactory.getLogger(MultiConsumer.class);private static final String SERVER_URL = "pulsar://pulsar.cluster.com:6650";private static final String DEFAULT_NS_TOPICS = "persistent://public/default/.*";private static final String DEFATULT_NS_REG_TOPICS= "persistent://public/default/my.*";private static void main(String[] args) throws Exception {PulsarClient client = PulsarClient.builder().serviceUrl(SERVER_URL).enableTcpNoDelay(true).build();ConsumerBuilder consumerBuilder = client.newConsumer().subscriptionName("multi-sub");// 订阅namespace下所有的topicPattern allTopicsInNamespace = Pattern.compile(DEFAULT_NS_TOPICS);consumerBuilder.topicsPattern("").subscribe();// 订阅namespace下满足正则匹配的topicPattern someTopicsInNamespace = Pattern.compile(DEFATULT_NS_REG_TOPICS);Consumer allTopicsConsumer = consumerBuilder.topicsPattern(someTopicsInNamespace).subscribe();List<String> topics = Arrays.asList("topic-1","topic-2","topic-3");Consumer multiTopicConsumer = consumerBuilder.topics(topics).subscribe();}
}

Pulsar简单实现demo相关推荐

  1. Dubbo入门介绍---搭建一个最简单的Demo框架

    Dubbo入门---搭建一个最简单的Demo框架 置顶 2017年04月17日 19:10:44 是Guava不是瓜娃 阅读数:320947 标签: dubbo zookeeper 更多 个人分类: ...

  2. java 基础api实现上传,上传文件到7牛云存储的java api一个简单的demo实现

    最近在做一个项目,需要用到云存储,项目用的是七牛云.现在将项目过程中关于调用七牛云平台的java api来上传本地文件到七牛云空间的一个简单的demo展示给大家,希望对同样再用七牛云的童鞋们有所帮助. ...

  3. Ibatis.Net 学习手记一 简单的Demo

    最近在做游戏推广的需求,趁公司给了不少充足的时间...再一次看了下自己以前学过的IbatisDemo,同时拿出来分享一下 Ibatis.Net的官方文档地址为 http://www.mybatis.o ...

  4. c# MEF框架(一 MEF简介及简单的Demo)

    转自:http://www.cnblogs.com/yunfeifei/p/3922668.html 在文章开始之前,首先简单介绍一下什么是MEF,MEF,全称Managed Extensibilit ...

  5. 两个简单的Demo示例向读者展示Flash和ASP.NET交互原理以及过程

    ASP.NET与FLASH交互学习了ASP.NET的基础知识之后,终于等到学习交互的时候了.请大家和我一起来进行让人激动的交互吧!本章我将用两个简单的Demo示例向读者展示Flash和ASP.NET交 ...

  6. input 模糊匹配功能 文本框模糊匹配(纯html+jquery简单实现) demo

    input 模糊匹配功能 文本框模糊匹配(纯html+jquery简单实现) demo <!DOCTYPE HTML> <html lang="en"> & ...

  7. Dubbo入门----搭建一个最简单的Demo框架

    Dubbo背景和简介 Dubbo开始于电商系统,因此在这里先从电商系统的演变讲起. 单一应用框架(ORM) 当网站流量很小时,只需一个应用,将所有功能如下单支付等都部署在一起,以减少部署节点和成本. ...

  8. android 观察者模式的简单demo,一个简单的demo彻底搞懂观察者模式

    介绍 观察者模式也被称为发布-订阅(Publish/Subscribe)模式,它属于行为型模式的一种.观察者模式定义了一种一对多的依赖关系,一个主题对象可被多个观察者对象同时监听.当这个主题对象状态变 ...

  9. 【unity】快速了解游戏制作流程-制作九宫格简单游戏demo

    前言 hi~大家好呀!欢迎来到我的unity学习笔记系列~,本篇我会简单的记录一下游戏流程并且简单上手一个通过九宫格移动到指定位置的小游戏,话不多说,我们直接开始吧~ 本篇源自我看B站一位up主的视频 ...

最新文章

  1. Centos7 cdh5.14 安装
  2. 面试AI算法岗,你被要求复现顶会了嘛?
  3. 关于Linux下s、t、i、a权限
  4. MySQL limit 优化,百万至千万级快速分页:复合索引
  5. 如何设置iMatrix平台中列表标签(gridjqGrid)实现动态列表
  6. python sqlite3 带密码_Python实现ATM提款机系统
  7. php invoke 反射,PHP ReflectionMethod invoke()用法及代码示例
  8. atitit.bsh BeanShell 的动态脚本使用java
  9. php爬虫框架选用什么
  10. 解决notepad++ php代码美化
  11. PR 2019 快速入门(8)
  12. excel2007不显示文件名
  13. 如何用Deeplink为快应用提供多个快速直达入口
  14. 下载安装Vue-CLI
  15. 小新想把百度搜索引擎改为edge
  16. 企业为什么要建立独立电商网站?
  17. 【机器学习】如何成为当下合格的算法工程师
  18. 圆周率一千万亿位_圆周率已被算到60万亿位,继续算下去有何意义这里告诉你真正原因...
  19. 计算机二级软件VC++6.0下载地址
  20. UltraEdit 快捷键(UE 快捷键)

热门文章

  1. 矩阵运算实现求样本与样本之间欧式距离
  2. 微信小程序实战篇-购物车
  3. R语言ggplot2可视化:通过在element_text函数中设置标签字体大小列表和标签字体形式列表自定义标签可视化效果
  4. 第十一章 :日志采集工具flume使用
  5. NLP 中损失函数对输入的求导的思考
  6. 2020年7月 :国产数据库名录和产品信息一览
  7. HTML+CSS炫酷彩色立方体特效
  8. 经典游戏“大富翁4”存档文件修改器Rich4Editor下载
  9. android 强制更新流程图,AndroidUpdateDemo
  10. MATLAB中COBRA插件的学习(一)输入文件的格式