代码:
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"sync"
"time"
)
func main() {
var (
wg sync.WaitGroup
success_num, error_num int
)
config := sarama.NewConfig() // 1
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// NewRandomPartitioner NewHashPartitioner NewRoundRobinPartitioner NewManualPartitioner NewReferenceHashPartitioner
config.Producer.Partitioner = sarama.NewRandomPartitioner
client,err := sarama.NewClient([]string{"localhost:9192","localhost:9292","localhost:9392"}, config) // 2
if err != nil {
panic(err)
}
defer client.Close()
producer, err := sarama.NewAsyncProducerFromClient(client) // 3
if err != nil {
panic(err)
}
defer producer.AsyncClose()
wg.Add(1)
go func() {
wg.Done()
// config.Producer.Return.Successes = true 后一定要监听这个chan,默认大小256 如果满了就阻塞掉
for range producer.Successes() {
success_num++
}
}()
wg.Add(1)
go func() {
wg.Done()
// config.Producer.Return.Errors = true 后一定要监听这个chan,默认大小256 如果满了就阻塞掉
for range producer.Errors() {
error_num++
}
}()
// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
ProducerLoop:
for {
message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")} // 4
select {
case producer.Input() <- message:
case <-signals:
// Trigger a shutdown of the producer.
break ProducerLoop
}
}
wg.Wait()
time.Sleep(time.Second * 2)
log.Printf("Successfully produced: %d; errors: %d\n", success_num, error_num)
}
在第一步, 通过sarama.NewConfig() 创建了 *Config 对象。 内部 是给很多对象定义了初始值
在第二步,通过 sarama.NewClient()创建 Client对象
func NewClient(addrs []string, conf *Config) (Client, error) {
// 前面对config 进行校验
// 初始化client
client := &client{...}
// 填充client.seedBrokers
random := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, index := range random.Perm(len(addrs)) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}
// 初始化时为true
if conf.Metadata.Full {
// 刷新注册client中的seedBrokers信息,metadata信息
err := client.RefreshMetadata()
switch err {
// 处理异常
}
}
// 开启后台线程定时刷新metadata信息,直到client关闭
go withRecover(client.backgroundMetadataUpdater)
return client, nil
}
在第三步,通过sarama.NewAsyncProducerFromClient() 创建producer,内部调用newAsyncProducer创建
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
}
txnmgr, err := newTransactionManager(client.Config(), client)
if err != nil {
return nil, err
}
p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
}
// launch our singleton dispatchers
go withRecover(p.dispatcher) // 开启输入chan
go withRecover(p.retryHandler)
return p, nil
p.dispatcher 做的事情
func (p *asyncProducer) dispatcher() {
handlers := make(map[string]chan<- *ProducerMessage)
shuttingDown := false
for msg := range p.input {
//....
if msg.flags&shutdown != 0 {
shuttingDown = true
p.inFlight.Done()
continue
} else if msg.retries == 0 {
if shuttingDown {
// we can't just call returnError here because that decrements the wait group,
// which hasn't been incremented yet for this message, and shouldn't be
pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
if p.conf.Producer.Return.Errors {
p.errors <- pErr // 如果开启错误返回, 向chan发送数据
} else {
Logger.Println(pErr)
}
continue
}
p.inFlight.Add(1)
}
// ....
handler := handlers[msg.Topic]
if handler == nil {
handler = p.newTopicProducer(msg.Topic)// 这里开启topic的input监听,并返回input
handlers[msg.Topic] = handler
}
handler <- msg
}
for _, handler := range handlers {
close(handler)
}
}
p.newTopicProducer(msg.Topic)
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
tp := &topicProducer{
parent: p,
topic: topic,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
handlers: make(map[int32]chan<- *ProducerMessage),
partitioner: p.conf.Producer.Partitioner(topic),
}
go withRecover(tp.dispatch) // 这里开启partition的input监听
return input
}
tp.dispatch
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
pp := &partitionProducer{
parent: p,
topic: topic,
partition: partition,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
}
go withRecover(pp.dispatch)
return input
}
pp.dispatch
func (pp *partitionProducer) dispatch() {
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
if pp.leader != nil {
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader) // 这里开启BrokerProducer其处理消息
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}
//.....
}
pp.parent.getBrokerProducer(pp.leader)
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()
bp := p.brokers[broker]
if bp == nil {
bp = p.newBrokerProducer(broker) // 这里开启chan监控去发送消息及获取结果
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
}
p.brokerRefs[bp]++
return bp
}