kafka sasl java_Kafka SASL 安全认证
java client 中添加 SASL 设置信息:
Java client consumer properties配置.png
注意 sasl.jaas.config 配置中的分号必不可少。
package kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestKafkaSasl {
private static final Logger logger = LoggerFactory.getLogger(TestKafkaSasl.class);
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// sasl.jaas.config的配置, 结尾分号必不可少.
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");
@SuppressWarnings("resource")
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
try {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
record.offset(), record.partition(), record.key(), record.value());
logger.info("offset = {}, partition = {}, key = {}, value = {}",
record.offset(), record.partition(), record.key(), record.value());
}
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
}
}
}
}
运行测试:
继续生产数据.png
java client sasl消费数据测试.png
kafka sasl java_Kafka SASL 安全认证相关推荐
- sasl java_kafka sasl java api
第一种 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "no ...
- kafka sasl java_Kafka安装及开启SASL_PLAINTEXT认证(用户名和密码认证)
前些日子要封装一个kafka的客户端驱动,配置了下kafka环境,发现配置复杂度完爆rabbitmq很多倍啊,而且发布订阅模式使用起来也很麻烦,可能就胜在分布式了吧. kafka需要java环境,自行 ...
- Kafka SCRAM和PLAIN权限认证
目前Kafka ACL支持多种权限认证,今天笔者给大家介绍一下SCRAM和PLAIN的权限认证.验证环境如下: JDK: 1.8 Kafka: 2.3.0 Kafka Eagle: 1.3.8 2.1 ...
- kafka sasl java_Kafka 集群配置SASL+ACL
** Kafka 集群配置SASL+ACL 测试环境:** 系统: CentOS 6.5 x86_64 JDK : java version 1.8.0_121 kafka: kafka_2.11-1 ...
- kafka集群搭建+权限认证(SASL/SCRAM)+整合springboot
本文介绍的的是kafka集群搭建.kafka权限认证(SASL/SCRAM).整合springboot项目. 1.创建kafka日志和zookeeper文件目录: /data/kafka/kafka- ...
- kafka sasl java_kafka 添加SASL鉴权
kafka 版本信息:kafka_2.12-2.3.0 使用kafka自带的zookeeper启动 bin/zookeeper-server-start.sh config/zookeeper.pro ...
- mysql邮箱认证_邮件服务系列postfix+sasl+mysql实现用户认证功能
1.卸载bind [root@mail ~]# rpm -e bind-utils 2.安装bind97-utils bind97-libs bind97 [root@mail ~]# yum ins ...
- php7 memcached sasl,memcached sasl
服务端 # rpm -qa | grep sasl # cyrus-sasl-plain-2.1.23-8.el6.i686 # cyrus-sasl-devel-2.1.23-8.el6.i686 ...
- 安装 kafka 配置 sasl 认证
一.安装kafka 1.安装jdk yum serach jdk # 查找jdk yum install java-latest-openjdk.x86_64 # 选择jdk安装,这里选择最新的 ...
最新文章
- 激光+视觉+IMU+GPS如何做融合?
- 第1条:考虑用静态工厂方法代替构造器
- Java开源数据库管理工具
- Request请求总结
- 日志处理--高效Linux命令整理
- hdu 4279 Number
- php同时抢购 代码,浅谈PHP实现大流量下抢购方案
- 写个买卖小游戏,第1天(昨天)
- 【数据分析就业实战】——缺失值的常见处理方法
- 利用位运算实现加减乘除
- 如何使用Erdas裁剪万能地图下载器下载的谷歌卫星地图
- 四轴飞控DIY Mark4 - RTH/GPS Rescure
- three.js 视频作为纹理贴图
- 一文读懂什么是聚合支付
- php array_change_key_case()
- python中文占几个字节_中文在python中占几个字节
- linux 多个csv合并成一个csv
- C语言LMS双麦克风消噪算法,芯片内部的噪声抑制算法,语音芯片来说也是一样(双麦克风降噪理念)...
- paramiko-简介
- zabbix配置钉钉报警
热门文章
- Word2010中搜狗拼音输入法(各种输入法)消失了的解决办法
- 最实用的Android Debug Bridge (ADB)使用手册
- 特征值与特征向量、特征方程、特征多项式、矩阵相似、相似变换、矩阵对角化、奇异值分解(Singular Value Decomposition)手算加MATLAB
- python数据分析学习笔记——numpy来实现数据拟合
- MySQL 8.0与MariaDB 10.4,谁更易于填坑补锅?
- 解决Result Maps collection already contains value for...BaseResultMap问题
- typeAliases和package标签的用法
- “应用自动启动”和“关联启动”权限
- python中for in语句对列表的修改
- 广义状态平均法功率变换器建模分析