'Storing data received via MQTT from ESP32 in a Python Database
I am working on monitoring the electric conssumption of a machine. I do it with Circuit Setup SSP Energy Meter (which includes an ESP32 and an ATM90E32) sampling every 50ms. I am sending data via MQTT to a Raspberry Pi, but now I have to store that data for further analysis, for what I am using a Python script. I am reading the data perfectly in the suscriptor (the Raspberry) but i just can't figure out how to store it properly.
This is the ESP32 code:
#define ESP32
#include <ArduinoJson.h>
#include <WiFi.h>
#include <WiFiClient.h>
#include <PubSubClient.h>
#include <SPI.h>
#include <ATM90E32.h>
unsigned long tiempo;
float Ts = 0.05;
const char* ssid = "xxx";
const char* password = "xxx";
const char* mqttServer = "192.168.0.xxx";
const int mqttPort = 1883;
const char* mqttUser = "xxx";
const char* mqttPassword = "xxx";
const char* HATopic = "home/energy/sensor";
unsigned short lineFreq = 389;
unsigned short PGAGain = 21;
unsigned short VoltageGain = 7908;
unsigned short CurrentGainCT1 = 39473;
unsigned short CurrentGainCT2 = 39473;
const int CS_pin = 5;
ATM90E32 eic{};
WiFiClient espClient;
PubSubClient client(espClient);
void setup() {
delay(2000);
Serial.begin(115200);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.println("Conectando a WiFi...");
}
Serial.println("Conectado a Wifi con éxito");
client.setServer(mqttServer, mqttPort);
while (!client.connected()) {
Serial.println("Conectando a MQTT...");
if (client.connect("EnergyMeterClient", mqttUser, mqttPassword )) {
Serial.println("Conectado");
} else {
Serial.print("Fallo con estado ");
Serial.print(client.state());
delay(2000);
}
}
Serial.println("Iniciar ATM90E32");
eic.begin(CS_pin, lineFreq, PGAGain, VoltageGain, CurrentGainCT1, 0, CurrentGainCT2);
delay(1000);
}
void loop() {
StaticJsonDocument<256> meterData;
float voltageA, voltageC, totalVoltage, currentCT1, currentCT2, totalCurrent, activePower, powerFactor, temp, freq, totalWatts;
unsigned short sys0 = eic.GetSysStatus0(); //EMMState0
unsigned short sys1 = eic.GetSysStatus1(); //EMMState1
unsigned short en0 = eic.GetMeterStatus0();//EMMIntState0
unsigned short en1 = eic.GetMeterStatus1();//EMMIntState1
Serial.println("Sys Status: S0:0x" + String(sys0, HEX) + " S1:0x" + String(sys1, HEX));
Serial.println("Meter Status: E0:0x" + String(en0, HEX) + " E1:0x" + String(en1, HEX));
delay(10);
if (sys0 == 65535 || sys0 == 0) Serial.println("Error: No se reciben datos del energy meter - comprueba el conexionado");
if ((millis()-tiempo)>(Ts)){
voltageA = eic.GetLineVoltageA();
voltageC = eic.GetLineVoltageC();
totalVoltage = voltageA;
currentCT1 = eic.GetLineCurrentA();
currentCT2 = eic.GetLineCurrentC();
totalCurrent = currentCT1 + currentCT2;
activePower = eic.GetTotalActivePower();
powerFactor = eic.GetTotalPowerFactor();
//temp = eic.GetTemperature();
//freq = eic.GetFrequency();
//totalWatts = (voltageA * currentCT1) + (voltageC * currentCT2);
}
/*
Serial.println("Voltage 1: " + String(voltageA) + "V");
Serial.println("Voltage 2: " + String(voltageC) + "V");
Serial.println("Current 1: " + String(currentCT1) + "A");
Serial.println("Current 2: " + String(currentCT2) + "A");
Serial.println("Active Power: " + String(realPower) + "W");
Serial.println("Power Factor: " + String(powerFactor));
Serial.println("Fundimental Power: " + String(eic.GetTotalActiveFundPower()) + "W");
Serial.println("Harmonic Power: " + String(eic.GetTotalActiveHarPower()) + "W");
Serial.println("Reactive Power: " + String(eic.GetTotalReactivePower()) + "var");
Serial.println("Apparent Power: " + String(eic.GetTotalApparentPower()) + "VA");
Serial.println("Phase Angle A: " + String(eic.GetPhaseA()));
Serial.println("Chip Temp: " + String(temp) + "C");
Serial.println("Frequency: " + String(freq) + "Hz");
*/
meterData["V1"] = voltageA;
//meterData["V2"] = voltageC;
meterData["I1"] = currentCT1;
//meterData["I2"] = currentCT2;
//meterData["totI"] = totalCurrent;
meterData["AP"] = activePower;
meterData["PF"] = powerFactor;
//meterData["t"] = temp;
//meterData["f"] = freq;
char JSONmessageBuffer[200];
serializeJson(meterData,JSONmessageBuffer);
Serial.println("Enviando mensaje al topic MQTT...");
Serial.println(JSONmessageBuffer);
if (client.publish(HATopic, JSONmessageBuffer) == true) {
Serial.println("Éxito enviando mensaje");
} else {
Serial.println("Error enviando mensaje");
}
client.loop();
Serial.println("-------------");
tiempo = millis();
}
This is what I read in the suscriptor:

This is the Python code (I found it on the internet and modified it) but I want to store the data in diferent columns ( Voltage - Current - Power - PowerFactor ) and also add a fifth one with the date and I don't really know how to do it. It's my first time working with both Python and SQL.
import paho.mqtt.client as mqtt
import sqlite3
from time import time
MQTT_HOST = '192.168.0.xxx'
MQTT_PORT = 1883
MQTT_CLIENT_ID = 'Python MQTT client'
MQTT_USER = 'xxx'
MQTT_PASSWORD = 'xxx'
TOPIC = 'home/energy/sensor'
DATABASE_FILE = 'mqtt.db'
def on_connect(mqtt_client, user_data, flags, conn_result):
mqtt_client.subscribe(TOPIC)
def on_message(mqtt_client, user_data, message):
payload = message.payload.decode('utf-8')
db_conn = user_data['db_conn']
sql = 'INSERT INTO datos_electricos (topic, payload, fecha) VALUES (?, ?, ?)'
cursor = db_conn.cursor()
cursor.execute(sql, (message.topic, payload, int(time())))
db_conn.commit()
cursor.close()
def main():
db_conn = sqlite3.connect(DATABASE_FILE)
sql = """
CREATE TABLE IF NOT EXISTS datos_electricos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
payload TEXT NOT NULL,
fecha TEXT NOT NULL
)
"""
cursor = db_conn.cursor()
cursor.execute(sql)
cursor.close()
mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
mqtt_client.user_data_set({'db_conn': db_conn})
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(MQTT_HOST, MQTT_PORT)
mqtt_client.loop_forever()
main()
This is the database I managed to create (This data is just raw data, I am not measuring anything right now, just doing a quick test) but as I said I would like to know how to separate data from the JSON buffer into different columns and also how to store a proper datetime format:

Thank you very much for your help and your time.
Solution 1:[1]
Using Json.loads() worked for me. Thanks. This is the code btw (Now I just need to figure how to add a proper datetime and if I should do it on the ESP32 or when storing the data in SQLite)
import paho.mqtt.client as mqtt
import sqlite3
import json
from time import time
MQTT_HOST = '192.168.0.xxx'
MQTT_PORT = 1883
MQTT_CLIENT_ID = 'Python MQTT client'
MQTT_USER = 'xxx'
MQTT_PASSWORD = 'xxx'
TOPIC = 'home/energy/sensor'
DATABASE_FILE = 'mqtt.db'
def on_connect(mqtt_client, user_data, flags, conn_result):
mqtt_client.subscribe(TOPIC)
def on_message(mqtt_client, user_data, message):
payload = message.payload.decode('utf-8')
variables=json.loads(payload)
V=str(variables["V1"])
I=str(variables["I1"])
AP=str(variables["AP"])
PF=str(variables["PF"])
db_conn = user_data['db_conn'] #Conexion
cursor = db_conn.cursor() #Inicio cursor
cursor.execute("INSERT INTO datos_electricos (tension,intensidad,potencia,factor) VALUES (" + V + "," + I + "," + AP + "," + PF + ")")
db_conn.commit() #Envio de la instruccion SQL
cursor.close() #Cierre cursor
def main():
db_conn = sqlite3.connect(DATABASE_FILE)
sql = """
CREATE TABLE IF NOT EXISTS datos_electricos (
tension FLOAT,
intensidad FLOAT,
potencia FLOAT,
factor FLOAT
)
"""
cursor = db_conn.cursor()
cursor.execute(sql)
cursor.close()
mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
mqtt_client.user_data_set({'db_conn': db_conn})
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(MQTT_HOST, MQTT_PORT)
mqtt_client.loop_forever()
main()
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
