Paho MQTT C Client Library Example

A simple example of synchronous transceiver.

概述

这是一个使用mosquitto作为服务器,使用Paho MQTT C Client Library作为客户端的关于MQTT协议的应用实例,项目仓库MQTT Example。

mosquitto部署

 # 安装mosquitto
 sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
 sudo apt-get update

 sudo apt-get install mosquitto
 # mosquitto开发包
 # mosquitto也提供了对应的开发APIs,本例中使用的paho mqtt
 sudo apt-get install mosquitto-dev
 # mosquitto客户端
 sudo apt-get install mosquitto-clients

 # mosquitto启动服务器
 mosquitto -dp 1883

paho mqtt 同步和异步收发对比

同步收发模式和异步收发模式各有优缺点,官方的比较文档中简单的说明了两者的区别,最大的区别在于收发信息的处理和使用的API上。以下做一个简单介绍,具体如何使用应该依据应用开发需求而定,官方首页中有几个相关的简单例子。

同步模式

  • 发布:MQTTClient_Publish()MQTTClient_publishMessage(),假设设置服务质量为QoS1或QoS2,为了消息的成功发布,需要使用MQTTClient_waitForCompletion()等待确认消息的返回。

  • 订阅:需要相对频繁地调用(循环调用)MQTTClient_receive()MQTTClient_yield()来处理服务端的相应以及MQTT心跳以维持网络连接。

异步模式

  • 订阅和发布信息和同步模式类似。

  • 后台处理握手和网络连接。

  • 状态通知或消息收发使用开发者注册的回调函数进行处理。

Sample

在之前的文章中介绍了MQTT基础的通信模式,实际在开发过程中就是在模拟相关的过程,核心为CONNECTPUBLISHSUBSCRIBE控制报文,关于MQTT控制报文的分析可以看前文.

以下是同步收发的一个简单例子,代码仓库地址:MQTT_Sample。

 // publish client
 #include <stdio.h>
 #incldue <MQTTClient.h>

 int main(int argc, char *argv[])
 {
     char mqtt_buffer[64];
     const int time_out = 1000;
     int rv;
     int QOS = 1;
     
     MQTTClient m_mqttClient;
     MQTTClient_connectOptions m_mqttConnOpts;
     MQTTClient_message m_mqttMsg;
     MQTTClient_deliveryToken m_mqttToken;

     char *m_mqttBroker = "tcp://10.0.23.112:1883";
     char *m_mqtttClientId = "publish_client";
     char *m_mqttTopic = "test_sample";

     m_mqttConnOpts = MQTTClient_connectOptions_initializer;
     m_mqttMsg = MQTTClient_message_initializer;

     MQTTClient_create(&m_mqttClient, m_mqttBroker, m_mqtttClientId,
                       MQTTCLIENT_PERSISTENCE_NONE, nullptr);
     m_mqttConnOpts.username = "6L2EB2PNW1FzkO8HJZnj";
     m_mqttConnOpts.password = "123456";
     if ((rv = MQTTClient_connect(m_mqttClient, &m_mqttConnOpts)) != MQTTCLIENT_SUCCESS) {
         printf("MQTTClient_connect failure:%s\n", strerror(errno));
         return -1;
     }
     
      m_mqttConnOpts.keepAliveInterval = 60;
      m_mqttConnOpts.cleansession = 1;

     sprintf(mqtt_buffer, "{\"speed\": \"%.6f\"}", 100.0);

     m_mqttMsg.qos = QOS;
     m_mqttMsg.retained = 0;
     m_mqttMsg.payload = (void *)mqtt_buffer;
     m_mqttMsg.payloadlen = strlen(mqtt_buffer);
     MQTTClient_publishMessage(m_mqttClient, m_mqttTopic, &m_mqttMsg, &m_mqttToken);
     rv = MQTTClient_waitForCompletion(m_mqttClient, m_mqttToken, time_out);
     printf("Message with delivery token %d delivered\n", rv);
     printf("MQTT send %s_fps message: %s\n", __FUNCTION__, mqtt_buffer);
 }
 // subscribe client
 #include <stdio.h>
 #include <MQTTClient.h>

 int main(int argc, char *argv[])
 {
     char *mqtt_broker = "tcp://10.0.23.112:1883";
     char *client_id = "subscibe_client";
     int rv, i;
     char *topic = "test_sample";
     char *topic_name = nullptr;
     int topic_len;

     MQTTClient client;
     MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
     MQTTClient_deliveryToken token;
     MQTTClient_message *receive_msg = nullptr;
     conn_opts.keepAliveInterval = 60; 
     conn_opts.cleansession = 1; 

     if ((rv = MQTTClient_create(&client, mqtt_broker, client_id,
                                 MQTTCLIENT_PERSISTENCE_NONE, nullptr)) < 0) {
         printf("MQTTClient_create failure:%s\n", strerror(errno));
         return 0;
     }

     if ((rv = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
         printf("MQTTClient_connect failure:%s\n", strerror(errno));
         return 0;
     }

     MQTTClient_subscribe(client, topic, 1);

     // receive 函数放在外面传递信息不会改变
     while (1) {
         if ((rv = MQTTClient_receive(client, &topic_name, &topic_len,
                       &receive_msg, 1000000)) != MQTTCLIENT_SUCCESS)  //最后一个参数是超时时间,单位是毫秒
         {
             printf("MQTTClient_receive failure:%s\n", strerror(errno));
             break;
         }

         std::string ptr = (char *)receive_msg->payload;

         printf("Topic:%s\nTopic_len:%d\nmsg:", topic, topic_len, ptr);
         printf("\nmsg_len:%d\nmsg_id:%d\n", receive_msg->payloadlen,
                receive_msg->msgid);

         sleep(3);
     }
     printf("end\n");
     MQTTClient_disconnect(client, 10000);
     MQTTClient_destroy(&client);
     return 0;
 }

Last updated