deviceAccessLayer/src/mqtt/mqtt_cli.cpp

200 lines
6.2 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "mqtt_cli.h"
#include <boost/asio/io_context.hpp>
#include "application.h"
#include "common/sys_util.h"
namespace zsy
{
// Role keys for configured clients: MqttCliHolder compares them against each
// entry's `sn` field and uses them to look clients up again.
const std::string MqttCli::TYPE_REMOTE{"remote"};
const std::string MqttCli::TYPE_LOCAL{"local"};
void MqttCli::connect()
{
try
{
LOGGER_ERROR("MQTT{} 正在连接...", config.clientId);
ioc = std::make_shared<boost::asio::io_context>();
cli = MQTT_NS::make_sync_client(*ioc, config.server, config.port);
cli->set_client_id(config.clientId);
cli->set_user_name(config.username);
cli->set_password(config.password);
cli->set_clean_session(true);
cli->set_connack_handler([this](bool sp, MQTT_NS::connect_return_code connack_return_code)
{
return onConnAck(sp, connack_return_code);
});
cli->set_close_handler([this]
{
onClose();
});
cli->set_error_handler([this](MQTT_NS::error_code ec)
{
onError(ec);
});
cli->set_publish_handler([this](MQTT_NS::optional<packet_id_t> packet_id, MQTT_NS::publish_options pubopts, MQTT_NS::buffer topic_,MQTT_NS::buffer payload_)
{
return onMessage(packet_id, pubopts, topic_, payload_);
});
eventLoop = std::make_unique<std::jthread>([this] { this->run(); });
} catch (const std::exception &e)
{
LOGGER_ERROR("MQTT{} 连接失败:{}", config.clientId, e.what());
}
catch (...)
{
LOGGER_ERROR("MQTT{} 连接失败", config.clientId);
}
}
MqttCli::MqttCli(MqttProperties &config, std::shared_ptr<EventManager> eventManager)
: status(0),
config(config),
eventManager(eventManager)
{
connect();
}
/// Retries the broker connection until it succeeds or shutdown is requested.
///
/// Each attempt checks `status` under status_mtx and stops when it equals 2
/// (the value the destructor writes). Otherwise cli->connect(ec) is called:
/// success ends the loop, failure is logged and retried after a fixed delay.
///
/// The caller must NOT hold status_mtx: it is a std::shared_mutex, which the
/// owning thread cannot lock a second time.
void MqttCli::reconnect()
{
    // Single source of truth for the retry delay: used for the sleep and
    // printed in the log line, so the two can no longer disagree.
    constexpr std::chrono::seconds retryDelay{1};

    while (true)
    {
        mqtt::error_code ec;
        {
            // The lock covers the status check and the connect attempt, which
            // keeps the attempt serialized against the destructor. It is
            // released before sleeping so publish() and the destructor are not
            // stalled for the whole back-off period.
            std::unique_lock lock(status_mtx);
            if (status == 2) break;
            cli->connect(ec);
        }
        if (!ec) break;

        LOGGER_INFO("MQTT {} 秒后重新连接:{}{}", retryDelay.count(), config.clientId, SysUtil::str_w_a(SysUtil::str_a_w( ec.message(),"GBK" )));
        std::this_thread::sleep_for(retryDelay);
    }
}
/// Thread body started by connect(): connects via reconnect(), then runs the
/// io_context until run() returns.
///
/// No exception leaves this function. On any exception, status is reset to 0
/// under status_mtx and the failure is logged.
void MqttCli::run()
{
    try
    {
        reconnect();
        ioc->run();
    }
    catch (const std::exception &ex)
    {
        std::unique_lock guard(status_mtx);
        status = 0;
        // The message is passed through the GBK conversion helpers before logging.
        LOGGER_ERROR("MQTT 连接失败:{}", SysUtil::str_w_a(SysUtil::str_a_w(std::string(ex.what()),"GBK" )));
    }
    catch (...)
    {
        std::unique_lock guard(status_mtx);
        status = 0;
        LOGGER_ERROR("未知异常MQTT 连接失败");
    }
}
/// CONNACK handler.
///
/// Runs entirely under an exclusive lock on status_mtx. If the broker accepted
/// the connection, sets status to 1 and subscribes to every (topic, qos) pair
/// in config.subscribes; otherwise sets status to 0. Any exception thrown along
/// the way resets status to 0 and is logged.
///
/// @param sp                   Flag passed by the connack handler (unused here).
/// @param connack_return_code  Broker's answer to the connect request.
/// @return Always true.
bool MqttCli::onConnAck(bool sp, mqtt::connect_return_code connack_return_code)
{
    std::unique_lock guard(status_mtx);
    try
    {
        if (connack_return_code != MQTT_NS::connect_return_code::accepted)
        {
            status = 0;
            LOGGER_INFO("MQTT 连接失败:{}", config.clientId);
            return true;
        }

        status = 1;
        LOGGER_INFO("MQTT 连接成功:{}", config.clientId);
        for (const auto &[subTopic, subQos]: config.subscribes)
        {
            cli->subscribe(subTopic, static_cast<MQTT_NS::qos>(subQos));
            LOGGER_INFO("MQTT{} 已订阅:【{} {}】", config.clientId, subTopic, subQos);
        }
    }
    catch (...)
    {
        status = 0;
        LOGGER_ERROR("MQTT 连接失败:{}", config.clientId);
    }
    return true;
}
/// Close handler: records the disconnect and, unless the client is shutting
/// down, starts reconnecting.
///
/// status_mtx is released before reconnect() is called. reconnect() locks the
/// same std::shared_mutex itself, and locking it again from the owning thread
/// would deadlock.
///
/// A status of 2 (written by the destructor) is left untouched, and no
/// reconnect is attempted in that case. Overwriting it with 0 would hide the
/// shutdown request from reconnect()'s exit check.
void MqttCli::onClose()
{
    bool shuttingDown = false;
    {
        std::unique_lock lock(status_mtx);
        shuttingDown = (status == 2);
        if (!shuttingDown)
        {
            status = 0;
        }
        LOGGER_INFO("MQTT 连接已关闭:{}", config.clientId);
    }

    if (!shuttingDown)
    {
        reconnect();
    }
}
/// Error handler: records the failure and, unless the client is shutting down,
/// starts reconnecting.
///
/// status_mtx is released before reconnect() is called. reconnect() locks the
/// same std::shared_mutex itself, and locking it again from the owning thread
/// would deadlock.
///
/// A status of 2 (written by the destructor) is left untouched, and no
/// reconnect is attempted in that case. Overwriting it with 0 would hide the
/// shutdown request from reconnect()'s exit check.
///
/// @param ec  Error reported by the MQTT client.
void MqttCli::onError(mqtt::error_code ec)
{
    bool shuttingDown = false;
    {
        std::unique_lock lock(status_mtx);
        shuttingDown = (status == 2);
        if (!shuttingDown)
        {
            status = 0;
        }
        LOGGER_INFO("MQTT 错误:{} {}", config.clientId, ec.message());
    }

    if (!shuttingDown)
    {
        reconnect();
    }
}
/// Publish handler: logs the incoming message and forwards it to the event
/// manager under the key "<clientId>/<topic>".
///
/// @param packet_id  Packet id supplied by the handler (unused here).
/// @param pubopts    Publish options supplied by the handler (unused here).
/// @param topic_     Topic the message arrived on.
/// @param payload_   Message body.
/// @return Always true.
bool MqttCli::onMessage(mqtt::optional<packet_id_t> packet_id, mqtt::publish_options pubopts, mqtt::buffer topic_, mqtt::buffer payload_)
{
    auto topicText = topic_.to_string();
    auto bodyText = payload_.to_string();

    LOGGER_INFO("MQTT 收到消息,设备名称:{}ID{},主题:{},消息内容:{}", config.name, config.clientId, topicText, bodyText);

    eventManager->publish(config.clientId + "/" + topicText, bodyText);
    return true;
}
/// @return A copy of the client id from this client's configuration.
std::string MqttCli::getClientId() const
{
    const std::string &id = config.clientId;
    return id;
}
/// Publishes a message through the underlying client.
///
/// Takes a shared lock on status_mtx for the duration of the call. When status
/// is 0 the message is dropped and an error is logged instead.
///
/// @param topic    Destination topic.
/// @param payload  Message body.
/// @param qos      Numeric QoS level, cast to MQTT_NS::qos.
void MqttCli::publish(std::string topic, std::string payload, uint8_t qos)
{
    std::shared_lock<std::shared_mutex> readGuard(status_mtx);
    if (status != 0)
    {
        cli->publish(topic, payload, static_cast<MQTT_NS::qos>(qos));
        return;
    }
    LOGGER_ERROR("MQTT 未连接,无法发布消息:{}", config.clientId);
}
/// Marks the client as shutting down (status 2), disconnects from the broker
/// and stops the io_context. All of this happens under an exclusive lock on
/// status_mtx.
///
/// `cli` and `ioc` are checked before use because connect() swallows setup
/// exceptions and may therefore leave either of them unset. A failure inside
/// disconnect() is caught here: an exception escaping a destructor would
/// terminate the process.
MqttCli::~MqttCli()
{
    std::unique_lock lock(status_mtx);
    status = 2;

    try
    {
        if (cli)
        {
            cli->disconnect();
        }
    }
    catch (const std::exception &e)
    {
        LOGGER_ERROR("MQTT{} 断开连接失败:{}", config.clientId, e.what());
    }
    catch (...)
    {
        LOGGER_ERROR("MQTT{} 断开连接失败", config.clientId);
    }

    if (ioc)
    {
        ioc->stop();
    }
}
/// Parses the MQTT section of the configuration and creates one MqttCli per
/// entry, keyed by the entry's `sn`.
///
/// For the entry whose `sn` equals MqttCli::TYPE_LOCAL, a handler is also
/// registered on "<clientId>/stop" that calls Application::stop().
///
/// @param config        JSON document convertible to MqttCliConfig.
/// @param eventManager  Event manager shared with every created client.
MqttCliHolder::MqttCliHolder(const nlohmann::json &config, std::shared_ptr<EventManager> eventManager)
    : configs(config.get<MqttCliConfig>().mqtts)
{
    // Iterate by reference into the member `configs`. MqttCli's constructor
    // takes its properties by non-const reference, so a per-iteration copy
    // would be destroyed while the client may still refer to it.
    // NOTE(review): whether MqttCli stores that reference is not visible in
    // this file — confirm against the header.
    for (auto &mqtt: configs)
    {
        auto cli = std::make_shared<MqttCli>(mqtt, eventManager);
        if (mqtt.sn == MqttCli::TYPE_LOCAL)
        {
            eventManager->subscribe(mqtt.clientId + "/stop", [](const EventManager::Event &)
            {
                Application::stop();
            });
        }
        // emplace keeps the first client if two entries share the same `sn`.
        mqttClis.emplace(mqtt.sn, cli);
    }
}
/// @return The client registered under MqttCli::TYPE_LOCAL, or nullptr when
///         no such client exists.
std::shared_ptr<MqttCli> MqttCliHolder::localCli()
{
    const auto entry = mqttClis.find(MqttCli::TYPE_LOCAL);
    if (entry == mqttClis.end())
    {
        return nullptr;
    }
    return entry->second;
}
/// @return The client registered under MqttCli::TYPE_REMOTE, or nullptr when
///         no such client exists.
std::shared_ptr<MqttCli> MqttCliHolder::remoteCli()
{
    const auto entry = mqttClis.find(MqttCli::TYPE_REMOTE);
    if (entry == mqttClis.end())
    {
        return nullptr;
    }
    return entry->second;
}
} // zsy