Kafuka-windows环境搭建流程说明

一、安装JDK
1.JDK下载路:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
2.按照: https://blog.csdn.net/zhys0902/article/details/79499329
参考步骤,依次配置JAVA_HOME、Classpath和Path,然后打开cmd,运行java -version成功,则JDK配置成功;

二、安装Zookeeper
由于Kafka的运行依赖于Zookeeper,所以在运行Kafka之前需要安装并运行Zookeeper
1.Zookeeper下载路径 http://zookeeper.apache.org/releases.html#download
2.解压到文件下(我的目录是D:\Program Files\zookeeper-3.5.4-beta)
3.打开D:\Program Files\zookeeper-3.5.4-beta\conf,复制zoo_sample.cfg重命名成zoo.cfg
4.编辑zoo.cfg,修改dataDir为【dataDir=/zookeeper-3.5.4-beta /data】
5.添加环境变量
ZOOKEEPER_HOME D:\Program Files\zookeeper-3.5.4-beta
Path 在现有的值后面添加 ;%ZOOKEEPER_HOME%\bin;

报错:JAVA_HOME is not set
6.手动设置
打开(新建一个记事本,将文件拖入其中)文件手动添加如下图所示Java路径:

set JAVA=C:\Program Files\Java\jdk1.8.0_211\bin\java
set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_211
7. 再次运行发现 8080端口被占用,在zoo.cfg增加admin.serverPort=8888 其中8888为未使用的端口

8.修改端口:
dataDir=D:\Program Files\zookeeper-3.5.4-beta\data
dataLogDir=D:\Program Files\zookeeper-3.5.4-beta\log
clientPort=2181
admin.serverPort=8888
运行成功!!!
三、安装Kafka
1.Kafka下载路径 http://kafka.apache.org/downloads
我们现在编译好的包

2.解压文件(我的目录是D:\kafka_2.11-2.2.0 【这里不要在Program Files等文件名之间有空格的目录下,不然一会执行会不识别路径】)
3.打开目录D:\kafka_2.11-2.2.0 \config下server.properties文件,把log.dirs修改为【log.dirs= D:\kafka_2.11-2.2.0 \kafka-logs】

  1. Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。
    运行:
    重要:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行。

在kafka安装目录下运行:
.\bin\windows\kafka-server-start.bat .\config\server.properties

四、测试
上面的Zookeeper和kafka一直打开

(1)、创建主题

在kafka安装目录下运行:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic linlin

注意不要关了这个窗口!
(2)创建生产者 (Producer)
进入目录D:\kafka_2.11-2.2.0\bin\windows输入如下命令:
kafka-console-producer.bat --broker-list localhost:9092 --topic linlin

(3)创建消费者(Consumer)
kafka-console-consumer.bat --zookeeper localhost:2181 --topic linlin
报错 :

高版本的用以下语句
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic linlin --from-beginning

注意:不要关了这个“生产者”和“消费者”窗口!

注意:在“生产者”窗口中输入内容,最后记得回车

搞定!!!

五、 使用kafka-manager

官网下载的是源码,如果觉得麻烦,可以直接从下面地址下载编译好的 kafka-manager-1.3.3.7.zip。
链接: http://pan.baidu.com/s/1qY8sGoO 密码: ye7b
链接:https://pan.baidu.com/s/1twhwfILRo9ReCAYczd44Fw 密码:kjqh

注意:启动kafka-manager之前请先保证zookeeper和kafka服务是开启的
启动目录下:
运行 bin/kafka-manager
默认情况下端口为9000,你还可以通过下面的命令指定配置文件和端口:
bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8889
zkhosts 是zookeeper服务的端口
登录权限,默认为false,可以改为true

登录账号密码

登录成功:

新建监控集群或单机

C:\Users\Administrator>rd /s /q D:\kafka-manager-1.3.3.7\bin\application.home_IS_UNDEFINED

用到的好文章:
https://blog.csdn.net/dayonglove2018/article/details/106919643?biz_id=102&utm_term=kafka%E4%B8%8Ezookeeper%E5%88%86%E5%BC%80%E5%AE%89%E8%A3%85%E7%9A%84%E6%80%8E%E4%B9%88%E4%BD%BF%E7%94%A8&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduweb~default-1-106919643&spm=1018.2118.3001.4187

https://download.csdn.net/download/w353595/12668251?ops_request_misc=&request_id=&biz_id=103&utm_term=Windows%E4%B8%8Bkafka%E6%90%AD%E5%BB%BA&utm_medium=distribute.pc_search_result.none-task-download-2downloadsobaiduweb~default-0-12668251.pc_v2_rank_dl_default&spm=1018.2226.3001.4451.2

windows 上 Kafka 运行环境安装

kafka的windows下使用

在Window下安装kafka-manager

windows 安装 kafka-manager 客户端管理工具

windows上安装kafka-manager

win10 c++ 操作kafka

C++编程中使用librdkafka库去连接kafka集群

kafka最新安装流程(c/c++)

win10 kafka环境搭建,编译C++(librdkafka)

[windows vs2019 编译OpenSSL处理](https://blog.csdn.net/u011046042/article/details/121922039?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-0-121922039-blog-89429266.pc_relevant_aa&spm=1001.2101.3001.4242
.1&utm_relevant_index=3)



##着重看一下连接

librdkafka官方Demo在Windows上运行与使用

Kafka C++客户端库librdkafka笔记

Linux下 Kafka 之 C/C++ 客户端库 librdkafka 的编译,安装以及函数介绍

Kafka C++客户端库librdkafka笔记
##安装+cmake librdkafka库

https://github.com/edenhill/librdkafka

##生产者代码:

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string.h>
#include "rdkafkacpp.h"using namespace std;bool run = true;class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{public:void dr_cb (RdKafka::Message &message) {std::cout << "Message delivery for (" << message.len() << " bytes): " <<message.errstr() << std::endl;if (message.key())std::cout << "Key: " << *(message.key()) << ";" << std::endl;}
};class ExampleEventCb : public RdKafka::EventCb {public:void event_cb (RdKafka::Event &event) {switch (event.type()){case RdKafka::Event::EVENT_ERROR:std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)run = false;break;case RdKafka::Event::EVENT_STATS:std::cerr << "\"STATS\": " << event.str() << std::endl;break;case RdKafka::Event::EVENT_LOG:fprintf(stderr, "LOG-%i-%s: %s\n",event.severity(), event.fac().c_str(), event.str().c_str());break;default:std::cerr << "EVENT " << event.type() <<" (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;break;}}
};/* Use of this partitioner is pretty pointless since no key is provided * in the produce() call.so when you need input your key */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {public:int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque){std::cout<<"partition_cnt="<<partition_cnt<<std::endl;return djb_hash(key->c_str(), key->size()) % partition_cnt;}private:static inline unsigned int djb_hash (const char *str, size_t len){unsigned int hash = 5381;for (size_t i = 0 ; i < len ; i++)hash = ((hash << 5) + hash) + str[i];std::cout<<"hash1="<<hash<<std::endl;return hash;}
};void TestProducer()
{std::string brokers = "localhost";std::string errstr;std::string topic_str="helloworld_kugou_test";//自行制定主题topicMyHashPartitionerCb hash_partitioner;int32_t partition = RdKafka::Topic::PARTITION_UA;int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;bool do_conf_dump = false;int opt;int use_ccb = 0;//Create configuration objectsRdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK){std::cerr << errstr << std::endl;exit(1);}/* * Set configuration properties */conf->set("metadata.broker.list", brokers, errstr);ExampleEventCb ex_event_cb;conf->set("event_cb", &ex_event_cb, errstr);ExampleDeliveryReportCb ex_dr_cb;/* Set delivery report callback */conf->set("dr_cb", &ex_dr_cb, errstr);/* * Create producer using accumulated global configuration. */RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer){std::cerr << "Failed to create producer: " << errstr << std::endl;exit(1);}std::cout << "% Created producer " << producer->name() << std::endl;/* * Create topic handle. */RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;exit(1);}/* * Read messages from stdin and produce to broker. */for (std::string line; run && std::getline(std::cin, line);){if (line.empty()){producer->poll(0);continue;}/* * Produce message // 1. topic // 2. partition // 3. flags // 4. payload // 5. payload len // 6. std::string key // 7. msg_opaque? NULL */std::string key=line.substr(0,5);//根据line前5个字符串作为key值// int a = MyHashPartitionerCb::djb_hash(key.c_str(),key.size());// std::cout<<"hash="<<a<<std::endl;RdKafka::ErrorCode resp = producer->produce(topic, partition,RdKafka::Producer::RK_MSG_COPY /* Copy payload */,const_cast<char *>(line.c_str()), line.size(),key.c_str(), key.size(), NULL);//这里可以设计key值,因为会根据key值放在对应的partitionif (resp != RdKafka::ERR_NO_ERROR)std::cerr << "% Produce failed: " <<RdKafka::err2str(resp) << std::endl;elsestd::cerr << "% Produced message (" << line.size() << " bytes)" <<std::endl;producer->poll(0);//对于socket进行读写操作。poll方法才是做实际的IO操作的。return the number of events served}//run = true;while (run && producer->outq_len() > 0) {std::cerr << "Waiting for " << producer->outq_len() << std::endl;producer->poll(1000);}delete topic;delete producer;
}int main(int argc, char *argv[])
{TestProducer();return EXIT_SUCCESS;
}

##消费者代码

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string.h>
#include "librdkafka/rdkafkacpp.h"
using namespace std;bool run = true;
bool exit_eof = true;
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{public:void dr_cb (RdKafka::Message &message) {std::cout << "Message delivery for (" << message.len() << " bytes): " <<message.errstr() << std::endl;if (message.key())std::cout << "Key: " << *(message.key()) << ";" << std::endl;}
};class ExampleEventCb : public RdKafka::EventCb {public:void event_cb (RdKafka::Event &event) {switch (event.type()){case RdKafka::Event::EVENT_ERROR:std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)run = false;break;case RdKafka::Event::EVENT_STATS:std::cerr << "\"STATS\": " << event.str() << std::endl;break;case RdKafka::Event::EVENT_LOG:fprintf(stderr, "LOG-%i-%s: %s\n",event.severity(), event.fac().c_str(), event.str().c_str());break;default:std::cerr << "EVENT " << event.type() <<" (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;break;}}
};/* Use of this partitioner is pretty pointless since no key is provided * in the produce() call.so when you need input your key */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {public:int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque){std::cout<<"partition_cnt="<<partition_cnt<<std::endl;return djb_hash(key->c_str(), key->size()) % partition_cnt;}private:static inline unsigned int djb_hash (const char *str, size_t len){unsigned int hash = 5381;for (size_t i = 0 ; i < len ; i++)hash = ((hash << 5) + hash) + str[i];std::cout<<"hash1="<<hash<<std::endl;return hash;}
};void msg_consume(RdKafka::Message* message, void* opaque)
{switch (message->err()){case RdKafka::ERR__TIMED_OUT:break;case RdKafka::ERR_NO_ERROR:/* Real message */std::cout << "Read msg at offset " << message->offset() << std::endl;if (message->key()){std::cout << "Key: " << *message->key() << std::endl;}printf("%.*s\n", static_cast<int>(message->len()),static_cast<const char *>(message->payload()));break;case RdKafka::ERR__PARTITION_EOF:/* Last message */if (exit_eof){run = false;cout << "ERR__PARTITION_EOF" << endl;}break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cerr << "Consume failed: " << message->errstr() << std::endl;run = false;break;default:/* Errors */std::cerr << "Consume failed: " << message->errstr() << std::endl;run = false;}
}
class ExampleConsumeCb : public RdKafka::ConsumeCb {public:void consume_cb (RdKafka::Message &msg, void *opaque){msg_consume(&msg, opaque);}
};
void TestConsumer()
{std::string brokers = "localhost";std::string errstr;std::string topic_str="helloworld_kugou_test";//helloworld_kugouMyHashPartitionerCb hash_partitioner;int32_t partition = RdKafka::Topic::PARTITION_UA;//为何不能用??在Consumer这里只能写0???无法自动吗???partition = 0;int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;bool do_conf_dump = false;int opt;int use_ccb = 0;//Create configuration objectsRdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK){std::cerr << errstr << std::endl;exit(1);}/* * Set configuration properties */conf->set("metadata.broker.list", brokers, errstr);ExampleEventCb ex_event_cb;conf->set("event_cb", &ex_event_cb, errstr);ExampleDeliveryReportCb ex_dr_cb;/* Set delivery report callback */conf->set("dr_cb", &ex_dr_cb, errstr);/* * Create consumer using accumulated global configuration. */RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);if (!consumer){std::cerr << "Failed to create consumer: " << errstr << std::endl;exit(1);}std::cout << "% Created consumer " << consumer->name() << std::endl;/* * Create topic handle. */RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);if (!topic){std::cerr << "Failed to create topic: " << errstr << std::endl;exit(1);}/* * Start consumer for topic+partition at start offset */RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;exit(1);}ExampleConsumeCb ex_consume_cb;/* * Consume messages */while (run){if (use_ccb){consumer->consume_callback(topic, partition, 1000, &ex_consume_cb, &use_ccb);}else{RdKafka::Message *msg = consumer->consume(topic, partition, 1000);msg_consume(msg, NULL);delete msg;}consumer->poll(0);}/* * Stop consumer */consumer->stop(topic, partition);consumer->poll(1000);delete topic;delete consumer;
}int main(int argc, char *argv[])
{TestConsumer();return EXIT_SUCCESS;
}

###最后效果

windows kafka环境搭建相关推荐

  1. VS2008开发Windows Mobile6环境搭建及模拟器联网问题图解

    安装了VS2008后,新建一个智能设备的工程,但调试时只有三个WM5.0的模拟器可用,从网上查到要安装WM6.0的SDK,还要安装一些其它的组件才能开始WM(现在最新的叫法叫WP Windows Ph ...

  2. Windows TensorFlow环境搭建

    Windows TensorFlow环境搭建 简介 本次安装是在Windows10 上进行的,使用tensorflow安装的环境为 Anaconda.Python3.7.TensorFlow2.0 搭 ...

  3. 【spark】windows spark 环境搭建

    Windows平台环境搭建 JDK 1.8-8u201 Scala 2.11.8 spark 2.2.0 hadoop 2.7.2 sbt 0.13.13.1 上面的几个软件之间相互有版本依赖关系的因 ...

  4. Dart开发(一)Windows平台环境搭建

    Dart开发Windows平台环境搭建 SDK官网下载地址:https://gekorm.com/dart-windows/ 安装完成后,在命令行输入:dart --version,安装成功会出现相应 ...

  5. 基于Codeblock的LVGL模拟器Windows平台环境搭建[带源码]

    基于Codeblock的LVGL模拟器Windows平台环境搭建 文章目录 基于Codeblock的LVGL模拟器Windows平台环境搭建 概述 一.介绍 二.使用CodeBlock版本方法以及需要 ...

  6. Windows Python环境搭建

    Windows Python环境搭建 ` 提示:本篇分享适合于非专业程序员出生的python使用者和初学者 文章目录 Windows Python环境搭建 前言 一.Python 编辑环境--Pych ...

  7. Zookeeper和Kafka环境搭建总结

    前言 由于项目需要涉及到zookeeper和Kafka的使用,快速做了一篇笔记,方便小伙伴们搭建环境. zookeeper 官方定义 What is ZooKeeper? ZooKeeper is a ...

  8. react-native for android windows开发环境搭建详细记录

    先说说整个环境搭建的过程.上周开始要在windows上搭建react-native for android环境,当时按照找的教程,从git上clone master分支的代码,然后下载了node,安装 ...

  9. Windows编译环境搭建(VS2010)

    引言:   本篇文章基于教程目的(由于windows和linux相关环境搭建篇幅过大,在博客中做这些讲解),主要描述Windows的发展历程和编译环境搭建(基于 Visual Studio 2010版 ...

最新文章

  1. python操作mysql数据库实现增删改查
  2. 编译phonetisaurus时configure找不到openfst的问题解决
  3. 表格存储的Java SDK性能优化经验
  4. 探React Hooks
  5. 3.Ubuntu18.04取消警告音
  6. larval mysql 查询转数组_laravel 中将DB::select 得到的内容转为数组
  7. P_C_Brules
  8. Spring boot使用Rabbitmq注解及消息序列化
  9. android 富文本框架_五种JavaScript富文本编辑器,总有一款适合你
  10. oracle dbms_sql.describe_columns,PL/SQL Challenge 每日一题:2017-3-6 DBMS_SQL.DESCRIBE_COLUMNS
  11. 图方法:寻找无向图联通子集的JAVA版本
  12. CodeForces 841C (C) Leha and Function 贪心
  13. table表格 html 1128
  14. mysql sql 函数大全_MySQL常用SQL/函数汇总(持续更新)
  15. NavigationBar的显隐和颜色设置
  16. 计算机策略组无法打开怎么办,Win10系统gpedit.msc组策略打不开怎么解决
  17. qq等级计算机在线,I'M QQ - QQ官方网站
  18. 如何解决高度塌陷【超全面】
  19. 如何返回正确与错误信息
  20. 现代C++新特性 列表初始化

热门文章

  1. 《Adobe Flash CS5中文版经典教程》——1.10 发布影片
  2. 51单片机8255扩展c语言,51单片机8255之PC口控制
  3. tiny6410 适用于win7 64bit的dnw 的USB下载驱动
  4. 关于Swap的几种方法
  5. 卧槽,小编在搞事情 。。。
  6. java创建目录的方法:mkdir()、mkdirs()
  7. 深度学习的seq2seq模型
  8. 以图搜图 – 3大相似图片搜索引擎
  9. gin 多结构体嵌套 效验范例
  10. 2022起重机械指挥考试题模拟考试平台操作