现在位置: 首页 > Dart 教程 > 正文

Dart Stream 流

Stream(流)是 Dart 中处理连续异步事件序列的机制。

如果说 Future 是一次性的异步结果,那么 Stream 就是多次的、持续不断的异步数据流。

本章介绍 Stream 的概念、await for 监听、StreamController 创建以及单订阅与广播流的区别。


Stream 与事件序列

Stream 就像一条传送带,数据会随着时间推移一个个到来。

你不需要一次性等待所有数据,而是来一个处理一个。

Stream 的典型应用场景:

场景示例
用户输入事件按钮点击、鼠标移动、键盘输入
文件读取逐行读取大文件
网络数据WebSocket 消息、实时 API
定时器每秒触发一次的定时事件
状态变化Flutter 中的状态管理流

实例

Stream 的基本使用——listen 监听:

void main() {
  // 创建一个 Stream:每隔 1 秒发射一个数字
  var stream = Stream<int>.periodic(
    Duration(seconds: 1),
    (count) => count + 1,  // count 从 0 开始
  );

  print('开始监听 Stream...');

  // listen 订阅 Stream
  var subscription = stream.listen(
    (data) {
      // 每当有新数据到达时调用
      print('RUNOOB 收到数据: $data');
    },
    onError: (error) {
      // Stream 发生错误时调用
      print('错误: $error');
    },
    onDone: () {
      // Stream 关闭时调用
      print('Stream 已关闭');
    },
  );

  // 5 秒后取消订阅
  Future.delayed(Duration(seconds: 5), () {
    subscription.cancel();
    print('已取消订阅');
  });
}
开始监听 Stream...
RUNOOB 收到数据: 1
RUNOOB 收到数据: 2
RUNOOB 收到数据: 3
RUNOOB 收到数据: 4
RUNOOB 收到数据: 5
已取消订阅

listen() 返回一个 StreamSubscription 对象,你可以用它来控制订阅:

方法作用
subscription.pause()暂停接收数据
subscription.resume()恢复接收数据
subscription.cancel()取消订阅
subscription.isPaused是否处于暂停状态

await for 监听

除了 listen() 回调方式,还可以使用 await for 循环来消费 Stream。

await for 让 Stream 的处理逻辑像普通的 for 循环一样清晰。

实例

// 生成一个有限的数据流
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(milliseconds: 300));
    yield i;  // yield 发射数据到 Stream
  }
}

Future<void> main() async {
  print('开始用 await for 消费 Stream...');

  // await for:等待每个数据到达,逐个处理
  await for (var value in countStream(5)) {
    print('RUNOOB 计数: $value');
  }

  print('Stream 消费完毕');
}
开始用 await for 消费 Stream...
RUNOOB 计数: 1
RUNOOB 计数: 2
RUNOOB 计数: 3
RUNOOB 计数: 4
RUNOOB 计数: 5
Stream 消费完毕

await for 的特点:

  • 和普通 for 循环写法相似,但每次迭代会等待下一个数据到达
  • 当 Stream 关闭时,循环自动结束
  • 只能在 async 函数中使用
  • 如果需要中途退出,使用 break(和普通 for 循环一样)

await for 适合"需要顺序处理所有数据"的场景,如读取文件所有行。listen() 适合"需要持续响应事件"的场景,如按钮点击。两者可以互相替代,但各有擅长的领域。


StreamController 创建 Stream

StreamController 是手动创建和控制 Stream 的工具。

你可以随时向其中添加数据、错误,或者关闭它。

实例

import 'dart:async';

// 使用 StreamController 实现一个简单的倒计时器
class CountdownTimer {
  final StreamController<int> _controller = StreamController<int>();
  Timer? _timer;
  int _remaining = 0;

  // 暴露 Stream 给外部订阅
  Stream<int> get tickStream => _controller.stream;

  // 开始倒计时
  void start(int seconds) {
    _remaining = seconds;

    // 立即发送初始值
    _controller.add(_remaining);

    _timer = Timer.periodic(Duration(seconds: 1), (timer) {
      _remaining--;

      if (_remaining > 0) {
        _controller.add(_remaining);  // 发送数据
      } else {
        _controller.add(0);           // 发送最后的 0
        _controller.close();          // 关闭 Stream
        timer.cancel();
      }
    });
  }

  // 取消倒计时
  void cancel() {
    _timer?.cancel();
    _controller.addError('倒计时被取消');  // 发送错误
    _controller.close();
  }

  // 释放资源
  void dispose() {
    _controller.close();
  }
}

Future<void> main() async {
  var timer = CountdownTimer();

  // 订阅倒计时事件
  timer.tickStream.listen(
    (remaining) {
      print('RUNOOB 倒计时: $remaining 秒');
    },
    onError: (error) {
      print('错误: $error');
    },
    onDone: () {
      print('倒计时结束!');
    },
  );

  timer.start(5);

  // 等待倒计时完成
  await Future.delayed(Duration(seconds: 6));
  timer.dispose();
}
RUNOOB 倒计时: 5 秒
RUNOOB 倒计时: 4 秒
RUNOOB 倒计时: 3 秒
RUNOOB 倒计时: 2 秒
RUNOOB 倒计时: 1 秒
RUNOOB 倒计时: 0 秒
倒计时结束!

async* 生成器函数

如果你只需要简单地生成一个数据序列,async* 比 StreamController 更方便。

实例

// async* 标记这是一个异步生成器函数
// 返回类型必须是 Stream
Stream<String> readLinesAsync() async* {
  var lines = ['第一行', '第二行', '第三行', 'RUNOOB'];

  for (var line in lines) {
    await Future.delayed(Duration(milliseconds: 500));
    yield line;  // yield 发射数据到 Stream
  }
  // 函数结束时 Stream 自动关闭
}

// 带错误处理的异步生成器
Stream<int> generateNumbersWithError() async* {
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(milliseconds: 300));

    if (i == 3) {
      throw Exception('数字 3 出错了!');
    }
    yield i;
  }
}

Future<void> main() async {
  print('逐行读取:');
  await for (var line in readLinesAsync()) {
    print('  $line');
  }

  print('\n带错误的生成器:');
  try {
    await for (var num in generateNumbersWithError()) {
      print('  数字: $num');
    }
  } catch (e) {
    print('  捕获错误: $e');
  }
}
逐行读取:
  第一行
  第二行
  第三行
  RUNOOB

带错误的生成器:
  数字: 1
  数字: 2
  捕获错误: Exception: 数字 3 出错了!

单订阅 vs 广播流

Dart 的 Stream 分为两种类型:单订阅流(Single-subscription)和广播流(Broadcast)。

单订阅流(Single-subscription Stream)

这是默认的 Stream 类型。

它只能被一个监听者订阅,适合"从头到尾消费一次"的场景。

实例

void main() {
  // 单订阅流(默认)
  var stream = Stream.fromIterable([1, 2, 3]);

  // 第一个订阅:OK
  stream.listen((data) => print('订阅者1: $data'));

  // 第二个订阅:错误!单订阅流不能被多次订阅
  // stream.listen((data) => print('订阅者2: $data'));  // 运行时错误
}

广播流(Broadcast Stream)

广播流允许多个监听者同时订阅,适合事件广播场景。

实例

import 'dart:async';

void main() {
  // 创建一个广播流
  var controller = StreamController<int>.broadcast();

  // 多个订阅者可以同时监听
  controller.stream.listen(
    (data) => print('RUNOOB 订阅者A: 收到 $data'),
  );

  controller.stream.listen(
    (data) => print('RUNOOB 订阅者B: 收到 $data'),
  );

  // 发送数据,两个订阅者都会收到
  controller.add(1);
  controller.add(2);
  controller.add(3);

  // 延迟订阅也能收到后续数据(但不会收到之前的数据)
  Future.delayed(Duration(seconds: 1), () {
    controller.stream.listen(
      (data) => print('RUNOOB 迟到的订阅者C: 收到 $data'),
    );
    controller.add(4);
    controller.close();
  });
}
RUNOOB 订阅者A: 收到 1
RUNOOB 订阅者B: 收到 1
RUNOOB 订阅者A: 收到 2
RUNOOB 订阅者B: 收到 2
RUNOOB 订阅者A: 收到 3
RUNOOB 订阅者B: 收到 3
RUNOOB 订阅者A: 收到 4
RUNOOB 订阅者B: 收到 4
RUNOOB 迟到的订阅者C: 收到 4

两种 Stream 的对比:

特性单订阅流广播流
订阅者数量只能一个可以多个
数据重放从头开始消费只接收订阅后的数据
典型场景文件读取、HTTP 响应按钮点击、状态通知
创建方式StreamController()StreamController.broadcast()

广播流的"迟到订阅者"只能收到订阅之后的事件,之前的事件已经错过了。这是广播流和单订阅流最重要的行为差异。


Stream 常用转换方法

Stream 提供了一系列方法来转换和处理数据,类似于 List 的函数式方法。

实例

void main() async {
  var numbers = Stream.fromIterable([1, 2, 3, 4, 5, 6]);

  // map:转换每个数据
  var doubled = numbers.map((n) => n * 2);
  print('翻倍:');
  await for (var n in doubled) {
    print('  $n');
  }

  // where:过滤数据
  var source = Stream.fromIterable([10, 15, 20, 25, 30]);
  var even = source.where((n) => n % 2 == 0);
  print('偶数:');
  await for (var n in even) {
    print('  $n');
  }

  // take:只取前 N 个
  var infinite = Stream.periodic(
    Duration(milliseconds: 100),
    (i) => i + 1,
  );
  print('只取前 3 个:');
  await for (var n in infinite.take(3)) {
    print('  RUNOOB: $n');
  }

  // skip:跳过前 N 个
  var data = Stream.fromIterable([1, 2, 3, 4, 5]);
  print('跳过前 2 个:');
  await for (var n in data.skip(2)) {
    print('  $n');
  }

  // distinct:去重
  var duplicates = Stream.fromIterable([1, 2, 2, 3, 3, 3]);
  print('去重:');
  await for (var n in duplicates.distinct()) {
    print('  $n');
  }
}
翻倍:
  2
  4
  6
  8
  10
  12
偶数:
  10
  20
  30
只取前 3 个:
  RUNOOB: 1
  RUNOOB: 2
  RUNOOB: 3
跳过前 2 个:
  3
  4
  5
去重:
  1
  2
  3