1. Обзор

В этой статье мы собираемся узнать о Future. Интерфейс, который существует со времен Java 1.5 и может быть весьма полезным при работе с асинхронными вызовами и параллельной обработкой.

2. Создание Future

Проще говоря, интерфейс Future представляет будущий результат асинхронного вычисления, который в конечном итоге появится в Future после завершения обработки.

Давайте посмотрим, как писать методы, которые создают и возвращают экземпляр Future.

Длительные методы являются хорошими кандидатами для асинхронной обработки и интерфейса Future. Это позволяет нам выполнять какой-то другой процесс, пока мы ожидаем завершения задачи, заключенной в Future.

Некоторые примеры операций, которые могли бы использовать асинхронную природу Future:

  • вычислительные интенсивные процессы (математические и научные расчеты)
  • манипулирование большими структурами данных (больших данных)
  • удаленные вызовы методов, загрузка файлов, очистка HTML, веб-сервисы.

2.1. Реализация Future с помощью FutureTask

Для нашего примера мы собираемся создать очень простой класс, который вычисляет квадрат целого числа. Это определенно не подходит под категорию длительных методов, но мы собираемся сделать вызов Thread.sleep(), чтобы завершить его за последнюю секунду:

public class SquareCalculator {    
    
    private ExecutorService executor 
      = Executors.newSingleThreadExecutor();
    
    public Future<Integer> calculate(Integer input) {        
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}

Кусок кода, который фактически выполняет вычисления, содержится в методе call(), представленном как лямбда-выражение. Как вы можете видеть, в этом нет ничего особенного, за исключением вызова sleep(), упомянутого ранее.

Это становится более интересным, когда мы обращаем наше внимание на использование Callable и ExecutorService.

Callable - это интерфейс, представляющий задачу, которая возвращает результат и имеет единственный метод вызова. Здесь мы создали его экземпляр, используя лямбда-выражение.

Создание экземпляра Callable никуда нас не приведет, нам все равно придется передать этот экземпляр исполнителю, который позаботится о запуске этой задачи в новом потоке и вернет нам нужный объект Future. Вот где ExecutorService и нужен.

Есть несколько способов получить экземпляр ExecutorService, большинство из них обеспечивается статическими методами фабрики служебного класса Executors. В этом примере мы использовали базовый newSingleThreadExecutor, который дает нам ExecutorService, способный обрабатывать один поток за раз.

Как только у нас есть объект ExecutorService, нам просто нужно вызвать submit(), передавая наш Callable в качестве аргумента. submit() позаботится о запуске задачи и вернет объект FutureTask, который является реализацией интерфейса Future.

3. Использование Future

До этого момента мы узнали, как создать экземпляр Future.

В этом разделе вы узнаете, как работать с этим экземпляром, изучив все методы, которые являются частью Future API.

3.1. Использование isDone() и get() чтобы получить результаты

Теперь нам нужно вызвать метод calculate() и использовать возвращенное Future, чтобы получить полученное целое число. Два метода из Future API помогут нам в этой задаче.

Future.isDone() сообщает нам, если исполнитель завершил обработку задачи. Если задача завершена, она вернет true, в противном случае она вернет false.

Метод, который возвращает фактический результат расчета - Future.get(). Обратите внимание, что этот метод блокирует выполнение до тех пор, пока задача не будет завершена, но в нашем примере это не будет проблемой, так как сначала проверьте, завершена ли задача, вызвав isDone().

Используя эти два метода, мы можем запустить какой-то другой код, ожидая завершения основной задачи.

Future<Integer> future = new SquareCalculator().calculate(10);

while(!future.isDone()) {
    System.out.println("Calculating...");
    Thread.sleep(300);
}

Integer result = future.get();

В этом примере мы выводим простое сообщение на вывод, чтобы пользователь знал, что программа выполняет вычисления.

Метод get() блокирует выполнение до завершения задачи. Но нам не нужно беспокоиться об этом, так как наш пример достигает точки, где get() вызывается только после того, как удостоверится, что задача выполнена. Таким образом, в этом сценарии future.get() всегда будет возвращаться немедленно.

Стоит отметить, что get() имеет перегруженную версию, которая принимает в качестве аргументов timeout и TimeUnit:

Integer result = future.get(500, TimeUnit.MILLISECONDS);

Разница между get(long, TimeUnit) и get() заключается в том, что первый вызовет исключение TimeoutException, если задача не вернется раньше указанного периода времени ожидания.

3.2. Отмена Future с cancel()

Предположим, мы запустили задачу, но по какой-то причине мы больше не заботимся о результате. Мы можем использовать Future.cancel(boolean), чтобы сказать исполнителю остановить операцию и прервать ее основной поток.

Future<Integer> future = new SquareCalculator().calculate(4);

boolean canceled = future.cancel(true);

Наш экземпляр Future из приведенного выше кода никогда не завершит свою работу. Фактически, если мы попытаемся вызвать get() из этого экземпляра, после вызова отмены, результатом будет CancellationException. Future.isCancelled() сообщит нам, если будущее уже было отменено. Это может быть очень полезно, чтобы избежать получения CancellationException.

Возможно, что отмена вызова не удалась. В этом случае его возвращаемое значение будет ложным. Обратите внимание, что cancel() принимает логическое значение в качестве аргумента, который контролирует, должен ли поток, выполняющий эту задачу, прерываться или нет.

4. Больше многопоточности с пулами потоков

Наш текущий ExecutorService является однопоточным, так как он был получен с Executors.newSingleThreadExecutor. Чтобы выделить эту единственную многопоточность, давайте запустим два вычисления одновременно:

SquareCalculator squareCalculator = new SquareCalculator();

Future<Integer> future1 = squareCalculator.calculate(10);
Future<Integer> future2 = squareCalculator.calculate(100);

while (!(future1.isDone() && future2.isDone())) {
    System.out.println(
      String.format(
        "future1 is %s and future2 is %s", 
        future1.isDone() ? "done" : "not done", 
        future2.isDone() ? "done" : "not done"
      )
    );
    Thread.sleep(300);
}

Integer result1 = future1.get();
Integer result2 = future2.get();

System.out.println(result1 + " and " + result2);

squareCalculator.shutdown();

Теперь давайте проанализируем вывод для этого кода:

calculating square for: 10
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
calculating square for: 100
future1 is done and future2 is not done
future1 is done and future2 is not done
future1 is done and future2 is not done
100 and 10000

Понятно, что процесс не параллельный. Обратите внимание, как вторая задача запускается только после завершения первой задачи, поэтому весь процесс занимает около секунды.

Чтобы сделать нашу программу действительно многопоточной, мы должны использовать другой вариант ExecutorService. Давайте посмотрим, как меняется поведение нашего примера, если мы используем пул потоков, предоставляемый фабричным методом Executors.newFixedThreadPool:

public class SquareCalculator {
 
    private ExecutorService executor = Executors.newFixedThreadPool(2);
    
    //...
}

С помощью простого изменения в нашем классе SquareCalculator теперь у нас есть исполнитель, который может использовать одновременные потоки.

Если мы снова запустим тот же самый клиентский код, получим следующий вывод

calculating square for: 10
calculating square for: 100
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
future1 is not done and future2 is not done
100 and 10000

Сейчас это выглядит намного лучше. Обратите внимание, как задачи начинаются и заканчиваются одновременно, и весь процесс занимает около секунды.

Существуют и другие фабричные методы, которые можно использовать для создания пулов потоков, например, Executors.newCachedThreadPool, который повторно использует ранее использованные потоки, когда они доступны, и Executors.newScheduledThreadPool, который планирует запуск команд после заданной задержки.

5. Обзор ForkJoinTask

ForkJoinTask - это абстрактный класс, который реализует Future и способен выполнять большое количество задач, размещаемых небольшим количеством реальных потоков в ForkJoinPool.

Тогда основной характеристикой ForkJoinTask является то, что он обычно порождает новые подзадачи как часть работы, необходимой для выполнения своей основной задачи. Он генерирует новые задачи с помощью вызова fork() и собирает все результаты с помощью join(), то есть имени класса.

Есть два абстрактных класса, которые реализуют ForkJoinTask: RecursiveTask, который возвращает значение после завершения, и RecursiveAction, который ничего не возвращает. Как видно из названий, эти классы должны использоваться для рекурсивных задач, таких как, например, навигация по файловой системе или сложные математические вычисления.

Давайте расширим наш предыдущий пример, чтобы создать класс, который, учитывая целое число, будет вычислять квадраты суммы для всех его факторных элементов. Так, например, если мы передадим число в наш калькулятор, мы должны получить результат из суммы 4² + 3² + 2² + 1², которая есть 30.

Прежде всего, нам нужно создать конкретную реализацию RecursiveTask и реализовать ее метод вычисления. Это где хорошо написать нашу бизнес-логику

public class FactorialSquareCalculator extends RecursiveTask<Integer> {
 
    private Integer n;

    public FactorialSquareCalculator(Integer n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }

        FactorialSquareCalculator calculator 
          = new FactorialSquareCalculator(n - 1);

        calculator.fork();

        return n * n + calculator.join();
    }
}

Обратите внимание, как мы достигаем рекурсивности, создавая новый экземпляр FactorialSquareCalculator в пределах вычислений. Вызывая fork, неблокирующий метод, мы просим ForkJoinPool инициировать выполнение этой подзадачи.

Метод join вернет результат этого вычисления, к которому мы добавим квадрат числа, которое мы в настоящее время посещаем.

Теперь нам просто нужно создать ForkJoinPool для обработки выполнения и управления потоками.

ForkJoinPool forkJoinPool = new ForkJoinPool();

FactorialSquareCalculator calculator = new FactorialSquareCalculator(10);

forkJoinPool.execute(calculator);

6. Заключение

В этой статье мы подробно рассмотрели интерфейс Future, посетив все его методы. Мы также узнали, как использовать возможности пулов потоков для запуска нескольких параллельных операций. Основные методы из класса ForkJoinTask, fork и join были также кратко описаны.