当前位置: 首页 > 知识库问答 >
问题:

Kafka流异常:GroupAuthorizationException

魏俊茂
2023-03-14

我正在开发一个Kafka-Stream应用程序,它将从输入Kafka主题读取消息,过滤不需要的数据并推送到输出Kafka主题。

Kafka流配置:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

    Map<String, Object> streamsConfiguration = new HashMap<>();
    streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
    streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
    streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
    streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    streamsConfiguration.put(SASL_MECHANISM, "PLAIN");

    return new KafkaStreamsConfiguration(streamsConfiguration);
}

KStream筛选器逻辑:

@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {

    KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
    /** Printing the source message */
    stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " *****Message From Input Topic: " + key + ": " + value));
    KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));

    filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
    /** After filtering printing the same message */
    filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
    return stream;
}

当开始以上spring的Kafka流应用程序,我得到以下例外。

2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0

我们的Kafka Infra团队给了“group.id”必要的权限,使用这个相同的“group.id”,我可以使用其他Kafka消费者应用程序来使用消息,我在“application.id”中按照我的意愿使用了name。我们不是在Kafka访问控制列表中添加/更新“application.id”。

我真的不确定我们需要为“application.id”授予任何权限,或者在Kafka流配置中缺少了什么。请指教。

请注意:我尝试过在Kafka流配置中使用with“group.id”和不使用“group.id”,但我总是遇到同样的异常。

谢谢!巴拉蒂拉贾·尚穆甘姆

共有1个答案

须景辉
2023-03-14

我不在办公桌前,但我认为Streams将group.id设置为application.id。

 类似资料:
  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 我正在努力定制我的spring kafka streams应用程序。我一直试图在我的KStreams上配置处理未捕获(运行时异常)。 参考文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_st

  • 我们的应用程序在streams代码中间歇性地遇到无序异常。这会导致流线程停止。 实现很简单,2个KStreams连接并输出到另一个主题。 在寻找这个无序顺序异常的解决方案时,我在Confluent上找到了以下文档 https://docs.confluent.io/current/streams/concepts.html#out-of-order-handling 但找不到这里提到的设置、配置或

  • 我以json字符串的形式生成输入数据。 对于主题-myinput 我的班级是这样的: 我得到下面类铸造异常: 线程“countries-streaming-analysis-app-f7f95119-4401-4a6e-8060-5A138FFADB2-StreamThread-1”组织中的异常。阿帕奇。Kafka。溪流。错误。异常:流程中捕获到异常。taskId=0_0,processor=KS