Заметки о сопрограммах Kotlin (2)

Java задняя часть Android RxJava Kotlin

躲雨的MM.jpg

Хотя сопрограмма является микропотоком, она не привязана к конкретному потоку, может выполняться в потоке А и приостанавливаться в определенный момент, когда запланировано возобновление выполнения в следующий раз, скорее всего, она будет выполнена в В нить.

1. с контекстом

Как и launch, async, runBlocking, withContext также относится к сборщикам Coroutine. Однако, в отличие от них, другие создают новую сопрограмму, а withContext не создает новую сопрограмму. withContext позволяет изменить поток выполнения сопрограммы, withContext необходимо передать CoroutineContext при его использовании.

    launch {

        val result1 = withContext(CommonPool) {

            delay(2000)
            1
        }

        val  result2 = withContext(CommonPool) {

            delay(1000)
            2
        }

        val  result = result1 + result2
        println(result)
    }

    Thread.sleep(5000)

Результаты:

3

withContext может иметь возвращаемое значение, аналогичное async. Сопрограмма, созданная async, возвращает значение через метод await(). И withContext может вернуться напрямую.

    launch {

        val result1 = async {

            delay(2000)
            1
        }

        val  result2 = async {

            delay(1000)
            2
        }

        val  result = result1.await() + result2.await()
        println(result)
    }

    Thread.sleep(5000)

Результаты:

3

2. Общий пул потоков

В приведенном выше примере withContext использует CommonPool. CommonPool наследует CoroutineDispatcher, что означает использование пула потоков для выполнения задач сопрограммы.

CommonPool.png

CommonPool чем-то похож на Schedulers.computation() RxJava, в основном для вычислительных задач, интенсивно использующих ЦП.

CommonPool использует пул для выполнения блоков.

    override fun dispatch(context: CoroutineContext, block: Runnable) =
        try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
        catch (e: RejectedExecutionException) {
            timeSource.unTrackTask()
            DefaultExecutor.execute(block)
        }

Если пул пуст, вызовите метод getOrCreatePoolSync(), чтобы создать пул.

    @Synchronized
    private fun getOrCreatePoolSync(): Executor =
        pool ?: createPool().also { pool = it }

На данный момент метод createPool() является методом, создающим пул.

Во-первых, если диспетчер безопасности не пуст, используйте createPlainPool() для создания пула. В противном случае попробуйте создать ForkJoinPool, иначе используйте createPlainPool() для создания пула.

    private fun createPool(): ExecutorService {
        if (System.getSecurityManager() != null) return createPlainPool()
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool()
        if (!usePrivatePool) {
            Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                ?.let { return it }
        }
        Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
            ?. let { return it }
        return createPlainPool()
    }

createPlainPool() будет использовать Executors.newFixedThreadPool() для создания пула потоков.

    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(parallelism) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }

После общего понимания принципа создания CommonPool в исходном коде обнаруживается, что CoroutineDispatcher по умолчанию для CoroutineContext — это CommonPool.

/**
 * This is the default [CoroutineDispatcher] that is used by all standard builders like
 * [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
 *
 * It is currently equal to [CommonPool], but the value is subject to change in the future.
 */
@Suppress("PropertyName")
public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool

Общий CoroutineDispatcher также может быть создан с помощью newSingleThreadContext(), newFixedThreadPoolContext() ThreadPoolDispatcher и функции расширения asCoroutineDispatcher() Executor.

В Android также доступен пользовательский интерфейс. Как следует из названия, он планирует выполнение в основном потоке Android.

3. Отменяемые сопрограммы

Объекты Job и Deferred могут отменять задачи.

3.1 cancel()

Используйте метод отмены():

    val job = launch {
        delay(1000)
        println("Hello World!")
    }
    job.cancel()
    println(job.isCancelled)
    Thread.sleep(2000)

Результаты:

true

true означает, что задание было отменено и текст «Hello World!» не печатается

3.2 cancelAndJoin()

Используйте метод cancelAndJoin():

    runBlocking<Unit> {

        val job = launch {

            repeat(100) { i ->
                println("count time: $i")
                delay(500)
            }
        }
        delay(2100)
        job.cancelAndJoin()
    }

Результаты:

count time: 0
count time: 1
count time: 2
count time: 3
count time: 4

CancelAndJoin() эквивалентно использованию методов cancel() и join().

Метод join() используется для ожидания завершения запущенной сопрограммы и не распространяет свое исключение. Однако сбойная дочерняя сопрограмма также отменяет свою родительскую с соответствующим исключением.

3.3 Проверьте токен отмены сопрограммы

Если сопрограмма выполняла вычисления, она не может отменить без проверки флага отмены. Даже если вызывается cancel() или cancelAndJoin().

    runBlocking<Unit> {

        val startTime = System.currentTimeMillis()
        val job = launch {
            var tempTime = startTime
            var i = 0
            while (i < 100) {

                if (System.currentTimeMillis() >= tempTime) {
                    println("count time: ${i++}")
                    tempTime += 500L
                }
            }
        }
        delay(2100)
        job.cancelAndJoin()
    }

Приведенный выше код по-прежнему печатается 100 раз.

При использованииisActiveОтметив флаг отмены, задание Job или Deferred's может быть отменено:

    runBlocking<Unit> {

        val startTime = System.currentTimeMillis()
        val job = launch {
            var tempTime = startTime
            var i = 0
            while (isActive) {

                if (System.currentTimeMillis() >= tempTime) {
                    println("count time: ${i++}")
                    tempTime += 500L
                }
            }
        }
        delay(2100)
        job.cancelAndJoin()
    }

Результаты:

count time: 0
count time: 1
count time: 2
count time: 3
count time: 4

isActive является свойством CoroutineScope:

package kotlinx.coroutines.experimental

import kotlin.coroutines.experimental.*
import kotlin.internal.*

/**
 * Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient
 * and fast access to its own cancellation status via [isActive].
 */
public interface CoroutineScope {
    /**
     * Returns `true` when this coroutine is still active (has not completed and was not cancelled yet).
     *
     * Check this property in long-running computation loops to support cancellation:
     * ```
     * while (isActive) {
     *     // do some computation
     * }
     * ```
     *
     * This property is a shortcut for `coroutineContext.isActive` in the scope when
     * [CoroutineScope] is available.
     * See [coroutineContext][kotlin.coroutines.experimental.coroutineContext],
     * [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive].
     */
    public val isActive: Boolean

    /**
     * Returns the context of this coroutine.
     *
     * @suppress: **Deprecated**: Replaced with top-level [kotlin.coroutines.experimental.coroutineContext].
     */
    @Deprecated("Replace with top-level coroutineContext",
        replaceWith = ReplaceWith("coroutineContext",
            imports = ["kotlin.coroutines.experimental.coroutineContext"]))
    @LowPriorityInOverloadResolution
    @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
    public val coroutineContext: CoroutineContext
}

Суммировать:

В этой статье представлены три части: использование withContext, создание CommonPool и отмена сопрограмм. Среди них также представлено использование async и await.

Похожие статьи из этой серии:

Заметки о сопрограммах Kotlin (1)


Стек технологий Java и Android: еженедельно обновляйте и публикуйте оригинальные технические статьи, добро пожаловать, чтобы отсканировать QR-код общедоступной учетной записи ниже и подписаться, и с нетерпением ждем роста и развития вместе с вами.