Question:

go - Problem connecting to Kafka from Go?

吕博耘
2023-06-03

I'm using the
"github.com/confluentinc/confluent-kafka-go/kafka"
package to connect to Kafka, and on top of it I implemented a connection pool. The code is as follows:

package kfk

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "strings"
    "sync"
)

type Pool struct {
    brokers []string
    config  *kafka.ConfigMap
    pool    chan *kafka.Producer
}

var (
    KafkaPool *Pool
)

func KfkInit() {
    poolSize := 10
    brokers := []string{"localhost:9092"}
    KafkaPool = NewPool(brokers, poolSize)
}

// NewPool initializes the connection pool and pre-creates poolSize producers.
func NewPool(brokers []string, poolSize int) *Pool {
    config := &kafka.ConfigMap{
        "bootstrap.servers":   strings.Join(brokers, ","),
        "acks":                "all",
        "delivery.timeout.ms": 3000,
    }

    p := &Pool{
        brokers: brokers,
        config:  config,
        pool:    make(chan *kafka.Producer, poolSize),
    }

    for i := 0; i < poolSize; i++ {
        producer, err := p.createProducer()
        if err != nil {
            p.Close()
            panic(err)
        }
        p.pool <- producer
    }

    return p
}

// createProducer creates a single Producer instance.
func (p *Pool) createProducer() (*kafka.Producer, error) {
    producer, err := kafka.NewProducer(p.config)
    if err != nil {
        return nil, err
    }

    return producer, nil
}

// getProducer takes a Producer from the pool, blocking until one is available.
func (p *Pool) getProducer() *kafka.Producer {
    return <-p.pool
}

// releaseProducer returns a Producer to the pool.
func (p *Pool) releaseProducer(producer *kafka.Producer) {
    p.pool <- producer
}

// SendMessage sends a message to the given topic and waits synchronously for the delivery report.
func (p *Pool) SendMessage(topic string, tableName string, message []byte) error {
    producer := p.getProducer()
    defer p.releaseProducer(producer)

    deliveryChan := make(chan kafka.Event)
    err := producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: kafka.PartitionAny,
        },
        Key:   []byte(tableName),
        Value: message,
    }, deliveryChan)
    if err != nil {
        return err
    }

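    // Block until the delivery report for this message arrives.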
    e := <-deliveryChan
    m := e.(*kafka.Message)
    if m.TopicPartition.Error != nil {
        return m.TopicPartition.Error
    }

    return nil
}

// Close shuts down the pool and closes every producer in it.
func (p *Pool) Close() {
    close(p.pool)
    for producer := range p.pool {
        producer.Close()
    }
}
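
For reference, this is roughly how the pool is meant to be used from the rest of the service (a minimal sketch; the import path is taken from the dot/middleware/kfk package seen in the build log below, and the topic name and payload are placeholders):

package main

import "dot/middleware/kfk"

func main() {
    // Initialize the global pool once at startup.
    kfk.KfkInit()
    defer kfk.KafkaPool.Close()

    // SendMessage borrows a producer, sends, and returns it to the pool.
    err := kfk.KafkaPool.SendMessage("my-topic", "users", []byte(`{"id":1}`))
    if err != nil {
        panic(err)
    }
}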

After finishing development, I wanted to cross-compile this into a Linux executable. The packaging script is below. (There was no problem back when we used MQ; the errors appeared only after switching to Kafka.)

#!/bin/bash

swag init >/dev/null 2>&1
# cross-compile for the selected platform

echo "$(date +"%Y-%m-%d %H:%M:%S") Success: Swag生成成功,正在执行打包···"

# save the original values of GOOS and GOARCH
GOOS_ORIGINAL=$(go env GOOS)
GOARCH_ORIGINAL=$(go env GOARCH)

# specify the target platform and architecture
platform=${1:-"linux"}
arch=${2:-"amd64"}

# set the environment variables for the target platform and architecture
if [ "$platform" == "linux" ] && [ "$arch" == "amd64" ]; then
    export GOOS=linux
    export GOARCH=amd64
elif [ "$platform" == "windows" ] && [ "$arch" == "amd64" ]; then
    export GOOS=windows
    export GOARCH=amd64
else
    echo "Error: Unsupported platform and architecture combination"
    exit 1
fi

# build the Go binary for the target platform and architecture
go build -o dot_production main_production.go

# check if the binary was successfully built
if [ $? -eq 0 ]; then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 生产者 平台打包完成!"
else
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 生产者 平台打包失败!"
    exit 1
fi

go build -o dot_consumer main_consumer.go

# check if the binary was successfully built
if [ $? -eq 0 ]; then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 消费者 平台打包完成!"
else
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 消费者 平台打包失败!"
    exit 1
fi

# reset the GOOS and GOARCH environment variables to their original values
export GOOS=$GOOS_ORIGINAL
export GOARCH=$GOARCH_ORIGINAL

The errors are as follows:

zxx@Macbook dot % ./pack.sh 
2023-06-03 00:31:29 Success: swag docs generated, starting build...
# dot/middleware/kfk
middleware/kfk/kfk_consumer.go:18:22: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:19:22: undefined: kafka.Consumer
middleware/kfk/kfk_consumer.go:26:19: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:39:25: undefined: kafka.NewConsumer
middleware/kfk/kfk_consumer.go:56:2: maxMessages declared and not used
middleware/kfk/kfk_production.go:11:17: undefined: kafka.ConfigMap
middleware/kfk/kfk_production.go:12:22: undefined: kafka.Producer
middleware/kfk/kfk_production.go:53:41: undefined: kafka.Producer
middleware/kfk/kfk_production.go:63:37: undefined: kafka.Producer
middleware/kfk/kfk_production.go:68:48: undefined: kafka.Producer
middleware/kfk/kfk_consumer.go:56:2: too many errors
2023-06-03 00:31:29 Error: linux producer build failed!

I really don't understand what's causing this. Does switching to Kafka make cross-compilation impossible? Any pointers would be much appreciated!

1 answer

尉迟鸿熙
2023-06-03

First of all, I'm not sure of the exact cause, since I haven't used this library myself; I can only point out a few likely issues:

  1. Change the import to v2; the problem may be due to the old version:

    import (
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    )
  2. When building for Linux, try adding build tags:
    If you are building for Alpine Linux (musl), -tags musl must be specified.

    go build -tags musl ./...
  3. Try building the way shown below (in my opinion the most likely cause): confluent-kafka-go wraps librdkafka via cgo, and Go disables cgo by default when cross-compiling, which is exactly why all the kafka.* identifiers come up undefined. You need CGO_ENABLED=1 plus a C cross-compiler for the target platform (see the fuller sketch after this list):

    $ export CC=aarch64-linux-gnu-gcc ; GOOS=linux GOARCH=arm64 CGO_ENABLED=1 go build -v  -ldflags="-extld=$CC"

    https://github.com/confluentinc/confluent-kafka-go/issues/947
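
Applied to the pack.sh above, a minimal sketch of the linux/amd64 case might look like the following. This assumes a musl cross C toolchain is installed on the Mac; the compiler name x86_64-linux-musl-gcc is an assumption, so substitute whatever cross-compiler you actually have:

    #!/bin/bash
    # Hypothetical adaptation of pack.sh: confluent-kafka-go needs cgo,
    # so cross-compiling requires a C compiler that targets Linux.
    # x86_64-linux-musl-gcc is assumed; use your installed toolchain.
    export GOOS=linux
    export GOARCH=amd64
    export CGO_ENABLED=1
    export CC=x86_64-linux-musl-gcc

    # -tags musl links against musl libc; the static extldflags make the
    # binary self-contained so the target host needs no extra libraries.
    go build -tags musl -ldflags '-extldflags "-static"' -o dot_production main_production.go
    go build -tags musl -ldflags '-extldflags "-static"' -o dot_consumer main_consumer.go

If cross-compiling keeps fighting you, building natively on a Linux machine (or inside a Linux container) sidesteps the toolchain problem entirely.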
