producer操作
tips:以下涉及到的代码是关键步骤代码。
- ====================构造n个Topic和n个Queue==================== - 1 
 2
 3
 4- String topic1 = "TOPIC1"; //实际测试时大概会有100个Topic左右 
 String queue2 = "QUEUE2"; //实际测试时,queue数目与消费线程数目相同
 List<Message> messagesForTopic1 = new ArrayList<>(1024);
 List<Message> messagesForQueue1 = new ArrayList<>(1024);
- ====================向Topic和Queue中create数据==================== - 1 - messagesForTopic1.add(producer.createBytesMessageToTopic(topic1, (topic1 + i).getBytes())); 
- 调用producer的producer.createBytesMessageToTopic()方法来创建BytesMessage
- 将上一步产生的标准消息扔进messagesForTopic1中,即调用了每个messagesForTopic1.add()来向这个Topic中添加消息
需要注意的是: - messageFactory,而createBytesMessageToTopic()正是通过messageFactory.createBytesMessageToTopic(topic, body);来创建消息; - messageFactory.createBytesMessageToTopic(topic, body);仅仅只是将消息的body和header放入了一个defaultBytesMessage类型的消息中,并返回 - 每个producer对应一个messageStore 总结:这一步将producer产生的数据放入消息列表messagesForTopic/Queue中
3.====================send数据====================
方式:
producer.send(messagesForTopic1.get(i));然后send()内部是:
String topic = message.headers().getString(MessageHeader.TOPIC);
messageStore.putMessage(topic或queue, message);需要注意的是:
- 每个producer有一个messageStore,通过调用它的putMessage()来进行发送消息(将消息存储在硬盘中)
- MessageStore类有一个成员变量Map <String, ArrayList<Message>> messageBuckets用来装消息,其中键是topic或queue的名字,值是new ArrayList<>(1024)(这里有一点疑问?)
而putMessage:
1
2
3
4
5
6
7
public synchronized void putMessage(String bucket, Message message) {
    if (!messageBuckets.containsKey(bucket)) {
        messageBuckets.put(bucket, new ArrayList<>(1024));
    }
    ArrayList<Message> bucketList = messageBuckets.get(bucket);
    bucketList.add(message);
}
- 如果本messageStore的messageBuckets没有本bucket(Topic或Queue),则将这个buket加入到messageBuckets中,并使得其键值为new ArrayList<>(1024)
- 从本messageBuckets拿出(get)本bucket,放入消息列表bucketList中
- 再将本message加入消息列表bucketList
总结:将Topic或Queue放入MessageStore的messageBuckets中,将消息体放入bMessageStore的ucketList中
consumer操作
- ====================进行消息订阅attach====================
操作:
consumer1.attachQueue(queue1, Collections.singletonList(topic1));备注: - singletonList(T) 方法用于返回一个只包含指定对象的不可变列表
然后DefaultPullConsumer的attachQueue如下:
1
2
3
4
5
6
7
8
9
10
 public synchronized void attachQueue(String queueName, Collection<String> topics) {
    if (queue != null && !queue.equals(queueName)) {
        throw new ClientOMSException("You have alreadly attached to a queue " + queue);
    }
    queue = queueName;
    buckets.add(queueName);
    buckets.addAll(topics);
    bucketList.clear();
    bucketList.addAll(buckets);
}
- buckets是DefaultPullConsumer的一个成员变量:private Setbuckets = new HashSet<>(); 
- bucketList是DefaultPullConsumer的一个成员变量:private ListbucketList = new ArrayList<>(); 
- 将本queue以及其下的所有topic都加入到buckets中
- 将bukets都加入到bucketList中
2.====================进行消息拉取pull====================
操作:
Message message = consumer1.poll();
其中,poll为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 public synchronized Message poll() {
    if (buckets.size() == 0 || queue == null) {
        return null;
    }
    //use Round Robin
    int checkNum = 0;
    while (++checkNum <= bucketList.size()) {
        String bucket = bucketList.get((++lastIndex) % (bucketList.size()));
        Message message = messageStore.pullMessage(queue, bucket);
        if (message != null) {
            return message;
        }
    }
    return null;
}
- 遍历每个bucketList(bucketList装的是本consumer订阅的topics以及对应的queue)
- 对遍历到的每个bucket,从messageStore拉取消息
而 messageStore.pullMessage为:
```Java
public synchronized Message pullMessage(String queue, String bucket) {
    ArrayList<Message> bucketList = messageBuckets.get(bucket);
    if (bucketList == null) {
        return null;
    }
    HashMap<String, Integer> offsetMap = queueOffsets.get(queue);
    if (offsetMap == null) {
        offsetMap = new HashMap<>();
        queueOffsets.put(queue, offsetMap);
    }
    int offset = offsetMap.getOrDefault(bucket, 0);
    if (offset >= bucketList.size()) {
        return null;
    }
    Message message = bucketList.get(offset);
    offsetMap.put(bucket, ++offset);
    return message;
}```- 先从messageStore的messageBuckets中get到本bucket的bucketList
- 将这个consumer绑定的queue放入本messageStore的queueOffsets中
- 然后我就有点疑惑了??