日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
構(gòu)建一個(gè)即時(shí)消息應(yīng)用(五):實(shí)時(shí)消息

本文是該系列的第五篇。

創(chuàng)新互聯(lián)專注于企業(yè)成都全網(wǎng)營銷推廣、網(wǎng)站重做改版、甘南網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、html5、商城系統(tǒng)網(wǎng)站開發(fā)、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為甘南等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

  • 第一篇: 模式
  • 第二篇: OAuth
  • 第三篇: 對(duì)話
  • 第四篇: 消息

對(duì)于實(shí)時(shí)消息,我們將使用 服務(wù)器發(fā)送事件Server-Sent Events。這是一個(gè)打開的連接,我們可以在其中傳輸數(shù)據(jù)流。我們會(huì)有個(gè)端點(diǎn),用戶會(huì)在其中訂閱發(fā)送給他的所有消息。

消息戶端

在 HTTP 部分之前,讓我們先編寫一個(gè)映射map ,讓所有客戶端都監(jiān)聽消息。 像這樣全局初始化:

 
 
 
  1. type MessageClient struct {
  2.     Messages chan Message
  3.     UserID   string
  4. }
  5. var messageClients sync.Map

已創(chuàng)建的新消息

還記得在 上一篇文章 中,當(dāng)我們創(chuàng)建這條消息時(shí),我們留下了一個(gè) “TODO” 注釋。在那里,我們將使用這個(gè)函數(shù)來調(diào)度一個(gè) goroutine。

 
 
 
  1. go messageCreated(message)

把這行代碼插入到我們留注釋的位置。

 
 
 
  1. func messageCreated(message Message) error {
  2.     if err := db.QueryRow(`
  3.         SELECT user_id FROM participants
  4.         WHERE user_id != $1 and conversation_id = $2
  5.     `, message.UserID, message.ConversationID).
  6.     Scan(&message.ReceiverID); err != nil {
  7.         return err
  8.     }
  9.     go broadcastMessage(message)
  10.     return nil
  11. }
  12. func broadcastMessage(message Message) {
  13.     messageClients.Range(func(key, _ interface{}) bool {
  14.         client := key.(*MessageClient)
  15.         if client.UserID == message.ReceiverID {
  16.             client.Messages <- message
  17.         }
  18.         return true
  19.     })
  20. }

該函數(shù)查詢接收者 ID(其他參與者 ID),并將消息發(fā)送給所有客戶端。

訂閱消息

讓我們轉(zhuǎn)到 main() 函數(shù)并添加以下路由:

 
 
 
  1. router.HandleFunc("GET", "/api/messages", guard(subscribeToMessages))

此端點(diǎn)處理 /api/messages 上的 GET 請(qǐng)求。請(qǐng)求應(yīng)該是一個(gè) EventSource 連接。它用一個(gè)事件流響應(yīng),其中的數(shù)據(jù)是 JSON 格式的。

 
 
 
  1. func subscribeToMessages(w http.ResponseWriter, r *http.Request) {
  2.     if a := r.Header.Get("Accept"); !strings.Contains(a, "text/event-stream") {
  3.         http.Error(w, "This endpoint requires an EventSource connection", http.StatusNotAcceptable)
  4.         return
  5.     }
  6.     f, ok := w.(http.Flusher)
  7.     if !ok {
  8.         respondError(w, errors.New("streaming unsupported"))
  9.         return
  10.     }
  11.     ctx := r.Context()
  12.     authUserID := ctx.Value(keyAuthUserID).(string)
  13.     h := w.Header()
  14.     h.Set("Cache-Control", "no-cache")
  15.     h.Set("Connection", "keep-alive")
  16.     h.Set("Content-Type", "text/event-stream")
  17.     messages := make(chan Message)
  18.     defer close(messages)
  19.     client := &MessageClient{Messages: messages, UserID: authUserID}
  20.     messageClients.Store(client, nil)
  21.     defer messageClients.Delete(client)
  22.     for {
  23.         select {
  24.         case <-ctx.Done():
  25.             return
  26.         case message := <-messages:
  27.             if b, err := json.Marshal(message); err != nil {
  28.                 log.Printf("could not marshall message: %v\n", err)
  29.                 fmt.Fprintf(w, "event: error\ndata: %v\n\n", err)
  30.             } else {
  31.                 fmt.Fprintf(w, "data: %s\n\n", b)
  32.             }
  33.             f.Flush()
  34.         }
  35.     }
  36. }

首先,它檢查請(qǐng)求頭是否正確,并檢查服務(wù)器是否支持流式傳輸。我們創(chuàng)建一個(gè)消息通道,用它來構(gòu)建一個(gè)客戶端,并將其存儲(chǔ)在客戶端映射中。每當(dāng)創(chuàng)建新消息時(shí),它都會(huì)進(jìn)入這個(gè)通道,因此我們可以通過 for-select 循環(huán)從中讀取。

服務(wù)器發(fā)送事件Server-Sent Events使用以下格式發(fā)送數(shù)據(jù):

 
 
 
  1. data: some data here\n\n

我們以 JSON 格式發(fā)送:

 
 
 
  1. data: {"foo":"bar"}\n\n

我們使用 fmt.Fprintf() 以這種格式寫入響應(yīng)寫入器writter,并在循環(huán)的每次迭代中刷新數(shù)據(jù)。

這個(gè)循環(huán)會(huì)一直運(yùn)行,直到使用請(qǐng)求上下文關(guān)閉連接為止。我們延遲了通道的關(guān)閉和客戶端的刪除,因此,當(dāng)循環(huán)結(jié)束時(shí),通道將被關(guān)閉,客戶端不會(huì)收到更多的消息。

注意,服務(wù)器發(fā)送事件Server-Sent Events(EventSource)的 JavaScript API 不支持設(shè)置自定義請(qǐng)求頭,所以我們不能設(shè)置 Authorization: Bearer 。這就是為什么 guard() 中間件也會(huì)從 URL 查詢字符串中讀取令牌的原因。


實(shí)時(shí)消息部分到此結(jié)束。我想說的是,這就是后端的全部?jī)?nèi)容。但是為了編寫前端代碼,我將再增加一個(gè)登錄端點(diǎn):一個(gè)僅用于開發(fā)的登錄。

  • 源代碼

當(dāng)前名稱:構(gòu)建一個(gè)即時(shí)消息應(yīng)用(五):實(shí)時(shí)消息
當(dāng)前URL:http://www.5511xx.com/article/dhjehec.html