EDICIÓN ACTUALIZADA AL FINAL:muestra el código de trabajo. Módulo principal sin modificar excepto por el código de depuración. Nota:experimenté el problema que ya noté con respecto a la necesidad de cancelar la suscripción antes de la terminación.
El código parece correcto. Me gustaría ver cómo lo estás instanciando.
En config/application.rb, probablemente tenga al menos algo como:
require 'ws_communication'
config.middleware.use WsCommunication
Luego, en su cliente de JavaScript, debería tener algo como esto:
var ws = new WebSocket(uri);
¿Crea una instancia de otra instancia de WsCommunication? Eso establecería a @clients en una matriz vacía y podría mostrar sus síntomas. Algo como esto sería incorrecto:
var ws = new WsCommunication;
Nos ayudaría si mostrara el cliente y, quizás, config/application.rb si esta publicación no ayuda.
Por cierto, estoy de acuerdo con el comentario de que @clients debe estar protegido por un mutex en cualquier actualización, si no se lee también. Es una estructura dinámica que podría cambiar en cualquier momento en un sistema controlado por eventos. redis-mutex es una buena opción. (Espero que el enlace sea correcto ya que Github parece arrojar 500 errores en todo en este momento).
También puede notar que $redis.publish devuelve un valor entero de la cantidad de clientes que recibieron el mensaje.
Finalmente, es posible que deba asegurarse de cancelar la suscripción de su canal antes de la terminación. He tenido situaciones en las que terminé enviando cada mensaje varias veces, incluso muchas, debido a suscripciones anteriores al mismo canal que no se limpiaron. Dado que se está suscribiendo al canal dentro de un hilo, deberá darse de baja dentro de ese mismo hilo o el proceso simplemente se "colgará" esperando que aparezca mágicamente el hilo correcto. Manejo esa situación configurando un indicador de "cancelar suscripción" y luego enviando un mensaje. Luego, dentro del bloque on.message, pruebo el indicador de cancelación de suscripción y emito la cancelación de suscripción allí.
El módulo que proporcionó, con solo modificaciones menores de depuración:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
El código de suscriptor de prueba que proporcioné:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
El código de editor de prueba que proporcioné. El editor y el suscriptor se pueden combinar fácilmente, ya que estas son solo pruebas:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
Un ejemplo de config.ru que ejecuta todo esto en la capa de middleware del rack:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
Esto es Principal. Lo eliminé de mi versión en ejecución, por lo que es posible que deba modificarse si lo usa:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end