首先我使用了 "github.com/confluentinc/confluent-kafka-go/kafka"
这个包来连接Kafka,之后实现了个连接池的功能,代码如下:
package kfk
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"strings"
"sync"
)
type Pool struct {
brokers []string
config *kafka.ConfigMap
pool chan *kafka.Producer
wg sync.WaitGroup
}
var (
KafkaPool *Pool
)
func KfkInit() {
poolSize := 10
brokers := []string{"localhost:9092"}
KafkaPool = NewPool(brokers, poolSize)
}
// 初始化连接池
func NewPool(brokers []string, poolSize int) *Pool {
config := &kafka.ConfigMap{
"bootstrap.servers": strings.Join(brokers, ","),
"acks": "all",
"delivery.timeout.ms": 3000,
}
p := &Pool{
brokers: brokers,
config: config,
pool: make(chan *kafka.Producer, poolSize),
}
for i := 0; i < poolSize; i++ {
producer, err := p.createProducer()
if err != nil {
p.Close()
panic(err)
}
p.pool <- producer
}
return p
}
// 创建Producer实例
func (p *Pool) createProducer() (*kafka.Producer, error) {
producer, err := kafka.NewProducer(p.config)
if err != nil {
return nil, err
}
return producer, nil
}
// 从连接池中获取Producer实例
func (p *Pool) getProducer() *kafka.Producer {
return <-p.pool
}
// 将Producer实例放回连接池
func (p *Pool) releaseProducer(producer *kafka.Producer) {
p.pool <- producer
}
// 发送消息到指定主题
func (p *Pool) SendMessage(topic string, tableName string, message []byte) error {
producer := p.getProducer()
defer p.releaseProducer(producer)
deliveryChan := make(chan kafka.Event)
err := producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte(tableName),
Value: message,
}, deliveryChan)
if err != nil {
return err
}
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
return m.TopicPartition.Error
}
return nil
}
// 关闭连接池
func (p *Pool) Close() {
close(p.pool)
p.wg.Wait()
}
开发完之后,我打算执行交叉编译
,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错):
#!/bin/bash
swag init >/dev/null 2>&1
# 根据不同的平台进行交叉编译
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: Swag生成成功,正在执行打包···"
# save the original values of GOOS and GOARCH
GOOS_ORIGINAL=$(go env GOOS)
GOARCH_ORIGINAL=$(go env GOARCH)
# specify the target platform and architecture
platform=${1:-"linux"}
arch=${2:-"amd64"}
# set the environment variables for the target platform and architecture
if [ "$platform" == "linux" ] && [ "$arch" == "amd64" ]; then
export GOOS=linux
export GOARCH=amd64
elif [ "$platform" == "windows" ] && [ "$arch" == "amd64" ]; then
export GOOS=windows
export GOARCH=amd64
else
echo "Error: Unsupported platform and architecture combination"
exit 1
fi
# build the Go binary for the target platform and architecture
go build -o dot_production main_production.go
# check if the binary was successfully built
if [ $? -eq 0 ]; then
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 生产者 平台打包完成!"
else
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 生产者 平台打包失败!"
exit 1
fi
go build -o dot_consumer main_consumer.go
# check if the binary was successfully built
if [ $? -eq 0 ]; then
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 消费者 平台打包完成!"
else
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 消费者 平台打包失败!"
exit 1
fi
# reset the GOOS and GOARCH environment variables to their original values
export GOOS=$GOOS_ORIGINAL
export GOARCH=$GOARCH_ORIGINAL
报错如下:
zxx@Macbook dot % ./pack.sh
2023-06-03 00:31:29 Success: Swag生成成功,正在执行打包···
# dot/middleware/kfk
middleware/kfk/kfk_consumer.go:18:22: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:19:22: undefined: kafka.Consumer
middleware/kfk/kfk_consumer.go:26:19: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:39:25: undefined: kafka.NewConsumer
middleware/kfk/kfk_consumer.go:56:2: maxMessages declared and not used
middleware/kfk/kfk_production.go:11:17: undefined: kafka.ConfigMap
middleware/kfk/kfk_production.go:12:22: undefined: kafka.Producer
middleware/kfk/kfk_production.go:53:41: undefined: kafka.Producer
middleware/kfk/kfk_production.go:63:37: undefined: kafka.Producer
middleware/kfk/kfk_production.go:68:48: undefined: kafka.Producer
middleware/kfk/kfk_consumer.go:56:2: too many errors
2023-06-03 00:31:29 Success: linux 生产者 平台打包失败
十分不理解,这是什么原因呢?换成kafka之后,不能进行交叉编译么?请各位大佬赐教~ 各位大佬辛苦
首先,我并不是很确定问题的原因,因为确实没用过这个库,我只能说一些可能存在的问题:
修改 import 为 v2 ,可能由于老版本存在问题
import (
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
编译Linux时尝试添加 tags
If you are building for Alpine Linux (musl), -tags musl must be specified.
go build -tags musl ./...
尝试使用下面的方式进行编译(我认为最可能的问题)
$ export CC=aarch64-linux-gnu-gcc ; GOOS=linux GOARCH=arm64 CGO_ENABLED=1 go build -v -ldflags="-extld=$CC"
https://github.com/confluentinc/confluent-kafka-go/issues/947
今天线上报错Local: Queue full,导致接口无法使用,请求各位大佬指教 目前使用go连接kafka的库使用的是github.com/confluentinc/confluent-kafka-go/v2/kafka 问题:在我连续像生产者写入110万条数据时,报错 Local: Queue full,具体代码如下: 经过多次的测试,不管是不是进行消费,都是在110万条左右开始报错,并且不
kafka-go Motivations We rely on both Go and Kafka a lot at Segment. Unfortunately, the state of the Goclient libraries for Kafka at the time of this writing was not ideal. The availableoptions were: s
问题内容: 这是我的代码: 输出量 为什么和具有相同的地址?在TCP中,我认为为新连接创建了一个新套接字。 问题答案: 这让我困惑了一秒钟,但这是正确的。确实创建了一个新的套接字(具有唯一的本地+远程地址元组)。维基百科的这段引文很好地描述了它: 服务器可以使用相同的本地端口号和本地IP地址创建多个同时建立的TCP套接字,每个套接字都映射到其自己的服务器子进程,并为自己的客户端进程提供服务。由于远
在这篇文章runn-mongob-querys-contined-with-go中说mgo。DialSusInfo:创建一个会话,它维护一个到MongoDB的套接字连接池,但是当我在函数DialSusInfo的文档中查找时,我没有找到谈论池连接的东西,只有我在Dial Function Dial Function中找到了一些东西,上面写着:对于给定的集群,此方法通常只调用一次。然后在获得的会话上使
我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误: 现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-conf
这个包来连接Kafka,之后实现了个连接池的功能,代码如下: 开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错): 报错如下: