参考资料
题目要求
主角:消息中间件 目的:实现消息中间件的推拉模式。即生产者制造消息,使用消息队列储存消息,消费者从消息队列拉取消息。 要点:持久化的消息队列
Producer需要实现
创建一个消息,给消息指定Topic(可以由多个Consumer消费)
BytesMessage createBytesMessageToTopic(String topic, byte[] body);
创建一个消息,给消息指定Queue(只能由一个Consumer消费)
BytesMessage createBytesMessageToQueue(String queue, byte[] body);
发送消息,message中应当包含目的地(Queue,Topic只能选其一),对于发往同一个Topic和Queue的message顺序要保持一致。
void send(Message message);
PullConsumer需要实现
- 绑定到一个Queue,并订阅topics,即从这些topic和Queue读取消息。
void attachQueue(String queueName, Collection topics);
- 规范要求实现阻塞的接口,由properties来设置阻塞时间,但本赛题不需要用到该特性, 请实现一个非阻塞(也即阻塞时间为0)调用, 也即没有消息则返回null
Message poll();
测试流程
- 创建
Topic
,创建Queue
创建Producer
,多个Producer
创建指定Topic
和指定Queue
的Message,调用send
方法发送 - 将数据保存到磁盘中
kill Producer
进程,另取进程进行消费 创建PullConsumer
线程进行消费,一个Consumer
对应一个线程,Consumer
连接到一个Queue
,可以订阅多个Topic
。- 不断的调用poll拉取队列的消息,直到完全读完,读取的消息要相对有序。
补充: 一个Producer
对应一个线程,线程先创建对应的Message,再将Message 发送到对应的队列或topic
中,实际情况中会有多个Producer
。 一个Consumer
对应一个Queue
,多个Consumer
同时从队列中拉取消息。
技术难点
大量的消息产生
并发写
并发读
序列化&反序列化
大量消息 首先根据题目描述,Produce过程会运行5分钟,这个过程中多线程进行消息的发送,然后再考虑将消息持久化。我用自己的程序测试了一下,(不是典型值,只作为参考,在文章的最后我会贴上我的一系列测试结果),多线程发送一亿条消息的时间为27s,而这一亿条消息占据磁盘的大小为将近4G!可以想象在5分钟内会产生多少的消息量,如何将消息存储,如何读取消息都将成为一个非常棘手的问题。
并发写 并发写的问题也非常显而易见。我们一般情况下为了实现消息队列会选择使用一个List或数组来存储Producer产生的消息。这就引发了一个问题,怎样保证向同一个队列中发送消息的线程不产生竞争条件。
并发读 最麻烦的一个部分,每个线程都需要读取磁盘上的消息内容,每个线程读取的位置又不尽相同,消息数又那么多不可能全部加载到内存中,这个问题曾让我伤透了脑筋,直到我遇到了mmap(后面详细介绍)。
序列化和反序列化的问题 大赛刚开始时,我写了一个使用Java自身序列化来实现持久化的版本,这个版本的缺点非常显而易见,就是慢,而这个缺点又是极为致命的。我意识到我需要自己定制一个序列化协议来将消息转化为字节数组,再通过其他方式(如FileOutputStream)写入磁盘,同时再使用这个协议将其从磁盘中恢复。