当前位置: 首页 > 知识库问答 >
问题:

javascript - Kafka的连接问题?

袁宜民
2023-06-10

这个包来连接Kafka,之后实现了个连接池的功能,代码如下:

package kfk

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "strings"
    "sync"
)

type Pool struct {
    brokers []string
    config  *kafka.ConfigMap
    pool    chan *kafka.Producer
    wg      sync.WaitGroup
}

var (
    KafkaPool *Pool
)

func KfkInit() {
    poolSize := 10
    brokers := []string{"localhost:9092"}
    KafkaPool = NewPool(brokers, poolSize)
}

// 初始化连接池
func NewPool(brokers []string, poolSize int) *Pool {
    config := &kafka.ConfigMap{
        "bootstrap.servers":   strings.Join(brokers, ","),
        "acks":                "all",
        "delivery.timeout.ms": 3000,
    }

    p := &Pool{
        brokers: brokers,
        config:  config,
        pool:    make(chan *kafka.Producer, poolSize),
    }

    for i := 0; i < poolSize; i++ {
        producer, err := p.createProducer()
        if err != nil {
            p.Close()
            panic(err)
        }
        p.pool <- producer
    }

    return p
}

// 创建Producer实例
func (p *Pool) createProducer() (*kafka.Producer, error) {
    producer, err := kafka.NewProducer(p.config)
    if err != nil {
        return nil, err
    }

    return producer, nil
}

// 从连接池中获取Producer实例
func (p *Pool) getProducer() *kafka.Producer {
    return <-p.pool
}

// 将Producer实例放回连接池
func (p *Pool) releaseProducer(producer *kafka.Producer) {
    p.pool <- producer
}

// 发送消息到指定主题
func (p *Pool) SendMessage(topic string, tableName string, message []byte) error {
    producer := p.getProducer()
    defer p.releaseProducer(producer)

    deliveryChan := make(chan kafka.Event)
    err := producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: kafka.PartitionAny,
        },
        Key:   []byte(tableName),
        Value: message,
    }, deliveryChan)
    if err != nil {
        return err
    }

    e := <-deliveryChan
    m := e.(*kafka.Message)
    if m.TopicPartition.Error != nil {
        return m.TopicPartition.Error
    }

    return nil
}

// 关闭连接池
func (p *Pool) Close() {
    close(p.pool)
    p.wg.Wait()
}

开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错):

#!/bin/bash

swag init >/dev/null 2>&1
# 根据不同的平台进行交叉编译

echo "$(date +"%Y-%m-%d %H:%M:%S") Success: Swag生成成功,正在执行打包···"

# save the original values of GOOS and GOARCH
GOOS_ORIGINAL=$(go env GOOS)
GOARCH_ORIGINAL=$(go env GOARCH)

# specify the target platform and architecture
platform=${1:-"linux"}
arch=${2:-"amd64"}

# set the environment variables for the target platform and architecture
if [ "$platform" == "linux" ] && [ "$arch" == "amd64" ]; then
    export GOOS=linux
    export GOARCH=amd64
elif [ "$platform" == "windows" ] && [ "$arch" == "amd64" ]; then
    export GOOS=windows
    export GOARCH=amd64
else
    echo "Error: Unsupported platform and architecture combination"
    exit 1
fi

# build the Go binary for the target platform and architecture
go build -o dot_production main_production.go

# check if the binary was successfully built
if [ $? -eq 0 ]; then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 生产者 平台打包完成!"
else
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 生产者 平台打包失败!"
    exit 1
fi

go build -o dot_consumer main_consumer.go

# check if the binary was successfully built
if [ $? -eq 0 ]; then
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 消费者 平台打包完成!"
else
    echo "$(date +"%Y-%m-%d %H:%M:%S") Success: ${GOOS} 消费者 平台打包失败!"
    exit 1
fi

# reset the GOOS and GOARCH environment variables to their original values
export GOOS=$GOOS_ORIGINAL
export GOARCH=$GOARCH_ORIGINAL

报错如下:

zxx@Macbook dot % ./pack.sh 
2023-06-03 00:31:29 Success: Swag生成成功,正在执行打包···
# dot/middleware/kfk
middleware/kfk/kfk_consumer.go:18:22: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:19:22: undefined: kafka.Consumer
middleware/kfk/kfk_consumer.go:26:19: undefined: kafka.ConfigMap
middleware/kfk/kfk_consumer.go:39:25: undefined: kafka.NewConsumer
middleware/kfk/kfk_consumer.go:56:2: maxMessages declared and not used
middleware/kfk/kfk_production.go:11:17: undefined: kafka.ConfigMap
middleware/kfk/kfk_production.go:12:22: undefined: kafka.Producer
middleware/kfk/kfk_production.go:53:41: undefined: kafka.Producer
middleware/kfk/kfk_production.go:63:37: undefined: kafka.Producer
middleware/kfk/kfk_production.go:68:48: undefined: kafka.Producer
middleware/kfk/kfk_consumer.go:56:2: too many errors
2023-06-03 00:31:29 Success: linux 生产者 平台打包失败

共有3个答案

孟翰藻
2023-06-10
export CGO_ENABLED=1
export CC=x86_64-linux-musl-gcc
export CXX=x86_64-linux-musl-g++
export GOOS=linux
export GOARCH=amd64
孟鹤龄
2023-06-10

编译前设个环境变量 CGO_ENABLED=1 看看。

凌远
2023-06-10

interface Animal {
name: string;
age: number;
}
type Simplify<T> = {

};

 类似资料:
  • 我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误: 现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-conf

  • 我用的是Kafka 0.8.2-beta,有2台Ubuntu 14虚拟机: 172.30.141.127正在运行动物园管理员 172.30.141.184在经营一家Kafka经纪人 我正在启动动物园管理员实例,如果一切顺利的话。然后,我尝试启动代理并将其连接到172.30.141.127:2181。它似乎能够在特定的端口上连接并建立会话,但是由于一些似乎没有记录的异常,它失去了连接。 代理输出:

  • 首先我使用了 "github.com/confluentinc/confluent-kafka-go/kafka" 这个包来连接Kafka,之后实现了个连接池的功能,代码如下: 开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错): 报错如下: 十分不理解,这是什么原因呢?换成kafka之后,不能进行交叉编译么?请各位

  • 我正在尝试对Kafka消息流进行流处理和CEP。为此,我选择Apache Ignite首先实现一个原型。但是,我无法连接到队列: 使用KAFKA2.11-0.10.1.0 apache-ignite-fabric-1.8.0-bin Kafka工作正常,我用一个消费者测试了它。然后启动ignite,然后在spring boot命令行应用程序中运行following。 当应用程序启动时,我得到 20

  • 我在加入一个KStream和一个GlobalKTable时遇到了一个问题,希望能得到您的帮助。 给定两个Kafka主题和: 订单 客户 需求是用客户名称丰富订单流 我正在尝试以下操作: null

  • 我正在使用Apache Flink 1.3.2的集群。我们正在使用Kafka消息,自从将代理升级到1.1.0(从0.10.2)后,我们经常在日志中发现此错误: 因此,有时我们在处理过程中会遇到缺失事件。我们在工作中使用FlinkKafkaConsumer010。 已启用检查点(间隔10秒,超时1分钟,检查点之间的最小暂停时间为5秒,最大并发检查点时间为1秒。E2E的平均持续时间不到1秒,甚至不到半