tcp client
CMakeLists.txt
# Build script for the TCP client example executable.
# NOTE(review): add_compile_definitions() used below was introduced in CMake 3.12,
# which is newer than the 3.0.0 minimum declared here — confirm the intended minimum.
cmake_minimum_required(VERSION 3.0.0)
project(Tcp_client VERSION 0.1.0)
# --- Boost section (begin) ---
# Locate Boost 1.77.0 (or compatible) together with the listed components.
find_package(Boost 1.77.0 COMPONENTS context thread date_time program_options filesystem system coroutine log_setup log REQUIRED)
if(Boost_FOUND)
message("boost found")
# Make the Boost headers and library directory visible to the targets declared below.
include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS})
else()
# NOTE(review): find_package is called with REQUIRED, so this branch is presumably never reached — confirm.
message("boost not found")
endif()
# --- Boost section (end) ---
# Predefine _SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING for every target.
add_compile_definitions(_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING)
# Predefine BOOST_BIND_GLOBAL_PLACEHOLDERS for every target.
add_compile_definitions(BOOST_BIND_GLOBAL_PLACEHOLDERS)
# The client is built from a single source file.
add_executable(Tcp_client main.cpp)
# Link the Boost static archives by file name, plus pthread.
# NOTE(review): libboost_locale.a is linked although "locale" is not among the
# COMPONENTS requested from find_package above — confirm it is intended.
target_link_libraries(${PROJECT_NAME}
libboost_thread.a
libboost_filesystem.a
libboost_log_setup.a
libboost_log.a
libboost_locale.a
libboost_coroutine.a
libboost_context.a
pthread
)
main.cpp
#include <iostream>
#include <array>
#include <boost/asio.hpp>
using namespace boost::asio;
/// Minimal TCP client built on Boost.Asio.
///
/// The constructor connects synchronously. Async_read() and Async_write()
/// then start asynchronous operations whose handlers run while the caller
/// executes io_service::run().
class Tcp_Client
{
public:
    /// Creates the socket and performs a blocking connect to `ep`.
    /// A connect failure is reported on stderr and leaves the client in the
    /// disconnected state; Async_read()/Async_write() are then no-ops.
    Tcp_Client(io_service& io_service, ip::tcp::endpoint ep)
        : bConnected(false), ios(io_service)
    {
        ptr_socket_ = std::make_shared<ip::tcp::socket>(io_service);
        boost::system::error_code ec;
        ptr_socket_->connect(ep, ec);
        if (ec)
        {
            std::cerr << "Error connecting to server: " << ec.message() << std::endl;
            return;
        }
        bConnected = true;
    }

    /// Starts (or continues) the receive loop: every chunk that arrives is
    /// printed to stdout and another read is queued until an error occurs.
    void Async_read()
    {
        if (!bConnected)
        {
            // Same guard as Async_write(): never start I/O on a dead socket.
            return;
        }
        data_.fill('\0');
        ptr_socket_->async_read_some(buffer(data_), [this](const boost::system::error_code &ec, size_t bytes) {
            if (!ec)
            {
                std::cout << "recv size:" << bytes << std::endl;
                // Print exactly `bytes` bytes: when the buffer is completely
                // filled there is no terminating NUL, so streaming
                // data_.data() as a C string could read past the array.
                std::cout << "recv data:";
                std::cout.write(data_.data(), static_cast<std::streamsize>(bytes));
                std::cout << std::endl;
                Async_read();
            }
            else if (ec == error::eof)
            {
                // Peer closed the connection.
                bConnected = false;
                std::cout << "disconnected !" << std::endl;
            }
            else
            {
                // Any other read failure also ends the session.
                bConnected = false;
                std::cout << "read error: " << ec.message() << std::endl;
            }
        });
    }

    /// Queues `message` for asynchronous transmission.
    /// Empty messages and calls made while disconnected are ignored.
    void Async_write(std::string message)
    {
        if (message.empty() || !bConnected)
        {
            return;
        }
        // async_write() only stores a view of the bytes, so the payload must
        // outlive this call. It is kept alive by the completion handler.
        auto payload = std::make_shared<std::string>(std::move(message));
        async_write(*ptr_socket_, buffer(payload->c_str(), payload->size()),
            [this, payload](const boost::system::error_code &ec, size_t /*written_bytes*/)
            {
                if (ec)
                {
                    std::cout << "send error:" << ec.message() << std::endl;
                    bConnected = false;
                }
            });
    }

private:
    bool bConnected;                               // true while the socket is usable
    io_service &ios;                               // io_service the socket was created on
    std::shared_ptr<ip::tcp::socket> ptr_socket_;  // the client socket
    std::array<char, 1024> data_;                  // receive buffer reused by every read
};
int main(int argc, char **argv)
{
if(argc < 3)
{
std::cerr<<"Few parameter !"<<std::endl;
std::cerr <<"please input parameter: server_ip server_port"<< std::endl;
return 0;
}
std::string ip = argv[1];
short port = atoi(argv[2]);
ip::tcp::endpoint ep(ip::address_v4::from_string(ip), port);
io_service ios_;
Tcp_Client client(ios_, ep);
client.Async_write("hello world !!!");
client.Async_read();
ios_.run();
return 0;
}
tcp server
CMakeLists.txt
# Build script for the TCP server example executable.
# NOTE(review): add_compile_definitions() used below was introduced in CMake 3.12,
# which is newer than the 3.0.0 minimum declared here — confirm the intended minimum.
cmake_minimum_required(VERSION 3.0.0)
project(Tcp_link VERSION 0.1.0)
# --- Boost section (begin) ---
# Locate Boost 1.77.0 (or compatible) together with the listed components.
find_package(Boost 1.77.0 COMPONENTS context thread date_time program_options filesystem system coroutine log_setup log REQUIRED)
if(Boost_FOUND)
message("boost found")
# Make the Boost headers and library directory visible to the targets declared below.
include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS})
else()
# NOTE(review): find_package is called with REQUIRED, so this branch is presumably never reached — confirm.
message("boost not found")
endif()
# --- Boost section (end) ---
# Predefine _SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING for every target.
add_compile_definitions(_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING)
# Predefine BOOST_BIND_GLOBAL_PLACEHOLDERS for every target.
add_compile_definitions(BOOST_BIND_GLOBAL_PLACEHOLDERS)
# The server is built from the session/server implementation plus its entry point.
add_executable(Tcp_link
tcp_server.h
tcp_server.cpp
main.cpp
)
# Forward the project name and version to the CPACK_* variables.
# NOTE(review): no include(CPack) is visible in this script — confirm these are used.
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
# Link the Boost static archives by file name, plus pthread.
# NOTE(review): libboost_locale.a is linked although "locale" is not among the
# COMPONENTS requested from find_package above — confirm it is intended.
target_link_libraries(${PROJECT_NAME}
libboost_thread.a
libboost_filesystem.a
libboost_log_setup.a
libboost_log.a
libboost_locale.a
libboost_coroutine.a
libboost_context.a
pthread
)
tcp_server.h
#pragma once
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <iostream>
#include <array>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <atomic>
using namespace boost::asio;
// Callback fired when a session detects that its connection is gone.
using Callback_Close = std::function<void()>;
// Callback that hands one received message to its consumer.
using Callback_SendMessage = std::function<void(std::string)>;
// Receives the messages sent by one connected client.
//
// Instances are expected to be owned by std::shared_ptr: async_read() calls
// shared_from_this() to keep the session alive while a read is pending.
class Session_Recv : public std::enable_shared_from_this<Session_Recv>
{
public:
    // Takes ownership of an already accepted socket. `explicit` prevents a
    // socket from being converted into a session by accident.
    explicit Session_Recv(ip::tcp::socket socket);

    // Registers the callback invoked when the read loop finds the
    // connection closed.
    void SetClose(Callback_Close pClose);

    // Registers the callback that receives every chunk of data read.
    void SetMsg_Ntf(Callback_SendMessage pMsg);

    // Starts the asynchronous read loop.
    void async_read();

    // Returns true while the session is considered connected.
    bool GetState();

private:
    ip::tcp::socket socket_;            // connection to the client
    std::array<char, 1024> data_;       // receive buffer reused by every read
    std::atomic<bool> connected_;       // connection state reported by GetState()
    Callback_Close ptr_close_;          // optional disconnect notification
    Callback_SendMessage ptr_message_;  // optional received-data notification
};
// Sends messages to one connected client.
//
// Instances are expected to be owned by std::shared_ptr: async_send() calls
// shared_from_this() to keep the session alive while a write is pending.
class Session_Send : public std::enable_shared_from_this<Session_Send>
{
public:
    // Takes ownership of an already accepted socket. `explicit` prevents a
    // socket from being converted into a session by accident.
    explicit Session_Send(ip::tcp::socket socket);

    // Registers the callback invoked when a send fails.
    void SetClose(Callback_Close pClose);

    // Queues `message` for asynchronous transmission to the client.
    void async_send(std::string message);

    // Returns true while the session is considered connected.
    bool GetState();

private:
    ip::tcp::socket socket_;       // connection to the client
    std::atomic<bool> connected_;  // connection state reported by GetState()
    Callback_Close ptr_close_;     // optional send-failure notification
};
// Server: receives data from clients connected to port_recv and sends that
// data to the clients connected to port_send.
//
// NOTE(review): no destructor is declared, and the constructor starts a
// std::thread that is never joined or detached in the visible code —
// destroying a TcpServer while that thread is joinable would call
// std::terminate. Confirm the intended lifetime.
class TcpServer
{
public:
// Binds one acceptor per port. An empty ip string selects the IPv4
// wildcard address for that acceptor.
TcpServer(boost::asio::io_context &io_context, short port_recv, short port_send, std::string ip_recv = "", std::string ip_send = "");
// Removes the receive sessions whose GetState() reports false.
void Close_Session_Recv();
// Removes the send sessions whose GetState() reports false.
void Close_Session_Send();
// Stores one received message in the outgoing queue.
void GetContent(std::string content);
private:
// Accept loop for the receive port.
void Async_Accept_Recv();
// Accept loop for the send port.
void Async_Accept_Send();
// Body of the worker thread: pops queued messages and hands them to every send session.
void SendMessage();
private:
// Worker thread running SendMessage().
std::shared_ptr<std::thread> ptr_Thread_Msg;
// Acceptor and endpoint for the receive port.
std::shared_ptr<boost::asio::ip::tcp::acceptor> Acceptor_Recv_;
// Acceptor and endpoint for the send port.
std::shared_ptr<boost::asio::ip::tcp::acceptor> Acceptor_Send_;
std::shared_ptr<ip::tcp::endpoint> ptr_Recv_ep;
std::shared_ptr<ip::tcp::endpoint> ptr_Send_ep;
// Sessions accepted on the receive port.
std::vector<std::shared_ptr<Session_Recv>> clients_recv;
// Sessions accepted on the send port.
// NOTE(review): modified by the accept/close handlers and iterated by the
// SendMessage() thread — verify that every access is synchronized.
std::vector<std::shared_ptr<Session_Send>> clients_send;
// Messages waiting to be forwarded; accessed under mtx.
std::queue<std::string> Array_messages;
// Loop condition of SendMessage(); the thread leaves its loop once this is true.
std::atomic<bool> bStopSend;
// Guards Array_messages.
std::mutex mtx;
};
tcp_server.cpp
#include "tcp_server.h"
// Adopts the accepted socket; the session starts out connected with no
// callbacks registered.
Session_Recv::Session_Recv(ip::tcp::socket socket)
    : socket_(std::move(socket)),
      connected_(true),
      ptr_close_(nullptr),
      ptr_message_(nullptr)
{
}
/*===============================================
 * Function: SetClose
 * Purpose : register the callback used to report a closed connection
 * Params  : pClose  callback to store
 * Returns : none
===============================================*/
void Session_Recv::SetClose(Callback_Close pClose)
{
    // pClose is already a private copy, so hand it over without copying again.
    ptr_close_ = std::move(pClose);
}
/*===============================================
 * Function: SetMsg_Ntf
 * Purpose : register the callback that receives incoming data
 * Params  : pMsg  callback to store
 * Returns : none
===============================================*/
void Session_Recv::SetMsg_Ntf(Callback_SendMessage pMsg)
{
    // pMsg is already a private copy, so hand it over without copying again.
    ptr_message_ = std::move(pMsg);
}
/*===============================================
 * Function: async_read
 * Purpose : asynchronous receive loop. Every chunk that arrives is passed
 *           to the message callback and another read is queued. Any read
 *           failure (EOF or other error) marks the session disconnected
 *           and fires the close callback.
 * Params  : none
 * Returns : none
===============================================*/
void Session_Recv::async_read()
{
    // Keep the session alive until the handler below has finished.
    auto self(shared_from_this());
    data_.fill('\0');
    // The handler runs as soon as some data has been read.
    socket_.async_read_some(buffer(data_), [this, self](const boost::system::error_code &ec, size_t bytes) {
        if (!ec)
        {
            if (ptr_message_)
            {
                ptr_message_(std::string(data_.data(), bytes));
            }
            std::cout << "size:" << bytes << std::endl;
            async_read();
            return;
        }

        if (ec == error::eof)
        {
            // Peer closed the connection.
            std::cout << "read disconnect:" << ec.message() << std::endl;
        }
        else
        {
            std::cout << "read error:" << ec.message() << std::endl;
        }

        // No further read is queued after an error, so the session is dead
        // in both cases. Report it, otherwise the owner would never learn
        // that this session can be released.
        connected_ = false;
        if (ptr_close_)
        {
            ptr_close_();
        }
    });
}
/*===============================================
* 函数:GetState
* 功能:获取连接状态
* 参数:
* 返回:true 已连接
* false 断开连接
===============================================*/
bool Session_Recv::GetState()
{
return connected_;
}
// Adopts the accepted socket; the session starts out connected with no
// close callback registered.
Session_Send::Session_Send(ip::tcp::socket socket)
    : socket_(std::move(socket)),
      connected_(true),
      ptr_close_(nullptr)
{
}
/*===============================================
 * Function: SetClose
 * Purpose : register the callback used to report a failed connection
 * Params  : pClose  callback to store
 * Returns : none
===============================================*/
void Session_Send::SetClose(Callback_Close pClose)
{
    // pClose is already a private copy, so hand it over without copying again.
    ptr_close_ = std::move(pClose);
}
/*===============================================
 * Function: async_send
 * Purpose : send one message to the client asynchronously. Empty messages
 *           and calls made after a disconnect are ignored. A failed send
 *           marks the session disconnected and fires the close callback.
 * Params  : message  bytes to send
 * Returns : none
===============================================*/
void Session_Send::async_send(std::string message)
{
    if (message.empty() || !connected_)
    {
        return;
    }
    auto self(shared_from_this());
    // async_write() only stores a view of the bytes, so they must stay valid
    // until the completion handler runs. The by-value parameter dies when
    // this function returns, so the text is moved into a shared string owned
    // by the handler.
    auto payload = std::make_shared<std::string>(std::move(message));
    async_write(socket_, buffer(payload->c_str(), payload->size()),
        [this, self, payload](const boost::system::error_code &ec, size_t /*written_bytes*/)
        {
            if (!ec)
            {
                return;
            }
            std::cout << "send error:" << ec.message() << std::endl;
            connected_ = false;
            if (ptr_close_)
            {
                ptr_close_();
            }
        });
}
/*===============================================
* 函数:GetState
* 功能:获取连接状态
* 参数:
* 返回:true 已连接
* false 断开连接
===============================================*/
bool Session_Send::GetState()
{
return connected_;
}
//////////////////////////////////////////////////
/*===============================================
 * Function: TcpServer
 * Purpose : bind the receive and send acceptors, start both accept loops
 *           and finally launch the message-forwarding thread
 * Params  : io_context  context that drives all asynchronous operations
 *           port_recv   port on which message producers connect
 *           port_send   port on which message consumers connect
 *           ip_recv     local address for port_recv ("" = any IPv4 address)
 *           ip_send     local address for port_send ("" = any IPv4 address)
 * Returns :
===============================================*/
TcpServer::TcpServer(boost::asio::io_context &io_context, short port_recv, short port_send, std::string ip_recv, std::string ip_send)
{
    // Give the stop flag a defined value before the worker thread exists.
    bStopSend = false;

    // Builds the local endpoint for one acceptor. The cast restores ports
    // above 32767 that arrive through the signed `short` parameter.
    const auto make_endpoint = [](const std::string &ip_text, short port) -> std::shared_ptr<ip::tcp::endpoint>
    {
        const unsigned short port_number = static_cast<unsigned short>(port);
        if (ip_text.empty())
        {
            return std::make_shared<ip::tcp::endpoint>(ip::tcp::v4(), port_number);
        }
        return std::make_shared<ip::tcp::endpoint>(ip::address::from_string(ip_text), port_number);
    };

    ptr_Recv_ep = make_endpoint(ip_recv, port_recv);
    Acceptor_Recv_ = std::make_shared<ip::tcp::acceptor>(io_context, *ptr_Recv_ep);

    ptr_Send_ep = make_endpoint(ip_send, port_send);
    Acceptor_Send_ = std::make_shared<ip::tcp::acceptor>(io_context, *ptr_Send_ep);

    Async_Accept_Recv();
    Async_Accept_Send();

    // Start the worker last. Parsing an address or binding an acceptor can
    // throw; if the thread were already running, unwinding would destroy a
    // joinable std::thread and terminate the process.
    ptr_Thread_Msg = std::make_shared<std::thread>(&TcpServer::SendMessage, this);
}
/*===============================================
* 函数:async_accept
* 功能:异步处理客户端的连接请求
* 参数:
* 返回:
===============================================*/
void TcpServer::Async_Accept_Recv()
{
Acceptor_Recv_->async_accept([this](boost::system::error_code ec, ip::tcp::socket socket)
{
if (!ec)
{
auto ptr = std::make_shared<Session_Recv>(std::move(socket));
ptr->SetClose(std::bind(&TcpServer::Close_Session_Recv, this));
ptr->SetMsg_Ntf(std::bind(&TcpServer::GetContent, this, std::placeholders::_1));
ptr->async_read();
clients_recv.push_back(ptr);
}
Async_Accept_Recv();
});
}
/*===============================================
* 函数:Async_Accept_Send
* 功能:异步处理客户端的连接请求
* 参数:
* 返回:
===============================================*/
void TcpServer::Async_Accept_Send()
{
Acceptor_Send_->async_accept([this](boost::system::error_code ec, ip::tcp::socket socket)
{
if (!ec)
{
auto ptr = std::make_shared<Session_Send>(std::move(socket));
ptr->SetClose(std::bind(&TcpServer::Close_Session_Send, this));
clients_send.push_back(ptr);
}
Async_Accept_Send();
});
}
/*===============================================
 * Function: Close_Session_Recv
 * Purpose : drop every receive session that reports itself disconnected,
 *           then print the current session counts
 * Params  : none
 * Returns : none
===============================================*/
void TcpServer::Close_Session_Recv()
{
    for (auto it = clients_recv.begin(); it != clients_recv.end();)
    {
        if (!(*it)->GetState())
        {
            // erase() invalidates `it`; continue from the iterator it returns.
            it = clients_recv.erase(it);
        }
        else
        {
            ++it;
        }
    }
    std::cout <<"Send_Data_Clients: "<<clients_recv.size()<<", Recv_Data_Clients: "<<clients_send.size()<< std::endl;
}
/*===============================================
* 函数:Close_session
* 功能:关闭客户端
* 参数:
* 返回:
===============================================*/
void TcpServer::Close_Session_Send()
{
for(auto ptr = clients_send.begin(); ptr != clients_send.end();)
{
if(false == ptr->get()->GetState())
{
ptr->reset();
clients_send.erase(ptr);
continue;
}
ptr++;
}
std::cout <<"Send_Data_Clients: "<<clients_recv.size()<<", Recv_Data_Clients: "<<clients_send.size()<< std::endl;
}
/*===============================================
 * Function: GetContent
 * Purpose : queue one received message for forwarding. Empty messages are
 *           ignored.
 * Params  : content  received message
 * Returns : none
===============================================*/
void TcpServer::GetContent(std::string content)
{
    if (content.empty())
    {
        return;
    }
    // Wait for the lock. With try_lock() the message was silently discarded
    // whenever the forwarding thread happened to hold the mutex.
    std::lock_guard<std::mutex> lock(mtx);
    Array_messages.push(std::move(content));
}
/*===============================================
* 函数:SendMessage
* 功能:发送消息线程
* 参数:
* 返回:
===============================================*/
void TcpServer::SendMessage()
{
std::string message = "";
bStopSend = false;
while(!bStopSend)
{
if(mtx.try_lock())
{
if(!Array_messages.empty())
{
message = Array_messages.front();
Array_messages.pop();
}
mtx.unlock();
}
if(!message.empty())
{
for(auto ptr = clients_send.begin(); ptr != clients_send.end(); ptr ++)
{
std::cout << "send message" << std::endl;
ptr->get()->async_send(message);
}
message.clear();
continue;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
main.cpp
#include "tcp_server.h"
/// Server entry point.
/// Usage: <program> (recv)port (send)port [(recv)ip] [(send)ip]
///
/// Both ports are mandatory; each ip is optional and defaults to the empty
/// string. Returns 1 when the command line is incomplete or a port is out
/// of range, 0 otherwise.
int main(int argc, char **argv)
{
    if (argc < 3)
    {
        std::cerr <<"Few parameter !"<< std::endl;
        std::cerr <<"please input parameter: (recv)port (send)port (recv)ip (send)ip"<< std::endl;
        std::cerr <<"port is necessary, ip is unnecessary"<< std::endl;
        return 1;  // a usage error must not look like success to the caller
    }
    std::cout<<"start ..."<<std::endl;

    // Parse into int first so out-of-range values can be rejected instead of
    // being silently truncated.
    const int port_recv_ = atoi(argv[1]);
    const int port_send_ = atoi(argv[2]);
    const auto is_valid_port = [](int port) { return port > 0 && port <= 65535; };
    if (!is_valid_port(port_recv_) || !is_valid_port(port_send_))
    {
        std::cerr <<"Invalid port, expected a value in 1-65535"<< std::endl;
        return 1;
    }

    std::string ip_recv_ = "";
    std::string ip_send_ = "";
    if (argc > 3)
    {
        ip_recv_ = argv[3];
    }
    if (argc > 4)
    {
        // The fourth argument is the send-side address; it used to overwrite
        // ip_recv_, so the send address could never be configured.
        ip_send_ = argv[4];
    }

    boost::asio::io_context io_context;
    // TcpServer takes its ports as `short`; the explicit casts keep the
    // 16-bit pattern of ports above 32767.
    TcpServer server(io_context,
                     static_cast<short>(port_recv_),
                     static_cast<short>(port_send_),
                     ip_recv_,
                     ip_send_);
    io_context.run();
    return 0;
}
版权声明:本文为seaeress原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。