当前位置: 首页 > 知识库问答 >
问题:

Strimzi-连接外部客户端

端木鹏
2023-03-14

按照这里的讨论,我使用以下步骤使外部客户端(基于 kafkajs)连接到 OpenShift 上的 Strimzi。这些步骤从这里开始。

kafka-persistent-single.yaml被编辑为如下所示。

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
          type: route
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 5Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

要提取证书并在客户端中使用它,我运行了以下命令:

kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -D > ca.crt

请注意,我必须在我的macOS上使用Bas64-D,而不是Bas64-d,如留档所示。

这是从他们的 npm 页面和他们的文档改编的客户端。

const fs = require('fs')
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io'],
  ssl : { rejectUnauthorized: false,
    ca : [fs.readFileSync('ca.crt', 'utf-8')]
  }
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)

当我从具有ca.crt的文件夹运行节点sample.js时,我收到一条连接拒绝消息。

{"level":"ERROR","timestamp":"2019-10-05T03:22:40.491Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.99.100:9094","broker":"my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:9094","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.99.100:9094\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)"}

我错过了什么?

共有2个答案

周通
2023-03-14

经过与@ppatierno长时间的讨论,我觉得 Strimzi 集群与 Kafka 控制台客户端配合得很好。另一方面,kafkajs 软件包不断失败,NOT_LEADER_FOR_PARTITION

更新 Python 客户端似乎毫不费力地工作;所以,我放弃了卡夫卡伊

程景胜
2023-03-14

我猜问题是您在代理地址上缺少正确的端口443,所以您必须使用

经纪人:['我的-集群-Kafka-引导-消息-OS . 192 . 168 . 99 . 100 . nip . io:443 ']

否则它将尝试连接到OpenShift路由上的默认端口80。

 类似资料:
  • 我有一个mongo db实例在linux系统中运行,我正试图从windows系统连接到它。当我启用身份验证时,我会不断获得 数据库“管理员”的凭据无效... 我的用户设置看起来像 显示用户{“_id”:“test.myuser”,“user”:“myuser”,“db”:“admin”,“roles”:[{“role”:“readWrite”,“db”:“admin”},{“role”:“user

  • 我如何用C#将消息发布到外部托管的MQTT代理(在我的例子中是VirtualBox--在openwrt之上)?

  • 在Openshift上部署了Hazelcast映像,我已经创建了一个路由,但仍然无法从外部Java客户端连接到它。我开始知道路由仅适用于 HTTP 或 HTTPS 服务,所以我在这里错过了什么,或者我必须做什么才能将该 Hazelcast 实例暴露给外部世界? 并且为黑兹尔卡斯特创建了Docker映像,并且它在映像内部Hazelcast.jar运行,这与我面临的问题有关吗? 我尝试通过运行命令oc

  • 在Netty中创建客户端连接时,我有一个问题。 这里,为什么我们没有一个bind方法,将通道绑定到发起客户端连接的端口(在客户端)?我们唯一需要提供的就是给出服务器地址和端口如下: 这是在客户端还是服务器端创建了一个新的通道?此通道绑定在客户端的哪个端口? 我们在执行服务器端引导时进行绑定,如下所示 我很困惑,不明白客户端从哪个端口向服务器发送数据,使用的是什么通道?

  • 我开始玩Quarkus和它的REST客户端。根据文档,应该创建一个Jax-RS带注释的接口,并用@RegisterRestClient进一步注释。 我的问题是,在服务器提供的一个工件中,我已经有了我需要连接的服务的JaxRS接口,我可以直接导入它。有没有办法使用已经创建的外部Jax-RS接口来创建服务?复制粘贴代码来获得一个完美的界面似乎是错误的,因为它已经很好地为我服务了。

  • 我创建了一个SpringBoot的示例项目,以了解外部客户机功能,运行时会出现以下错误。 com.example.demo.RestClient中的字段Remote teCallClient需要一个类型为com.example.demo.Remote teCallClient的bean,但找不到。操作:考虑在配置中定义类型为com.example.demo.远程呼叫客户端的bean。 我尝试了各种