写在前面:
本章来源于 https://gitee.com/Cookie_XiaoD/easykafka 大佬的文章,大部分代码都可以再上面找到
再次感谢大佬提供的:
虽然是别人写的,最好是自己理解透彻,关于kafka内容网上很多就不放进去了
test
--------------main.go
--------------setting.json
--------------common
------------------------common/kafka
------------------------common/kafka/consumer.go
------------------------common/kafka/data.go
------------------------common/kafka/kafka_setting.go
------------------------common/kafka/producer.go
--------------common/spec
------------------------common/spec/aormode.go
------------------------common/spec/auth.go
------------------------common/spec/consumer.go
------------------------common/spec/msg.go
------------------------common/spec/producer.go
------------------------producer.go —使用
{
"kafka": {
"brokers": "127.0.0.1:9092",
"topicName": "test"
}
}
//读取
var KafkaData = &Kafka{}
//读取setting文件
func init() {
filePtr, err := os.Open("./setting.json")
if err != nil {
log.Printf("文件打开失败 [Err:%s]", err.Error())
return
}
defer filePtr.Close()
// 创建json解码器
info := KafkaConfig{}
decoder := json.NewDecoder(filePtr)
err = decoder.Decode(&info)
if err != nil {
fmt.Println("redis解码失败", err.Error())
}
KafkaData = &info.Kafka
}
type KafkaConfig struct {
Kafka Kafka `json:"kafka"`
}
type Kafka struct {
Brokers string `json:"brokers"`
TopicName string `json:"topicName"`
}
package kafka
import (
easykafka "common/kafka/spec"
"common/kafka/spec"
"log"
)
func ProduceSend(content string) error {
//就是setting.json里面的配置
var topicName = easykafka.KafkaData.TopicName
var brokers = easykafka.KafkaData.Brokers
var err error
producer, err := easykafka.NewProducer(
brokers,
easykafka.WithProducerErrorHandler(func(err *easykafka.AsyncProduceError) {
log.Println("异步生产消息时发生错误:", err)
}),
easykafka.WithProducerAckMode(spec.WaitLeader))
if err != nil {
log.Fatalf(err.Error())
}
defer func() {
if err = producer.Close(); err != nil {
log.Println("关闭生产者发生错误:", err)
}
}()
size, err := producer.SyncProduce(topicName, "mineLog", content)
if err != nil {
log.Println("发送消息错误:", err)
return err
} else {
log.Println("发送成功, 数据大小:", size)
}
return nil
}
type ExampleData struct {
Content string `json:"content"`
Seq string `json:"seq"`
}
package main
import (
"context"
"encoding/json"
easykafka "common/kafka/spec"
"common/kafka/spec"
"log"
"time"
)
var consumer spec.Consumer
var msgs = make(chan spec.Msg, 10000)
var brokers = "127.0.0.1:9092"
func main() {
startConsumer()
var batch []spec.Msg
//模拟每收到5条数据就进行一次处理,处理完成后批量提交
for {
msg := <-msgs
batch = append(batch, msg)
if len(batch) != 5 {
log.Println("接收到", len(batch), "条数据")
continue
}
log.Println("接收到5条数据,开始处理")
for _, v := range batch {
var data ExampleData
err := json.Unmarshal(v.Data(), &data)
if err != nil {
continue
}
log.Println("处理数据:", data)
}
log.Println("处理完成开始批量提交")
err := consumer.ConfirmBatch(batch)
if err != nil {
log.Println("批量提交失败:", err)
} else {
log.Println("批量提交成功")
}
batch = batch[0:0]
time.Sleep(1 * time.Second)
}
}
func startConsumer() {
go func() {
var err error
consumer, err = easykafka.NewConsumer(
brokers,
[]string{"topic_example"},
"group_example",
handleMsg,
easykafka.WithConsumerErrorHandler(handleErr),
easykafka.WithConsumerAOR(spec.Earliest),
easykafka.WithConsumerManualCommit(true))
if err != nil {
log.Fatalf(err.Error())
}
defer func() {
if err := consumer.Close(); err != nil {
log.Println("关闭消费者发生错误:", err)
}
}()
log.Println("开始接收数据")
consumer.StartBlock(context.Background())
}()
}
func handleMsg(msg spec.Msg) {
log.Println("接收到数据", msg.Topic(), msg.Partition(), msg.Offset())
msgs <- msg
}
func handleErr(err *easykafka.ConsumeError) {
log.Println("发生错误:", err)
}
type ExampleData struct {
Content string `json:"content"`
Seq string `json:"seq"`
}
在:SyncProduce 函数中有:msg, err := p.getMsg(topic, key, data) 代码:作者是这样写的:
func (p *Producer) getMsg(topic, key string, data interface{}) (*sarama.ProducerMessage, error) {
if strings.TrimSpace(topic) == "" {
return nil, errors.New("topic无效")
}
if strings.TrimSpace(key) == "" {
return nil, errors.New("key无效")
}
if data == nil {
return nil, errors.New("data无效")
}
ret, err := p.dataEncoder(data)
if err != nil {
return nil, fmt.Errorf("data序列化失败:%w", err)
}
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(ret),
}
return msg, nil
}
----------------------------------------
Value: sarama.ByteEncoder(ret) 是已经序列化了需要消费或者发送的值,如果你的代码只是接受的是string, 这个地方需要修改,不序列化就好,不然用转json的时候会报错