Хотя сопрограмма является микропотоком, она не привязана к конкретному потоку, может выполняться в потоке А и приостанавливаться в определенный момент, когда запланировано возобновление выполнения в следующий раз, скорее всего, она будет выполнена в В нить.
1. с контекстом
Как и launch, async, runBlocking, withContext также относится к сборщикам Coroutine. Однако, в отличие от них, другие создают новую сопрограмму, а withContext не создает новую сопрограмму. withContext позволяет изменить поток выполнения сопрограммы, withContext необходимо передать CoroutineContext при его использовании.
launch {
val result1 = withContext(CommonPool) {
delay(2000)
1
}
val result2 = withContext(CommonPool) {
delay(1000)
2
}
val result = result1 + result2
println(result)
}
Thread.sleep(5000)
Результаты:
3
withContext может иметь возвращаемое значение, аналогичное async. Сопрограмма, созданная async, возвращает значение через метод await(). И withContext может вернуться напрямую.
launch {
val result1 = async {
delay(2000)
1
}
val result2 = async {
delay(1000)
2
}
val result = result1.await() + result2.await()
println(result)
}
Thread.sleep(5000)
Результаты:
3
2. Общий пул потоков
В приведенном выше примере withContext использует CommonPool. CommonPool наследует CoroutineDispatcher, что означает использование пула потоков для выполнения задач сопрограммы.
CommonPool чем-то похож на Schedulers.computation() RxJava, в основном для вычислительных задач, интенсивно использующих ЦП.
CommonPool использует пул для выполнения блоков.
override fun dispatch(context: CoroutineContext, block: Runnable) =
try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
}
Если пул пуст, вызовите метод getOrCreatePoolSync(), чтобы создать пул.
@Synchronized
private fun getOrCreatePoolSync(): Executor =
pool ?: createPool().also { pool = it }
На данный момент метод createPool() является методом, создающим пул.
Во-первых, если диспетчер безопасности не пуст, используйте createPlainPool() для создания пула. В противном случае попробуйте создать ForkJoinPool, иначе используйте createPlainPool() для создания пула.
private fun createPool(): ExecutorService {
if (System.getSecurityManager() != null) return createPlainPool()
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
?: return createPlainPool()
if (!usePrivatePool) {
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
?.let { return it }
}
Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
?. let { return it }
return createPlainPool()
}
createPlainPool() будет использовать Executors.newFixedThreadPool() для создания пула потоков.
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
return Executors.newFixedThreadPool(parallelism) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
После общего понимания принципа создания CommonPool в исходном коде обнаруживается, что CoroutineDispatcher по умолчанию для CoroutineContext — это CommonPool.
/**
* This is the default [CoroutineDispatcher] that is used by all standard builders like
* [launch], [async], etc if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is currently equal to [CommonPool], but the value is subject to change in the future.
*/
@Suppress("PropertyName")
public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool
Общий CoroutineDispatcher также может быть создан с помощью newSingleThreadContext(), newFixedThreadPoolContext() ThreadPoolDispatcher и функции расширения asCoroutineDispatcher() Executor.
В Android также доступен пользовательский интерфейс. Как следует из названия, он планирует выполнение в основном потоке Android.
3. Отменяемые сопрограммы
Объекты Job и Deferred могут отменять задачи.
3.1 cancel()
Используйте метод отмены():
val job = launch {
delay(1000)
println("Hello World!")
}
job.cancel()
println(job.isCancelled)
Thread.sleep(2000)
Результаты:
true
true означает, что задание было отменено и текст «Hello World!» не печатается
3.2 cancelAndJoin()
Используйте метод cancelAndJoin():
runBlocking<Unit> {
val job = launch {
repeat(100) { i ->
println("count time: $i")
delay(500)
}
}
delay(2100)
job.cancelAndJoin()
}
Результаты:
count time: 0
count time: 1
count time: 2
count time: 3
count time: 4
CancelAndJoin() эквивалентно использованию методов cancel() и join().
Метод join() используется для ожидания завершения запущенной сопрограммы и не распространяет свое исключение. Однако сбойная дочерняя сопрограмма также отменяет свою родительскую с соответствующим исключением.
3.3 Проверьте токен отмены сопрограммы
Если сопрограмма выполняла вычисления, она не может отменить без проверки флага отмены. Даже если вызывается cancel() или cancelAndJoin().
runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var tempTime = startTime
var i = 0
while (i < 100) {
if (System.currentTimeMillis() >= tempTime) {
println("count time: ${i++}")
tempTime += 500L
}
}
}
delay(2100)
job.cancelAndJoin()
}
Приведенный выше код по-прежнему печатается 100 раз.
При использованииisActiveОтметив флаг отмены, задание Job или Deferred's может быть отменено:
runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var tempTime = startTime
var i = 0
while (isActive) {
if (System.currentTimeMillis() >= tempTime) {
println("count time: ${i++}")
tempTime += 500L
}
}
}
delay(2100)
job.cancelAndJoin()
}
Результаты:
count time: 0
count time: 1
count time: 2
count time: 3
count time: 4
isActive является свойством CoroutineScope:
package kotlinx.coroutines.experimental
import kotlin.coroutines.experimental.*
import kotlin.internal.*
/**
* Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient
* and fast access to its own cancellation status via [isActive].
*/
public interface CoroutineScope {
/**
* Returns `true` when this coroutine is still active (has not completed and was not cancelled yet).
*
* Check this property in long-running computation loops to support cancellation:
* ```
* while (isActive) {
* // do some computation
* }
* ```
*
* This property is a shortcut for `coroutineContext.isActive` in the scope when
* [CoroutineScope] is available.
* See [coroutineContext][kotlin.coroutines.experimental.coroutineContext],
* [isActive][kotlinx.coroutines.experimental.isActive] and [Job.isActive].
*/
public val isActive: Boolean
/**
* Returns the context of this coroutine.
*
* @suppress: **Deprecated**: Replaced with top-level [kotlin.coroutines.experimental.coroutineContext].
*/
@Deprecated("Replace with top-level coroutineContext",
replaceWith = ReplaceWith("coroutineContext",
imports = ["kotlin.coroutines.experimental.coroutineContext"]))
@LowPriorityInOverloadResolution
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
public val coroutineContext: CoroutineContext
}
Суммировать:
В этой статье представлены три части: использование withContext, создание CommonPool и отмена сопрограмм. Среди них также представлено использование async и await.
Похожие статьи из этой серии:
Заметки о сопрограммах Kotlin (1)
Стек технологий Java и Android: еженедельно обновляйте и публикуйте оригинальные технические статьи, добро пожаловать, чтобы отсканировать QR-код общедоступной учетной записи ниже и подписаться, и с нетерпением ждем роста и развития вместе с вами.