日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
講解一下Redis中的訂閱發(fā)布模式

發(fā)布與訂閱模型在許多編程語(yǔ)言中都有實(shí)現(xiàn),也就是我們經(jīng)常說(shuō)的設(shè)計(jì)模式中的一種–觀察者模式。在一些應(yīng)用場(chǎng)合,例如發(fā)送方并不是以固定頻率發(fā)送消息,如果接收方頻繁去咨詢發(fā)送方,這種操作無(wú)疑是很麻煩并且不友好的。

為什么做訂閱分布?

隨著業(yè)務(wù)復(fù)雜, 業(yè)務(wù)的項(xiàng)目依賴關(guān)系增強(qiáng), 使用消息隊(duì)列幫助系統(tǒng)降低耦合度.

  • 訂閱分布本身也是一種生產(chǎn)者消費(fèi)者模式, 訂閱者是消費(fèi)者, 發(fā)布者是生產(chǎn)者.
  • 訂閱發(fā)布模式, 發(fā)布者發(fā)布消息后, 只要有訂閱方, 則多個(gè)訂閱方會(huì)收到同樣的消息
  • 生產(chǎn)者消費(fèi)者模式, 生產(chǎn)者往隊(duì)列里放入消息, 由多個(gè)消費(fèi)者對(duì)一條消息進(jìn)行搶占.
  • 訂閱分布模式可以將一些不著急完成的工作放到其他進(jìn)程或者線程中進(jìn)行離線處理.

Redis中的訂閱發(fā)布

Redis中的訂閱發(fā)布模式, 當(dāng)沒(méi)有訂閱者時(shí), 消息會(huì)被直接丟棄(Redis不會(huì)持久化保存消息)

Redis生產(chǎn)者消費(fèi)者

生產(chǎn)者使用Redis中的list數(shù)據(jù)結(jié)構(gòu)進(jìn)行實(shí)現(xiàn), 將待處理的消息塞入到消息隊(duì)列中.

class Producer(object):

def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"
self.value = "test_value_{id}"

def produce(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.lpush(self.key, msg)

消費(fèi)者使用redis中brpop進(jìn)行實(shí)現(xiàn), brpop會(huì)從list頭部消息, 并能夠設(shè)置超時(shí)等待時(shí)間.

class Consumer(object):

def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"

def consume(self, timeout=0):
# timeout=0 表示會(huì)無(wú)線阻塞, 直到獲得消息
while True:
msg = self._conn.brpop(self.key, timeout=timeout)
process(msg)


def process(msg):
print msg

if __name__ == '__main__':
consumer = Consumer()
consumer.consume()
# 輸出結(jié)果
('test_key', 'test_value_1')
('test_key', 'test_value_2')
('test_key', 'test_value_3')
('test_key', 'test_value_4')
('test_key', 'test_value_5')

Redis中訂閱發(fā)布

在Redis Pubsub中, 一個(gè)頻道(channel)相當(dāng)于一個(gè)消息隊(duì)列

class Publisher(object):

def __init__(self, host, port):
self._conn = redis.StrictRedis(host=host, port=port)
self.channel = "test_channel"
self.value = "test_value_{id}"

def pub(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.publish(self.channel, msg)

其中get_message使用了select IO多路復(fù)用來(lái)檢查socket連接是否是否可讀.

class Subscriber(object):

def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self._pubsub = self._conn.pubsub() # 生成pubsub對(duì)象
self.channel = "test_channel"
self._pubsub.subscribe(self.channel)

def sub(self):
while True:
msg = self._pubsub.get_message()
if msg and isinstance(msg.get("data"), basestring):
process(msg.get("data"))

def close(self):
self._pubsub.close()

# 輸出結(jié)果
test_value_1
test_value_2
test_value_3
test_value_4
test_value_

Java Jedis踩過(guò)的坑

在Jedis中訂閱方處理是采用同步的方式, 看源碼中PubSub模塊的process函數(shù)

do-while循環(huán)中, 會(huì)等到當(dāng)前消息處理完畢才能夠處理下一條消息, 這樣會(huì)導(dǎo)致當(dāng)入隊(duì)列消息量過(guò)大的時(shí)候, redis鏈接被強(qiáng)制關(guān)閉.

解決方案: 將整個(gè)處理函數(shù)改為異步的方式.


網(wǎng)頁(yè)名稱:講解一下Redis中的訂閱發(fā)布模式
新聞來(lái)源:http://www.5511xx.com/article/dpieppi.html