java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java kafka动态设置用户读写权限

java kafka如何动态设置用户读写权限

作者:精英丶阿琦

这篇文章主要介绍了java kafka如何动态设置用户读写权限问题,具有很好的参考家价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

kafka动态设置用户读写权限

我这里cloud Hoxton.SR8 版本

boot 2.3.0.RELEASE版本

直接上代码了嗷

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.springframework.kafka.core.KafkaAdmin;
public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // broker地址,多个用逗号分割
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");
        configs.put("security.protocol", "SASL_PLAINTEXT");
        configs.put("sasl.mechanism", "SCRAM-SHA-512");
        // 登录broker的账户 admin是管理员
        configs.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";");
        KafkaAdmin admin = new KafkaAdmin(configs);
        AdminClient adminClient = AdminClient.create(admin.getConfigurationProperties());
        // principal:User:test2是需要赋予权限的帐号
        // host:主机 (*号即可)
        // operation:权限操作
        // permissionType:权限类型
        AccessControlEntry ace = new AccessControlEntry("User:test2", "*", AclOperation.WRITE, AclPermissionType.ALLOW);
        // resourceType:资源类型(topic)
        // name:topic名称
        // patternType:资源模式类型
        ResourcePattern rp = new ResourcePattern(ResourceType.TOPIC, "Z7TEST", PatternType.LITERAL);
        AclBinding ab = new AclBinding(rp, ace);
        // 多个权限赋予可以传list
        List<AclBinding> ablist = Arrays.asList(ab);
        adminClient.createAcls(ablist);
        // 可以查看赋予用户的所有权限
        DescribeAclsResult b = adminClient.describeAcls(AclBindingFilter.ANY);
        System.out.println(b.values());
        adminClient.close();
    }

kafka版本

kafka-2.11-2.1.1

Kafka 0.7 最早开源版本

Kafka 0.8 

Kafka 0.9.0.0 后 

Kafka 0.10.0.0 后

Kafka 0.11.0.0 后

不管用哪个版本,都要保持服务器端版本和客户端版本一致

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文