我有Spring启动应用程序,它是接收静态数据,基于一些业务html" target="_blank">逻辑,我需要将数据转移到两个不同的kafka集群,它们有自己的kerberos密钥提及的jaas文件。
我已经编写了两个不同的生产者实例,在它们的不同对象实例中具有以下属性。
@Service
public class EventProducer {
private Logger logger = LoggerFactory.getLogger(EventProducer.class);
Producer<String, String> kafkaProducer = null;
@Autowired
public Producer<String, String> createProducer() {
if (kafkaProducer == null) {
Properties props = getKafkaConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_1_hostaddress:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
System.setProperty("java.security.auth.login.config", "/home/user/clusrter_1_jaas.conf);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
props.put("sasl.enabled.mechanisms", "PLAIN");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}
}
第二制片人
@Service
public class MovementProducer {
private Logger logger = LoggerFactory.getLogger(MovementProducer.class);
Producer<String, String> kafkaProducer = null;
@Autowired
public Producer<String, String> createProducer() {
if (kafkaProducer == null) {
Properties props = getKafkaConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_2_hostaddress:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
System.setProperty("java.security.auth.login.config", "/home/user/clusrter_2_jaas.conf);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
props.put("sasl.enabled.mechanisms", "PLAIN");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}
}
当我将其作为两个服务启动并仅启用生产者实例时,它可以工作,但当我在单个jar中启用两个实例时,只有一个生产者可以工作,其他生产者会遇到身份验证问题。
我觉得这是因为System.setProperty(“java.security.auth.login.config”,“”),因为它是全局系统变量,所以当我在单个进程中同时使用这两个变量时,它会覆盖,所以只有一个可以工作。
那么除了启动两个进程之外,还有什么方法可以解决这个问题吗?我只有一个Spring service,应该可以为两个不同的kafka集群生产。
Kafka客户端的最新版本为不同客户端的多个JAAS配置提供了选项。
例如,如果您的实例想要连接两个具有不同 JAAS conf 的集群,我们可以在不同的生产者和消费者级别上覆盖。只需创建 2 个独立的生产者工厂并设置
clustera.java.security.auth.login.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="test.keytab" \
principal="test@domain.com";
clusterb.java.security.auth.login.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="testb.keytab" \
principal="testb@domain.com"
问题内容: 我有一份詹金斯工作,后来被克隆和修改。现在,我想比较两个作业的配置。不是历史更改,不是结果,而是两个作业的配置。 是否可以比较两个Jenkins作业的配置? 问题答案: 尝试: 要么
问题内容: 我想做类似于“和”过滤器示例的操作,除了每个示例中都带有“应该”的术语,而不是示例中的字段类型。我提出以下内容: 但是,我收到此错误: 还有另一种方法可以执行我正在尝试执行的操作,还是我走在正确的轨道上?还是在Elasticsearch中这是不可能的? 问题答案: 每个布尔查询子句可以包含多个子句。字词查询(http://www.elasticsearch.org/guide/refe
我试图用两个CacheManager设置一个spring-boot应用程序,代码如下: 但是当我启动应用程序时,它总是失败,出现以下错误: 由:java.lang.IllegalStateException引起:当预期只有1个CachingConfigurer实现时,发现了2个。重构配置,使CachingConfigurer只实现一次或根本不实现。在org.springframework.cach
如何在NGINX配置中为两个位置设置相同的规则? 我尝试了以下方法 但是nginx reload抛出了这个错误:
我正在为Spring Core认证学习,对于使用Java配置方式配置Bean的相关练习,我有以下疑问。 Java配置的正确解释是Spring吗? 例如,我可以说RewardNetwork是声明的bean,而RewardNetworkImpl是这个bean的当前实现吗? 所有的3Beans(AccountRepository,RestaurantRepository和RewardRepository
我有一个spring boot kafka客户端应用程序,其中有两个消费者在kafka中收听不同的主题和不同的消费者群。 为了实现,我需要下面有两个JAAS会议(使用不同的keytab文件) dc-jaas-A.conf文件 dc-jaas-B.conf 由于下面的connect tFactory在System.setProperty中设置dc-jaas-A.conf和dc-jaas-B.conf