当前位置: 首页 > 知识库问答 >
问题:

春云数据流输入源Kafka

锺英卫
2023-03-14

我有一个常见的任务问题,我可以找到任何解决方案或帮助(也许我需要传递一些属性来工作?)我使用本地服务器1.3.0.M2并创建简单的流

dataflow:>stream create --name test --definition ":bosstds > log" --deploy

在日志中,我得到了这个:

2017-09-28 12:31:00.644 信息 5156 --- [ -C-1] o.. a.k.c.c.internals.AbstractCoordinator : 成功加入第 1 代的组测试 2017-09-28 12:31:00.646 信息 5156 --- [ -C-1] o.a.k.c.c.internals.ConsumerCoordinator : 为组测试设置新分配的分区 [bosstds-0] 2017-09-28 12:31:00.671 信息 5156 --- [ -C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$3 : 分配的分区:[bosstds-0] 2017-09-28 12:37:08.898 错误 5156 --- [ -L-1] o.s.c.s.b.k.KafkaMessageChannelBinder : 可以 不转换消息: 74657374 java.lang.StringIndexOutOfBoundsException: 字符串索引超出范围: 103 at java.lang.String.checkBounds(String.java:385) ~[na:1.8.0_144] at java.lang.String.(字符串.java:425) ~[na:1.8.0_144] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.oldExtractHeaders(EmbeddedHeaderUtils.java:154) ~[spring-cloud-stream-1.3.0.M2.jar!/:1.3.0.M2] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:115) ~[spring-cloud-stream-1.3.0.M2.jar!/:1.3.0.M2]

消息由kafka控制台生成器生成。sh--代理列表localhost:9902--主题bosstds并简单地发送行“test”

有什么建议吗?

共有2个答案

施越彬
2023-03-14

tnx求助。我用以下方法解决了这个问题:

 --spring.cloud.stream.bindings.input.content-type=text/plain
 --spring.cloud.stream.bindings.input.consumer.headerMode=raw
卢光远
2023-03-14

SCS在kafka上嵌入头,以便将这个工作集头模式设置为raw。当与不使用SCS的外部应用程序交互时,您需要这样做

 类似资料:
  • 我一直在尝试从 kafka 流式传输我的 json 事件,将其展平,然后使用 Spring Cloud 流将其推送回另一个主题。 输入: 压平工艺: 仅产生: 我的问题是怎么让它变成这样 所以我可以像我所做的那样推回残缺的 JSONObject 而不是单个 JSONArray? 尽管如此,Spring Cloud Stream输出只是一个单独的事件,不适合我上面的案例,无法为Kafka生成3个事件

  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • 我正在遵循入门指南[1],但是我已经从配置设置中删除了MySQL和analytics的内容,因为我不打算使用任何分析函数。但是,scdf服务后来崩溃了,因为没有配置数据源。 好的,所以似乎仍然需要在scdf-config-kafka.yml[2]中配置数据源(尽管从阅读文档来看,我认为它只用于分析内容)。 但为了什么?数据源用于持久化Kafka消息,还是在节点之间建立云流消息? 我找不到任何关于大

  • 我正在从Spring XD迁移到Spring Cloud Data Flow。当我寻找模块列表时,我意识到一些源码没有在Spring Cloud Flow中列出--其中一个是Kafka源码。 我的问题是为什么在spring cloud data flow中KAFKA源从标准源列表中删除?

  • 是否可以使用函数()样式,使用多个独立的函数/绑定来实现反应性SCS应用程序?我发现的所有示例总是只注册一个具有默认绑定的函数bean。我想注册多个,每个都有自己的绑定。 传统上,这可以使用来完成,但现在不推荐使用函数支持。