Dart Stream 流
Stream(流)是 Dart 中处理连续异步事件序列的机制。
如果说 Future 是一次性的异步结果,那么 Stream 就是多次的、持续不断的异步数据流。
本章介绍 Stream 的概念、await for 监听、StreamController 创建以及单订阅与广播流的区别。
Stream 与事件序列
Stream 就像一条传送带,数据会随着时间推移一个个到来。
你不需要一次性等待所有数据,而是来一个处理一个。
Stream 的典型应用场景:
| 场景 | 示例 |
|---|---|
| 用户输入事件 | 按钮点击、鼠标移动、键盘输入 |
| 文件读取 | 逐行读取大文件 |
| 网络数据 | WebSocket 消息、实时 API |
| 定时器 | 每秒触发一次的定时事件 |
| 状态变化 | Flutter 中的状态管理流 |
实例
Stream 的基本使用——listen 监听:
// 创建一个 Stream:每隔 1 秒发射一个数字
var stream = Stream<int>.periodic(
Duration(seconds: 1),
(count) => count + 1, // count 从 0 开始
);
print('开始监听 Stream...');
// listen 订阅 Stream
var subscription = stream.listen(
(data) {
// 每当有新数据到达时调用
print('RUNOOB 收到数据: $data');
},
onError: (error) {
// Stream 发生错误时调用
print('错误: $error');
},
onDone: () {
// Stream 关闭时调用
print('Stream 已关闭');
},
);
// 5 秒后取消订阅
Future.delayed(Duration(seconds: 5), () {
subscription.cancel();
print('已取消订阅');
});
}
开始监听 Stream... RUNOOB 收到数据: 1 RUNOOB 收到数据: 2 RUNOOB 收到数据: 3 RUNOOB 收到数据: 4 RUNOOB 收到数据: 5 已取消订阅
listen() 返回一个 StreamSubscription 对象,你可以用它来控制订阅:
| 方法 | 作用 |
|---|---|
| subscription.pause() | 暂停接收数据 |
| subscription.resume() | 恢复接收数据 |
| subscription.cancel() | 取消订阅 |
| subscription.isPaused | 是否处于暂停状态 |
await for 监听
除了 listen() 回调方式,还可以使用 await for 循环来消费 Stream。
await for 让 Stream 的处理逻辑像普通的 for 循环一样清晰。
实例
Stream<int> countStream(int max) async* {
for (int i = 1; i <= max; i++) {
await Future.delayed(Duration(milliseconds: 300));
yield i; // yield 发射数据到 Stream
}
}
Future<void> main() async {
print('开始用 await for 消费 Stream...');
// await for:等待每个数据到达,逐个处理
await for (var value in countStream(5)) {
print('RUNOOB 计数: $value');
}
print('Stream 消费完毕');
}
开始用 await for 消费 Stream... RUNOOB 计数: 1 RUNOOB 计数: 2 RUNOOB 计数: 3 RUNOOB 计数: 4 RUNOOB 计数: 5 Stream 消费完毕
await for 的特点:
- 和普通 for 循环写法相似,但每次迭代会等待下一个数据到达
- 当 Stream 关闭时,循环自动结束
- 只能在 async 函数中使用
- 如果需要中途退出,使用 break(和普通 for 循环一样)
await for 适合"需要顺序处理所有数据"的场景,如读取文件所有行。listen() 适合"需要持续响应事件"的场景,如按钮点击。两者可以互相替代,但各有擅长的领域。
StreamController 创建 Stream
StreamController 是手动创建和控制 Stream 的工具。
你可以随时向其中添加数据、错误,或者关闭它。
实例
// 使用 StreamController 实现一个简单的倒计时器
class CountdownTimer {
final StreamController<int> _controller = StreamController<int>();
Timer? _timer;
int _remaining = 0;
// 暴露 Stream 给外部订阅
Stream<int> get tickStream => _controller.stream;
// 开始倒计时
void start(int seconds) {
_remaining = seconds;
// 立即发送初始值
_controller.add(_remaining);
_timer = Timer.periodic(Duration(seconds: 1), (timer) {
_remaining--;
if (_remaining > 0) {
_controller.add(_remaining); // 发送数据
} else {
_controller.add(0); // 发送最后的 0
_controller.close(); // 关闭 Stream
timer.cancel();
}
});
}
// 取消倒计时
void cancel() {
_timer?.cancel();
_controller.addError('倒计时被取消'); // 发送错误
_controller.close();
}
// 释放资源
void dispose() {
_controller.close();
}
}
Future<void> main() async {
var timer = CountdownTimer();
// 订阅倒计时事件
timer.tickStream.listen(
(remaining) {
print('RUNOOB 倒计时: $remaining 秒');
},
onError: (error) {
print('错误: $error');
},
onDone: () {
print('倒计时结束!');
},
);
timer.start(5);
// 等待倒计时完成
await Future.delayed(Duration(seconds: 6));
timer.dispose();
}
RUNOOB 倒计时: 5 秒 RUNOOB 倒计时: 4 秒 RUNOOB 倒计时: 3 秒 RUNOOB 倒计时: 2 秒 RUNOOB 倒计时: 1 秒 RUNOOB 倒计时: 0 秒 倒计时结束!
async* 生成器函数
如果你只需要简单地生成一个数据序列,async* 比 StreamController 更方便。
实例
// 返回类型必须是 Stream
Stream<String> readLinesAsync() async* {
var lines = ['第一行', '第二行', '第三行', 'RUNOOB'];
for (var line in lines) {
await Future.delayed(Duration(milliseconds: 500));
yield line; // yield 发射数据到 Stream
}
// 函数结束时 Stream 自动关闭
}
// 带错误处理的异步生成器
Stream<int> generateNumbersWithError() async* {
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(milliseconds: 300));
if (i == 3) {
throw Exception('数字 3 出错了!');
}
yield i;
}
}
Future<void> main() async {
print('逐行读取:');
await for (var line in readLinesAsync()) {
print(' $line');
}
print('\n带错误的生成器:');
try {
await for (var num in generateNumbersWithError()) {
print(' 数字: $num');
}
} catch (e) {
print(' 捕获错误: $e');
}
}
逐行读取: 第一行 第二行 第三行 RUNOOB 带错误的生成器: 数字: 1 数字: 2 捕获错误: Exception: 数字 3 出错了!
单订阅 vs 广播流
Dart 的 Stream 分为两种类型:单订阅流(Single-subscription)和广播流(Broadcast)。
单订阅流(Single-subscription Stream)
这是默认的 Stream 类型。
它只能被一个监听者订阅,适合"从头到尾消费一次"的场景。
实例
// 单订阅流(默认)
var stream = Stream.fromIterable([1, 2, 3]);
// 第一个订阅:OK
stream.listen((data) => print('订阅者1: $data'));
// 第二个订阅:错误!单订阅流不能被多次订阅
// stream.listen((data) => print('订阅者2: $data')); // 运行时错误
}
广播流(Broadcast Stream)
广播流允许多个监听者同时订阅,适合事件广播场景。
实例
void main() {
// 创建一个广播流
var controller = StreamController<int>.broadcast();
// 多个订阅者可以同时监听
controller.stream.listen(
(data) => print('RUNOOB 订阅者A: 收到 $data'),
);
controller.stream.listen(
(data) => print('RUNOOB 订阅者B: 收到 $data'),
);
// 发送数据,两个订阅者都会收到
controller.add(1);
controller.add(2);
controller.add(3);
// 延迟订阅也能收到后续数据(但不会收到之前的数据)
Future.delayed(Duration(seconds: 1), () {
controller.stream.listen(
(data) => print('RUNOOB 迟到的订阅者C: 收到 $data'),
);
controller.add(4);
controller.close();
});
}
RUNOOB 订阅者A: 收到 1 RUNOOB 订阅者B: 收到 1 RUNOOB 订阅者A: 收到 2 RUNOOB 订阅者B: 收到 2 RUNOOB 订阅者A: 收到 3 RUNOOB 订阅者B: 收到 3 RUNOOB 订阅者A: 收到 4 RUNOOB 订阅者B: 收到 4 RUNOOB 迟到的订阅者C: 收到 4
两种 Stream 的对比:
| 特性 | 单订阅流 | 广播流 |
|---|---|---|
| 订阅者数量 | 只能一个 | 可以多个 |
| 数据重放 | 从头开始消费 | 只接收订阅后的数据 |
| 典型场景 | 文件读取、HTTP 响应 | 按钮点击、状态通知 |
| 创建方式 | StreamController() | StreamController.broadcast() |
广播流的"迟到订阅者"只能收到订阅之后的事件,之前的事件已经错过了。这是广播流和单订阅流最重要的行为差异。
Stream 常用转换方法
Stream 提供了一系列方法来转换和处理数据,类似于 List 的函数式方法。
实例
var numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6]);
// map:转换每个数据
var doubled = numbers.map((n) => n * 2);
print('翻倍:');
await for (var n in doubled) {
print(' $n');
}
// where:过滤数据
var source = Stream.fromIterable([10, 15, 20, 25, 30]);
var even = source.where((n) => n % 2 == 0);
print('偶数:');
await for (var n in even) {
print(' $n');
}
// take:只取前 N 个
var infinite = Stream.periodic(
Duration(milliseconds: 100),
(i) => i + 1,
);
print('只取前 3 个:');
await for (var n in infinite.take(3)) {
print(' RUNOOB: $n');
}
// skip:跳过前 N 个
var data = Stream.fromIterable([1, 2, 3, 4, 5]);
print('跳过前 2 个:');
await for (var n in data.skip(2)) {
print(' $n');
}
// distinct:去重
var duplicates = Stream.fromIterable([1, 2, 2, 3, 3, 3]);
print('去重:');
await for (var n in duplicates.distinct()) {
print(' $n');
}
}
翻倍: 2 4 6 8 10 12 偶数: 10 20 30 只取前 3 个: RUNOOB: 1 RUNOOB: 2 RUNOOB: 3 跳过前 2 个: 3 4 5 去重: 1 2 3
