按照这里的讨论,我使用以下步骤使外部客户端(基于 kafkajs)连接到 OpenShift 上的 Strimzi。这些步骤从这里开始。
kafka-persistent-single.yaml
被编辑为如下所示。
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 2.3.0
replicas: 1
listeners:
plain: {}
tls: {}
external:
type: route
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
log.message.format.version: "2.3"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 5Gi
deleteClaim: false
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 5Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
要提取证书并在客户端中使用它,我运行了以下命令:
kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -D > ca.crt
请注意,我必须在我的macOS上使用Bas64-D
,而不是Bas64-d
,如留档所示。
这是从他们的 npm
页面和他们的文档改编的客户端。
const fs = require('fs')
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io'],
ssl : { rejectUnauthorized: false,
ca : [fs.readFileSync('ca.crt', 'utf-8')]
}
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
const run = async () => {
// Producing
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
run().catch(console.error)
当我从具有ca.crt
的文件夹运行节点sample.js
时,我收到一条连接拒绝消息。
{"level":"ERROR","timestamp":"2019-10-05T03:22:40.491Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.99.100:9094","broker":"my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:9094","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.99.100:9094\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)"}
我错过了什么?
经过与@ppatierno长时间的讨论,我觉得 Strimzi 集群与 Kafka 控制台客户端配合得很好。另一方面,kafkajs
软件包不断失败,NOT_LEADER_FOR_PARTITION
。
更新 Python 客户端似乎毫不费力地工作;所以,我放弃了卡夫卡伊
。
我猜问题是您在代理地址上缺少正确的端口443,所以您必须使用
经纪人:['我的-集群-Kafka-引导-消息-OS . 192 . 168 . 99 . 100 . nip . io:443 ']
否则它将尝试连接到OpenShift路由上的默认端口80。
我有一个mongo db实例在linux系统中运行,我正试图从windows系统连接到它。当我启用身份验证时,我会不断获得 数据库“管理员”的凭据无效... 我的用户设置看起来像 显示用户{“_id”:“test.myuser”,“user”:“myuser”,“db”:“admin”,“roles”:[{“role”:“readWrite”,“db”:“admin”},{“role”:“user
我如何用C#将消息发布到外部托管的MQTT代理(在我的例子中是VirtualBox--在openwrt之上)?
在Openshift上部署了Hazelcast映像,我已经创建了一个路由,但仍然无法从外部Java客户端连接到它。我开始知道路由仅适用于 HTTP 或 HTTPS 服务,所以我在这里错过了什么,或者我必须做什么才能将该 Hazelcast 实例暴露给外部世界? 并且为黑兹尔卡斯特创建了Docker映像,并且它在映像内部Hazelcast.jar运行,这与我面临的问题有关吗? 我尝试通过运行命令oc
在Netty中创建客户端连接时,我有一个问题。 这里,为什么我们没有一个bind方法,将通道绑定到发起客户端连接的端口(在客户端)?我们唯一需要提供的就是给出服务器地址和端口如下: 这是在客户端还是服务器端创建了一个新的通道?此通道绑定在客户端的哪个端口? 我们在执行服务器端引导时进行绑定,如下所示 我很困惑,不明白客户端从哪个端口向服务器发送数据,使用的是什么通道?
我开始玩Quarkus和它的REST客户端。根据文档,应该创建一个Jax-RS带注释的接口,并用@RegisterRestClient进一步注释。 我的问题是,在服务器提供的一个工件中,我已经有了我需要连接的服务的JaxRS接口,我可以直接导入它。有没有办法使用已经创建的外部Jax-RS接口来创建服务?复制粘贴代码来获得一个完美的界面似乎是错误的,因为它已经很好地为我服务了。
我创建了一个SpringBoot的示例项目,以了解外部客户机功能,运行时会出现以下错误。 com.example.demo.RestClient中的字段Remote teCallClient需要一个类型为com.example.demo.Remote teCallClient的bean,但找不到。操作:考虑在配置中定义类型为com.example.demo.远程呼叫客户端的bean。 我尝试了各种