修复问题

master
lzq 2025-07-17 13:24:37 +08:00
parent 375ac4fca9
commit 24ec736c8f
9 changed files with 288 additions and 10 deletions

View File

@ -78,4 +78,5 @@ target_compile_definitions(${PROJECT_NAME} PUBLIC
SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_INFO #
CPPHTTPLIB_OPENSSL_SUPPORT # httplib openssl
MQTT_STD_VARIANT
_WIN32_WINNT=0x0601
)

View File

@ -1,6 +1,7 @@
#include "application.h"
#include <fstream>
#include "common/sys_util.h"
#include "mqtt/mqtt_svr.h"
namespace zsy
{
@ -9,6 +10,7 @@ namespace zsy
std::shared_ptr<ThreadPool> Application::threadPool = nullptr;
std::shared_ptr<EventManager> Application::eventManager = nullptr;
std::shared_ptr<HttpSvr> Application::httpSvr = nullptr;
std::shared_ptr<MqttSvr> Application::mqttSvr = nullptr;
std::shared_ptr<MqttCliHolder> Application::mqttCliHolder = nullptr;
std::shared_ptr<OSS> Application::oss = nullptr;
@ -119,6 +121,8 @@ namespace zsy
oss = std::make_shared<OSS>(config);
mqttSvr = std::make_shared<MqttSvr>();
mqttCliHolder = std::make_shared<MqttCliHolder>(config, eventManager);
reportSvr = std::make_shared<ReportSvr>(config);

View File

@ -6,6 +6,8 @@
#include "oss/oss.h"
#include "device_holder.h"
#include "controller/test.h"
#include "mqtt/mqtt_svr.h"
namespace zsy
{
struct AppProperties
@ -44,10 +46,12 @@ namespace zsy
static std::shared_ptr<HttpSvr> httpSvr;
static std::shared_ptr<MqttCliHolder> mqttCliHolder;
static std::shared_ptr<OSS> oss;
static std::shared_ptr<MqttSvr> mqttSvr;
static std::shared_ptr<MqttCliHolder> mqttCliHolder;
static std::shared_ptr<ReportSvr> reportSvr;
static std::shared_ptr<DeviceHolder> deviceHolder;

View File

@ -1,9 +1,11 @@
#include "recognize_data.h"
#include <utility>
namespace zsy
{
RecognizeData::RecognizeData(bool isFront, std::string sn, std::string license, std::shared_ptr<std::istream> imageFile)
: isFront(isFront), sn(sn), license(license), imageFile(imageFile)
: isFront(isFront), sn(std::move(sn)), license(std::move(license)), imageFile(imageFile)
{
}
} // zsy

View File

@ -77,8 +77,8 @@ namespace zsy
{
std::lock_guard lock(status_mtx);
status = 2;
LOGGER_INFO("正在销毁线程池...");
LOGGER_INFO("正在销毁线程池{}", name);
threads.clear();
LOGGER_INFO("线程池已销毁");
LOGGER_INFO("线程池已销毁{}", name);
}
} // zsy

View File

@ -16,6 +16,8 @@ namespace zsy
LOGGER_ERROR("MQTT{} 正在连接...", config.clientId);
ioc = std::make_shared<boost::asio::io_context>();
cli = MQTT_NS::make_sync_client(*ioc, config.server, config.port);
cli->set_keep_alive_sec(30);
cli->set_pingresp_timeout(std::chrono::seconds(10));
cli->set_client_id(config.clientId);
cli->set_user_name(config.username);
cli->set_password(config.password);
@ -120,17 +122,21 @@ namespace zsy
void MqttCli::onClose()
{
std::unique_lock lock(status_mtx);
status = 0;
{
std::unique_lock lock(status_mtx);
status = 0;
}
LOGGER_INFO("MQTT 连接已关闭:{}", config.clientId);
reconnect();
}
void MqttCli::onError(mqtt::error_code ec)
{
std::unique_lock lock(status_mtx);
status = 0;
LOGGER_INFO("MQTT 错误:{} {}", config.clientId, ec.message());
{
std::unique_lock lock(status_mtx);
status = 0;
}
LOGGER_INFO("MQTT 错误:{} {}", config.clientId, SysUtil::str_w_a(SysUtil::str_a_w(std::string(ec.message()),"GBK" )));
reconnect();
}

View File

@ -0,0 +1,173 @@
#include "mqtt_svr.h"
#include "common/logging.h"
#include "common/sys_util.h"
namespace zsy
{
void MqttSvr::run()
{
svr->listen();
LOGGER_INFO("MQTT 服务启动成功");
ioc->run();
LOGGER_INFO("MQTT 服务已停止");
}
void MqttSvr::close_proc(con_sp_t const &con)
{
try
{
{
std::lock_guard<std::mutex> lock(connections_mutex);
connections->erase(con);
}
std::lock_guard<std::mutex> lock(subs_mutex);
auto &idx = subs->get<tag_con>();
auto r = idx.equal_range(con);
idx.erase(r.first, r.second);
} catch (...)
{
LOGGER_ERROR("MQTT 服务关闭失败");
}
}
MqttSvr::MqttSvr(uint16_t port)
: connections(std::make_shared<std::set<con_sp_t> >()),
subs(std::make_shared<mi_sub_con>())
{
LOGGER_INFO("MQTT 服务正在启动,端口:{}", port);
ioc = std::make_shared<boost::asio::io_context>();
svr = std::make_shared<MQTT_NS::server<> >(
boost::asio::ip::tcp::endpoint(
boost::asio::ip::tcp::v4(),
boost::lexical_cast<std::uint16_t>(port)),
*ioc);
svr->set_error_handler([](MQTT_NS::error_code ec)
{
LOGGER_ERROR("MQTT 服务启动失败: {}", SysUtil::str_w_a(SysUtil::str_a_w( ec.message(),"GBK" )));
});
svr->set_accept_handler([this](con_sp_t spep)
{
auto &ep = *spep;
std::weak_ptr<con_t> wp(spep);
using packet_id_t = typename std::remove_reference_t<decltype(ep)>::packet_id_t;
ep.start_session(std::move(spep));
ep.set_close_handler([this, wp]
{
LOGGER_INFO("MQTT 客户端断开连接");
auto sp = wp.lock();
close_proc(sp);
});
ep.set_error_handler([this, wp](MQTT_NS::error_code ec)
{
LOGGER_ERROR("MQTT 客户端断开连接: {}", SysUtil::str_w_a(SysUtil::str_a_w( ec.message(),"GBK" )));
auto sp = wp.lock();
close_proc(sp);
});
ep.set_connect_handler([this, wp](MQTT_NS::buffer client_id, MQTT_NS::optional<MQTT_NS::buffer> username, MQTT_NS::optional<MQTT_NS::buffer> password, MQTT_NS::optional<MQTT_NS::will> will_, bool clean_session, std::uint16_t keep_alive)
{
using namespace MQTT_NS::literals;
LOGGER_INFO("MQTT 客户端连接成功:{}、{}、{}", client_id.to_string(), username ? username.value().to_string() : "none", password ? password.value().to_string() : "none");
auto sp = wp.lock();
std::lock_guard<std::mutex> lock(connections_mutex);
connections->insert(sp);
sp->connack(false, MQTT_NS::connect_return_code::accepted);
return true;
});
ep.set_disconnect_handler([this, wp]
{
LOGGER_INFO("MQTT 客户端断开连接");
auto sp = wp.lock();
close_proc(sp);
});
ep.set_pingreq_handler([wp]
{
auto sp = wp.lock();
sp->pingresp();
return true;
});
ep.set_puback_handler([](packet_id_t packet_id)
{
// LOGGER_INFO("MQTT 客户端发布消息确认:{}", packet_id);
return true;
});
ep.set_pubrec_handler([](packet_id_t packet_id)
{
// LOGGER_INFO("MQTT 客户端发布消息确认:{}", packet_id);
return true;
});
ep.set_pubrel_handler([](packet_id_t packet_id)
{
// LOGGER_INFO("MQTT 客户端发布消息确认:{}", packet_id);
return true;
});
ep.set_pubcomp_handler([](packet_id_t packet_id)
{
// LOGGER_INFO("MQTT 客户端发布消息确认:{}", packet_id);
return true;
});
ep.set_publish_handler([this](MQTT_NS::optional<packet_id_t> packet_id, MQTT_NS::publish_options pubopts, MQTT_NS::buffer topic_name, MQTT_NS::buffer contents)
{
// LOGGER_INFO("MQTT 客户端发布消息:{}、{}", topic_name, contents);
std::lock_guard<std::mutex> lock(subs_mutex);
auto const &idx = subs->get<tag_topic>();
auto r = idx.equal_range(topic_name);
for (; r.first != r.second; ++r.first)
{
r.first->con->publish(
topic_name,
contents,
std::min(r.first->qos_value, pubopts.get_qos())
);
}
return true;
});
ep.set_subscribe_handler([this, wp](packet_id_t packet_id, std::vector<MQTT_NS::subscribe_entry> entries)
{
std::vector<MQTT_NS::suback_return_code> res;
res.reserve(entries.size());
auto sp = wp.lock();
for (auto const &e: entries)
{
res.emplace_back(MQTT_NS::qos_to_suback_return_code(e.subopts.get_qos()));
std::lock_guard<std::mutex> lock(subs_mutex);
subs->emplace(std::move(e.topic_filter), sp, e.subopts.get_qos());
}
sp->suback(packet_id, res);
return true;
});
ep.set_unsubscribe_handler([this, wp](packet_id_t packet_id, std::vector<MQTT_NS::unsubscribe_entry> entries)
{
auto sp = wp.lock();
for (auto const &e: entries)
{
std::lock_guard<std::mutex> lock(subs_mutex);
auto it = subs->find(std::make_tuple(sp, e.topic_filter));
if (it != subs->end())
{
subs->erase(it);
}
}
sp->unsuback(packet_id);
return true;
});
});
eventLoop = std::make_unique<std::jthread>(&MqttSvr::run, this);
}
MqttSvr::~MqttSvr()
{
LOGGER_INFO("MQTT 服务正在关闭");
svr->close();
ioc->stop();
LOGGER_INFO("MQTT 服务已关闭");
}
} // zsy

View File

@ -0,0 +1,88 @@
#ifndef MQTT_SVR_H
#define MQTT_SVR_H
#include <iostream>
#include <iomanip>
#include <set>
#include <mqtt_server_cpp.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/composite_key.hpp>
namespace zsy
{
class MqttSvr
{
using con_t = MQTT_NS::server<>::endpoint_t;
using con_sp_t = std::shared_ptr<con_t>;
struct sub_con
{
sub_con(MQTT_NS::buffer topic, con_sp_t con, MQTT_NS::qos qos_value)
: topic(std::move(topic)), con(std::move(con)), qos_value(qos_value)
{
}
MQTT_NS::buffer topic;
con_sp_t con;
MQTT_NS::qos qos_value;
};
struct tag_topic
{
};
struct tag_con
{
};
struct tag_con_topic
{
};
using mi_sub_con = boost::multi_index::multi_index_container<
sub_con,
boost::multi_index::indexed_by<
boost::multi_index::ordered_unique<
boost::multi_index::tag<tag_con_topic>,
boost::multi_index::composite_key<
sub_con,
BOOST_MULTI_INDEX_MEMBER(sub_con, con_sp_t, con),
BOOST_MULTI_INDEX_MEMBER(sub_con, MQTT_NS::buffer, topic)
>
>,
boost::multi_index::ordered_non_unique<
boost::multi_index::tag<tag_topic>,
BOOST_MULTI_INDEX_MEMBER(sub_con, MQTT_NS::buffer, topic)
>,
boost::multi_index::ordered_non_unique<
boost::multi_index::tag<tag_con>,
BOOST_MULTI_INDEX_MEMBER(sub_con, con_sp_t, con)
>
>
>;
void run();
void close_proc(con_sp_t const &con);
std::mutex connections_mutex;
std::mutex subs_mutex;
std::unique_ptr<std::jthread> eventLoop;
std::shared_ptr<std::set<con_sp_t> > connections;
std::shared_ptr<mi_sub_con> subs;
std::shared_ptr<boost::asio::io_context> ioc;
std::shared_ptr<MQTT_NS::server<> > svr;
public:
MqttSvr(uint16_t port = 1883);
~MqttSvr();
};
} // zsy
#endif //MQTT_SVR_H