ElasticSearchOutput插件使用HTTP或UDP将记录插入ElasticSearch数据库。由特定的编码器将消息序列化为JSON结构,同时转换成适当的ElasticSearch Bulk 批量API接口的 Indexing索引的JSON格式。通常,此插件与特定ElasticSearch的编码器插件结合使用,例如ElasticSearch JSON Encoder, ElasticSearch Logstash V0 Encoder 或ElasticSearch Playload Encoder
主要插件源码:主目录=> plugins => elasticsearch => elasticsearch.go
用于批量Message交互,通过chan通道传输数据的格式
type ESBatch struct {
queueCursor string # 队列当前位置
count int64 # 批量大小
batch []byte # Message消息字节
}
消息打包格式
type MsgPack struct {
bytes []byte # Message 消息字节
queueCursor string # 当前位置
}
// ConfigStruct for ElasticSearchOutput plugin.
type ElasticSearchOutputConfig struct {
// 批量刷新毫秒数
FlushInterval uint32 `toml:"flush_interval"`
// 触发批量刷新的批量大小
// (default to 10)
FlushCount int `toml:"flush_count"`
// ElasticSearch 地址,如http://localhost:9200
Server string
// TLS 相关配置,一般使用https协议时需要配置
Tls tcp.TlsConfig
// 可选 用户名
Username string `toml:"username"`
// 可选 用户密码
Password string `toml:"password"`
// 批量更新超时时间
// Default is 0 (infinite)
HTTPTimeout uint32 `toml:"http_timeout"`
// 开始TCP/http保持连接
HTTPDisableKeepalives bool `toml:"http_disable_keepalives"`
// 连接超时
// Default is 0 (infinite)
ConnectTimeout uint32 `toml:"connect_timeout"`
// 是否更新之前是否缓存到磁盘
UseBuffering bool `toml:"use_buffering"`
}
ElasticSearchOutput插件struct,即类似于其他面向对象的类,也可以说ElasticSearchOutput插件类
type ElasticSearchOutput struct {
// 发送message的数量, 一般用于监控
sentMessageCount int64
// drop Message的数量,一般用于监控
dropMessageCount int64
// 总Message的数量,一般用于监控
count int64
backChan chan []byte
// 数据接收通道chan
recvChan chan MsgPack
// 批量数据接收通道chan
batchChan chan ESBatch // Chan to pass completed batches
// 批量输出字节
outBatch []byte
// 当前位置或者游标
queueCursor string
// ElasticSearch批量更新的struct,对批量数据的封装
// The BulkIndexer used to index documents
bulkIndexer BulkIndexer
// Elasticsearch 参数配置
conf *ElasticSearchOutputConfig
// OutputRunner运行容器,暂且叫容器
or OutputRunner
outputBlock *RetryHelper
// pipeline相关struct类
pConfig *PipelineConfig
// report的互斥锁
reportLock sync.Mutex
// 接收停止消息的chan
stopChan chan bool
flushTicker *time.Ticker
}
// A BulkIndexer is used to index documents in ElasticSearch
type BulkIndexer interface {
// 组装Elaticsearch的http或UDP请求包,并发送到Elasticsearch
Index(body []byte) (err error, retry bool)
// 检查是否需要刷新,一般超过最大长度或者超过最大数才会刷新
CheckFlush(count int, length int) bool
}
实现此接口有HttpBulkIndexer和UDPBulkIndexer类
缩略基本代码实现
// 为了看清楚,先把类的方法统计出来
type ElasticSearchOutput struct {
// 初始化struct, Elasticsearch配置参数读取,建立chan
// 判断Indexing模式:http/https 或UDP,创建HttpBulkIndexer和UDPBulkIndexer
Init(config interface{})(err error)
// 做一下基础动作,转换刷新间隔参数,启动两个go程committer和batchSender
Prepare(or OutputRunner,h PluginHelper) error
// 调用OutputRunner的Encoder 转成MsgPack类型,并写入到recvChan通道
ProcessMessage(pack * PipelinePack) error
// 一直接收stopChan通道和recvChan的数据,判断是否需要刷新,需要则调用sendBatch提交,还有一种方式超时,outBatch长度大于0,则调用sendBatch
batchSender()
// 字节转成ESBatch类型,写入到batchChan 同时接收backChan数据放到outBatch
sendBatch()
// 等待batchChan的批量数据,调用sendRecord,并调用OutputRunnerde的UpdateCursor方法更新位置
committer()
// 首先buffer转成BulkIndexer的结构并发送数据到Elasticsearch,发送失败,则重试,直到超过重试次数
sendRecord(buffer[]byte) error
// 重置刷新计时器
CleanUp()
// 报告插件状态
ReportMsg(msg * message.Message) error
}
主要有两种,其中的一种数据大概流向
ProcessMessage调用Encoder => 写入到recvChan => BatchSender读取recvChan的数据合并到outBatch => 需要刷新则调用sendBatch =>sendBatch写入batchChan =>comitter循环读取batchChan数据并调用sendRecord => sendRecord调用HttpBulkIndexer和UDPBulkIndexer的Index方法发送数据到Elasticsearch