甲乙小朋友的房子

甲乙小朋友很笨,但甲乙小朋友不会放弃

0%

OpenMessaging源码阅读1

考虑到自己记性实在太差,还是好好记笔记吧。 本节主要看以下几个接口: - Meaasage - Producer - PullConsumer

Message.java

Message接口是所有OMS消息中的根接口。最常用的消息就是BytesMessage。

标准Message 大多消息导向(message-oriented)的中间件(MOM)产品更趋向于将消息认做轻实体,这个轻实体包含一个header和一个body:

  • header:包含用来路由和识别的信息域;
  • body:包含将会被发送的应用信息;

本Message 本消息是一个仅包含与具体消息对象相关的property(财产)的轻量级实体。主要包含以下几个方面:

  • Header:所有消息都有同样的header域。header域的值用来给客户端(clients)和提供商(providers)唯一标示消息,以及路由消息。
  • Properties(财产,特性):每个消息都有一个消息自有的部分,这部分用来提供“应用定义(application-defined)”的property(财产)值。这一部分为支持“应用定义”消息的过滤提供了很有效的机制。

源码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface Message {
/*headers()返回Message对象的header域,返回值类型是keyValue。*/
KeyValue headers();

/*properties()返回消息自有的property域。返回值类型是keyValue。*/
KeyValue properties();

/*putHeaders(String key, int value)将输入(String key, int value)全部传入header*/
/*参数key:headers的关键字*/
/*参数values:与key对应的值*/
Message putHeaders(String key, int value);
Message putHeaders(String key, long value);
Message putHeaders(String key, double value);
Message putHeaders(String key, String value);


/*将key和value全部存入header*/
/*参数key:headers的关键字*/
/*参数values:与key对应的值*/
Message putProperties(String key, int value);
Message putProperties(String key, long value);
Message putProperties(String key, double value);
Message putProperties(String key, String value);
}

Producer.java

Producer是一个用来发送消息的简单对象,它是MessagingAccessPoint的一个具体实现。 创建Producer对象Producer的具体实例是通过MessagingAccessPoint#createProducer()方法创建的。 这个方法提供了多种定点发送消息的方式,其中,目的地可以是MessageHeader#TOPICMessageHeader#QUEUE

Producer#send(Message) 同步定点发送消息方法。 当发送请求完成时,线程将会关闭(block)。

Producer#sendAsync(Message) 异步定点发送消息方法。 当发送请求完成时,线程不会很快关闭,而会立即返回一个Promise作为发送结果。

Producer#sendOneway(Message) one way定点发送消息方法。 当发送请求完成时,线程不会很快关闭,而是立即返回。线程发起者不关心发送结果,同时server也对返回值没有责任。

源码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface Producer extends MessageFactory, ServiceLifecycle {
/*返回本实例的properties*/
/*返回值的变化不会反应在Producer本身上,并且这个变化可以用ResourceManager#setProducerProperties(String, KeyValue)来修改。(Changes to the return {@code KeyValue} are not reflected in physical {@code Producer},and use {@link ResourceManager#setProducerProperties(String, KeyValue)} to modify.)*/
KeyValue properties();

/*同步定点发送message方法*/
/*发送目的地应该预置在MessageHeader中。当然其它类型的header域也可以*/
/*异常OMSRuntimeException:当由于内部原因发送失败时*/
void send(Message message);
void send(Message message, KeyValue properties);/*properties是属性值*/

/*异步定点发送消息方法*/
/*返回值是Promise类型。同时,登记过的PromiseListener将会被通知*/
Promise<Void> sendAsync(Message message);
Promise<Void> sendAsync(Message message, KeyValue properties);

/*oneway定点发送消息方法*/
/*无返回值,也没有thrown。因为oneway发送不在乎发送结果*/
void sendOneway(Message message);
void sendOneway(Message message, KeyValue properties);

BatchToPartition createBatchToPartition(String partitionName);

BatchToPartition createBatchToPartition(String partitionName, KeyValue properties);
}

PullConsumer.java

PullConsumer对象能从特定的队列中poll消息。而且支持通过‘ack’方式提交消费结果。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface PullConsumer {
/*返回本PullConsumer实例的properties*/
/* Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer},and use {@link ResourceManager#setConsumerProperties(String, KeyValue)} to modify.*/
KeyValue properties();

/**
* 规范要求实现阻塞的接口,由properties来设置阻塞时间,但本赛题不需要用到该特性,请实现一个非阻塞(也即阻塞时间为0)调用, 也即没有消息则返回null
*/

/*抽取下一条为本pullconsumer生产的消息*/
/*除非一条消息被产生了,或者本pullConsumer被关闭了,本调用会一直block*/
/*返回为本PullConsumer生产的下一条消息;当本PullConsumer被同时关闭时返回null*/
/*当本PullConsumer由于一些内部原因而抽取下一条消息失败时,throw OMSRuntimeException*/
Message poll();
Message poll(final KeyValue properties);/*properties是一些参数*/

/*用消息id回确认指定的已消费的消息*/
/*若某消息已被接收,但还未被回确认,那它可能会被重新投递*/
/*有OMSRuntimeException*/
void ack(String messageId);
void ack(String messageId, final KeyValue properties);

/* 绑定到一个Queue,并订阅topics,即从这些topic读取消息*/
void attachQueue(String queueName, Collection<String> topics);
}