sql >> Base de Datos >  >> NoSQL >> Redis

Accediendo a una variable dentro de un hilo de rieles

EDICIÓN ACTUALIZADA AL FINAL:muestra el código de trabajo. Módulo principal sin modificar excepto por el código de depuración. Nota:experimenté el problema que ya noté con respecto a la necesidad de cancelar la suscripción antes de la terminación.

El código parece correcto. Me gustaría ver cómo lo estás instanciando.

En config/application.rb, probablemente tenga al menos algo como:

require 'ws_communication'
config.middleware.use WsCommunication

Luego, en su cliente de JavaScript, debería tener algo como esto:

var ws = new WebSocket(uri);

¿Crea una instancia de otra instancia de WsCommunication? Eso establecería a @clients en una matriz vacía y podría mostrar sus síntomas. Algo como esto sería incorrecto:

var ws = new WsCommunication;

Nos ayudaría si mostrara el cliente y, quizás, config/application.rb si esta publicación no ayuda.

Por cierto, estoy de acuerdo con el comentario de que @clients debe estar protegido por un mutex en cualquier actualización, si no se lee también. Es una estructura dinámica que podría cambiar en cualquier momento en un sistema controlado por eventos. redis-mutex es una buena opción. (Espero que el enlace sea correcto ya que Github parece arrojar 500 errores en todo en este momento).

También puede notar que $redis.publish devuelve un valor entero de la cantidad de clientes que recibieron el mensaje.

Finalmente, es posible que deba asegurarse de cancelar la suscripción de su canal antes de la terminación. He tenido situaciones en las que terminé enviando cada mensaje varias veces, incluso muchas, debido a suscripciones anteriores al mismo canal que no se limpiaron. Dado que se está suscribiendo al canal dentro de un hilo, deberá darse de baja dentro de ese mismo hilo o el proceso simplemente se "colgará" esperando que aparezca mágicamente el hilo correcto. Manejo esa situación configurando un indicador de "cancelar suscripción" y luego enviando un mensaje. Luego, dentro del bloque on.message, pruebo el indicador de cancelación de suscripción y emito la cancelación de suscripción allí.

El módulo que proporcionó, con solo modificaciones menores de depuración:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

El código de suscriptor de prueba que proporcioné:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

El código de editor de prueba que proporcioné. El editor y el suscriptor se pueden combinar fácilmente, ya que estas son solo pruebas:

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

Un ejemplo de config.ru que ejecuta todo esto en la capa de middleware del rack:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

Esto es Principal. Lo eliminé de mi versión en ejecución, por lo que es posible que deba modificarse si lo usa:

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end