消息队列apache rocketmq4.2入门(二)

原创 2018-03-09 11:12 阅读(1443)次

上一篇我们讲了apache rocketmq4.2的安装,这是目前最新版本了,我本着要研究他的事务消息,却发现apache官方文档里都没有这个,百度了下好像大家都在说事务消息还没有真开源,好失望,但我下载的4.2的官方示例却有事务消息的示例代码,真不知道官方要干嘛。。好了话不多说,我看看其他功能代码吧,我也是根据官方示例弄下来,这里分享下,直接上代码了,其他不说了:

Maven:

<!-- rocketmq -->
<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>4.2.0</version>
</dependency>
MQProducer:

package com.cloud.ali.mq;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class MQProducer {

	public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
        
        producer.start();

        for (int i = 0; i < 10000000; i++)
            try {
                
        		//keys:消息体的唯一标识
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}
MQPushConsumer:

package com.cloud.ali.mq;

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class MQPushConsumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
        consumer.subscribe("TopicTest", "*");
        /** 
         *  ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费 
         *  ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费 
         *  ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST  @Deprecated 
         *  ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET   @Deprecated 
         *  ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET  @Deprecated 
         *  ConsumeFromWhere.CONSUME_FROM_TIMESTAMP    一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费, 
         *                                            时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数 
         *   
         *  MessageModel.CLUSTERING  集群消费,一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 
                                     Consumer Group 有 3 个实例(可能是 3 个迕程,戒者 3 台机器),那举每个实例只消费其中的 3 条消息 
         *  MessageModel.BROADCASTING  广播 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);  
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20170422221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            	 /** 
                 * List<MessageExt> msgs  这里虽然是list 但实际上基本都只会是一条,除非消息堆积, 
                 * 且记住一定是先启动消费者,再启动生产者 
                 * 否则极有可能导致消息的重复消费 
                 */  
                for(MessageExt mext : msgs) {  
                    try {  
                        System.out.println("消费了一条消息:"+new String(mext.getBody(),"utf-8"));    
                    } catch (UnsupportedEncodingException e) {  
                        // TODO Auto-generated catch block  
                        e.printStackTrace();  
                        //消费失败告诉mq重新发送继续消费  如果多次消费仍不成功可以记录在数据库中,可以通过mext.getReconsumeTimes()获取消费次数  
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
                    }   
                }  
                // 告诉mq消费成功  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
收发消息还是可以的,但本人对不能实现事务消息感到失望,只能等apache接下来的版本会不会实现了。

如何安装rocketMQ请参考上一篇文章:消息队列apache rocketmq4.2入门安装