#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <websocketpp/client.hpp>
#include <websocketpp/common/memory.hpp>
#include <websocketpp/common/thread.hpp>
#include <websocketpp/config/asio_no_tls_client.hpp>
namespace websocketpp
{
namespace close
{
namespace status
{
/// Application-defined close code meaning "this client closed the connection
/// on purpose". websocket_endpoint::on_close() does not reconnect when the
/// local close code equals this value. 4001 lies in the 4000-4999 range that
/// RFC 6455 leaves for private use.
static value const proactive_close = 4001;
} // namespace status
} // namespace close
} // namespace websocketpp
/// Callback invoked with the payload of every received text frame.
using WebsocketOnMessageT = std::function<void(const std::string&)>;

/// Callback invoked with the connection id once a connection has opened.
using WebsocketOnOpenT = std::function<void(int)>;

/// Callback invoked with the connection id and a close code when a connection
/// closes or fails to open.
using WebsocketOnCloseT = std::function<void(int, websocketpp::close::status::value)>;

class websocket_endpoint;

/// Public names for the client wrapper defined further down in this file.
using WebsocketClient = websocket_endpoint;
using WebsocketClientPtr = std::shared_ptr<WebsocketClient>;

/// The websocketpp client type (plain TCP, no TLS) used by the whole file.
using client = websocketpp::client<websocketpp::config::asio_client>;
class connection_metadata
{
public:
typedef websocketpp::lib::shared_ptr<connection_metadata> ptr;
connection_metadata(int id, websocketpp::connection_hdl hdl, std::string uri, WebsocketOnMessageT onmsg,
WebsocketOnOpenT onopen, WebsocketOnCloseT onclose)
: m_id(id)
, m_hdl(hdl)
, m_status("Connecting")
, m_uri(uri)
, m_server("N/A")
, m_onmsg(onmsg)
, m_onopen(onopen)
, m_onclose(onclose)
{
}
void on_open(client* c, websocketpp::connection_hdl hdl)
{
std::cout << "[on_open] id:" << m_id << std::endl;
std::cout << "> connectted to " << m_uri << std::endl;
m_status = "Open";
client::connection_ptr con = c->get_con_from_hdl(hdl);
m_server = con->get_response_header("Server");
if (m_onopen)
{
m_onopen(m_id);
}
}
void on_fail(client* c, websocketpp::connection_hdl hdl)
{
std::cout << "[on_fail] id:" << m_id << std::endl;
m_status = "Failed";
client::connection_ptr con = c->get_con_from_hdl(hdl);
m_server = con->get_response_header("Server");
m_error_reason = con->get_ec().message();
if (m_onclose)
{
m_onclose(m_id, websocketpp::close::status::blank);
}
}
void on_close(client* c, websocketpp::connection_hdl hdl)
{
std::cout << "[on_close] id:" << m_id << std::endl;
m_status = "Closed";
client::connection_ptr con = c->get_con_from_hdl(hdl);
std::stringstream s;
s << "close code: " << con->get_remote_close_code() << " ("
<< websocketpp::close::status::get_string(con->get_remote_close_code())
<< "), close reason: " << con->get_remote_close_reason() << ", local code: " << con->get_local_close_code()
<< " (" << websocketpp::close::status::get_string(con->get_local_close_code())
<< "), local reason: " << con->get_local_close_reason();
m_error_reason = s.str();
std::cout << "> " << m_error_reason << std::endl;
auto code = con->get_local_close_code();
if (m_onclose)
{
m_onclose(m_id, code);
}
}
void on_message(websocketpp::connection_hdl, client::message_ptr msg)
{
if (msg->get_opcode() == websocketpp::frame::opcode::text)
{
// m_messages.push_back("<< " + msg->get_payload());
if (m_onmsg)
{
m_onmsg(msg->get_payload());
}
}
else
{
// m_messages.push_back("<< " + websocketpp::utility::to_hex(msg->get_payload()));
std::cout << "unknow msg opcode:" << msg->get_opcode() << std::endl;
}
}
websocketpp::connection_hdl get_hdl() const { return m_hdl; }
int get_id() const { return m_id; }
std::string get_status() const { return m_status; }
std::string get_uri() const { return m_uri; }
private:
int m_id;
websocketpp::connection_hdl m_hdl;
std::string m_status;
std::string m_uri;
std::string m_server;
std::string m_error_reason;
std::vector<std::string> m_messages;
WebsocketOnMessageT m_onmsg;
WebsocketOnOpenT m_onopen;
WebsocketOnCloseT m_onclose;
};
class websocket_endpoint
{
public:
websocket_endpoint(int reconnect_interval = 1)
: m_next_id(0)
, m_reconnect_interval(reconnect_interval)
{
// std::cout << ">> websocket_endpoint::websocket_endpoint()" << std::endl;
m_endpoint.clear_access_channels(websocketpp::log::alevel::all);
m_endpoint.clear_error_channels(websocketpp::log::elevel::all);
m_endpoint.init_asio();
m_endpoint.start_perpetual();
m_thread = websocketpp::lib::make_shared<websocketpp::lib::thread>(&client::run, &m_endpoint);
}
~websocket_endpoint()
{
// std::cout << ">> websocket_endpoint::~websocket_endpoint()" << std::endl; //
m_endpoint.stop_perpetual();
for (con_list::const_iterator it = m_connection_list.begin(); it != m_connection_list.end(); ++it)
{
if (it->second->get_status() != "Open")
{
// Only close open connections
continue;
}
std::cout << "> Closing connection " << it->second->get_id() << std::endl;
websocketpp::lib::error_code ec;
m_endpoint.close(it->second->get_hdl(), websocketpp::close::status::going_away, "", ec);
if (ec)
{
std::cout << "> Error closing connection " << it->second->get_id() << ": " << ec.message() << std::endl;
}
}
// std::cout << ">> m_thread->join()" << std::endl; //
m_thread->join();
}
void set_onopen(WebsocketOnOpenT onopen) { m_onopen = onopen; }
void set_onmessage(WebsocketOnMessageT onmsg) { m_onmsg = onmsg; }
int connect(std::string const& uri)
{
websocketpp::lib::error_code ec;
client::connection_ptr con = m_endpoint.get_connection(uri, ec);
if (ec)
{
std::cout << "> Connect initialization error: " << ec.message() << std::endl;
return -1;
}
int new_id = m_next_id++;
connection_metadata::ptr metadata_ptr = websocketpp::lib::make_shared<connection_metadata>(
new_id, con->get_handle(), uri, m_onmsg, m_onopen,
websocketpp::lib::bind(&websocket_endpoint::on_close, this, websocketpp::lib::placeholders::_1,
websocketpp::lib::placeholders::_2));
m_connection_list[new_id] = metadata_ptr;
con->set_open_handler(websocketpp::lib::bind(&connection_metadata::on_open, metadata_ptr, &m_endpoint,
websocketpp::lib::placeholders::_1));
con->set_fail_handler(websocketpp::lib::bind(&connection_metadata::on_fail, metadata_ptr, &m_endpoint,
websocketpp::lib::placeholders::_1));
con->set_close_handler(websocketpp::lib::bind(&connection_metadata::on_close, metadata_ptr, &m_endpoint,
websocketpp::lib::placeholders::_1));
con->set_message_handler(websocketpp::lib::bind(&connection_metadata::on_message, metadata_ptr,
websocketpp::lib::placeholders::_1,
websocketpp::lib::placeholders::_2));
m_endpoint.connect(con);
return new_id;
}
void close(int id, websocketpp::close::status::value code, std::string reason)
{
websocketpp::lib::error_code ec;
con_list::iterator metadata_it = m_connection_list.find(id);
if (metadata_it == m_connection_list.end())
{
std::cout << "> No connection found with id " << id << std::endl;
return;
}
m_endpoint.close(metadata_it->second->get_hdl(), code, reason, ec);
if (ec)
{
std::cout << "> Error initiating close: " << ec.message() << std::endl;
}
}
// send to all connection
void send_to_all(const std::string& message)
{
for (con_list::const_iterator it = m_connection_list.begin(); it != m_connection_list.end(); ++it)
{
if (it->second->get_status() != "Open")
{
continue;
}
std::cout << "> Closing connection " << it->second->get_id() << std::endl;
websocketpp::lib::error_code ec;
m_endpoint.send(it->second->get_hdl(), message, websocketpp::frame::opcode::text, ec);
if (ec)
{
std::cout << "> Error sending message: " << ec.message() << std::endl;
return;
}
}
}
void send(int id, const std::string& message)
{
websocketpp::lib::error_code ec;
con_list::iterator metadata_it = m_connection_list.find(id);
if (metadata_it == m_connection_list.end())
{
std::cout << "> No connection found with id " << id << std::endl;
return;
}
m_endpoint.send(metadata_it->second->get_hdl(), message, websocketpp::frame::opcode::text, ec);
if (ec)
{
std::cout << "> Error sending message: " << ec.message() << std::endl;
return;
}
}
void on_open(int id)
{
if (m_onopen)
{
m_onopen(id);
}
}
void on_close(int id, websocketpp::close::status::value code)
{
con_list::iterator metadata_it = m_connection_list.find(id);
if (metadata_it == m_connection_list.end())
{
std::cout << "> No connection found with id " << id << std::endl;
return;
}
std::string uri = metadata_it->second->get_uri();
m_connection_list.erase(id);
if (code != websocketpp::close::status::proactive_close && code != websocketpp::close::status::going_away)
{
std::cout << "> reconnect to " << uri << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * m_reconnect_interval));
connect(uri);
}
}
private:
typedef std::map<int, connection_metadata::ptr> con_list;
client m_endpoint;
websocketpp::lib::shared_ptr<websocketpp::lib::thread> m_thread;
con_list m_connection_list;
int m_next_id;
int m_reconnect_interval;
WebsocketOnOpenT m_onopen;
WebsocketOnMessageT m_onmsg;
};
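// Copyright notice: this is an original article by photon222, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.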