maven
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
producer
public class SyncProducer {
public static void main(String[] args) throws Exception{
//注意maven引入的版本要和服务器的版本一致
DefaultMQProducer producer = new DefaultMQProducer("test_sync_group");
//我的虚拟机IP地址10.8.80.87
producer.setNamesrvAddr("10.8.80.87:9876");
producer.start();
for (int i=0; i<20; i++){
//String topic, String tags, byte[] body
Message message = new Message("topicTest01","TagA",("hello rocketMq"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
consumer
public class Consumer {
public static void main(String[] args) throws MQClientException {
//指定组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_sync_group");
//指定服务器
consumer.setNamesrvAddr("10.8.80.87:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("topicTest01","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//运行消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}