生产者的代码: // 1. 建立RabbitMQ连接 conn, err := amqp.Dial("amqp://guest:guest@localhost:/") if err !=nil{ fmt.Println(err) return err } defer conn.Close() // 2. 创建channel ch, err := conn.Channel() //failOnError(err, "Failed to open a channel") defer ch.Close() // 3. 声明exchange,routing key //exchange := "test_fanout_exchange" //routingKey := "asdad" // 随便写的 exchange := "couponExchangeNew" //routingKey := "" //不需要设置routing key //queueName := "couponQueue" // 4. 声明(创建)一个交换机 //name:交换器的名称。 //kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。 //durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。 //autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。 //internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。 //noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用) //args:直接写nil,没研究过,不解释。 //注意,在生产者里声不声明(创建)交换机都可以。这里声明,是为了防止消费者没有启动或者这个交换机原先不存在,导致消息投递丢失。 err = ch.ExchangeDeclare( exchange, // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err !=nil{ return err } //failOnError(err, "Failed to declare an exchange") for _,userId :=range userList{ //TODO 这里面你整成json呗 mqSendBody := common.MqSendBodyDown{} mqSendBody.UserId=userId mqSendBody.CouponInfo=*couponInfo mqSendBody.RuleInfo.UseStartAt=UseStartAt mqSendBody.RuleInfo.UseEndAt=UseEndAt body,err := json.Marshal(mqSendBody) if err!=nil{ return err } // 推送消息 err = ch.Publish( exchange, // exchange(交换机名字,跟前面声明对应) "", // 路由参数,fanout类型交换机,自动忽略路由参数,填了也没用。 false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", // 消息内容类型,这里是普通文本 Body: body, // 消息内容 }) if err!=nil{ return err } }
消费者的代码:
// 1. 建立RabbitMQ连接 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err !=nil{ fmt.Println(err) } //failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 2. 创建channel ch, err := conn.Channel() //failOnError(err, "Failed to open a channel") defer ch.Close() // 3. 声明exchange,routing key,queue name exchange := "couponExchangeNew" routingKey := "" //不需要设置routing key queueName := "couponQueueNew" // qname :="couponQueue" err = ch.ExchangeDeclare( exchange, // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) //failOnError(err, "Failed to declare an exchange") // 5. 声明(创建)一个队列 q, err := ch.QueueDeclare( queueName, // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) //failOnError(err, "Failed to declare a queue") // 6. 队列绑定 //name:队列名称 //key:BandingKey,表示要绑定的键。 //exchange:交换器名称 //nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用) //args:直接写nil,没研究过,不解释。 err = ch.QueueBind( q.Name, // queue name routingKey, // routing key exchange, // exchange false, nil) msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) //failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever
参考文章: