我正在开发一个Kafka-Stream应用程序,它将从输入Kafka主题读取消息,过滤不需要的数据并推送到输出Kafka主题。
Kafka流配置:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> streamsConfiguration = new HashMap<>();
streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
streamsConfiguration.put(SASL_MECHANISM, "PLAIN");
return new KafkaStreamsConfiguration(streamsConfiguration);
}
KStream筛选器逻辑:
@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
/** Printing the source message */
stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " *****Message From Input Topic: " + key + ": " + value));
KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));
filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
/** After filtering printing the same message */
filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
return stream;
}
当开始以上spring的Kafka流应用程序,我得到以下例外。
2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0
我们的Kafka Infra团队给了“group.id”必要的权限,使用这个相同的“group.id”,我可以使用其他Kafka消费者应用程序来使用消息,我在“application.id”中按照我的意愿使用了name。我们不是在Kafka访问控制列表中添加/更新“application.id”。
我真的不确定我们需要为“application.id”授予任何权限,或者在Kafka流配置中缺少了什么。请指教。
请注意:我尝试过在Kafka流配置中使用with“group.id”和不使用“group.id”,但我总是遇到同样的异常。
谢谢!巴拉蒂拉贾·尚穆甘姆
我不在办公桌前,但我认为Streams将group.id设置为application.id。
曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法
问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异
我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。
我正在努力定制我的spring kafka streams应用程序。我一直试图在我的KStreams上配置处理未捕获(运行时异常)。 参考文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_st
我们的应用程序在streams代码中间歇性地遇到无序异常。这会导致流线程停止。 实现很简单,2个KStreams连接并输出到另一个主题。 在寻找这个无序顺序异常的解决方案时,我在Confluent上找到了以下文档 https://docs.confluent.io/current/streams/concepts.html#out-of-order-handling 但找不到这里提到的设置、配置或
我以json字符串的形式生成输入数据。 对于主题-myinput 我的班级是这样的: 我得到下面类铸造异常: 线程“countries-streaming-analysis-app-f7f95119-4401-4a6e-8060-5A138FFADB2-StreamThread-1”组织中的异常。阿帕奇。Kafka。溪流。错误。异常:流程中捕获到异常。taskId=0_0,processor=KS