java kafka如何动态设置用户读写权限
作者:精英丶阿琦
这篇文章主要介绍了java kafka如何动态设置用户读写权限问题,具有很好的参考家价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
kafka动态设置用户读写权限
我这里cloud Hoxton.SR8 版本
boot 2.3.0.RELEASE版本
直接上代码了嗷
import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.DescribeAclsResult; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.springframework.kafka.core.KafkaAdmin; public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); // broker地址,多个用逗号分割 configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口"); configs.put("security.protocol", "SASL_PLAINTEXT"); configs.put("sasl.mechanism", "SCRAM-SHA-512"); // 登录broker的账户 admin是管理员 configs.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";"); KafkaAdmin admin = new KafkaAdmin(configs); AdminClient adminClient = AdminClient.create(admin.getConfigurationProperties()); // principal:User:test2是需要赋予权限的帐号 // host:主机 (*号即可) // operation:权限操作 // permissionType:权限类型 AccessControlEntry ace = new AccessControlEntry("User:test2", "*", AclOperation.WRITE, AclPermissionType.ALLOW); // resourceType:资源类型(topic) // name:topic名称 // patternType:资源模式类型 ResourcePattern rp = new ResourcePattern(ResourceType.TOPIC, "Z7TEST", PatternType.LITERAL); AclBinding ab = new AclBinding(rp, ace); // 多个权限赋予可以传list List<AclBinding> ablist = Arrays.asList(ab); adminClient.createAcls(ablist); // 可以查看赋予用户的所有权限 DescribeAclsResult b = adminClient.describeAcls(AclBindingFilter.ANY); System.out.println(b.values()); adminClient.close(); }
kafka版本
kafka-2.11-2.1.1
- Kafka 1.0.0 后,Kafka 版本命名规则从 4 位到 3 位
- Kafka版本号是 2.1.1
- 前 2 : 大版本号 (MajorVersion)
- 中 1 : 小版本号或次版本号 (Minor Version)
- 后 1 : 修订版本号 (Patch)
Kafka 0.7 最早开源版本
- 只提供最基础的消息队列功能,扭头就跑
Kafka 0.8
- 引入了副本机制, 成了分布式高可靠消息队列解决方案
- 副本备份机制保障了消息无丢失
- 生产/消费用老客户端 API,要指定 ZooKeeper 的地址 , 而非 Broker的地址
- 生产者 API,默认用同步方式发送消息,吞吐量一般 (异步方式 : 有可能丢失消息)
- 0.8.2.0 后 , 引入新 Producer API (Bug),即 : 指定 Broker 地址的 Producer
- 升到 0.8.2.2 后 , 用老消费者 API (较稳定)
Kafka 0.9.0.0 后
- 增加基础的安全认证 / 权限功能
- 用 Java 重写了新消费者 API (Bug)
- 引入了 Kafka Connect 组件 , 实现高性能的数据抽取
- 新 Producer API 较稳定
Kafka 0.10.0.0 后
- 引入了 Kafka Streams,正式成为分布式流处理平台
- 自 0.10.2.2 后,新 Consumer API 较稳定
Kafka 0.11.0.0 后
- 引入幂等性 Producer API 以事务 (Transaction) API (Bug)
- 对 Kafka 消息格式做了重构
- 建议用 0.11.0.3
不管用哪个版本,都要保持服务器端版本和客户端版本一致
- 后果 : 损失 Kafka 的性能优化
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。