'Storing data received via MQTT from ESP32 in a Python Database

I am working on monitoring the electric conssumption of a machine. I do it with Circuit Setup SSP Energy Meter (which includes an ESP32 and an ATM90E32) sampling every 50ms. I am sending data via MQTT to a Raspberry Pi, but now I have to store that data for further analysis, for what I am using a Python script. I am reading the data perfectly in the suscriptor (the Raspberry) but i just can't figure out how to store it properly.

This is the ESP32 code:

#define ESP32

#include <ArduinoJson.h>
#include <WiFi.h>
#include <WiFiClient.h>
#include <PubSubClient.h>
#include <SPI.h>
#include <ATM90E32.h>

unsigned long tiempo;
float Ts = 0.05;

const char* ssid = "xxx"; 
const char* password =  "xxx";

const char* mqttServer = "192.168.0.xxx";
const int mqttPort = 1883;
const char* mqttUser = "xxx";
const char* mqttPassword = "xxx";
const char* HATopic = "home/energy/sensor";

unsigned short lineFreq = 389;             
unsigned short PGAGain = 21;   
unsigned short VoltageGain = 7908;
unsigned short CurrentGainCT1 = 39473;
unsigned short CurrentGainCT2 = 39473;
const int CS_pin = 5; 

ATM90E32 eic{}; 

WiFiClient espClient;
PubSubClient client(espClient);

void setup() {
  delay(2000);
  Serial.begin(115200);

  WiFi.begin(ssid, password);
 
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.println("Conectando a WiFi...");
  }
 
  Serial.println("Conectado a Wifi con éxito");

  client.setServer(mqttServer, mqttPort);
 
  while (!client.connected()) {
    Serial.println("Conectando a MQTT...");
 
    if (client.connect("EnergyMeterClient", mqttUser, mqttPassword )) {
 
      Serial.println("Conectado");
 
    } else {
 
      Serial.print("Fallo con estado ");
      Serial.print(client.state());
      delay(2000);
 
    }
  }
 
  Serial.println("Iniciar ATM90E32");
  eic.begin(CS_pin, lineFreq, PGAGain, VoltageGain, CurrentGainCT1, 0, CurrentGainCT2);
  delay(1000);
  
} 
 
void loop() {

  StaticJsonDocument<256> meterData;
 
  float voltageA, voltageC, totalVoltage, currentCT1, currentCT2, totalCurrent, activePower, powerFactor, temp, freq, totalWatts;

  unsigned short sys0 = eic.GetSysStatus0(); //EMMState0
  unsigned short sys1 = eic.GetSysStatus1(); //EMMState1
  unsigned short en0 = eic.GetMeterStatus0();//EMMIntState0
  unsigned short en1 = eic.GetMeterStatus1();//EMMIntState1

  Serial.println("Sys Status: S0:0x" + String(sys0, HEX) + " S1:0x" + String(sys1, HEX));
  Serial.println("Meter Status: E0:0x" + String(en0, HEX) + " E1:0x" + String(en1, HEX));
  delay(10);

  if (sys0 == 65535 || sys0 == 0) Serial.println("Error: No se reciben datos del energy meter - comprueba el conexionado");

if ((millis()-tiempo)>(Ts)){
  
  voltageA = eic.GetLineVoltageA();
  voltageC = eic.GetLineVoltageC();
  totalVoltage = voltageA;     

  currentCT1 = eic.GetLineCurrentA();
  currentCT2 = eic.GetLineCurrentC();
  totalCurrent = currentCT1 + currentCT2;

  activePower = eic.GetTotalActivePower();
  powerFactor = eic.GetTotalPowerFactor();
  //temp = eic.GetTemperature();
  //freq = eic.GetFrequency();
  //totalWatts = (voltageA * currentCT1) + (voltageC * currentCT2);
}

/*
  Serial.println("Voltage 1: " + String(voltageA) + "V");
  Serial.println("Voltage 2: " + String(voltageC) + "V");
  Serial.println("Current 1: " + String(currentCT1) + "A");
  Serial.println("Current 2: " + String(currentCT2) + "A");
  Serial.println("Active Power: " + String(realPower) + "W");
  Serial.println("Power Factor: " + String(powerFactor));
  Serial.println("Fundimental Power: " + String(eic.GetTotalActiveFundPower()) + "W");
  Serial.println("Harmonic Power: " + String(eic.GetTotalActiveHarPower()) + "W");
  Serial.println("Reactive Power: " + String(eic.GetTotalReactivePower()) + "var");
  Serial.println("Apparent Power: " + String(eic.GetTotalApparentPower()) + "VA");
  Serial.println("Phase Angle A: " + String(eic.GetPhaseA()));
  Serial.println("Chip Temp: " + String(temp) + "C");
  Serial.println("Frequency: " + String(freq) + "Hz");
*/

  meterData["V1"] = voltageA;
  //meterData["V2"] = voltageC;
  meterData["I1"] = currentCT1;
  //meterData["I2"] = currentCT2;
  //meterData["totI"] = totalCurrent;
  meterData["AP"] = activePower;
  meterData["PF"] = powerFactor;
  //meterData["t"] = temp;
  //meterData["f"] = freq;
  
  char JSONmessageBuffer[200];
  serializeJson(meterData,JSONmessageBuffer);
  Serial.println("Enviando mensaje al topic MQTT...");
  Serial.println(JSONmessageBuffer);

  if (client.publish(HATopic, JSONmessageBuffer) == true) {
    Serial.println("Éxito enviando mensaje");
  } else {
    Serial.println("Error enviando mensaje");
  }
 
  client.loop();
  Serial.println("-------------");
  
  tiempo = millis();
 
}

This is what I read in the suscriptor:

image

This is the Python code (I found it on the internet and modified it) but I want to store the data in diferent columns ( Voltage - Current - Power - PowerFactor ) and also add a fifth one with the date and I don't really know how to do it. It's my first time working with both Python and SQL.

import paho.mqtt.client as mqtt
import sqlite3
from time import time
 
MQTT_HOST = '192.168.0.xxx'
MQTT_PORT = 1883
MQTT_CLIENT_ID = 'Python MQTT client'
MQTT_USER = 'xxx'
MQTT_PASSWORD = 'xxx'
TOPIC = 'home/energy/sensor'
 
DATABASE_FILE = 'mqtt.db'
 
 
def on_connect(mqtt_client, user_data, flags, conn_result):
    mqtt_client.subscribe(TOPIC)
 
 
def on_message(mqtt_client, user_data, message):
    payload = message.payload.decode('utf-8')
 
    db_conn = user_data['db_conn']
    sql = 'INSERT INTO datos_electricos (topic, payload, fecha) VALUES (?, ?, ?)'
    cursor = db_conn.cursor()
    cursor.execute(sql, (message.topic, payload, int(time())))
    db_conn.commit()
    cursor.close()
 
 
def main():
    db_conn = sqlite3.connect(DATABASE_FILE)
    sql = """
    CREATE TABLE IF NOT EXISTS datos_electricos (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        fecha TEXT NOT NULL
    )
    """
    cursor = db_conn.cursor()
    cursor.execute(sql)
    cursor.close()
 
    mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    mqtt_client.user_data_set({'db_conn': db_conn})
 
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
 
    mqtt_client.connect(MQTT_HOST, MQTT_PORT)
    mqtt_client.loop_forever()
 
 
main()

This is the database I managed to create (This data is just raw data, I am not measuring anything right now, just doing a quick test) but as I said I would like to know how to separate data from the JSON buffer into different columns and also how to store a proper datetime format:

image2

Thank you very much for your help and your time.



Solution 1:[1]

Using Json.loads() worked for me. Thanks. This is the code btw (Now I just need to figure how to add a proper datetime and if I should do it on the ESP32 or when storing the data in SQLite)

import paho.mqtt.client as mqtt
import sqlite3
import json
from time import time
 
MQTT_HOST = '192.168.0.xxx'
MQTT_PORT = 1883
MQTT_CLIENT_ID = 'Python MQTT client'
MQTT_USER = 'xxx'
MQTT_PASSWORD = 'xxx'
TOPIC = 'home/energy/sensor'
 
DATABASE_FILE = 'mqtt.db'
 
 
def on_connect(mqtt_client, user_data, flags, conn_result):
    mqtt_client.subscribe(TOPIC)
 
 
def on_message(mqtt_client, user_data, message):
    payload = message.payload.decode('utf-8')
    variables=json.loads(payload)
    
    V=str(variables["V1"])
    I=str(variables["I1"])
    AP=str(variables["AP"])
    PF=str(variables["PF"])
    
    db_conn = user_data['db_conn'] #Conexion
    cursor = db_conn.cursor() #Inicio cursor
    cursor.execute("INSERT INTO datos_electricos (tension,intensidad,potencia,factor) VALUES (" + V + "," + I + "," + AP + "," + PF + ")")
    db_conn.commit() #Envio de la instruccion SQL
    cursor.close() #Cierre cursor
 
def main():
    db_conn = sqlite3.connect(DATABASE_FILE)
    sql = """
    CREATE TABLE IF NOT EXISTS datos_electricos (   
        tension FLOAT,
        intensidad FLOAT,
        potencia FLOAT,
        factor FLOAT
    )
    """
    cursor = db_conn.cursor()
    cursor.execute(sql)
    cursor.close()
 
    mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    mqtt_client.user_data_set({'db_conn': db_conn})
 
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
 
    mqtt_client.connect(MQTT_HOST, MQTT_PORT)
    mqtt_client.loop_forever()
 
 
main()

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1