Kafka SASL_PLAINTEXT权限管理,并整合SpringBoot
使用SASL/PLAIN进行身份验证
SASL/PLAIN是一种简单的用户名/密码身份验证机制,通常与TLS一起用于加密以实现安全身份验证。Kafka支持SASL/PLAIN的默认实现,可以扩展到生产环境中使用
1. Kafka brokers 配置:
1)kafka增加认证信息:
向每个Kafka代理的配置目录中添加一个经过适当修改的类似于下面的JAAS文件
在kafka的配置文件中创建JAAS文件:
新建: kafka_jaas.conf 写入以下内容
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};
JAAS文件定义了链接Kafka Broker时所需要的用户名密码及broker各个节点之间相互通信的用户名密码
- username定义一个公共的用户名,用于节点之间进行通信,
- user_xxxx 为自定义的用户,主要是客户端用来连接kafka的,所有可以使用的用户必须在此定义,不能再之后新增。等号后面是密码xxxxx是用户名,这里大小写一个字都不能差,除了用户名和密码
producer用于开放生产权限。
consumer用于开放消费权限。
username/ password:broker之间通信使用的用户名密码。
user_admin/user_producer/user_consumer:客户端(管理员、生产者、消费者)链接broker时所使用到的用户名密码。
2)修改kafka启动脚本:kafka-server-start.sh
将JAAS配置文件位置作为JVM参数传递给每个Kafka代理
-Djava.security.auth.login.config=/root/ysw/sasl_kafka_zk/kafka/config/kafka_jaas.conf
找到 export KAFKA_HEAP_OPTS,添加jvm 参数,注意kafka_jaas.conf文件是之前第一步创建的安全认证文件
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/root/ysw/sasl_kafka_zk/kafka/config/kafka_jaas.conf kafka.Kafka "$@"
3)修改 kafka 配置文件下的server.properties
配置服务器中的SASL端口和SASL机制,所涉及的属性如下
listeners=SASL_PLAINTEXT://主机名称:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true //当没有找到ACL配置时,允许所有的访问操作。
allow.everyone.if.no.acl.found=true 时,整个ACL机制为黑名单机制,即只有黑名单中的用户不能访问资源,非黑名中的用户都可以正常访问kafka的资源
allow.everyone.if.no.acl.found=false时, 也就是默认为false,ACL的机制是白名单机制,只有白名单中的用户才能访问kafka的资源,其他用户为未授权用户。
启动kafka
bin/kafka-server-start.sh config/server.properties
如果是kafka集群,其他节点,也要按照这样进行配置即可
2. Kafka客户端(clients)配置
broker开启了SASL也 要在客户端上配置SASL身份验证
1)创建JAAS文件:
- 在kafka 配置文件中新建消费者:kafka_client_consumer_jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};
- 在kafka 配置文件中新建消费者: 生产者:kafka_client_producer_jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-sec";
};
2)修改客户端脚本指定JAAS文件加载:
将JAAS配置文件位置作为JVM参数传递给每个客户机JVM
- 修改生产启动脚本: kafka-console-producer.sh
-Djava.security.auth.login.config=/root/ysw/kafka_zk_alone/kafka_alone/config/kafka_client_producer_jaas.conf
- 修改生产启动脚本: kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/root/ysw/kafka_zk_alone/kafka_alone/config/kafka_client_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
3)修改客户端配置信息:
分别在producer.properties和consumer.properties添加认证机制
- producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- consumer.properties:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN# consumer group id
group.id=ysw-group-sasl
3. 生产消费测试
- 生产
不指定配置文件,会报连接错误
bin/kafka-console-producer.sh --broker-list kafka2:9099 --topic test
指定配置文件
bin/kafka-console-producer.sh --broker-list kafka2:9099 --topic test --producer.config config/producer.properties
- 消费:
bin/kafka-console-consumer.sh --bootstrap-server kafka2:9099 --topic test --from-beginning --consumer.config config/consumer.properties
3. SpringBoot 整合SASL(使用java api来使用客户端)
- 新建kafka_client_jaas.conf添加如下内容
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-sec";
};
- 代码中主要添加如下内容
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Properties;public class CustomConsumerSasl extends Thread{KafkaConsumer<String, String> consumer;public CustomConsumerSasl() {Properties properties = new Properties();//连接的集群//"kafka1:9092,kafka2:9092,kafka3:9092"properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka3:9092");//消费组properties.setProperty("group.id", "test2");//此参数有效条件 默认值是latest 及最大可消费offset也是最新发布存入的消息offset//1.还在同一个消费组消费但是之前的最小可消费offset已经不存在// 或者换了一个组去消费properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//关闭自动提交offsetproperties.put("enable.auto.commit", "false");//properties.put("max.poll.records",2);//6235213288010379048//开启自动提交// properties.put("enable.auto.commit", "true");// properties.put("auto.commit.interval.ms", "1000");//反序列化key valproperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/**添加SASL权限认证后的需要添加的内容==================begin=======================*///接入协议为SASL_SSL, 鉴权方式为PLAINproperties.put("sasl.mechanism","PLAIN");properties.put("security.protocol","SASL_PLAINTEXT");// properties.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"prod-sec\"");/*** 将应用启动参数java.security.auth.login.config设置为kafka_client_jaas.conf的绝对路径,可以通过代码或者启动参数设置。* 注意 : spring boot配置中可以不需要的这个kafka_client_jaas.conf,它通过读取application.yaml中的配置生成了一个* 为客户端节点的认证文件:kafka_client_jaas.conf的内容为:* KafkaClient {* org.apache.kafka.common.security.plain.PlainLoginModule required* username="producer"* password="prod-sec";* };**/System.setProperty("java.security.auth.login.config", "E:/apps/kafka_client_jaas.conf");/**添加SASL权限认证后的需要添加的内容==================end=======================*///创建消费者consumer = new KafkaConsumer<String, String>(properties);订阅主题指定topicconsumer.subscribe(Arrays.asList("test"));//consumer.subscribe(Arrays.asList("bigdata2","bigdata3"));}@Overridepublic void run() {long i = 0;// 一般生产这开启就不需要关闭了while (true) {//获取取数 每次拉去会获取多条数据,默认500ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> recode : records) {i++;SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateString = formatter.format(recode.timestamp());
//解析打印ConsumerRecordsSystem.out.println(i +" timestamp= " + dateString + " partition=" + recode.partition() + " recodeOffset = " + recode.offset() + " " + "recodeValue = " + recode.value());}System.out.println("===================================");}}public static void main(String[] args) {new CustomConsumerSasl().start();// new MyConsumer().setConsumerTest();}
}
参考:https://www.jianshu.com/p/09129c9f4c80
使用SASL机制的KAFKA集群的安装:(包含springBoot整合)
阿里云社区zookeeper和kafka的SASL认证以及生产实践
Kafka安全认证SASL/PLAIN,并和springBoot整合
Spring Boot + Spring Kafka配置公网接入阿里云Kafka
Kafka SASL_PLAINTEXT权限管理,并整合SpringBoot相关推荐
- Kafka单机环境搭建及整合SpringBoot完成基本使用
Kafka单机环境搭建及整合SpringBoot完成基本使用 Kafka单机环境搭建 下载kafka_2.11-1.1.0.tgz版本 下载地址 https://archive.apache.org/ ...
- SpringBoot中关于Shiro权限管理的整合使用
在整合Shiro的时候,我们先要确定一下我们的步骤: 1.加入Shiro的依赖包,实现自己的Realm类(通过继承AuthorizingRealm类): 2.实现Shiro的配置类 3.实现前端的登录 ...
- Java权限管理|基于springBoot+springSecurity+jwt实现前后端分离用户权限认证
基于springBoot+springSecurity+jwt实现前后端分离用户权限认证 1. 项目说明 主要基于前后端分离情况下用户权限认证, 当用户登录认证成功后,每个用户会获取到自己的tok ...
- Spring +mybatisplus+shiro权限管理集成整合
一.Apache Shiro是一个功能强大.灵活的,开源的安全框架.它可以干净利落地处理身份验证.授权.企业会话管理和加密. Shiro能做什么呢? 验证用户身份 用户访问权限控制,比如:1.判断用户 ...
- SpringBoot 基于Shiro + Jwt + Redis的用户权限管理 (三) 鉴权
项目Github地址: https://github.com/baiye21/ShiroDemo SpringBoot 基于Shiro + Jwt + Redis的用户权限管理 (一) 简介与配置 S ...
- kafka集群搭建+权限认证(SASL/SCRAM)+整合springboot
本文介绍的的是kafka集群搭建.kafka权限认证(SASL/SCRAM).整合springboot项目. 1.创建kafka日志和zookeeper文件目录: /data/kafka/kafka- ...
- springBoot整合spring security+JWT实现单点登录与权限管理前后端分离
在前一篇文章当中,我们介绍了springBoot整合spring security单体应用版,在这篇文章当中,我将介绍springBoot整合spring secury+JWT实现单点登录与权限管理. ...
- springBoot整合spring security+JWT实现单点登录与权限管理前后端分离--筑基中期
写在前面 在前一篇文章当中,我们介绍了springBoot整合spring security单体应用版,在这篇文章当中,我将介绍springBoot整合spring secury+JWT实现单点登录与 ...
- 《SpringBoot与Shiro整合-权限管理实战---从构建到模拟数据库登入》
<SpringBoot与Shiro整合-权限管理实战> ---- 从构建到模拟数据库登入 ---- 点击下载源码 ---- 或者查看? 文章目录 <SpringBoot与Shiro整 ...
最新文章
- Redis5.0之Stream案例应用解读
- 请收藏!新型冠状病毒感染的肺炎防控知识手册.pdf
- 趋势不能deploy的解决方法
- 神经网络激活函数对数函数_神经网络中的激活函数
- java jstack 工具_java命令之jstack工具
- C++学习——抽象类
- 获取文件当前地址GetModuleFileName函数
- diy操作系统 0:万事开头难
- 【转】腾讯云-解决Winscp permission denied的问题
- 【为什么需要FabricPath】FabricPath是思科 Nexus交换机上的一项技术特性,其目标是在保证二层环境的前提下,提高性能。来看看为什么数据中心需要FabricPath?
- 复制文字到剪贴板的几种方法
- 基于plc的污水处理,组态王动画仿真,带PLC源代码,组态王源代码
- html简单导航页单页源码
- Windows11 下屏幕亮度自动调整的问题解决
- matlab作业 阳光的快乐老爹,霍思燕6岁儿子近照曝光,调皮起来超阳光,完美继承老爹容颜!...
- latex 表格标题分行和居中
- 领扣LintCode问题答案-58. 四数之和
- Ueeshop:外贸网站推广优化方法和注意事项
- [附源码]计算机毕业设计宁财二手物品交易网站Springboot程序
- Linux--Linux服务器空间占满解决办法