guava中EvictingQueue使用与改进

  • Post author:
  • Post category:其他




一、简介

因为业务有一些服务器在国外,网络非常不稳定,执行http请求的时候波动很大。所以我们希望在网络变慢的时候通过http代理切换到其他服务器发送http请求。

如何界定变慢呢?

如果,最近N次执行http请求,执行时间超过阈值T的次数大于等于M,则认为当前网络慢。

所以我们需要保存最近N次执行http请求的时间,首先确定的是一个FIFO队列,有需要固定大小,为了避免重复造轮子,先决定使用guava的EvictingQueue。



二、EvictingQueue基本使用

@Test
public void testEvictingQueue(){
    EvictingQueue<Integer> queue = EvictingQueue.create(5);
    for (int i = 0; i < 10; i++) {
        queue.add(i);
        System.out.println(String.format("当前队列大小:%d,队列中元素:%s",queue.size(),StringUtils.join(queue.iterator(), ',')));
    }
}

结果

EvictingQueue通过create创建,参数是队列中最多存储的元素个数,超过这个数之后,就会开始移除最先加入的元素。



三、EvictingQueue的问题

EvictingQueue的问题是他是非线程安全的,看EvictingQueue的源码就知道,它其实就是封装的ArrayDeque。

可以通过下面的代码非常轻易的重现EvictingQueue的并发访问问题。

import com.google.common.collect.EvictingQueue;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EvictingQueueTest {

    @Test
    public void testEvictingQueue(){
        EvictingQueue<Integer> queue = EvictingQueue.create(5);
        for (int i = 0; i < 10; i++) {
            queue.add(i);
            System.out.println(String.format("当前队列大小:%d,队列中元素:%s",queue.size(),StringUtils.join(queue.iterator(), ',')));
        }
    }

    public static void main(String[] arg){
        EvictingQueue<Integer> queue = EvictingQueue.create(10);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //十个生产线程不断向队列中写数据
        for(int i=0;i<10;i++){
            executorService.submit(new Producer(queue));
        }
        //一个生产线程不断去检测队列中满足条件的个数
        new Thread(new Consumer(queue)).start();
    }

    private static class Producer implements Runnable{

        private EvictingQueue<Integer> queue;

        public Producer(EvictingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            Random random = new Random();
            while (true){
                queue.add(random.nextInt(100));
            }
        }
    }

    private static class Consumer implements Runnable{

        private EvictingQueue<Integer> queue;

        public Consumer(EvictingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true){
                int count = 0;
                Iterator<Integer> iterator = queue.iterator();
                while (iterator.hasNext()){
                    Integer integer = iterator.next();
                    if(integer < 50){
                        count++;
                    }
                }
                System.out.println("count:" + count);
            }
        }
    }
}



四、EvictingQueue改进方案

为了处理EvictingQueue的并发访问问题,我们自己写了一个类来解决这个问题

因为主要是为了统计监控,并不要求数据绝对准确,所以并没有使用synchronize之类同步,需要可以自己加上。

import java.io.Serializable;
import java.util.function.Predicate;

public class EvictingArray<T> implements Serializable {

    private static final long serialVersionUID = 0L;

    private Object[] elements;

    private int index;

    private int capacity;

    private int size;

    public static EvictingArray create(int capacity){
        return new EvictingArray(capacity);
    }

    public EvictingArray(int capacity) {
        this.elements = new Object[capacity];
        this.capacity = capacity;
        this.size = 0;
        this.index = 0;
    }

    public void add(T element){
        elements[index++ % capacity] = element;
        if(size < capacity){
            size++;
        }
    }

    public void clear(){
//        Arrays.fill(elements,null);
        this.size = 0;
        this.index = 0;
    }

    //获取满足条件的元素个数
    public int getQualifiedNums(Predicate<T> predicate){
        int num = 0;
        for(Object ele : elements){
            if(predicate.test((T) ele)){
                num++;
            }
        }
        return num;
    }

    public int getSize() {
        return size;
    }
}

测试EvictingArray:


import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

public class EvictingArrayTest {

    public static void main(String[] arg){
        EvictingArray<Integer> queue = EvictingArray.create(10);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for(int i=0;i<10;i++){
            executorService.submit(new Producer(queue));
        }
        new Thread(new Consumer(queue)).start();
    }

    private static class Producer implements Runnable{

        private EvictingArray<Integer> array;

        public Producer(EvictingArray<Integer> array) {
            this.array = array;
        }

        @Override
        public void run() {
            Random random = new Random();
            while (true){
                array.add(random.nextInt(100));
            }
        }
    }

    private static class Consumer implements Runnable{

        private EvictingArray<Integer> array;

        public Consumer(EvictingArray<Integer> array) {
            this.array = array;
        }

        @Override
        public void run() {
            Predicate predicate = new Predicate<Integer>(){
                @Override
                public boolean test(Integer integer) {
                    return integer < 50;
                }
            };
            while (true){
                System.out.println("count:" + array.getQualifiedNums(predicate));
            }
        }
    }
}



版权声明:本文为trayvontang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。