当前位置: 首页 > 知识库问答 >
问题:

为什么我可以读取ksqldb流而不能读取ksql客户端中的主题?

楚流觞
2023-03-14

我正在最新版本(汇流5.5.1)中的AWS EC2实例上测试ksqldb,遇到了一个无法解决的访问问题。

我有一个安全的Kafka服务器(SASL_SSSL,SASL模式为普通),一个不安全的模式注册表(Avro序列化器的另一个问题,但暂时还可以),以及一个安全的KSQL服务器和客户机。

  • 主题可以用来自JDBC源连接器的AVRO数据(只有值,没有键)正确填充。
  • 我可以使用KSQL访问KSQL服务器而没有任何问题
  • 我可以访问KSQL REST API
  • 在ksql中列出主题时,将得到正确的列表。
  • 当我选择一个推送流时,当我将一些内容推入主题时(在我的例子中是使用Kafka Connect)会收到消息。
  • 但是:当我调用“打印主题”时,在客户端中会出现大约60秒的块,然后是“获取主题元数据时超时过期”。

ksql-kafka.log会出现大量重复的条目,例如

[2020-09-02 18:52:46,246] WARN [Consumer clientId=consumer-2, groupId=null] Bootstrap broker ip-10-1-2-10.eu-central-1.compute.internal:9093 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1037)

相应的broker日志显示

Sep  2 18:52:44 ip-10-1-6-11 kafka-server-start: [2020-09-02 18:52:44,704] INFO [SocketServer brokerId=1002] Failed authentication with ip-10-1-2-231.eu-central-1.compute.internal/10.1.2.231 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
ksql.service.id= hf_kafka_ksql_001
bootstrap.servers=ip-10-1-11-229.eu-central-1.compute.internal:9093,ip-10-1-6-11.eu-central-1.compute.internal:9093,ip-10-1-2-10.eu-central-1.compute.internal:9093
ksql.streams.state.dir=/var/data/ksqldb
ksql.schema.registry.url=http://ip-10-1-1-22.eu-central-1.compute.internal:8081
ksql.output.topic.name.prefix=ksql-interactive-
ksql.internal.topic.replicas=3
confluent.support.metrics.enable=false

# currently the keystore contains only the ksql server and the certificate chain to the CA
ssl.keystore.location=/var/kafka-ssl/ksql.keystore.jks
ssl.keystore.password=kspassword
ssl.key.password=kspassword
ssl.client.auth=true
# Need to set this to empty, otherwise the REST API is not accessible with the client key.
ssl.endpoint.identification.algorithm=

# currently the truststore contains only the CA certificate
ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
ssl.truststore.password=ctpassword

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="ksql" \
    password="ksqlsecret";
listeners=https://0.0.0.0:8088
advertised.listener=https://ip-10-1-2-231.eu-central-1.compute.internal:8088

authentication.method=BASIC
authentication.roles=admin,ksql,cli
authentication.realm=KsqlServerProps

# authentication for producers, needed for ksql commands like "Create Stream"
producer.ssl.endpoint.identification.algorithm=HTTPS
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
producer.ssl.truststore.password=ctpassword
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="ksql" \
    password="ksqlsecret";

# authentication for consumers, needed for ksql commands like "Create Stream"
consumer.ssl.endpoint.identification.algorithm=HTTPS
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
consumer.ssl.truststore.password=ctpassword
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="ksql" \
    password="ksqlsecret";

我使用

ksql --user cli --password test --config-file /var/kafka-ssl/ksql_cli.properties https://ip-10-1-2-231.eu-central-1.compute.internal:8088'

这是我的ksql客户端配置ksql_cli.properties:

security.protocol=SSL
#ssl.client.auth=true
ssl.truststore.location=/var/kafka-ssl/client.truststore.jks
ssl.truststore.password=ctpassword
ssl.keystore.location=/var/kafka-ssl/ksql.keystore.jks
ssl.keystore.password=kspassword
ssl.key.password=kspassword

JAAS配置,在服务启动时作为参数包含

KsqlServerProps {
  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
  file="/var/kafka-ssl/cli.password"
  debug="false";
};
ksql --user cli --password test --config-file /var/kafka-ssl/ksql_cli.properties https://ip-10-1-2-231.eu-central-1.compute.internal:8088'

有人找到解决这个问题的办法了吗?我真的很有想法。多谢了。

共有1个答案

张勇
2023-03-14

找到了!这很容易被忽略--当然是客户机的配置需求。SASL设置...

security.protocol=SASL_SSL
 类似资料:
  • 我是完全新的Web API的,并试图建立一个Android应用程序观看谷歌联系人。我被Oauth的第一步卡住了。我已经完成了登录按钮和授权。按钮工作正常,我可以登录我的谷歌账户,但当我试图从这个账户获得一些东西时,日志猫打印了这个按摩“I/System_Server:oneway函数结果将被删除,但以状态OK和包裹大小结束”。为什么会这样?因为auth令牌不再有效?如何恢复权限?我应该如何从帐户获

  • 首先,非常感谢所有愿意帮忙的人! 如果需要,可以跳过下面的文字墙。 出身背景 我正在尝试为我自己的minecraft服务器网络制作一个代理程序,我希望使用多个服务器,让人们在单独的服务器上发挥创造力和生存(以减少服务器负载并增加我的容量)。我计划使用传输插件的东西的服务器端,但我想防止不得不安装一个clientpatch。 我发现了bungeecort,但是它没有做我需要它做的事情(即,锻造对我计

  • 我遇到了这个问题,我正在使用请求,然后它正在清除该请求。因此,当我在过滤器链中向前传递它时,它以空值的形式传递请求。 我能够通过以下类似的解决方案来解决这个问题: 如何在Spring'HandlerMachodArgumentResolver'中多次读取请求正文? 我的问题是,为什么不允许我们多次阅读该请求?

  • 问题内容: 我正在使用go-hdf5将hdf5文件读取到golang中。我在Windows7上使用的是mingw和hdf5 1.8.14_x86的最新副本,似乎尝试使用任何预定义的类型都行不通,让我们集中关注例如T_NATIVE_UINT64。我已将问题简化为以下内容,基本上将go- hdf5排除在了问题之外,而指出了一些根本性的错误: 您显然需要hdf5,并将编译器指向标头/ dll,然后运行g

  • 我从教程中创建了示例Kafka Streams应用程序: 不幸的是,这个应用程序不读取输入流。我有一个来自PostgreSQL的JDBC源连接器,它正在处理来自一个数据库的精细流数据(我可以在本主题中的Kafka Connect UI数据上看到)。 我的问题是,即使我在BOOTSTRAP\u SERVERS\u CONFIG的Properties IP is localhost中更改了IP,我也不

  • 我有以下情况:我使用Kafka模板,以便与Kafka交互,但我希望能够以编程方式防止Kafka客户端从某个主题读取消息,但仍然能够产生到其他主题。有没有办法实现这一点?