Практика многопоточности для начинающих

интервью Java задняя часть GitHub
Практика многопоточности для начинающих

предисловие

Некоторое время назад я увидел на сторонней платформе, что количество слов, которые я написал, на самом деле превысило 10 Вт.Трудно представить, что мне нужно быть умным, чтобы написать сочинение из 800 слов в старшей школе.использовать разрывы строкДоделать (люди, которые понимают, наверное, тоже это сделали😏).

После этого у меня выработалась привычка: я проверяю все, что могу проверить сам.

Поэтому я написал инструмент в свободное время от сверхурочной работы всю ночь в прошлую пятницу:

GitHub.com/crossover J я…

использоватьSpringBootВам нужна только одна строка команды, чтобы подсчитать, сколько слов вы написали.

java -jar nows-0.0.1-SNAPSHOT.jar /xx/Hexo/source/_posts

Передайте каталог статей для сканирования, чтобы вывести результаты (в настоящее время поддерживается только.mdконецMarkdownдокумент)

Конечно, результат хороший (400 000 слов), потому что в ранних блогах мне нравился большой почтовый индекс, а некоторые английские слова не фильтровались, поэтому результаты были совсем другими.

Если точна только китайская текстовая статистика, а инструмент имеет встроенные гибкие методы расширения, пользователи могут настраивать статистические стратегии, подробности см. ниже.

На самом деле этот инструмент достаточно прост, а объем кода небольшой, поэтому особо упоминать не о чем. Но после своих воспоминаний я обнаружил общее явление как в интервью, так и в общении с пользователями сети:

самыйНовичок в разработкеБуду смотреть на многопоточность, но почти не связанную с этим практику. Некоторые даже не знают, какая польза от многопоточности в реальной разработке.

По этой причине я хочу представить для таких друзей практичный и понятный многопоточный кейс на основе этого простого инструмента.

Хотя бы дайте знать:

  • Зачем нужна многопоточность?
  • Как реализовать многопоточную программу?
  • Проблемы и решения, вызванные многопоточностью?

Статистика одного потока

Прежде чем говорить о многопоточности, давайте поговорим о том, как реализована однопоточность.

Требования на этот раз также очень просты, просто нужно отсканировать каталог, чтобы прочитать все файлы ниже.

Все наши реализации имеют следующие шаги:

  • Прочитать все файлы в каталоге.
  • Хранить пути ко всем файлам в памяти.
  • Переберите все файлы и прочитайте количество слов в текстовых записях один за другим.

Давайте посмотрим, как реализованы первые два, а при сканировании в каталог нужно продолжить чтение файлов в текущем каталоге.

Такой сценарий очень подходит для рекурсии:

    public List<String> getAllFile(String path){

        File f = new File(path) ;
        File[] files = f.listFiles();
        for (File file : files) {
            if (file.isDirectory()){
                String directoryPath = file.getPath();
                getAllFile(directoryPath);
            }else {
                String filePath = file.getPath();
                if (!filePath.endsWith(".md")){
                    continue;
                }
                allFile.add(filePath) ;
            }
        }

        return allFile ;
    }
}

Сохраните путь к файлу в коллекции после прочтения.

Следует отметить, что это количество рекурсий необходимо контролировать, чтобы избежать переполнения стека (StackOverflow).

Наконец, прочитайте содержимое файла, используяJava8для чтения из потока в , поэтому код может быть более кратким:

Stream<String> stringStream = Files.lines(Paths.get(path), StandardCharsets.UTF_8);
List<String> collect = stringStream.collect(Collectors.toList());

Следующим шагом является чтение количества слов, и в то же время фильтрация некоторого специального текста (например, я хочу отфильтровать все пробелы, разрывы строк, гиперссылки и т. д.).

Расширяемость

Простая обработка может быть пройдена в приведенном выше коде.collectЗатем замените содержимое, которое необходимо отфильтровать, пустым.

Но каждый может думать иначе. Например, я просто хочу отфильтровать空格、换行、超链接Это нормально, но некоторым людям нужно удалить из него все английские слова и даже оставить разрывы строк (точно так же, как при написании текста, это может составить количество слов).

Все это требует более гибкого подхода.

читать вышеПроектирование перехватчика с использованием шаблона цепочки ответственностиНетрудно представить себе такой сценарий, в котором паттерн цепочки ответственности является более подходящим.

о责任链模式Конкретный контент не будет подробно описываться, если вам интересно, вы можете ознакомиться с ним.выше.

Давайте посмотрим непосредственно на реализацию здесь:

Определите абстрактный интерфейс и метод обработки цепочки ответственности:

public interface FilterProcess {
    /**
     * 处理文本
     * @param msg
     * @return
     */
    String process(String msg) ;
}

Реализация, которая обрабатывает пробелы и новые строки:

public class WrapFilterProcess implements FilterProcess{
    @Override
    public String process(String msg) {
        msg = msg.replaceAll("\\s*", "");
        return msg ;
    }
}

Реализация для обработки гиперссылок:

public class HttpFilterProcess implements FilterProcess{
    @Override
    public String process(String msg) {
        msg = msg.replaceAll("^((https|http|ftp|rtsp|mms)?:\\/\\/)[^\\s]+","");
        return msg ;
    }
}

Это должно быть обработано во время инициализацииhandleдобавляются в цепочку ответственности, обеспечивая при этомAPIОн может быть выполнен клиентом.

Такой простой инструмент для подсчета слов готов.

многопоточный режим

Это все еще очень быстро выполнить один раз при условии, что в моем районе есть десятки блогов, но что, если наших файлов десятки тысяч, сотни тысяч или даже миллионы.

Хотя функция может быть достигнута, вполне возможно, что такие затраты времени определенно будут увеличиваться в геометрической прогрессии.

В настоящее время многопоточность имеет преимущество, и несколько потоков могут читать файл по отдельности и, наконец, суммировать результаты.

Процесс этой реализации становится:

  • Прочитать все файлы в каталоге.
  • Оставьте путь к файлу другому потоку, чтобы он обрабатывал его сам.
  • Итоговые итоговые результаты.

Проблемы, вызванные многопоточностью

Не факт, что с многопоточностью все будет хорошо, рассмотрим первую проблему: разделяемые ресурсы.

Проще говоря, как обеспечить согласованность общего количества слов в многопоточной и однопоточной статистике.

Основываясь на моей локальной среде, давайте посмотрим на результаты однопоточной работы:

Всего: 414142 слов.

Далее переключитесь в многопоточный режим:

List<String> allFile = scannerFile.getAllFile(strings[0]);
logger.info("allFile size=[{}]",allFile.size());
for (String msg : allFile) {
	executorService.execute(new ScanNumTask(msg,filterProcessManager));
}

public class ScanNumTask implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(ScanNumTask.class);

    private String path;

    private FilterProcessManager filterProcessManager;

    public ScanNumTask(String path, FilterProcessManager filterProcessManager) {
        this.path = path;
        this.filterProcessManager = filterProcessManager;
    }

    @Override
    public void run() {
        Stream<String> stringStream = null;
        try {
            stringStream = Files.lines(Paths.get(path), StandardCharsets.UTF_8);
        } catch (Exception e) {
            logger.error("IOException", e);
        }

        List<String> collect = stringStream.collect(Collectors.toList());
        for (String msg : collect) {
            filterProcessManager.process(msg);
        }
    }
}

Используйте пул потоков для управления потоками. Дополнительную информацию, связанную с пулом потоков, см. здесь:"Как элегантно использовать и понимать пулы потоков"

Результаты:

Мы обнаружим, что независимо от того, сколько раз мы его выполняем, это значение будет меньше ожидаемого.

Посмотрим, как там реализована статистика.

@Component
public class TotalWords {
    private long sum = 0 ;

    public void sum(int count){
        sum += count;
    }

    public long total(){
        return sum;
    }
}

Видно, что это просто накопление базисного типа. Так что же заставляет это значение быть меньше, чем ожидалось?

Я думаю, большинство людей скажут: многопоточная работа приведет к тому, что некоторые потоки перезапишут значения, вычисленные другими потоками.

Но на самом деле это только видимость проблемы, а первопричина до сих пор не ясна.

видимость памяти

Основная причина на самом деле вызвана моделью памяти Java (JMM) в соответствии с правилами.

Вот цитата из предыдущего«Изменчивое ключевое слово, которое вы должны знать»Объяснение:

так какJavaмодель памяти (JMM) предусматривает, что все переменные хранятся в основной памяти, а каждый поток имеет свою рабочую память (кеш).

Когда поток работает, ему необходимо скопировать данные из основной памяти в рабочую память. Таким образом, любая операция с данными основана на рабочей памяти (повышение эффективности) и не может напрямую оперировать данными в основной памяти и рабочей памяти других потоков, а затем обновлять обновленные данные в основной памяти.

Упомянутую здесь основную память можно просто рассматривать каккуча памяти, а рабочую память можно рассматривать какстек памяти.

Как показано ниже:

Следовательно, во время параллельной работы данные, считанные потоком B, могут оказаться данными до обновления потока A.

Более связанный контент больше не будет расширяться, и заинтересованные друзья могут пролистать предыдущие сообщения в блоге.

Давайте поговорим о том, как решить эту проблему напрямую, JDK действительно помог нам подумать об этих проблемах.

существуетjava.util.concurrentСуществует множество инструментов параллелизма, которые вы можете использовать в пакете параллелизма.

Здесь очень подходитAtomicLong, который может изменять данные атомарно.

Давайте посмотрим на модифицированную реализацию:

@Component
public class TotalWords {
    private AtomicLong sum = new AtomicLong() ;
    
    public void sum(int count){
        sum.addAndGet(count) ;
    }

    public  long total(){
        return sum.get() ;
    }
}

только что использовал два из нихAPIВот и все. Запустите программу еще раз, и вы найдетеРезультат все еще неправильный.

даже 0.

Межпоточная связь

В это время возникает новая проблема, давайте посмотрим, как получить суммарные данные.

List<String> allFile = scannerFile.getAllFile(strings[0]);
logger.info("allFile size=[{}]",allFile.size());
for (String msg : allFile) {
	executorService.execute(new ScanNumTask(msg,filterProcessManager));
}

executorService.shutdown();
long total = totalWords.total();
long end = System.currentTimeMillis();
logger.info("total sum=[{}],[{}] ms",total,end-start);

Я не знаю, видите ли вы проблему.На самом деле, когда общее число наконец напечатано, неизвестно, закончили ли выполнение другие потоки.

так какexecutorService.execute()Он вернется напрямую, поэтому при печати и получении данных ни один поток не закончил выполнение, что и приводит к такому результату.

Ранее я уже писал похожий контент о межпотоковой связи:"Глубокое понимание потокового взаимодействия"

Возможны следующие способы:

Вот как мы используем пул потоков:

Добавьте условие оценки после деактивации пула потоков:

executorService.shutdown();
while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
	logger.info("worker running");
}
long total = totalWords.total();
long end = System.currentTimeMillis();
logger.info("total sum=[{}],[{}] ms",total,end-start);

Итак, мы пробуем еще раз и обнаруживаем, что независимо от того, сколько раз результат будет правильным:

Повышение эффективности

Некоторые друзья могут спросить, этот метод не показал значительного повышения эффективности.

На самом деле это связано с тем, что у меня мало локальных файлов, а трудоемкая обработка одного файла относительно невелика.

Даже если число потоков открыто достаточно, чтобы вызывать частые переключения контекста, эффективность выполнения все равно снижается.

Чтобы имитировать повышение эффективности, я позволяю текущему потоку спать в течение 100 миллисекунд каждый раз, когда обрабатывается файл, чтобы имитировать время выполнения.

Давайте сначала посмотрим, сколько времени требуется для запуска одного потока.

Всего потрачено времени:[8404] ms

Затем требуется время, когда размер пула потоков равен 4:

Всего потрачено времени:[2350] ms

Можно видеть, что повышение эффективности все еще очень очевидно.

больше думать

Это всего лишь одно из применений многопоточности, и я считаю, что друзья, увидевшие это здесь, должны понимать это лучше.

Я оставлю вам упражнение после чтения.Сцена похожа:

В Redis или других носителях хранятся десятки миллионов данных номеров мобильных телефонов, каждый номер уникален, и необходимо пройти все эти номера в кратчайшие сроки.

Друзья, у которых есть идеи и интересы, могут оставить сообщение в конце статьи для участия в обсуждении 🤔🤨.

Суммировать

Надеюсь, у друзей, прочитавших ее, могут быть свои ответы на несколько вопросов в начале текста:

  • Зачем нужна многопоточность?
  • Как реализовать многопоточную программу?
  • Проблемы и решения, вызванные многопоточностью?

Код в статье здесь.

GitHub.com/crossover J я…

Ваши лайки и ретвиты — лучшая поддержка.