Kafka SCRAM和PLAIN权限认证

Kafka SCRAM和PLAIN权限认证目前 KafkaACL 支持多种权限认证 今天笔者给大家介绍一下 SCRAM 和 PLAIN 的权限认证 验证环境如下 JDK 1 8 Kafka 2 3 0 KafkaEagle 1 3 8 2 1PLAIN 认证首先 在 KAFAK HOME config 目录新建一个文本文件 名为 kafka server plain jaas conf 配置内容如下 KafkaServer org apache kafka common secur

目前Kafka ACL支持多种权限认证,今天笔者给大家介绍一下SCRAM和PLAIN的权限认证。验证环境如下:

 

  • JDK:

    1.8

  • Kafka:

    2.3.0

  • Kafka Eagle:

    1.3.8

 

2.1 PLAIN认证

首先,在$KAFAK_HOME/config目录新建一个文本文件,名为kafka_server_plain_jaas.conf,配置内容如下:

KafkaServer { 
   org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_ke="ke";};

接着,将脚本文件kafka-server-start.sh重命名为kafka-server-plain-start.sh,并修改最后一行的内容为:

# 添加鉴权文件 
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_plain_jaas.conf kafka.Kafka "$@"

然后,复制server.properties文件并重命名为plain.properties,接着修改服务端配置文件plain.properties,内容如下:

# Protocollisteners=SASL_PLAINTEXT://127.0.0.1:9092security.inter.broker.protocol=SASL_PLAINTEXTsasl.mechanism.inter.broker.protocol=PLAINsasl.enabled.mechanisms=PLAIN # ACLallow.everyone.if.no.acl.found=falsesuper.users=User:adminauthorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 

最后,创建客户端用户认证文件,kafka_client_plain_jaas.conf内容如下:

KafkaClient { 
   org.apache.kafka.common.security.plain.PlainLoginModule required username="ke" password="ke";};

 

2.2 启动PLAIN认证集群

2.2.1 启动Zookeeper

# Zookeeper的配置比较简单,这里不多做介绍 
zkServer.sh start

 

2.2.2 启动Kafka集群

 

# 进入到Kafka安装bin目录 
./kafka-server-plain-start.sh ../config/plain.properties &

 

2.2.3 创建Topic

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181/plain --replication-factor 1 --partitions 3 --topic test_plain

 

2.2.4 添加读写权限

# 添加读权限./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/plain --add --allow-principal User:ke --operation Read --topic test_plain # 添加写权限./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/plain --add --allow-principal User:ke --operation Write --topic test_plain# 添加消费者组权限./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/plain --add --allow-principal User:ke --operation Read --group g_plain_test# 查看权限列表./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/plain --list 

 

2.2.5 执行结果

图片

 

2.3 SCRAM认证

PLAIN认证有个问题,就是不能动态新增用户,每次添加用户后,需要重启正在运行的Kafka集群才能生效。为此,在生产环境,这种认证方式不符合实际业务场景。而SCRAM不一样,使用SCRAM认证,可以动态新增用户,添加用户后,可以不用重启正在运行的Kafka集群即可进行鉴权。

新增kafka_server_scram_jaas.conf,配置内容如下:

KafkaServer { 
   org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";};

接着,将脚本文件kafka-server-start.sh重命名为kafka-server-scram-start.sh,并修改最后一行的内容为:

# 添加鉴权文件 
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_scram_jaas.conf kafka.Kafka "$@"

 

然后在$KAFKA_HOME/config目录中,复制server.properties文件并重命名为scram.properties,接着修改服务端配置文件scram.properties,内容如下:

# Protocollisteners=SASL_PLAINTEXT://dn1:9092security.inter.broker.protocol=SASL_PLAINTEXTsasl.mechanism.inter.broker.protocol=SCRAM-SHA-256sasl.enabled.mechanisms=SCRAM-SHA-256 # ACLallow.everyone.if.no.acl.found=falsesuper.users=User:adminauthorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

2.3.1 启动Zookeeper

# Zookeeper的配置比较简单,这里不多做介绍 
zkServer.sh start

 

2.3.2 添加管理员权限和普通用户

# 添加管理员 
./kafka-configs.sh --zookeeper 127.0.0.1:2181/scram --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin 
# 添加普通用户(ke) 
./kafka-configs.sh --zookeeper 127.0.0.1:2181/scram --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ke],SCRAM-SHA-512=[password=ke]' --entity-type users --entity-name ke

 

2.3.3 启动SCRAM认证集群

./kafka-server-scram-start.sh ../config/scram.properties &

 

2.3.4 创建Topic

 

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181/scram --replication-factor 1 --partitions 3 --topic test_scram

 

2.3.5 添加权限

# 添加读权限 ​​​
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --add --allow-principal User:ke --operation Read --topic test_scram 
# 添加写权限 ​​​​​​
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --add --allow-principal User:ke --operation Write --topic test_scram 
# 添加消费者组权限 
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --add --allow-principal User:ke --operation Read --group g_scram_test 
# 查看权限列表 
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --list

 

2.3.6 执行结果

图片Kafka权限级别包含Topic、Group、Cluster、TransactionalId,每个维度涉及的权限内容如下:

Resource Operations
Topic Read,Write,Describe,Delete,DescribeConfigs,AlterConfigs,All
Group Read,Describe,All
Cluster Create,ClusterAction,DescribeConfigs,AlterConfigs,IdempotentWrite,Alter,Describe,All
TransactionalId Describe,Write,All

 

例如,统计Topic的Capacity大小时,如果抛出异常“Cluster authorization failed”,这是由于没有开启Cluster级别的Describe权限,执行如下命令即可:

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --add --allow-principal User:ke --operation Describe --cluster

那么如何使用Kafka Eagle来集成有SCRAM认证的Kafka集群,进行监控呢?访问http://www.kafka-eagle.org/,下载安装包,解压并配置如下内容:

# 这里对启动一个拥有SCRAM认证的Kafka集群(别名为cluster1)进行配置cluster1.kafka.eagle.sasl.enable=truecluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXTcluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ke" password="ke";# 这里ClientId如果不需要,可以不用配置cluster1.kafka.eagle.sasl.client.id= 

然后,执行ke.sh start进行启动Kafka Eagle监控系统。

4.1 Topic预览

图片

 

4.2 使用KSQL查询Topic

执行如下SQL语句,代码如下:

select * from "test_scram" where "partition" in (0) limit 1

执行结果如下:

图片

生产环境中,用户可能随着业务需要,增加或者删除,此时动态控制用户时很有必要的。而且,生产环境Kafka集群不可能随随便便重启,因此采用SCRAM来进行Kafka鉴权非常适合。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/198637.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月26日 下午2:48
下一篇 2026年3月26日 下午2:48


相关推荐

  • a4988 脉宽要求_A4988步进电机驱动模块谁用过?

    a4988 脉宽要求_A4988步进电机驱动模块谁用过?A4988是一款完全的微步电动机驱动器,带有内置转换器,易于操作。该产品可在全、半、1/4、1/8及1/16步进模式时操作双极步进电动机,输出驱动性能可达35V及±2A。A4988包括一个固定关断时间电流稳压器,该稳压器可在慢或混合衰减模式下工作。转换器是A4988易于实施的关键。只要在“步进”输入中输入一个脉冲,即可驱动电动机产生微步。无须进行相位顺序表、高频率控制行或复…

    2022年6月17日
    34
  • 视频编解码学习之二:编解码框架「建议收藏」

    视频编解码学习之二:编解码框架「建议收藏」第四章视频编码基础 1.压缩码流语法:码流中各个元素的位置关系01001001…图像编码类型(01),宏块类型(00),编码系数1001等语义:每个语法元素所表达的意义。例如:图像编码类型 2.

    2022年7月4日
    35
  • SpringMVC工作流程源码剖析

    SpringMVC工作流程源码剖析原文链接:https://juejin.im/post/5e6b0ee3e51d4526f65cdb50.0x0先看名词DispactherServlet:SpringMVC的心脏,所有的请求从这里进入,也从这里出去HandlerAdapter:请求处理器HandlerMapping:请求和处理对象间的映射关系,可以理解为地址/api对应@RequestMapping(“/…

    2022年6月3日
    41
  • CreateMutex用法详解

    CreateMutex用法详解HANDLE nbsp CreateMutex LPSECURITY ATTRIBUTES nbsp lpMutexAttri nbsp nbsp 指向安全属性的指针 BOOL nbsp bInitialOwne nbsp nbsp 初始化互斥对象的所有者 LPCTSTR nbsp lpName nbsp nbsp 指向互斥对象名的指针 1 nbsp CreateMutex 只是创建了一把锁 nbsp nbsp 这把锁你用来锁门还是锁抽屉还是锁

    2026年3月19日
    2
  • arm64(aarch64)安装centos 7.5.1804

    arm64(aarch64)安装centos 7.5.1804总体说明Arm64的centos版本自7.5.1804以后不再和7.4、7.3、7.2等之前的一样直接提供一个rootfs.tar.xz的压缩包,全部变成了ISO的安装文件,因此需要EFI来引导安装,如果Aarch64的cpu用的是uboot就只有干瞪眼了,笔者花了一翻功夫,终于找到如何从ISO中提取出centos7.5.1804文件系统的方法,方便使用uboot的用户可以使用centos7….

    2022年10月16日
    4
  • ring0获取指定进程的PEB

    ring0获取指定进程的PEBifndefTYPEDE H defineTYPEDE HtypedefPPEB stdcall P PsGetProcess PEPROCESS typedefunsig typedefstruc RTL USER PROCESS PARAMETERS BYTEReserved 16

    2026年3月18日
    1

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号