陈拓 2021/04/17-2021/04/19

1. 概述

在《用C语言实现mosquitto MQTT订阅消息》

https://zhuanlan.zhihu.com/p/365190438

https://blog.csdn.net/chentuo2000/article/details/115747492

和《用C语言实现mosquitto MQTT订阅消息(异步)》

https://zhuanlan.zhihu.com/p/365483724

https://blog.csdn.net/chentuo2000/article/details/115786111

两篇文章中我们已经通过MQTT订阅收到了客户端的消息,本文介绍将其中的数据存储到MySQL数据库中。

在《MQTT服务器Mosquitto 2.x编译安装配置》

https://zhuanlan.zhihu.com/p/365103802

https://blog.csdn.net/chentuo2000/article/details/115731687

一文中我们下载了mosquitto 2.x源码,其中有个写MySQL数据库的例子:

在GitHub上mosquitto官方也有这个例子:

https://github.com/eclipse/mosquitto/blob/master/examples/mysql_log/mysql_log.c

2. 查看mysql_log.c代码

cat mosquitto-2.0.9/examples/mysql_log/mysql_log.c

见“附录:mysql_log.c源代码”。

3. 安装MySQL数据库

见《树莓派安装使用数据库MariaDB (MySQL)》

https://blog.csdn.net/chentuo2000/article/details/108702880

4. 创建数据表

  • 查看MySQL进程

ps -ef | grep mysql

MySQL已启动。

  • 创建MySQL用户

在《树莓派安装使用数据库MariaDB (MySQL)》

https://blog.csdn.net/chentuo2000/article/details/108702880

一文中我们创建了用户ct。

  • 以用户ct登录MySQL

mysql -uct -pct

  • 查看数据库

show databases;

在《树莓派安装使用数据库MariaDB (MySQL)》

https://blog.csdn.net/chentuo2000/article/details/108702880

一文中我们创建了数据库smarthome。

  • 打开数据库

use smarthome;

  • 查看数据库smarthome中的表

show tables;

  • 查看表结构

describe temperature;

5. 修改C程序

  • 再开一个终端窗口
  • 进入mysql_log目录

cd mosquitto-2.0.9/examples/mysql_log/

  • 修改C程序mysql_log.c

有关C语言对MySQL的操作请看《树莓派采集温度数据并存入数据库(C语言版)》

https://blog.csdn.net/chentuo2000/article/details/108779497

一文。

针对我的环境进行修改:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <mosquitto.h>
#include <mysql.h>#define db_host "localhost"
#define db_username "ct"
#define db_password "ct"
#define db_database "smarthome"
#define db_port 3306#define db_query "INSERT INTO temperature (deviceid, celsius_temp) VALUES (?,?)"#define mqtt_host "localhost"
#define mqtt_port 1883static int run = 1;
static MYSQL_STMT *stmt = NULL;void handle_signal(int s)
{printf("\n Capture sign no:%d\n", s);run = 0;
}void connect_callback(struct mosquitto *mosq, void *obj, int reason_code)
{
}void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{printf("%s %d %s\n", message->topic, message->qos, (char *)message->payload);MYSQL_BIND bind[2];memset(bind, 0, sizeof(bind));bind[0].buffer_type = MYSQL_TYPE_STRING;bind[0].buffer = message->topic;bind[0].buffer_length = strlen(message->topic);// Note: payload is normally a binary blob and could contains// NULL byte. This sample does not handle it and assume payload is a// string.bind[1].buffer_type = MYSQL_TYPE_STRING;bind[1].buffer = message->payload;bind[1].buffer_length = message->payloadlen;mysql_stmt_bind_param(stmt, bind);mysql_stmt_execute(stmt);
}int main(int argc, char *argv[])
{MYSQL *connection;my_bool reconnect = true;char clientid[24];struct mosquitto *mosq;int rc = 0;signal(SIGINT, handle_signal); // 捕捉终端CTRL+c产生的SIGINT信号signal(SIGTERM, handle_signal); // 程序结束(terminate)信号mysql_library_init(0, NULL, NULL);mosquitto_lib_init();connection = mysql_init(NULL);if(connection){mysql_options(connection, MYSQL_OPT_RECONNECT, &reconnect);connection = mysql_real_connect(connection, db_host, db_username, db_password, db_database, db_port, NULL, 0);if(connection){stmt = mysql_stmt_init(connection);mysql_stmt_prepare(stmt, db_query, strlen(db_query));memset(clientid, 0, 24);snprintf(clientid, 23, "mysql_log_%d", getpid());mosq = mosquitto_new(clientid, true, connection);if(mosq){mosquitto_connect_callback_set(mosq, connect_callback);mosquitto_message_callback_set(mosq, message_callback);mosquitto_username_pw_set(mosq, "ct", "1qaz2wsx");rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 60);mosquitto_subscribe(mosq, NULL, "#", 0);while(run){rc = mosquitto_loop(mosq, -1, 1);if(run && rc){sleep(20);//mosquitto_reconnect(mosq);}}mosquitto_reconnect(mosq);}mysql_stmt_close(stmt);mysql_close(connection);}else{fprintf(stderr, "Error: Unable to connect to database.\n");printf("%s\n", mysql_error(connection));rc = 1;}}else{fprintf(stderr, "Error: Unable to start mysql.\n");rc = 1;}mysql_library_end();mosquitto_lib_cleanup();return rc;
}

说明:

(1) 通配符#的使用

mosquitto_subscribe(mosq, NULL, "#", 0);

在订阅中使用通配符可以收到多个主题的消息,通配符的详细用法可以看MQTT文档。

(2) CTRL+c正常结束程序

signal(SIGINT, handle_signal); // 捕捉终端CTRL+c产生的SIGINT信号

通过捕捉终端操作CTRL+c使程序能够正常退出以释放资源,而不是用CTRL+z强行终止程序的运行。

(3) 和《用C语言实现mosquitto MQTT订阅消息》

https://zhuanlan.zhihu.com/p/365190438

https://blog.csdn.net/chentuo2000/article/details/115747492

一文中调用无限循环的方法mosquitto_loop_forever(mosq, -1, 1);不同,这里采用等价的方法:

while(run){rc = mosquitto_loop(mosq, -1, 1);if(run && rc){sleep(20);//mosquitto_reconnect(mosq);}
}

这种方法更具灵活性。

  • 编辑mysql_log.c

nano mysql_log.c

6. 修改Makefile文件

  • 例子中的Makefile文件

cat Makefile

  • 修改为
CFLAGS=-Wall -ggdb
LDFLAGS=../../lib/libmosquitto.so.1 -lmariadbclient -lsqlite3.PHONY: all cleanall : mosquitto_mysql_logmosquitto_mysql_log : mysql_log.o${CC} $^ -o $@ ${LDFLAGS}mysql_log.o : mysql_log.c${CC} -c $^ -o $@ ${CFLAGS} -I../../include -I/usr/include/mariadbclean :-rm -f *.o mosquitto_mysql_log

根据我的MySQL和mosquitto的头文件和库文件的位置修改。

  • 头文件和库文件的位置

(1) MySQL头文件

ls -l /usr/include/mariadb/mysql.h

(2) MySQL库文件

ls -l /usr/lib/arm-linux-gnueabihf/libmariadbclient.so

ls -l /usr/lib/arm-linux-gnueabihf/libsqlite3.so

libmariadbclient.so和libsqlite3.so在默认库文件的目录/usr/lib中,不需要指出路径。

(3) mosquitto头文件

ls -l ../../include/mosquitto.h

(4) mosquitto库文件

ls -l ../../lib/libmosquitto.so.1

7. 编译

make

8. 本地测试

  • 运行./mosquitto_mysql_log

订阅subscribe程序进入循环等待接收消息。

  • 在另一个终端窗口中发布publish消息

订阅窗口收到消息,显示topic、qos、payload,并将topic和payload存入数据库,对用表temperature的deviceid和celsius_temp字段。

按CTRL+c可以正常推出程序,这样可以完成资源释放和回收工作。按CTRL+z可以强行中断程序运行,但跳过了资源释放语句的执行。

重新运行./mosquitto_mysql_log

  • 再发布消息

订阅窗口收到消息:

可以看到,在mosquitto_subscribe(mosq, NULL, "#", 0);语句中使用了通配符后订阅程序可以接收不同主题的消息。在这里我们可以用主题temperature001和temperature002表示两个温度传感器。

  • 查看数据库

在数据库窗口查看刚才收到并保存的数据:

select * from temperature where deviceid like 'temperature%';

9. 远程测试

  • 用MQTT.fx测试

详细说明见《树莓派MQTT服务远程测试MQTT.fx》

https://zhuanlan.zhihu.com/p/363373024

https://blog.csdn.net/chentuo2000/article/details/115539377

点击Publish:

  • 订阅测试窗口收到消息

  • 查看数据库

附录:mysql_log.c源代码

#include <signal.h>
#include <stdio.h>
#include <string.h>#ifndef WIN32
#  include <unistd.h>
#else
#  include <process.h>
#  define snprintf sprintf_s
#endif#include <mosquitto.h>
#include <mysql/mysql.h>#define db_host "localhost"
#define db_username "mqtt_log"
#define db_password "password"
#define db_database "mqtt_log"
#define db_port 3306#define db_query "INSERT INTO mqtt_log (topic, payload) VALUES (?,?)"#define mqtt_host "localhost"
#define mqtt_port 1883static int run = 1;
static MYSQL_STMT *stmt = NULL;void handle_signal(int s)
{run = 0;
}void connect_callback(struct mosquitto *mosq, void *obj, int result)
{
}void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{MYSQL_BIND bind[2];memset(bind, 0, sizeof(bind));bind[0].buffer_type = MYSQL_TYPE_STRING;bind[0].buffer = message->topic;bind[0].buffer_length = strlen(message->topic);// Note: payload is normally a binary blob and could contains// NULL byte. This sample does not handle it and assume payload is a// string.bind[1].buffer_type = MYSQL_TYPE_STRING;bind[1].buffer = message->payload;bind[1].buffer_length = message->payloadlen;mysql_stmt_bind_param(stmt, bind);mysql_stmt_execute(stmt);
}int main(int argc, char *argv[])
{MYSQL *connection;my_bool reconnect = true;char clientid[24];struct mosquitto *mosq;int rc = 0;signal(SIGINT, handle_signal);signal(SIGTERM, handle_signal);mysql_library_init(0, NULL, NULL);mosquitto_lib_init();connection = mysql_init(NULL);if(connection){mysql_options(connection, MYSQL_OPT_RECONNECT, &reconnect);connection = mysql_real_connect(connection, db_host, db_username, db_password, db_database, db_port, NULL, 0);if(connection){stmt = mysql_stmt_init(connection);mysql_stmt_prepare(stmt, db_query, strlen(db_query));memset(clientid, 0, 24);snprintf(clientid, 23, "mysql_log_%d", getpid());mosq = mosquitto_new(clientid, true, connection);if(mosq){mosquitto_connect_callback_set(mosq, connect_callback);mosquitto_message_callback_set(mosq, message_callback);rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 60);mosquitto_subscribe(mosq, NULL, "#", 0);while(run){rc = mosquitto_loop(mosq, -1, 1);if(run && rc){sleep(20);mosquitto_reconnect(mosq);}}mosquitto_destroy(mosq);}mysql_stmt_close(stmt);mysql_close(connection);}else{fprintf(stderr, "Error: Unable to connect to database.\n");printf("%s\n", mysql_error(connection));rc = 1;}}else{fprintf(stderr, "Error: Unable to start mysql.\n");rc = 1;}mysql_library_end();mosquitto_lib_cleanup();return rc;
}

将MQTT收到的数据保存到MySQL数据库相关推荐

  1. 将labview连续数据保存到mysql数据库器

    这一篇是在之前完成Labview和mysql连接,并且进行了简单的CRUD删除的基础上来的.我们一般不会拿Labview来做学生这种数据管理系统,而是对于基本传感器数据的采集和保存,而传感器采集数据会 ...

  2. mysql打印语句_大数据挖掘—(八):scrapy爬取数据保存到MySql数据库

    (大数据挖掘-(七):读懂MySql数据库操作)(大数据挖掘神器--scrapy spider爬虫框架(五):解析多层网页) 通过往期的文章分享,我们了解了如何爬取想要的数据到Items中,也了解了如 ...

  3. 读dataframe存入mysql_DataFrame数据保存到mysql数据库中

    本篇文章主要基于python3.6与pandas实现以下数据库操作功能: 创建数据库 数据库创建表 数据库批量插入数据 数据库更新数据 数据库配置 class sqlConfig: db_name = ...

  4. python读取串口数据保存到mysql数据库_Python3读取Excel数据存入MySQL的方法

    Python是数据分析的强大利器. 利用Python做数据分析,第一步就是学习如何读取日常工作中产生各种excel报表并存入数据中,方便后续数据处理. 这里向大家分享python3如何使用xlrd读取 ...

  5. 爬取网贷之家平台数据保存到mysql数据库

    # coding utf-8 import requests import json import datetime import pymysqluser_agent = 'User-Agent: M ...

  6. python股票接口_Python 从 sina 股票数据接口读取数据,并保存到 MySQL 数据库

    说明 从 sina 的数据接口获取数据,之后,保存到 MySql 数据库 文件:getDataFromSina.py ''' Created on 2018年2月11日 @author: Livon ...

  7. Pandas的学习(读取mongodb数据库集合到DataFrame,将DataFrame类型数据保存到mongodb数据库中)

    1.读取mongodb数据库集合到DataFrame import pymongo import pandas as pdclient = pymongo.MongoClient("数据库连 ...

  8. android数据库给单选赋值,如何使用android studio将单选按钮的值保存到mysql数据库?...

    我想创建一个投票应用程序,其中有不同的职位,其中有两个职位每个都有两个联系人.我希望当我选择一个人记录到数据库. [职位的disgn与它的参赛者部] 下面是XML代码的一部分:如何使用android ...

  9. python hive mysql_[7] 编写Python脚本将Hive的运算结果保存到MySQL数据库中(1) - 摩西莫西 - ITeye技术网站...

    编写Python脚本将Hive的运算结果保存到MySQL数据库中(1) 很多情况下,需要将Hive中的运算结果保存到MySQL数据库中,可以通过简单的Python脚本来实现. 例子1:如果获取Hive ...

最新文章

  1. 芯片初创公司一亿融资可以烧多久
  2. IBM发布全新合作伙伴计划 共赢认知商业时代
  3. 第15届全国大学生智能汽车竞赛 人工智能挑战赛(百度)
  4. MyEclipse快捷键与插件大全
  5. .NET混淆器 Dotfuscator如何保护应用程序?控制流了解一下!
  6. 一个服务器多个网站多个域名,多个域名一个服务器吗
  7. Exchange Server2010部署完后的配置:CA、Outlook Anywhere、OWA域名简写
  8. 总结我在架构师升级过程中的那些坑以及各种体会
  9. 【gateway系列】手把手教你gateway整合nacos注册中心
  10. java实验二 类和对象
  11. 《TCP/IP Sockets编程(C语言实现) (第2版)》 代码下载(链接以及文件打包)
  12. linux grep (转)
  13. 全国医疗机构勒索病毒事件公告:阿里云发布公益行动
  14. 科技文献检索(八)——检索技术
  15. 建立人脉关系以及可能认识的人推荐
  16. java使用ffmpeg对视频进行转码和分辨率转换
  17. ssl证书是什么,ssl证书有什么作用
  18. SpringMVC的数据请求
  19. 用一朵花开的时间,记录自己的成长
  20. macd的python代码同花顺_超牛MACD(代编写程序化交易模型)-同花顺公式 -程序化交易(CXH99.COM)...

热门文章

  1. 程序员必备75道逻辑思维题(附答案)之一
  2. java创建线程的两种方式及区别
  3. 初识「零知识」与「证明」
  4. 小马哥------高仿机 杂牌机常识与刷机救砖心得与技巧
  5. codingair 云服务平台分享视频
  6. 人员梯度培养_如何系统培养重点员工,构建后备人才梯队?
  7. Codeforces Round #530 (Div. 1) C. Construct a tree 想法
  8. 推出 RxR:多语言指令跟随导航基准数据集
  9. html 表单格式插入图片
  10. php 基于ICMP协议实现一个ping命令