【Java并发编程】常见的并发队列及使用方式

  • Post author:
  • Post category:java


当多个线程同时访问共享资源时,为了保证数据的正确性和避免出现死锁、饥饿等问题,我们通常需要使用并发队列。

在Java中,常用的并发队列有以下几种:

BlockingQueue

阻塞队列,适用于生产者-消费者模式。线程可以在插入或取出元素时自动阻塞或等待,以达到线程间的同步。常用实现类有ArrayBlockingQueue和LinkedBlockingQueue等。

使用场景

适用于生产者-消费者模式或异步处理任务的场景,其中,ArrayBlockingQueue具有有限的容量,而LinkedBlockingQueue则可以选择是否限制容量。

示例代码

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }
}

class Producer implements Runnable {
    private final BlockingQueue<String> blockingQueue;
    private int count;

    public Producer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String data = "data-" + count++;
                blockingQueue.put(data);
                System.out.println(Thread.currentThread().getName() + " produce " + data);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<String> blockingQueue;

    public Consumer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String data = blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + " consume " + data);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

ConcurrentLinkedQueue

无界队列,采用非阻塞算法实现,适用于高并发环境下的队列操作。由于该队列不保证元素的顺序性,因此不支持阻塞操作。

使用场景

常用于多个线程之间进行任务分配和处理。

示例代码

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable {
    private final ConcurrentLinkedQueue<String> queue;
    private int count;

    public Producer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            String data = "data-" + count++;
            queue.offer(data);
            System.out.println(Thread.currentThread().getName() + " produce " + data);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private final ConcurrentLinkedQueue<String> queue;

    public Consumer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            String data = queue.poll();
            if (data != null) {
                System.out.println(Thread.currentThread().getName() + " consume " + data);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

PriorityBlockingQueue

基于优先级的阻塞队列,适用于按优先级处理任务的场景。元素可根据其自然排序或自定义排序规则进行插入,并具有阻塞特性。

使用场景

适用于按照优先级处理任务的场景。

示例代码

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Task implements Comparable<Task> {
    private final int priority;
    private final String name;

    public Task(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public int compareTo(Task o) {
        return Integer.compare(priority, o.priority);
    }

    @Override
    public String toString() {
        return String.format("%s-%d", name, priority);
    }
}

class Producer implements Runnable {
    private final PriorityBlockingQueue<Task> queue;
    private int count;

    public Producer(PriorityBlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            Task task = new Task(count % 10, "task-" + count++);
            queue.offer(task);
            System.out.println(Thread.currentThread().getName() + " produce " + task);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private final PriorityBlockingQueue<Task> queue;

    public Consumer(PriorityBlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Task task = queue.take();
                System.out.println(Thread.currentThread().getName() + " consume " + task);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

SynchronousQueue

同步队列,适用于直接传递任务的场景。在该队列中,插入操作必须与删除操作配对,否则线程会被阻塞。

使用场景

适用于直接传递任务的场景,例如线程池中的任务执行。

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable {
    private final SynchronousQueue<String> queue;
    private int count;

    public Producer(SynchronousQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            String data = "data-" + count++;
            try {
                queue.put(data);
                System.out.println(Thread.currentThread().getName() + " produce " + data);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {
    private final SynchronousQueue<String> queue;

    public Consumer(SynchronousQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String data = queue.take();
                System.out.println(Thread.currentThread().getName() + " consume " + data);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

以上是Java中几种常用的并发队列,根据不同的使用场景可以选择不同的实现类。在使用时需要注意线程安全问题,避免并发操作产生数据不一致等问题。



版权声明:本文为qq845175537原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。