我是Flink的新手,所以在定义Flink中的水印时,我面临一些问题。 让我们从Kafka消费者开始。使用的反序列化是JSONKeyValueDeserializationSchema,因此没有自定义解析。 如果将接收器应用于此代码,则其工作正常。问题是需要水印来避免无序事件。这就是我写的策略: 在做了一些研究后,我最终得到了这段代码,但这不起作用。这些是我的问题: 在这里使用ObjectNode
我刚刚开始玩弄《Spring-Cloud-Stream》中的Kafka活页夹。 我配置了一个简单的消费者: 但当我启动应用程序时,我看到在启动日志中创建了三个独立的消费者配置: 我发现这些配置之间唯一不同的是客户机。id。 除此之外,我不知道为什么只有一个消费者有三种配置。 是因为我也在运行吗? 这是我的:
我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重
使用Spring云流上通道的标准配置,消息重试3次,然后跳过。如果以下消息处理成功完成,则提交偏移量。这意味着在瞬态异常情况下,消息可能会丢失。 是否可以更改此行为,从而使通道卡在失败消息上,直到修复瞬态条件? 我已经尝试配置重试模板,但您必须指定多次重试,这看起来像是一个无用的参数,因为所需的行为将永远重试。 有人卷入这些麻烦吗?非常感谢。 我还怀疑这会如何干扰max.poll。间隔ms属性。
我想利用Kafka 0.11中引入的幂等生产者。根据这篇融合的博客文章,添加了一个新属性来支持这一点: 幂等性:每个分区仅一次有序语义 要启用此功能并在每个分区中准确获取一次语义,即没有重复,没有数据丢失,并且为了语义,请将生产者配置为设置“enable.idemponence=true”。 这一点既不是Spring Cloud Stream,也不是Spring Kafka文档对该属性的使用。我们
我有一个Spring boot应用程序,我使用spring-Cloud-stream从一个kafka主题中消费,进行一些处理并发布到另一个kafka主题。该应用程序运行良好,我已经编写了运行良好的单元测试(使用TestBinder)。 我现在正试图用嵌入式Kafka编写一个集成测试,并测试端到端的功能。我在这里跟踪了样本https://github.com/spring-cloud/spring-
当尝试运行稍微修改过的word count示例版本时,我遇到了一个错误,即“没有符合条件的'org.apache.kafka.streams.kstream.KStreamBuilder'类型的bean”。在我的POM中,我使用了Spring-Cloud-Stream依赖项:Elmhurst。M3导入依赖项,其中导入了spring cloud stream绑定器kstream:2.0.0。立方米。
如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!
我想使用ErrorHandlingDeserializer来处理反序列化错误,并使用spring cloud stream binder kafka将错误记录发送到kafka binder的默认DLQ主题。
我们需要从一个主题中获取消息,然后进行一些扩展,然后将消息发布到另一个主题。以下是活动 消费者-消费信息 丰富-丰富了消费的信息 制作人-将丰富的信息发布到其他主题 我正在使用Spring Cloud kafka绑定器,一切正常。最近我们引入了幂等生产者并包含transactionIdPrefix属性,我们观察到出站通道开始在主题中发送2条消息,因为它应该只发送一条消息。一条具有实际json值的消
我正在与Kafka和阿帕奇·Flink合作。我正在尝试使用apache Flink中的一个kafka主题中的记录(这些记录是avro格式的)。下面是我正在尝试的一段代码。 使用自定义反序列化器来反序列化主题中的avro记录。 我发送到主题“test-topic”的数据的Avro模式如下所示。 我正在使用的自定义反序列化器如下所示。 我的flink应用程序就是这样写的。 我得到的输出是{“name”
我正在尝试对Kafka消息流进行流处理和CEP。为此,我选择Apache Ignite首先实现一个原型。但是,我无法连接到队列: 使用KAFKA2.11-0.10.1.0 apache-ignite-fabric-1.8.0-bin Kafka工作正常,我用一个消费者测试了它。然后启动ignite,然后在spring boot命令行应用程序中运行following。 当应用程序启动时,我得到 20
我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。
我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr
我创建了一个有2个节点的kubernetes集群(GCP GKE)。我为druid(端口2181)设置了zookeeper我工作得很好,但是,我想在同一个集群中部署kafka pod。所以我使用helm,但是当我在脚本values.yaml末尾更改端口时,我运行helm upgrade如下所示 并使用