Оригинальный адрес:у-у-у-у. xilidu.com/2018/01/22/…
В системе с высоким параллелизмом мы часто сталкиваемся с таким требованием: система генерирует большое количество запросов, но требования к реальному времени этих запросов не высоки. Мы можем объединить эти запросы, и когда будет достигнуто определенное количество, мы отправим их единообразно. Максимально используйте системный ввод-вывод для повышения пропускной способности системы.
Таким образом, структура слияния запросов должна учитывать следующие два требования:
- Отправлять данные, когда запрос набирает определенную сумму
- По прошествии времени, если запрос не доходит до указанного числа, также отправить
Поговорим о том, как добиться такого требования.
Прочитав эту статью, вы узнаете:
- ScheduledThreadPoolExecutor
- очередь блокировки
- потокобезопасные параметры
- Использование LockSupport
Идеи дизайна и воплощение
Давайте поговорим о конкретном представлении о том, как этого добиться. Я надеюсь, что вы сможете научиться некоторым процедурам для анализа проблем и разработки модулей.
- Какая структура данных используется для хранения запросов, которые необходимо объединить?
- Поскольку наша система используется в среде с высокой степенью параллелизма, мы не должны использовать ее, как обычно.
ArrayList
держать. Мы можем использовать блокирующие очереди для хранения запросов, которые необходимо объединить. - Наша структура данных должна предоставлять метод add() снаружи для отправки данных. Когда добавляются внешние данные, нам нужно проверить, не достигло ли количество данных в очереди нашего предела? Отправьте данные, когда число будет достигнуто, и продолжайте ждать, если оно не будет достигнуто.
- В структуре данных также должен быть предусмотрен метод timeOut(). Внешний таймер регулярно вызывает этот метод timeOut. Если метод вызывается, он напрямую отправляет данные на удаленное устройство.
- Когда условия выполняются, поток выполняет действие отправки, когда условия не выполняются, поток должен приостановиться и ждать, пока очередь не достигнет условий для отправки данных. Таким образом, мы можем рассмотреть возможность использования
LockSuppor.park()
иLockSuppor.unpark
для приостановки и активации рабочего потока.
- Поскольку наша система используется в среде с высокой степенью параллелизма, мы не должны использовать ее, как обычно.
После проведенного выше анализа имеем такую структуру данных:
private static class FlushThread<Item> implements Runnable{
private final String name;
//队列大小
private final int bufferSize;
//操作间隔
private int flushInterval;
//上一次提交的时间。
private volatile long lastFlushTime;
private volatile Thread writer;
//持有数据的阻塞队列
private final BlockingQueue<Item> queue;
//达成条件后具体执行的方法
private final Processor<Item> processor;
//构造函数
public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor<Item> processor) {
this.name = name;
this.bufferSize = bufferSize;
this.flushInterval = flushInterval;
this.lastFlushTime = System.currentTimeMillis();
this.processor = processor;
this.queue = new ArrayBlockingQueue<>(queueSize);
}
//外部提交数据的方法
public boolean add(Item item){
boolean result = queue.offer(item);
flushOnDemand();
return result;
}
//提供给外部的超时方法
public void timeOut(){
//超过两次提交超过时间间隔
if(System.currentTimeMillis() - lastFlushTime >= flushInterval){
start();
}
}
//解除线程的阻塞
private void start(){
LockSupport.unpark(writer);
}
//当前的数据是否大于提交的条件
private void flushOnDemand(){
if(queue.size() >= bufferSize){
start();
}
}
//执行提交数据的方法
public void flush(){
lastFlushTime = System.currentTimeMillis();
List<Item> temp = new ArrayList<>(bufferSize);
int size = queue.drainTo(temp,bufferSize);
if(size > 0){
try {
processor.process(temp);
}catch (Throwable e){
log.error("process error",e);
}
}
}
//根据数据的尺寸和时间间隔判断是否提交
private boolean canFlush(){
return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
}
@Override
public void run() {
writer = Thread.currentThread();
writer.setName(name);
while (!writer.isInterrupted()){
while (!canFlush()){
//如果线程没有被打断,且不达到执行的条件,则阻塞线程
LockSupport.park(this);
}
flush();
}
}
}
- Как добиться своевременной подачи?
Обычно, когда мы сталкиваемся с требованиями, связанными со временем, первое, что приходит на ум, — это использоватьScheduledThreadPoolExecutor
Пришло время вызвать метод timeOut класса FlushThread, если вы думаете оThread.sleep()
... тогда вам нужно больше учиться и больше смотреть на исходный код.
- Как еще улучшить пропускную способность системы?
мы используемFlushThread
ДостигнутоRunnable
Таким образом, мы можем рассмотреть возможность использования пула потоков для хранения несколькихFlushThread
.
Итак, у нас есть такой код:
public class Flusher<Item> {
private final FlushThread<Item>[] flushThreads;
private AtomicInteger index;
//防止多个线程同时执行。增加一个随机数间隔
private static final Random r = new Random();
private static final int delta = 50;
private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);
private static ExecutorService POOL = Executors.newCachedThreadPool();
public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {
this.flushThreads = new FlushThread[threads];
if(threads > 1){
index = new AtomicInteger();
}
for (int i = 0; i < threads; i++) {
final FlushThread<Item> flushThread = new FlushThread<Item>(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
flushThreads[i] = flushThread;
POOL.submit(flushThread);
//定时调用 timeOut()方法。
TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
}
}
// 对 index 取模,保证多线程都能被add
public boolean add(Item item){
int len = flushThreads.length;
if(len == 1){
return flushThreads[0].add(item);
}
int mod = index.incrementAndGet() % len;
return flushThreads[mod].add(item);
}
//上文已经描述
private static class FlushThread<Item> implements Runnable{
...省略
}
}
- Интерфейсно-ориентированное программирование для улучшения масштабируемости системы:
public interface Processor<T> {
void process(List<T> list);
}
использовать
Давайте напишем тестовый метод, чтобы проверить это:
//实现 Processor 将 String 全部输出
public class PrintOutProcessor implements Processor<String>{
@Override
public void process(List<String> list) {
System.out.println("start flush");
list.forEach(System.out::println);
System.out.println("end flush");
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
Flusher<String> stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());
int index = 1;
while (true){
stringFlusher.add(String.valueOf(index++));
Thread.sleep(1000);
}
}
}
Результат выполнения:
start flush
1
2
3
end flush
start flush
4
5
6
7
end flush
Мы обнаружили, что флеш не срабатывал до достижения 5 номеров. Потому что срабатывает отправка тайм-аута, хотя указанное 5 не было достигнуто данные, но сброс все еще выполняется.
Если мы удалимThread.sleep(1000);
Посмотрите еще раз на результат:
start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush
Отправляйте каждые 5 отсчетов. Идеально. . . .
Суммировать
Более яркий пример, объясняющий некоторые особенности использования многопоточности. Чтобы изучить многопоточность, вы должны больше думать и делать больше практических действий, и тогда у вас будут лучшие результаты. Я надеюсь, что вы получите что-то после прочтения этой статьи, добро пожаловать на обмен.
адрес гитхаба:GitHub.com/tune центр 007…
Адрес статьи из серии рамок от руки: