RocketMQ简单实例搭建
扫描二维码
随时随地手机看文章
安装RocketMQ需要jdk1.6以上, maven,git环境,以上环境自行百度命令安装。
git clone https://github.com/alibaba/RocketMQ.git ##从github上下载RocketMQ开源项目
cd RocketMQ ##进入文件夹
sh install.sh ##开始安装
安装完之后可以看到下图这样:其中可以看到一个符号链接devenv如红框所示
然后
cd devenv/bin ##进入链接的目录下的bin目录
nohup sh mqnamesrv -n "121.42.179.195:9876" & ##配置nameserver,121.42.179.195是本机ip,也就是服务器外网地址
nohup sh mqbroker -n "121.42.179.195:9876" & ##配置broker,121.42.179.195同上
之后
cat nohup.out
在输出的最低端,可以看到红框中的两句话则说明nameserver和broker启动成功。
如果服务器内存不够,你就会启动失败,可以修改runbroker.sh脚本(mqbroker文件中通过runbroker.sh脚本调用Broker的主函数com.alibaba.rocketmq.broker.BrokerStartup启动Broker)的JAVA_OPT参数
vi runbroker.sh
我阿里云内存小,我就改成这样
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"
2、编写Consumer和Producer测试类
首先需要的jar包如下:
4.0.0
groupId
RocketMQ
1.0-SNAPSHOT
org.apache.maven.plugins
maven-compiler-plugin
1.6
RocketMQTest
http://maven.apache.org
UTF-8
com.alibaba.rocketmq
rocketmq-client
3.0.10
com.alibaba.rocketmq
rocketmq-all
3.0.10
pom
ch.qos.logback
logback-classic
1.1.1
ch.qos.logback
logback-core
1.1.1
junit
junit
4.10
test
Consumer类:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"rmq-group");
consumer.setNamesrvAddr("121.42.179.195:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("TopicA-test", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
Producer类:
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("121.42.179.195:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000); //每秒发送一次MQ
Message msg = new Message("TopicA-test",// topic
"TagA",// tag
(new Date() + "Hello RocketMQ ,QuickStart" + i)
.getBytes()// body
);
SendResult sendResult = producer.send(msg);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
之后分别启动,Consumer效果如下
这里需要注意,
默认情况下,一台服务器只能启动一个Producer或Consumer实例,所以如果需要在一台服务器启动多个实例,需要设置实例的名称,如要再建一个producer:
producer.setNamesrvAddr(“121.42.179.195:9876”);
producer.setInstanceName(“Producer2”);