当前位置: 首页 > 知识库问答 >
问题:

Kafka流-内部主题的ACL

松国兴
2023-03-14

我正在尝试设置一个安全的Kafka集群,但在ACL方面遇到了一些困难。

Kafka流的汇流安全指南(https://docs.Confluent.io/current/Streams/developer-guide/html" target="_blank">security.html)只说明必须将集群创建ACL交给主体...但它没有说任何关于如何实际处理内部话题的内容。

通过研究和实验,我确定(对于Kafka版本1.0.0):

  1. 通配符不能与文本一起用于ACL中的主题名称。例如,由于所有内部主题都以应用程序id为前缀,所以我首先想到的是将acl应用于匹配' -*'的主题。这不起作用。
  2. 由Streams API创建的主题不会自动获得授予创建者的读/写访问权限。

内部主题的确切名称是否可预测和一致?换句话说,如果我在开发服务器上运行我的应用程序,当运行时,是否会在生产服务器上创建完全相同的主题?如果是这样,那么我可以在部署之前添加从dev派生的ACL。如果没有,应如何添加ACL?

共有1个答案

郑博厚
2023-03-14

内部主题的确切名称是否可预测和一致?换句话说,如果我在开发服务器上运行我的应用程序,当运行时,是否会在生产服务器上创建完全相同的主题?

是的,您将从一个运行到另一个运行获得完全相同的主题名称。DSL生成具有如下功能的处理器名称:

public String newProcessorName(final String prefix) {
    return prefix + String.format("%010d", index.getAndIncrement());
}

(其中index只是一个递增整数)。然后使用这些处理器名称创建重分区主题,其函数如下所示(参数name是如上所述生成的处理器名称):

static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder,
                                                final Serde<K1> keySerde,
                                                final Serde<V1> valSerde,
                                                final String topicNamePrefix,
                                                final String name) {
    Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
    Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
    Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
    Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
    String baseName = topicNamePrefix != null ? topicNamePrefix : name;

    String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
    String sinkName = builder.newProcessorName(SINK_NAME);
    String filterName = builder.newProcessorName(FILTER_NAME);
    String sourceName = builder.newProcessorName(SOURCE_NAME);

    builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
    builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
        @Override
        public boolean test(final K1 key, final V1 value) {
            return key != null;
        }
    }, false), name);

    builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
        null, filterName);
    builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
        keyDeserializer, valDeserializer, repartitionTopic);

    return sourceName;
}

我没有使用过ACL,但我想既然这些只是常规主题,那么是的,您可以对它们应用ACL。《安全指南》确实提到:

在安全的Kafka群集上运行应用程序时,运行应用程序的主体必须设置ACL-Cluster--操作Create,以便应用程序具有创建内部主题的权限。

我自己也一直在想这个问题,所以如果我错了,我猜Confluent的人会纠正我的。

 类似资料:
  • Java:OpenJdk 11Kafka:2.2.0Kafka流库:2.3.0 我试图在docker容器中部署我的Kafka streams应用程序,但在尝试创建带有TopicAuthorizationException的内部状态存储时失败。它在本地运行良好。本地和服务器上的主要区别在于,它连接到部署了Kafka的服务器,并使用常见的Kerberos身份验证进行身份验证。我无法理解身份验证与本地商

  • 其中一个Kafka流应用程序在Kafka代理和消费者端产生了大量未知生产者ID错误。 流配置如下: 消费者方面的错误: 这背后的原因是什么?

  • 我在Kafka Streams拓扑工作,有时,在更改应用程序ID和/或clientId属性后,我在特定的kafka流上收到错误:“”。我已经在每个Kafka节点的server.properties中设置了属性,但似乎没有创建此流的主题。 这是我的Kafka Streams拓扑:

  • 只是关于Kafka的后续问题-未压缩主题与压缩主题 正如那里所说, 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从未压缩的主题构建的。 作为最佳实践,关于未压缩主题的语义,是否应禁用要在日志启用程序中取消压缩的主题,以便不会发生压缩(清理),其属性如下: 日志清洁工enable=false或log。清洁工启用=true(默认),清除策略为“delete”(默认)

  • 在我的公司,我们广泛使用Kafka,但出于容错的原因,我们一直使用关系数据库来存储几个中间转换和聚合的结果。现在我们正在探索Kafka流作为一种更自然的方法来做到这一点。通常,我们的需求很简单--其中一个例子是 监听的输入队列 对每个记录执行一些高延迟操作(调用远程服务) 如果在处理时,都已生成,那么我应该处理V3,因为V2已经过时了 为了实现这一点,我将主题作为阅读。代码如下所示 这是预期的,但

  • 我有一个kafka流应用程序,它在其中使用stateStore(由RocksDB支持)。 stream thread所做的只是从kafka主题获取数据并将数据放入State Store。(还有其他线程从statestore读取数据并进行业务逻辑处理)。 我观察到它创造了一个新的Kafka主题“变化日志”,因为Statestore。 但我没有明白“变化”Kafka话题有什么用处? 为什么需要它(更改