当前位置: 首页 > 知识库问答 >
问题:

Spring kafka如何配置两个不同kerberos的jaas

雷骁
2023-03-14

我有Spring启动应用程序,它是接收静态数据,基于一些业务html" target="_blank">逻辑,我需要将数据转移到两个不同的kafka集群,它们有自己的kerberos密钥提及的jaas文件。

我已经编写了两个不同的生产者实例,在它们的不同对象实例中具有以下属性。

@Service
public class EventProducer {
    private Logger logger = LoggerFactory.getLogger(EventProducer.class);
 
    
    Producer<String, String> kafkaProducer = null;

    @Autowired
    public Producer<String, String> createProducer() {
        if (kafkaProducer == null) {
            Properties props = getKafkaConfig();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_1_hostaddress:9092");
         props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");

         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.RETRIES_CONFIG, "3");

         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);

         System.setProperty("javax.security.auth.useSubjectCredsOnly", "true"); 
         System.setProperty("java.security.auth.login.config", "/home/user/clusrter_1_jaas.conf);   
         
         props.put("security.protocol", "SASL_PLAINTEXT");
         props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
         props.put("sasl.kerberos.service.name",  "kafka"); 
         props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");     
         props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
         props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
         props.put("sasl.enabled.mechanisms", "PLAIN");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
             
         kafkaProducer = new KafkaProducer<String, String>(props);
        }
        return kafkaProducer;
    }
 
}

第二制片人

@Service
public class MovementProducer {
    private Logger logger = LoggerFactory.getLogger(MovementProducer.class);
    
    Producer<String, String> kafkaProducer = null;

    @Autowired
    public Producer<String, String> createProducer() {
        if (kafkaProducer == null) {
            Properties props = getKafkaConfig();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_2_hostaddress:9092");
         props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");

         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.RETRIES_CONFIG, "3");

         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);

         System.setProperty("javax.security.auth.useSubjectCredsOnly", "true"); 
         System.setProperty("java.security.auth.login.config", "/home/user/clusrter_2_jaas.conf);   
         
         props.put("security.protocol", "SASL_PLAINTEXT");
         props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
         props.put("sasl.kerberos.service.name",  "kafka"); 
         props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");     
         props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
         props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
         props.put("sasl.enabled.mechanisms", "PLAIN");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         
            kafkaProducer = new KafkaProducer<String, String>(props);
        }
        return kafkaProducer;
    }
 
}

当我将其作为两个服务启动并仅启用生产者实例时,它可以工作,但当我在单个jar中启用两个实例时,只有一个生产者可以工作,其他生产者会遇到身份验证问题。

我觉得这是因为System.setProperty(“java.security.auth.login.config”,“”),因为它是全局系统变量,所以当我在单个进程中同时使用这两个变量时,它会覆盖,所以只有一个可以工作。

那么除了启动两个进程之外,还有什么方法可以解决这个问题吗?我只有一个Spring service,应该可以为两个不同的kafka集群生产。

共有1个答案

方浩旷
2023-03-14

Kafka客户端的最新版本为不同客户端的多个JAAS配置提供了选项。

例如,如果您的实例想要连接两个具有不同 JAAS conf 的集群,我们可以在不同的生产者和消费者级别上覆盖。只需创建 2 个独立的生产者工厂并设置

clustera.java.security.auth.login.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="test.keytab" \
    principal="test@domain.com";

clusterb.java.security.auth.login.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="testb.keytab" \
    principal="testb@domain.com"
 类似资料:
  • 问题内容: 我有一份詹金斯工作,后来被克隆和修改。现在,我想比较两个作业的配置。不是历史更改,不是结果,而是两个作业的配置。 是否可以比较两个Jenkins作业的配置? 问题答案: 尝试: 要么

  • 问题内容: 我想做类似于“和”过滤器示例的操作,除了每个示例中都带有“应该”的术语,而不是示例中的字段类型。我提出以下内容: 但是,我收到此错误: 还有另一种方法可以执行我正在尝试执行的操作,还是我走在正确的轨道上?还是在Elasticsearch中这是不可能的? 问题答案: 每个布尔查询子句可以包含多个子句。字词查询(http://www.elasticsearch.org/guide/refe

  • 我试图用两个CacheManager设置一个spring-boot应用程序,代码如下: 但是当我启动应用程序时,它总是失败,出现以下错误: 由:java.lang.IllegalStateException引起:当预期只有1个CachingConfigurer实现时,发现了2个。重构配置,使CachingConfigurer只实现一次或根本不实现。在org.springframework.cach

  • 如何在NGINX配置中为两个位置设置相同的规则? 我尝试了以下方法 但是nginx reload抛出了这个错误:

  • 我正在为Spring Core认证学习,对于使用Java配置方式配置Bean的相关练习,我有以下疑问。 Java配置的正确解释是Spring吗? 例如,我可以说RewardNetwork是声明的bean,而RewardNetworkImpl是这个bean的当前实现吗? 所有的3Beans(AccountRepository,RestaurantRepository和RewardRepository

  • 我有一个spring boot kafka客户端应用程序,其中有两个消费者在kafka中收听不同的主题和不同的消费者群。 为了实现,我需要下面有两个JAAS会议(使用不同的keytab文件) dc-jaas-A.conf文件 dc-jaas-B.conf 由于下面的connect tFactory在System.setProperty中设置dc-jaas-A.conf和dc-jaas-B.conf