producer操作
tips:以下涉及到的代码是关键步骤代码。
====================构造n个Topic和n个Queue====================
1
2
3
4String topic1 = "TOPIC1"; //实际测试时大概会有100个Topic左右
String queue2 = "QUEUE2"; //实际测试时,queue数目与消费线程数目相同
List<Message> messagesForTopic1 = new ArrayList<>(1024);
List<Message> messagesForQueue1 = new ArrayList<>(1024);====================向Topic和Queue中create数据====================
1
messagesForTopic1.add(producer.createBytesMessageToTopic(topic1, (topic1 + i).getBytes()));
- 调用
producer
的producer.createBytesMessageToTopic()
方法来创建BytesMessage
- 将上一步产生的标准消息扔进messagesForTopic1中,即调用了每个
messagesForTopic1.add()
来向这个Topic
中添加消息
需要注意的是: - messageFactory,而createBytesMessageToTopic()正是通过messageFactory.createBytesMessageToTopic(topic, body);来创建消息; - messageFactory.createBytesMessageToTopic(topic, body);仅仅只是将消息的body和header放入了一个defaultBytesMessage类型的消息中,并返回 - 每个producer对应一个messageStore 总结:这一步将producer产生的数据放入消息列表messagesForTopic/Queue中
3.====================send数据====================
方式:
producer.send(messagesForTopic1.get(i));
然后send()
内部是:
String topic = message.headers().getString(MessageHeader.TOPIC);
messageStore.putMessage(topic或queue, message);
需要注意的是:
- 每个producer有一个messageStore,通过调用它的putMessage()来进行发送消息(将消息存储在硬盘中)
- MessageStore类有一个成员变量
Map <String, ArrayList<Message>> messageBuckets
用来装消息,其中键是topic或queue的名字,值是new ArrayList<>(1024)(这里有一点疑问?)
而putMessage:
1
2
3
4
5
6
7
public synchronized void putMessage(String bucket, Message message) {
if (!messageBuckets.containsKey(bucket)) {
messageBuckets.put(bucket, new ArrayList<>(1024));
}
ArrayList<Message> bucketList = messageBuckets.get(bucket);
bucketList.add(message);
}
- 如果本messageStore的messageBuckets没有本bucket(Topic或Queue),则将这个buket加入到messageBuckets中,并使得其键值为new ArrayList<>(1024)
- 从本messageBuckets拿出(get)本bucket,放入消息列表bucketList中
- 再将本message加入消息列表bucketList
总结:将Topic或Queue放入MessageStore的messageBuckets中,将消息体放入bMessageStore的ucketList中
consumer操作
- ====================进行消息订阅attach====================
操作:
consumer1.attachQueue(queue1, Collections.singletonList(topic1));
备注: - singletonList(T) 方法用于返回一个只包含指定对象的不可变列表
然后DefaultPullConsumer的attachQueue如下:
1
2
3
4
5
6
7
8
9
10
public synchronized void attachQueue(String queueName, Collection<String> topics) {
if (queue != null && !queue.equals(queueName)) {
throw new ClientOMSException("You have alreadly attached to a queue " + queue);
}
queue = queueName;
buckets.add(queueName);
buckets.addAll(topics);
bucketList.clear();
bucketList.addAll(buckets);
}
- buckets是DefaultPullConsumer的一个成员变量:private Set
buckets = new HashSet<>(); - bucketList是DefaultPullConsumer的一个成员变量:private List
bucketList = new ArrayList<>(); - 将本queue以及其下的所有topic都加入到buckets中
- 将bukets都加入到bucketList中
2.====================进行消息拉取pull====================
操作:
Message message = consumer1.poll();
其中,poll为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public synchronized Message poll() {
if (buckets.size() == 0 || queue == null) {
return null;
}
//use Round Robin
int checkNum = 0;
while (++checkNum <= bucketList.size()) {
String bucket = bucketList.get((++lastIndex) % (bucketList.size()));
Message message = messageStore.pullMessage(queue, bucket);
if (message != null) {
return message;
}
}
return null;
}
- 遍历每个bucketList(bucketList装的是本consumer订阅的topics以及对应的queue)
- 对遍历到的每个bucket,从messageStore拉取消息
而 messageStore.pullMessage为:
```Java
public synchronized Message pullMessage(String queue, String bucket) {
ArrayList<Message> bucketList = messageBuckets.get(bucket);
if (bucketList == null) {
return null;
}
HashMap<String, Integer> offsetMap = queueOffsets.get(queue);
if (offsetMap == null) {
offsetMap = new HashMap<>();
queueOffsets.put(queue, offsetMap);
}
int offset = offsetMap.getOrDefault(bucket, 0);
if (offset >= bucketList.size()) {
return null;
}
Message message = bucketList.get(offset);
offsetMap.put(bucket, ++offset);
return message;
}```
- 先从messageStore的messageBuckets中get到本bucket的bucketList
- 将这个consumer绑定的queue放入本messageStore的queueOffsets中
- 然后我就有点疑惑了??