特性 | RocketMQ | Kafka | RabbitMQ | ActiveMQ |
单机吞吐量 | 10 万级,支撑高吞吐 | 10 万级以上,甚至有文献称,可以达到单机百万级TPS。 | 万级,同ActiveMQ | 万级,相对其他MQ较低。 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic从几十到几百个时候,吞吐量会大幅度下降,所以请不要给Kafka设计过多的topic,需要更多的机器资源支撑大规模的 topic | topic 数量增多,吞吐量会下降 | topic 数量增多,吞吐量会下降 |
时效性 | ms 级 | 延迟在 ms 级以内 | 微秒级,延迟最低RabbitMQ 的一大特点 | ms 级 |
可用性 | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 | 同 ActiveMQ | 高,基于主从架构实现高可用 |
消息可靠性 | 经过参数优化配置,可以做到 0 丢失。支持事务 | 同 RocketMQ。支持事务 | 基本不丢 | 有较低的概率丢失数据 |
消息顺序性 | 分区内消息有序 | 分区内消息有序 | 队列的消息有序 | 队列消息有序,topic不保证。 |
消息延时 | 5.0开始支持,定时消息 | 插件支持 | 插件支持 | 支持,Scheduled Message |
功能支持 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 领域的功能极其完备 |
资料文档 | 少。没有专门写rocketmq的书,网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述 | 中,有kafka作者自己写的书,网上资料也有一些 | 多。有一些不错的书,网上资料多 | 多。没有专门写activemq的书,网上资料多 |
开发语言 | java | Scala+Java | Erlang | java |
支持协议 | 自定义 | 自定义(基于TCP) | AMQP | OpenWire、STOMP、REST、XMPP、AMQP |
消息存储 | 磁盘。支持大量堆积 | 内存、磁盘、数据库。支持大量堆积 | 内存、磁盘。支持少量堆积 | |
集群方式 | 常用多对’Master-Slave’ 模式,开源版本需手动切换Slave变成Master | 天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave | 支持简单集群,’复制’模式,对高级集群模式支持不好 | 支持简单集群模式,比如’主-备’,对高级集群模式支持不好 |
系统场景 | 电商系统,金融系统,物流系统 | 大数据处理平台(如 Hadoop、Spark) 流处理平台(如 Flink、Storm) 日志收集系统(如 ELK) | 网站通知系统 任务队列系统 微服务通信系统 | 传统企业应用(如 ERP、CRM) JMS 兼容系统 |
RocketMQ
安装参考 https://kunyuan.tech/archives/208
使用参考 https://kunyuan.tech/archives/358
Kafka
安装参考,这里还是使用docker进行安装,所需要的docker镜像在这里可以找到。
通过网盘分享的文件:docker-images
链接: https://pan.baidu.com/s/1qyidkCeGg0O_3odSG5jstA 提取码: sduq
1、在目录下创建一个compose.yml 文件,配置以下内容:
version : '3.8'
networks:
kafka-networks:
driver: bridge
services:
zookeeper:
container_name: zookeeper
image: zookeeper:latest
restart: always
ports:
- "2181:2181"
- "8081:8080"
environment:
- TZ=Asia/Shanghai
volumes:
- ./data:/data
- ./logs:/logs
- /etc/localtime:/etc/localtime
networks:
- kafka-networks
kafka:
container_name: kafka
image: kafka:latest
restart: always
ports:
- "9092:9092"
depends_on:
- zookeeper
links:
- "zookeeper:zookeeper"
environment:
- TZ=Asia/Shanghai
- KAFKA_BROKER_ID=1
- KAFKA_LISTENERS=PLAINTEXT://:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 #此IP为服务器对外的IP地址
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
networks:
- kafka-networks
2、创建一个data和一个logs目录,用作zookeeper 映射目录。
3、在上面分享百度网盘中,找到zookeeper和kafka镜像包,上传到服务器并load进行。
没安装docker和docker-compose的,可以参考 https://kunyuan.tech/archives/1287
4、使用docker-compose up -d 启动
5、python调用示例
先安装 kafka-python。
pip install kafka-python
python 3.12 版本安装
pip install git+https://github.com/dpkp/kafka-python.git
消费者
from kafka import KafkaConsumer
consumer = KafkaConsumer('test', bootstrap_servers='127.0.0.1:9092')
# 读取消息
for message in consumer:
print(f"Received message: {message.value.decode()}")
生产者
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
# 发送消息
producer.send('test', b'Hello, Kafka!')
producer.flush() # 确保所有消息被发送
6、JAVA调用示例
先引入 spring-kafka 合适的版本
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.2.0</version>
</dependency>
消费者
/**
* Description: kafka消费
* Author: ky
* DateTime: 2024-12-11 12:04
*/
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void consumer(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}
生产者
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String topic){
kafkaTemplate.send(topic,"hello kafka");
}
8、php调用示例
首先安装kafka-php
composer require nmred/kafka-php
消费者
<?php
require 'vendor/autoload.php';
use Kafka\ConsumerConfig;
use Kafka\Consumer;
$config = ConsumerConfig::getInstance();
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test-a');
$config->setTopics(['test']);
echo 'start...';
$consumer = new Consumer();
$consumer->start(function($topic, $part, $message) {
//var_dump($message);
echo 'get message';
print_r($message);
});
生产者
<?php
require 'vendor/autoload.php';
use Kafka\Producer;
use Kafka\ProducerConfig;
$config = ProducerConfig::getInstance();
$config->setMetadataBrokerList('127.0.0.1:9092');
$producer = new Producer();
$producer->send([
[
'topic' => 'test',
'value' => 'php test message',
'key' => 'test-key',
],
]);
RabbitMQ
略
ActiveMQ
略