我有一个简单的流处理器(不是消费者/生产者),看起来像这样(Kotlin)
@Bean
fun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
return Function { input-> input.map { key, value ->
println("\nPAYLOAD KEY: ${key.name}\n");
println("\nPAYLOAD value: ${value.address}\n");
val output = FooAddressPlus()
output.address = value.address
output.name = value.name
output.plus = "$value.name-$value.address"
KeyValue(key, output)
}}
}
这些类fooname
、fooaddress
和fooaddressplus
与处理器在同一个包中。下面是我的配置文件:
spring.cloud.stream.kafka.binder:
brokers: localhost:9093
spring.cloud.stream.function.definition: processFoo
spring.cloud.stream.kafka.streams.binder.functions.processFoo.applicationId: foo-processor
spring.cloud.stream.bindings.processFoo-in-0:
destination: foo.processor
spring.cloud.stream.bindings.processFoo-out-0:
destination: foo.processor.out
spring.cloud.stream.kafka.streams.binder:
deserializationExceptionHandler: logAndContinue
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
commit.interval.ms: 1000
The class '<here_comes_package>.FooAddress' is not in the trusted packages: [java.util, java.lang].
If you believe this class is safe to deserialize, please provide its name.
If the serialization is only done by a trusted source, you can also enable trust all (*).
您可以在...binder.configuration
下设置任意配置属性(Kafka streams属性、serdes使用的属性等):
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
spring.json.trusted.packages: '*'
我已经遵循下面的文档,我有一个生产者和消费者与Kinesis流工作得非常好。我想了解如何处理生产者(源)和消费者(处理器)的错误,以防任何异常发生。 根据Spring Stream错误处理文档,我尝试了以下方法: 我该如何处理和解决这个问题?请建议和帮助我。 文档:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kine
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前