mq是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
MQ的作用: 异步,解耦,削峰
支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
支持拉(pull)和推(push)两种消息模式
单一队列百万消息的堆积能力
支持多种消息协议,如 JMS、MQTT 等
分布式高可用的部署架构,满足至少一次消息传递语义
提供 docker 镜像用于隔离测试和云集群部署
提供配置、指标和监控等功能丰富的 Dashboard

mq_queue

mq_topic

Producer,消息生产者,生产者的作用就是将消息发送到 MQ
Producer Group,生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组
Consumer,消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者
Consumer Group,消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
Message 是消息的载体。相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息,
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
Broker 是 RocketMQ 系统的主要角色,接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
Name Server 为 producer 和 consumer 提供路由信息。
public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new
           DefaultMQProducer("group_name");
       producer.setNamesrvAddr("localhost:9876");
       producer.start();
       for (int i = 0; i < 100; i++) {
           Message msg = new Message("TopicTest",
               "TagA",
               ("Hello RocketMQ " +
                   i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
           );
           SendResult sendResult = producer.send(msg);
           System.out.printf("%s%n", sendResult);
       }
       producer.shutdown();
   }	public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new
           DefaultMQProducer("please_rename_unique_group_name");
       producer.setNamesrvAddr("localhost:9876");
       producer.start();
       for (int i = 0; i < 100; i++) {
           Message msg = new Message("TopicTest" /* Topic */,
               "TagA" /* Tag */,
               ("Hello RocketMQ " +
                   i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
           );
           SendResult sendResult = producer.send(msg);
           System.out.printf("%s%n", sendResult);
       }
       producer.shutdown();
   }
public static void main(String[] args) throws InterruptedException, MQClientException {

       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
       consumer.setNamesrvAddr("localhost:9876");
       consumer.subscribe("TopicTest", "*");
       consumer.registerMessageListener(new MessageListenerConcurrently() {

           @Override
           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
               ConsumeConcurrentlyContext context) {
               System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       });
       consumer.start();
       System.out.printf("Consumer Started.%n");
   }