当前位置: 首页 > 工具软件 > OFTest > 使用案例 >

MQClientException: No route info of this topic, topicTest01的原因

濮景龙
2023-12-01

maven

<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.4.0</version>
    </dependency>

producer

public class SyncProducer {
    public static void main(String[] args) throws Exception{

        //注意maven引入的版本要和服务器的版本一致
        DefaultMQProducer producer = new DefaultMQProducer("test_sync_group");
        //我的虚拟机IP地址10.8.80.87
        producer.setNamesrvAddr("10.8.80.87:9876");
        producer.start();
        for (int i=0; i<20; i++){
            //String topic, String tags, byte[] body
            Message message = new Message("topicTest01","TagA",("hello rocketMq"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

consumer

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //指定组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_sync_group");
        //指定服务器
        consumer.setNamesrvAddr("10.8.80.87:9876");
        // Subscribe one more more topics to consume.
        consumer.subscribe("topicTest01","*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //运行消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
 类似资料: