Stream

Asynchronous programming in Dart is characterized by the Future and Stream classes.

A Future represents a single value (either data or an error) that is delivered asynchronously. A Stream works similarly, but instead of a single result it can deliver zero or more values and errors over time.

A stream is a sequence of asynchronous events. It is like an asynchronous Iterable—where, instead of getting the next event when you ask for it, the stream tells you that there is an event when it is ready.

You can think of a Stream as a flow of data: you sink data into the stream on one side, the data flows along with the stream, and it becomes available on the other side. Real applications such as video streaming or reading a large file use streams. For example, when you request a video with a GET or POST request, the video isn't delivered all at once. Instead, one chunk of the video is loaded, then the next chunk, and this continues until the full video has been loaded on the client side. This is called video streaming. Another application of Stream is opening a large file: instead of getting the whole content of the file at once, your code receives the data in chunks, which is easier to load and leaves room to perform other tasks in between.

Receiving stream events: await for

The asynchronous for loop (commonly just called await for) iterates over the events of a stream.

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

In the example above, the function sumStream takes a Stream<int> object. Events keep arriving through this stream.

The function uses await for to iterate over the stream's events. When an event arrives, the loop body executes, and then execution pauses and waits for the next event. The await for statement keeps running its body this way until the stream is done. When the stream is done, the loop exits and the function returns the result.

The function is marked async, which is required when you use an await for loop: the function doesn't finish immediately, so it returns a Future that completes once the loop has ended.

Here is the full example:

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

Future<void> main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

The function countStream creates a stream. The next section explains how to create a stream.

Creating Stream: Using Asynchronous Generator

You can create a stream by defining a function that returns a Stream object and marking the function body with async*. Such a function is called an asynchronous generator.

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

In the above example, we have a function countStream that returns Stream<int>. The <int> after Stream specifies that the events delivered by the stream are of type int.

Note that defining the function doesn't create a stream yet; it is just a function definition. You need to call the function in order to create the stream.

var stream = countStream(10);

The call to countStream returns a Stream object.

Another important point: creating the Stream object doesn't start executing the function body. The body only starts running once a listener is attached to the stream, so the stream must have a listener before it delivers any events. The following statement causes a listener to be attached:

var sum = await sumStream(stream);

The await for statement inside sumStream attaches a listener to the stream, which causes the body of countStream to start running.

The stream closes automatically when the function countStream terminates.
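The lazy start described above can be observed with a small sketch. The log parameter and the runLazyDemo function are names introduced here for illustration only; the generator is a copy of countStream with one log entry added.

```dart
// Illustrative variant of countStream that records when its body starts.
Stream<int> countStream(int to, List<String> log) async* {
  log.add('generator body started');
  for (var i = 1; i <= to; i++) {
    yield i;
  }
}

// Hypothetical helper: returns the order in which things happened.
Future<List<String>> runLazyDemo() async {
  final log = <String>[];
  final stream = countStream(3, log); // The body has not started yet.
  log.add('stream created');
  // Attaching a listener (here via await for) starts the generator body.
  await for (final value in stream) {
    log.add('received $value');
  }
  return log;
}

Future<void> main() async {
  print(await runLazyDemo());
}
```

In the returned log, 'stream created' appears before 'generator body started', which shows that calling the function alone did not run its body.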

yield

The yield statement adds a value to the stream. It resembles a return statement, but it doesn't end the loop or the function.
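As a minimal sketch of that difference, the generator below (steps is a made-up name) yields twice. The second yield is reached because the first one did not exit the function.

```dart
// Hypothetical generator used only to illustrate yield.
Stream<String> steps() async* {
  yield 'first'; // Delivered to the listener; the function keeps going.
  yield 'second'; // Reached because yield did not end the function.
}

Future<void> main() async {
  // toList() listens to the stream and collects every event until it closes.
  print(await steps().toList()); // [first, second]
}
```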

Creating Stream: StreamController

An asynchronous generator function is useful when you have a single data source and a single function is enough to handle it. But what if you want to add data to the stream from different parts of your program, or deal with different data sources? The StreamController class can be used in such situations.

StreamController<double> controller = StreamController<double>();

This creates a stream controller that you can use to manage the stream. To access the stream object through the controller, use the stream property:

Stream<double> stream = controller.stream;

Subscribing to the Stream

You can use the await for construct to iterate over the stream. Alternatively, you can attach a listener to the stream by calling listen, which is called subscribing to the stream. A stream created by the default StreamController constructor is a single-subscription stream: you are allowed to attach only one listener to it, and attaching a second one throws a StateError.

stream.listen((value) {
  print('Value from controller: $value');
});

The listen method takes a callback, and the callback receives the data as its parameter, which is value in the above example. Whenever an event is ready in the stream, the callback is called.
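The one-listener rule can be checked with a short sketch, assuming the controller was created with the default StreamController constructor (a single-subscription stream). The function name secondListenThrows is made up for this illustration.

```dart
import 'dart:async';

// Hypothetical helper: returns true if a second listen() call is rejected.
bool secondListenThrows() {
  final controller = StreamController<double>();
  final stream = controller.stream;

  // The first listener is accepted.
  stream.listen((value) => print('first: $value'));

  try {
    // A second listener on a single-subscription stream is not allowed.
    stream.listen((value) => print('second: $value'));
    return false;
  } on StateError catch (e) {
    print('Second listen failed: ${e.message}');
    return true;
  } finally {
    controller.close();
  }
}

void main() {
  print(secondListenThrows());
}
```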

The listen method returns a StreamSubscription object, which you can store in a variable so that you can cancel the subscription later.

StreamSubscription<double> streamSubscription = stream.listen((value) {
  print('Value from controller: $value');
});

Here's the declaration of the listen() method in null-safe Dart:

StreamSubscription<T> listen(
  void onData(T event)?, {
  Function? onError, // Invoked when the stream delivers an error
  void onDone()?, // Invoked when the stream is closed
  bool? cancelOnError, // If true, cancels the subscription on the first error
});

The listen method takes the onData callback as its first positional parameter, and that callback receives each data event. There are also a few named parameters. For example, to handle errors you can pass a callback as onError, and you can pass a callback as onDone that is executed when the stream is closed.

stream.listen((value) {
    print('Value from controller: $value');
  },
  onError: (e) => print(e),
  onDone: () => print("Finished"),
);
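A runnable sketch of the three callbacks working together, assuming a StreamController<double> like the one created earlier. The function name runListenDemo, the log list, and the Completer used to wait for onDone are additions for illustration.

```dart
import 'dart:async';

// Hypothetical helper: records which callback fired for each event.
Future<List<String>> runListenDemo() async {
  final log = <String>[];
  final finished = Completer<void>();
  final controller = StreamController<double>();

  controller.stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object e) => log.add('error: $e'),
    onDone: () {
      log.add('done');
      finished.complete();
    },
  );

  controller.add(1.5);
  controller.addError(Exception('boom'));
  // cancelOnError was not set, so the subscription survives the error.
  controller.add(2.5);
  await controller.close();
  await finished.future; // Wait until onDone has run.
  return log;
}

Future<void> main() async {
  print(await runListenDemo());
}
```

Because cancelOnError is left unset, the listener still receives 2.5 after the error has been reported to onError.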

Cancel a Subscription

To cancel the subscription, call the cancel method on the StreamSubscription object.

StreamSubscription<double> streamSubscription = stream.listen((value) {
  print('Value from controller: $value');
});
streamSubscription.cancel();

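It is good practice to cancel a subscription as soon as you no longer need its events, so that the stream can stop producing them and release its resources. When the stream closes on its own, no further events arrive after the done event.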
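A sketch of cancelling from inside the callback once enough events have arrived. It uses Stream.fromIterable as a stand-in data source, and the names takeUntilThree and finished are made up for this example.

```dart
import 'dart:async';

// Hypothetical helper: stops listening after the value 3 has been received.
Future<List<int>> takeUntilThree() async {
  final received = <int>[];
  final finished = Completer<void>();

  // Declared first so the callback can refer to the subscription.
  late final StreamSubscription<int> subscription;
  subscription = Stream<int>.fromIterable([1, 2, 3, 4, 5]).listen((value) {
    received.add(value);
    if (value == 3) {
      // No further events are delivered once cancel() has been called.
      subscription.cancel();
      finished.complete();
    }
  });

  await finished.future;
  return received;
}

Future<void> main() async {
  print(await takeUntilThree()); // [1, 2, 3]
}
```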

Emit / Add a value onto the stream

To add a value to the stream, use the add method of the controller object.

controller.add(12);

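With the listener shown earlier attached, the above statement produces the following output on the Dart VM (the value prints as 12.0 because the controller is a StreamController<double>):

Value from controller: 12.0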
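The motivation for StreamController, adding events from different parts of a program, can be sketched as follows. The two producer functions and collectReadings are hypothetical names; each producer stands for an independent data source.

```dart
import 'dart:async';

// Hypothetical first data source.
void produceSensorReadings(StreamController<double> controller) {
  controller.add(20.5);
  controller.add(21.0);
}

// Hypothetical second data source.
void produceManualReading(StreamController<double> controller) {
  controller.add(19.0);
}

Future<List<double>> collectReadings() async {
  final controller = StreamController<double>();
  // toList() subscribes now and completes when the stream is closed.
  final all = controller.stream.toList();

  produceSensorReadings(controller);
  produceManualReading(controller);
  await controller.close(); // Closing the controller ends the stream.

  return all;
}

Future<void> main() async {
  print(await collectReadings()); // [20.5, 21.0, 19.0]
}
```

Events reach the listener in the order in which they were added, regardless of which function added them.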

Sinking Data: EventSink

Another way to add data to the stream is to use the sink property of the StreamController object.

controller.sink.add(12);

The sink property is a StreamSink<T>, which implements EventSink<T>.
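One reason to use the sink is to hand a producer only the input side of the controller. In the sketch below, emitReadings and readViaSink are made-up names; the producer can add events but has no access to the stream itself.

```dart
import 'dart:async';

// Hypothetical producer: it only sees the sink, so it can add events
// but cannot listen to the stream.
void emitReadings(EventSink<double> sink) {
  sink.add(1.5);
  sink.add(2.5);
}

Future<List<double>> readViaSink() async {
  final controller = StreamController<double>();
  final result = controller.stream.toList();

  emitReadings(controller.sink);
  await controller.close();

  return result;
}

Future<void> main() async {
  print(await readViaSink()); // [1.5, 2.5]
}
```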

Error Handling

To handle errors with listen, pass a callback as the onError parameter. To handle errors in an await for loop, use a try-catch statement.

try {
  await for (var num in stream) {
    print(num);
  }
} catch (e) {
  print(e);
}

When the stream delivers an error, the await for loop doesn't wait for the next event in the stream: the error is thrown at the loop, the loop terminates, and the subscription is cancelled.

Here is an example:

Stream<int> createNumberStreamWithException(int last) async* {
  for (int i = 1; i <= last; i++) {
    if (i == 5) {
      // The exception is delivered as an error event, then the stream closes.
      throw Exception("Demo exception when accessing 5th number");
    }
    yield i; // Deliver the number as a data event
  }
}

Future<void> handlingExceptionUsingAwaitFor() async {
  var stream = createNumberStreamWithException(15);
  try {
    await for (var num in stream) {
      print(num);
    }
  } catch (e) {
    print(e); // The stream error is rethrown here and ends the loop
  }
  print("Finished");
}

Future<void> main() async {
  await handlingExceptionUsingAwaitFor();
}

And the output will be:

1
2
3
4
Exception: Demo exception when accessing 5th number
Finished
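The same termination behaviour can be seen with a StreamController, where a data event is queued after the error. This is only a sketch; consumeUntilError and the log list are illustrative names. The value added after the error never reaches the loop.

```dart
import 'dart:async';

// Hypothetical helper: shows that await for stops at the first error.
Future<List<String>> consumeUntilError() async {
  final controller = StreamController<int>();

  // Events added before anyone listens are buffered by the controller.
  controller.add(1);
  controller.addError(Exception('bad event'));
  controller.add(2); // Queued after the error; the loop never sees it.
  controller.close(); // Not awaited: nothing is listening yet.

  final log = <String>[];
  try {
    await for (final value in controller.stream) {
      log.add('value $value');
    }
  } catch (e) {
    log.add('caught $e');
  }
  return log;
}

Future<void> main() async {
  print(await consumeUntilError());
}
```

The returned log contains 'value 1' followed by 'caught Exception: bad event' and nothing else.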