当前位置: 首页 > 工具软件 > Heka > 使用案例 >

heka 0.11.0源码分析--Elasticsearch OutPut插件

海保臣
2023-12-01

ElasticSearchOutput

ElasticSearchOutput插件使用HTTP或UDP将记录插入ElasticSearch数据库。由特定的编码器将消息序列化为JSON结构,同时转换成适当的ElasticSearch Bulk 批量API接口的 Indexing索引的JSON格式。通常,此插件与特定ElasticSearch的编码器插件结合使用,例如ElasticSearch JSON Encoder, ElasticSearch Logstash V0 Encoder 或ElasticSearch Playload Encoder

主要插件源码:主目录=> plugins => elasticsearch => elasticsearch.go

用于批量Message交互,通过chan通道传输数据的格式
type ESBatch struct {
queueCursor string # 队列当前位置
count int64 # 批量大小
batch []byte # Message消息字节
}
消息打包格式
type MsgPack struct {
bytes []byte # Message 消息字节
queueCursor string # 当前位置
}

Elasticsearch参数配置struct

// ConfigStruct for ElasticSearchOutput plugin.
type ElasticSearchOutputConfig struct {
	// 批量刷新毫秒数
	FlushInterval uint32 `toml:"flush_interval"`
	// 触发批量刷新的批量大小
	// (default to 10)
	FlushCount int `toml:"flush_count"`
	// ElasticSearch 地址,如http://localhost:9200
	Server string
	// TLS 相关配置,一般使用https协议时需要配置
	Tls tcp.TlsConfig
	// 可选 用户名
	Username string `toml:"username"`
	// 可选 用户密码
	Password string `toml:"password"`
	// 批量更新超时时间
	// Default is 0 (infinite)
	HTTPTimeout uint32 `toml:"http_timeout"`
	// 开始TCP/http保持连接
	HTTPDisableKeepalives bool `toml:"http_disable_keepalives"`
	// 连接超时
	// Default is 0 (infinite)
	ConnectTimeout uint32 `toml:"connect_timeout"`
	// 是否更新之前是否缓存到磁盘
	UseBuffering bool `toml:"use_buffering"`
}

ElasticSearchOutput插件struct

ElasticSearchOutput插件struct,即类似于其他面向对象的类,也可以说ElasticSearchOutput插件类

type ElasticSearchOutput struct {
   // 发送message的数量, 一般用于监控
	sentMessageCount int64
	// drop Message的数量,一般用于监控
	dropMessageCount int64
	// 总Message的数量,一般用于监控
	count            int64
	backChan         chan []byte
	// 数据接收通道chan
	recvChan         chan MsgPack
	// 批量数据接收通道chan
	batchChan        chan ESBatch // Chan to pass completed batches
	// 批量输出字节
	outBatch         []byte
	// 当前位置或者游标
	queueCursor      string
	// ElasticSearch批量更新的struct,对批量数据的封装
	// The BulkIndexer used to index documents
	bulkIndexer      BulkIndexer 
	// Elasticsearch 参数配置
	conf             *ElasticSearchOutputConfig
	// OutputRunner运行容器,暂且叫容器
	or               OutputRunner
	outputBlock      *RetryHelper
	// pipeline相关struct类
	pConfig          *PipelineConfig
	// report的互斥锁
	reportLock       sync.Mutex
	// 接收停止消息的chan
	stopChan         chan bool
	flushTicker      *time.Ticker
}

BulkIndexer接口

// A BulkIndexer is used to index documents in ElasticSearch
type BulkIndexer interface {
	// 组装Elaticsearch的http或UDP请求包,并发送到Elasticsearch
	Index(body []byte) (err error, retry bool)
	// 检查是否需要刷新,一般超过最大长度或者超过最大数才会刷新
	CheckFlush(count int, length int) bool
}

实现此接口有HttpBulkIndexer和UDPBulkIndexer类

ElasticSearchOutput类struct主要方法

缩略基本代码实现

// 为了看清楚,先把类的方法统计出来
type ElasticSearchOutput struct {
    // 初始化struct, Elasticsearch配置参数读取,建立chan
    // 判断Indexing模式:http/https 或UDP,创建HttpBulkIndexer和UDPBulkIndexer
    Init(config interface{})(err error)
    // 做一下基础动作,转换刷新间隔参数,启动两个go程committer和batchSender 
    Prepare(or OutputRunner,h PluginHelper) error
    // 调用OutputRunner的Encoder 转成MsgPack类型,并写入到recvChan通道
    ProcessMessage(pack * PipelinePack) error
    // 一直接收stopChan通道和recvChan的数据,判断是否需要刷新,需要则调用sendBatch提交,还有一种方式超时,outBatch长度大于0,则调用sendBatch
    batchSender()
    // 字节转成ESBatch类型,写入到batchChan 同时接收backChan数据放到outBatch
    sendBatch()
    // 等待batchChan的批量数据,调用sendRecord,并调用OutputRunnerde的UpdateCursor方法更新位置
    committer()
    // 首先buffer转成BulkIndexer的结构并发送数据到Elasticsearch,发送失败,则重试,直到超过重试次数
    sendRecord(buffer[]byte) error
    // 重置刷新计时器
    CleanUp()
    // 报告插件状态
    ReportMsg(msg * message.Message) error
    
}

主要有两种,其中的一种数据大概流向
ProcessMessage调用Encoder => 写入到recvChan => BatchSender读取recvChan的数据合并到outBatch => 需要刷新则调用sendBatch =>sendBatch写入batchChan =>comitter循环读取batchChan数据并调用sendRecord => sendRecord调用HttpBulkIndexer和UDPBulkIndexer的Index方法发送数据到Elasticsearch

 类似资料: