新聞中心
發(fā)布與訂閱模型在許多編程語(yǔ)言中都有實(shí)現(xiàn),也就是我們經(jīng)常說(shuō)的設(shè)計(jì)模式中的一種–觀察者模式。在一些應(yīng)用場(chǎng)合,例如發(fā)送方并不是以固定頻率發(fā)送消息,如果接收方頻繁去咨詢發(fā)送方,這種操作無(wú)疑是很麻煩并且不友好的。

為什么做訂閱分布?
隨著業(yè)務(wù)復(fù)雜, 業(yè)務(wù)的項(xiàng)目依賴關(guān)系增強(qiáng), 使用消息隊(duì)列幫助系統(tǒng)降低耦合度.
-
訂閱分布本身也是一種生產(chǎn)者消費(fèi)者模式, 訂閱者是消費(fèi)者, 發(fā)布者是生產(chǎn)者.
-
訂閱發(fā)布模式, 發(fā)布者發(fā)布消息后, 只要有訂閱方, 則多個(gè)訂閱方會(huì)收到同樣的消息
-
生產(chǎn)者消費(fèi)者模式, 生產(chǎn)者往隊(duì)列里放入消息, 由多個(gè)消費(fèi)者對(duì)一條消息進(jìn)行搶占.
-
訂閱分布模式可以將一些不著急完成的工作放到其他進(jìn)程或者線程中進(jìn)行離線處理.
Redis中的訂閱發(fā)布
“
Redis中的訂閱發(fā)布模式, 當(dāng)沒(méi)有訂閱者時(shí), 消息會(huì)被直接丟棄(Redis不會(huì)持久化保存消息)
”
Redis生產(chǎn)者消費(fèi)者
生產(chǎn)者使用Redis中的list數(shù)據(jù)結(jié)構(gòu)進(jìn)行實(shí)現(xiàn), 將待處理的消息塞入到消息隊(duì)列中.
class Producer(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"
self.value = "test_value_{id}"
def produce(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.lpush(self.key, msg)
消費(fèi)者使用redis中brpop進(jìn)行實(shí)現(xiàn), brpop會(huì)從list頭部消息, 并能夠設(shè)置超時(shí)等待時(shí)間.
class Consumer(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"
def consume(self, timeout=0):
# timeout=0 表示會(huì)無(wú)線阻塞, 直到獲得消息
while True:
msg = self._conn.brpop(self.key, timeout=timeout)
process(msg)
def process(msg):
print msg
if __name__ == '__main__':
consumer = Consumer()
consumer.consume()
# 輸出結(jié)果
('test_key', 'test_value_1')
('test_key', 'test_value_2')
('test_key', 'test_value_3')
('test_key', 'test_value_4')
('test_key', 'test_value_5')
Redis中訂閱發(fā)布
在Redis Pubsub中, 一個(gè)頻道(channel)相當(dāng)于一個(gè)消息隊(duì)列
class Publisher(object):
def __init__(self, host, port):
self._conn = redis.StrictRedis(host=host, port=port)
self.channel = "test_channel"
self.value = "test_value_{id}"
def pub(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.publish(self.channel, msg)
其中get_message使用了select IO多路復(fù)用來(lái)檢查socket連接是否是否可讀.
class Subscriber(object):
def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self._pubsub = self._conn.pubsub() # 生成pubsub對(duì)象
self.channel = "test_channel"
self._pubsub.subscribe(self.channel)
def sub(self):
while True:
msg = self._pubsub.get_message()
if msg and isinstance(msg.get("data"), basestring):
process(msg.get("data"))
def close(self):
self._pubsub.close()
# 輸出結(jié)果
test_value_1
test_value_2
test_value_3
test_value_4
test_value_
Java Jedis踩過(guò)的坑
在Jedis中訂閱方處理是采用同步的方式, 看源碼中PubSub模塊的process函數(shù)
在do-while循環(huán)中, 會(huì)等到當(dāng)前消息處理完畢才能夠處理下一條消息, 這樣會(huì)導(dǎo)致當(dāng)入隊(duì)列消息量過(guò)大的時(shí)候, redis鏈接被強(qiáng)制關(guān)閉.
解決方案: 將整個(gè)處理函數(shù)改為異步的方式.
網(wǎng)頁(yè)名稱:講解一下Redis中的訂閱發(fā)布模式
新聞來(lái)源:http://www.5511xx.com/article/dpieppi.html


咨詢
建站咨詢
