sql >> Base de Datos >  >> RDS >> Sqlserver

Implementación de la carga incremental mediante la captura de datos modificados en SQL Server

Este artículo será interesante para aquellos que a menudo tienen que lidiar con la integración de datos.

Introducción

Supongamos que existe una base de datos donde los usuarios siempre modifican los datos (actualizar o eliminar). Tal vez, esta base de datos sea utilizada por una aplicación grande que no permite modificar la estructura de la tabla. La tarea es cargar datos de esta base de datos a otra base de datos en un servidor diferente de vez en cuando. La forma más sencilla de abordar el problema es cargar los datos nuevos desde una base de datos de origen a una base de datos de destino con una limpieza preliminar de la base de datos de destino. Puede utilizar este método siempre que el tiempo de carga de datos sea aceptable y no supere los plazos preestablecidos. ¿Qué sucede si se tarda varios días en cargar los datos? Además, los canales de comunicación inestables conducen a una situación en la que la carga de datos se detiene y se reinicia. Si enfrenta estos obstáculos, le sugiero que considere uno de los algoritmos de "recarga de datos". Significa que solo se han producido modificaciones de datos desde la última carga.

CDC

En SQL Server 2008, Microsoft introdujo un mecanismo de seguimiento de datos llamado Change Data Capture (CDC). A grandes rasgos, el objetivo de este mecanismo es que al habilitar CDC para cualquier tabla de base de datos se cree una tabla de sistema en la misma base de datos con un nombre similar al de la tabla original (el esquema será el siguiente:'cdc' como prefijo más el nombre de esquema antiguo más "_" y el final "_CT". Por ejemplo, la tabla original es dbo.Example, luego la tabla del sistema se llamará cdc.dbo_Example_CT). Almacenará todos los datos que hayan sido modificados.

En realidad, para profundizar en CDC, considere el ejemplo. Pero primero, asegúrese de que el Agente SQL que usa CDC funcione en la instancia de prueba de SQL Server.

Además, vamos a considerar un script que crea una base de datos y una tabla de prueba, completa esta tabla con datos y habilita CDC para esta tabla.

Para comprender y simplificar la tarea, utilizaremos una instancia de SQL Server sin distribuir las bases de datos de origen y de destino a diferentes servidores.

usar mastergo:crear una base de datos de origen si no existe (seleccionar * de sys.databases donde nombre ='db_src_cdc') =db_name() and is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- crear un rol para tablas con CDC si no existe (seleccione * de sys.sysusers where name ='CDC_Reader' and issqlrole=1) create role CDC_Readergo-- cree un tableif object_id('dbo.Example','U') es nulo crear tabla dbo.Example (ID int restricción de identidad PK_Example clave principal, Título varchar(200) no nulo)go-- rellene los valores tableinsert dbo.Example (Título)( 'Uno'),('Dos'),('Tres'),('Cuatro'),('Cinco');ir:habilitar CDC para la tabla si no existe (seleccione * de sys.tables donde is_tracked_by_cdc =1 y nombre ='Ejemplo') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Ejemplo', @role_name ='CDC_Reader'go-- rellene la tabla con algunos datos. Cambiaremos o eliminaremos algoactualizar dbo.Ejemploestablecer Título =revertir(Título)donde ID en (2,3,4);eliminar de dbo.Ejemplo donde ID en (1,2);establecer identidad_insertar dbo.Ejemplo en;insertar dbo. Ejemplo (ID, Título) valores (1, 'Uno'), (6, 'Seis'); establecer identidad_insertar dbo. Ejemplo desactivado; ir

Ahora, veamos lo que tenemos después de ejecutar este script en las tablas dbo.Example y cdc.dbo_Example_CT (cabe señalar que CDC es asíncrono. Los datos se rellenan en las tablas donde se almacena el seguimiento de cambios después de un cierto período de tiempo ).

seleccione * de dbo.Ejemplo;
ID Título---- ---------------------- 1 Uno 3 eerhT 4 ruoF 5 Cinco 6 Seis
seleccione row_number() sobre (partición por orden de ID por __$start_lsn desc, __$seqval desc) como __$rn, *desde cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operación __$update_mask ID Título------ --------------------- - ----------- ---------------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A00000000580003 2 0X03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0X03 1 ONO 1 0X000000000000560006 NULL000000000000 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 cuatro 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580003 0 

Considere en detalle la estructura de la tabla en la que se almacena el seguimiento de cambios. Los campos __ $start_lsn y __ $seqval son LSN (número de secuencia de registro en la base de datos) y el número de transacción dentro de la transacción, respectivamente. Hay una propiedad importante en estos campos, a saber, podemos estar seguros de que el registro con un LSN más alto se realizará más tarde. Debido a esta propiedad, podemos obtener fácilmente el estado más reciente de cada registro en la consulta, filtrando nuestra selección por la condición, donde __ $ rn =1.

El campo de operación __$ contiene el código de transacción:

  • 1:el registro se elimina
  • 2:se inserta el registro
  • 3, 4:el registro se actualiza. Los datos antiguos antes de la actualización son 3, los datos nuevos son 4.

Además de los campos de servicio con prefijo «__$», los campos de la tabla original están completamente duplicados. Esta información es suficiente para que podamos proceder a la carga incremental.

Configurar una base de datos para cargar datos

Cree una tabla en nuestra base de datos de destino de prueba, en la que se cargarán los datos, así como una tabla adicional para almacenar datos sobre el registro de carga.

use mastergo:cree una base de datos de destino si no existe (seleccione * de sys.databases where name ='db_dst_cdc') cree la base de datos db_dst_cdcgouse db_dst_cdcgo:cree una tabla si object_id('dbo.Example','U') es nulo crear tabla dbo. Ejemplo (ID int restricción PK_Example clave principal, Título varchar (200) no nulo) ir:crear una tabla para almacenar la carga logif object_id ('dbo.log_cdc', 'U') es nulo crear tabla dbo .log_cdc ( table_name nvarchar(512) no nulo, dt datetime no nulo predeterminado getdate(), lsn binary(10) no nulo predeterminado (0x0), restricción pk_log_cdc clave principal (table_name,dt desc) )go

Me gustaría llamar su atención sobre los campos de la tabla LOG_CDC:

  • TABLE_NAME almacena información sobre qué tabla se cargó (es posible cargar varias tablas en el futuro, desde diferentes bases de datos o incluso desde diferentes servidores; el formato de la tabla es 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME'
  • DT es un campo de fecha y hora de carga, que es opcional para la carga incremental. Sin embargo, será útil para auditar la carga.
  • LSN:después de cargar una tabla, necesitamos almacenar información sobre el lugar donde comenzar la próxima carga, si es necesario. En consecuencia, después de cada carga, agregamos el último (máximo) __ $ start_lsn en esta columna.

Algoritmo de carga de datos

Como se describió anteriormente, al usar la consulta, podemos obtener el estado más reciente de la tabla con la ayuda de las funciones de ventana. Si conocemos el LSN de la última carga, la próxima vez que carguemos podemos filtrar de la fuente todos los datos, cuyos cambios sean superiores al LSN almacenado, si hubo al menos una carga anterior completa:

con incr_Example as( select row_number() over ( partición por orden de ID por __$start_lsn desc, __$seqval desc ) como __$rn, * from db_src_cdc.cdc.dbo_Example_CT donde __$operación <> 3 y __$ start_lsn> @lsn)seleccione * de incr_Example

Luego, podemos obtener todos los registros para la carga completa, si el LSN de carga no está almacenado:

con incr_Example as( select row_number() over ( partición por orden de ID por __$start_lsn desc, __$seqval desc ) como __$rn, * from db_src_cdc.cdc.dbo_Example_CT donde __$operación <> 3 y __$ start_lsn> @lsn), full_Example as( select * from db_src_cdc.dbo.Example donde @lsn es nulo)select ID, Title, __$operationfrom incr_Examplewhere __$rn =1union allselect ID, Title, 2 as __$operationfrom full_Example

Por lo tanto, dependiendo del valor de @LSN, esta consulta mostrará todos los cambios más recientes (sin pasar por los intermedios) con el estado Eliminado o no, o todos los datos de la tabla original, agregando el estado 2 (nuevo registro) – este campo se usa solo para unificar dos selecciones. Con esta consulta, podemos implementar fácilmente la carga completa o la recarga mediante el comando MERGE (a partir de la versión de SQL 2008).

Para evitar cuellos de botella que puedan crear procesos alternativos y cargar datos coincidentes de diferentes tablas (en el futuro, cargaremos varias tablas y, posiblemente, puede haber relaciones relacionales entre ellas), sugiero usar una instantánea de DB en la base de datos de origen ( otra característica de SQL 2008).

El texto completo de la carga es el siguiente:

[expandir título=”Código”]

/* Algoritmo de carga de datos*/-- cree una instantánea de la base de datos si existe (seleccione * de sys.databases where name ='db_src_cdc_ss' ) suelte la base de datos db_src_cdc_ss;declare @query nvarchar(max);select @query =N' crear la base de datos db_src_cdc_ss en ( nombre =N'''+nombre+ ''', nombre de archivo =N'''+[nombre de archivo]+'.ss'' ) como instantánea de db_src_cdc'desde db_src_cdc.sys.sysfiles donde groupid =1; exec ( @query );-- leer LSN de la declaración de carga anterior @lsn binary(10) =(select max(lsn) from db_dst_cdc.dbo.log_cdc where table_name ='localhost.db_src_cdc.dbo.Example');-- borrar una tabla antes de la carga completa si @lsn es nulo tabla truncada db_dst_cdc.dbo.Example;-- proceso de carga con incr_Example como (seleccione row_number() over (partición por orden de ID por __$start_lsn desc, __$seqval desc) como __$rn , * from db_src_cdc_ss.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn> @lsn), full_Example as( select * from db_src_cdc_ss.dbo.Example donde @lsn es nulo), cte_Example as( select ID, Título, __$operación de incr_Example donde __$rn =1 union all select ID, Title, 2 as __$operation from full_Example)merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.IDwhen matched and __$operación =1 luego eliminar cuando coincida y __$operación <> 1 luego actualizar conjunto trg.Title =src.Titlecuando no coincida con el destino y __$operación <> 1 luego insertar (ID, Título) valores (src.ID, src .Title);-- marca el final del proceso de carga y los últimos valores LSNinsert db_dst_cdc.dbo.log_cdc (table_name, lsn) ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))-- elimine la instantánea de la base de datos si existe (seleccione * de sys.databases where name ='db_src_cdc_ss' ) elimine la base de datos db_src_cdc_ss

[/expandir]