top of page

Kopiowanie wierszy między dwiema tabelami SQL znajdującymi się na oddzielnych serwerach SQL wymaga zwrócenia uwagi na kilka punktów, ale dzięki InDriver definicję zadania kopiowania można teraz wykonać w mniej niż 25 wierszach kodu JavaScript.

Niezależnie od tego, czy serwery SQL są osobne, czy pochodzą od różnych dostawców (np. PostgreSQL i MS SQL Server), algorytm kopiowania musi wyodrębnić część danych (wiersze) z tabeli źródłowej i wstawić ją do tabeli docelowej.

Chociaż algorytmy kopiowania mogą się różnić w zależności od przypadku, istnieją wspólne założenia:

  1. Uzyskaj ostatni indeks wiersza skopiowany do tabeli docelowej na początku algorytmu kopiowania.

  2. Zachowaj ostatni indeks wiersza (indeks lub znacznik czasu) z ostatniej pomyślnej procedury kopiowania.

  3. Zabezpiecz algorytm przed błędami funkcji wyboru lub wstawiania – kontynuuj dopiero wtedy, gdy procedura kopiowania zakończy się powodzeniem.

  4. Zrównoważ częstotliwość kopiowania i rozmiar pojedynczej porcji kopii – liczbę wierszy. Dzięki temu dane są kopiowane szybciej niż generowane w tabeli źródłowej. Kopia skumulowanych wierszy powinna być wykonywana w akceptowalnym czasie, optymalizując obciążenie SQL Server. Wydajność kopiowania powinna być optymalna.

Wszystkie te aspekty zostały omówione w poniższym przykładzie.

Przykład kopiowania tabeli dziennika:

Copy SQL Tables between databases

Rozważmy tabelę dziennika o nazwie „inapi_log” z kolumnami (id jako SERIAL PRIMARY KEY, driver jako tekst, task jako tekst, type jako tekst, ts jako timestampz i msg jako tekst).

Ta tabela dziennika będzie stale kopiowana do innej tabeli z tymi samymi kolumnami, nazwanej „copy_inapi_log”, znajdującej się na oddzielnym serwerze SQL z szybkością 1000 wierszy na sekundę. Oba serwery w tym przykładzie to PostgreSQL.

Rozwiązanie

Konfiguracja InStudio przedstawiona poniżej ilustruje konfigurację InDriver, zawierającą zdefiniowane zadanie „kopiuj tabelę” i towarzyszący mu kod JavaScript.

Konfiguracja InStudio z zadaniem kopiowania tabeli SQL.

Zadanie kopiowania tabeli jest definiowane przez dwa bloki kodu - onStartup i onHook, jak pokazano poniżej:

  • onStartup

// Create destination (copy) table and indexes if not exist - public.copy_inapi_log

InDriver.sqlExecute("azureserver", "CREATE TABLE IF NOT EXISTS public.copy_inapi_log (id SERIAL PRIMARY KEY, driver text, task text, type text, ts timestamp with time zone, msg text);\

CREATE INDEX IF NOT EXISTS copy_inapi_log_driver_index ON public.copy_inapi_log USING BTREE (driver); \

CREATE INDEX IF NOT EXISTS copy_inapi_log_task_index ON public.copy_inapi_log USING BTREE (task);\

CREATE INDEX IF NOT EXISTS copy_inapi_log_ts_index ON public.copy_inapi_log USING BTREE (ts);\

CREATE INDEX IF NOT EXISTS copy_inapi_log_type_index ON public.copy_inapi_log USING BTREE (type);")

 

//Define a single copy size

var copy_size = 10

// define copy frequency 1/1s

InDriver.installHook(1000)

 

var last_copied_row_id = 0

//select the last copied row id from destination table 0 if no rows copied yet
let q = JSON.parse(InDriver.sqlExecute("azureserver", "select id from public.copy_inapi_log order by id desc limit 1"))

if (q.length === 1)

last_copied_row_id = q[0].id

 

  • onHook
     

copy()

 

function copy() {
   // Copy a portion of rows with size copy_size starting from the last_copied_row_id

  let copy = JSON.parse(
           InDriver.sqlExecute(
               "azureserver",
               ["SELECT id, driver, task, type, ts, msg FROM public.inapi_log WHERE id > ", last_copied_row_id, " ORDER BY id LIMIT ", copy_size]))


    // check select result
    if (copy.hasOwnProperty("Error")) {
      
// error
        InDriver.debug(
                  
"Copy failed at id: " + last_copied_row_id + " Source error ")
       return
  }

   if (!copy.length) {
       InDriver.debug(
"
No more rows to copy from id: " + last_copied_row_id)
       return
  }
 
  //build SQL insert query
    let sql = ["INSERT INTO public.copy_inapi_log (id, driver, task, type, ts, msg) VALUES "]
//iterate copy to build inserted values list
    for (var i = 0; i < copy.length; i++) {
       let
row = copy[i]
       if (
i > 0)
          
sql.push(",")
      
sql.push("(", row.id, ",'", row.driver, "','", row.task, "','", row.type, "','", row.ts, "',$magic$", row.msg, "$magic$)")
  }
 
sql.push(";")

 

    //execute SQL insert query
  let q = JSON.parse(InDriver.sqlExecute("localserver", sql))
 

   if (!q.hasOwnProperty("Error")) {
      
// Success
        let last = copy[copy.length - 1].id
       InDriver.debug(
                  
copy.length + " rows copied from id: " + last_copied_row_id + " to " + last)
      
last_copied_row_id = last
    } else
       InDriver.debug(
"Copy failed at id: " + last_copied_row_id)
}

 

Wynik

Jak widać na zrzucie ekranu poniżej, co sekundę do tabeli docelowej przesyłana jest partia 1000 wierszy.

Monitoring the performance of the copy algorithm

InAnalytics.io

Innowacyjna analityka danych

AND SYSTEMS Kraków | Polska

Zarejestruj się, aby otrzymywać najświeższe informacje

Dziękujemy za subskrypcję.

© AND SYSTEMY 2025

bottom of page