使用boost实现tcp server/client

  • Post author:
  • Post category:其他




tcp client

CMakeLists.txt

cmake_minimum_required(VERSION 3.0.0)
project(Tcp_client VERSION 0.1.0)

#boost相关部分
find_package(Boost 1.77.0 COMPONENTS context thread date_time program_options filesystem system coroutine log_setup log REQUIRED)
if(Boost_FOUND)
  message("boost found")
  include_directories(${Boost_INCLUDE_DIRS})
  link_directories(${Boost_LIBRARY_DIRS})
else()
  message("boost not found")
endif()
#boost相关部分

#预定义_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING
add_compile_definitions(_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING)
add_compile_definitions(BOOST_BIND_GLOBAL_PLACEHOLDERS)

add_executable(Tcp_client main.cpp)

target_link_libraries(${PROJECT_NAME} 
libboost_thread.a 
libboost_filesystem.a 
libboost_log_setup.a 
libboost_log.a 
libboost_locale.a
libboost_coroutine.a 
libboost_context.a
pthread
)

main.cpp

#include <iostream>
#include <array>
#include <boost/asio.hpp>

using namespace boost::asio;

class Tcp_Client
{
public:
    Tcp_Client(io_service& io_service, ip::tcp::endpoint ep):ios(io_service)
    {
        ptr_socket_.reset(new ip::tcp::socket(io_service));

        boost::system::error_code ec;
        ptr_socket_->connect(ep, ec);
        if(ec)
        {
            std::cerr << "Error connecting to server: " << ec.message() << std::endl;
            bConnected = false;
            return;
        }
        bConnected = true;
    }

    void Async_read()
    {
        memset(data_.data(), 0, sizeof(data_));
        ptr_socket_->async_read_some(buffer(data_), [this](const boost::system::error_code &ec, size_t bytes) {
            if(!ec)
            {
                std::cout<<"recv size:"<<bytes<<std::endl;
                std::cout<<"recv data:"<<data_.data()<<std::endl;
                Async_read();
            }
            else if(ec == error::eof)
            {
                // 断开连接
                bConnected = false;
                std::cout<<"disconnected !"<<std::endl;
            }
            else
            {
                std::cout<<"read error: "<<ec.message()<<std::endl;
            }
        });
    }

    void Async_write(std::string message)
    {
        if(message.empty() || !bConnected)
        {
            return;
        }

        async_write(*ptr_socket_, buffer(message.c_str(), message.size()), [this](const boost::system::error_code &ec, size_t writed_bytes)
		{
			if (!ec)
			{
			
			}
            else
            {
                std::cout <<"send error:"<<ec.message() << std::endl;
                bConnected = false;
            }
		});
    } 

private:
    bool bConnected;
    io_service &ios;
    std::shared_ptr<ip::tcp::socket> ptr_socket_;

    std::array<char, 1024> data_;
};

int main(int argc, char **argv)
{
    if(argc < 3)
    {
        std::cerr<<"Few parameter !"<<std::endl;
        std::cerr <<"please input parameter: server_ip  server_port"<< std::endl;
        return 0;
    }

    std::string ip = argv[1];
    short port = atoi(argv[2]);


    ip::tcp::endpoint ep(ip::address_v4::from_string(ip), port);
    io_service ios_;
    Tcp_Client client(ios_, ep);
    client.Async_write("hello world !!!");
    client.Async_read();
    ios_.run();

    return 0;
}



tcp server

CMakeLists.txt

cmake_minimum_required(VERSION 3.0.0)
project(Tcp_link VERSION 0.1.0)

#boost相关部分
find_package(Boost 1.77.0 COMPONENTS context thread date_time program_options filesystem system coroutine log_setup log REQUIRED)
if(Boost_FOUND)
  message("boost found")
  include_directories(${Boost_INCLUDE_DIRS})
  link_directories(${Boost_LIBRARY_DIRS})
else()
  message("boost not found")
endif()
#boost相关部分

#预定义_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING
add_compile_definitions(_SILENCE_CXX17_ALLOCATOR_VOID_DEPRECATION_WARNING)
add_compile_definitions(BOOST_BIND_GLOBAL_PLACEHOLDERS)

add_executable(Tcp_link 
tcp_server.h 
tcp_server.cpp
main.cpp
)

set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})

target_link_libraries(${PROJECT_NAME} 
libboost_thread.a 
libboost_filesystem.a 
libboost_log_setup.a 
libboost_log.a 
libboost_locale.a
libboost_coroutine.a 
libboost_context.a
pthread
)

tcp_server.h

#pragma once

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>
#include <iostream>
#include <array>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <atomic>

using namespace boost::asio;

typedef std::function<void(void)> Callback_Close;
typedef std::function<void(std::string)> Callback_SendMessage;

// 接收客户端发送来的消息
class Session_Recv : public std::enable_shared_from_this<Session_Recv>
{
public:
    Session_Recv(ip::tcp::socket socket);

    void SetClose(Callback_Close pClose);
    void SetMsg_Ntf(Callback_SendMessage pMsg);
    void async_read();
    bool GetState();

private:
    ip::tcp::socket socket_;
    std::array<char, 1024> data_;
    std::atomic<bool> connected_;

    Callback_Close ptr_close_;
    Callback_SendMessage ptr_message_;
};

// 向客户端发送消息
class Session_Send : public std::enable_shared_from_this<Session_Send>
{
public:
    Session_Send(ip::tcp::socket socket);

    void SetClose(Callback_Close pClose);
    void async_send(std::string message);
    bool GetState();

private:
    ip::tcp::socket socket_;
    std::atomic<bool> connected_;

    Callback_Close ptr_close_;
};

// 服务器 接收port_recv端口发来的数据    向port_send端口发送数据
class TcpServer
{
public:
    TcpServer(boost::asio::io_context &io_context, short port_recv, short port_send, std::string ip_recv = "", std::string ip_send = "");

    void Close_Session_Recv();
    void Close_Session_Send();
    void GetContent(std::string content);

private:
    void Async_Accept_Recv();
    void Async_Accept_Send();

    void SendMessage();

private:
    std::shared_ptr<std::thread> ptr_Thread_Msg;
    std::shared_ptr<boost::asio::ip::tcp::acceptor> Acceptor_Recv_;
    std::shared_ptr<boost::asio::ip::tcp::acceptor> Acceptor_Send_;

    std::shared_ptr<ip::tcp::endpoint> ptr_Recv_ep;
    std::shared_ptr<ip::tcp::endpoint> ptr_Send_ep;

    std::vector<std::shared_ptr<Session_Recv>> clients_recv;
    std::vector<std::shared_ptr<Session_Send>> clients_send;

    std::queue<std::string> Array_messages;
    std::atomic<bool> bStopSend;
    std::mutex mtx;
};

tcp_server.cpp

#include "tcp_server.h"

Session_Recv::Session_Recv(ip::tcp::socket socket):socket_(std::move(socket))
{
    connected_ = true;
    ptr_close_ = nullptr;
    ptr_message_ = nullptr;
}

/*===============================================
 * 函数:SetClose
 * 功能:设置关闭函数
 * 参数:pClose     回调函数
 * 返回:
===============================================*/
void Session_Recv::SetClose(Callback_Close pClose)
{
    ptr_close_ = pClose;
}

/*===============================================
 * 函数:SetMsg_Ntf
 * 功能:设置数据回调函数
 * 参数:pMsg     回调函数
 * 返回:
===============================================*/
void Session_Recv::SetMsg_Ntf(Callback_SendMessage pMsg)
{
    ptr_message_ = pMsg;
}

/*===============================================
 * 函数:async_read
 * 功能:异步读取
 * 参数:
 * 返回:
===============================================*/
void Session_Recv::async_read()
{
    auto self(shared_from_this());
    memset(data_.data(), 0, sizeof(data_));

	// 读操作完成时回调该函数, 读取到一些数据就会触发回调
	socket_.async_read_some(buffer(data_), [this, self](const boost::system::error_code &ec, size_t bytes) {
	    if (!ec)
	    {
            if(ptr_message_)
            {
                 
                ptr_message_(std::string(data_.data(), bytes));
            }
            std::cout << "size:"<< bytes<<std::endl;

            async_read();
        }
        else if(ec == error::eof)
        {
            // 断开连接
            std::cout <<"read disconnect:"<<ec.message() << std::endl;
            connected_ = false;
            if(ptr_close_)
            {
                ptr_close_();
            }
        }
        else
        {
            std::cout<<"read error:"<<ec.message()<<std::endl;
        }
	});
}

/*===============================================
 * 函数:GetState
 * 功能:获取连接状态
 * 参数:
 * 返回:true           已连接
 *      false          断开连接
===============================================*/
bool Session_Recv::GetState()
{
    return connected_;
}

Session_Send::Session_Send(ip::tcp::socket socket):socket_(std::move(socket))
{
    connected_ = true;
    ptr_close_ = nullptr;
}

/*===============================================
 * 函数:SetClose
 * 功能:设置关闭函数
 * 参数:pClose     回调函数
 * 返回:
===============================================*/
void Session_Send::SetClose(Callback_Close pClose)
{
    ptr_close_ = pClose;
}

/*===============================================
 * 函数:async_send
 * 功能:异步发送消息
 * 参数:message       消息
 * 返回:
===============================================*/
void Session_Send::async_send(std::string message)
{
    if(message.empty() || !connected_)
    {
        return;
    }

    auto self(shared_from_this());
    async_write(socket_, buffer(message.c_str(), message.size()), [this, self](const boost::system::error_code &ec, size_t writed_bytes)
		{
			if (!ec)
			{
			//	async_read();
			}
            else
            {
                std::cout <<"send error:"<<ec.message() << std::endl;
                connected_ = false;
                if(ptr_close_)
                {
                    ptr_close_();
                }
            }
		});
}

/*===============================================
 * 函数:GetState
 * 功能:获取连接状态
 * 参数:
 * 返回:true           已连接
 *      false          断开连接
===============================================*/
bool Session_Send::GetState()
{
    return connected_;
}

/

TcpServer::TcpServer(boost::asio::io_context &io_context, short port_recv, short port_send, std::string ip_recv, std::string ip_send)
{
    ptr_Thread_Msg.reset(new std::thread(std::bind(&TcpServer::SendMessage, this)));

    if("" == ip_recv)
    {
        ptr_Recv_ep.reset(new ip::tcp::endpoint(ip::tcp::v4(), port_recv));
    }
    else
    {
        ptr_Recv_ep.reset(new ip::tcp::endpoint(ip::address::from_string(ip_recv), port_recv));
    }
    Acceptor_Recv_.reset(new ip::tcp::acceptor(io_context, *ptr_Recv_ep));

    if("" == ip_send)
    {
        ptr_Send_ep.reset(new ip::tcp::endpoint(ip::tcp::v4(), port_send));
    }
    else
    {
        ptr_Send_ep.reset(new ip::tcp::endpoint(ip::address::from_string(ip_send), port_send));
    }
    Acceptor_Send_.reset(new ip::tcp::acceptor(io_context, *ptr_Send_ep));
    
    clients_recv.clear();
    clients_send.clear();

    Async_Accept_Recv();
    Async_Accept_Send();
}

/*===============================================
 * 函数:async_accept
 * 功能:异步处理客户端的连接请求
 * 参数:
 * 返回:
===============================================*/
void TcpServer::Async_Accept_Recv()
{
    Acceptor_Recv_->async_accept([this](boost::system::error_code ec, ip::tcp::socket socket)
        {
            if (!ec)
            {
                auto ptr = std::make_shared<Session_Recv>(std::move(socket));
                ptr->SetClose(std::bind(&TcpServer::Close_Session_Recv, this));
                ptr->SetMsg_Ntf(std::bind(&TcpServer::GetContent, this, std::placeholders::_1));
		        ptr->async_read();
                clients_recv.push_back(ptr);
            }

            Async_Accept_Recv();
        });
}

/*===============================================
 * 函数:Async_Accept_Send
 * 功能:异步处理客户端的连接请求
 * 参数:
 * 返回:
===============================================*/
void TcpServer::Async_Accept_Send()
{
    Acceptor_Send_->async_accept([this](boost::system::error_code ec, ip::tcp::socket socket)
        {
            if (!ec)
            {
                auto ptr = std::make_shared<Session_Send>(std::move(socket));
                ptr->SetClose(std::bind(&TcpServer::Close_Session_Send, this));
                clients_send.push_back(ptr);
            }

            Async_Accept_Send();
        });
}

/*===============================================
 * 函数:Close_session
 * 功能:关闭客户端
 * 参数:
 * 返回:
===============================================*/
void TcpServer::Close_Session_Recv()
{
    for(auto ptr = clients_recv.begin(); ptr != clients_recv.end();)
    {
        if (false == ptr->get()->GetState())
        {
            ptr->reset();
            clients_recv.erase(ptr);
            continue;
        }
        ptr++;
    }
    std::cout <<"Send_Data_Clients: "<<clients_recv.size()<<", Recv_Data_Clients: "<<clients_send.size()<< std::endl;
}

/*===============================================
 * 函数:Close_session
 * 功能:关闭客户端
 * 参数:
 * 返回:
===============================================*/
void TcpServer::Close_Session_Send()
{
    for(auto ptr = clients_send.begin(); ptr != clients_send.end();)
    {
        if(false == ptr->get()->GetState())
        {
            ptr->reset();
            clients_send.erase(ptr);
            continue;
        }
        ptr++;
    }
    std::cout <<"Send_Data_Clients: "<<clients_recv.size()<<", Recv_Data_Clients: "<<clients_send.size()<< std::endl;
}

/*===============================================
 * 函数:GetContent
 * 功能:获取接收到得消息
 * 参数:content         消息
 * 返回:
===============================================*/
void TcpServer::GetContent(std::string content)
{
    if(content.empty())
    {
        return;
    }

    if(mtx.try_lock())
    {
        Array_messages.push(content);
        mtx.unlock();
    }
}

/*===============================================
 * 函数:SendMessage
 * 功能:发送消息线程
 * 参数:
 * 返回:
===============================================*/
void TcpServer::SendMessage()
{
    std::string message = "";
    bStopSend = false;
    while(!bStopSend)
    {
        if(mtx.try_lock())
        {
            if(!Array_messages.empty())
            {
                message = Array_messages.front();
                Array_messages.pop();
            }
            mtx.unlock();
        }

        if(!message.empty())
        {
            for(auto ptr = clients_send.begin(); ptr != clients_send.end(); ptr ++)
            {
                std::cout << "send message" << std::endl;
                ptr->get()->async_send(message);
            }

            message.clear();
            continue;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

main.cpp

#include "tcp_server.h"

int main(int argc, char **argv)
{
    if(argc < 3)
    {
        std::cerr <<"Few parameter !"<< std::endl;
        std::cerr <<"please input parameter: (recv)port  (send)port  (recv)ip  (send)ip"<< std::endl;
        std::cerr <<"port is necessary, ip is unnecessary"<< std::endl;
        return 0;
    }

    std::cout<<"start ..."<<std::endl;

    std::string ip_recv_ = "";
    std::string ip_send_ = "";
    int port_recv_ = atoi(argv[1]);
    int port_send_ = atoi(argv[2]);

    if(argc > 3)
    {
        ip_recv_ = argv[3];
    }

    if(argc > 4)
    {
        ip_recv_ = argv[4];
    }

    boost::asio::io_context io_context;
    TcpServer server(io_context, port_recv_, port_send_, ip_recv_, ip_recv_);
    io_context.run();
    return 0;
}



版权声明:本文为seaeress原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。