libwebsockets的学习

  • Post author:
  • Post category:其他


1 下载源码

  apt-get source libwebsockets-dev
  sudo apt-get install libwebsockets-dev

2 编译(查看readme.)

cd libwebsockets
mkdir build
cd build
cmake ../ -D

参考 README.build.md   cmake .. -DCMAKE_BUILD_TYPE=DEBUG

3 使用

3.1 Client

/*
 ============================================================================
 Name        : test-websocket.c
 Author      : cj
 Version     :
 Copyright   : Your copyright notice
 Description : Hello World in C, Ansi-style
 ============================================================================
 */

#include <stdio.h>
#include <stdlib.h>
#include <libwebsockets.h>
#include "clog.h"
#include <pthread.h>

struct session_data {
	char name[100];
};

static int ws_service_callback(struct lws *wsi,
		enum lws_callback_reasons reason, void *user, void *in, size_t size) {
	int n;
	struct session_data *session = (struct session_data*) user;

	switch (reason) {
	case LWS_CALLBACK_GET_THREAD_ID:
		break;
	case LWS_CALLBACK_CLIENT_ESTABLISHED: {
		log_d("CLIENT_ESTABLISHED");
		char *str = "hello start..";
		int len = strlen(str);
		unsigned char *out = (unsigned char *) malloc(
				sizeof(unsigned char)
						* (LWS_SEND_BUFFER_PRE_PADDING + len
								+ LWS_SEND_BUFFER_POST_PADDING));
		memcpy(out + LWS_SEND_BUFFER_PRE_PADDING, str, len);
		n = lws_write(wsi, out + LWS_SEND_BUFFER_PRE_PADDING, len,
				LWS_WRITE_TEXT);

		//sprintf(session->name, "start....");
		//log_d("n=%d", n);
	}
		break;
	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
		log_d("CONNECTION_ERROR");
		return -1;
		break;
	case LWS_CALLBACK_CLOSED:
		log_d("LWS_CALLBACK_CLOSED");
		break;
	case LWS_CALLBACK_WSI_DESTROY:
		log_d("LWS_CALLBACK_WSI_DESTROY");
		break;

	case LWS_CALLBACK_CLIENT_RECEIVE:
		log_d("session:%s, recv(len=%d)>%s", session->name, size, (char * ) in);
		{
			char *str = "hello world. recv";
			int len = strlen(str);
			unsigned char *out = (unsigned char *) malloc(
					sizeof(unsigned char) * (LWS_PRE + len));
			memcpy(out + LWS_PRE, str, len);

			n = lws_write(wsi, out + LWS_PRE, len, LWS_WRITE_BINARY);
			//log_d("n=%d", n);
			n = lws_callback_on_writable(wsi);
			//log_d("n=%d", n);
		}
		break;
	case LWS_CALLBACK_CLIENT_WRITEABLE: {
		//log_d("LWS_CALLBACK_CLIENT_WRITEABLE");
		char *str = "write.....";
		int len = strlen(str);
		unsigned char *out = (unsigned char *) malloc(
				sizeof(unsigned char) * (LWS_PRE + len));
		memcpy(out + LWS_PRE, str, len);

		n = lws_write(wsi, out + LWS_PRE, len, LWS_WRITE_BINARY);
		n = lws_callback_on_writable(wsi);
	}
		break;
	default:
		break;
	}

	return 0;
}
#ifdef WCLIENT
int main(int argc,char **argv) {
#else
int main_client() {
#endif
	const char *address = "127.0.0.1";
	struct lws_context *context = NULL;
	struct lws_context_creation_info info;
	struct lws *wsi = NULL;
	struct lws_protocols protocol;
//	struct sigaction act;
//	act.sa_handler = INT_HANDLER;
//	act.sa_flags = 0;
//	sigemptyset(&act.sa_mask);
//	sigaction( SIGINT, &act, 0);

	memset(&info, 0, sizeof info);
	info.port = CONTEXT_PORT_NO_LISTEN;
	info.iface = NULL;
	info.protocols = &protocol;
	info.ssl_cert_filepath = NULL;
	info.ssl_private_key_filepath = NULL;
	info.extensions = NULL;
	info.gid = -1;
	info.uid = -1;
	info.options = 0;

	protocol.name = "abc";
	protocol.callback = &ws_service_callback;
	protocol.per_session_data_size = sizeof(struct session_data);
	protocol.rx_buffer_size = 65535;
	protocol.id = 0;
	protocol.user = NULL;

	context = lws_create_context(&info);
	log_d("[Main] context created.");

	if (context == NULL) {
		log_d("[Main] context is NULL.");
		return -1;
	}

#if 0
	wsi = lws_client_connect(context, address, 1883, 0, "/123",
			"111:5000",
			NULL, protocol.name, -1);
#else
	struct lws_client_connect_info i;
	memset(&i, 0, sizeof(i));
	i.port = 8080;
	i.address = address;
	i.path = "abc";
	i.context = context;
	i.ssl_connection = 0;
	i.host = i.address;
	i.origin = i.address;
	i.ietf_version_or_minus_one = -1;
	i.client_exts = NULL;

	i.protocol = "abc";
	wsi = lws_client_connect_via_info(&i);
#endif

	if (wsi == NULL) {
		log_d("[Main] wsi create error.");
		return -1;
	}
	log_d("start....");

	int n = 0;
	while (n >= 0) {
		//当连接断开时需要重新连接在这使用标志位
		//if conflag==0  lws_client_connect_via_info
		n = lws_service(context, 50);
	}
	lws_context_destroy(context);
	log_d("close....");
	return EXIT_SUCCESS;
}
LWS_CALLBACK_CLIENT_ESTABLISHED 第一次连接 这是客户端
LWS_CALLBACK_CLIENT_RECEIVE 接收到数据 这是客户端
LWS_CALLBACK_CLIENT_WRITEABLE 可以发送数据了
LWS_CALLBACK_WSI_DESTROY 正在销毁,在这可以释放ESTABLISHED时malloc的数据

3.2 Server

/*
 ============================================================================
 Name        : test-websocket.c
 Author      : cj
 Version     :
 Copyright   : Your copyright notice
 Description : Hello World in C, Ansi-style
 ============================================================================
 */

#include <stdio.h>
#include <stdlib.h>
#include <libwebsockets.h>
#include "clog.h"
#include <pthread.h>

struct session_data {
	char name[100];
};

static int ws_service_callback(struct lws *wsi,
		enum lws_callback_reasons reason, void *user, void *in, size_t size) {
	int n;
	struct session_data *session = (struct session_data*) user;

	switch (reason) {
	case LWS_CALLBACK_GET_THREAD_ID:
		break;
	case LWS_CALLBACK_ESTABLISHED: {
		log_d("ESTABLISHED");
		char *str = "hello start..";
		int len = strlen(str);
		unsigned char *out = (unsigned char *) malloc(
				sizeof(unsigned char)
						* (LWS_SEND_BUFFER_PRE_PADDING + len
								+ LWS_SEND_BUFFER_POST_PADDING));
		memcpy(out + LWS_SEND_BUFFER_PRE_PADDING, str, len);

		n = lws_write(wsi, out + LWS_SEND_BUFFER_PRE_PADDING, len,
				LWS_WRITE_TEXT);

		sprintf(session->name, "start....");
	}
		break;
	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
		log_d("CONNECTION_ERROR");
		return -1;
		break;
	case LWS_CALLBACK_CLOSED:
		log_d("LWS_CALLBACK_CLOSED");
		break;
	case LWS_CALLBACK_WSI_DESTROY:
		log_d("LWS_CALLBACK_WSI_DESTROY");
		break;
	case LWS_CALLBACK_RECEIVE:
		//log_d("session:%s, recv(len=%d)>%s", session->name, size, (char * ) in);
		{
			char *str = "hello world. recv";
			int len = strlen(str);
			unsigned char *out = (unsigned char *) malloc(
					sizeof(unsigned char) * (LWS_PRE + len));
			memcpy(out + LWS_PRE, str, len);

			n = lws_write(wsi, out + LWS_PRE, len, LWS_WRITE_BINARY);
			n = lws_callback_on_writable(wsi);
		}
		break;
	case LWS_CALLBACK_SERVER_WRITEABLE: {
		//log_d("LWS_CALLBACK_CLIENT_WRITEABLE");
		char *str = "write....ser.";
		int len = strlen(str);
		unsigned char *out = (unsigned char *) malloc(
				sizeof(unsigned char) * (LWS_PRE + len));
		memcpy(out + LWS_PRE, str, len);

		n = lws_write(wsi, out + LWS_PRE, len, LWS_WRITE_BINARY);
		n = lws_callback_on_writable(wsi);
	}
		break;
	default:
		break;
	}

	return 0;
}

int main(void) {
	struct lws_context *context = NULL;
	struct lws_context_creation_info info;
	struct lws_protocols protocol;
//	struct sigaction act;
//	act.sa_handler = INT_HANDLER;
//	act.sa_flags = 0;
//	sigemptyset(&act.sa_mask);
//	sigaction( SIGINT, &act, 0);

	memset(&info, 0, sizeof info);
	info.port = 8080;
	info.iface = NULL;
	info.protocols = &protocol;
	info.ssl_cert_filepath = NULL;
	info.ssl_private_key_filepath = NULL;
	info.extensions = NULL;
	info.gid = -1;
	info.uid = -1;
	info.options = 0;

	protocol.name = "abc";
	protocol.callback = &ws_service_callback;
	protocol.per_session_data_size = sizeof(struct session_data);
	protocol.rx_buffer_size = 65535;
	protocol.id = 0;
	protocol.user = NULL;

	context = lws_create_context(&info);
	log_d("[Main] context created.");

	if (context == NULL) {
		log_d("[Main] context is NULL.");
		return -1;
	}

	log_d("start....");
	int n = 0;
	while (n >= 0) {
		n = lws_service(context, 50);
	}
	lws_context_destroy(context);

	return EXIT_SUCCESS;
}
LWS_CALLBACK_ESTABLISHED

收到客户端连接,在这里可以注册user (session_data) 到一个线程安全Lock的链表上

user中可以添加一个发送数据队列

LWS_CALLBACK_RECEIVE 收到客户端数据,后续需要写数据可以添加使用 lws_callback_on_writable 来处理写事件
LWS_CALLBACK_SERVER_WRITEABLE 可以写数据到客户端,添加 lws_callback_on_writable, 这时可以把session中存储的代发送数据发送出去
LWS_CALLBACK_WSI_DESTROY 释放数据开始 ,这里请取消注册到线程安全客户端队列里面
LWS_CALLBACK_CLOSED 有时候不会到这来,最好在DESTROY中处理

对于libwebsockets高级应用方面,我参考了

WebRTC+libwebsockets+Janus的秒开实践_一朵喇叭花压海棠的博客-CSDN博客_lws_callback_on_writable


里面说的 libwebsockets IO的优化

在这里可以参考一下mqtt服务器

mosquitto

C++ 处理封装逻辑


/*获取wsi的fd远程IP, 这样写差不多是为libwebsockets增加功能 */
static int lws_get_connectip(struct lws* wsi,
	char localip[20], int* localport,
	char remip[20], int* remport)
{
	lws_sockfd_type fd = lws_get_socket_fd(wsi);
	if (fd < 0)
	{
		perror("lws_get_socket_fd failed ! \n");
		return -1;
	}
	struct sockaddr_in sockAddr;
	socklen_t addrLen = sizeof(sockAddr);
	getsockname(fd, (struct sockaddr*)&sockAddr, &addrLen);
	inet_ntop(AF_INET, &sockAddr.sin_addr, localip, 20);
	*localport = ntohs(sockAddr.sin_port);
	addrLen = sizeof(sockAddr);
	getpeername(fd, (struct sockaddr*)&sockAddr, &addrLen); //得到远程IP地址和端口号
	inet_ntop(AF_INET, &sockAddr.sin_addr, remip, 20);		 //IPV4
	*remport = ntohs(sockAddr.sin_port);					 //对方的端口号
	return 0;
}

class BuffCache
{
	char* data;
	int size;
	int pos;
public:
	BuffCache()
	{
		data = NULL;
		size = 0;
		pos = 0;
	}

	~BuffCache()
	{
		if (data)
			free(data);
	}

	void addData(void* _data, int _size)
	{
		if (pos + _size > size)
		{
			size = pos + _size + 10;
			if (data)
			{
				data = (char*)realloc(data, size);
			}
			else
			{
				data = (char*)calloc(1, size);
			}
		}
		memcpy(data + pos, _data, _size);
		pos += size;
	}
	char* getData()
	{
		return data;
	}
	int getSize()
	{
		return pos;
	}
	void reset()
	{
		pos = 0;
	}

};

class WSMsg
{
	char* data;
	char* _ptr;
	size_t len;
public:
	typedef std::shared_ptr<WSMsg> Ptr;
	WSMsg()
	{
		data = NULL;
		len = 0;
	}

	~WSMsg()
	{
		if (data)
			free(data);
	}

	void setData(const void* _data, int size)
	{
		data = (char*)calloc(1, size + LWS_PRE + 20);

		if (data)
		{
			memcpy(data + LWS_PRE, _data, size);
			_ptr = data + LWS_PRE;
			len = size;
		}
	}

	void* getData()
	{
		return data + LWS_PRE;
	}

	size_t getLen()
	{
		return len;
	}

	static WSMsg::Ptr build(const void* data, int len)
	{
		WSMsg::Ptr msg = std::make_shared<WSMsg>();
		msg->setData(data, len);
		return msg;
	}

	static WSMsg::Ptr buildFromJSON(cJSON* json)
	{
		char* buff = cJSON_Print(json);
		WSMsg::Ptr msg = WSMsg::build(buff, strlen(buff));
		free(buff);
		return msg;
	}
};

static int _websocket_callback_app(struct lws* wsi,
	enum lws_callback_reasons reason, void* user, void* in, size_t len);

static struct lws_protocols protocols[] =
{
	//{ "http", lws_callback_http_dummy, 0, 0 },
	{
				"LxAppProtocol",								
				_websocket_callback_app,						/
				100, /
				1024 * 1024
	},							//500k
	/
	{ NULL, NULL, 0, 0 } /* terminator */
};

struct lws* wsi_connect_client(struct lws_context* context, const char* addr)
{
	struct lws_client_connect_info i;
	memset(&i, 0, sizeof(i));
	i.path = "/test";
	i.address = addr;
	i.port = 12345; //默认端口
	i.context = context;
	i.ssl_connection = 0;
	i.host = i.address;
	i.origin = i.address;
	i.ietf_version_or_minus_one = -1;
	i.client_exts = NULL;
	i.protocol = "xxxx";
	return lws_client_connect_via_info(&i);
}


//ws 会话
class WSSession
{
	struct lws* _wsi;
	int fd;
	char local_ip[20]; //本地
	int local_port;
	char rem_ip[20]; //远程IP
	int peer_port;	 //远程端口
	std::string uri;
	int conType;

	std::mutex _mtx;
	std::list<WSMsg::Ptr> _txMsgList; //发送消息列表
	bool _need_close;

	BuffCache bufCache;
public:
	typedef std::shared_ptr<WSSession> Ptr;

	typedef enum
	{
		//连接类型 //链接模式
		MyServer = 0, //我是服务器,别人连接我
		MyClient, //连接到外部,
	} ConnectType;

	WSSession()
	{
		data_type = WS_DATA_NONE;
		fd = 0;
		_need_close = false;
		memset(&lxProtoCtx, 0, sizeof(lxProtoCtx));
	}

	void set_lws(struct lws* wsi)
	{
		_wsi = wsi;
		fd = (int)lws_get_socket_fd(_wsi);
		//lws_get_protocol(wsi);
		lws_get_connectip(wsi, local_ip, &local_port, rem_ip, &peer_port);
		char buff[100];
		memset(buff, 0, sizeof(buff));
		lws_hdr_copy(wsi, buff, sizeof(buff), WSI_TOKEN_GET_URI);
		uri = buff;
		_need_close = false;
	}

	void setDeviceID(const char* id)
	{
		strcpy(lxProtoCtx.device_id, id);
	}

	struct lws* get_lws()
	{
		return _wsi;
	}

	void setConnectType(WSSession::ConnectType t)
	{
		conType = (int)t;
	}

	void debug()
	{
		std::cout << "fd=" << fd << std::endl;
		std::cout << "local_ip:" << local_ip << ":" << local_port << std::endl;
		std::cout << "rem_ip:" << rem_ip << ":" << peer_port << std::endl;
		std::cout << "uri:" << uri << std::endl;
	}
    
    //初始化处理
	void initSession();

	//接收消息
	void onRecv(void* in, int size, int is_final, int is_binary)
	{
		bufCache.addData(in, size);
		if (is_final)
		{
			char* buff = bufCache.getData();

			if (WS_DATA_TYPE_LXPROTO_JSON == data_type)
				onHandleLxProtoMsg(buff, bufCache.getSize());
			bufCache.reset();
		}
	}

	//处理lx协议消息
	int onHandleLxProtoMsg(const char* msg, int msglen);

	///
	void asyncClose()
	{
		_need_close = true;
	}

	bool isNeedClose()
	{
		return _need_close;
	}

	size_t sendMessage(WSMsg::Ptr& msg)
	{
		std::lock_guard<std::mutex> lock(_mtx);
		_txMsgList.push_back(msg);
		return _txMsgList.size();
	}

	size_t sendMessage(const void* data, int len)
	{
		std::lock_guard<std::mutex> lock(_mtx);
		WSMsg::Ptr msg = WSMsg::build(data, len);
		_txMsgList.push_back(msg);
		return _txMsgList.size();
	}

	size_t sendMessage(cJSON* json)
	{
		std::lock_guard<std::mutex> lock(_mtx);
		WSMsg::Ptr msg = WSMsg::buildFromJSON(json);
		_txMsgList.push_back(msg);
		return _txMsgList.size();
	}

	size_t msgCount()
	{
		std::lock_guard<std::mutex> lock(_mtx);
		return _txMsgList.size();
	}

	WSMsg::Ptr popFirstMsg()
	{
		std::lock_guard<std::mutex> lock(_mtx);
		if (_txMsgList.size() == 0)
		{
			return nullptr;
		}
		WSMsg::Ptr msg = _txMsgList.front();
		_txMsgList.pop_front();
		return msg;
	}

	bool checkMsg()
	{
		//判断的心跳消息
		if (msgCount() > 0 || isNeedClose())
		{
			//lws_callback_all_protocol
			lws_callback_on_writable(get_lws());
			return true;
		}
		return false;
	}
};

//Websocket Server
class WSServer
{
	std::atomic_bool _runflag;
	std::thread thread;
	int _listen_port;

	std::mutex _mtx;
	std::list<WSSession::Ptr> sessions;

	std::string _host_server;
	std::string _devid; //设备ID
	struct lws* _host_wsi;

	struct lws_context* _context;
public:
	WSServer()
	{
		_runflag = false;
		_listen_port = -1;
		_host_wsi = NULL;
		_context = NULL;
	}

	~WSServer()
	{
		if (_runflag)
		{
			stop();
			std::cout << __FUNCTION__ << std::endl;
		}
	}

	void setListenPort(int port)
	{
		_listen_port = port;
	}

	//是否运行中
	bool isRunning()
	{
		return _runflag;
	}

	//停止
	void stop()
	{
		if (_runflag)
		{
			_runflag = false;
			thread.join();
		}
	}

	//启动
	void start()
	{
		if (_runflag)
			throw std::runtime_error("is running...");
		_runflag = true;
		WSServer* serverPtr = this;
		thread = std::thread([serverPtr]
			{
				serverPtr->run();
			});
	}

	//添加需要连接的服务器
	void addConnect(const char* host, const char* devid)
	{
		_host_server = host;
		_devid = devid;
	}

	std::string& getDevID()
	{
		return _devid;
	}

	//连接失败
	void wsi_connect_error(struct lws* wsi)
	{
		_host_wsi = NULL;
	}

	//连接断开
	void wsi_distory(struct lws* wsi)
	{
		if (wsi == _host_wsi)
		{
			std::cout << "close _host_wsi" << std::endl;
			_host_wsi = NULL;
		}
		std::lock_guard<std::mutex> lock(_mtx);
		for (auto itr = sessions.begin(); itr != sessions.end();)
		{
			WSSession::Ptr s = (*itr);
			if (s->get_lws() == wsi)
				itr = sessions.erase(itr);
			else
				++itr;
		}
	}

	//添加一个会话
	void addSession(WSSession::Ptr& session)
	{
		std::lock_guard<std::mutex> lock(_mtx);
		sessions.push_back(session);
		LOGI("WSServer", "add session\n");

	}

	//广播一个消息
	void broadcastMessage(const void* data, int len)
	{
		std::lock_guard<std::mutex> lock(_mtx);
		for (auto itr = sessions.begin(); itr != sessions.end(); ++itr)
		{
			(*itr)->sendMessage(data, len);
		}
		if (sessions.size() > 0 && _context)
		{
			lws_cancel_service(_context);
		}
	}

	//检测发送消息
	void checkMsg()
	{
		std::lock_guard<std::mutex> lock(_mtx);
		for (auto itr = sessions.begin(); itr != sessions.end(); ++itr)
		{
			(*itr)->checkMsg();
		}
	}

	//如果this被释放会有问题哦..
	void run()
	{
		struct lws_context_creation_info info;
		struct lws_context* context = NULL;
		//struct lws_client_connect_info i;
		int ret = 0;

		memset(&info, 0, sizeof(info));
		info.port = CONTEXT_PORT_NO_LISTEN;
		info.port = _listen_port;
		info.iface = NULL;
		info.protocols = protocols;
		info.ssl_cert_filepath = NULL;
		info.ssl_private_key_filepath = NULL;
		info.extensions = NULL;//lws_get_internal_extensions();
		info.gid = -1;
		info.uid = -1;
		info.options = 0;
		//info.count_threads = 1;
		info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;
		info.mounts = NULL;
		info.user = (void*)this;

		//是否启用服务器功能呢?
		context = lws_create_context(&info);
		if (!context)
		{
			perror("not create context");
			//lws_cancel_service(_lws_context_server);
			return;
		}
		_context = context;
		printf("user:%s\n", (char*)lws_context_user(context));

		while (!ret && isRunning())
		{
			//什么时候开始连接呢?
			if (_host_wsi == NULL && _host_server.length() > 0)
			{
				_host_wsi = wsi_connect_client(context, _host_server.c_str());
				LOGI("WSServer", "connect >>>> %p\n", _host_wsi);
			}
			//
			checkMsg();
			ret = lws_service(context, 1000);
		}
		lws_context_destroy(context);
		_context = NULL;
		LOGI("WSServer", "run exit\n");
	}
};

static int _websocket_callback_app(struct lws* wsi,
	enum lws_callback_reasons reason, void* user, void* in, size_t len)
{
    struct UserCtx
	{
		WSSession* session;
	};

	struct lws_context* ctx = lws_get_context(wsi);
	WSServer* wsServer = (WSServer*)lws_context_user(ctx);
	struct UserCtx* userCtx = (struct UserCtx*)user;

	switch ((int)reason)
	{
	case LWS_CALLBACK_PROTOCOL_INIT:
		break;
	case LWS_CALLBACK_WSI_CREATE: //user=NULL
								  //log_d("websocket", "LWS_CALLBACK_WSI_CREATE wsi_session=%p\n", wsi_session);
		break;
	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
		printf("服务器链接失败\r\n");
		if (wsServer)
			wsServer->wsi_connect_error(wsi);
		break;
	case LWS_CALLBACK_CLIENT_ESTABLISHED: //链接ws服务器成功,client mode
	case LWS_CALLBACK_ESTABLISHED:		  //webserver server模式时
	{
		printf("连接成功 \r\n");
		memset(userCtx, 0, sizeof(struct UserCtx));

		WSSession::Ptr session = std::make_shared<WSSession>();
		session->set_lws(wsi);
		if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED)
			session->setConnectType(WSSession::ConnectType::MyClient);

		if (reason == LWS_CALLBACK_ESTABLISHED)
			session->setConnectType(WSSession::ConnectType::MyServer);

		session->debug();

		if (wsServer)
			wsServer->addSession(session);

		userCtx->session = session.get();

		std::string devID = wsServer->getDevID();

		session->setDeviceID(devID.c_str());
		session->initSession();

		//不能在这里发消息...........
		//lws_callback_on_writable(wsi);
		break;
	}
	case LWS_CALLBACK_CLOSED:
		printf("LWS_CALLBACK_CLOSED:链接断开:%p\n", wsi);
		break;
	case LWS_CALLBACK_WSI_DESTROY:
	{
		printf("LWS_CALLBACK_WSI_DESTROY:%p\n", wsi);
		if (wsServer)
			wsServer->wsi_distory(wsi);
		return -1;
	}
	break;
	/接收-发送/
	case LWS_CALLBACK_CLIENT_RECEIVE: //收到数据
	case LWS_CALLBACK_RECEIVE:		  //服务器模式接收
	{
		printf(">>>接收数据,%s isfinal:%d, 长度:%d\n",
			lws_frame_is_binary(wsi) ? "bin" : "txt",
			lws_is_final_fragment(wsi),
			len);
		userCtx->session->onRecv(in,
			len,
			lws_is_final_fragment(wsi),
			lws_frame_is_binary(wsi));
	}
	break;
	case LWS_CALLBACK_SERVER_WRITEABLE: //服务端写入
	case LWS_CALLBACK_CLIENT_WRITEABLE: //客户端写入
	{
		//作为服务器来发送消息时,text会被加密,固不能使用buff缓存
		//如果发送队列阻塞则不能发送消息.....

		WSMsg::Ptr msg = userCtx->session->popFirstMsg();
		if (msg)
		{
			char* ptr = (char*)(msg->getData());
			printf("发送消息:[%s], len=%d\n", ptr, msg->getLen());
			int ret = lws_write(wsi, (unsigned char*)ptr, msg->getLen(), LWS_WRITE_BINARY);
			//LWS_WRITE_BINARY);
			printf("发送:ret=%d\n", ret);
		}
		if (userCtx->session->isNeedClose())
			return -1;
#if 0
		{
			uint8_t ping[LWS_PRE + 125];
			int m, n = 0;

			n = lws_snprintf((char*)ping + LWS_PRE, 125,
				"ping body!");
			lwsl_user("Sending PING %d...\n", n);
			m = lws_write(wsi, ping + LWS_PRE, n, LWS_WRITE_PING);
			if (m < n) {
				lwsl_err("sending ping failed: %d\n", m);
				return -1;
			}
		}
#endif

	}
	break;
	}
	return 0;
}

void WebSocketServerTest()
	{
		WSServer server1;
		WSServer server2;

		server1.setListenPort(11258);

		char id[50];
		strcpy(id, "112233445677");

		server1.start();
		server2.addConnect("192.168.0.189", id);
		server2.start();

		const char* msg = "hello xxxxxx,ccccc";

		for (int i = 0; i < 100; i++)
		{
			Sleep(1*1000);
			server1.broadcastMessage(msg, strlen(msg));
			//server2.broadcastMessage(msg, strlen(msg));
		}

		server1.stop();
		server2.stop();

		LOGI("test", "stop\n");
	}



版权声明:本文为cjqqschoolqq原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。