当前位置: 首页 > 知识库问答 >
问题:

如何在kubernetes中创作简单的Kafka

暴绪
2023-03-14
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    name: kafka
spec:
  ports:
  - port: 9092
    targetPort: 9092
    protocol: TCP
  selector:
    name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-service
  labels:
    name: zookeeper
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    name: zookeeper
  type: LoadBalancer
---
apiVersion: v1
kind: Pod
metadata:
  name: zookeeper
  labels:
    name: zookeeper 
spec:
  containers:
    - name: zookeeper
      image: zookeeper:3.7.0
---
apiVersion: v1
kind: Pod
metadata:
  name: kafka
  labels:
    name: kafka 
spec:
  containers:
    - name: kafka
      image: wurstmeister/kafka:2.13-2.6.0
      imagePullPolicy: "IfNotPresent"
      env:
        - name: KAFKA_ADVERTISED_PORT
          value: "666"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: localhost
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: INSIDE:PLAINTEXT
        - name: KAFKA_ADVERTISED_LISTENERS
          value: INSIDE://:666
        - name: KAFKA_LISTENERS
          value: INSIDE://:666
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INSIDE
      ports:
        - containerPort: 9092

简单的java代码:

@Service
public class Consumer {
    @KafkaListener(topics = "new-topic",groupId = "test")
    public void consumeMessage(String message){
        System.out.println("************************");
        System.out.println(message);
        System.out.println("************************");
    }
}

如何创建kafka broker而不需要扩展测试?

共有1个答案

巫新知
2023-03-14

这篇文章对我很有帮助:https://www.confluent.io/blog/kafka-listeners-complinated/

我将yaml文件更改为:

apiVersion: v1
kind: Pod
metadata:
  name: kafka-service
  labels:
    name: kafka-service 
spec:
  containers:
    - name: kafka-service
      image: wurstmeister/kafka:2.13-2.6.0
      imagePullPolicy: "IfNotPresent"
      env:
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_BROKER_ID
          value: "1"

        - name: KAFKA_LISTENERS
          value: IN://:9092,OUT://:9093
        - name: KAFKA_ADVERTISED_LISTENERS
          value: IN://localhost:9092,OUT://kafka-service:9093
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: IN:PLAINTEXT,OUT:PLAINTEXT
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: IN

这意味着kubernetes network内的端口9093和端口9092从主机打开。之后一切正常工作。

 类似资料:
  • 在job.yaml下面用于创建作业。未创建初始化容器。 [root@app]#kubectl版本客户端版本:version.info{Major:“1”,Minor:“15”,GitVersion:“v1.15.5”,GitCommit:“”,GitTreeState:“Clean”,BuildDate:“2019-10-15T19:16:51Z”,GoVersion:“Go1.12.10”,编译

  • 问题内容: 我目前正在学习Java,并且想知道如何以OO方式控制状态。我实现了一个Pong应用程序。如果我想要多个状态,例如游戏性和菜单状态,并且这些状态中的每个状态都必须执行启动,停止和运行,我将如何实现此目标以及如何在这些状态之间进行切换。 我知道我可以简单地添加一个大的switch语句,但是实现这一点的最佳方法是什么? 我希望能够在游戏状态下切换到菜单状态,反之亦然。 问题答案: 您可以使用

  • 我正试图创建一个代理服务器,将请求从客户端传递到第三方网站(比如谷歌)。我的代理只需要将传入请求镜像到目标站点上相应的路径,因此如果我的客户端请求的url为: 应提供以下资源: 以下是我想出来的: 它可以很好地处理html页面,但是对于其他类型的文件,它只是返回一个空白页面或目标站点的一些错误消息(这在不同的站点中有所不同)。

  • 问题内容: 我有很多网址,并且想实现自动补全功能。我不喜欢朴素方法的复杂性,因为它与设置大小成线性关系: 现在我知道在哈希集中,函数“ contains()”在“ O(1)”中有效,但是没有“ containsPrefix()”。是否有一种简单的方法,而无需使用像Lucene这样的大库或自己编写代码?我这样做没有问题,但对于这样一个简单的问题似乎有点过头了,所以我想知道是否存在现有的简单解决方案:

  • 问题内容: 我正在尝试创建一个代理服务器,以将请求从客户端传递到第三方网站(例如google)。我的代理只需将传入请求镜像到目标站点上的相应路径,因此,如果我的客户请求的url为: 应提供以下资源: 这是我想出的: 它适用于html页面,但对于其他类型的文件,它仅返回空白页面或来自目标站点的错误消息(在不同站点中有所不同)。 问题答案: 我认为处理从第三方服务器收到的响应不是一个好主意。这只会增加

  • 问题内容: 我正在寻找与JavaScript相同的效果。 我今天下午使用Twisted.web编写了一个基于Web的简单解释器。您基本上是通过表单提交Python代码块的,客户端来抓取并执行它。我希望能够发出一个简单的弹出消息,而不必每次都重写一堆样板wxPython或TkInter代码(因为该代码通过表单提交然后消失了)。 我尝试过tkMessageBox: 但这会在后台用tk图标打开另一个窗口