这篇文章给大家介绍RocketMQ如何在java中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
创新互联建站专注为客户提供全方位的互联网综合服务,包含不限于网站制作、成都做网站、麻山网络推广、成都小程序开发、麻山网络营销、麻山企业策划、麻山品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;创新互联建站为所有大学生创业者提供麻山建站搭建服务,24小时服务热线:13518219792,官方网址:www.cdcxhl.com
如何使用
1、引入 rocketmq-client
org.apache.rocketmq rocketmq-client 4.1.0-incubating
2、编写Producer
DefaultMQProducer producer = new DefaultMQProducer("producer_demo"); //指定NameServer地址 producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的 /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可 * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); for (int i = 0; i < 997892; i++) { try { //构建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //发送同步消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown();
3、编写Consumer
/** * Consumer Group,非常重要的概念,后续会慢慢补充 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo"); //指定NameServer地址,多个地址以 ; 隔开 consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的 /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { try { for(MessageExt msg:msgs){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); System.out.printf("Consumer Started.%n");
4、说明
各位根据自己的环境,修改NamesrvAddr的值,我的集群请参考:RocketMQ集群部署配置。稍后通过RocketMQ管控台就可以看到之前搭建的多Master多Slave模式,异步复制集群模式。
5、通过RocketMQ管控台
rocketmq-console-ng获取方式为:rocketmq-console-ng,之后通过mavne进行编译获取jar,命令如下:
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根据自己的NamesrvAddr进行修改rocketmq.config.namesrvAddr的值。
直接启动:
java -jar rocketmq-console-ng-1.0.0.jar
管控台是基于springboot的,的确springboot非常方便和非常火了,所以有必要去学习下springboot了(其实还是spring系列,所以spring也必要深入学习下),稍后通过管控台进行观察运行。
6、运行观察
一个好的习惯是先运行Consumer,之后在运行Producer,之后通过rocketmq-console-ng管控台观察
运行完成之后,的确broker-a的数据加上broker-b的数据量就等于我们发送的数据量,而且slave的数量也master的数量也是一致的,效果如下:
查看发送这些数据,2台机器的磁盘情况如下:
关于RocketMQ如何在java中使用就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。