考虑到自己记性实在太差,还是好好记笔记吧。 本节主要看以下几个接口: - Meaasage - Producer - PullConsumer
Message.java
Message接口是所有OMS消息中的根接口。最常用的消息就是BytesMessage。
标准Message 大多消息导向(message-oriented)的中间件(MOM)产品更趋向于将消息认做轻实体,这个轻实体包含一个header和一个body:
- header:包含用来路由和识别的信息域;
- body:包含将会被发送的应用信息;
本Message 本消息是一个仅包含与具体消息对象相关的property(财产)的轻量级实体。主要包含以下几个方面:
- Header:所有消息都有同样的header域。header域的值用来给客户端(clients)和提供商(providers)唯一标示消息,以及路由消息。
- Properties(财产,特性):每个消息都有一个消息自有的部分,这部分用来提供“应用定义(application-defined)”的property(财产)值。这一部分为支持“应用定义”消息的过滤提供了很有效的机制。
源码解读
1 | public interface Message { |
Producer.java
Producer是一个用来发送消息的简单对象,它是MessagingAccessPoint
的一个具体实现。 创建Producer对象: Producer
的具体实例是通过MessagingAccessPoint#createProducer()
方法创建的。 这个方法提供了多种定点发送消息的方式,其中,目的地可以是MessageHeader#TOPIC
或MessageHeader#QUEUE
。
Producer#send(Message) 同步定点发送消息方法。 当发送请求完成时,线程将会关闭(block)。
Producer#sendAsync(Message) 异步定点发送消息方法。 当发送请求完成时,线程不会很快关闭,而会立即返回一个Promise
作为发送结果。
Producer#sendOneway(Message) one way定点发送消息方法。 当发送请求完成时,线程不会很快关闭,而是立即返回。线程发起者不关心发送结果,同时server也对返回值没有责任。
源码解读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface Producer extends MessageFactory, ServiceLifecycle {
/*返回本实例的properties*/
/*返回值的变化不会反应在Producer本身上,并且这个变化可以用ResourceManager#setProducerProperties(String, KeyValue)来修改。(Changes to the return {@code KeyValue} are not reflected in physical {@code Producer},and use {@link ResourceManager#setProducerProperties(String, KeyValue)} to modify.)*/
KeyValue properties();
/*同步定点发送message方法*/
/*发送目的地应该预置在MessageHeader中。当然其它类型的header域也可以*/
/*异常OMSRuntimeException:当由于内部原因发送失败时*/
void send(Message message);
void send(Message message, KeyValue properties);/*properties是属性值*/
/*异步定点发送消息方法*/
/*返回值是Promise类型。同时,登记过的PromiseListener将会被通知*/
Promise<Void> sendAsync(Message message);
Promise<Void> sendAsync(Message message, KeyValue properties);
/*oneway定点发送消息方法*/
/*无返回值,也没有thrown。因为oneway发送不在乎发送结果*/
void sendOneway(Message message);
void sendOneway(Message message, KeyValue properties);
BatchToPartition createBatchToPartition(String partitionName);
BatchToPartition createBatchToPartition(String partitionName, KeyValue properties);
}
PullConsumer.java
PullConsumer
对象能从特定的队列中poll消息。而且支持通过‘ack’方式提交消费结果。
源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface PullConsumer {
/*返回本PullConsumer实例的properties*/
/* Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer},and use {@link ResourceManager#setConsumerProperties(String, KeyValue)} to modify.*/
KeyValue properties();
/**
* 规范要求实现阻塞的接口,由properties来设置阻塞时间,但本赛题不需要用到该特性,请实现一个非阻塞(也即阻塞时间为0)调用, 也即没有消息则返回null
*/
/*抽取下一条为本pullconsumer生产的消息*/
/*除非一条消息被产生了,或者本pullConsumer被关闭了,本调用会一直block*/
/*返回为本PullConsumer生产的下一条消息;当本PullConsumer被同时关闭时返回null*/
/*当本PullConsumer由于一些内部原因而抽取下一条消息失败时,throw OMSRuntimeException*/
Message poll();
Message poll(final KeyValue properties);/*properties是一些参数*/
/*用消息id回确认指定的已消费的消息*/
/*若某消息已被接收,但还未被回确认,那它可能会被重新投递*/
/*有OMSRuntimeException*/
void ack(String messageId);
void ack(String messageId, final KeyValue properties);
/* 绑定到一个Queue,并订阅topics,即从这些topic读取消息*/
void attachQueue(String queueName, Collection<String> topics);
}