java kafka如何动态设置用户读写权限
作者:精英丶阿琦
这篇文章主要介绍了java kafka如何动态设置用户读写权限问题,具有很好的参考家价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
kafka动态设置用户读写权限
我这里cloud Hoxton.SR8 版本
boot 2.3.0.RELEASE版本
直接上代码了嗷
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.springframework.kafka.core.KafkaAdmin;
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// broker地址,多个用逗号分割
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");
configs.put("security.protocol", "SASL_PLAINTEXT");
configs.put("sasl.mechanism", "SCRAM-SHA-512");
// 登录broker的账户 admin是管理员
configs.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";");
KafkaAdmin admin = new KafkaAdmin(configs);
AdminClient adminClient = AdminClient.create(admin.getConfigurationProperties());
// principal:User:test2是需要赋予权限的帐号
// host:主机 (*号即可)
// operation:权限操作
// permissionType:权限类型
AccessControlEntry ace = new AccessControlEntry("User:test2", "*", AclOperation.WRITE, AclPermissionType.ALLOW);
// resourceType:资源类型(topic)
// name:topic名称
// patternType:资源模式类型
ResourcePattern rp = new ResourcePattern(ResourceType.TOPIC, "Z7TEST", PatternType.LITERAL);
AclBinding ab = new AclBinding(rp, ace);
// 多个权限赋予可以传list
List<AclBinding> ablist = Arrays.asList(ab);
adminClient.createAcls(ablist);
// 可以查看赋予用户的所有权限
DescribeAclsResult b = adminClient.describeAcls(AclBindingFilter.ANY);
System.out.println(b.values());
adminClient.close();
}kafka版本
kafka-2.11-2.1.1
- Kafka 1.0.0 后,Kafka 版本命名规则从 4 位到 3 位
- Kafka版本号是 2.1.1
- 前 2 : 大版本号 (MajorVersion)
- 中 1 : 小版本号或次版本号 (Minor Version)
- 后 1 : 修订版本号 (Patch)
Kafka 0.7 最早开源版本
- 只提供最基础的消息队列功能,扭头就跑
Kafka 0.8
- 引入了副本机制, 成了分布式高可靠消息队列解决方案
- 副本备份机制保障了消息无丢失
- 生产/消费用老客户端 API,要指定 ZooKeeper 的地址 , 而非 Broker的地址
- 生产者 API,默认用同步方式发送消息,吞吐量一般 (异步方式 : 有可能丢失消息)
- 0.8.2.0 后 , 引入新 Producer API (Bug),即 : 指定 Broker 地址的 Producer
- 升到 0.8.2.2 后 , 用老消费者 API (较稳定)
Kafka 0.9.0.0 后
- 增加基础的安全认证 / 权限功能
- 用 Java 重写了新消费者 API (Bug)
- 引入了 Kafka Connect 组件 , 实现高性能的数据抽取
- 新 Producer API 较稳定
Kafka 0.10.0.0 后
- 引入了 Kafka Streams,正式成为分布式流处理平台
- 自 0.10.2.2 后,新 Consumer API 较稳定
Kafka 0.11.0.0 后
- 引入幂等性 Producer API 以事务 (Transaction) API (Bug)
- 对 Kafka 消息格式做了重构
- 建议用 0.11.0.3
不管用哪个版本,都要保持服务器端版本和客户端版本一致
- 后果 : 损失 Kafka 的性能优化
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
