Karait:利用MongoDB Capped Collection构建的消息队列

Karait是一个消息队列系统,其消息存储采用了MongoDB的Capped Collections结构,这一结构的特点就是集合的大小可设定,当数据大小超出设定大小时,新数据会抹掉旧数据。

Karait目前提供Python和Ruby的客户端操作包,也就是说你可以使用Python或Ruby来写消息,用Python或Ruby来读队列。下面就是一个例子,用Python写队列,用Ruby读队列。

用python写队列:

import time
from karait import Message, Queue

print 'Starting python writer.'

messages_written = 0
start = time.time()
queue = Queue()

while True:
    queue.write({
        'messages_written': messages_written,
        'sender': 'writer.py',
        'started_running': start,
        'messages_written_per_second': (messages_written / (time.time() - start))
    }, routing_key='for_ruby_reader_writer')
    messages_written += 1

用Ruby读队列:

require 'rubygems'
require 'karait'
require 'yaml'

puts "Starting ruby reader/writer"

messages_read = 0.0
start_time = Time.now().to_f

queue = Karait::Queue.new
while true
  messages = queue.read :routing_key => 'for_ruby_reader_writer', :messages_read => 15
  messages.each do |message|
    messages_read += 1.0

    message.messages_read = messages_read
    message.messages_read_per_second = messages_read / (Time.now().to_f - start_time)

    if (messages_read % 250) == 0.0
      puts "Message Read: \n#{message.to_hash.to_yaml}"
    end
  end
  queue.delete_messages messages
end

输出结果:

---
sender: writer.py
messages_read_per_second: 1653.29212369982
started_running: 1314790290.33488
messages_written_per_second: 1900.18564215637
messages_written: 106199
messages_read: 110250.0

项目地址:github.com

参考文章:blog.attachments.me

anyShare据说看到好文章不转的人,服务器容易宕机!
          

无觅相关文章插件,快速提升流量

  1. 我原来用mongodb 也实现过一个,同样也是 Capped Collection + findAndModify . 但是注意不要把queue 的数据和你的其他数据放在一个instance里面,因为findAndModify 虽然很快,但是会锁住数据库,在数据量大的时候会产生死锁。