当前位置: 首页 > 知识库问答 >
问题:

Uber go/zap和kafka go比赛条件

况浩邈
2023-03-14

我正在创建一个自定义记录器,在这里我们可以登录到std out和std err,但也可以登录到kafka(代码示例如下:https://github.com/roppa/kafka-go). 我们有多个主题,所以我们需要多个记录器,但当我们使用多个时,就会发生一些奇怪的事情。当Kafka围棋的两个设置都是异步的时,我不会收到消费者消息,当一个是异步的,另一个是同步的时,我们会得到如下结果:

//consumer topica
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:04.023Z","msg":"topic-a log 1","UID":"abc123","ns":"test-service"}

{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:05.078Z","msg":"topic-a log 2","UID":"abc123","ns":"test-service"}

{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:06.085Z","msg":"topic-a log 3","UID":"abc123","ns":"test-service"}

//consumer topicb
2020-12-09T15:31:06.085Z    INFO    topic-a log 3   {"UID": "abc123", "ns": "test-service"}
2","UID":"abc123","ns":"test-service"}

更改同步会产生完全不同的效果。我是个新手。

这是main.go:

package main

import (
    "context"
    "kafka-log/logger"
)

func main() {
    loggerA := logger.Init("test-service", "localhost:9092", "topica", false, false)
    loggerB := logger.Init("test-service", "localhost:9092", "topicb", false, true)

    ctx := context.Background()
    ctx2 := context.WithValue(ctx, logger.UID, "abc123")

    loggerA.CInfo(ctx2, "topic-a log 1")
    loggerB.CInfo(ctx2, "topic-b log 1")

    loggerA.CInfo(ctx2, "topic-a log 2")
    loggerB.CInfo(ctx2, "topic-b log 2")

    loggerA.CInfo(ctx2, "topic-a log 3")
    loggerB.CInfo(ctx2, "topic-b log 3")
}

这是记录器/记录器。转到:

package logger

import (
    "context"
    "os"

    "github.com/segmentio/kafka-go"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

type (
    key string

    // Logger type embeds zap and also contains the current system name (namespace, Ns)
    Logger struct {
        *zap.Logger
        Ns string
    }

    // KConfig type for creating a new Kafka logger. Takes a Namespace,
    // Broker (eg 'localhost:9092'), Topic (eg 'topic-a')
    KConfig struct {
        Namespace string
        Broker    string
        Topic     string
        Async     bool
    }

    producerInterface interface {
        WriteMessages(ctx context.Context, msgs ...kafka.Message) error
    }

    // KafkaProducer contains a kafka.Producer and Kafka topic
    KafkaProducer struct {
        Producer producerInterface
        Topic    string
    }
)

const (
    // UID - uniquely request identifier
    UID key = "request_id"
)

var customConfig = zapcore.EncoderConfig{
    TimeKey:        "timeStamp",
    LevelKey:       "level",
    NameKey:        "logger",
    CallerKey:      "caller",
    FunctionKey:    zapcore.OmitKey,
    MessageKey:     "msg",
    StacktraceKey:  "stacktrace",
    LineEnding:     zapcore.DefaultLineEnding,
    EncodeLevel:    zapcore.CapitalColorLevelEncoder,
    EncodeTime:     zapcore.ISO8601TimeEncoder,
    EncodeDuration: zapcore.SecondsDurationEncoder,
}

// CInfo this function takes a context as first parameter, extracts specific fields as well as namespace, and calls zap Info
func (l *Logger) CInfo(ctx context.Context, msg string, fields ...zap.Field) {
    l.Info(msg, consolidate(ctx, l.Ns, fields...)...)
}

func consolidate(ctx context.Context, namespace string, fields ...zap.Field) []zap.Field {
    return append(append(ctxToZapFields(ctx), fields...), zap.String("ns", namespace))
}

// See advanced config example: https://github.com/uber-go/zap/blob/master/example_test.go#L105
var lowPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl < zapcore.ErrorLevel && lvl > zapcore.DebugLevel
})
var debugPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl < zapcore.ErrorLevel
})
var kafkaPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl > zapcore.DebugLevel
})

// Init creates a new instance of a logger. Namespace is the name of the module using the logger. broker and topic are Kafa specific,
// if either of these is not set a default console logger is created.
func Init(namespace, broker, topic string, debug, async bool) *Logger {
    var kp *KafkaProducer = nil
    if broker != "" && topic != "" {
        kp = NewKafkaProducer(&KConfig{
        Broker: broker,
        Topic:  topic,
        Async:  async,
    })
    }
    logger := getLogger(debug, kp)
    // logger.Info("initiated logger", zap.String("ns", namespace), zap.Bool("kafka", kp != nil), zap.Bool("debug", debug))
    return &Logger{logger, namespace}
}

func getLogger(debug bool, kp *KafkaProducer) *zap.Logger {
    // cores are logger interfaces
    var cores []zapcore.Core

    // optimise message for console output (human readable)
    consoleEncoder := zapcore.NewConsoleEncoder(customConfig)
    // Lock wraps a WriteSyncer in a mutex to make it safe for concurrent use.
    // See https://godoc.org/go.uber.org/zap/zapcore
    cores = append(cores,
        zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), getPriority(debug)),
        zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stderr), zap.ErrorLevel),
    )

    if kp != nil {
        cores = append(cores, zapcore.NewCore(zapcore.NewJSONEncoder(customConfig), zapcore.Lock(zapcore.AddSync(kp)), kafkaPriority))
    }

    // join inputs, encoders, level-handling functions into cores, then "tee" together
    logger := zap.New(zapcore.NewTee(cores...))
    defer logger.Sync()
    return logger
}

func getPriority(debug bool) zap.LevelEnablerFunc {
    if debug {
        return debugPriority
    }
    return lowPriority
}

func ctxToZapFields(ctx context.Context) []zap.Field {
    reqID, _ := ctx.Value(UID).(string)
    return []zap.Field{
        zap.String("UID", reqID),
    }
}

// NewKafkaProducer instantiates a kafka.Producer, saves topic, and returns a KafkaProducer
func NewKafkaProducer(c *KConfig) *KafkaProducer {
    return &KafkaProducer{
        Producer: kafka.NewWriter(kafka.WriterConfig{
            Brokers:      []string{c.Broker},
            Topic:        c.Topic,
            Balancer:     &kafka.Hash{},
            Async:        c.Async,
            RequiredAcks: -1, // -1 = all
        }),
        Topic: c.Topic,
    }
}

// Write takes a message as a byte slice, wraps in a kafka.message and calls kafka Produce
func (kp *KafkaProducer) Write(msg []byte) (int, error) {
    return len(msg), kp.Producer.WriteMessages(context.Background(), kafka.Message{
        Key:   []byte(""),
        Value: msg,
    })
}

我将这些用于消费者:

docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topica

docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topicb

这是我的kafka docker-comple:

version: '3.8'

services:
  
  zookeeper:
    image: confluentinc/cp-zookeeper
    networks:
      - kafka-net
    container_name: zookeeper
    environment:
        ZOOKEEPER_CLIENT_PORT: 2181
    ports:
        - 2181:2181

  kafka:
    image: confluentinc/cp-kafka
    networks:
      - kafka-net
    container_name: kafka
    environment:
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        ALLOW_PLAINTEXT_LISTENER: "yes"
        KAFKA_LISTENERS-INTERNAL: //kafka:29092,EXTERNAL://localhost:9092
        KAFKA_ADVERTISED: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
        KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    ports:
        - 9092:9092
        - 29092:29092
    depends_on:
        - zookeeper
    restart: on-failure

networks:
  kafka-net:
    driver: bridge

共有1个答案

向杜吟
2023-03-14

我设想您的程序在异步消息有时间发送之前就已经退出了(尽管如果我正确阅读了您的示例,我很奇怪“topic-a log 3”是唯一的日志消息)。与javascript不同,Go在退出之前不会等待所有线程/Goroutine终止。

还将突出显示kafka go异步配置的docstring:

// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.

在解决方案方面:我认为您可以通过调用writer上的Close来解决这个问题:

https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.Close

Close刷新挂起的写入,并等待所有写入完成,然后返回。调用Close还可以防止向写入程序提交新的写操作,对WriteMessages等的进一步调用将因io而失败。错误关闭管道。

您需要显示底层的KafkaProducer。制作人,并致电Kafka制作人。制作人退出前关闭。

可能有更聪明的方法来组织清理工作,但我似乎找不到比关闭编写器更简单的方法来清除挂起的消息。

 类似资料:
  • 问题内容: 这是带有潜在竞争条件的Django视图的简单示例: 竞争条件应该非常明显:用户可以两次发出此请求,并且该应用程序可能同时执行,从而导致其中一个请求覆盖另一个请求。 假设函数相对复杂,并且基于无法放置在单个存储过程中并且难以放置在存储过程中的各种奇怪的东西进行计算。 所以这是我的问题:django可使用哪种锁定机制来处理类似的情况? 问题答案: Django 1.4+支持select_f

  • 问题内容: 我正在阅读本书第4.3.5节 我不清楚它在哪里说 私有构造函数的存在是为了避免如果将复制构造函数实现为(px,py)时会发生竞争情况。这是私有构造函数捕获习语的一个示例(Bloch和Gafter,2005)。 我知道它提供了一个同时在数组中一次获取x和y的getter,而不是为每个数组分别获取一个getter,因此调用方将看到一致的值,但是为什么要使用private构造函数呢?这有什么

  • 比赛速度功能有助于您保持稳定配速,并在设定距离内达到您的目标时间。定义某段距离的目标时间 - 例如将 10 公里跑步的目标时间设定为 45 分钟,并跟踪对比实际用时与这个预设目标的差距。 您可以在手表上设置比赛速度,或者可以在 Flow 网络服务或应用程序中设置比赛速度目标,并同步至手表。 如果您已计划好当天的比赛速度目标,手表会在进入训练准备模式时建议您启动该目标。 在手表上创建比赛速度目标 您

  • 比赛速度功能有助于您保持稳定配速,并在设定距离内达到您的目标时间。定义某段距离的目标时间 - 例如将 10 公里跑步的目标时间设定为 45 分钟,并跟踪对比实际用时与这个预设目标的差距。 您可以在手表上设置比赛速度,或者可以在 Flow 网络服务或应用中设置比赛速度目标,并同步至手表。 如果您已计划好当天的比赛速度目标,手表会在进入训练准备模式时建议您启动该目标。 在手表上创建比赛速度目标 您可以

  • 本文向大家介绍Android自定义控件实现球赛比分条效果,包括了Android自定义控件实现球赛比分条效果的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了Android实现球赛比分条效果的具体代码,供大家参考,具体内容如下 效果图如下所示: 该控件需要输入两个参数,左边的得分数和右边的的分数 然后根据两边的得分的比例绘制中间的比分条 首先将控件的宽度平均分配为10分,第一份和最后一份

  • 我使用模式验证来验证响应,值返回一个数字或“NA”,下面是响应和模式验证。 收到错误消息: 如何纠正匹配表达式?