mq是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
MQ的作用: 异步,解耦,削峰
支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
支持拉(pull)和推(push)两种消息模式
单一队列百万消息的堆积能力
支持多种消息协议,如 JMS、MQTT 等
分布式高可用的部署架构,满足至少一次消息传递语义
提供 docker 镜像用于隔离测试和云集群部署
提供配置、指标和监控等功能丰富的 Dashboard
Producer,消息生产者,生产者的作用就是将消息发送到 MQ
Producer Group,生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组
Consumer,消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者
Consumer Group,消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。
Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
Message 是消息的载体。相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息,
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
Broker 是 RocketMQ 系统的主要角色,接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
Name Server 为 producer 和 consumer 提供路由信息。
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new
DefaultMQProducer("group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}