官方下载
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-source-release.zip
unzip rocketmq-all-4.9.2-source-release.zip
cd rocketmq-all-4.9.2/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.9.2/rocketmq-4.9.2
修改配置文件conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=服务器IP
namesevAddr=服务器IP:9876
cd /bin
nohup sh mqnamesrv > /home/rocketmq/logs/mqnamesrv.log 2>&1 &
nohup sh mqbroker -n 服务器IP:9876 -c ../conf/broker.conf
package com.wei.aeo.demo01;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketSyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("sync");
defaultMQProducer.setNamesrvAddr("服务器IP:9876");
defaultMQProducer.start();
for (int i = 0; i < 100; i++) {
// 创建一个信息体,指定主题,表情,和信息内容
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = defaultMQProducer.send(msg);
System.out.printf("%s%n", sendResult);
}
defaultMQProducer.shutdown();
}
}
package com.wei.aeo.demo01;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
consumer.setNamesrvAddr("82.156.168.244:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}