diff --git a/CMakeLists.txt b/CMakeLists.txt index 1e06598..8f9a50c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -78,4 +78,5 @@ target_compile_definitions(${PROJECT_NAME} PUBLIC SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_INFO # 启用日志宏 CPPHTTPLIB_OPENSSL_SUPPORT # httplib 支持 openssl MQTT_STD_VARIANT + _WIN32_WINNT=0x0601 ) diff --git a/src/application.cpp b/src/application.cpp index e838148..a2fc98f 100644 --- a/src/application.cpp +++ b/src/application.cpp @@ -1,6 +1,7 @@ #include "application.h" #include #include "common/sys_util.h" +#include "mqtt/mqtt_svr.h" namespace zsy { @@ -9,6 +10,7 @@ namespace zsy std::shared_ptr Application::threadPool = nullptr; std::shared_ptr Application::eventManager = nullptr; std::shared_ptr Application::httpSvr = nullptr; + std::shared_ptr Application::mqttSvr = nullptr; std::shared_ptr Application::mqttCliHolder = nullptr; std::shared_ptr Application::oss = nullptr; @@ -119,6 +121,8 @@ namespace zsy oss = std::make_shared(config); + mqttSvr = std::make_shared(); + mqttCliHolder = std::make_shared(config, eventManager); reportSvr = std::make_shared(config); diff --git a/src/application.h b/src/application.h index b8c004a..d26ef4b 100644 --- a/src/application.h +++ b/src/application.h @@ -6,6 +6,8 @@ #include "oss/oss.h" #include "device_holder.h" #include "controller/test.h" +#include "mqtt/mqtt_svr.h" + namespace zsy { struct AppProperties @@ -44,10 +46,12 @@ namespace zsy static std::shared_ptr httpSvr; - static std::shared_ptr mqttCliHolder; - static std::shared_ptr oss; + static std::shared_ptr mqttSvr; + + static std::shared_ptr mqttCliHolder; + static std::shared_ptr reportSvr; static std::shared_ptr deviceHolder; diff --git a/src/barrier/recognize_data.cpp b/src/barrier/recognize_data.cpp index 815391e..f5918b4 100644 --- a/src/barrier/recognize_data.cpp +++ b/src/barrier/recognize_data.cpp @@ -1,9 +1,11 @@ #include "recognize_data.h" +#include + namespace zsy { RecognizeData::RecognizeData(bool isFront, std::string sn, std::string license, std::shared_ptr imageFile) - : isFront(isFront), sn(sn), license(license), imageFile(imageFile) + : isFront(isFront), sn(std::move(sn)), license(std::move(license)), imageFile(imageFile) { } } // zsy diff --git a/src/common/thread_pool.cpp b/src/common/thread_pool.cpp index 553937f..e001b50 100644 --- a/src/common/thread_pool.cpp +++ b/src/common/thread_pool.cpp @@ -77,8 +77,8 @@ namespace zsy { std::lock_guard lock(status_mtx); status = 2; - LOGGER_INFO("正在销毁线程池..."); + LOGGER_INFO("正在销毁线程池:{}", name); threads.clear(); - LOGGER_INFO("线程池已销毁"); + LOGGER_INFO("线程池已销毁:{}", name); } } // zsy diff --git a/src/mqtt/mqtt_cli.cpp b/src/mqtt/mqtt_cli.cpp index 11d1881..4972ab0 100644 --- a/src/mqtt/mqtt_cli.cpp +++ b/src/mqtt/mqtt_cli.cpp @@ -16,6 +16,8 @@ namespace zsy LOGGER_ERROR("MQTT:{} 正在连接...", config.clientId); ioc = std::make_shared(); cli = MQTT_NS::make_sync_client(*ioc, config.server, config.port); + cli->set_keep_alive_sec(30); + cli->set_pingresp_timeout(std::chrono::seconds(10)); cli->set_client_id(config.clientId); cli->set_user_name(config.username); cli->set_password(config.password); @@ -120,17 +122,21 @@ namespace zsy void MqttCli::onClose() { - std::unique_lock lock(status_mtx); - status = 0; + { + std::unique_lock lock(status_mtx); + status = 0; + } LOGGER_INFO("MQTT 连接已关闭:{}", config.clientId); reconnect(); } void MqttCli::onError(mqtt::error_code ec) { - std::unique_lock lock(status_mtx); - status = 0; - LOGGER_INFO("MQTT 错误:{} {}", config.clientId, ec.message()); + { + std::unique_lock lock(status_mtx); + status = 0; + } + LOGGER_INFO("MQTT 错误:{} {}", config.clientId, SysUtil::str_w_a(SysUtil::str_a_w(std::string(ec.message()),"GBK" ))); reconnect(); } diff --git a/src/mqtt/mqtt_svr.cpp b/src/mqtt/mqtt_svr.cpp new file mode 100644 index 0000000..39d962f --- /dev/null +++ b/src/mqtt/mqtt_svr.cpp @@ -0,0 +1,173 @@ +#include "mqtt_svr.h" +#include "common/logging.h" +#include "common/sys_util.h" + +namespace zsy +{ + void MqttSvr::run() + { + svr->listen(); + LOGGER_INFO("MQTT 服务启动成功"); + ioc->run(); + LOGGER_INFO("MQTT 服务已停止"); + } + + void MqttSvr::close_proc(con_sp_t const &con) + { + try + { + { + std::lock_guard lock(connections_mutex); + connections->erase(con); + } + std::lock_guard lock(subs_mutex); + auto &idx = subs->get(); + auto r = idx.equal_range(con); + idx.erase(r.first, r.second); + } catch (...) + { + LOGGER_ERROR("MQTT 服务关闭失败"); + } + } + + MqttSvr::MqttSvr(uint16_t port) + : connections(std::make_shared >()), + subs(std::make_shared()) + { + LOGGER_INFO("MQTT 服务正在启动,端口:{}", port); + + ioc = std::make_shared(); + + svr = std::make_shared >( + boost::asio::ip::tcp::endpoint( + boost::asio::ip::tcp::v4(), + boost::lexical_cast(port)), + *ioc); + + svr->set_error_handler([](MQTT_NS::error_code ec) + { + LOGGER_ERROR("MQTT 服务启动失败: {}", SysUtil::str_w_a(SysUtil::str_a_w( ec.message(),"GBK" ))); + }); + svr->set_accept_handler([this](con_sp_t spep) + { + auto &ep = *spep; + std::weak_ptr wp(spep); + + using packet_id_t = typename std::remove_reference_t::packet_id_t; + + ep.start_session(std::move(spep)); + + ep.set_close_handler([this, wp] + { + LOGGER_INFO("MQTT 客户端断开连接"); + auto sp = wp.lock(); + close_proc(sp); + }); + ep.set_error_handler([this, wp](MQTT_NS::error_code ec) + { + LOGGER_ERROR("MQTT 客户端断开连接: {}", SysUtil::str_w_a(SysUtil::str_a_w( ec.message(),"GBK" ))); + auto sp = wp.lock(); + close_proc(sp); + }); + + ep.set_connect_handler([this, wp](MQTT_NS::buffer client_id, MQTT_NS::optional username, MQTT_NS::optional password, MQTT_NS::optional will_, bool clean_session, std::uint16_t keep_alive) + { + using namespace MQTT_NS::literals; + LOGGER_INFO("MQTT 客户端连接成功:{}、{}、{}", client_id.to_string(), username ? username.value().to_string() : "none", password ? password.value().to_string() : "none"); + auto sp = wp.lock(); + std::lock_guard lock(connections_mutex); + connections->insert(sp); + sp->connack(false, MQTT_NS::connect_return_code::accepted); + return true; + }); + ep.set_disconnect_handler([this, wp] + { + LOGGER_INFO("MQTT 客户端断开连接"); + auto sp = wp.lock(); + close_proc(sp); + }); + ep.set_pingreq_handler([wp] + { + auto sp = wp.lock(); + sp->pingresp(); + return true; + }); + + ep.set_puback_handler([](packet_id_t packet_id) + { + // LOGGER_INFO("MQTT 客户端发布消息确认:{}", packet_id); + return true; + }); + ep.set_pubrec_handler([](packet_id_t packet_id) + { + // LOGGER_INFO("MQTT 客户端发布消息确认:{}", packet_id); + return true; + }); + ep.set_pubrel_handler([](packet_id_t packet_id) + { + // LOGGER_INFO("MQTT 客户端发布消息确认:{}", packet_id); + return true; + }); + ep.set_pubcomp_handler([](packet_id_t packet_id) + { + // LOGGER_INFO("MQTT 客户端发布消息确认:{}", packet_id); + return true; + }); + ep.set_publish_handler([this](MQTT_NS::optional packet_id, MQTT_NS::publish_options pubopts, MQTT_NS::buffer topic_name, MQTT_NS::buffer contents) + { + // LOGGER_INFO("MQTT 客户端发布消息:{}、{}", topic_name, contents); + std::lock_guard lock(subs_mutex); + auto const &idx = subs->get(); + auto r = idx.equal_range(topic_name); + for (; r.first != r.second; ++r.first) + { + r.first->con->publish( + topic_name, + contents, + std::min(r.first->qos_value, pubopts.get_qos()) + ); + } + return true; + }); + ep.set_subscribe_handler([this, wp](packet_id_t packet_id, std::vector entries) + { + std::vector res; + res.reserve(entries.size()); + auto sp = wp.lock(); + for (auto const &e: entries) + { + res.emplace_back(MQTT_NS::qos_to_suback_return_code(e.subopts.get_qos())); + std::lock_guard lock(subs_mutex); + subs->emplace(std::move(e.topic_filter), sp, e.subopts.get_qos()); + } + sp->suback(packet_id, res); + return true; + }); + ep.set_unsubscribe_handler([this, wp](packet_id_t packet_id, std::vector entries) + { + auto sp = wp.lock(); + for (auto const &e: entries) + { + std::lock_guard lock(subs_mutex); + auto it = subs->find(std::make_tuple(sp, e.topic_filter)); + if (it != subs->end()) + { + subs->erase(it); + } + } + sp->unsuback(packet_id); + return true; + }); + }); + + eventLoop = std::make_unique(&MqttSvr::run, this); + } + + MqttSvr::~MqttSvr() + { + LOGGER_INFO("MQTT 服务正在关闭"); + svr->close(); + ioc->stop(); + LOGGER_INFO("MQTT 服务已关闭"); + } +} // zsy diff --git a/src/mqtt/mqtt_svr.h b/src/mqtt/mqtt_svr.h new file mode 100644 index 0000000..fc43ce6 --- /dev/null +++ b/src/mqtt/mqtt_svr.h @@ -0,0 +1,88 @@ +#ifndef MQTT_SVR_H +#define MQTT_SVR_H + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + + +namespace zsy +{ + class MqttSvr + { + using con_t = MQTT_NS::server<>::endpoint_t; + using con_sp_t = std::shared_ptr; + + struct sub_con + { + sub_con(MQTT_NS::buffer topic, con_sp_t con, MQTT_NS::qos qos_value) + : topic(std::move(topic)), con(std::move(con)), qos_value(qos_value) + { + } + + MQTT_NS::buffer topic; + con_sp_t con; + MQTT_NS::qos qos_value; + }; + + struct tag_topic + { + }; + + struct tag_con + { + }; + + struct tag_con_topic + { + }; + + using mi_sub_con = boost::multi_index::multi_index_container< + sub_con, + boost::multi_index::indexed_by< + boost::multi_index::ordered_unique< + boost::multi_index::tag, + boost::multi_index::composite_key< + sub_con, + BOOST_MULTI_INDEX_MEMBER(sub_con, con_sp_t, con), + BOOST_MULTI_INDEX_MEMBER(sub_con, MQTT_NS::buffer, topic) + > + >, + boost::multi_index::ordered_non_unique< + boost::multi_index::tag, + BOOST_MULTI_INDEX_MEMBER(sub_con, MQTT_NS::buffer, topic) + >, + boost::multi_index::ordered_non_unique< + boost::multi_index::tag, + BOOST_MULTI_INDEX_MEMBER(sub_con, con_sp_t, con) + > + > + >; + + void run(); + + void close_proc(con_sp_t const &con); + std::mutex connections_mutex; + std::mutex subs_mutex; + std::unique_ptr eventLoop; + std::shared_ptr > connections; + std::shared_ptr subs; + std::shared_ptr ioc; + std::shared_ptr > svr; + + public: + MqttSvr(uint16_t port = 1883); + + ~MqttSvr(); + }; +} // zsy + +#endif //MQTT_SVR_H diff --git a/start.bat b/start_deviceAccessLayer.bat similarity index 100% rename from start.bat rename to start_deviceAccessLayer.bat