sql >> Base de Datos >  >> RDS >> Database

Configuración de Service Broker para procesamiento asíncrono

En mi último artículo, hablé sobre los beneficios de implementar el procesamiento asíncrono usando Service Broker en SQL Server sobre los otros métodos que existen para el procesamiento desacoplado de tareas largas. En este artículo, repasaremos todos los componentes que deben configurarse para una configuración básica de Service Broker en una sola base de datos y las consideraciones importantes para la gestión de conversaciones entre los servicios de Broker. Para comenzar, necesitaremos crear una base de datos y habilitar la base de datos para el uso de Service Broker:

CREATE DATABASE AsyncProcessingDemo;
GO
 
IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0
BEGIN
  ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER;
END
GO
 
USE AsyncProcessingDemo;
GO

Configuración de los componentes del intermediario

Los objetos básicos que deben crearse en la base de datos son los tipos de mensajes para los mensajes, un contrato que define cómo se enviarán los mensajes entre los servicios, una cola y el servicio iniciador, y una cola y el servicio de destino. Muchos ejemplos en línea para Service Broker muestran nombres de objetos complejos para los tipos de mensajes, contratos y servicios para Service Broker. Sin embargo, no existe un requisito para que los nombres sean complejos y se pueden usar nombres de objetos simples para cualquiera de los objetos.

Para los mensajes, necesitaremos crear un tipo de mensaje para la solicitud, que se llamará AsyncRequest y un tipo de mensaje para el resultado, que se llamará AsyncResult . Ambos utilizarán XML que será validado como correctamente formado por los servicios de intermediario para enviar y recibir los datos requeridos por los servicios.

-- Create the message types
CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [AsyncResult]  VALIDATION = WELL_FORMED_XML;

El contrato especifica que AsyncRequest será enviado por el servicio de inicio al servicio de destino y que el servicio de destino devolverá un AsyncResult mensaje de vuelta al servicio de inicio. El contrato también puede especificar múltiples tipos de mensajes para el iniciador y el destino, o que un tipo de mensaje específico pueda ser enviado por cualquier servicio, si el procesamiento específico lo requiere.

-- Create the contract
CREATE CONTRACT [AsyncContract] 
(
  [AsyncRequest] SENT BY INITIATOR, 
  [AsyncResult]  SENT BY TARGET
);

Para cada uno de los servicios, se debe crear una cola para proporcionar almacenamiento de los mensajes recibidos por el servicio. El servicio de destino donde se enviará la solicitud debe crearse especificando el AsyncContract para permitir que se envíen mensajes al servicio. En este caso el servicio se llama ProcessingService y se creará en ProcessingQueue dentro de la base de datos. El servicio de inicio no requiere que se especifique un contrato, lo que hace que solo pueda recibir mensajes en respuesta a una conversación que se inició desde él.

-- Create the processing queue and service - specify the contract to allow sending to the service
CREATE QUEUE ProcessingQueue;
CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]);
 
-- Create the request queue and service 
CREATE QUEUE RequestQueue;
CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Enviar un mensaje para procesamiento

Como expliqué en el artículo anterior, prefiero implementar un procedimiento almacenado contenedor para enviar un nuevo mensaje a un servicio de intermediario, de modo que pueda modificarse una vez para escalar el rendimiento si es necesario. Este procedimiento es un contenedor simple para crear una nueva conversación y enviar el mensaje al ProcessingService .

-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage 
	@FromService SYSNAME,
	@ToService   SYSNAME,
	@Contract    SYSNAME,
	@MessageType SYSNAME,
	@MessageBody XML
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
 
  BEGIN TRANSACTION;
 
  BEGIN DIALOG CONVERSATION @conversation_handle
    FROM SERVICE @FromService
    TO SERVICE @ToService
    ON CONTRACT @Contract
    WITH ENCRYPTION = OFF;
 
  SEND ON CONVERSATION @conversation_handle
    MESSAGE TYPE @MessageType(@MessageBody);
 
  COMMIT TRANSACTION;
END
GO

Usando el procedimiento almacenado contenedor ahora podemos enviar un mensaje de prueba al ProcessingService para validar que hemos configurado correctamente los servicios del bróker.

-- Send a request
EXECUTE dbo.SendBrokerMessage
  @FromService = N'RequestService',
  @ToService   = N'ProcessingService',
  @Contract    = N'AsyncContract',
  @MessageType = N'AsyncRequest',
  @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO

Procesamiento de mensajes

Si bien podríamos procesar manualmente los mensajes de ProcessingQueue , probablemente querremos que los mensajes se procesen automáticamente a medida que se envían al ProcessingService . Para hacer esto, se debe crear un procedimiento almacenado de activación que probaremos y luego vincularemos a la cola para automatizar el procesamiento tras la activación de la cola. Para procesar un mensaje necesitamos RECEIVE el mensaje de la cola dentro de una transacción, junto con el tipo de mensaje y el identificador de conversación para el mensaje. El tipo de mensaje garantiza que se aplique la lógica adecuada al mensaje que se está procesando, y el identificador de conversación permite que se envíe una respuesta al servicio de inicio cuando se haya procesado el mensaje.

El RECEIVE El comando permite que un solo mensaje o varios mensajes dentro del mismo identificador de conversación o grupo se procesen en una sola transacción. Para procesar múltiples mensajes, se debe usar una variable de tabla, o para procesar un solo mensaje, se puede usar una variable local. El procedimiento de activación a continuación recupera un solo mensaje de la cola, verifica el tipo de mensaje para determinar si es una AsyncRequest mensaje, y luego realiza el proceso de larga ejecución en función de la información del mensaje recibido. Si no recibe un mensaje dentro del ciclo, esperará hasta 5000 ms, o 5 segundos, para que otro mensaje ingrese a la cola antes de salir del ciclo y finalizar su ejecución. Después de procesar un mensaje, genera un AsyncResult mensaje y lo devuelve al iniciador en el mismo identificador de conversación del que se recibió el mensaje. El procedimiento también verifica el tipo de mensaje para determinar si un EndDialog o Error se ha recibido un mensaje para limpiar la conversación finalizándola.

-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM ProcessingQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncRequest'
    BEGIN
      -- Handle complex long processing here
      -- For demonstration we'll pull the account number and send a reply back only
 
      DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
 
      -- Build reply message and send back
      DECLARE @reply_message_body XML = N'
        ' + CAST(@AccountNumber AS NVARCHAR(11)) + '
      ';
 
      SEND ON CONVERSATION @conversation_handle
        MESSAGE TYPE [AsyncResult] (@reply_message_body);
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
      END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
      -- Log the error code and perform any required handling here
      -- End the conversation for the error
      END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

La RequestQueue también necesitará procesar los mensajes que se le envían, por lo que un procedimiento adicional para procesar el AsyncResult es necesario crear los mensajes devueltos por el procedimiento ProcessingQueueActivation. Dado que sabemos que el mensaje AsnycResult significa que todo el trabajo de procesamiento se completó, la conversación puede finalizar una vez que procesemos ese mensaje, lo que enviará un mensaje EndDialog al ProcessingService, que luego será procesado por su procedimiento de activación para finalizar el conversación limpiando todo y evitando el fuego y olvídate de los problemas que ocurren cuando las conversaciones se terminan correctamente.

-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM RequestQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncResult'
    BEGIN
      -- If necessary handle the reply message here
      DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
 
      -- Since this is all the work being done, end the conversation to send the EndDialog message
      END CONVERSATION @conversation_handle;
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

Probar los procedimientos

Antes de automatizar el procesamiento de la cola para nuestros servicios, es importante probar los procedimientos de activación para garantizar que procesen los mensajes de manera adecuada y para evitar que se deshabilite una cola en caso de que ocurra un error que no se maneje correctamente. Como ya hay un mensaje en ProcessingQueue la ProcessingQueueActivation se puede ejecutar un procedimiento para procesar ese mensaje. Tenga en cuenta que WAITFOR hará que el procedimiento tarde 5 segundos en terminar, aunque el mensaje se procese inmediatamente desde la cola. Después de procesar el mensaje, podemos verificar que el procedimiento funcionó correctamente consultando RequestQueue para ver si un AsyncResult existe el mensaje, y luego podemos verificar que RequestQueueActivation el procedimiento funciona correctamente al ejecutarlo.

-- Process the message from the processing queue
EXECUTE dbo.ProcessingQueueActivation;
GO
 
-- Check for reply message on request queue
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO
 
-- Process the message from the request queue
EXECUTE dbo.RequestQueueActivation;
GO

Automatización del Procesamiento

En este punto, todos los componentes están completos para automatizar completamente nuestro procesamiento. Lo único que queda es vincular los procedimientos de activación a sus colas apropiadas y luego enviar otro mensaje de prueba para validar que se procese y que no quede nada en las colas después.

-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.ProcessingQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.RequestQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Test automated activation
-- Send a request
 
EXECUTE dbo.SendBrokerMessage
	@FromService = N'RequestService',
	@ToService   = N'ProcessingService',
	@Contract    = N'AsyncContract',
	@MessageType = N'AsyncRequest',
	@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
 
-- Check for reply message on request queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO

Resumen

Los componentes básicos para el procesamiento asincrónico automatizado en SQL Server Service Broker se pueden configurar en una configuración de base de datos única para permitir el procesamiento desacoplado de tareas de ejecución prolongada. Esta puede ser una herramienta poderosa para mejorar el rendimiento de la aplicación, desde la experiencia del usuario final, al desvincular el procesamiento de las interacciones del usuario final con la aplicación.