这个包来连接Kafka,之后实现了个连接池的功能,代码如下:
package kfk
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"strings"
"sync"
)
type Pool struct {
brokers []string
config *kafka.ConfigMap
pool chan *kafka.Producer
wg sync.WaitGroup
}
var (
KafkaPool *Pool
)
func KfkInit() {
poolSize := 10
brokers := []string{"localhost:9092"}
KafkaPool = NewPool(brokers, poolSize)
}
// 初始化连接池
func NewPool(brokers []string, poolSize int) *Pool {
config := &kafka.ConfigMap{
"bootstrap.servers": strings.Join(brokers, ","),
"acks": "all",
"delivery.timeout.ms": 3000,
}
p := &Pool{
brokers: brokers,
config: config,
pool: make(chan *kafka.Producer, poolSize),
}
for i := 0; i < poolSize; i++ {
producer, err := p.createProducer()
if err != nil {
p.Close()
panic(err)
}
p.pool <- producer
}
return p
}
// 创建Producer实例
func (p *Pool) createProducer() (*kafka.Producer, error) {
producer, err := kafka.NewProducer(p.config)
if err != nil {
return nil, err
}
return producer, nil
}
// 从连接池中获取Producer实例
func (p *Pool) getProducer() *kafka.Producer {
return <-p.pool
}
// 将Producer实例放回连接池
func (p *Pool) releaseProducer(producer *kafka.Producer) {
p.pool <- producer
}
// 发送消息到指定主题
func (p *Pool) SendMessage(topic string, tableName string, message []byte) error {
producer := p.getProducer()
defer p.releaseProducer(producer)
deliveryChan := make(chan kafka.Event)
err := producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Key: []byte(tableName),
Value: message,
}, deliveryChan)
if err != nil {
return err
}
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
return m.TopicPartition.Error
}
return nil
}
// 关闭连接池
func (p *Pool) Close() {
close(p.pool)
p.wg.Wait()
}
开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错):
#!/bin/bash
swag init >/dev/null 2>&1
# 根据不同的平台进行交叉编译
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: Swag生成成功,正在执行打包···"
# save the original values of GOOS and GOARCH
GOOS_ORIGINAL=$(go env GOOS)
GOARCH_ORIGINAL=$(go env GOARCH)
# specify the target platform and architecture
platform=${1:-"linux"}
arch=${2:-"amd64"}
# set the environment variables for the target platform and architecture
if [ "$platform" == "linux" ] && [ "$arch" == "amd64" ]; then
export GOOS=linux
export GOARCH=amd64
elif [ "$platform" == "windows" ] && [ "$arch" == "amd64" ]; then
export GOOS=windows
export GOARCH=amd64
else
echo "Error: Unsupported platform and architecture combination"
exit 1
fi
# build the Go binary for the target platform and architecture
go build -o dot_production main_production.go
# check if the binary was successfully built
if [ $? -eq 0 ]; then
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 生产者 平台打包完成!"
else
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 生产者 平台打包失败!"
exit 1
fi
go build -o dot_consumer main_consumer.go
# check if the binary was successfully built
if [ $? -eq 0 ]; then
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 消费者 平台打包完成!"
else
echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 消费者 平台打包失败!"
exit 1
fi
# reset the GOOS and GOARCH environment variables to their original values
export GOOS=$GOOS_ORIGINAL
export GOARCH=$GOARCH_ORIGINAL
报错如下:
zxx@Macbook dot % ./pack.sh
2023-06-03 00:31:29 Success: Swag生成成功,正在执行打包···
# dot/middleware/kfk
middleware/kfk/kfk_consumer.go:18:22: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:19:22: undefined: kafka.Consumer
middleware/kfk/kfk_consumer.go:26:19: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:39:25: undefined: kafka.NewConsumer
middleware/kfk/kfk_consumer.go:56:2: maxMessages declared and not used
middleware/kfk/kfk_production.go:11:17: undefined: kafka.ConfigMap
middleware/kfk/kfk_production.go:12:22: undefined: kafka.Producer
middleware/kfk/kfk_production.go:53:41: undefined: kafka.Producer
middleware/kfk/kfk_production.go:63:37: undefined: kafka.Producer
middleware/kfk/kfk_production.go:68:48: undefined: kafka.Producer
middleware/kfk/kfk_consumer.go:56:2: too many errors
2023-06-03 00:31:29 Success: linux 生产者 平台打包失败
export CGO_ENABLED=1
export CC=x86_64-linux-musl-gcc
export CXX=x86_64-linux-musl-g++
export GOOS=linux
export GOARCH=amd64
编译前设个环境变量 CGO_ENABLED=1
看看。
interface Animal {
name: string;
age: number;
}
type Simplify<T> = {
};
我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误: 现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-conf
我用的是Kafka 0.8.2-beta,有2台Ubuntu 14虚拟机: 172.30.141.127正在运行动物园管理员 172.30.141.184在经营一家Kafka经纪人 我正在启动动物园管理员实例,如果一切顺利的话。然后,我尝试启动代理并将其连接到172.30.141.127:2181。它似乎能够在特定的端口上连接并建立会话,但是由于一些似乎没有记录的异常,它失去了连接。 代理输出:
首先我使用了 "github.com/confluentinc/confluent-kafka-go/kafka" 这个包来连接Kafka,之后实现了个连接池的功能,代码如下: 开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错): 报错如下: 十分不理解,这是什么原因呢?换成kafka之后,不能进行交叉编译么?请各位
我正在尝试对Kafka消息流进行流处理和CEP。为此,我选择Apache Ignite首先实现一个原型。但是,我无法连接到队列: 使用KAFKA2.11-0.10.1.0 apache-ignite-fabric-1.8.0-bin Kafka工作正常,我用一个消费者测试了它。然后启动ignite,然后在spring boot命令行应用程序中运行following。 当应用程序启动时,我得到 20
我在加入一个KStream和一个GlobalKTable时遇到了一个问题,希望能得到您的帮助。 给定两个Kafka主题和: 订单 客户 需求是用客户名称丰富订单流 我正在尝试以下操作: null
我正在使用Apache Flink 1.3.2的集群。我们正在使用Kafka消息,自从将代理升级到1.1.0(从0.10.2)后,我们经常在日志中发现此错误: 因此,有时我们在处理过程中会遇到缺失事件。我们在工作中使用FlinkKafkaConsumer010。 已启用检查点(间隔10秒,超时1分钟,检查点之间的最小暂停时间为5秒,最大并发检查点时间为1秒。E2E的平均持续时间不到1秒,甚至不到半