我在学Kafka春靴。我想在我的consumer类中添加第二个consumer,它订阅了与第一个主题相同的主题,但具有不同的groupID。这些类不是很复杂,当我只有第一个消费Json的消费者(或者至少输出是Json?)时,它们就可以工作。还有一点需要说明的是,我从一个生产者和消费者开始,他们使用@EnableBindings方法,但这种方法不受欢迎,所以我正在学习正确/新的方法。
任何提示!请让我走上正确的道路。
我有很多Maven依赖项,所以我只是总结一下:它包括spring-kafka、kafka-stream、spring-boot-starter-jpa等等......
应用程序属性,我不确定底部的标头属性是否正确:
spring.kafka.bootstrap-servers=localhost:29092
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.key-deserializer=org.apache.kafka.common.serialization.ErrorHandlingDeserializer
spring.kafka.consumer.properties.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.properties.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
spring.kafka.consumer.properties.spring.json.use.type.headers=false
#cockroachDB configs omitted
消费者类别:
@Service
public class BookConsumer {
@Autowired
public BookConsumer(BookService bookService) {
this.bookService=bookService;
}
private final BookService bookService;
@KafkaListener(topics="testKafka", groupId="group2")
public void consume(BookDto books) {
System.out.println("saved!");
bookService.save(books);
}
@KafkaListener(topics="testKafka", groupId="group_id")
public void consumeMessage(BookDto message){
System.out.println(message);
}
}
制作人阶层:
@Service
public class BookProducer {
@Autowired
private KafkaTemplate<String,BookDto> jsonTemplate;
public void sendBookEvent(BookDto book) {
this.jsonTemplate.send("testKafka", book);
}
public void sendJson(BookDto booklist) {
this.jsonTemplate.send("testKafka", booklist);
}
}
我还有一个调用事物的Rest控制器,我将只包括与生产者和消费者相关的两个。它是“/sendBookFromList”,应该用于当前无论如何都不工作的消费者:
@RestController
public class HelloController {
private final BookProducer producer;
@Autowired
private final BookService bookService;
@Autowired
public HelloController(BookProducer producer, BookService bookService) {
this.producer=producer;
this.bookService=bookService;
}
public List<BookDto> makeList() {
List<BookDto> readingList = new ArrayList<BookDto>();
readingList.add(new BookDto(1, "Needful Things", "Stephen King"));
readingList.add(new BookDto(2, "The Three-Body Problem", "Liu Cixin"));
readingList.add(new BookDto(666, "Cujo", "Stephen King"));
readingList.add(new BookDto(8, "The Castle", "Franz Kafka"));
return readingList;
}
@RequestMapping("json/{pos}")
public String sendJson(@PathVariable("pos") Integer pos) {
producer.sendJson(makeList().get(pos));
return "BookDto sent!";
}
@RequestMapping("/sendBookFromList/{listPos}")
public String sendBooks(@PathVariable("listPos") Integer pos) {
producer.sendBookEvent(makeList().get(pos));
return "added!";
}
我有一个BookDto类和一个实体,因为我将它连接到一个蟑螂数据库,我将包括它以防万一:
public class BookDto {
private Integer id;
private String name;
private String Author;
public BookDto() {
}
public BookDto(Integer id, String name, String Author) {
this.id = id;
this.name = name;
this.Author = Author;
}
//I'll omit the getter and setters here but they exist!
@Override public String toString() {
return "Book "+id+": "+name+" by "+Author; }
}
//I'm using Lombok as well, I didn't forget my constructors and stuff I swear!
@Entity(name="BOOK")
@Data
public class Book {
@Id
private Integer id;
private String name;
private String author;
}
为了澄清,我使用了映射器,因为我认为这可能是从Dto到实体的转换之间的问题。我认为它不起作用,因为这是错误消息(过去是Book而不是BookDto,没有映射器):
Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.Book.kafka.BookConsumer.consume(com.Book.kafka.BookDto)]
Cannot convert from [[B] to [com.Book.kafka.BookDto] for GenericMessage [payload=byte[48], headers={kafka_offset=151, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1ce9bcc9, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=testKafka, kafka_receivedTimestamp=1649930203804, __TypeId__=[B@163eece4, kafka_groupId=group2}]
附加信息:我正在docker中运行Kafka和ZooGuard
非常愚蠢的解决方案,但它似乎解决了我的问题。我怀疑我把财产搞砸了,我是对的!(我想)
在删除后,我将其安装并运行良好。所有Spring的属性。Kafka。[消费者/生产者]。属性。[值/键]-应用程序属性中的反序列化程序。
我不知道为什么这修复了它,但消费者现在可以正确处理它,并将我的图书项目保存到CocroachDB。
我是Apache Kafka的新手,能够从发送方发送消息(以JSON格式),但不能在消费者服务中消费。 有人能帮我吗?
我有课: 配置类:公共类RabbitConfiguration{ 听众: a仅启动应用程序有错误 2017-08-08 12:58:02.128警告5024---[cTaskExecutor-1]S.A.R.L.ConditionalRejectingErrorHandler:Rabbit消息侦听器执行失败。 原因:org.SpringFramework.Messaging.Handler.Ann
下面是我对kafka侦听器的方法定义,如果接收空或空的有效负载字符串,我想我会得到下面的错误...你能帮帮我吗。
该应用程序使用Spring JMS Listener连接到JMS队列,并在WAS 8.5中部署。x、 使用spring应用程序上下文xml,bean用初始化,目标为。 在class方法中,代码试图使用查找JNDI引用,代码片段如下, 应用程序在处理消息时抛出以下异常。 例外:javax。命名。ConfigurationException:无法完成“java:”名称上的JNDI操作,因为服务器运行时
将使用者实例订阅到主题 接下来,我将通过Spring-Kafka应用程序使用发送到Kafka代理的事件,该应用程序应该使用JSON并通过带有@kafkalistener注释的消费者侦听器方法将其转换回Java类型,如下所示: 但是当我试图通过上面的接收器代码使用消息时,在KafkaListenerContainer中得到了下面的错误日志 我尝试但收到相同错误的其他侦听器方法定义是: 使用Inven