s" "time" "unsafe" "github.com/Shopify/sarama" "github.com/fluent/fluent-bit-go/output" "github.com/ugorji/go/codec"
)
// Package-level plugin state, populated once in FLBPluginInit and read by
// FLBPluginFlush / FLBPluginExit. (Plugin callbacks are invoked by the
// fluent-bit engine, which serializes calls into a single output instance.)
var (
	brokers  []string             // Kafka broker addresses, from the "brokers" config key (comma-separated)
	producer sarama.SyncProducer  // shared synchronous producer, created in FLBPluginInit
	timeout  = 0 * time.Minute    // connect deadline; 0 means "use the 5-minute default" (see FLBPluginInit)
	topic    string               // destination Kafka topic, from the "topics" config key
	module   string               // module label embedded in each message key
	messageKey string             // record field whose value becomes the Kafka message body
)
//export FLBPluginRegister func FLBPluginRegister(ctx unsafe.Pointer) int {
return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}
//export FLBPluginInit // ctx (context) pointer to fluentbit context (state/ c code) func FLBPluginInit(ctx unsafe.Pointer) int {
if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
brokers = strings.Split(bs, ",")
} else {
log.Printf("you must set brokers")
return output.FLB_ERROR
}
if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
topic = tp
} else {
log.Printf("you must set topics")
return output.FLB_ERROR
}
if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
module = mo
} else {
log.Printf("you must set module")
return output.FLB_ERROR
}
if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
messageKey = key
} else {
log.Printf("you must set message_key")
return output.FLB_ERROR
}
config := sarama.NewConfig()
config.Producer.Return.Successes = true if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
if acks, err := strconv.Atoi(required_acks); err == nil {
config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
}
}
if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
if codec, err := strconv.Atoi(compression_codec); err == nil {
config.Producer.Compression = sarama.CompressionCodec(codec)
}
}
if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
if max_retry, err := strconv.Atoi(max_retry); err == nil {
config.Producer.Retry.Max = max_retry
}
}
if timeout == 0 {
timeout = 5 * time.Minute
}
// If Kafka is not running on init, wait to connect
deadline := time.Now().Add(timeout)
for tries := 0; time.Now().Before(deadline); tries++ {
var err error
if producer == nil {
producer, err = sarama.NewSyncProducer(brokers, config)
}
if err == nil {
return output.FLB_OK
}
log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
time.Sleep(time.Second * 30)
}
log.Printf("Kafka failed to respond after %s", timeout)
return output.FLB_ERROR
}
//export FLBPluginFlush // FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent. func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
var h codec.MsgpackHandle
var b []byte var m interface{}
var err error
b = C.GoBytes(data, length)
dec := codec.NewDecoderBytes(b, &h)
// Iterate the original MessagePack array var msgs []*sarama.ProducerMessage
for {
// decode the msgpack data
err = dec.Decode(&m)
if err != nil {
if err == io.EOF {
break
}
log.Printf("Failed to decode msgpack data: %v\n", err)
return output.FLB_ERROR
}
// Get a slice and their two entries: timestamp and map
slice := reflect.ValueOf(m)
data := slice.Index(1)
// Convert slice data to a real map and iterate
mapData := data.Interface().(map[interface{}]interface{})
flattenData, err := Flatten(mapData, "", UnderscoreStyle)
if err != nil {
break
}
message := ""
host := "" for k, v := range flattenData {
value := "" switch t := v.(type) {
case string:
value = t
case []byte:
value = string(t)
default:
value = fmt.Sprintf("%v", v)
}
if k == "pod_name" {
host = value
}
if k == messageKey {
message = value
}
}
if message == "" || host == "" {
break
}
m := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
Value: sarama.ByteEncoder(message),
}
msgs = append(msgs, m)
}
err = producer.SendMessages(msgs)
if err != nil {
log.Printf("FAILED to send kafka message: %s\n", err)
return output.FLB_ERROR
}
return output.FLB_OK
}
//export FLBPluginExit func FLBPluginExit() int {
producer.Close()
return output.FLB_OK
}
// main is required for -buildmode=c-shared but intentionally empty: all work
// happens in the exported FLBPlugin* callbacks driven by fluent-bit.
func main() {
}
PS
当然除了FLBPluginConfigKey之外,也可以通过获取环境变量来获得设置参数。
ctx相当于一个上下文,负责各个回调函数之间的数据传递。
编译的时候
go build -buildmode=c-shared -o out_kafka.so .
生成out_kafka.so
执行的时候
/fluent-bit/bin/fluent-bit -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so
采用类似的编写结构,就可以定制化自己的输出插件了。
本文转自SegmentFault-k8s与日志--采用golang实现Fluent Bit的output插件