带重连功能的websocket client

  • Post author:
  • Post category:其他


#include <websocketpp/config/asio_no_tls_client.hpp>
#include <websocketpp/client.hpp>

#include <websocketpp/common/thread.hpp>
#include <websocketpp/common/memory.hpp>

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Extends websocketpp's close-status namespace with an application-defined
// close code, so it can be spelled like the library's own codes
// (websocketpp::close::status::proactive_close).
namespace websocketpp {
namespace close {
namespace status {

// Close code meaning "this client closed the connection on purpose".
// websocket_endpoint::on_close compares against it to decide whether a
// closed connection should be re-established. 4001 lies in the 4000-4999
// range that RFC 6455 leaves for private use.
static const value proactive_close = 4001;

} // namespace status
} // namespace close
} // namespace websocketpp

// Callback signatures exposed to users of the client.
//   WebsocketOnMessageT - receives the payload of a text message.
//   WebsocketOnOpenT    - receives the id of the connection that opened.
//   WebsocketOnCloseT   - receives the connection id and a close status code.
using WebsocketOnMessageT = std::function<void(const std::string&)>;
using WebsocketOnOpenT    = std::function<void(int)>;
using WebsocketOnCloseT   = std::function<void(int, websocketpp::close::status::value)>;

// Public aliases for the endpoint class defined further down in this file.
class websocket_endpoint;
using WebsocketClient    = websocket_endpoint;
using WebsocketClientPtr = std::shared_ptr<WebsocketClient>;

// Plain (non-TLS) asio-based websocketpp client.
using client = websocketpp::client<websocketpp::config::asio_client>;

class connection_metadata
{
public:
    typedef websocketpp::lib::shared_ptr<connection_metadata> ptr;

    connection_metadata(int id, websocketpp::connection_hdl hdl, std::string uri, WebsocketOnMessageT onmsg,
                        WebsocketOnOpenT onopen, WebsocketOnCloseT onclose)
        : m_id(id)
        , m_hdl(hdl)
        , m_status("Connecting")
        , m_uri(uri)
        , m_server("N/A")
        , m_onmsg(onmsg)
        , m_onopen(onopen)
        , m_onclose(onclose)
    {
    }

    void on_open(client* c, websocketpp::connection_hdl hdl)
    {
        std::cout << "[on_open] id:" << m_id << std::endl;
        std::cout << "> connectted to " << m_uri << std::endl;
        m_status = "Open";

        client::connection_ptr con = c->get_con_from_hdl(hdl);
        m_server = con->get_response_header("Server");

        if (m_onopen)
        {
            m_onopen(m_id);
        }
    }

    void on_fail(client* c, websocketpp::connection_hdl hdl)
    {
        std::cout << "[on_fail] id:" << m_id << std::endl;
        m_status = "Failed";

        client::connection_ptr con = c->get_con_from_hdl(hdl);
        m_server = con->get_response_header("Server");
        m_error_reason = con->get_ec().message();

        if (m_onclose)
        {
            m_onclose(m_id, websocketpp::close::status::blank);
        }
    }

    void on_close(client* c, websocketpp::connection_hdl hdl)
    {
        std::cout << "[on_close] id:" << m_id << std::endl;
        m_status = "Closed";
        client::connection_ptr con = c->get_con_from_hdl(hdl);
        std::stringstream      s;
        s << "close code: " << con->get_remote_close_code() << " ("
          << websocketpp::close::status::get_string(con->get_remote_close_code())
          << "), close reason: " << con->get_remote_close_reason() << ", local code: " << con->get_local_close_code()
          << " (" << websocketpp::close::status::get_string(con->get_local_close_code())
          << "), local reason: " << con->get_local_close_reason();
        m_error_reason = s.str();
        std::cout << "> " << m_error_reason << std::endl;

        auto code = con->get_local_close_code();
        if (m_onclose)
        {
            m_onclose(m_id, code);
        }
    }

    void on_message(websocketpp::connection_hdl, client::message_ptr msg)
    {
        if (msg->get_opcode() == websocketpp::frame::opcode::text)
        {
            // m_messages.push_back("<< " + msg->get_payload());
            if (m_onmsg)
            {
                m_onmsg(msg->get_payload());
            }
        }
        else
        {
            // m_messages.push_back("<< " + websocketpp::utility::to_hex(msg->get_payload()));
            std::cout << "unknow msg opcode:" << msg->get_opcode() << std::endl;
        }
    }

    websocketpp::connection_hdl get_hdl() const { return m_hdl; }

    int get_id() const { return m_id; }

    std::string get_status() const { return m_status; }

    std::string get_uri() const { return m_uri; }

private:
    int                         m_id;
    websocketpp::connection_hdl m_hdl;
    std::string                 m_status;
    std::string                 m_uri;
    std::string                 m_server;
    std::string                 m_error_reason;
    std::vector<std::string>    m_messages;
    WebsocketOnMessageT         m_onmsg;
    WebsocketOnOpenT            m_onopen;
    WebsocketOnCloseT           m_onclose;
};

class websocket_endpoint
{
public:
    websocket_endpoint(int reconnect_interval = 1)
        : m_next_id(0)
        , m_reconnect_interval(reconnect_interval)
    {
        // std::cout << ">> websocket_endpoint::websocket_endpoint()" << std::endl;
        m_endpoint.clear_access_channels(websocketpp::log::alevel::all);
        m_endpoint.clear_error_channels(websocketpp::log::elevel::all);

        m_endpoint.init_asio();
        m_endpoint.start_perpetual();

        m_thread = websocketpp::lib::make_shared<websocketpp::lib::thread>(&client::run, &m_endpoint);
    }

    ~websocket_endpoint()
    {
        // std::cout << ">> websocket_endpoint::~websocket_endpoint()" << std::endl; //
        m_endpoint.stop_perpetual();

        for (con_list::const_iterator it = m_connection_list.begin(); it != m_connection_list.end(); ++it)
        {
            if (it->second->get_status() != "Open")
            {
                // Only close open connections
                continue;
            }

            std::cout << "> Closing connection " << it->second->get_id() << std::endl;

            websocketpp::lib::error_code ec;
            m_endpoint.close(it->second->get_hdl(), websocketpp::close::status::going_away, "", ec);
            if (ec)
            {
                std::cout << "> Error closing connection " << it->second->get_id() << ": " << ec.message() << std::endl;
            }
        }
        // std::cout << ">> m_thread->join()" << std::endl; //
        m_thread->join();
    }

    void set_onopen(WebsocketOnOpenT onopen) { m_onopen = onopen; }

    void set_onmessage(WebsocketOnMessageT onmsg) { m_onmsg = onmsg; }

    int connect(std::string const& uri)
    {
        websocketpp::lib::error_code ec;

        client::connection_ptr con = m_endpoint.get_connection(uri, ec);

        if (ec)
        {
            std::cout << "> Connect initialization error: " << ec.message() << std::endl;
            return -1;
        }

        int                      new_id = m_next_id++;
        connection_metadata::ptr metadata_ptr = websocketpp::lib::make_shared<connection_metadata>(
            new_id, con->get_handle(), uri, m_onmsg, m_onopen,
            websocketpp::lib::bind(&websocket_endpoint::on_close, this, websocketpp::lib::placeholders::_1,
                                   websocketpp::lib::placeholders::_2));
        m_connection_list[new_id] = metadata_ptr;

        con->set_open_handler(websocketpp::lib::bind(&connection_metadata::on_open, metadata_ptr, &m_endpoint,
                                                     websocketpp::lib::placeholders::_1));
        con->set_fail_handler(websocketpp::lib::bind(&connection_metadata::on_fail, metadata_ptr, &m_endpoint,
                                                     websocketpp::lib::placeholders::_1));
        con->set_close_handler(websocketpp::lib::bind(&connection_metadata::on_close, metadata_ptr, &m_endpoint,
                                                      websocketpp::lib::placeholders::_1));
        con->set_message_handler(websocketpp::lib::bind(&connection_metadata::on_message, metadata_ptr,
                                                        websocketpp::lib::placeholders::_1,
                                                        websocketpp::lib::placeholders::_2));

        m_endpoint.connect(con);

        return new_id;
    }

    void close(int id, websocketpp::close::status::value code, std::string reason)
    {
        websocketpp::lib::error_code ec;

        con_list::iterator metadata_it = m_connection_list.find(id);
        if (metadata_it == m_connection_list.end())
        {
            std::cout << "> No connection found with id " << id << std::endl;
            return;
        }

        m_endpoint.close(metadata_it->second->get_hdl(), code, reason, ec);
        if (ec)
        {
            std::cout << "> Error initiating close: " << ec.message() << std::endl;
        }
    }

    // send to all connection
    void send_to_all(const std::string& message)
    {
        for (con_list::const_iterator it = m_connection_list.begin(); it != m_connection_list.end(); ++it)
        {
            if (it->second->get_status() != "Open")
            {
                continue;
            }

            std::cout << "> Closing connection " << it->second->get_id() << std::endl;

            websocketpp::lib::error_code ec;
            m_endpoint.send(it->second->get_hdl(), message, websocketpp::frame::opcode::text, ec);
            if (ec)
            {
                std::cout << "> Error sending message: " << ec.message() << std::endl;
                return;
            }
        }
    }

    void send(int id, const std::string& message)
    {
        websocketpp::lib::error_code ec;

        con_list::iterator metadata_it = m_connection_list.find(id);
        if (metadata_it == m_connection_list.end())
        {
            std::cout << "> No connection found with id " << id << std::endl;
            return;
        }

        m_endpoint.send(metadata_it->second->get_hdl(), message, websocketpp::frame::opcode::text, ec);
        if (ec)
        {
            std::cout << "> Error sending message: " << ec.message() << std::endl;
            return;
        }
    }

    void on_open(int id)
    {
        if (m_onopen)
        {
            m_onopen(id);
        }
    }

    void on_close(int id, websocketpp::close::status::value code)
    {
        con_list::iterator metadata_it = m_connection_list.find(id);
        if (metadata_it == m_connection_list.end())
        {
            std::cout << "> No connection found with id " << id << std::endl;
            return;
        }
        std::string uri = metadata_it->second->get_uri();
        m_connection_list.erase(id);

        if (code != websocketpp::close::status::proactive_close && code != websocketpp::close::status::going_away)
        {
            std::cout << "> reconnect to " << uri << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(1000 * m_reconnect_interval));
            connect(uri);
        }
    }

private:
    typedef std::map<int, connection_metadata::ptr> con_list;

    client                                                 m_endpoint;
    websocketpp::lib::shared_ptr<websocketpp::lib::thread> m_thread;

    con_list            m_connection_list;
    int                 m_next_id;
    int                 m_reconnect_interval;
    WebsocketOnOpenT    m_onopen;
    WebsocketOnMessageT m_onmsg;
};


https://docs.websocketpp.org/



版权声明:本文为photon222原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。