packageRabbitMQJmeter;importjava.io.IOException;importjava.util.concurrent.TimeoutException;importorg.apache.jmeter.config.Arguments;importorg.apache.jmeter.protocol.java.sampler.JavaSamplerContext;importorg.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;importorg.apache.jmeter.samplers.SampleResult;importcom.alibaba.fastjson.JSONObject;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;public class LCS2Logic extendsAbstractJavaSamplerClient {//定义rabbitmq服务器的配置信息
public staticString MQ_serverip;public staticString MQ_serverport;public staticString MQ_serveruser;public staticString MQ_serveruserpwd;public staticString MQ_queue;public staticString MQ_exchange;public staticString MQ_rkey;//定义消息发送次数
public staticString send_count;//定义mq的队列消息ec信息
public staticString ec_sessionId;public staticString ec_ecid;public staticString ec_routInfo;public staticString ec_priority;public staticString ec_groupId;//定义mq的队列消息ec msg信息
public staticString msg_templateId;public staticString msg_templateContent;public staticString msg_content;public staticString msg_mobiles;public staticString msg_sign;public staticString msg_extendCode;public staticString msg_needMo;// publicConnectionFactory factory;publicConnection conn;publicChannel channel;//设置可用参数及的默认值;
@OverridepublicArguments getDefaultParameters() {
Arguments params= newArguments();
params.addArgument("MQ_serverip", "192.168.32.204");
params.addArgument("MQ_serverport", "5672");
params.addArgument("MQ_serveruser", "mas");
params.addArgument("MQ_serveruserpwd", "mas");
params.addArgument("MQ_queue", "downQueue.queue2");
params.addArgument("MQ_exchange", "downQueue.queue2");
params.addArgument("MQ_rkey", "downQueue.queue2");
params.addArgument("send_count", "1");
params.addArgument("ec_sessionId", "3");
params.addArgument("ec_ecid", "EC-1");
params.addArgument("ec_routInfo", "127.0.0.1");
params.addArgument("ec_priority", "1");
params.addArgument("ec_groupId", "3");
params.addArgument("msg_templateId", "");
params.addArgument("msg_templateContent", "");
params.addArgument("msg_content", "test message jmeter");
params.addArgument("msg_mobiles", "13525102870,13671381066,18827103637");
params.addArgument("msg_sign", "ABVFGH1");
params.addArgument("msg_extendCode", "afsd");
params.addArgument("msg_needMo", "1");returnparams;
}//每个线程测试前执行一次,做一些初始化工作;
@Overridepublic voidsetupTest(JavaSamplerContext arg0) {
MQ_serveruser= arg0.getParameter("MQ_serveruser", "");
MQ_serveruserpwd= arg0.getParameter("MQ_serveruserpwd", "");
MQ_serverip= arg0.getParameter("MQ_serverip", "");
MQ_serverport= arg0.getParameter("MQ_serverport", "");
MQ_queue= arg0.getParameter("MQ_queue", "");
MQ_exchange= arg0.getParameter("MQ_exchange", "");
MQ_rkey= arg0.getParameter("MQ_rkey", "");int server_port =Integer.parseInt(MQ_serverport);
factory= newConnectionFactory();
factory.setUsername(MQ_serveruser);
factory.setPassword(MQ_serveruserpwd);
factory.setHost(MQ_serverip);
factory.setPort(server_port);//Connection conn;
try{
conn=factory.newConnection();
channel=conn.createChannel();
channel.exchangeDeclare(MQ_exchange,"direct", true);//EXCHANGE//定义交换机
String queueName = MQ_queue;//message-queue得到消息队列
channel.queueBind(queueName, MQ_exchange, MQ_rkey);//route-bind//定义类似路由器的东西路由交换机VS队列
} catch(IOException e) {
e.printStackTrace();
}catch(TimeoutException e) {
e.printStackTrace();
}//long start = System.currentTimeMillis();//System.out.println("start:" + start);
}//开始测试,从arg0参数可以获得参数值;
@OverridepublicSampleResult runTest(JavaSamplerContext arg0) {
send_count= arg0.getParameter("send_count", "");
ec_sessionId= arg0.getParameter("ec_sessionId", "");
ec_ecid= arg0.getParameter("ec_ecid", "");
ec_routInfo= arg0.getParameter("ec_routInfo", "");
ec_priority= arg0.getParameter("ec_priority", "");
ec_groupId= arg0.getParameter("ec_groupId", "");
msg_templateId= arg0.getParameter("msg_templateId", "");
msg_templateContent= arg0.getParameter("msg_templateContent", "");
msg_content= arg0.getParameter("msg_content", "");
msg_mobiles= arg0.getParameter("msg_mobiles", "");
msg_sign= arg0.getParameter("msg_sign", "");
msg_extendCode= arg0.getParameter("msg_extendCode", "");
msg_needMo= arg0.getParameter("msg_needMo", "");//消息内容格式
int sessionId =Integer.parseInt(ec_sessionId);
String ecId=ec_ecid;
String routInfo=ec_routInfo;int priority =Integer.parseInt(ec_priority);int groupId =Integer.parseInt(ec_groupId);
String templateId=msg_templateId;
String[] templateContent= msg_templateContent.split(",");
String content=msg_content;
String[] mobiles= msg_mobiles.split(",");
String sign=msg_sign;
String extendCode=msg_extendCode;int needMo =Integer.parseInt(msg_needMo);//转换成json格式
JSONObject jsonObject = newJSONObject();
JSONObject msgObject= newJSONObject();
jsonObject.put("sessionId", sessionId);
jsonObject.put("ecId", ecId);
jsonObject.put("routInfo", routInfo);
jsonObject.put("priority", priority);
jsonObject.put("groupId", groupId);// msgObject.put("templateId", templateId);
msgObject.put("templateContent", templateContent);
msgObject.put("content", content);
msgObject.put("mobiles", mobiles);
msgObject.put("sign", sign);
msgObject.put("extendCode", extendCode);
msgObject.put("needMo", needMo);
jsonObject.put("msg", msgObject);// String messageBody =jsonObject.toString();byte[] messageBodyBytes =messageBody.getBytes();//长连接服务器发送过来的消息头,12个字节,
byte[] mqHeadBytes = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};byte[] messageBytes = new byte[mqHeadBytes.length +messageBodyBytes.length];
System.arraycopy(mqHeadBytes,0, messageBytes, 0, mqHeadBytes.length);
System.arraycopy(messageBodyBytes,0, messageBytes, mqHeadBytes.length, messageBodyBytes.length);//打印输出调试//System.out.println("生产者:" + messageBody + " in thread:" +//Thread.currentThread().getName());
int count =Integer.parseInt(send_count);///
SampleResult sr = newSampleResult();
sr.setSampleLabel("JavaTest_RabbitMQ_MSG");try{//jmeter开始统计响应时间标记
sr.sampleStart();//publish / sub 生产者的作用就是将消息推送到消息队列里面去 实现类似于publish的功能
for (int i = 0; i < count; i++) {
channel.basicPublish(MQ_exchange, MQ_rkey,null, messageBytes);
}//sr.isSuccessful();
sr.setSuccessful(true);
}catch(IOException e) {
sr.setSuccessful(false);
e.printStackTrace();
}finally{
sr.sampleEnd();//jmeter 结束统计响应时间标记
}returnsr;
}//测试结束时调用;
@Overridepublic voidteardownTest(JavaSamplerContext arg0) {try{
channel.close();
conn.close();//long end = System.currentTimeMillis();//System.out.println("end:" + end);
} catch(IOException e) {
e.printStackTrace();
}catch(TimeoutException e) {
e.printStackTrace();
}
}
}