几种常用的MQ

特性RocketMQKafkaRabbitMQActiveMQ
单机吞吐量10 万级,支撑高吞吐10 万级以上,甚至有文献称,可以达到单机百万级TPS。万级,同ActiveMQ万级,相对其他MQ较低。
topic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic从几十到几百个时候,吞吐量会大幅度下降,所以请不要给Kafka设计过多的topic,需要更多的机器资源支撑大规模的 topictopic 数量增多,吞吐量会下降topic 数量增多,吞吐量会下降
时效性ms 级延迟在 ms 级以内微秒级,延迟最低RabbitMQ 的一大特点ms 级
可用性非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用同 ActiveMQ高,基于主从架构实现高可用
消息可靠性经过参数优化配置,可以做到 0 丢失。支持事务同 RocketMQ。支持事务基本不丢有较低的概率丢失数据
消息顺序性分区内消息有序分区内消息有序队列的消息有序队列消息有序,topic不保证。
消息延时5.0开始支持,定时消息插件支持插件支持支持,Scheduled Message
功能支持MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 领域的功能极其完备
资料文档少。没有专门写rocketmq的书,网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述中,有kafka作者自己写的书,网上资料也有一些多。有一些不错的书,网上资料多多。没有专门写activemq的书,网上资料多
开发语言javaScala+JavaErlangjava
支持协议自定义自定义(基于TCP)AMQPOpenWire、STOMP、REST、XMPP、AMQP
消息存储磁盘。支持大量堆积内存、磁盘、数据库。支持大量堆积内存、磁盘。支持少量堆积
集群方式常用多对’Master-Slave’ 模式,开源版本需手动切换Slave变成Master天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave支持简单集群,’复制’模式,对高级集群模式支持不好支持简单集群模式,比如’主-备’,对高级集群模式支持不好
系统场景电商系统,金融系统,物流系统大数据处理平台(如 Hadoop、Spark) 流处理平台(如 Flink、Storm) 日志收集系统(如 ELK)网站通知系统 任务队列系统 微服务通信系统传统企业应用(如 ERP、CRM) JMS 兼容系统
常用MQ比较

RocketMQ

安装参考 https://kunyuan.tech/archives/208

使用参考 https://kunyuan.tech/archives/358

Kafka

安装参考,这里还是使用docker进行安装,所需要的docker镜像在这里可以找到。

通过网盘分享的文件:docker-images
链接: https://pan.baidu.com/s/1qyidkCeGg0O_3odSG5jstA 提取码: sduq

1、在目录下创建一个compose.yml 文件,配置以下内容:

version : '3.8'
networks:
  kafka-networks:
    driver: bridge
services:
  zookeeper:
    container_name: zookeeper
    image: zookeeper:latest
    restart: always
    ports:
      - "2181:2181"
      - "8081:8080"
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - ./data:/data
      - ./logs:/logs
      - /etc/localtime:/etc/localtime
    networks:
      - kafka-networks

  kafka:
    container_name: kafka
    image: kafka:latest
    restart: always
    ports:
      - "9092:9092"
    depends_on: 
      - zookeeper
    links:
      - "zookeeper:zookeeper"

    environment:
      - TZ=Asia/Shanghai
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENERS=PLAINTEXT://:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 #此IP为服务器对外的IP地址
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    networks:
      - kafka-networks

2、创建一个data和一个logs目录,用作zookeeper 映射目录。

3、在上面分享百度网盘中,找到zookeeper和kafka镜像包,上传到服务器并load进行。

没安装docker和docker-compose的,可以参考 https://kunyuan.tech/archives/1287

4、使用docker-compose up -d 启动

5、python调用示例

先安装 kafka-python。

pip install kafka-python

python 3.12 版本安装

pip install git+https://github.com/dpkp/kafka-python.git

消费者


from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers='127.0.0.1:9092')

# 读取消息
for message in consumer:
    print(f"Received message: {message.value.decode()}")

生产者

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# 发送消息
producer.send('test', b'Hello, Kafka!')
producer.flush()  # 确保所有消息被发送
测试

6、JAVA调用示例

先引入 spring-kafka 合适的版本


<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>3.2.0</version>
</dependency>

消费者


/**
 * Description: kafka消费
 * Author: ky
 * DateTime: 2024-12-11 12:04
 */
@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void consumer(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
        }
    }
}

生产者

@Autowired
private KafkaTemplate kafkaTemplate;

public void send(String topic){
    kafkaTemplate.send(topic,"hello kafka");
}

8、php调用示例

首先安装kafka-php

composer require nmred/kafka-php

消费者

<?php
require 'vendor/autoload.php';
 
use Kafka\ConsumerConfig;
use Kafka\Consumer;

$config = ConsumerConfig::getInstance();
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test-a');
$config->setTopics(['test']);
echo 'start...';
 
$consumer = new Consumer();
$consumer->start(function($topic, $part, $message) {
    //var_dump($message);
    echo 'get message';
    print_r($message);
});

生产者

<?php
require 'vendor/autoload.php';

use Kafka\Producer;
use Kafka\ProducerConfig;
 
$config = ProducerConfig::getInstance();
$config->setMetadataBrokerList('127.0.0.1:9092');
 
$producer = new Producer();
$producer->send([
    [
        'topic' => 'test',
        'value' => 'php test message',
        'key' => 'test-key',
    ],
]);

RabbitMQ

ActiveMQ

This entry was posted in 应用. Bookmark the permalink.