使用SASL/PLAIN进行身份验证

SASL/PLAIN是一种简单的用户名/密码身份验证机制,通常与TLS一起用于加密以实现安全身份验证。Kafka支持SASL/PLAIN的默认实现,可以扩展到生产环境中使用

1. Kafka brokers 配置:

1)kafka增加认证信息:

向每个Kafka代理的配置目录中添加一个经过适当修改的类似于下面的JAAS文件

在kafka的配置文件中创建JAAS文件:

新建: kafka_jaas.conf 写入以下内容

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};


JAAS文件定义了链接Kafka Broker时所需要的用户名密码及broker各个节点之间相互通信的用户名密码

  • username定义一个公共的用户名,用于节点之间进行通信,
  • user_xxxx 为自定义的用户,主要是客户端用来连接kafka的,所有可以使用的用户必须在此定义,不能再之后新增。等号后面是密码xxxxx是用户名,这里大小写一个字都不能差,除了用户名和密码

producer用于开放生产权限。
consumer用于开放消费权限。

username/ password:broker之间通信使用的用户名密码。
user_admin/user_producer/user_consumer:客户端(管理员、生产者、消费者)链接broker时所使用到的用户名密码。

2)修改kafka启动脚本:kafka-server-start.sh

将JAAS配置文件位置作为JVM参数传递给每个Kafka代理

-Djava.security.auth.login.config=/root/ysw/sasl_kafka_zk/kafka/config/kafka_jaas.conf

找到 export KAFKA_HEAP_OPTS,添加jvm 参数,注意kafka_jaas.conf文件是之前第一步创建的安全认证文件

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/root/ysw/sasl_kafka_zk/kafka/config/kafka_jaas.conf kafka.Kafka "$@"

3)修改 kafka 配置文件下的server.properties

配置服务器中的SASL端口和SASL机制,所涉及的属性如下

listeners=SASL_PLAINTEXT://主机名称:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true //当没有找到ACL配置时,允许所有的访问操作。

allow.everyone.if.no.acl.found=true 时,整个ACL机制为黑名单机制,即只有黑名单中的用户不能访问资源,非黑名中的用户都可以正常访问kafka的资源
allow.everyone.if.no.acl.found=false时, 也就是默认为false,ACL的机制是白名单机制,只有白名单中的用户才能访问kafka的资源,其他用户为未授权用户。

启动kafka

bin/kafka-server-start.sh config/server.properties

如果是kafka集群,其他节点,也要按照这样进行配置即可

2. Kafka客户端(clients)配置

broker开启了SASL也 要在客户端上配置SASL身份验证

1)创建JAAS文件:

  • 在kafka 配置文件中新建消费者:kafka_client_consumer_jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};

  • 在kafka 配置文件中新建消费者: 生产者:kafka_client_producer_jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-sec";
};


2)修改客户端脚本指定JAAS文件加载:

将JAAS配置文件位置作为JVM参数传递给每个客户机JVM

  • 修改生产启动脚本: kafka-console-producer.sh

-Djava.security.auth.login.config=/root/ysw/kafka_zk_alone/kafka_alone/config/kafka_client_producer_jaas.conf
  • 修改生产启动脚本: kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/ysw/kafka_zk_alone/kafka_alone/config/kafka_client_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"

3)修改客户端配置信息:

分别在producer.properties和consumer.properties添加认证机制

  • producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • consumer.properties:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN# consumer group id
group.id=ysw-group-sasl

3. 生产消费测试

  • 生产
    不指定配置文件,会报连接错误
bin/kafka-console-producer.sh --broker-list kafka2:9099 --topic test

指定配置文件

bin/kafka-console-producer.sh --broker-list kafka2:9099 --topic test --producer.config config/producer.properties
  • 消费:
bin/kafka-console-consumer.sh --bootstrap-server kafka2:9099 --topic test --from-beginning --consumer.config config/consumer.properties

3. SpringBoot 整合SASL(使用java api来使用客户端)

  1. 新建kafka_client_jaas.conf添加如下内容
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-sec";
};
  1. 代码中主要添加如下内容
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Properties;public class CustomConsumerSasl extends Thread{KafkaConsumer<String, String> consumer;public CustomConsumerSasl() {Properties properties = new Properties();//连接的集群//"kafka1:9092,kafka2:9092,kafka3:9092"properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka3:9092");//消费组properties.setProperty("group.id", "test2");//此参数有效条件 默认值是latest 及最大可消费offset也是最新发布存入的消息offset//1.还在同一个消费组消费但是之前的最小可消费offset已经不存在// 或者换了一个组去消费properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//关闭自动提交offsetproperties.put("enable.auto.commit", "false");//properties.put("max.poll.records",2);//6235213288010379048//开启自动提交//  properties.put("enable.auto.commit", "true");//  properties.put("auto.commit.interval.ms", "1000");//反序列化key valproperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/**添加SASL权限认证后的需要添加的内容==================begin=======================*///接入协议为SASL_SSL, 鉴权方式为PLAINproperties.put("sasl.mechanism","PLAIN");properties.put("security.protocol","SASL_PLAINTEXT");// properties.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\"");/*** 将应用启动参数java.security.auth.login.config设置为kafka_client_jaas.conf的绝对路径,可以通过代码或者启动参数设置。* 注意 : spring boot配置中可以不需要的这个kafka_client_jaas.conf,它通过读取application.yaml中的配置生成了一个* 为客户端节点的认证文件:kafka_client_jaas.conf的内容为:* KafkaClient {* org.apache.kafka.common.security.plain.PlainLoginModule required* username="producer"* password="prod-sec";* };**/System.setProperty("java.security.auth.login.config", "E:/apps/kafka_client_jaas.conf");/**添加SASL权限认证后的需要添加的内容==================end=======================*///创建消费者consumer = new KafkaConsumer<String, String>(properties);订阅主题指定topicconsumer.subscribe(Arrays.asList("test"));//consumer.subscribe(Arrays.asList("bigdata2","bigdata3"));}@Overridepublic void run() {long i = 0;// 一般生产这开启就不需要关闭了while (true) {//获取取数     每次拉去会获取多条数据,默认500ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> recode : records) {i++;SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateString = formatter.format(recode.timestamp());
//解析打印ConsumerRecordsSystem.out.println(i +"   timestamp= " + dateString + "       partition=" + recode.partition() + "         recodeOffset = " + recode.offset() + "   " + "recodeValue = " + recode.value());}System.out.println("===================================");}}public static void main(String[] args) {new CustomConsumerSasl().start();// new MyConsumer().setConsumerTest();}
}

参考:https://www.jianshu.com/p/09129c9f4c80

使用SASL机制的KAFKA集群的安装:(包含springBoot整合)

阿里云社区zookeeper和kafka的SASL认证以及生产实践
Kafka安全认证SASL/PLAIN,并和springBoot整合

Spring Boot + Spring Kafka配置公网接入阿里云Kafka

Kafka SASL_PLAINTEXT权限管理,并整合SpringBoot相关推荐

  1. Kafka单机环境搭建及整合SpringBoot完成基本使用

    Kafka单机环境搭建及整合SpringBoot完成基本使用 Kafka单机环境搭建 下载kafka_2.11-1.1.0.tgz版本 下载地址 https://archive.apache.org/ ...

  2. SpringBoot中关于Shiro权限管理的整合使用

    在整合Shiro的时候,我们先要确定一下我们的步骤: 1.加入Shiro的依赖包,实现自己的Realm类(通过继承AuthorizingRealm类): 2.实现Shiro的配置类 3.实现前端的登录 ...

  3. Java权限管理|基于springBoot+springSecurity+jwt实现前后端分离用户权限认证

    基于springBoot+springSecurity+jwt实现前后端分离用户权限认证 1. 项目说明   主要基于前后端分离情况下用户权限认证, 当用户登录认证成功后,每个用户会获取到自己的tok ...

  4. Spring +mybatisplus+shiro权限管理集成整合

    一.Apache Shiro是一个功能强大.灵活的,开源的安全框架.它可以干净利落地处理身份验证.授权.企业会话管理和加密. Shiro能做什么呢? 验证用户身份 用户访问权限控制,比如:1.判断用户 ...

  5. SpringBoot 基于Shiro + Jwt + Redis的用户权限管理 (三) 鉴权

    项目Github地址: https://github.com/baiye21/ShiroDemo SpringBoot 基于Shiro + Jwt + Redis的用户权限管理 (一) 简介与配置 S ...

  6. kafka集群搭建+权限认证(SASL/SCRAM)+整合springboot

    本文介绍的的是kafka集群搭建.kafka权限认证(SASL/SCRAM).整合springboot项目. 1.创建kafka日志和zookeeper文件目录: /data/kafka/kafka- ...

  7. springBoot整合spring security+JWT实现单点登录与权限管理前后端分离

    在前一篇文章当中,我们介绍了springBoot整合spring security单体应用版,在这篇文章当中,我将介绍springBoot整合spring secury+JWT实现单点登录与权限管理. ...

  8. springBoot整合spring security+JWT实现单点登录与权限管理前后端分离--筑基中期

    写在前面 在前一篇文章当中,我们介绍了springBoot整合spring security单体应用版,在这篇文章当中,我将介绍springBoot整合spring secury+JWT实现单点登录与 ...

  9. 《SpringBoot与Shiro整合-权限管理实战---从构建到模拟数据库登入》

    <SpringBoot与Shiro整合-权限管理实战> ---- 从构建到模拟数据库登入 ---- 点击下载源码 ---- 或者查看? 文章目录 <SpringBoot与Shiro整 ...

最新文章

  1. Redis5.0之Stream案例应用解读
  2. 请收藏!新型冠状病毒感染的肺炎防控知识手册.pdf
  3. 趋势不能deploy的解决方法
  4. 神经网络激活函数对数函数_神经网络中的激活函数
  5. java jstack 工具_java命令之jstack工具
  6. C++学习——抽象类
  7. 获取文件当前地址GetModuleFileName函数
  8. diy操作系统 0:万事开头难
  9. 【转】腾讯云-解决Winscp permission denied的问题
  10. 【为什么需要FabricPath】FabricPath是思科 Nexus交换机上的一项技术特性,其目标是在保证二层环境的前提下,提高性能。来看看为什么数据中心需要FabricPath?
  11. 复制文字到剪贴板的几种方法
  12. 基于plc的污水处理,组态王动画仿真,带PLC源代码,组态王源代码
  13. html简单导航页单页源码
  14. Windows11 下屏幕亮度自动调整的问题解决
  15. matlab作业 阳光的快乐老爹,霍思燕6岁儿子近照曝光,调皮起来超阳光,完美继承老爹容颜!...
  16. latex 表格标题分行和居中
  17. 领扣LintCode问题答案-58. 四数之和
  18. Ueeshop:外贸网站推广优化方法和注意事项
  19. [附源码]计算机毕业设计宁财二手物品交易网站Springboot程序
  20. Linux--Linux服务器空间占满解决办法

热门文章

  1. ZYNQ 高速QDR IP功能和调用
  2. DM数据库事务、锁、多版本机制
  3. 为你的CD Walkman找到另一半 耳塞(耳机)乱点鸳鸯谱
  4. 最浪漫的爱是得不到的
  5. 如果解除计算机密码,如何强制解除电脑开机密码win7教程
  6. css表格文字超数量就竖排_CSS奇特技巧:控制文字竖排_css
  7. SysTrayIcon 改的 python tkinter 最小化至系统托盘
  8. 【平面设计基础】06:文字——创意字体
  9. 阿里资深技术专家: 程序员怎样快速成长? | 内部干货
  10. Groovy轻松入门——Grails实战基础篇