Action Cable Channel
Streams
Streams
允許通道將廣播路由到訂閱者。如同其他地方所討論的,廣播是一個發佈/訂閱佇列,其中放入的任何資料都會自動發送到當時連線的客戶端。然而,它純粹是一個線上佇列。如果您沒有在廣播發送更新的那一刻串流它,您將不會收到該更新,即使您在它發送後才連線。
最常見的情況是,串流的廣播會直接發送到客戶端上的訂閱者。通道僅充當雙方(廣播者和通道訂閱者)之間的連接器。以下是一個允許訂閱者獲取給定頁面上所有新評論的通道範例
class CommentsChannel < ApplicationCable::Channel
def follow(data)
stream_from "comments_for_#{data['recording_id']}"
end
def unfollow
stop_all_streams
end
end
基於上述範例,此通道的訂閱者將在資料放入,例如,comments_for_45
廣播後立即收到該資料。
此通道的廣播範例如下所示
ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }
如果您有一個與模型相關的串流,則可以使用模型和通道生成所使用的廣播。以下範例將訂閱類似 comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE
的廣播。
class CommentsChannel < ApplicationCable::Channel
def subscribed
post = Post.find(params[:id])
stream_for post
end
end
然後,您可以使用以下方式向此通道廣播
CommentsChannel.broadcast_to(@post, @comment)
如果您不只想將未經過濾的廣播轉發給訂閱者,您還可以提供一個回呼,讓您可以更改發送的內容。以下範例顯示了如何使用它來提供過程中的效能檢查
class ChatChannel < ApplicationCable::Channel
def subscribed
@room = Chat::Room[params[:room_number]]
stream_for @room, coder: ActiveSupport::JSON do |message|
if message['originated_at'].present?
elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
logger.info "Message took #{elapsed_time}s to arrive"
end
transmit message
end
end
end
您可以透過呼叫 stop_all_streams
來停止所有廣播的串流。
- S
實例公開方法
stop_all_streams() 連結
從發佈/訂閱佇列取消訂閱與此通道關聯的所有串流。
來源:顯示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 135 def stop_all_streams streams.each do |broadcasting, callback| pubsub.unsubscribe broadcasting, callback logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end.clear end
stop_stream_for(model) 連結
取消訂閱 model
的串流。
來源:顯示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 130 def stop_stream_for(model) stop_stream_from(broadcasting_for(model)) end
stop_stream_from(broadcasting) 連結
取消訂閱指定名稱 broadcasting
的串流。
來源:顯示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 121 def stop_stream_from(broadcasting) callback = streams.delete(broadcasting) if callback pubsub.unsubscribe(broadcasting, callback) logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end end
stream_for(model, callback = nil, coder: nil, &block) 連結
在此通道中開始串流 model
的發佈/訂閱佇列。或者,您可以傳遞一個 callback
,它將用於取代直接將更新傳輸到訂閱者的預設行為。
傳遞 coder: ActiveSupport::JSON
將訊息解碼為 JSON 後再傳遞給回呼。預設為 coder: nil
,它不解碼,傳遞原始訊息。
來源:顯示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 116 def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for(model), callback || block, coder: coder) end
stream_from(broadcasting, callback = nil, coder: nil, &block) 連結
開始從指定的 broadcasting
發佈/訂閱佇列進行串流。或者,您可以傳遞一個 callback
,它將用於取代直接將更新傳輸到訂閱者的預設行為。傳遞 coder: ActiveSupport::JSON
將訊息解碼為 JSON 後再傳遞給回呼。預設為 coder: nil
,它不解碼,傳遞原始訊息。
來源:顯示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 90 def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! # Build a stream handler by wrapping the user-provided callback with a decoder # or defaulting to a JSON-decoding retransmitter. handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) streams[broadcasting] = handler connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end end
stream_or_reject_for(model) 連結
如果 model
存在,則使用給定的 model
呼叫 stream_for
以開始串流,否則拒絕訂閱。
來源:顯示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 144 def stream_or_reject_for(model) if model stream_for model else reject end end