top of page

Kopiowanie wierszy między dwiema tabelami SQL znajdującymi się na oddzielnych serwerach SQL wymaga zwrócenia uwagi na kilka punktów, ale dzięki InDriver definicję zadania kopiowania można teraz wykonać w mniej niż 25 wierszach kodu JavaScript.

Niezależnie od tego, czy serwery SQL są osobne, czy pochodzą od różnych dostawców (np. PostgreSQL i MS SQL Server), algorytm kopiowania musi wyodrębnić część danych (wiersze) z tabeli źródłowej i wstawić ją do tabeli docelowej.

Chociaż algorytmy kopiowania mogą się różnić w zależności od przypadku, istnieją wspólne założenia:

  1. Uzyskaj ostatni indeks wiersza skopiowany do tabeli docelowej na początku algorytmu kopiowania.

  2. Zachowaj ostatni indeks wiersza (indeks lub znacznik czasu) z ostatniej pomyślnej procedury kopiowania.

  3. Zabezpiecz algorytm przed błędami funkcji wyboru lub wstawiania – kontynuuj dopiero wtedy, gdy procedura kopiowania zakończy się powodzeniem.

  4. Zrównoważ częstotliwość kopiowania i rozmiar pojedynczej porcji kopii – liczbę wierszy. Dzięki temu dane są kopiowane szybciej niż generowane w tabeli źródłowej. Kopia skumulowanych wierszy powinna być wykonywana w akceptowalnym czasie, optymalizując obciążenie SQL Server. Wydajność kopiowania powinna być optymalna.

Wszystkie te aspekty zostały omówione w poniższym przykładzie.

Przykład kopiowania tabeli dziennika:

Copy SQL Tables between databases

Rozważmy tabelę dziennika o nazwie „inapi_log” z kolumnami (id jako SERIAL PRIMARY KEY, driver jako tekst, task jako tekst, type jako tekst, ts jako timestampz i msg jako tekst).

Ta tabela dziennika będzie stale kopiowana do innej tabeli z tymi samymi kolumnami, nazwanej „copy_inapi_log”, znajdującej się na oddzielnym serwerze SQL z szybkością 1000 wierszy na sekundę. Oba serwery w tym przykładzie to PostgreSQL.

Rozwiązanie

Konfiguracja InStudio przedstawiona poniżej ilustruje konfigurację InDriver, zawierającą zdefiniowane zadanie „kopiuj tabelę” i towarzyszący mu kod JavaScript.

Konfiguracja InStudio z zadaniem kopiowania tabeli SQL.

Zadanie kopiowania tabeli jest definiowane przez dwa bloki kodu - onStartup i onHook, jak pokazano poniżej:

  • onStartup

// Create destination (copy) table and indexes if not exist - public.copy_inapi_log

InDriver.sqlExecute("azureserver", "CREATE TABLE IF NOT EXISTS public.copy_inapi_log (id SERIAL PRIMARY KEY, driver text, task text, type text, ts timestamp with time zone, msg text);\

CREATE INDEX IF NOT EXISTS copy_inapi_log_driver_index ON public.copy_inapi_log USING BTREE (driver); \

CREATE INDEX IF NOT EXISTS copy_inapi_log_task_index ON public.copy_inapi_log USING BTREE (task);\

CREATE INDEX IF NOT EXISTS copy_inapi_log_ts_index ON public.copy_inapi_log USING BTREE (ts);\

CREATE INDEX IF NOT EXISTS copy_inapi_log_type_index ON public.copy_inapi_log USING BTREE (type);")

 

//Define a single copy size

var copy_size = 10

// define copy frequency 1/1s

InDriver.installHook(1000)

 

var last_copied_row_id = 0

//select the last copied row id from destination table 0 if no rows copied yet
let q = JSON.parse(InDriver.sqlExecute("azureserver", "select id from public.copy_inapi_log order by id desc limit 1"))

if (q.length === 1)

last_copied_row_id = q[0].id

 

  • onHook
     

copy()

 

function copy() {
   // Copy a portion of rows with size copy_size starting from the last_copied_row_id

  let copy = JSON.parse(
           InDriver.sqlExecute(
               "azureserver",
               ["SELECT id, driver, task, type, ts, msg FROM public.inapi_log WHERE id > ", last_copied_row_id, " ORDER BY id LIMIT ", copy_size]))


    // check select result
    if (copy.hasOwnProperty("Error")) {
      
// error
        InDriver.debug(
                  
"Copy failed at id: " + last_copied_row_id + " Source error ")
       return
  }

   if (!copy.length) {
       InDriver.debug(
"
No more rows to copy from id: " + last_copied_row_id)
       return
  }
 
  //build SQL insert query
    let sql = ["INSERT INTO public.copy_inapi_log (id, driver, task, type, ts, msg) VALUES "]
//iterate copy to build inserted values list
    for (var i = 0; i < copy.length; i++) {
       let
row = copy[i]
       if (
i > 0)
          
sql.push(",")
      
sql.push("(", row.id, ",'", row.driver, "','", row.task, "','", row.type, "','", row.ts, "',$magic$", row.msg, "$magic$)")
  }
 
sql.push(";")

 

    //execute SQL insert query
  let q = JSON.parse(InDriver.sqlExecute("localserver", sql))
 

   if (!q.hasOwnProperty("Error")) {
      
// Success
        let last = copy[copy.length - 1].id
       InDriver.debug(
                  
copy.length + " rows copied from id: " + last_copied_row_id + " to " + last)
      
last_copied_row_id = last
    } else
       InDriver.debug(
"Copy failed at id: " + last_copied_row_id)
}

 

Wynik

Jak widać na zrzucie ekranu poniżej, co sekundę do tabeli docelowej przesyłana jest partia 1000 wierszy.

Monitoring the performance of the copy algorithm
bottom of page