socket 简单实现jms(消费者生产者模型)

  • Post author:
  • Post category:其他


本文基于 Socket 通信和 Lock 锁机制,初步实现一个类似 JMS 的异步消息队列。

设计分为三个部分:消息队列管理类(Buffer)、服务端类(Server)和客户端类(Client)。


Buffer(信息队列管理类)

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Process-wide bounded FIFO message buffer shared by all server threads.
 *
 * <p>Producers block while the buffer is full and consumers block while it is
 * empty. All state is guarded by a single {@link ReentrantLock}; one
 * {@link Condition} is shared by producers and consumers, so every state
 * change wakes all waiters and each re-checks its own predicate.
 *
 * <p>The only instance is obtained through {@link #getIntance()}.
 */
public class Buffer {
    /** Pending messages in arrival order; guarded by {@code mutex}. */
    private final Queue<Object> queue = new LinkedList<Object>();

    /** Number of queued messages at which producers start blocking; guarded by {@code mutex}. */
    private int capacity = 2;

    /** Guards {@code queue} and {@code capacity}. */
    private final Lock mutex = new ReentrantLock();

    /** Signalled whenever the queue content or the capacity changes. */
    private final Condition condition = mutex.newCondition();

    private Buffer(){
    }

    /**
     * Returns the shared buffer instance.
     *
     * @return the singleton buffer
     */
    public static Buffer getIntance(){
        return QueueBuffer.instance;
    }

    /** Holder class: the instance is created on first access to {@link #getIntance()}. */
    static class QueueBuffer{
        private static final Buffer instance = new Buffer();
    }

    /**
     * Changes the capacity of the buffer.
     *
     * @param size new capacity; must be at least 1
     * @throws IllegalArgumentException if {@code size} is smaller than 1, because
     *         such a capacity would block every producer forever
     */
    public void setInitSize(int size){
        if (size < 1) {
            throw new IllegalArgumentException("size must be >= 1, got " + size);
        }
        mutex.lock();
        try {
            capacity = size;
            // A larger capacity may unblock producers that are currently waiting.
            condition.signalAll();
        } finally {
            mutex.unlock();
        }
    }

    /**
     * Appends a message, blocking while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the message is not
     * added, the thread's interrupt flag is restored and the method returns.
     *
     * @param msg the message to enqueue
     */
    public void produce(String msg){
        mutex.lock();
        try {
            while (queue.size() >= capacity) {
                System.out.println("queue wait to consume");
                condition.await();
            }

            queue.offer(msg);
            // Only signal after an actual state change.
            condition.signalAll();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            mutex.unlock();
        }
    }

    /**
     * Removes and returns the oldest message, blocking while the buffer is empty.
     *
     * @return the oldest message, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt flag is restored)
     */
    public Object consume(){
        mutex.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println("queue wait to produce");
                condition.await();
            }

            Object head = queue.poll();
            // A slot was freed: wake producers blocked on a full buffer.
            condition.signalAll();
            return head;
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return null;
        } finally {
            mutex.unlock();
        }
    }

    /**
     * Returns the number of messages currently queued.
     *
     * @return the current queue size
     */
    public int getQueueSize(){
        // LinkedList is not thread-safe, so even a read needs the lock.
        mutex.lock();
        try {
            return queue.size();
        } finally {
            mutex.unlock();
        }
    }
}


Server (服务端类)

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.lang3.StringUtils;

/**
 * Serves one client connection of the queue server.
 *
 * <p>Each connection carries exactly one request line. The request is matched
 * by substring: {@code add <message>} enqueues a message, {@code poll}
 * dequeues one (blocking while the queue is empty) and {@code size} reports
 * the queue length. A single reply is written and the connection is closed.
 */
public class Server extends Thread{

    /** Connection handled by this thread; always closed when {@link #run()} ends. */
    private final Socket socket;

    /**
     * @param socket accepted client connection; this thread takes ownership of it
     */
    public Server(Socket socket) {
        this.socket = socket;
    }

    /** Reads the single request line, answers it and closes the connection. */
    @Override
    public void run() {
        try {
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            handle(socket, reader.readLine());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Closing the socket also closes its streams, and it must happen
            // even when handle() fails, otherwise the connection leaks.
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Executes one request and writes the reply to the client.
     *
     * @param socket connection to reply on
     * @param msg request line, or {@code null} if the client sent nothing
     * @throws IOException if the output stream cannot be obtained or shut down
     */
    private void handle(Socket socket, String msg) throws IOException{
        PrintWriter pw = new PrintWriter(socket.getOutputStream());
        if (StringUtils.isNotBlank(msg)) {
            if (msg.contains("add")) {
                // The payload starts after "add" plus one separator character.
                int payloadStart = msg.indexOf("add") + 4;
                if (payloadStart > msg.length()) {
                    // e.g. a bare "add": substring() would throw here.
                    pw.write("server:add requires a message");
                } else {
                    String payload = msg.substring(payloadStart);
                    Buffer.getIntance().produce(payload);
                    pw.write("server:add "+ payload +" to queue successfully");
                }
            }else if(msg.contains("poll")){
                String consumeMsg = (String) Buffer.getIntance().consume();
                if (consumeMsg == null) {
                    // consume() yields null when the waiting thread was interrupted.
                    pw.write("server:poll was interrupted, nothing removed");
                } else {
                    pw.write("server:remove "+ consumeMsg +" from queue successfully");
                }
            }else if(msg.contains("size")){
                pw.write("server:size is "+ Buffer.getIntance().getQueueSize());
            }else{
                pw.write("server:no such command");
            }
        }else{
            pw.write("server:blank message");
        }
        pw.flush();
        // Half-close so the client's read loop sees end-of-stream.
        socket.shutdownOutput();
        pw.close();
    }

    /**
     * Listens on port 6666 and starts one handler thread per accepted connection.
     *
     * @param args unused
     * @throws IOException if the port cannot be bound or accepting fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(6666);
        try {
            while(true){
                new Server(serverSocket.accept()).start();
            }
        } finally {
            // Reached only when accept() fails; release the listening port.
            serverSocket.close();
        }
    }
}


Client (客户端类)

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;


/**
 * Interactive console client for the queue server.
 *
 * <p>Every line typed on standard input is sent as one request over its own
 * connection; the server's reply is printed to standard output.
 */
public class Client {
    /** Connection used by the current request; replaced for every request. */
    private Socket socket;

    private final String serverIP;

    private final int port;

    /**
     * Single scanner for the whole session. A Scanner buffers its input, so
     * creating a new one per request could drop lines that were already read
     * into the buffer of the previous one.
     */
    private final Scanner scanner = new Scanner(System.in);

    /**
     * @param serverIP host name or IP address of the server
     * @param port TCP port of the server
     * @throws UnknownHostException declared for compatibility with existing callers
     * @throws IOException declared for compatibility with existing callers
     */
    public Client(String serverIP,int port) throws UnknownHostException, IOException{
        this.serverIP = serverIP;
        this.port = port;
    }

    /**
     * Sends one request per input line until standard input reaches end-of-file.
     *
     * @throws IOException if connecting to or talking with the server fails
     */
    public void run() throws IOException{
        // Wait for a line before connecting, so no server thread is held
        // while the user is still typing.
        while (scanner.hasNextLine()) {
            socket = new Socket(serverIP, port);
            input();
        }
    }

    /**
     * Reads one line from standard input, sends it over the current connection,
     * prints the reply and closes the connection. Sends nothing if standard
     * input is already at end-of-file.
     *
     * @throws IOException if the exchange with the server fails
     */
    public void input() throws IOException{
        try {
            if (!scanner.hasNextLine()) {
                return;
            }
            String servermsg = scanner.nextLine();
            PrintWriter pw = new PrintWriter(socket.getOutputStream());
            pw.write(servermsg);
            pw.flush();
            // Half-close: the server reads with readLine() and the request has
            // no trailing newline, so end-of-stream terminates the line.
            socket.shutdownOutput();
            BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String msg;
            while ((msg = br.readLine()) != null) {
                System.out.println(msg);
            }
        } finally {
            // Closing the socket also closes both of its streams.
            socket.close();
        }
    }

    /**
     * Starts a client session against a server on 127.0.0.1:6666.
     *
     * @param args unused
     */
    public static void main(String[] args) throws UnknownHostException, IOException {
        Client c = new Client("127.0.0.1",6666);
        c.run();
    }

}



版权声明:本文为Ant_Shen原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。