使用 Spring Boot 框架实现 Socket 通信的服务端，接收客户端发来的消息。
该服务端随项目启动而启动。
package com.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pojo.Topic;
/**
 * NIO socket server.
 *
 * <p>Accepts client connections on a single selector loop, decodes each read as UTF-8 text,
 * parses it as a JSON {@link Topic} and forwards its value to the Kafka topic it names.
 *
 * <p>NOTE(review): there is no message framing. Each successful read is treated as exactly one
 * complete JSON document of at most {@code READ_BUFFER_SIZE} bytes; a message split across TCP
 * segments, or several messages coalesced into one read, will fail to parse. Confirm this matches
 * the client protocol.
 */
@Component
public class SocketServer {

    /** Size in bytes of the buffer used for a single read from a client. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** Shared JSON mapper, created once instead of once per message. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Charset used to decode incoming bytes. */
    private final Charset cs = Charset.forName("UTF-8");

    /** Receive buffer; only touched from the selector loop in {@link #startSocketServer(int)}. */
    private final ByteBuffer rBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);

    /**
     * Starts the socket server and processes events until the selector loop fails.
     *
     * <p>This method blocks the calling thread. Failures while setting up or running the loop are
     * printed and cause the method to return; failures tied to a single client only affect that
     * client.
     *
     * @param port TCP port to listen on
     */
    public void startSocketServer(int port) {
        // try-with-resources makes sure the listening channel and selector are released
        // when the loop terminates for any reason.
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                Selector selector = Selector.open()) {
            // Non-blocking mode is required before registering with a selector.
            serverSocketChannel.configureBlocking(false);
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress(port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                selector.select(); // blocks until at least one registered channel is ready
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    try {
                        handle(key);
                    } catch (IOException e) {
                        // An I/O failure on one client must not stop the whole server.
                        e.printStackTrace();
                        if (key.channel() instanceof SocketChannel) {
                            closeQuietly(key);
                        }
                    } catch (RuntimeException e) {
                        // E.g. a cancelled key or a Kafka send failure: report and keep serving.
                        e.printStackTrace();
                    }
                }
                selectionKeys.clear(); // selected keys must be removed manually after processing
            }
        } catch (Exception e) {
            // Boundary catch kept so callers still never see an exception from this method.
            e.printStackTrace();
        }
    }

    /**
     * Dispatches a ready key: accepts a new client, or reads and forwards one message.
     *
     * @param selectionKey the ready key
     * @throws IOException if accepting, configuring, reading or closing the channel fails
     */
    private void handle(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
            SocketChannel client = server.accept();
            if (client == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            client.configureBlocking(false);
            // Register the new client for reads on the same selector that produced this key.
            client.register(selectionKey.selector(), SelectionKey.OP_READ);
            System.out.println("客户端连接成功");
        } else if (selectionKey.isReadable()) {
            SocketChannel client = (SocketChannel) selectionKey.channel();
            rBuffer.clear();
            int count = client.read(rBuffer);
            if (count < 0) {
                // End of stream: the peer closed the connection. Without this the key stays
                // readable forever and the selector loop spins.
                closeQuietly(selectionKey);
                return;
            }
            if (count > 0) {
                rBuffer.flip();
                // toString() yields only the decoded characters; array() would expose the
                // decoder's whole backing array, including unused trailing slots.
                String requestMsg = cs.decode(rBuffer).toString();
                System.out.println("客户端消息:" + requestMsg);
                try {
                    sendKafka(requestMsg);
                } catch (JsonProcessingException e) {
                    // A malformed message is reported but does not drop the connection.
                    System.err.println("Discarding message that is not valid Topic JSON: " + e.getMessage());
                }
            }
        }
    }

    /**
     * Cancels the key and closes its channel, ignoring any failure while closing.
     *
     * @param key key whose channel should be closed
     */
    private void closeQuietly(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing an already broken connection fails.
        }
    }

    /**
     * Parses the client message as a {@link Topic} and sends its value to the Kafka topic it names.
     *
     * @param msg JSON text received from the client
     * @throws JsonMappingException if the JSON cannot be mapped onto {@link Topic}
     * @throws JsonProcessingException if the text is not valid JSON
     */
    private void sendKafka(String msg) throws JsonMappingException, JsonProcessingException {
        Topic topic = OBJECT_MAPPER.readValue(msg, Topic.class);
        kafkaTemplate.send(topic.getTopic(), String.valueOf(topic.getdValue()));
    }
}
启动类
package com;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import com.socket.SocketServer;
import com.util.SpringUtil;
/**
 * Application entry point: boots the Spring context, then runs the socket server on the main
 * thread.
 */
@SpringBootApplication
@ComponentScan("com")
public class KafkaStartApplication {
    private final static Logger logger = LoggerFactory.getLogger(KafkaStartApplication.class);

    /** TCP port the socket server listens on. */
    private static final int SOCKET_PORT = 8888;

    /**
     * Starts Spring Boot and then the socket server.
     *
     * @param args command-line arguments passed through to Spring Boot
     * @throws Exception if the application context fails to start
     */
    public static void main(String[] args) throws Exception {
        // Fetch the container-managed bean rather than calling `new SocketServer()`:
        // a manually constructed instance never has its @Autowired KafkaTemplate injected,
        // so forwarding the first message would fail with a NullPointerException.
        SocketServer server = SpringApplication.run(KafkaStartApplication.class, args)
                .getBean(SocketServer.class);
        logger.info("Starting socket server on port {}", SOCKET_PORT);
        // Blocks the main thread for the lifetime of the server loop.
        server.startSocketServer(SOCKET_PORT);
    }
}
版权声明:本文为qq_41841482原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。