跳至內容 跳至搜尋

Action Cable Channel Streams

Streams 允許通道將廣播路由到訂閱者。如同其他地方所討論的,廣播是一個發佈/訂閱佇列,其中放入的任何資料都會自動發送到當時連線的客戶端。然而,它純粹是一個線上佇列。如果您沒有在廣播發送更新的那一刻串流它,您將不會收到該更新,即使您在它發送後才連線。

最常見的情況是,串流的廣播會直接發送到客戶端上的訂閱者。通道僅充當雙方(廣播者和通道訂閱者)之間的連接器。以下是一個允許訂閱者獲取給定頁面上所有新評論的通道範例

class CommentsChannel < ApplicationCable::Channel
  def follow(data)
    stream_from "comments_for_#{data['recording_id']}"
  end

  def unfollow
    stop_all_streams
  end
end

基於上述範例,此通道的訂閱者將在資料放入,例如,comments_for_45 廣播後立即收到該資料。

此通道的廣播範例如下所示

ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }

如果您有一個與模型相關的串流,則可以使用模型和通道生成所使用的廣播。以下範例將訂閱類似 comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE 的廣播。

class CommentsChannel < ApplicationCable::Channel
  def subscribed
    post = Post.find(params[:id])
    stream_for post
  end
end

然後,您可以使用以下方式向此通道廣播

CommentsChannel.broadcast_to(@post, @comment)

如果您不只想將未經過濾的廣播轉發給訂閱者,您還可以提供一個回呼,讓您可以更改發送的內容。以下範例顯示了如何使用它來提供過程中的效能檢查

class ChatChannel < ApplicationCable::Channel
  def subscribed
    @room = Chat::Room[params[:room_number]]

    stream_for @room, coder: ActiveSupport::JSON do |message|
      if message['originated_at'].present?
        elapsed_time = (Time.now.to_f - message['originated_at']).round(2)

        ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
        logger.info "Message took #{elapsed_time}s to arrive"
      end

      transmit message
    end
  end
end

您可以透過呼叫 stop_all_streams 來停止所有廣播的串流。

方法
S

實例公開方法

stop_all_streams()

從發佈/訂閱佇列取消訂閱與此通道關聯的所有串流。

# File actioncable/lib/action_cable/channel/streams.rb, line 135
def stop_all_streams
  streams.each do |broadcasting, callback|
    pubsub.unsubscribe broadcasting, callback
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end.clear
end

stop_stream_for(model)

取消訂閱 model 的串流。

# File actioncable/lib/action_cable/channel/streams.rb, line 130
def stop_stream_for(model)
  stop_stream_from(broadcasting_for(model))
end

stop_stream_from(broadcasting)

取消訂閱指定名稱 broadcasting 的串流。

# File actioncable/lib/action_cable/channel/streams.rb, line 121
def stop_stream_from(broadcasting)
  callback = streams.delete(broadcasting)
  if callback
    pubsub.unsubscribe(broadcasting, callback)
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end
end

stream_for(model, callback = nil, coder: nil, &block)

在此通道中開始串流 model 的發佈/訂閱佇列。或者,您可以傳遞一個 callback,它將用於取代直接將更新傳輸到訂閱者的預設行為。

傳遞 coder: ActiveSupport::JSON 將訊息解碼為 JSON 後再傳遞給回呼。預設為 coder: nil,它不解碼,傳遞原始訊息。

# File actioncable/lib/action_cable/channel/streams.rb, line 116
def stream_for(model, callback = nil, coder: nil, &block)
  stream_from(broadcasting_for(model), callback || block, coder: coder)
end

stream_from(broadcasting, callback = nil, coder: nil, &block)

開始從指定的 broadcasting 發佈/訂閱佇列進行串流。或者,您可以傳遞一個 callback,它將用於取代直接將更新傳輸到訂閱者的預設行為。傳遞 coder: ActiveSupport::JSON 將訊息解碼為 JSON 後再傳遞給回呼。預設為 coder: nil,它不解碼,傳遞原始訊息。

# File actioncable/lib/action_cable/channel/streams.rb, line 90
def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)

  # Don't send the confirmation until pubsub#subscribe is successful
  defer_subscription_confirmation!

  # Build a stream handler by wrapping the user-provided callback with a decoder
  # or defaulting to a JSON-decoding retransmitter.
  handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
  streams[broadcasting] = handler

  connection.server.event_loop.post do
    pubsub.subscribe(broadcasting, handler, lambda do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end)
  end
end

stream_or_reject_for(model)

如果 model 存在,則使用給定的 model 呼叫 stream_for 以開始串流,否則拒絕訂閱。

# File actioncable/lib/action_cable/channel/streams.rb, line 144
def stream_or_reject_for(model)
  if model
    stream_for model
  else
    reject
  end
end