blog

ソフトウェア開発|インスタント・メッセージング・アプリケーションの構築:リアルタイム・メッセージング

リアルタイムのメッセージの場合、イベントはサーバーを使って送信されます。...

Oct 18, 2025 · 4 min. read
シェア

この記事はシリーズの5回目です。

リアルタイム・メッセージの場合は、.NET 接続が使用されます。これはデータのストリームを送信できるオープンな接続です。ユーザが自分宛に送られるすべてのメッセージを購読するエンドポイントがあります。

メッセージターミナル

HTTPセクションで、すべてのクライアントがメッセージをリッスンできるように、まず,を書きましょう。 このようにグローバルに初期化します:

type MessageClient struct {
 Messages chan Message
 UserID string
}
var messageClients sync.Map

新規メッセージの作成

go messageCreated(message)
func messageCreated(message Message) error {
 if err := db.QueryRow(`
 SELECT user_id FROM participants
 WHERE user_id != $1 and conversation_id = $2
 `, message.UserID, message.ConversationID).
 Scan(&message.ReceiverID); err != nil {
 return err
 }
 go broadcastMessage(message)
 return nil
}
func broadcastMessage(message Message) {
 messageClients.Range(func(key, _ interface{}) bool {
 client := key.(*MessageClient)
 if client.UserID == message.ReceiverID {
 client.Messages <- message
 }
 return true
 })
}

この関数は受信者IDを照会し、すべてのクライアントにメッセージを送信します。

メッセージの購読

Functions main() に移動し、以下のルートを追加します:

router.obj('GET', '/api/messages', guard(subscribeToMessages));

このエンドポイントは、/api/messages の GET リクエストを処理します。リクエストは EventSource 接続でなければなりません。このエンドポイントは、JSON 形式のデータを含むイベント・ストリームで応答します。

func subscribeToMessages(w http.er, r *http.st) {
 if a := r.Header.Get("Accept"); !strings.Contains(a, "text/event-stream") {
 http.(w, "This endpoint requires an EventSource connection", http.le)
 return
 }
 f, ok := w.(http.er)
 if !ok {
 respondError(w, errors.New("streaming unsupported"))
 return
 }
 ctx := r.Context()
 authUserID := ctx.Value(keyAuthUserID).(string)
 h := w.Header()
 h.Set("Cache-Control", "no-cache")
 h.Set("Connection", "keep-alive")
 h.Set("Content-Type", "text/event-stream")
 messages := make(chan Message)
 defer close(messages)
 client := &MessageClient{Messages: messages, UserID: authUserID}
 messageClients.Store(client, nil)
 defer messageClients.Delete(client)
 for {
 select {
 case <-ctx.Done():
 return
 case message := <-messages:
 if b, err := json.Marshal(message); err != nil {
 log.Printf("could not marshall message: %v
", err)
 fmt.Fprintf(w, "event: error
data: %v

", err)
 } else {
 fmt.Fprintf(w, "data: %s

", b)
 }
 f.Flush()
 }
 }
}

まず、リクエストヘッダが正しいかどうか、サーバーがストリーミングを サポートしているかどうかをチェックします。メッセージチャネルが作成され、そのチャネルでクライアントが構築され、 クライアントマップに格納されます。新しいメッセージが作成されるたびに、そのメッセージはこのチャネルに入ります。

データを送信するには、以下の形式を使用します:

data: some data here


JSON形式で送信されます:

data: {"foo":"bar"}


fmt.Fprintf() 使用して、このフォーマットでレスポンスを書き込み、ループの各反復でデータをリフレッシュします。

このループは、リクエストコンテキストを使用して接続が閉じられるまで実行されます。チャネルのクローズとクライアントの削除は、ループが終了したときにチャネルがクローズされ、クライアントがメッセージを受け取らないように、遅延されます。

JavaScript API はカスタムリクエストヘッダーの設定をサポートしていないことに注意してください。 Authorization: Bearer <token>設定することはできません。そのため、guard() ミドルウェアは URL クエリ文字列からトークンも読み取ります。

これでリアルタイム・メッセージング・セクションは終わりです。バックエンドはこれで終わりです。しかし、フロントエンドのコードを書くために、もう一つログインエンドポイントを追加します。

経由

Read next