当前位置: 首页 > 知识库问答 >
问题:

Kafka流:内部状态存储的“TopiAuthorizationException:无权访问主题”

蓟辰沛
2023-03-14

Java:OpenJdk 11Kafka:2.2.0Kafka流库:2.3.0

我试图在docker容器中部署我的Kafka streams应用程序,但在尝试创建带有TopicAuthorizationException的内部状态存储时失败。它在本地运行良好。本地和服务器上的主要区别在于,它连接到部署了Kafka的服务器,并使用常见的Kerberos身份验证进行身份验证。我无法理解身份验证与本地商店之间的联系。

我的流如下所示:

StreamsBuilder builder = new StreamsBuilder();

        //We stream from the source topic
        KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic, Consumed
                .with(Serdes.serdeFrom(String.class), INPUT_SERDE));

        //We group per room and window
        TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
                .groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));

        //We make them a list
        KStream<Windowed<String>, WindowedMessages> grouped = windowed
                .aggregate(WindowedMessages::new,
                        (key, value, aggregate) -> aggregate.add(value),
                        Materialized.with(Serdes.String(), Serdes.serdeFrom(windowSerializer, windowSerializer)))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream();

        //Filter
        KStream<Windowed<String>, FilterResult> filtered = grouped
                .mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));

        //Re map to its original form
        KStream<String, OutputPayload> reduced = filtered
                .flatMap((KeyValueMapper<Windowed<String>, WindowedMessages, Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
                        .getMessages()
                        .stream().map(payload -> new KeyValue<>(key.key(), payload))
                        .collect(toList()));


        //Target topic
        reduced.to(sinkTopic, Produced
                .with(Serdes.serdeFrom(String.class), SERDE));

        return builder.build();

它接收一个消息流,打开它,聚合每个窗口的所有消息,只保留带有“抑制”的列表的最后一个版本,然后平面映射整个列表以将其转发到另一个主题。

每次我遇到这样的异常:

错误消息为:org.apache.kafka.common.errors.TopicAuthorizationException:无权访问主题:[主题授权失败。]2019-10-09 06:44:03.255 0000 ERROR[filtereer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1][StreamTrapid_r]-流线程[filtereer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1]在处理过程中遇到以下意外的Kafka异常,这通常表示Streams内部错误:-[hread.java:777-live-mage-filtereer-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] - [] - [] org.apache.kafka.streams.errors.StreamsException:无法创建主题过滤器-KTABLE-SUPPRESS-STATE-STORE-0000000005-更改. atorg.apache.kafka.streams.processor.internals.InternalTopicManager.get(InternalTopicManager.java:212)atorg.apache.kafka.streams.processor.internals.InternalTopicM(AbstractCoordinator.java:544)在org.apache.kafka.clients.consumer.internals.Abstract协调员$JoinGroup响应andler.handle(AbstractCoordinator.java:527)在org. apache. kafka. clientsers.消费者内部。Abstract协调员$协调者响应处理程序。成功(Abstract协调员. java: 978)在org. apache. kafka. clientsers.消费者内部。抽象协调员$协调者响应处理程序。成功(抽象协调员. java: 958)在org. apache. kafka.客户端。消费者内部。请求未来1美元。成功(请求未来. java: 204)在org. apache. kafka.客户端。消费者内部。请求未来.消防成功(请求未来. java: 167)在org. apache. kafka.客户端。消费者内部。请求未来.完成(请求未来. java: 127)在org. apacheStreamThread. run(StreamThread. java: 774)由:org. apache. kafka. public. error引起。TopicAuthorizationException:未授权访问主题:[主题授权失败。]

共有1个答案

缑兴贤
2023-03-14

它不是“身份验证”,而是“授权”。看看你的日志消息,上面写着“无权访问主题”。据我所知,您无权创建内部主题“filterer-KTABLE-SUPPRESS-STATE-STORE-000000000 5-changelog”,该主题支持您的本地SUPPRESS-STATE存储。默认情况下,Kafka流中包含的国有商店由Kafka代理上的一个主题提供支持。此内部主题在故障切换期间用于恢复本地状态存储。这些内部主题由Kafka Streams应用程序自动创建,因此应用程序需要具有适当的权限才能创建它们。

有关更多信息,请参阅https://kafka.apache.org/23/documentation/streams/developer-guide/security.html#id1。它说“运行应用程序的主体必须设置ACL,以便应用程序具有创建、读取和写入内部主题的权限。”

 类似资料:
  • 我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知

  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将

  • 我试图找到所有的mp3文件在存储,但我的应用程序崩溃,我使用logcat看看为什么和问题的权限访问外部存储,但我已经把权限在清单,我不知道为什么不断崩溃。第一个错误: 03-06 14:55:24.553:E/dex2OAT(2796):创建oat文件失败:/data/dalvik-cache/x86/data@app@felipe.cursoandroid.com.musicbox-2@spli

  • 我正在尝试设置一个安全的Kafka集群,但在ACL方面遇到了一些困难。 Kafka流的汇流安全指南(https://docs.Confluent.io/current/Streams/developer-guide/security.html)只说明必须将集群创建ACL交给主体...但它没有说任何关于如何实际处理内部话题的内容。 通过研究和实验,我确定(对于Kafka版本1.0.0): 通配符不能

  • 我们正在使用带有Spring Cloud Stream函数的Kafka Streams。我们有一个典型的示例应用程序,它将用户单击与用户区域结合在一起。 我们知道,在定义拓扑结构时,我们可以通过使用适当的方法来强制内部变更日志或重新分区主题的自定义名称,这些方法接受物化存储的名称: 但是对于输入的KTable,我们不能更改它的状态存储内部主题名称,我们总是得到这个主题名称: 如果我们完全按照代码创