甲乙小朋友的房子

甲乙小朋友很笨,但甲乙小朋友不会放弃

0%

OpenMessaging源码阅读2-demo

producer操作

tips:以下涉及到的代码是关键步骤代码。

  1. ====================构造n个Topic和n个Queue====================

    1
    2
    3
    4
    String topic1 = "TOPIC1"; //实际测试时大概会有100个Topic左右
    String queue2 = "QUEUE2"; //实际测试时,queue数目与消费线程数目相同
    List<Message> messagesForTopic1 = new ArrayList<>(1024);
    List<Message> messagesForQueue1 = new ArrayList<>(1024);

  2. ====================向Topic和Queue中create数据====================

    1
    messagesForTopic1.add(producer.createBytesMessageToTopic(topic1,  (topic1 + i).getBytes()));

  • 调用producerproducer.createBytesMessageToTopic()方法来创建BytesMessage
  • 将上一步产生的标准消息扔进messagesForTopic1中,即调用了每个messagesForTopic1.add()来向这个Topic中添加消息

需要注意的是: - messageFactory,而createBytesMessageToTopic()正是通过messageFactory.createBytesMessageToTopic(topic, body);来创建消息; - messageFactory.createBytesMessageToTopic(topic, body);仅仅只是将消息的body和header放入了一个defaultBytesMessage类型的消息中,并返回 - 每个producer对应一个messageStore 总结:这一步将producer产生的数据放入消息列表messagesForTopic/Queue中

3.====================send数据====================

方式:

producer.send(messagesForTopic1.get(i));

然后send()内部是:

String topic = message.headers().getString(MessageHeader.TOPIC);
messageStore.putMessage(topic或queue, message);

需要注意的是:

  • 每个producer有一个messageStore,通过调用它的putMessage()来进行发送消息(将消息存储在硬盘中)
  • MessageStore类有一个成员变量Map <String, ArrayList<Message>> messageBuckets用来装消息,其中键是topic或queue的名字,值是new ArrayList<>(1024)(这里有一点疑问?)

而putMessage:

1
2
3
4
5
6
7
public synchronized void putMessage(String bucket, Message message) {
if (!messageBuckets.containsKey(bucket)) {
messageBuckets.put(bucket, new ArrayList<>(1024));
}
ArrayList<Message> bucketList = messageBuckets.get(bucket);
bucketList.add(message);
}
  • 如果本messageStore的messageBuckets没有本bucket(Topic或Queue),则将这个buket加入到messageBuckets中,并使得其键值为new ArrayList<>(1024)
  • 从本messageBuckets拿出(get)本bucket,放入消息列表bucketList中
  • 再将本message加入消息列表bucketList

总结:将Topic或Queue放入MessageStore的messageBuckets中,将消息体放入bMessageStore的ucketList中

consumer操作

  1. ====================进行消息订阅attach====================

操作:

consumer1.attachQueue(queue1, Collections.singletonList(topic1));

备注: - singletonList(T) 方法用于返回一个只包含指定对象的不可变列表

然后DefaultPullConsumer的attachQueue如下:

1
2
3
4
5
6
7
8
9
10
@Override public synchronized void attachQueue(String queueName, Collection<String> topics) {
if (queue != null && !queue.equals(queueName)) {
throw new ClientOMSException("You have alreadly attached to a queue " + queue);
}
queue = queueName;
buckets.add(queueName);
buckets.addAll(topics);
bucketList.clear();
bucketList.addAll(buckets);
}
  • buckets是DefaultPullConsumer的一个成员变量:private Set buckets = new HashSet<>();
  • bucketList是DefaultPullConsumer的一个成员变量:private List bucketList = new ArrayList<>();
  • 将本queue以及其下的所有topic都加入到buckets中
  • 将bukets都加入到bucketList中

2.====================进行消息拉取pull====================

操作:

Message message = consumer1.poll();

其中,poll为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override public synchronized Message poll() {
if (buckets.size() == 0 || queue == null) {
return null;
}
//use Round Robin
int checkNum = 0;
while (++checkNum <= bucketList.size()) {
String bucket = bucketList.get((++lastIndex) % (bucketList.size()));
Message message = messageStore.pullMessage(queue, bucket);
if (message != null) {
return message;
}
}
return null;
}
  • 遍历每个bucketList(bucketList装的是本consumer订阅的topics以及对应的queue)
  • 对遍历到的每个bucket,从messageStore拉取消息

而 messageStore.pullMessage为:

```Java
public synchronized Message pullMessage(String queue, String bucket) {
    ArrayList<Message> bucketList = messageBuckets.get(bucket);
    if (bucketList == null) {
        return null;
    }
    HashMap<String, Integer> offsetMap = queueOffsets.get(queue);
    if (offsetMap == null) {
        offsetMap = new HashMap<>();
        queueOffsets.put(queue, offsetMap);
    }
    int offset = offsetMap.getOrDefault(bucket, 0);
    if (offset >= bucketList.size()) {
        return null;
    }
    Message message = bucketList.get(offset);
    offsetMap.put(bucket, ++offset);
    return message;
}```
  • 先从messageStore的messageBuckets中get到本bucket的bucketList
  • 将这个consumer绑定的queue放入本messageStore的queueOffsets中
  • 然后我就有点疑惑了??