我们想通过spring-kafka列出所有Kafka主题,以获得类似于kafka命令的结果:
bin/kafka-topics.sh --list --zookeeper localhost:2181
在下面的服务中运行 getTopics() 方法时,我们会得到
配置:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
服务:
@Service
public class TopicServiceKafkaImpl implements TopicService {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Override
public Set<String> getTopics() {
try (Consumer<String, String> consumer =
consumerFactory.createConsumer()) {
Map<String, List<PartitionInfo>> map = consumer.listTopics();
return map.keySet();
}
}
Kafka已经启动并运行,我们可以成功地从应用程序向主题发送消息。
kafka-tops--list
是一个外壳脚本,它只是kafka.admin.TopicCommand
类的包装器,您可以在其中找到您正在寻找的方法
或者,您也可以使用AdminClient#listTopics
方法
您正在连接到动物园管理员 (2181) 而不是 Kafka(默认为 9092)。
爪哇Kafka客户端不再直接与 ZK 通信。
您可以使用管理客户端列出此类主题
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());
我正在运行ansible在一台服务器上安装Kafka。(在Dev env上,独立设置)但我的剧本在列出Kafka主题的任务列表时失败了。下面是日志。 Journal alctl-fu动物园管理员 /opt/kafka/kafka\u 2.12-2.2.2/config/zookeeper。属性 动物园管理员。服务
在我们的docker-swarm中运行kafka connect,使用以下撰写文件: kafka connect节点成功启动,我可以设置任务并查看这些任务的状态······ 我是否在撰写文件或任务配置中缺少某些配置?
我已经在集群中配置了3个kafka,我正在尝试与sping-kafka一起使用。 但是在我杀死kafka领导者后,我无法发送其他消息到队列。 我将Spring.kafka.bootstrap-servers属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。 Kafka0.10版 有人知道如何正确配置? 编辑 我测试过一个东西,
我正在尽可能地简化我的消费者。问题是,当我看到Kafka监听器中的记录时: <代码>列表 我在使用时注意到: SpringKafka。消费者值反序列化器=io。汇合的。Kafka。序列化程序。KafkaavroderializerSpring。Kafka。消费者键反序列化器=组织。阿帕奇。Kafka。常见的序列化。字符串反序列化器 我得到以下错误: 2020-12-02 17:04:42.745调
问题内容: 我想通过Java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符下创建一个主题,并且如果我通过java api推送消息,它也可以正常工作。但是我想通过java api创建一个主题。经过长时间的搜索,我发现了以下代码, 我尝试了上面的代码,它表明创建了主题,但是无法在该主题中推送消息。我的代码有什么问题吗?还是通过其他方式实现以上目标? 问题答案:
我想问你们一些关于阿帕奇·Kafka和压缩主题的问题。我们想提供一些Kafka压缩主题的PII数据。我们想通过墓碑删除这个主题的数据。目前有多个问题需要验证我们的假设: 有没有其他公司像KIP-354那样通过压缩主题和墓碑生成来满足Kafka的gdpr要求(忘记的权利)https://cwiki.apache.org/confluence/display/KAFKA/KIP-354:添加最大日志压