`
weigang.gao
  • 浏览: 470323 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

rocket MQ消息队列

 
阅读更多

阿里云开发地址:https://www.aliyun.com

1.阿里云账号:springstudent2016  

2.GitHub 账号:gaoweigang/298gaoweigang_20180123

  注册GitHub使用的邮箱:1245508721@qq.com

3.博客:http://www.aiuxian.com/article/p-1933708.html

          http://blog.csdn.net/xiaojie19871116/article/details/46982907

          http://blog.csdn.net/loongshawn/article/details/51086876

4.rocketmq命令:http://jameswxx.iteye.com/blog/2091971

5.linux命令大全:http://man.linuxde.net/sh

6.分布式消息队列RocketMQ部署与监控:https://my.oschina.net/boltwu/blog/472905

7.rocketmq 消息队列的顺序性问题:https://my.oschina.net/u/1589819/blog/787823

一:RocketMQ消息队列环境搭建

http://blog.csdn.net/loongshawn/article/details/51086876

注意:每次在启动Broker之前需要指定nameserver地址(或者将nameserver地址配置到环境变量之中),其中10.125.1.186为所在服务器IP,eg:export NAMESRV_ADDR=10.125.1.186:9876

 

二:测试RocketMQ消息队列

①创建Maven项目目录结构如下:

②pom文件依赖配置


 

③创建生产者

package com.alibaba.rocketmq.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

public class Producer {
	
	
	//使用你的账号构建一个客户端实例来访问DefaultMQProducer
	private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
	private static int initialState = 0;
	
	private Producer(){
		
	}
	
	public static DefaultMQProducer getDefaultMQProducer(){
		if(producer == null){
			producer = new DefaultMQProducer("ProducerGroupName");
		}
		if(initialState == 0){
			producer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址
			try{
				producer.start();
			} catch(MQClientException e){
				e.printStackTrace();
			}
			initialState = 1;
		}
		
		return producer;
	}

}

 

④创建消费者

package com.alibaba.rocketmq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

public class Consumer {
	
	private static DefaultMQPushConsumer  consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    private static int initialState = 0;
    
    private Consumer(){
    	
    }
    
    public static DefaultMQPushConsumer  getDefaultMQPushConsumer(){
    	if(consumer == null){
    		consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    	}
    	
    	if(initialState == 0){
    		consumer.setNamesrvAddr("10.224.102.101:9876");//RocketMQ服务的地址
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		initialState = 1;
    	}
    	return consumer;
    }
}

⑤生产者生产消息

package com.alibaba.rocketmq.service;

import org.apache.log4j.Logger;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.producer.Producer;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

public class ProducerTtest {
	
	private static final Logger LOGGER = Logger.getLogger(ProducerTtest.class);
	
	public static void main(String[] args) {
		sendMsg();
	}
	
	//生产者发送消息
	public static void sendMsg(){
		//获取消息生产者
		DefaultMQProducer producer = Producer.getDefaultMQProducer();
		
		for(int i = 0; i < 2000 ;i++){
			Message msg = new Message("TopicTest1",   //topic
					                  "TagA",         //tag
					                  "OrderIDOO"+i,  //key
					                  ("Hello MetaQ"+i).getBytes()); //body
			
			SendResult sendResult;
			try {
				sendResult = producer.send(msg);
			} catch (MQClientException e) {
				e.printStackTrace();
			} catch (RemotingException e) {
				e.printStackTrace();
			} catch (MQBrokerException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		producer.shutdown();
	}

}

 

⑥消费者消费消息

package com.alibaba.rocketmq.service;

import java.util.List;

import org.apache.log4j.Logger;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.consumer.Consumer;

public class ConsumerTest {

	private static final Logger LOGGER = Logger.getLogger(ConsumerTest.class);
	
	public static void main(String[] args) {
		receiveMsg();
	}

	// 消费者接受消息
	public static void receiveMsg() {
		// 获取消息消费者
		DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();

		// 订阅主题
		try {
			consumer.subscribe("TopicTest1", "*");
			consumer.setConsumerGroup("gaoweigang");//设置消费组
			consumer.registerMessageListener(new MessageListenerConcurrently() {

				/**
				 * 默认msgs里只有一条消息,可以通过设置consumerMessageBatchMaxSize参数来批量接受消息
				 */
				public ConsumeConcurrentlyStatus consumeMessage(
						List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {

					LOGGER.info(Thread.currentThread().getName()+" , Receive new Messages: "+msgs.size());
					MessageExt msg = msgs.get(0);

					if (msg.getTopic().equals("TopicTest1")) {
						// 执行TopicTest1的消费逻辑
						if (msg.getTags() != null
								&& msg.getTags().equals("TagA")) {
							// 执行TagA的消费
							LOGGER.info(new String(msg.getBody()));
						} else if (msg.getTags() != null
								&& msg.getTags().equals("TagB")) {
							// 执行TagB的消费
						} else if (msg.getTags() != null
								&& msg.getTags().equals("TagC")) {
							// 执行TagC的消费
						}
					} else if (msg.getTopic().equals("TopicTest2")) {
						// 执行TopicTest2的消费逻辑
					}

					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			// Consumer对象在使用之前必须要调用start
			consumer.start();

		} catch (MQClientException e) {

			e.printStackTrace();
		}
	}

}

⑦执行ProducerTest,然后使用如下命令查看指定主题中的数据



RocketMQ命令:


用法:


 

 

  • 大小: 1.5 KB
  • 大小: 20.4 KB
  • 大小: 7.9 KB
  • 大小: 26.5 KB
  • 大小: 15.5 KB
分享到:
评论

相关推荐

    rocket mq 视频(龙果)

    这个视频是龙果 的rocket mq视频,讲的非常不错。分为上下两个系列。直接用txt 打开后。里边是百度云资源

    阿里云rocketmq消息队列对接demo

    该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。

    Rocket MQ 使用排查指南1

    1. 用户发起海量秒杀请求到秒杀业务处理系统 2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 3. 下游的通知系统订阅消息队列 Rocke

    商业vs开源MQ对比

    市场上目前主流的消息中间件有IBM MQ、WebLogic JMS、ActiveMQ、Rabbit MQ、Rocket MQ、Apollo等。他们的功能性差异往往成为企业在中间件选型过程中不得不考虑的因素,通过本文档,你可以详细了解商业MQ和开源MQ之间...

    基于海量数据的消息队列的性能对比与优化方案 (2016年)

    在各大技术公司的团队中,使用最频繁的三种消息队列分别是:Apache Kafka,阿里巴巴的 Rocket-MQ,Rabbit-MQ。本文首先先介绍了上述三个分布式消息队列的基本概念、架构特性以及实现原理,基于这些然后分别简单介绍它们的...

    RocketMQ使用指南.pdf

    消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高 并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 版既可为分布式 应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所...

    rocketmq-all-4.4.0-source-release.zip

    RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。具有以下特点: 能够...

    RocketMQ安装包

    RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,前身是Metaq,当 Metaq 3.0发布时,产品名称改为 RocketMQ。 具有以下特点: 1、能够保证严格的消息顺序 2、提供丰富的消息拉取模式 3、高效的订阅者水平...

    RocketMQ原理分析

    对RocketMQ队列消息进行了一个整体分析,内容还算可以吧

    RocketMq源码学习过程中的总结资料

    1:Mq的ConsumerMode模式 2:相关名词 3:rocketmq怎么保证队列完全顺序...6:Rocket MQ 消息过滤是发生在服务端还是客户端? 7:为什么基于表达式 tag 会在客户端再进行一次过滤 8:Rocketmq中的单向(oneWay)机制

    阿里巴巴rocketmq-4.9.2-bin-release.zip

    rocket是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性: 1、支持发布/订阅和点对点消息模型 2、在同一队列中有严格的顺序传递(FIFO) 3、支持pull和push两种消息模式 4、单一队列百万级消息...

    csx-bsf-all:yh-csx中台供应链支撑架构,包含监控、日志、检索、配置中心、注册中心、调度中心、消息队列、缓存、分库等等

    集成并封装Apollo,Rocket MQ,Redis, Elastic Search,ELK,XXLJOB, Sharding JDBC,Cat,Eureka,七牛云等第三方中间件,提供简易使用的底层框架。 愿景 为了更好地支持业务开发,让开发人员从中间件中解放出来,专注业务...

    rocketmq控制台jar和启动指令

    rocketmq控制台jar和启动指令,用来图形化查看mq队列和消息,启动指令:nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=192.168.100.62:28881 & 或者 nohup ...

Global site tag (gtag.js) - Google Analytics