本文基于socket通讯,以及lock锁机制来初步实现jms的异步队列。
设计分成三个部分,分别是信息队列管理类,服务端类和客户端类。
Buffer(信息队列管理类)
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Buffer {
private static Queue<Object> queue = new LinkedList<Object>();
private static int INITSIZE = 2;
private Lock mutex;
private Condition condition;
private Buffer(){
mutex = new ReentrantLock();
condition = mutex.newCondition();
}
public static Buffer getIntance(){
return QueueBuffer.instance;
}
static class QueueBuffer{
private static Buffer instance = new Buffer();
}
public void setInitSize(int size){
INITSIZE = size;
}
public void produce(String msg){
mutex.lock();
try {
while(queue.size() >= INITSIZE ){
System.out.println("queue wait to consume");
condition.await();
}
queue.offer(msg);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
condition.signalAll();
mutex.unlock();
}
}
public Object consume(){
mutex.lock();
try {
while (queue.size() == 0) {
System.out.println("queue wait to produce");
condition.await();
}
return queue.poll();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
} finally {
condition.signalAll();
mutex.unlock();
}
}
public int getQueueSize(){
return queue.size();
}
}
Server (服务端类)
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.commons.lang3.StringUtils;
public class Server extends Thread{
private Socket socket;
public Server(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader;
try {
InputStream in = socket.getInputStream();
reader = new BufferedReader(
new InputStreamReader(
in));
handle(socket,reader.readLine());
reader.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void handle(Socket socket, String msg) throws IOException{
PrintWriter pw = new PrintWriter(socket.getOutputStream());
if (StringUtils.isNotBlank(msg)) {
if (msg.contains("add")) {
msg = msg.substring(msg.indexOf("add")+4);
Buffer.getIntance().produce(msg);
pw.write("server:add "+ msg +" to queue successfully");
}else if(msg.contains("poll")){
String consumeMsg = (String) Buffer.getIntance().consume();
pw.write("server:remove "+ consumeMsg +" from queue successfully");
}else if(msg.contains("size")){
pw.write("server:size is "+ Buffer.getIntance().getQueueSize());
}else{
pw.write("server:no such command");
}
}else{
pw.write("server:blank message");
}
pw.flush();
socket.shutdownOutput();
pw.close();
}
@SuppressWarnings("resource")
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(6666);
while(true){
new Server(serverSocket.accept()).start();
}
}
}
Client (客户端类)
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
public class Client {
private Socket socket;
private String serverIP;
private int port;
public Client(String serverIP,int port) throws UnknownHostException, IOException{
this.serverIP = serverIP;
this.port = port;
}
public void run() throws IOException{
while (true) {
socket = new Socket(serverIP, port);
input();
}
}
@SuppressWarnings("resource")
public void input() throws IOException{
Scanner scanner = new Scanner(System.in);
String servermsg = scanner.nextLine();
PrintWriter pw = new PrintWriter(socket.getOutputStream());
pw.write(servermsg);
pw.flush();
socket.shutdownOutput();
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String msg;
while ((msg = br.readLine()) != null) {
System.out.println(msg);
}
pw.close();
br.close();
}
public static void main(String[] args) throws UnknownHostException, IOException {
Client c = new Client("127.0.0.1",6666);
c.run();
}
}
版权声明:本文为Ant_Shen原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。