Springboot整合socket服务端

  • Post author:
  • Post category:其他


使用 Spring Boot 框架实现基于 NIO 的 socket 通信服务端,接收客户端发来的 JSON 消息并将其写入 Kafka。

socket 服务端随 Spring Boot 项目启动而启动。

package com.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pojo.Topic;

/**
 * NIO socket server.
 *
 * <p>Accepts TCP clients on a single selector loop, decodes the bytes delivered by each
 * read as UTF-8 text, parses that text as a JSON {@link Topic} and sends its value to the
 * Kafka topic it names.
 *
 * <p>The {@link KafkaTemplate} is field-injected, so the instance must be obtained from
 * the Spring context; an instance created with {@code new} has no template.
 */
@Component
public class SocketServer {

	// JSON parser shared by all messages instead of being rebuilt for each one
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	// Charset used to decode incoming bytes
	private final Charset cs = Charset.forName("UTF-8");

	// Receive buffer, reused for every read; only touched from the selector loop
	private final ByteBuffer rBuffer = ByteBuffer.allocate(1024);

	// Selector multiplexing the listening channel and every client channel
	private Selector selector;

	/**
	 * Starts the socket server and listens for clients.
	 *
	 * <p>This call blocks the calling thread in the selector loop. It returns only after a
	 * fatal error on the listening channel or selector, which is printed; all channels and
	 * the selector are closed before returning.
	 *
	 * @param port TCP port to listen on
	 */
	public void startSocketServer(int port) {
		ServerSocketChannel serverSocketChannel = null;
		Selector openedSelector = null;
		try {
			// Open the listening channel in non-blocking mode
			serverSocketChannel = ServerSocketChannel.open();
			serverSocketChannel.configureBlocking(false);
			// Bind the underlying socket to the requested port
			ServerSocket serverSocket = serverSocketChannel.socket();
			serverSocket.bind(new InetSocketAddress(port));
			// Open the selector and register the listening channel for accept events
			openedSelector = Selector.open();
			selector = openedSelector;
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
			// Keep dispatching events for as long as the server runs
			while (true) {
				selector.select(); // blocks until at least one registered channel is ready
				Set<SelectionKey> selectionKeys = selector.selectedKeys();
				for (SelectionKey key : selectionKeys) {
					try {
						handle(key);
					} catch (IOException e) {
						if (!(key.channel() instanceof SocketChannel)) {
							// Failure on the listening channel itself is fatal
							throw e;
						}
						// Failure on one client (e.g. connection reset): drop only that client
						e.printStackTrace();
						closeQuietly(key.channel());
					}
				}
				selectionKeys.clear(); // the selector never removes handled keys itself
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			shutdown(serverSocketChannel, openedSelector);
		}
	}

	/**
	 * Handles a single ready key: accepts a new client or reads from an existing one.
	 *
	 * @param selectionKey key reported ready by the selector
	 * @throws IOException if accepting, registering or reading fails
	 */
	private void handle(SelectionKey selectionKey) throws IOException {
		if (selectionKey.isAcceptable()) {
			// Register every newly connected client for read events
			ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
			SocketChannel socketChannel = serverSocketChannel.accept();
			if (socketChannel == null) {
				// Non-blocking accept() returns null when no connection is pending
				return;
			}
			try {
				socketChannel.configureBlocking(false);
				socketChannel.register(selector, SelectionKey.OP_READ);
			} catch (IOException e) {
				// Not registered yet, so nothing else would ever close it
				closeQuietly(socketChannel);
				throw e;
			}
			System.out.println("客户端连接成功");
		} else if (selectionKey.isReadable()) {
			SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
			rBuffer.clear();
			int count = socketChannel.read(rBuffer);
			if (count < 0) {
				// End of stream: the client closed the connection. Closing the channel also
				// cancels its key; otherwise the key stays readable and the loop spins.
				socketChannel.close();
				return;
			}
			if (count > 0) {
				rBuffer.flip();
				// toString() yields only the decoded characters; array() would expose the
				// decoder's whole backing array, including unused trailing '\u0000' slots.
				// NOTE(review): each read is treated as one complete JSON message. TCP does not
				// guarantee that, and messages over 1024 bytes are split - confirm or add framing.
				String requestMsg = cs.decode(rBuffer).toString();
				String responseMsg = "客户端消息:" + requestMsg;
				System.out.println(responseMsg);
				try {
					sendKafka(requestMsg);
				} catch (JsonProcessingException | RuntimeException e) {
					// A malformed message or a failed send must not stop the server
					// or drop the connection
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * Parses a client message as a JSON {@link Topic} and sends its value to Kafka.
	 *
	 * @param msg message text received from the client
	 * @throws JsonMappingException    if the JSON cannot be mapped to {@link Topic}
	 * @throws JsonProcessingException if the text is not valid JSON
	 * @throws IllegalStateException   if no {@link KafkaTemplate} was injected
	 */
	private void sendKafka(String msg) throws JsonMappingException, JsonProcessingException {
		if (kafkaTemplate == null) {
			throw new IllegalStateException(
					"KafkaTemplate was not injected; obtain SocketServer from the Spring context");
		}
		Topic topic = OBJECT_MAPPER.readValue(msg, Topic.class);
		if (topic == null) {
			// The JSON literal null maps to a null object: nothing to send
			return;
		}
		kafkaTemplate.send(topic.getTopic(), String.valueOf(topic.getdValue()));
	}

	/**
	 * Closes every channel registered with the selector, the selector itself and the
	 * listening channel. Any argument may be null.
	 */
	private void shutdown(ServerSocketChannel serverSocketChannel, Selector openedSelector) {
		if (openedSelector != null && openedSelector.isOpen()) {
			for (SelectionKey key : openedSelector.keys()) {
				closeQuietly(key.channel());
			}
			closeQuietly(openedSelector);
		}
		closeQuietly(serverSocketChannel);
	}

	/** Closes a resource, printing instead of propagating any failure. */
	private static void closeQuietly(AutoCloseable resource) {
		if (resource == null) {
			return;
		}
		try {
			resource.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

启动类

package com;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

import com.socket.SocketServer;
import com.util.SpringUtil;

/**
 * Spring Boot entry point; starts the socket server once the application context is up.
 */
@SpringBootApplication
@ComponentScan("com")
public class KafkaStartApplication {

	private static final Logger logger = LoggerFactory.getLogger(KafkaStartApplication.class);

	// TCP port the socket server listens on
	private static final int SOCKET_PORT = 8888;

	/**
	 * Boots the Spring context, then runs the socket server on the main thread.
	 *
	 * @param args command-line arguments passed through to Spring Boot
	 * @throws Exception if the application context fails to start
	 */
	public static void main(String[] args) throws Exception {
		// Use the container-managed bean: an instance created with "new" never gets its
		// @Autowired KafkaTemplate, so the first received message would hit a null template.
		SocketServer server = SpringApplication.run(KafkaStartApplication.class, args)
				.getBean(SocketServer.class);
		logger.info("Starting socket server on port {}", SOCKET_PORT);
		// Blocks here: startSocketServer loops until a fatal error occurs
		server.startSocketServer(SOCKET_PORT);
	}
}



版权声明:本文为qq_41841482原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。