Фреймворк Freehand — слияние запросов в среде с высокой степенью параллелизма

Java

Оригинальный адрес:у-у-у-у. xilidu.com/2018/01/22/…

В системе с высоким параллелизмом мы часто сталкиваемся с таким требованием: система генерирует большое количество запросов, но требования к реальному времени этих запросов не высоки. Мы можем объединить эти запросы, и когда будет достигнуто определенное количество, мы отправим их единообразно. Максимально используйте системный ввод-вывод для повышения пропускной способности системы.

Таким образом, структура слияния запросов должна учитывать следующие два требования:

  1. Отправлять данные, когда запрос набирает определенную сумму
  2. По прошествии времени, если запрос не доходит до указанного числа, также отправить

Поговорим о том, как добиться такого требования.

Прочитав эту статью, вы узнаете:

  • ScheduledThreadPoolExecutor
  • очередь блокировки
  • потокобезопасные параметры
  • Использование LockSupport

Идеи дизайна и воплощение

Давайте поговорим о конкретном представлении о том, как этого добиться. Я надеюсь, что вы сможете научиться некоторым процедурам для анализа проблем и разработки модулей.

  1. Какая структура данных используется для хранения запросов, которые необходимо объединить?
    • Поскольку наша система используется в среде с высокой степенью параллелизма, мы не должны использовать ее, как обычно.ArrayListдержать. Мы можем использовать блокирующие очереди для хранения запросов, которые необходимо объединить.
    • Наша структура данных должна предоставлять метод add() снаружи для отправки данных. Когда добавляются внешние данные, нам нужно проверить, не достигло ли количество данных в очереди нашего предела? Отправьте данные, когда число будет достигнуто, и продолжайте ждать, если оно не будет достигнуто.
    • В структуре данных также должен быть предусмотрен метод timeOut(). Внешний таймер регулярно вызывает этот метод timeOut. Если метод вызывается, он напрямую отправляет данные на удаленное устройство.
    • Когда условия выполняются, поток выполняет действие отправки, когда условия не выполняются, поток должен приостановиться и ждать, пока очередь не достигнет условий для отправки данных. Таким образом, мы можем рассмотреть возможность использованияLockSuppor.park()иLockSuppor.unparkдля приостановки и активации рабочего потока.

После проведенного выше анализа имеем такую ​​структуру данных:

private static class FlushThread<Item> implements Runnable{

        private final String name;

        //队列大小
        private final int bufferSize;
        //操作间隔
        private int flushInterval;

        //上一次提交的时间。
        private volatile long lastFlushTime;
        private volatile Thread writer;

        //持有数据的阻塞队列
        private final BlockingQueue<Item> queue;

        //达成条件后具体执行的方法
        private final Processor<Item> processor;

        //构造函数
        public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {
            this.name = name;
            this.bufferSize = bufferSize;
            this.flushInterval = flushInterval;
            this.lastFlushTime = System.currentTimeMillis();
            this.processor = processor;

            this.queue = new ArrayBlockingQueue<>(queueSize);

        }

        //外部提交数据的方法
        public boolean add(Item item){
            boolean result = queue.offer(item);
            flushOnDemand();
            return result;
        }

        //提供给外部的超时方法
        public void timeOut(){
            //超过两次提交超过时间间隔
            if(System.currentTimeMillis() - lastFlushTime >= flushInterval){
                start();
            }
        }
        
        //解除线程的阻塞
        private void start(){
            LockSupport.unpark(writer);
        }

        //当前的数据是否大于提交的条件
        private void flushOnDemand(){
            if(queue.size() >= bufferSize){
                start();
            }
        }

        //执行提交数据的方法
        public void flush(){
            lastFlushTime = System.currentTimeMillis();
            List<Item> temp = new ArrayList<>(bufferSize);
            int size = queue.drainTo(temp,bufferSize);
            if(size > 0){
                try {
                    processor.process(temp);
                }catch (Throwable e){
                    log.error("process error",e);
                }
            }
        }

        //根据数据的尺寸和时间间隔判断是否提交
        private boolean canFlush(){
            return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
        }

        @Override
        public void run() {
            writer = Thread.currentThread();
            writer.setName(name);

            while (!writer.isInterrupted()){
                while (!canFlush()){
                    //如果线程没有被打断,且不达到执行的条件,则阻塞线程
                    LockSupport.park(this);
                }
                flush();
            }

        }

    }
  1. Как добиться своевременной подачи?

Обычно, когда мы сталкиваемся с требованиями, связанными со временем, первое, что приходит на ум, — это использоватьScheduledThreadPoolExecutorПришло время вызвать метод timeOut класса FlushThread, если вы думаете оThread.sleep()... тогда вам нужно больше учиться и больше смотреть на исходный код.

  1. Как еще улучшить пропускную способность системы?

мы используемFlushThreadДостигнутоRunnableТаким образом, мы можем рассмотреть возможность использования пула потоков для хранения несколькихFlushThread.

Итак, у нас есть такой код:


public class Flusher<Item> {

    private final FlushThread<Item>[] flushThreads;

    private AtomicInteger index;

    //防止多个线程同时执行。增加一个随机数间隔
    private static final Random r = new Random();

    private static final int delta = 50;

    private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);

    private static ExecutorService POOL = Executors.newCachedThreadPool();

    public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {

        this.flushThreads = new FlushThread[threads];


        if(threads > 1){
            index = new AtomicInteger();
        }

        for (int i = 0; i < threads; i++) {
            final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
            flushThreads[i] = flushThread;
            POOL.submit(flushThread);
            //定时调用 timeOut()方法。
            TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
        }
    }

    // 对 index 取模,保证多线程都能被add
    public boolean add(Item item){
        int len = flushThreads.length;
        if(len == 1){
            return flushThreads[0].add(item);
        }

        int mod = index.incrementAndGet() % len;
        return flushThreads[mod].add(item);

    }

    //上文已经描述
    private static class FlushThread<Item> implements Runnable{
        ...省略
    }
}

  1. Интерфейсно-ориентированное программирование для улучшения масштабируемости системы:
public interface Processor<T> {
    void process(List<T> list);
}

использовать

Давайте напишем тестовый метод, чтобы проверить это:

//实现 Processor 将 String 全部输出
public class PrintOutProcessor implements Processor<String>{
    @Override
    public void process(List<String> list) {

        System.out.println("start flush");

        list.forEach(System.out::println);

        System.out.println("end flush");
    }
}


public class Test {

    public static void main(String[] args) throws InterruptedException {

        Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());

        int index = 1;
        while (true){
            stringFlusher.add(String.valueOf(index++));
            Thread.sleep(1000);
        }
    }
}

Результат выполнения:


start flush
1
2
3
end flush
start flush
4
5
6
7
end flush

Мы обнаружили, что флеш не срабатывал до достижения 5 номеров. Потому что срабатывает отправка тайм-аута, хотя указанное 5 не было достигнуто данные, но сброс все еще выполняется.

Если мы удалимThread.sleep(1000);Посмотрите еще раз на результат:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush

Отправляйте каждые 5 отсчетов. Идеально. . . .

Суммировать

Более яркий пример, объясняющий некоторые особенности использования многопоточности. Чтобы изучить многопоточность, вы должны больше думать и делать больше практических действий, и тогда у вас будут лучшие результаты. Я надеюсь, что вы получите что-то после прочтения этой статьи, добро пожаловать на обмен.

адрес гитхаба:GitHub.com/tune центр 007…

Адрес статьи из серии рамок от руки:

Фреймворк Freehand — реализация IoC

Фреймворк Freehand - реализация AOP