当前位置: 首页 > 知识库问答 >
问题:

消息集线器上的Kafka Streams KTable配置错误

卫骏
2023-03-14

此问题现在已在Message Hub上解决

我在Kafka创建一个KTable时遇到了一些麻烦。我对Kafka是初来乍到的,这大概是我问题的根源,但我原以为无论如何都可以问到这里。我有一个项目,在那里我想通过统计它们的总数来跟踪不同的ID。我正在使用IBM Cloud上的Message Hub来管理我的主题,到目前为止它工作得非常出色。

我有一个关于Message Hub的主题,它会生成诸如{“id”:“123”、“timestamp”:“1525339553”、“balance”:“100”、“amount”:“4”}之类的消息,目前,唯一相关的关键是id。

我的Kafka代码以及Streams配置如下所示:

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();

当我运行代码时,我得到以下错误:

线程“KTableTest-E2062D11-0B30-4ED0-82B0-00D83DCD9366->StreamThread-1”org.apache.kafka.streams.errors.StreamsException中出现异常:无法创建主题KTableTest-KStream-Aggregate-State-Store-0000000003-Repartition。

紧随其后的是:

原因:java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.PolicyViolationException:无效配置:{segment.index.bytes=52428800,segment.bytes=52428800,cleanup.policy=delete,segment.ms=600000}。仅允许得配置:[Retention.ms,Cleanup.Policy]

我不知道为什么会出现这个错误,也不知道该怎么做。我构建KStream和KTable的方法是否不正确?或者是Bluemix上的消息集线器?

已解决:

在我标记为正确的答案下面添加一个注释摘录。结果显示我的StreamsConfig很好,而且(目前)Message Hub端存在一个问题,但有一个解决方法:

结果表明,Message Hub在使用Kafka Streams1.1创建重新分区主题时存在问题。当我们进行修复时,您需要手工创建主题KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition。它需要与您的输入主题(myTopic)一样多的分区,并将保留时间设置为最大。修复后我会再发表评论

非常感谢你的帮助!

共有1个答案

祖奇
2023-03-14

Message Hub对创建主题时可以使用的配置有一些限制。

从您收到的PolicyViolationException来看,您的Streams应用程序似乎试图使用一些我们不允许的配置:

  • segment.index.bytes
  • segment.bytes
  • segment.ms
 类似资料:
  • 在使用MessageHub进行开发时,我经常会发现我想从一个主题中清除我的开发数据。 如何清除MessageHub主题?

  • 我正试图让地图在我的Json适配器中工作。但不知何故,我总是收到错误信息: JAVAlang.NullPointerException:尝试调用虚拟方法“void”。所容纳之物上下文空对象引用上的startActivity(android.content.Intent) 这是我的第二个Android项目,我找不到我的错误。我希望你们中的一些人能帮助我。

  • 我在处理spring集成流中的错误时遇到了一个问题。流程的工作方式如下:我的入口点是消息驱动的通道适配器->路由器->过滤器->转换器->服务激活器->数据库。我已经使用自定义消息侦听器容器编写了自己的错误处理程序,它的工作方式与预期一致,但是当我出现异常时,我需要将原始消息保存到数据库中。 问题是,当我从数据库中获得异常时,错误处理策略会重新使用一个MessageHandleException,

  • 问题内容: 当选择menuItem时,尝试关闭当前场景并打开另一个场景时出现问题。我的主要阶段编码如下: 执行该程序后,它将转到cartHomePage.fxml。选择菜单项后,我可以从那里选择创建产品或创建类别。这是我的动作事件: 但是,我只能切换一次舞台。例如,我的默认页面是cartHomePage.fxml。运行程序时,首先要创建产品阶段。在那之后,我不能再去任何地方了。错误消息是: 我关上

  • 我们的一个Kafka主题有一个问题,该主题被&组合使用,这里用工厂使用的描述。不幸的是,有人有点热情,并发表了一些无效的消息到主题。看来斯普林-Kafka没有处理这些信息中的第一个。有可能让spring-kafka记录一个错误并继续吗?查看记录的错误消息,似乎Apache kafka-clients库应该处理这样的情况:在迭代一批消息时,其中一个或多个消息可能无法解析? 下面的代码是说明这个问题的

  • 微信消息模板重置 1.登录微信公众号 2.找到商城后台微信消息模板重置的地方,点重置即可