甲乙小朋友的房子

甲乙小朋友很笨,但甲乙小朋友不会放弃

0%

内存映射文件:利用虚拟内存实现将文件“映射”到内存中。文件对应于内存中的一个字节数组,对文件的操作变为对这个字节数组的操作,而字节数组的操作直接映射到文件上。这样这个文件就可以当做是一个内存数组一样的访问,这比传统的文件操作要快得多。

映射:硬盘上文件的位置与进程逻辑地址空间中一块大小相同的区域之间的一一对应

不过,这种映射是操作系统提供的一种假象,文件一般不会马上加载到内存,操作系统只是记录下了这回事,当实际发生读写时,才会按需加载。

这种按需加载的方式,使得内存映射文件可以方便处理非常大的文件,内存放不下整个文件也不要紧,操作系统会自动进行处理,将需要的内容读到内存,将修改的内容保存到硬盘,将不再使用的内存释放。

内存映射文件也有局限性,比如,它不太适合处理小文件,它是按页分配内存的,对于小文件,会浪费空间,另外,映射文件要消耗一定的操作系统资源,初始化比较慢。

java使用内存映射

步骤

  • 引入java.nio包

  • 从文件中获得一个通道(channel)。

    1
    FileChannel channel = FileChanne.open(path,options)

  • 通过调用FileChannel类的map方法从这个通道中获得一个ByteBuffer,它代表内存中的字节数组。其中,映射文件区域与映射模式支持三种方式: -- FileChannel.MapMode.READ_ONLY:缓冲区只读 -- FileChannel.MapMode.READ_WRITE:可读写。任何缓冲区的修改都会写回文件(非立即) -- FileChannel.MapMode.PRIVATE:缓冲区可写,但修改不会传播到文件中

映射完成后,文件就可以关闭了,后续对文件的读写可以通过MappedByteBuffer。

例:以读写模式映射文件"abc.dat",代码可以为:

1
2
3
4
5
6
7
8
9
RandomAccessFile file = new RandomAccessFile("abc.dat","rw");
try {
MappedByteBuffer buf = file.getChannel().map(MapMode.READ_WRITE, 0, file.length());
//使用buf...
} catch (IOException e) {
e.printStackTrace();
}finally{
file.close();
}

## 

参考文献

1.计算机程序的思维逻辑 (61) - 内存映射文件及其应用 - 实现一个简单的消息队列 2.java流的性能优化2-内存映射文件

producer操作

tips:以下涉及到的代码是关键步骤代码。

  1. ====================构造n个Topic和n个Queue====================

    1
    2
    3
    4
    String topic1 = "TOPIC1"; //实际测试时大概会有100个Topic左右
    String queue2 = "QUEUE2"; //实际测试时,queue数目与消费线程数目相同
    List<Message> messagesForTopic1 = new ArrayList<>(1024);
    List<Message> messagesForQueue1 = new ArrayList<>(1024);

  2. ====================向Topic和Queue中create数据====================

    1
    messagesForTopic1.add(producer.createBytesMessageToTopic(topic1,  (topic1 + i).getBytes()));

  • 调用producerproducer.createBytesMessageToTopic()方法来创建BytesMessage
  • 将上一步产生的标准消息扔进messagesForTopic1中,即调用了每个messagesForTopic1.add()来向这个Topic中添加消息

需要注意的是: - messageFactory,而createBytesMessageToTopic()正是通过messageFactory.createBytesMessageToTopic(topic, body);来创建消息; - messageFactory.createBytesMessageToTopic(topic, body);仅仅只是将消息的body和header放入了一个defaultBytesMessage类型的消息中,并返回 - 每个producer对应一个messageStore 总结:这一步将producer产生的数据放入消息列表messagesForTopic/Queue中

3.====================send数据====================

方式:

producer.send(messagesForTopic1.get(i));

然后send()内部是:

String topic = message.headers().getString(MessageHeader.TOPIC);
messageStore.putMessage(topic或queue, message);

需要注意的是:

  • 每个producer有一个messageStore,通过调用它的putMessage()来进行发送消息(将消息存储在硬盘中)
  • MessageStore类有一个成员变量Map <String, ArrayList<Message>> messageBuckets用来装消息,其中键是topic或queue的名字,值是new ArrayList<>(1024)(这里有一点疑问?)

而putMessage:

1
2
3
4
5
6
7
public synchronized void putMessage(String bucket, Message message) {
if (!messageBuckets.containsKey(bucket)) {
messageBuckets.put(bucket, new ArrayList<>(1024));
}
ArrayList<Message> bucketList = messageBuckets.get(bucket);
bucketList.add(message);
}
  • 如果本messageStore的messageBuckets没有本bucket(Topic或Queue),则将这个buket加入到messageBuckets中,并使得其键值为new ArrayList<>(1024)
  • 从本messageBuckets拿出(get)本bucket,放入消息列表bucketList中
  • 再将本message加入消息列表bucketList

总结:将Topic或Queue放入MessageStore的messageBuckets中,将消息体放入bMessageStore的ucketList中

consumer操作

  1. ====================进行消息订阅attach====================

操作:

consumer1.attachQueue(queue1, Collections.singletonList(topic1));

备注: - singletonList(T) 方法用于返回一个只包含指定对象的不可变列表

然后DefaultPullConsumer的attachQueue如下:

1
2
3
4
5
6
7
8
9
10
@Override public synchronized void attachQueue(String queueName, Collection<String> topics) {
if (queue != null && !queue.equals(queueName)) {
throw new ClientOMSException("You have alreadly attached to a queue " + queue);
}
queue = queueName;
buckets.add(queueName);
buckets.addAll(topics);
bucketList.clear();
bucketList.addAll(buckets);
}
  • buckets是DefaultPullConsumer的一个成员变量:private Set buckets = new HashSet<>();
  • bucketList是DefaultPullConsumer的一个成员变量:private List bucketList = new ArrayList<>();
  • 将本queue以及其下的所有topic都加入到buckets中
  • 将bukets都加入到bucketList中

2.====================进行消息拉取pull====================

操作:

Message message = consumer1.poll();

其中,poll为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override public synchronized Message poll() {
if (buckets.size() == 0 || queue == null) {
return null;
}
//use Round Robin
int checkNum = 0;
while (++checkNum <= bucketList.size()) {
String bucket = bucketList.get((++lastIndex) % (bucketList.size()));
Message message = messageStore.pullMessage(queue, bucket);
if (message != null) {
return message;
}
}
return null;
}
  • 遍历每个bucketList(bucketList装的是本consumer订阅的topics以及对应的queue)
  • 对遍历到的每个bucket,从messageStore拉取消息

而 messageStore.pullMessage为:

```Java
public synchronized Message pullMessage(String queue, String bucket) {
    ArrayList<Message> bucketList = messageBuckets.get(bucket);
    if (bucketList == null) {
        return null;
    }
    HashMap<String, Integer> offsetMap = queueOffsets.get(queue);
    if (offsetMap == null) {
        offsetMap = new HashMap<>();
        queueOffsets.put(queue, offsetMap);
    }
    int offset = offsetMap.getOrDefault(bucket, 0);
    if (offset >= bucketList.size()) {
        return null;
    }
    Message message = bucketList.get(offset);
    offsetMap.put(bucket, ++offset);
    return message;
}```
  • 先从messageStore的messageBuckets中get到本bucket的bucketList
  • 将这个consumer绑定的queue放入本messageStore的queueOffsets中
  • 然后我就有点疑惑了??

RocketMQ:分布式开放消息系统

消息中间件需要解决哪些问题?

发布订阅(Publish/Subscribe)

发布订阅是消息中间件的最基本功能,也是相对于传统RPC通信而言。在此不再详述。

消息优先级(Message Priority)

两种方式:

  1. 严格优先级,例如0-65535。开销大,精准,但可能没有必要。
  2. 档位优先级。高、中、低,或其他。每个优先级可以用不同的topic表示,发消息时,指定不同的topic来表示优先级。精确性低。

消息有序性(Message Order)

  1. 一个订单的发出的消息顺序不能变
  2. 订单之间是可以并行消费

消息过滤(Message Filter)

消息协商器(Broker端)消息过滤

在Broker中,按照Consumer的要求做过滤 1. 优点是减少了对于Consumer无用消息的网络传输。 2. 缺点是增加了Broker的负担,实现相对复杂。

淘宝Notify支持多种过滤方式: 包含直接按照消息类型过滤,灵活的语法表达式过滤,几乎可以满足最苛刻的过滤需求。

淘宝RocketMQ支持按照简单的Message Tag过滤,也支持按照Message Header、body进行过滤。

CORBA Notification规范中也支持灵活的语法表达式过滤。

Consumer端消息过滤

这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到Consumer端。

消息持久化(Message Persistence)

持久化(Persistence):即把数据(如内存中的对象)保存到可永久保存的存储设备中(如磁盘)。持久化的主要应用是将内存中的对象存储在数据库中,或者存储在磁盘文件中、XML数据文件中等等。

消息中间件通常采用的几种持久化方式:

  1. 持久化到数据库,例如Mysql。
  2. 持久化到KV存储,例如levelDB、伯克利DB等KV存储系统。
  3. 文件记录形式持久化,例如Kafka,RocketMQ
  4. 对内存数据做一个持久化镜像,例如beanstalkd,VisiNotify
  5. (1)、(2)、(3)三种持久化方式都具有将内存队列Buffer进行扩展的能力,(4)只是一个内存的镜像,作用是当Broker挂掉重启后仍然能将之前内存的数据恢复出来。

JMS与CORBA Notification规范没有明确说明如何持久化,但是持久化部分的性能直接决定了整个消息中间件的性能。

RocketMQ充分利用Linux文件系统内存cache来提高性能。

消息可靠性(Message Reliablity)

响消息可靠性的几种情况:

  1. Broker正常关闭
  2. Broker异常Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况。
  5. 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

(1)、(2)、(3)、(4)四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

(5)、(6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。

RocketMQ从3.0版本开始支持同步双写。

参考文献

  1. 十分钟入门RocketMQ

参考资料

  1. 李健胜,阿里中间件大赛初赛解题思路
  2. 1对应的代码

题目要求

主角:消息中间件 目的:实现消息中间件的推拉模式。即生产者制造消息,使用消息队列储存消息,消费者从消息队列拉取消息。 要点:持久化的消息队列

Producer需要实现

  • 创建一个消息,给消息指定Topic(可以由多个Consumer消费) BytesMessage createBytesMessageToTopic(String topic, byte[] body);

  • 创建一个消息,给消息指定Queue(只能由一个Consumer消费) BytesMessage createBytesMessageToQueue(String queue, byte[] body);

  • 发送消息,message中应当包含目的地(Queue,Topic只能选其一),对于发往同一个Topic和Queue的message顺序要保持一致。 void send(Message message);

PullConsumer需要实现

  • 绑定到一个Queue,并订阅topics,即从这些topic和Queue读取消息。 void attachQueue(String queueName, Collection topics);
  • 规范要求实现阻塞的接口,由properties来设置阻塞时间,但本赛题不需要用到该特性, 请实现一个非阻塞(也即阻塞时间为0)调用, 也即没有消息则返回null Message poll();

测试流程

  • 创建Topic,创建Queue 创建Producer,多个Producer创建指定Topic和指定Queue的Message,调用send方法发送
  • 将数据保存到磁盘中
  • kill Producer进程,另取进程进行消费 创建PullConsumer线程进行消费,一个Consumer对应一个线程,Consumer连接到一个Queue,可以订阅多个Topic
  • 不断的调用poll拉取队列的消息,直到完全读完,读取的消息要相对有序。

补充: 一个Producer对应一个线程,线程先创建对应的Message,再将Message 发送到对应的队列或topic中,实际情况中会有多个Producer。 一个Consumer对应一个Queue,多个Consumer同时从队列中拉取消息。

技术难点

  1. 大量的消息产生

  2. 并发写

  3. 并发读

  4. 序列化&反序列化

  5. 大量消息 首先根据题目描述,Produce过程会运行5分钟,这个过程中多线程进行消息的发送,然后再考虑将消息持久化。我用自己的程序测试了一下,(不是典型值,只作为参考,在文章的最后我会贴上我的一系列测试结果),多线程发送一亿条消息的时间为27s,而这一亿条消息占据磁盘的大小为将近4G!可以想象在5分钟内会产生多少的消息量,如何将消息存储,如何读取消息都将成为一个非常棘手的问题。

  6. 并发写 并发写的问题也非常显而易见。我们一般情况下为了实现消息队列会选择使用一个List或数组来存储Producer产生的消息。这就引发了一个问题,怎样保证向同一个队列中发送消息的线程不产生竞争条件。

  7. 并发读 最麻烦的一个部分,每个线程都需要读取磁盘上的消息内容,每个线程读取的位置又不尽相同,消息数又那么多不可能全部加载到内存中,这个问题曾让我伤透了脑筋,直到我遇到了mmap(后面详细介绍)。

  8. 序列化和反序列化的问题 大赛刚开始时,我写了一个使用Java自身序列化来实现持久化的版本,这个版本的缺点非常显而易见,就是慢,而这个缺点又是极为致命的。我意识到我需要自己定制一个序列化协议来将消息转化为字节数组,再通过其他方式(如FileOutputStream)写入磁盘,同时再使用这个协议将其从磁盘中恢复。

考虑到自己记性实在太差,还是好好记笔记吧。 本节主要看以下几个接口: - Meaasage - Producer - PullConsumer

Message.java

Message接口是所有OMS消息中的根接口。最常用的消息就是BytesMessage。

标准Message 大多消息导向(message-oriented)的中间件(MOM)产品更趋向于将消息认做轻实体,这个轻实体包含一个header和一个body:

  • header:包含用来路由和识别的信息域;
  • body:包含将会被发送的应用信息;

本Message 本消息是一个仅包含与具体消息对象相关的property(财产)的轻量级实体。主要包含以下几个方面:

  • Header:所有消息都有同样的header域。header域的值用来给客户端(clients)和提供商(providers)唯一标示消息,以及路由消息。
  • Properties(财产,特性):每个消息都有一个消息自有的部分,这部分用来提供“应用定义(application-defined)”的property(财产)值。这一部分为支持“应用定义”消息的过滤提供了很有效的机制。

源码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface Message {
/*headers()返回Message对象的header域,返回值类型是keyValue。*/
KeyValue headers();

/*properties()返回消息自有的property域。返回值类型是keyValue。*/
KeyValue properties();

/*putHeaders(String key, int value)将输入(String key, int value)全部传入header*/
/*参数key:headers的关键字*/
/*参数values:与key对应的值*/
Message putHeaders(String key, int value);
Message putHeaders(String key, long value);
Message putHeaders(String key, double value);
Message putHeaders(String key, String value);


/*将key和value全部存入header*/
/*参数key:headers的关键字*/
/*参数values:与key对应的值*/
Message putProperties(String key, int value);
Message putProperties(String key, long value);
Message putProperties(String key, double value);
Message putProperties(String key, String value);
}

Producer.java

Producer是一个用来发送消息的简单对象,它是MessagingAccessPoint的一个具体实现。 创建Producer对象Producer的具体实例是通过MessagingAccessPoint#createProducer()方法创建的。 这个方法提供了多种定点发送消息的方式,其中,目的地可以是MessageHeader#TOPICMessageHeader#QUEUE

Producer#send(Message) 同步定点发送消息方法。 当发送请求完成时,线程将会关闭(block)。

Producer#sendAsync(Message) 异步定点发送消息方法。 当发送请求完成时,线程不会很快关闭,而会立即返回一个Promise作为发送结果。

Producer#sendOneway(Message) one way定点发送消息方法。 当发送请求完成时,线程不会很快关闭,而是立即返回。线程发起者不关心发送结果,同时server也对返回值没有责任。

源码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface Producer extends MessageFactory, ServiceLifecycle {
/*返回本实例的properties*/
/*返回值的变化不会反应在Producer本身上,并且这个变化可以用ResourceManager#setProducerProperties(String, KeyValue)来修改。(Changes to the return {@code KeyValue} are not reflected in physical {@code Producer},and use {@link ResourceManager#setProducerProperties(String, KeyValue)} to modify.)*/
KeyValue properties();

/*同步定点发送message方法*/
/*发送目的地应该预置在MessageHeader中。当然其它类型的header域也可以*/
/*异常OMSRuntimeException:当由于内部原因发送失败时*/
void send(Message message);
void send(Message message, KeyValue properties);/*properties是属性值*/

/*异步定点发送消息方法*/
/*返回值是Promise类型。同时,登记过的PromiseListener将会被通知*/
Promise<Void> sendAsync(Message message);
Promise<Void> sendAsync(Message message, KeyValue properties);

/*oneway定点发送消息方法*/
/*无返回值,也没有thrown。因为oneway发送不在乎发送结果*/
void sendOneway(Message message);
void sendOneway(Message message, KeyValue properties);

BatchToPartition createBatchToPartition(String partitionName);

BatchToPartition createBatchToPartition(String partitionName, KeyValue properties);
}

PullConsumer.java

PullConsumer对象能从特定的队列中poll消息。而且支持通过‘ack’方式提交消费结果。

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface PullConsumer {
/*返回本PullConsumer实例的properties*/
/* Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer},and use {@link ResourceManager#setConsumerProperties(String, KeyValue)} to modify.*/
KeyValue properties();

/**
* 规范要求实现阻塞的接口,由properties来设置阻塞时间,但本赛题不需要用到该特性,请实现一个非阻塞(也即阻塞时间为0)调用, 也即没有消息则返回null
*/

/*抽取下一条为本pullconsumer生产的消息*/
/*除非一条消息被产生了,或者本pullConsumer被关闭了,本调用会一直block*/
/*返回为本PullConsumer生产的下一条消息;当本PullConsumer被同时关闭时返回null*/
/*当本PullConsumer由于一些内部原因而抽取下一条消息失败时,throw OMSRuntimeException*/
Message poll();
Message poll(final KeyValue properties);/*properties是一些参数*/

/*用消息id回确认指定的已消费的消息*/
/*若某消息已被接收,但还未被回确认,那它可能会被重新投递*/
/*有OMSRuntimeException*/
void ack(String messageId);
void ack(String messageId, final KeyValue properties);

/* 绑定到一个Queue,并订阅topics,即从这些topic读取消息*/
void attachQueue(String queueName, Collection<String> topics);
}

线程(thread):每个任务称为一个线程 线程和进程区别: 1. 每个进程有自己独立的变量,而线程则共享数据。 2. 线程是进程的执行单元 3. 线程是进程的组成部分。一个进程可以有多个线程,一个线程必须有一个父进程。 4. 线程可以拥有自己的堆栈、自己的程序计数器和自己的局部变量,但不拥有系统资源(与父进程其它线程共享该进程则资源)

线程的创建

有以下两种方法: 1. 通过继承Thread来创建线程 2. 通过实现Runnable接口创建线程

通过继承Thread来创建线程

要点:通过继承Thread类创建并启动多线程 步骤:

  1. 定义Thread类的子类,并重写run()方法(线程执行体)

  2. 创建Thread子类的实例(创建线程对象)

  3. 调用start()(启用该线程)

    1
    2
    3
    4
    5
    6
    7
    8
    public class FirstThread extends Thread {
    public void run(){
    System.out.println("run\t"+getName()+"\t");//getName()返回thread name
    }
    public static void main(String[] args){
    new FirstThread().start();
    }
    }

以上代码输出:

run  Thread-0

一个有意思的现象

运行如下代码时:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class FirstThread extends Thread {
public void run(){
System.out.println("run\t"+getName()+"\t");//getName()返回thread name
}
public static void main(String[] args){
//调用currentThread()获取当前线程
System.out.println("currentThread : "+Thread.currentThread().getName());
new FirstThread().start();
System.out.println("currentThread : "+Thread.currentThread().getName());
new FirstThread().start();
System.out.println("currentThread : "+Thread.currentThread().getName());
}
}

按照以前的理解,应该是:

currentThread : main
run Thread-0 
currentThread : main 
run Thread-1    
currentThread : main

实际输出:(也有可能是其它顺序)

currentThread : main
currentThread : main
run Thread-0    
run Thread-1    
currentThread : main

新发现:其实start一个线程的时候,main线程在继续运行。main线程不会等start完事儿之后再运行下一句!

通过实现Runnable接口创建线程

1
2
3
4
5
6
7
8
9
public class SecondThread implements Runnable {
public void run(){
System.out.println("run\t"+Thread.currentThread().getName()+"\t");//getName()返回thread name
}
public static void main(String[] args){
SecondThread st = new SecondThread();
new Thread(st,"new_thread_1").start();
}
}

区别: 需要通过Thread.currentThread().getName()来获取getName() main不同

多线程共享变量

以下是一个多线程共享变量i的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SecondThread implements Runnable {
private int i;
public void run(){
while(i<5){
System.out.println("run\t"+Thread.currentThread().getName()+"\t"+i);//getName()返回thread name
i++;
}
}
public static void main(String[] args){
SecondThread st = new SecondThread();
new Thread(st,"thread_name_1").start();
new Thread(st,"thread_name_2").start();
}
}

两次运行结果:

run thread_name_1    0
run thread_name_1    1
run thread_name_1    2
run thread_name_1    3
run thread_name_2    3
run thread_name_1    4

run thread_name_1    0
run thread_name_2    0
run thread_name_1    1
run thread_name_2    2
run thread_name_1    3
run thread_name_2    4

发现

  1. 两个线程共有变量i
  2. 线程间抢占资源

使用CallableFuture创建线程

OpenMessaging的主要关系如下图所示:

其中,各部分的内容和关系见下述。

Namespace

Namespace就像一个cgroup namespace,是用来创建一个有安全保障的独立的空间。每个namespace都有自己的producer,consumer,topic,queue等等。OpenMessaging用 ​MessagingAccessPoint​(消息访问点)来访问/读/写指定namespace的​资源​。

Producer

Openmessaging定义了两种Producer:​Producer​和 ​SequenceProducer

  • ​Producer​:提供各种send方法,用来将一个消息送往指定的destination,Topic或者Queue。支持三种方式:同步、异步、单向(oneway)
  • SequenceProducer:重点在于速度,且支持批处理。能发送多个数据并一次提交。

Consumer

Openmessaging定义了两种Consumer::PullConsumerPushConsumerStreamConsumer.每种Consumer仅支持来自于Queue的consume消息。

  • PullConsumer:从指定队列中pulls消息。支持“submit the consume result by acknowledgement at any time”。每个PullConsumer仅能从固定的队列中pull消息。
  • PushConsumer:可从多个队列中接收消息,且这个消息是由MOM server push上去的。PushConsumer可依附于多个独立的、具有不同的MessageListener的队列,并且可以随时通过ReceivedMessageContext提交结果。
  • StreamingConsumer:一种崭新的consumer类型,是一种面向流的consumer,面向留信息的一体化信息系统。

Topic Queue and Routing

这三个概念非常相近。虽然Topic和Queue有不同的用途,但它们总让人迷惑。

Topic

Topic是原始信息的载体,用来holding消息。消息的分发方式和有序性是没有定义的。

Routing

Topic中的消息是原始的,是待处理的,一般不易引起consumers的注意。总之,Topic中的数据是producer-orented(导向)的,而不是consumer-oriented。

因此Routing负责加工Topic中的原始消息,并routing去Queue中。每个Routing有一个操作管线(operator pipeline),包含着一系列的操作。消息会通过操作管线从Topic流向Queue。

操作(operator)是用来处理在Routing流通的消息的。有很多操作,例如expression operator, deduplicator operator, joiner operator, filter operator, rpc operator等等。

Queue(队列)

现在消息已经被routed到Queue中了。现在消息就可以被consumers使用了。

需要注意的是,一个Queue可能会被分为几部分,消息可能通过MessageHeader#SHARDING_KEY被routed到某个特殊的部分中。

Topic与Queue比较

  • 都是消息的载体
  • Topic是preducer-oriented的,而Queue是consumer-oriented的
  • Topic中的消息来自于Producer,而Queue中的消息来自于Topic或者Producer
  • Queue包含几个部分,而Topic形状未定义
  • 在大多数情况下,Queue是Topic的一个子集
  • Queue的创建、销毁都很容易,且与producer无关

参考文献

  1. 原始文档
  2. pugwoo用c写的
  3. 原始文档扒的API

中间件概念

中间件:处于操作系统和应用程序之间的软件。

中间件简单解释,可以理解为面向信息系统交互,集成过程中的通用部分的集合,屏蔽了底层的通讯,交互,连接等复杂又通用化的功能,以产品的形式提供出来,系统在交互时,直接采用中间件进行连接和交互即可,避免了大量的代码开发和人工成本。

其实,理论上来讲,中间件所提供的功能通过代码编写都可以实现,只不过开发的周期和需要考虑的问题太多,逐渐的,这些部分,以中间件产品的形式进行了替代。

比如常见的消息中间件,即系统之间的通讯与交互的专用通道,类似于邮局,系统只需要把传输的消息交给中间件,由中间件负责传递,并保证传输过程中的各类问题,如网络问题,协议问题,两端的开发接口问题等均由消息中间件屏蔽了,出现了网络故障时,消息中间件会负责缓存消息,以避免信息丢失。相当于你想给美国发一个邮包,只需要把邮包交给邮局,填写地址和收件人,至于运送过程中的一系列问题你都不需要关心了。

中间件分类

  1. 消息中间件(MOM:Message-Oriented Middleware)
  2. 数据中间件(Database Middleware)
  3. 远程过程调用中间件(RPC:Remote Process Call)
  4. 对象请求代理中间件(ORB:Object Request Broker)
  5. 事务处理中间件(TP Monitor:Transaction Process Monitor)
  6. J2EE中间件

Open-Messaging

是一个建立行业内的指引和消息的协议(charter)。它的streaming规范提供了一个可用于电子商务、物联网和大数据的基础框架。它的主要目标是建立一个在分布式异构环境中面向云、简单、灵活和独立于语言的环境。协议的一致性似的它可以跨平台开发异构消息应用程序。

域结构

消息中间件 Message Queue

Message Queue是一种应用程序对应用程序的通信方法。程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

提及消息中间件的时候,还会涉及生产者和消费者两个概念。消息中间件是负责接收来自生产者的消息,并存储并转发给对应的消费者,生产者可以按 topic 发布各样消息,消费者也可以按 topic 订阅各样消息。生产者只管往消息队列里推送消息,不用等待消费者的回应;消费者只管从消息队列中取出数据并处理,可用可靠性等问题都交由消息中间件来负责。 生产者和消费者通常有两种对应关系,一个生产者对应一个消费者,以及一个生产者对应多个消费者。

参考文献

1.知乎FireJones的回答 2.极客学院,消息中间件

os模块

os模块的作用:

  os,语义为操作系统,所以肯定就是操作系统相关的功能了,可以处理文件和目录这些我们日常手动需要做的操作,就比如说:显示当前目录下所有文件/删除某个文件/获取文件大小……

  另外,os模块不受平台限制,也就是说:当我们要在linux中显示当前命令时就要用到pwd命令,而Windows中cmd命令行下就要用到这个,额...我擦,我还真不知道,(甭管怎么着,肯定不是pwd),这时候我们使用python中os模块的os.path.abspath(name)功能,甭管是linux或者Windows都可以获取当前的绝对路径。

文件夹操作

检验文件夹是否存在

os.path.exists(directory)

创建文件夹

os.makedirs(directory)

实例: 检验文件夹是否存在,若不存在,则创建之

1
2
3
4
def check_dir(directory):
flag=os.path.exists(directory)
if not flag:
os.makedirs(directory)

遍历指定目录名,显示目录下所有文件

1
2
3
pathDir =  os.listdir(filepath)
for allDir in pathDir:
print allDir

参考文献

  1. python基础之模块之os模块