首先说明一下,小白在学习这一块的时候,记得导入包的时候要看清包名,网上有一些博客没有具体导入的包名称,可能会导致在本机上导包后出现各种各样奇葩的强制类型转化.

kafka的相关内容与概念在这里就不再赘述了,咱们直接通过一个小案例来感受一下kafka的魅力,

本篇博客分为五个部分:

  1. 配置情况;
  2. 在搭建好的linux服务器上创建一个topic;
  3. 作为producer(生产者),给kafka上新创建的topic发送一条信息;
  4. 奇葩报错的解决方案
  5. 作为consumer(消费者),接收kafka的topic上的消息;

一:配置情况:

1.在这里我使用四台虚拟机:

>>win8.1的虚拟机作为编程的场所 (在虚拟机上可以随便折腾)

>>三台linux系列的centos虚拟机,ip地址分别是:192.168.0.207,192.168.0.204,192.168.0.171

2.zookeeper端口使用默认端口:2181

3.kafka端口使用默认端口:9092

4.将虚拟机设置为桥接模式(否则得不到三个不同的ip)

5.一个热爱技术的心

二:在搭建好的linux服务器上创建一个topic:

1.首先需要使用三台linux虚拟机安装zookeeper和kafka的集群,在这里推荐一篇我个人认为不错的博文,大家可以参考一下,同时感谢博主的付出:http://www.cnblogs.com/luotianshuai/p/5206662.html

2.由于没有zookeeper的支持,不可以新增topic,因此确保自己的三台linux服务器运行./zkServer.sh status 与jps 后,可以出现下图的内容

3.选取任意一台服务器(这里我选用的是ip为192.168.0.204的服务器),来创建topic:

  • cd 到目标位置 ----kafka/kafka_2.11-0.11.0.0/bin
  • 运行命令: ./kafka-topics.sh --create --zookeeper192.168.0.204:2181--replication-factor 2 --partitions 1 --topic mytopic ,这里的ip地址改为你的ip地址,可以在linux服务器上运行ifconfig 命令(windows上是 ipconfig ),最后的那个 "mytopic"是自定义的topic名称,可以更换,但是要记住这个topic名称,一会要用到
  • 没有任何提醒的话,就说明成功了,现在使用命令来检查一下  ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic ,如果出现下面的图片内容,就证明成功了:
这里的序号表示集群中主机的id,比如 Leader:2 就表示这个这个topic的leader是id = 2 的那台主机.之前在配置时配置过,所以这个序号不同也是正常的

三:作为producer(生产者),给kafka的一个topic发送信息:

准备工作就绪了,要开始打码了,首先配置maven的pom.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>kafka</groupId><artifactId>kafka</artifactId><version>1.0-SNAPSHOT</version><!--这个才是真正需要添加的--><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.8.2.2</version></dependency></dependencies></project>

然后写一个生产者:目的是发送"Hhhhh!I am coming"的字符串

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
//注意导包
import java.util.Properties;public class MyProducer {private static Producer<Integer, String> producer;private final Properties props = new Properties();public MyProducer() {//连接broker list,这里因为要使用kafka,因此端口是kafka的默认端口9092props.put("metadata.broker.list", "192.168.0.204:9092");//在网络传输前都要进行的序列化操作props.put("serializer.class", "kafka.serializer.StringEncoder");producer = new Producer<Integer, String>(new ProducerConfig(props));}public static void main(String[] args) {MyProducer sp = new MyProducer();//设置topic,注意要与之前创建的topic名称保持一致String topic = "mytopic";//设置的消息内容String messageStr = "Hhhhh! I am coming";//构建消息对象KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);//推送消息到brokerproducer.send(data);producer.close();}
}

注意这里导包要正确,否则随时有一大堆强制类型转化的提示!

写到这里,如果运行后出现下面的图,那么恭喜恭喜,你可以直接跳过第四部分,学习消费者的代码书写:

生产者因为要发送数据,发送结束就结束了,因此只会有warn,并且会结束程序.

如果有各种奇葩报错,不用担心,请看第四部分,因为我是新装的虚拟机,因此一般来说如果之前没有设置过,可能会有下面的情况.

四:奇葩报错的解决方案

如果你问我,初学者该怎么学习一种新技术,我不会告诉你应该多看书有耐心,而是先把虚拟机的防火墙关了!

比如这个报错:

如果linux服务器在配置的时候没有打开防火墙端口的记忆,那么直接运行这条命令:

>>>su

>>>然后输入你的管理员密码

>>>systemctl stop firewalld.service #停止firewall

>>>systemctl disable firewalld.service #禁止firewall开机启动 (为了安全考虑,这条可以不用)

更多sentos 防火墙指令,请参考博客:http://blog.csdn.net/weixin_35757704/article/details/76016355

(SentOS 7防火墙配置与端口增删改查的命令)

同样在Windows上,也要把防火墙关闭,然后就可以了

五:作为consumer(消费者),接收kafka的topic上的消息

愉快的写一个消费者:

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;public class MyConsumer {private final ConsumerConnector consumer;private final String topic;public MyConsumer(String zookeeper, String groupId, String topic) {Properties props = new Properties();props.put("zookeeper.connect", zookeeper);props.put("group.id", groupId);//配置一些参数props.put("zookeeper.session.timeout.ms", "500");props.put("zookeeper.sync.time.ms", "250");props.put("auto.commit.interval.ms", "1000");consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));this.topic = topic;}public void testConsumer() {Map<String, Integer> topicCount = new HashMap<String, Integer>();//定义订阅topic的数量topicCount.put("mytopic", new Integer(1));//返回topic的map,这里如果无法理解,可以在这里设置断点,观察真正传回的数据格式Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);//取出我们需要的topic的消息流List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);for (final KafkaStream stream : streams) {ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();while (consumerIte.hasNext())System.out.println("Message from single topic :: " + new String(consumerIte.next().message()));}if (consumer != null) {consumer.shutdown();}}public static void main(String[] args) {String topic = "mytopic";MyConsumer myConsumer = new MyConsumer("192.168.0.204:2181", "testgroup", topic);myConsumer.testConsumer();}
}

然后就可以测试了,首先运行消费者程序MyConsumer,会出现循环,因为kafka本身就是一个死循环,然后运行生产者程序MyProducer,注意观察MyConsumer的控制台打印的内容,如果是下图内容,就证明成功了:

如果出现空指针异常的报错,可能是topic的单词打错了,比如下图所示:

当时我就是一个字母打错了,把'o'打成了'i',大家引以为戒吧.

然后可以根据这个小demo,感受一下kafka的运行过程.同时推荐尝试的读一下kafka的源码,我也在努力的在学习kafka框架,希望能够学到许多关于框架的知识,并且分享出去,同时加强对kafka的理解.

欢迎入坑哟...emoji..

kafka发送与接收数据(含奇葩报错解决方案)相关推荐

  1. Flink解析kafka canal未压平数据为message报错

    canal使用非flatmessage方式获取mysql bin log日志发至kafka比直接发送json效率要高很多,数据发到kafka后需要实时解析为json,这里可以使用strom或者flin ...

  2. 【kafka】kafka 消费数据的时候 报错 (Re-) join group

    文章目录 1.场景1 1.1 概述 2.场景2 3.场景3 1.场景1 1.1 概述 kafka 消费数据的时候 报错 如下 2.场景2 spirng-kafka的多consumer问题困扰了我好久, ...

  3. 使用DatagramSocket发送、接收数据(Socket之UDP套接字)

    2019独角兽企业重金招聘Python工程师标准>>> 创建一个DatagramSocket实例,并将该对象绑定到指定IP地址.指定端口. 通过上面三个构造器中的任意一个构造器即可创 ...

  4. 网络——在网络上发送,接收数据

    问题 创建并加入一个网络会话是一回事,但如果不能发送或接收任何数据那么网络会话有什么用呢? 解决方案 当玩家连接到会话时,你可以在一个PacketWriter流中存储所有想要发送的数据.完成这个操作后 ...

  5. hal 双串口同时接收丢失数据_【STM32Cube_06】使用USART发送和接收数据(查询模式)...

    寻求更简洁舒适的阅读体验,请移步Mculover666的个人博客: [STM32Cube_06]使用USART发送和接收数据(查询模式)​www.mculover666.cn 本篇文章主要介绍如何使用 ...

  6. 手把手教你Android手机与BLE终端通信--连接,发送和接收数据

    假设你还没有看上一篇 手把手教你Android手机与BLE终端通信--搜索,你就先看看吧,由于这一篇要接着讲搜索到蓝牙后的连接.和连接后的发送和接收数据. 评论里有非常多人问假设一条信息特别长,怎么不 ...

  7. socket简介和udp网络程序-发送、接收数据

    socket简介 不同电脑上的进程之间如何通信 首要解决的问题是如何唯一标识一个进程,否则通信无从谈起! 在1台电脑上可以通过进程号(PID)来唯一标识一个进程,但是在网络中这是行不通的. 其实TCP ...

  8. STM32L152RE实现串口发送及接收数据

    本文主要讲解用keil软件实现USART串口发送及接收数据,默认读者keil环境已经配好,且头文件已正确引入,如出现编译错误以及st-link下载问题,请自行百度解决. 串口发送和接收数据是一件看起来 ...

  9. Android-Ble蓝牙开发Demo示例–扫描,连接,发送和接收数据

    万物互联的物联网时代的已经来临,ble蓝牙开发在其中扮演着举重若轻的角色.最近刚好闲一点,抽时间梳理下这块的知识点. 涉及ble蓝牙通讯的客户端(开启.扫描.连接.发送和接收数据.分包解包)和服务端( ...

最新文章

  1. Python matplotlib中文显示异常,导入simhei字体有误
  2. python从数据库取数据保存为excel_python读取数据库表数据并写入excel
  3. JQUERY实现放大镜
  4. 新手坐高铁怎么找车厢_一个新手怎么做直播卖衣服?找对货源供应商成功一半...
  5. sed 第n行后加入_【高新课堂】第一百三十九期Liunx运维17个实用技巧
  6. 不同Linux主机下文件的拷贝
  7. 单镜头反光相机是什么
  8. python学习笔记(14)参数对应
  9. Android配置http请求
  10. Flex4_操作XML
  11. 携手英特尔,百度开放云将提供更强悍云服务
  12. Oracle数据库锁表查询
  13. python 通达信k线_python日线通达信,通达信 主图默认显示每只股票上市以来所有日线...
  14. vue源码之数据侦测
  15. STL-Intelligent IME
  16. 提高数据分析思维能力的三大方法
  17. PrimeNG之FileUpload
  18. 华为鸿蒙os的内核是Linux,谈华为鸿蒙内核和操作系统
  19. 【声纹识别】 EER
  20. 线性代数 --- 线性相关与线性无关(个人学习笔记)

热门文章

  1. python35个关键字_Python关键字35个
  2. 关于 Apple Metal API 的一些想法
  3. Tensorflow图像处理以及数据读取
  4. 揭秘自编码器,一种捕捉数据最重要特征的神经网络(视频+代码)
  5. countif函数比较两列不同_这些Excel函数公式,职场办公天天用,赶紧掌握!
  6. python开发面试笔试题_python集合面试笔试题
  7. abb机器人建立工件坐标系_abb机器人坐标系说明介绍
  8. 接收字节流_Java之IO流、属性配置文件
  9. 数据科学入门与实战:玩转pandas之七数据分箱技术,分组技术,聚合技术
  10. java script 环境搭建_TypeScript环境搭建