我收到了来自Kafka的JSON字符串,需要由PySpark处理。字符串如下所示: 我的计划是将字符串分成JSON字段。为此,我定义了以下模式: 但是,使用此架构会导致以下错误: 但是,如果我使用没有嵌套字段的模式(如下所示),我可以解析: 我的目标是得到这样的输出: 我想在这方面得到一些帮助。现在我可以得到除嵌套结构之外的所有字段。 我使用的模式如下: Adam提到的模式适用于这个特定的字符串。
在spring-kafka中,如何从包中添加类作为自定义头字段来信任? 消息是这样发送的: 接收端如下所示: 我不断得到的例外是:
我正在使用spring boot构建一个web应用程序,现在我需要接收实时通知。我正计划使用apache kafka作为这方面的消息代理。要求用户具有不同的角色,并且根据角色,他们应该接收其他用户正在执行的操作的通知。 我设置了一个生产者和消费者,作为消费者,我可以接收发布到一个主题的信息,比如说topic1。 我遇到的问题是,我可以让多个用户收听同一个主题,而每个用户都应该得到发布到该主题的消息
我试图构建一个简单的spring boot Kafka Consumer来消费来自Kafka主题的消息,但是由于KafkaListener方法没有被触发,所以没有任何消息被消费。 Java类: start.java: kafkaConsumerConfig.java:
我看到了Spring的Kafka代码,我有些怀疑: > 如果我们将1@KafKalistener与2个主题一起使用,那么spring Kafka将创建一个MessageListenerContainer。如果我为每个主题使用单独的@KafKalistener,那么将创建2个MessageListenerContainer。 MessageListenerContainer是否意味着消费者? 请帮帮
这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。
在我的例子中,我们有auto.commit.enable=false,以便在处理消息后将offset提交给Zookeeper。如果处理失败,那么offset将不会被提交,我们应该尝试在某个配置的时间内再次处理相同的消息,从zookeeper的offset开始。但它不起作用,因为,我假设,Apache Kafka客户端在内存中保留了偏移量。 我发现kafka.consumer.consumerite
版本: null
我正在尝试为节点中的应用程序设置Kafka队列。我在一个消费者组中有3个消费者,并且它订阅了一个主题,但是当生产者发送消息到一个有3个分区的主题时,消费者组接收到重复的消息,例如,consumer1从partition1读取并获取消息,而consumer2也从相同的分区读取并获取相同的消息。 我给出了到git存储库的链接,其中有所有的可执行代码。我找不到它是设置问题吗?或节点包问题。 为创建使用者
我使用的是Spring-Kafka2.2.2.release(org.apache.kafka:kafka-clients:jar:2.0.1)和spring-boot(2.1.1)。我无法执行事务,因为我的侦听器无法获得分配的分区。我只为一个消费者创建了建议的配置。我正在尝试配置一个事务性侦听器容器,并且只处理一次 我使用事务管理器配置了生产者和使用者,生产者使用事务id,使用者使用isolat
我正在建立一个新的Kafka集群,为了测试目的,我创建了一个有1个分区和3个副本的主题。 有什么想法哪种配置或其他东西可以帮助我消费更多的数据吗?? 提前致谢
我正在尝试使用Kafka-Python编写一个消费者,以确保精确的once语义。分区中的消息是使用事务感知生成器生成的。我从Kafka文档中了解到,我应该将指定为,这样它将只读取提交的消息。问题是,我在Python客户机的文档中没有看到关于如何指定的任何地方。关于如何让我的消费者只阅读提交的消息,有什么想法吗? 预期结果:只需获取transation committed消息实际结果:使用者甚至读取
以下是我的Kafka消费者属性:- Kafka Producer在其属性中有一个事务id,在推送一些消息之后,它将事务作为一个整体提交。以下是Kafka制作人的属性:- log.info(“初始化属性”);属性道具=新属性(); 我无法理解是否commit没有从生产者端正确地发生,从而导致Kafka消费者无法用事务性语义读取它,或者Kafka消费者端存在问题。 任何帮助都将不胜感激。
当Kafka broker不可用时,我的spring Boot2.1.5消费者应用程序会以以下例外情况关闭。它使用的是spring-kafka 2.2.6。我需要我的应用程序保持和恢复消费时,Kafka经纪人回来。 例外情况: 我使用尝试了@kafkalistener,并使用CommandLineRunner来绕过它,这样应用程序就不会关闭,但当broker返回时,它不会使用消息,直到重新启动为止