在 Kconfig 文件中配置 MQTT 参数
# Topic that data_publish() sends to (read in C as CONFIG_MQTT_PUB_TOPIC).
config MQTT_PUB_TOPIC
string "MQTT publish topic"
default "tp2/test"
# Topic that subscribe() registers for (read in C as CONFIG_MQTT_SUB_TOPIC).
config MQTT_SUB_TOPIC
string "MQTT subscribe topic"
default "tp1/test"
# Hostname that broker_init() resolves with getaddrinfo()
# (read in C as CONFIG_MQTT_BROKER_HOSTNAME).
# NOTE(review): the default looks like a redacted placeholder - replace it
# with the real broker hostname before building.
config MQTT_BROKER_HOSTNAME
string "MQTT broker hostname"
default "xxxx-mqtt.xxxxxx.cn"
MQTT的API
订阅
/**@brief Function to subscribe to the configured topic
*/
static int subscribe(void)
{
struct mqtt_topic subscribe_topic = {
.topic = {
.utf8 = CONFIG_MQTT_SUB_TOPIC,
.size = strlen(CONFIG_MQTT_SUB_TOPIC)
},
.qos = MQTT_QOS_1_AT_LEAST_ONCE
};
const struct mqtt_subscription_list subscription_list = {
.list = &subscribe_topic,
.list_count = 1,
.message_id = 1234
};
LOG_INF("Subscribing to: %s len %u", CONFIG_MQTT_SUB_TOPIC,
(unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));
return mqtt_subscribe(&client, &subscription_list);
}
推送
/**@brief Function to publish data on the configured topic
*/
static int data_publish(struct mqtt_client *c, enum mqtt_qos qos,
uint8_t *data, size_t len)
{
struct mqtt_publish_param param;
param.message.topic.qos = qos;
param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
param.message.payload.data = data;
param.message.payload.len = len;
param.message_id = sys_rand32_get();
param.dup_flag = 0;
param.retain_flag = 0;
data_print("Publishing: ", data, len);
LOG_INF("to topic: %s len: %u",
CONFIG_MQTT_PUB_TOPIC,
(unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));
return mqtt_publish(c, ¶m);
}
MQTT事件处理
/**@brief MQTT client event handler.
 *
 * Dispatches on the event type:
 *  - CONNACK: on success, subscribes to the configured topic.
 *  - DISCONNECT: logs the result.
 *  - PUBLISH: reads the payload into payload_buf, acknowledges QoS 1
 *    messages, and echoes the payload back with data_publish(). If reading
 *    the payload fails for a reason other than -EMSGSIZE, the client is
 *    disconnected.
 *  - PUBACK / SUBACK / PINGRESP: logs the packet id or the error.
 *
 * Failures of the calls made from here are logged; nothing is returned.
 *
 * @param c   MQTT client the event belongs to.
 * @param evt Event to handle.
 */
void mqtt_evt_handler(struct mqtt_client *const c,
		      const struct mqtt_evt *evt)
{
	int err;

	switch (evt->type) {
	case MQTT_EVT_CONNACK:
		if (evt->result != 0) {
			LOG_ERR("MQTT connect failed: %d", evt->result);
			break;
		}

		LOG_INF("MQTT client connected");
		err = subscribe();
		if (err) {
			LOG_ERR("Failed to subscribe: %d", err);
		}
		break;

	case MQTT_EVT_DISCONNECT:
		LOG_INF("MQTT client disconnected: %d", evt->result);
		break;

	case MQTT_EVT_PUBLISH: {
		const struct mqtt_publish_param *p = &evt->param.publish;
		int rc;

		/* Lengths are cast so the argument always matches %u,
		 * whatever integer type the length field has.
		 */
		LOG_INF("MQTT PUBLISH result=%d len=%u",
			evt->result, (unsigned int)p->message.payload.len);
		err = publish_get_payload(c, p->message.payload.len);

		if (p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
			const struct mqtt_puback_param ack = {
				.message_id = p->message_id
			};

			/* Send acknowledgment. Kept in a separate variable so
			 * the payload-read result in err is not overwritten.
			 */
			rc = mqtt_publish_qos1_ack(&client, &ack);
			if (rc) {
				LOG_ERR("Failed to send PUBACK: %d", rc);
			}
		}

		if (err >= 0) {
			data_print("Received: ", payload_buf,
				   p->message.payload.len);
			/* Echo back received data */
			rc = data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
					  payload_buf, p->message.payload.len);
			if (rc) {
				LOG_ERR("Failed to echo payload: %d", rc);
			}
		} else if (err == -EMSGSIZE) {
			LOG_ERR("Received payload (%u bytes) is larger than the payload buffer "
				"size (%u bytes).",
				(unsigned int)p->message.payload.len,
				(unsigned int)sizeof(payload_buf));
		} else {
			LOG_ERR("publish_get_payload failed: %d", err);
			LOG_INF("Disconnecting MQTT client...");

			err = mqtt_disconnect(c);
			if (err) {
				LOG_ERR("Could not disconnect: %d", err);
			}
		}
	} break;

	case MQTT_EVT_PUBACK:
		if (evt->result != 0) {
			LOG_ERR("MQTT PUBACK error: %d", evt->result);
			break;
		}

		LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);
		break;

	case MQTT_EVT_SUBACK:
		if (evt->result != 0) {
			LOG_ERR("MQTT SUBACK error: %d", evt->result);
			break;
		}

		LOG_INF("SUBACK packet id: %u", evt->param.suback.message_id);
		break;

	case MQTT_EVT_PINGRESP:
		if (evt->result != 0) {
			LOG_ERR("MQTT PINGRESP error: %d", evt->result);
		}
		break;

	default:
		LOG_INF("Unhandled MQTT event type: %d", evt->type);
		break;
	}
}
配置MQTT服务器
/**@brief Resolve the configured hostname and fill in the MQTT broker address.
 *
 * Looks up CONFIG_MQTT_BROKER_HOSTNAME with getaddrinfo() (IPv4, stream
 * sockets) and copies the first IPv4-sized result into the file-level
 * broker structure, together with CONFIG_MQTT_BROKER_PORT.
 *
 * @retval 0        An IPv4 address was found and stored in broker.
 * @retval -ECHILD  getaddrinfo() failed (its own error code is only logged).
 * @retval -ENOENT  The lookup succeeded but no IPv4 address was in the list.
 */
static int broker_init(void)
{
	int err;
	struct addrinfo *result;
	struct addrinfo *addr;
	struct addrinfo hints = {
		.ai_family = AF_INET,
		.ai_socktype = SOCK_STREAM
	};

	err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
	if (err) {
		LOG_ERR("getaddrinfo failed: %d", err);
		return -ECHILD;
	}

	/* Look for address of the broker. */
	for (addr = result; addr != NULL; addr = addr->ai_next) {
		if (addr->ai_addrlen != sizeof(struct sockaddr_in)) {
			LOG_ERR("ai_addrlen = %u should be %u or %u",
				(unsigned int)addr->ai_addrlen,
				(unsigned int)sizeof(struct sockaddr_in),
				(unsigned int)sizeof(struct sockaddr_in6));
			continue;
		}

		/* IPv4 Address. */
		struct sockaddr_in *broker4 = ((struct sockaddr_in *)&broker);
		char ipv4_addr[NET_IPV4_ADDR_LEN];

		broker4->sin_addr.s_addr =
			((struct sockaddr_in *)addr->ai_addr)->sin_addr.s_addr;
		broker4->sin_family = AF_INET;
		broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);

		inet_ntop(AF_INET, &broker4->sin_addr.s_addr,
			  ipv4_addr, sizeof(ipv4_addr));
		LOG_INF("IPv4 Address found %s", log_strdup(ipv4_addr));
		break;
	}

	/* Reaching the end of the list means broker was never filled in;
	 * report that instead of returning success. Checked before the list
	 * is freed so that addr is still a valid pointer value.
	 */
	if (addr == NULL) {
		LOG_ERR("No IPv4 address found for the broker");
		err = -ENOENT;
	}

	/* Free the address. */
	freeaddrinfo(result);

	return err;
}
测试结果
MQTT 服务器需要自行搭建,
搭建方法可以参考我的另一篇文章。
文章评论