๐ ๊ณต๋ถํ๋ ์ง์ง์ํ์นด๋ ์ฒ์์ด์ง?
SQLite ๋ก pub ์์ ๋ฐ์ sub ๋ฐ์ดํฐ๋ฅผ db ์ ์ ์ฅํ๊ธฐ ๋ณธ๋ฌธ
๐ฉ๐ป IoT (Embedded)/Raspberry Pi
SQLite ๋ก pub ์์ ๋ฐ์ sub ๋ฐ์ดํฐ๋ฅผ db ์ ์ ์ฅํ๊ธฐ
์ง์ง์ํ์นด 2023. 12. 13. 14:26728x90
๋ฐ์ํ
raspberry pi ๊ฐ broker ์ด๋ฏ๋ก
raspberry pi ์์ sub ๋ก db๋ฅผ ๋ฐ๋๋ก ํฉ๋๋ค
sqlite ๋ฅผ ํตํด db table์ ๊ฐ ์ ์ฅํ๊ธฐ!
๐ง๐ SQLite
sudo apt-get install sqlite3 libsqlite3-dev
sudo apt-get install libpaho-mqtt-dev
๐ง๐ main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
// C์ธ์ด๋ก ๊ตฌํ๋ SQL ๋ฐ์ดํฐ ๋ฒ ์ด์ค ์์ง
#include <sqlite3.h>
#include <time.h>
#include <unistd.h>
// MQ Telemetry Transport ๋ฒ์ 3.1 ํ๋กํ ์ฝ์ ๋ํ
// C ๊ตฌํ์ ํด๋ผ์ด์ธํธ ๊ธฐ๋ฅ์ด ํฌํจ๋ 32๋นํธ Windows ๋ผ์ด๋ธ๋ฌ๋ฆฌ
#include <MQTTClient.h>
#define MQTT_HOST "192.168.0.154"
#define MQTT_PORT 1883
#define MQTT_CLIENT_ID1 "sqlClient1"
#define MQTT_CLIENT_ID2 "sqlClient2"
#define MQTT_CLIENT_ID3 "sqlClient3"
#define MQTT_CLIENT_ID4 "sqlClient4"
#define TOPIC_ultra_1 "sensor/ultrasonic_1"
#define TOPIC_ultra_2 "sensor/ultrasonic_2"
#define TOPIC_button_1 "sensor/button_1"
#define TOPIC_button_2 "sensor/button_2"
#define DATABASE_FILE "mqtt.db"
// SQLite3 ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ๋ค๋ฃจ๊ธฐ ์ํ ๊ฐ์ฒด
sqlite3 *db;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static int callback(void *NotUsed, int argc, char **argv, char **azColName)
{
int i;
for (i = 0; i < argc; i++)
{
printf("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
}
printf("\n");
return 0;
}
char ultrasonic_1[5];
char ultrasonic_2[5];
char button_1[5];
char button_2[5];
int sql_message() {
// printf("%s", ultrasonic_1);
// printf("%s", ultrasonic_2);
// printf("%s", button_1);
// printf("%s", button_2);
char insertSql[200];
// message->payload : payloadptr
snprintf(insertSql, sizeof(insertSql), "INSERT INTO sensors_data (ultrasonic_1, ultrasonic_2, button_1, button_2) VALUES ('%s', '%s', '%s', '%s');", ultrasonic_1, ultrasonic_2, button_1, button_2);
char *zErrMsg = 0;
pthread_mutex_lock(&mutex);
// SQL ๋ช
๋ น์ ์คํ
// (open ํ DB, SQL ๋ฌธ์ฅ, ์ฝ๋ฐฑํจ์ ์ด๋ฆ, ์ฝ๋ฐฑํจ์ ์ฒซ ๋ฒ์งธ ์ธ์, ERROR ๋ณ์)
int sql_rc = sqlite3_exec(db, insertSql, callback, 0, &zErrMsg);
pthread_mutex_unlock(&mutex);
// SQLITE_OK : ์ฝ๋๋ ์์
์ด ์ฑ๊ณตํ๊ณ ์ค๋ฅ๊ฐ ์์์
if (sql_rc != SQLITE_OK)
{
fprintf(stderr, "SQL error: %s\n", zErrMsg);
sqlite3_free(zErrMsg);
}
else
{
putchar('\n');
fprintf(stdout, "Record inserted successfully\n");
}
return 1;
}
int on_message_ultra_1(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
printf("%s", message->payload);
strcpy(ultrasonic_1, message->payload);
// ๋ฉ์์ง ํ์ด๋ก๋์ ํ ๋น๋ ์ถ๊ฐ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํฌํจํ์ฌ MQTT ๋ฉ์์ง์ ํ ๋น๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํด์
MQTTClient_freeMessage(&message);
// MQTT C ํด๋ผ์ด์ธํธ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์์ ํ ๋นํ ๋ฉ๋ชจ๋ฆฌ, ํนํ topic ์ด๋ฆ์ ํด์
MQTTClient_free(topicName);
return 1;
}
int on_message_ultra_2(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
printf("%s", message->payload);
strcpy(ultrasonic_2, message->payload);
// ๋ฉ์์ง ํ์ด๋ก๋์ ํ ๋น๋ ์ถ๊ฐ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํฌํจํ์ฌ MQTT ๋ฉ์์ง์ ํ ๋น๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํด์
MQTTClient_freeMessage(&message);
// MQTT C ํด๋ผ์ด์ธํธ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์์ ํ ๋นํ ๋ฉ๋ชจ๋ฆฌ, ํนํ topic ์ด๋ฆ์ ํด์
MQTTClient_free(topicName);
return 1;
}
int on_message_button_1(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
printf("%s", message->payload);
strcpy(button_1, message->payload);
// ๋ฉ์์ง ํ์ด๋ก๋์ ํ ๋น๋ ์ถ๊ฐ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํฌํจํ์ฌ MQTT ๋ฉ์์ง์ ํ ๋น๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํด์
MQTTClient_freeMessage(&message);
// MQTT C ํด๋ผ์ด์ธํธ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์์ ํ ๋นํ ๋ฉ๋ชจ๋ฆฌ, ํนํ topic ์ด๋ฆ์ ํด์
MQTTClient_free(topicName);
return 1;
}
int on_message_button_2(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
printf("%s", message->payload);
strcpy(button_2, message->payload);
sql_message();
// ๋ฉ์์ง ํ์ด๋ก๋์ ํ ๋น๋ ์ถ๊ฐ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํฌํจํ์ฌ MQTT ๋ฉ์์ง์ ํ ๋น๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํด์
MQTTClient_freeMessage(&message);
// MQTT C ํด๋ผ์ด์ธํธ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์์ ํ ๋นํ ๋ฉ๋ชจ๋ฆฌ, ํนํ topic ์ด๋ฆ์ ํด์
MQTTClient_free(topicName);
return 1;
}
int main()
{
int sql_rc, mqtt_ultra_1, mqtt_ultra_2, mqtt_button_1, mqtt_button_2 ;
MQTTClient client1, client2, client3, client4;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
// ------------------------------------------ SQLite ------------------------------------------
// DB ํ์ผ์ ์ฐ๊ฒฐํ๊ณ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ฐ์ฒด์ ๋ํ ํฌ์ธํฐ๋ฅผ ๋ฐํ
sql_rc = sqlite3_open(DATABASE_FILE, &db);
// ์ฐ๊ฒฐ ์คํจ์ ์ค๋ฅ ๋ฉ์์ง ์ถ๋ ฅ
if (sql_rc)
{
fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
return 0;
}
else
{
fprintf(stdout, "Opened database successfully\n");
}
char *sql = "CREATE TABLE IF NOT EXISTS sensors_data("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"ultrasonic_1 TEXT NOT NULL,"
"ultrasonic_2 TEXT NOT NULL,"
"button_1 TEXT NOT NULL,"
"button_2 TEXT NOT NULL,"
"created_at DATETIME DEFAULT (DATETIME('now', 'localtime')));";
pthread_mutex_lock(&mutex);
// SQL ๋ช
๋ น์ ์คํ
// (open ํ DB, SQL ๋ฌธ์ฅ, ์ฝ๋ฐฑํจ์ ์ด๋ฆ, ์ฝ๋ฐฑํจ์ ์ฒซ ๋ฒ์งธ ์ธ์, ERROR ๋ณ์)
sql_rc = sqlite3_exec(db, sql, callback, 0, 0);
pthread_mutex_unlock(&mutex);
// SQLITE_OK : ์ฝ๋๋ ์์
์ด ์ฑ๊ณตํ๊ณ ์ค๋ฅ๊ฐ ์์์
if (sql_rc != SQLITE_OK)
{
fprintf(stderr, "SQL error: %s\n", sqlite3_errmsg(db));
return 0;
}
else
{
fprintf(stdout, "Table created successfully\n");
}
// ------------------------------------------ MQTT ------------------------------------------
// (์๋ก ์์ฑํ ํด๋ผ์ด์ธํธ์ ํธ๋ค์ ๊ฐ๋ฆฌํค๋ ํฌ์ธํฐ, ์์ ๋๋ ํด๋ผ์ด์ธํธ ์ฐ๊ฒฐ ์์ฒญ์ ๋ชจ๋ํฐํ๋ MQTT ํฌํธ์ URI,
// ํด๋ผ์ด์ธํธ๋ฅผ ์๋ณํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์ด๋ฆ, ํด๋ผ์ด์ธํธ ์ํ๊ฐ ๋ฉ๋ชจ๋ฆฌ์์ ๋ณด๋ฅ ์ค์ผ๋ก ์์คํ
์ฅ์ ๊ฐ ๋ฐ์ํ๋ ๊ฒฝ์ฐ ์์ค)
mqtt_ultra_1 = MQTTClient_create(&client1, MQTT_HOST, MQTT_CLIENT_ID1, MQTTCLIENT_PERSISTENCE_NONE, NULL);
mqtt_ultra_2 = MQTTClient_create(&client2, MQTT_HOST, MQTT_CLIENT_ID2, MQTTCLIENT_PERSISTENCE_NONE, NULL);
mqtt_button_1 = MQTTClient_create(&client3, MQTT_HOST, MQTT_CLIENT_ID3, MQTTCLIENT_PERSISTENCE_NONE, NULL);
mqtt_button_2 = MQTTClient_create(&client4, MQTT_HOST, MQTT_CLIENT_ID4, MQTTCLIENT_PERSISTENCE_NONE, NULL);
// TCP/IP ์ฐ๊ฒฐ์ด ๋ซํ์ง ์๋๋ก ์์ "ํ์ฑ ์ ์ง(keepalive)" ๋ฉ์์ง๋ฅผ 20์ด๋ง๋ค ๋ณด๋ด๊ธฐ
conn_opts.keepAliveInterval = 10;
// true๋ก ์ค์ ๋์๊ธฐ ๋๋ฌธ์ ์ด์ ์ฐ๊ฒฐ์์ ๋จ์ ์๋ ์์ฑ ์ค์ด๋ ๋ฉ์์ง๊ฐ ์๋ฃ๋์๋์ง ๊ฒ์ฌํ์ง ์๊ณ ์ธ์
์ด ์์
conn_opts.cleansession = 1;
// ํด๋ผ์ด์ธํธ๋ฅผ ์๋ฒ์ ์ฐ๊ฒฐํ๊ธฐ ์ ์ ์ฝ๋ฐฑ์ ์ค์
mqtt_ultra_1 = MQTTClient_setCallbacks(client1, NULL, NULL, on_message_ultra_1, NULL);
mqtt_ultra_2 = MQTTClient_setCallbacks(client2, NULL, NULL, on_message_ultra_2, NULL);
mqtt_button_1 = MQTTClient_setCallbacks(client3, NULL, NULL, on_message_button_1, NULL);
mqtt_button_2 = MQTTClient_setCallbacks(client4, NULL, NULL, on_message_button_2, NULL);
// ํด๋ผ์ด์ธํธ ํธ๋ค ๋ฐ ํฌ์ธํฐ๋ฅผ ์ฐ๊ฒฐ ์ต์
์ ์ธ์๋ก ์ ๋ฌ
if (((mqtt_ultra_1 = MQTTClient_connect(client1, &conn_opts)) != MQTTCLIENT_SUCCESS) ||
((mqtt_ultra_2 = MQTTClient_connect(client2, &conn_opts)) != MQTTCLIENT_SUCCESS) ||
((mqtt_button_1 = MQTTClient_connect(client3, &conn_opts)) != MQTTCLIENT_SUCCESS) ||
((mqtt_button_2 = MQTTClient_connect(client4, &conn_opts)) != MQTTCLIENT_SUCCESS))
{
fprintf(stderr, "Failed to connect, return code \n");
return 0;
}
// ์ ํํ ํ ํฝ์ ํด๋ผ์ด์ธํธ ์ ํ๋ฆฌ์ผ์ด์
์ ๊ตฌ๋
(client, TOPIC, QOS);
// QoS ์ค์ ์ ์ด ๊ตฌ๋
์์๊ฒ ์ก์ ๋ ๋ฉ์์ง์ ์ ์ฉ๋๋ ์ต๋ ์๋น์ค ํ์ง(QoS)์ ํ๋ณ
// ์๋ฒ๋ ์ด ์ค์ ์ ๋ฎ์ ๊ฐ ๋ฐ ์๋ ๋ฉ์์ง์ QoS์์ ๋ฉ์์ง๋ฅผ ์ก์
mqtt_ultra_1 = MQTTClient_subscribe(client1, TOPIC_ultra_1, 0);
mqtt_ultra_2 = MQTTClient_subscribe(client2, TOPIC_ultra_2, 0);
mqtt_button_1 = MQTTClient_subscribe(client3, TOPIC_button_1, 0);
mqtt_button_2 = MQTTClient_subscribe(client4, TOPIC_button_2, 0);
// ์ฐ๊ฒฐ ํธ์ถ์ด ์คํจํ๋ ๊ฒฝ์ฐ ํ๋ก๊ทธ๋จ์ ์ค๋ฅ ์ฝ๋ -1๋ก ์ข
๋ฃ
// ์ฐ๊ฒฐ ํธ์ถ์ด ์คํจํ๋ ๊ฒฝ์ฐ ํ๋ก๊ทธ๋จ์ ์ค๋ฅ ์ฝ๋ -1๋ก ์ข
๋ฃ
if ((mqtt_ultra_1 != MQTTCLIENT_SUCCESS) || (mqtt_ultra_2 != MQTTCLIENT_SUCCESS) ||
(mqtt_button_1 != MQTTCLIENT_SUCCESS) || (mqtt_button_1 != MQTTCLIENT_SUCCESS))
{
fprintf(stderr, "Failed to subscribe, return code \n");
return 0;
}
printf("Subscribed to topic: %s\n", TOPIC_ultra_1);
printf("Subscribed to topic: %s\n", TOPIC_ultra_2);
printf("Subscribed to topic: %s\n", TOPIC_button_1);
printf("Subscribed to topic: %s\n", TOPIC_button_2);
for (;;)
{
usleep(1000000); // Sleep for 1 second
}
// ํด๋ผ์ด์ธํธ์ ์ฐ๊ฒฐ ๋๊ธฐ (client, ์ ํ์๊ฐ)
// ํด๋ผ์ด์ธํธ๋ ์๋ฒ์์ ์ฐ๊ฒฐ์ ๋๊ณ ์ฝ๋ฐฑ ํจ์์์ ์์ฑ ์ค์ธ ๋ฉ์์ง๊ฐ ์๋ฃ๋๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆผ
// ์ ํ์๊ฐ์ ๋ฐ๋ฆฌ์ด ๋จ์๋ก ์ง์ (์ฐ๊ฒฐ์ ๋๊ธฐ ์ ์ ์ํํด์ผ ํ๋ ๋ค๋ฅธ ์์
์ด ์๋ฃ๋ ๋๊น์ง ์ต๋ 10์ด ๋์ ๊ธฐ๋ค๋ฆผ)
MQTTClient_disconnect(client1, 10000);
MQTTClient_disconnect(client2, 10000);
MQTTClient_disconnect(client3, 10000);
MQTTClient_disconnect(client4, 10000);
// ํด๋ผ์ด์ธํธ์ ์ฌ์ฉ๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋น์ฐ๊ณ ํ๋ก๊ทธ๋จ์ ์ข
๋ฃ
MQTTClient_destroy(&client1);
MQTTClient_destroy(&client2);
MQTTClient_destroy(&client3);
MQTTClient_destroy(&client4);
// ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ๋ซ๊ธฐ
sqlite3_close(db);
return 0;
}
๐ง๐ Ultrasonic_button.ino
#include "WiFiS3.h"
#include <PubSubClient.h>
#include <NewPing.h>
// WiFi and MQTT ์
ํ
#define WLAN_SSID "class924"
#define WLAN_PASS "kosta90009"
#define MQTT_SERVER "192.168.0.154"
#define MQTT_PORT 1883
// sonar(TrigPin, EchoPin, MaxDistance);
// TrigPin๊ณผ EchoPin๊ณผ ์ต๋์ ํ๊ฑฐ๋ฆฌ(MaxDistance)์ ๊ฐ์ ์ ์ธ
NewPing sonar[2] = {
NewPing(9, 8, 50),
NewPing(11, 10, 50),
};
///////please enter your sensitive data in the Secret tab/arduino_secrets.h
char ssid[] = WLAN_SSID; // your network SSID (name)
char pass[] = WLAN_PASS; // your network password (use for WPA, or use as key for WEP)
int keyIndex = 0;
WiFiClient ethClient;
PubSubClient mqtt(ethClient);
int status = WL_IDLE_STATUS;
WiFiServer server(80);
void setup() {
Serial.begin(9600);
// button
pinMode(6,INPUT);
pinMode(7,INPUT);
delay(1000); // prevents usb driver crash on startup, do not omit this
while(!Serial); // Wait for Serial terminal to open port before starting program
connectWiFi();
// check for the WiFi module:
if (WiFi.status() == WL_NO_MODULE)
{
Serial.println("Communication with WiFi module failed!");
// don't continue
while (true)
;
}
// WiFi.firmwareVersion() : ๋ชจ๋์์ ์คํ ์ค์ธ ํ์จ์ด ๋ฒ์ ์ ๋ฌธ์์ด๋ก ๋ฐํ
String fv = WiFi.firmwareVersion();
if (fv < WIFI_FIRMWARE_LATEST_VERSION)
{
Serial.println("Please upgrade the firmware");
}
// attempt to connect to WiFi network:
while (status != WL_CONNECTED)
{
Serial.print("Attempting to connect to Network named: ");
Serial.println(ssid); // print the network name (SSID);
// Connect to WPA/WPA2 network. Change this line if using open or WEP network:
status = WiFi.begin(ssid, pass);
// wait 10 seconds for connection:
delay(5000);
}
printWifiStatus();
delay(5000);
// MQTT broker
mqtt.setServer(MQTT_SERVER, MQTT_PORT);
connectMQTT();
}
void loop() {
// UltraSonic
long sensor1val, sensor2val;
sensor1val = sonar[0].ping_cm();
sensor2val = sonar[1].ping_cm();
char buffer1[15];
char buffer2[15];
char str[] = "cm";
ltoa(sensor1val, buffer1, 10);
strcat(buffer1, str);
ltoa(sensor2val, buffer2, 10);
strcat(buffer2, str);
// button
int push1=digitalRead(6);
int push2=digitalRead(7);
Serial.println("============================");
Serial.println(buffer1);
Serial.println(buffer2);
Serial.println(push1);
Serial.println(push2);
mqtt.publish("sensor/ultrasonic_1", String(buffer1).c_str());
mqtt.publish("sensor/ultrasonic_2", String(buffer2).c_str());
mqtt.publish("sensor/button_1", String(push1).c_str());
mqtt.publish("sensor/button_2", String(push2).c_str());
delay(4000);
}
void connectWiFi() {
Serial.println("Connecting to WiFi");
WiFi.begin(WLAN_SSID, WLAN_PASS);
while (WiFi.status() != WL_CONNECTED) {
delay(1000);
Serial.println("Connecting to WiFi...");
}
Serial.println("Connected to WiFi");
}
void connectMQTT() {
Serial.println("Connecting to MQTT");
while (!mqtt.connected()) {
if (mqtt.connect("MQ135Client")) {
Serial.println("Connected to MQTT");
} else {
Serial.print("Failed to connect to MQTT, rc=");
Serial.print(mqtt.state());
Serial.println(" Retrying in 5 seconds...");
delay(5000);
}
}
}
void printWifiStatus() {
Serial.print("SSID: ");
Serial.println(WiFi.SSID());
IPAddress ip = WiFi.localIP();
Serial.print("IP Address: ");
Serial.println(ip);
long rssi = WiFi.RSSI();
Serial.print("Signal strength (RSSI):");
Serial.print(rssi);
Serial.println(" dBm");
Serial.print("To see this page in action, open a browser to http://");
Serial.println(ip);
}
๐ง๐ tasks.json
{
"version": "2.0.0",
"tasks": [
{
"label": "build",
"type": "shell",
"command": "gcc",
"args": [
"-g",
"-o",
"main",
"main.c",
"-lpaho-mqtt3c",
"-lsqlite3"
],
"group": {
"kind": "build",
"isDefault": true}
}
]
}
๐ง๐ ์คํํ๊ธฐ
==============================broker ํ๊ฒฝ (main.c)==========================
// broker ๊ฐ sub ๋ก ๋ฐ์ดํฐ ๊ฐ ๋ฐ๊ธฐ
./main
// main ์คํํ์ผ๋ก ์๊ธด mqtt.db ๋ณด๊ธฐ
sqlite3 mqtt.db
>> .tables
>> select * from sensors_data;
// table ์ ์ฒด๋ก ๋ณด๊ธฐ
>> .mode table
>> select * from sensors_data;
728x90
๋ฐ์ํ
'๐ฉโ๐ป IoT (Embedded) > Raspberry Pi' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Comments