top of page

Zaawansowane przetwarzanie danych szeregów czasowych z InDriver

Dane szeregów czasowych odnoszą się do sekwencji punktów danych zbieranych lub rejestrowanych w kolejnych odstępach czasu. Punkty te zazwyczaj mierzą tę samą zmienną (taką jak temperatura, wyniki sprzedaży czy ceny akcji) w określonym czasie, co pozwala na analizę wzorców, trendów oraz prognozowanie przyszłych wartości na podstawie danych historycznych.

Przykład punktów danych
 

Zbierając dane co minutę ze smart licznika energii, szereg czasowy może zawierać wiele wartości dla każdego znacznika czasu, jak pokazano poniżej:

2024-02-21 09:55:00

{
  "power1": 312.8,
  "power2": 281.56,
  "power3": 10.16,
  "voltage1": 238.72,
  "voltage2": 240.94,
  "voltage3": 240.05,
  "current1": 1.42,
  "current2": 1.46,
  "current3": 0.16,
  "energy1": 1699355,
  "energy2": 1734052.1,
  "energy3": 542968.2,
  "energy_total": 3976375.3,
 
"power_total": 604.45
}

2024-02-21 09:56:00

{
  "power1": 322.8,
  "power2": 241.5,
  "power3": 12.16,
  "voltage1": 238.12,
  "voltage2": 239.76,
  "voltage3": 224.67,
  "current1": 1.50,
  "current2": 1.46,
  "current3": 0.20,
  "energy1": 1699357,
  "energy2": 1734058.3,
  "energy3": 542969.3,
  "energy_total": 3976395.3
,
  "power_total": 614.25
}

2024-02-21 09:57:00

{
  "power1": 334.5,
  "power2": 234.78,
  "power3": 11.12,
  "voltage1": 236.32,
  "voltage2": 245.64,
  "voltage3": 238,01,
  "current1": 1.34,
  "current2": 1.56,
  "current3": 0.44,
  "energy1": 1699359,
  "energy2": 1734064.5,
  "energy3": 542971.3,
  "energy_total": 39764
40.1,
 
"power_total": 623.45
}

2024-02-21 09:58:00

{
  "power1": 362.8,
  "power2": 241.56,
  "power3": 15.16,
  "voltage1": 234.34,
  "voltage2": 241.56,
  "voltage3": 242.03,
  "current1": 1.99,
  "current2": 1.87,
  "current3": 0.14,
  "energy1": 1699373,
  "energy2": 1734066.3,
  "energy3": 542975.2,
  "energy_total": 3976495.8,

  "power_total": 620.76
}

2024-02-21 09:59:00

{
  "power1": 342.8,
  "power2": 251.56,
  "power3": 16.16,
  "voltage1": 24
8.72,
  "voltage2": 249.94,
  "voltage3": 248.05,
  "current1": 1.44,
  "current2": 1.45,
  "current3": 0.17,
  "energy1": 1699389,
  "energy2": 1734077.6,
  "energy3": 542999.2,
  "energy_total": 3976489.7,

  "power_total": 601.51
}

power_total

Wybrane zmienne, takie jak np. 'power_total', mogą być łatwo przedstawione na wykresie i wykorzystane w dalszych obliczeniach.

Uniwersalna struktura tabeli SQL

Wykorzystując kolumnę danych w formacie JSON, każdy punkt danych może zawierać wiele, uporządkowanych wartości, co czyni to wszechstronnym rozwiązaniem do zbierania różnorodnych danych.

Time Series Database Table
[ { "Shelly": { "power1": 62.70, "power2": 979.50, "power3": 52.34, "energy1": 543761.06, "energy2": 603290.83, "energy3": 173986.66, "current1": 0.45, "current2": 4.14, "current3": 0.29, "voltage1": 242.48, "voltage2": 238.30, "voltage3": 240.43, "power_total": 1094.20, "energy_total": 1321038.53 } }] 2023-03-18 10:00:00+00 Shelly source timestamp data [JSON]

Efektywne zarządzanie danymi szeregów czasowych z InDriver:

Wyjaśnienie kluczowych funkcji

JSON

InDriver upraszcza zbieranie danych szeregów czasowych z różnych źródeł.

Posiada funkcje odczytu/zapisu urządzeń, a także funkcje wykonywania zapytań REST API i SQL, które zwracają obiekty JSON. Te obiekty mogą być łatwo przetwarzane (modyfikowane, przycinane lub rozszerzane) i ponownie wykorzystywane jako dane wejściowe.

Przykład skryptu onHook w InDriver demonstruje zapytanie REST API, które zwraca JSON, używany jako dane wejściowe dla funkcji sqlExecute. Ten skrypt, uruchamiany w ustalonych odstępach czasu, łatwo pobiera i loguje dane do tabeli SQL z danymi szeregów czasowych.

let ts= InDriver.hookTs();

RestApi.sendRequest('shelly');

if (RestApi.isSucceeded()) {

const json = RestApi.getData('shelly');

const jsonObj = JSON.parse(json);

InDriver.sqlExecute("azureserver",

"insert into public.shelly (source, ts, data ) values ('Shelly','"+ts.toISOString()+"',$$"+json+"$$);");

}

24/7

Nieprzerwana stabilność

Zadania InDriver gwarantują ciągłe i stabilne zbieranie danych, bez żadnych przerw ani utraty danych. Połączenia zarówno z źródłami danych, jak i z miejscami docelowymi są stale monitorowane, a automatyczne ponowne połączenia są inicjowane w przypadku awarii.

Clock

00:00:00 00:01:00 00:02:00

Zbieranie danych zsynchronizowane w czasie

Dane są zbierane w dokładnych odstępach, zapewniając synchronizację z zegarem. Obejmuje to precyzyjne zbieranie danych w pełnych sekundach, minutach, kwadransach, godzinach lub dniach, wszystkie zgodnie z lokalnym czasem.

//call onHook script every full 10s, eg. 00:00:00, 00:00:10, 00:00:20...

InDriver.installHook(10000)

//call onHook script every full hour, eg. 00:00:00, 01:00:00, 02:00:00...

InDriver.installHook(3600000)

Dostępność w czasie rzeczywistym

Dzięki efektywnym implementacjom wątków na niskim poziomie, zadania InDriver osiągają wykonanie niemal w czasie rzeczywistym. Histogram poniżej przedstawia zmierzoną latencję pomiędzy zegarem czasu rzeczywistego a oczekiwanym znacznikiem czasu hook.

InDriver Latency Histogram

Synchronizacja

00:00:00 00:01:00 00:02:00

Zsynchronizowane źródła

Zbieranie danych odbywa się jednocześnie ze wszystkich źródeł, eliminując opóźnienia i rozbieżności.

Poniższy przykład przedstawia blok kodu 'begin...commit', w którym wszystkie funkcje 'readDevice' są wykonywane równocześnie, gwarantując zsynchronizowane zbieranie i logowanie danych z urządzeń.

ModbusApi.begin()

// Execute the following read function simultaneously after commitWait()

ModbusApi.readDevice( 'MoxaOne','{"name": "inputs", "type": "DISCRETEINPUTS", "address":1, "size":8}')

ModbusApi.readDevice( 'MoxaTwo','{"name": "inputs", "type": "DISCRETEINPUTS", "address":1, "size":8}')

ModbusApi.readDevice( 'MoxaThree','{"name": "inputs", "type": "DISCRETEINPUTS", "address":1, "size":8}')

ModbusApi.commitWait()

 

if (ModbusApi.isSucceeded()) {

let data = ModbusApi.getAllData()

let ts = InDriver.hookTs()

InDriver.sqlExecute("AzurePGSQL",["select tsapiinsert('public','modbus','3xIOLogicE1212', '", ts.toUTCString(), "','", data, "' );"])

InDriver.sendMessage(ts, '["device data","arch"]', data)

}

Interpolacja

Agregacja

EnergyDelta 00:00:00 03:00:00 02:00:00

Wbudowany algorytm agregacji

Wbudowany algorytm agregacji InDriver zapewnia, że wszelkie brakujące dane, potencjalnie utracone z powodu zakłóceń sieci, awarii serwerów lub niedostępności urządzeń, są bezproblemowo interpolowane w precyzyjnych, zsynchronizowanych ze zegarem znacznikach czasu. Ta funkcja wysokiej dostępności gwarantuje, że dane pozostają ciągłe, zsynchronizowane w czasie i starannie zorganizowane w interwałach. Algorytm umożliwia agregację w powszechnych odstępach czasu, takich jak 1 minuta, 15 minut, 1 godzina i 1 dzień, generując oddzielne tabele, które nie tylko zawierają wartości interpolowane, ale również dostarczają zebrane statystyki, takie jak wartość, minimum, maksimum, średnia, delta oraz delta w ciągu jednej godziny.

Cały ten proces jest wydajnie realizowany za pomocą jednego wywołania InDriver TSApi.

  • onStartup

InDriver.import("TsApi");

//set aggregation interval as 10 minutes

InDriver.installHook(600000)

// Define aggregator for 'shelly' table on 'azureserver' SQL server 

// Time zone: 'Europe/Warsaw' 

// Source: 'Shelly' time series data

TsApi.defineAggregator("AGG","azureserver","shelly","Europe/Warsaw",'["Shelly"]');
 

  • onHook

// onHook called every 10 minutes to aggregate new values

TsApi.aggregate("AGG");

JavaScript

Własne algorytmy

Wykorzystując swoją wiedzę z podstaw JavaScript, możesz łatwo opracować dostosowane algorytmy do pozyskiwania i przetwarzania danych. Nasza obszerna biblioteka gotowych funkcji upraszcza Twój workflow, umożliwiając osiągnięcie celów przy minimalnym kodzie.

Poniżej znajduje się krótki fragment kodu demonstrujący, jak wyodrębnić wartości z danych JSON zebranych ze smart licznika:

let ts= InDriver.hookTs();

RestApi.sendRequest('shelly');

if (RestApi.isSucceeded()) {

const json = RestApi.getData('shelly');

const jsonObj = JSON.parse(text);

//Extract energy and power values for each phase

let energy1 = Math.abs(jsonObj .data.device_status.emeters[0].total_returned);

let energy2 = Math.abs(jsonObj .data.device_status.emeters[1].total_returned);

let energy3 = Math.abs(jsonObj .data.device_status.emeters[2].total_returned);

let power1 = Math.abs(jsonObj .data.device_status.emeters[0].power);

let power2 = Math.abs(jsonObj .data.device_status.emeters[1].power);

let power3 = Math.abs(jsonObj .data.device_status.emeters[2].power);

//Build smart meter time series data point

let data = {

power1 : power1,

power2 : power2,

power3 : power3,

voltage1 : Math.abs(jsonObj .data.device_status.emeters[0].voltage),

voltage2 : Math.abs(jsonObj .data.device_status.emeters[1].voltage),

voltage3 : Math.abs(jsonObj .data.device_status.emeters[2].voltage),

current1 : Math.abs(jsonObj .data.device_status.emeters[0].current),

current2 : Math.abs(jsonObj .data.device_status.emeters[1].current),

current3 : Math.abs(jsonObj .data.device_status.emeters[2].current),

energy1 : energy1,

energy2 : energy2,

energy3 : energy3,

energy_total: energy1 + energy2 + energy3,

power_total: power1 + power2 + power3

}

let shelly = {Shelly:data};

let list = [];

list.push(shelly);

// Ensure data changes are detected. This smart meter sends updates irregularly, often repeating values in subsequent requests.

let currentJSON = JSON.stringify(list);

if (lastJSON !=currentJSON) {

// Log JSON object with extracted values to database

InDriver.sqlExecute("azureserver", "insert into public.shelly (source, ts, data ) values ('Shelly','"+ts.toISOString()+"',$$"+currentJSON+"$$);");

lastJSON = currentJSON;

InDriver.debug(currentJSON)

}

}

Przykład rzeczywisty - Pozyskiwanie danych z inteligentnego licznika

Ten przykład ilustruje kompleksowe zastosowanie InDriver do pozyskiwania, przetwarzania, interpolacji, agregacji i logowania danych szeregów czasowych. Pokazuje, jak InDriver ułatwia bezproblemową integrację i manipulację strumieniami danych, zapewniając efektywne zarządzanie danymi szeregów czasowych od początkowego zbierania do finalnego przechowywania, przy jednoczesnym zachowaniu integralności danych i dostarczaniu cennych informacji poprzez analizę statystyczną.

bottom of page