предисловие
Во Flutter есть два способа обработки асинхронных операций.Future
а такжеStream
,Future
для обработки одной асинхронной операции,Stream
Используется для обработки непрерывных асинхронных операций. Например, наливая воду в стакан для воды, наполняя стакан для воды в одинFuture
, последовательное наполнение нескольких чашек с водойStream
.
Детали трансляции
Stream
— это абстрактный класс, используемый для представления источника последовательности асинхронных данных. Это способ создания непрерывных событий, либо событий данных, либо событий ошибок, а также события завершения в конце потока.
abstract class Stream<T> {
Stream();
}
Stream
Разделяйте подписные потоки и широковещательные потоки.
Поток с одной подпиской позволяет установить только один прослушиватель перед отправкой полного события и начинает генерировать события только после установки прослушивателя в потоке и прекращает отправку событий при отмене прослушивателя. Установка дополнительных прослушивателей в одном потоке подписки не допускается, даже если первый прослушиватель отменен. Широковещательные потоки позволяют настроить несколько слушателей или добавить новых слушателей после отмены предыдущего.
Stream
Существуют синхронные и асинхронные потоки.
Разница в том, что поток синхронизации будет выполнятьсяadd
,addError
илиclose
метод немедленно слушателю потокаStreamSubscription
События отправляются, в то время как асинхронные потоки всегда отправляют события после завершения выполнения кода в очереди событий.
Stream
семья
StreamController
Поток с методами потока управления. Он может отправлять данные, ошибки и события завершения в свой поток, а также может проверять, приостановлен ли поток данных и есть ли слушатели.sync
Параметр определяет, является ли поток синхронным или асинхронным.
abstract class StreamController<T> implements StreamSink<T> {
Stream<T> get stream;
/// ...
}
StreamController _streamController = StreamController(
onCancel: () {},
onListen: () {},
onPause: () {},
onResume: () {},
sync: false,
);
StreamSink
流事件的入口。 поставкаadd
,addError
,addStream
способ отправки событий в поток.
abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
Future close();
/// ...
Future get done;
}
StreamSubscription
流的监听器。 поставкаcacenl
,pause
, resume
abstract class StreamSubscription<T> {
/// ...
}
StreamSubscription subscription = StreamController().stream.listen(print);
subscription.onDone(() => print('done'));
StreamBuilder
Виджет, отображающий интерфейс пользовательского интерфейса с использованием потоковых данных.
StreamBuilder(
// 数据流
stream: stream,
// 初始数据
initialData: 'loading...',
builder: (context, AsyncSnapshot snapshot) {
// AsyncSnapshot 对象为数据快照,缓存了当前数据和状态
// snapshot.connectionState
// snapshot.data
if (snapshot.hasData) {
Map data = snapshot.data;
return Text(data),
}
return CircularProgressIndicator();
},
)
Создать поток
Есть несколько способов создать в DartStream
- Создать новый поток из существующего
Stream
,использоватьmap
,where
,takeWhile
и другие методы.
// 整数流
Stream<int> intStream = StreamController<int>().stream;
// 偶数流
Stream<int> evenStream = intStream.where((int n) => n.isEven);
// 两倍流
Stream<int> doubleStream = intStream.map((int n) => n * 2);
// 数字大于 10 的流
Stream<int> biggerStream = intStream.takeWhile((int n) => n > 10);
- использовать
async*
функция.
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
Stream stream = countStream(10);
stream.listen(print);
- использовать
StreamController
.
StreamController<Map> _streamController = StreamController(
onCancel: () {},
onListen: () {},
onPause: () {},
onResume: () {},
sync: false,
);
Stream _stream = _streamController.stream;
- использовать
Future
генерация объекта
Future<int> _delay(int seconds) async {
await Future.delayed(Duration(seconds: seconds));
return seconds;
}
List<Future> futures = [];
for (int i = 0; i < 10; i++) {
futures.add(_delay(3));
}
Stream _futuresStream = Stream.fromFutures(futures);
Применить поток
Stream Counter
Измените проект Flutter по умолчанию для использованияStream
выполнить
import 'dart:async';
import 'package:flutter/material.dart';
class StreamCounter extends StatefulWidget {
@override
_StreamCounterState createState() => _StreamCounterState();
}
class _StreamCounterState extends State<StreamCounter> {
// 创建一个 StreamController
StreamController<int> _counterStreamController = StreamController<int>(
onCancel: () {
print('cancel');
},
onListen: () {
print('listen');
},
);
int _counter = 0;
Stream _counterStream;
StreamSink _counterSink;
// 使用 StreamSink 向 Stream 发送事件,当 _counter 大于 9 时调用 close 方法关闭流。
void _incrementCounter() {
if (_counter > 9) {
_counterSink.close();
return;
}
_counter++;
_counterSink.add(_counter);
}
// 主动关闭流
void _closeStream() {
_counterStreamController.close();
}
@override
void initState() {
super.initState();
_counterSink = _counterStreamController.sink;
_counterStream = _counterStreamController.stream;
}
@override
void dispose() {
super.dispose();
_counterSink.close();
_counterStreamController.close();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Stream Counter'),
),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
Text('You have pushed the button this many times:'),
// 使用 StreamBuilder 显示和更新 UI
StreamBuilder<int>(
stream: _counterStream,
initialData: _counter,
builder: (context, snapshot) {
if (snapshot.connectionState == ConnectionState.done) {
return Text(
'Done',
style: Theme.of(context).textTheme.bodyText2,
);
}
int number = snapshot.data;
return Text(
'$number',
style: Theme.of(context).textTheme.bodyText2,
);
},
),
],
),
),
floatingActionButton: Row(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
FloatingActionButton(
onPressed: _incrementCounter,
tooltip: 'Increment',
child: Icon(Icons.add),
),
SizedBox(width: 24.0),
FloatingActionButton(
onPressed: _closeStream,
tooltip: 'Close',
child: Icon(Icons.close),
),
],
),
);
}
}
NetWork Status
Отслеживайте состояние подключения к сети мобильного телефона, сначала добавьтеconnectivity
плагин
dependencies:
connectivity: ^0.4.8+2
import 'dart:async';
import 'package:connectivity/connectivity.dart';
import 'package:flutter/material.dart';
class NetWorkStatus extends StatefulWidget {
@override
_NetWorkStatusState createState() => _NetWorkStatusState();
}
class _NetWorkStatusState extends State<NetWorkStatus> {
StreamController<ConnectivityResult> _streamController = StreamController();
StreamSink _streamSink;
Stream _stream;
String _result;
void _checkStatus() async {
final ConnectivityResult result = await Connectivity().checkConnectivity();
if (result == ConnectivityResult.mobile) {
_result = 'mobile';
} else if (result == ConnectivityResult.wifi) {
_result = 'wifi';
} else if (result == ConnectivityResult.none) {
_result = 'none';
}
setState(() {});
}
@override
void initState() {
super.initState();
_stream = _streamController.stream;
_streamSink = _streamController.sink;
_checkStatus();
Connectivity().onConnectivityChanged.listen(
(ConnectivityResult result) {
_streamSink.add(result);
},
);
}
@override
dispose() {
super.dispose();
_streamSink.close();
_streamController.close();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Network Status'),
),
body: Center(
child: StreamBuilder<ConnectivityResult>(
stream: _stream,
builder: (context, AsyncSnapshot snapshot) {
if (snapshot.hasData) {
if (snapshot.data == ConnectivityResult.mobile) {
_result = 'mobile';
} else if (snapshot.data == ConnectivityResult.wifi) {
_result = 'wifi';
} else if (snapshot.data == ConnectivityResult.none) {
return Text('还没有链接网络');
}
}
if (_result == null) {
return CircularProgressIndicator();
}
return ResultText(_result);
},
),
),
);
}
}
class ResultText extends StatelessWidget {
final String result;
const ResultText(this.result);
@override
Widget build(BuildContext context) {
return RichText(
text: TextSpan(
style: TextStyle(color: Colors.black),
text: '正在使用',
children: [
TextSpan(
text: ' $result ',
style: TextStyle(
color: Colors.red,
fontSize: 20.0,
fontWeight: FontWeight.bold,
),
),
TextSpan(text: '链接网络'),
],
),
);
}
}
Random Article
Запрос сетевых данных для создания потока
dependencies:
dio: ^3.0.9
flutter_html: ^0.11.1
import 'dart:async';
import 'package:dio/dio.dart';
import 'package:flutter/material.dart';
import 'package:flutter_html/flutter_html.dart';
class RandomArticle extends StatefulWidget {
@override
_RandomArticleState createState() => _RandomArticleState();
}
class _RandomArticleState extends State<RandomArticle> {
static Dio _dio = Dio(
BaseOptions(baseUrl: 'https://interface.meiriyiwen.com'),
);
static Future<Map> _getArticle() async {
Response response = await _dio.get(
'/article/random',
queryParameters: {"dev": 1},
);
final data = response.data['data'];
return data;
}
Stream<Map> _futuresStream;
@override
void initState() {
List<Future<Map>> futures = [];
for (int i = 0; i < 10; i++) {
// 添加 Future
futures.add(_getArticle());
}
// 生成 Stream
_futuresStream = Stream<Map>.fromFutures(futures);
super.initState();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Random Article')),
body: SingleChildScrollView(
child: Center(
child: StreamBuilder<Map>(
stream: _futuresStream,
builder: (context, AsyncSnapshot snapshot) {
if (snapshot.hasData) {
Map article = snapshot.data;
return Container(
child: Column(
children: <Widget>[
SizedBox(height: 24.0),
Text(
article['title'],
style: TextStyle(fontSize: 24.0),
),
Padding(
padding: const EdgeInsets.only(
top: 12.0,
left: 12.0,
right: 12.0,
bottom: 60.0,
),
child: Html(
data: article['content'],
),
),
],
),
);
}
return CircularProgressIndicator();
},
),
),
),
);
}
}
Broadcast Stream
Использовать трансляцию
import 'dart:async';
import 'package:flutter/material.dart';
class BroadcastStream extends StatefulWidget {
@override
_BroadcastStreamState createState() => _BroadcastStreamState();
}
class _BroadcastStreamState extends State<BroadcastStream> {
StreamController<int> _streamController = StreamController<int>.broadcast();
StreamSubscription _subscription1;
StreamSubscription _subscription2;
StreamSubscription _subscription3;
int _count = 0;
int _s1 = 0;
int _s2 = 0;
int _s3 = 0;
@override
void initState() {
_subscription1 = _streamController.stream.listen((n) {
setState(() {
_s1 += 1;
});
});
_subscription2 = _streamController.stream.listen((n) {
setState(() {
_s2 += 2;
});
});
_subscription3 = _streamController.stream.listen((n) {
setState(() {
_s3 -= 1;
});
});
super.initState();
}
void _add() {
if (_count > 10) {
// 大于 10 时停止第一个订阅
_subscription1.cancel();
}
_count++;
_streamController.add(_count);
}
@override
void dispose() {
super.dispose();
_streamController.close();
_subscription1.cancel();
_subscription2.cancel();
_subscription3.cancel();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Broadcast Stream'),
),
body: Container(
width: double.infinity,
height: MediaQuery.of(context).size.height,
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
crossAxisAlignment: CrossAxisAlignment.center,
children: [
Text('Count: $_count'),
SizedBox(height: 12.0),
Text('S1: $_s1'),
SizedBox(height: 12.0),
Text('S2: $_s2'),
SizedBox(height: 12.0),
Text('S3: $_s3'),
SizedBox(height: 12.0),
FloatingActionButton(
onPressed: _add,
child: Icon(Icons.plus_one),
),
],
),
),
);
}
}
Суммировать
Stream
— это один из способов работы с асинхронным программированием, он обеспечивает асинхронную последовательность событий, которые отправляются, когда вы готовы их получить. В Dart потоки делятся на синхронные потоки и асинхронные потоки, а также потоки одиночной подписки и широковещательные потоки.Существует несколько способов их создания.Stream
.
Ссылаться на
Асинхронное программирование: использование потоков
Всестороннее и глубокое понимание Stream
Building a Widget with StreamBuilder
StreamBuilder (виджет недели Flutter)
Потоки дротика - Флаттер фокуса
В этой статье используетсяmdniceнабор текста
PS: [Гуанчжоу] Автор может связаться с нами при приеме на работу.