Eric Windmill

Code Cartoon: Streams in Dart


This is an excerpt from the book Flutter in Action, published by Manning.

You won’t get very far into any Dart code before you have to start dealing with streams. Dart programming makes heavy use of streams, an async pattern known in many languages as “observables”. In Dart, this reactive style of programming is a first-class citizen. In Flutter, streams are necessary if you plan on taking advantage of all its features, including the StreamBuilder class.

Futures, completers, “await for”, streams, sinks and the listen method are all related, because they all fit under the “async” umbrella. Some (if not all) of these classes and features are built on top of each other, with Future being the basic building block.

In this post, I’ll start by briefly explaining the class Future, and then focus on streams in depth. I’ll also explain some convenience features, like async/await and await for, towards the end. This will all be done using very poorly drawn cartoons of a hamburger restaurant.

Futures

Imagine you’re visiting a hamburger restaurant. In the restaurant, you order your burger at the counter and get a receipt. Futures are a lot like that receipt. You, the burger order-er, tell the server that you’d like to buy a burger. The server gives you a receipt that guarantees you’ll get a burger as soon as one is ready.

So, you go wait until the server calls your number, and then delivers on the guarantee of a burger. The receipt is the Future. The receipt is your proof of purchase; a symbol that proves you are waiting for a burger. It’s a guarantee that a value will exist, but it isn’t quite ready. The burger is the value, not the future.

In code, a future is a placeholder for a value that will exist. The most common scenario for using futures is when you’re getting values over the network. In UI specifically, you could pass a Future<List<String>> into a list of items to display to the user. But you may need to go fetch that list from an outside API over HTTP. So you say “hey, UI, show a loading sign until this Future completes, but know that a list of strings for you to display is coming eventually.”

future comic

Futures are thenable (that is, then-able), so when you call a method that returns a future, you can always say myFutureMethod().then((returnValue) => ... do some code ... );

Future.then takes a callback, which will be executed when the future value resolves. In the burger restaurant, the callback is what you decide to do with the burger when you get it. The value passed into the callback is whatever the return value of the original future is. In pseudo-code, using then would look like this: orderBurger().then(eatBurger); (eatBurger is passed without parentheses, because then expects the function itself, not the result of calling it).

void main() {
  print("A");
  /// `Duration` is a Dart class that defines a _duration of time_.
  /// In this case, it represents 1 millisecond.
  futurePrint(Duration(milliseconds: 1), "B").then((status) => print(status));
  print("C");
  futurePrint(Duration(milliseconds: 2), "D").then((status) => print(status));
  print("E");
}

/// A future calls the callback you give to `then` once the future itself
/// completes. In this example, `Future.delayed` is a special constructor
/// that just starts a timer (you tell it how long), and completes when
/// that timer finishes.
Future<String> futurePrint(Duration dur, String msg) {
  return Future.delayed(dur).then((onValue) => msg);
}

// prints
A
C
E
B
D

async / await

The keywords async and await are the easiest way to wrap your head around async programming (in my opinion). In a nutshell, you can mark any function as async, and then tell that function to await async processes before moving on. It’s basically like saying “Hey function, if you see the word await anywhere, just pause right there and wait for it to finish before moving to the next line.”

Look at what happens when we re-use the example above but add a couple await keywords in the mix:

/// Mark the function as asynchronous with the `async` keyword.
void main() async {
  print("A");
  /// Mark the expression (which should be a future) you want to pause on with `await`.
  await futurePrint(Duration(milliseconds: 1), "B").then((status) => print(status));
  print("C");
  await futurePrint(Duration(milliseconds: 2), "D").then((status) => print(status));
  print("E");
}

Future<String> futurePrint(Duration dur, String msg) {
  return Future.delayed(dur).then((onValue) => msg);
}

// prints the letters in alphabetical order
A 
B
C
D
E

The point here is that you can make async code behave like synchronous code. That sounds strange, to be sure, but it’s often handy. This example is contrived, because you could get the same effect by simply writing:

void main() {
   print("A");
   print("B");
   print("C");
   print("D");
   print("E");
 }

But, this only works because in our example we know exactly what the async code will do: print letters. But, in real life, you likely are waiting for some information from an HTTP call, and you want to “pause” the code until that HTTP call completes.

/// You want the code to "pause" until the HTTP call finishes, or there will be no data
/// in the variable `user` to print. (`Http` stands in for whatever HTTP client you use.)
void main() async {
  var user = await Http.get("http://my-database.com/user/1");
  print(user);
}
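If you want to try the "pause" behaviour without a real server, here is a minimal sketch that fakes the network call with Future.delayed. The function name fetchUserName and the value it returns are made up for illustration.

```dart
import 'dart:async';

/// Hypothetical stand-in for a network request: waits briefly,
/// then completes with a made-up user name.
Future<String> fetchUserName(int id) {
  return Future.delayed(Duration(milliseconds: 10), () => 'user-$id');
}

Future<void> main() async {
  print('loading...');
  // `main` pauses on this line until the future completes.
  final name = await fetchUserName(1);
  print(name); // user-1
}
```

Without the await keyword, name would hold the Future itself (the receipt), not the String (the burger).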

Catching errors with Futures

You can catch errors with async code in two ways, so you can handle errors or failed network calls gracefully. With plain ol’ future types, there’s a method called catchError.

Future<String> futurePrint(Duration dur, String msg) {
  return Future.delayed(dur).then((onValue) => msg);
}

void main() {
  /// If there's an error in this async code, the error will
  /// print rather than crashing your app.
  futurePrint(Duration(milliseconds: 2), "D")
      .then((status) => print(status))
      .catchError((err) => print(err));
}

Catching errors with try and catch

You can also use try / catch blocks, especially useful with async / await.

void main() async {
  try {
    /// If anything fails in the entire `try { }` block,
    /// then the catch block will run.
    print("A");
    await futurePrint(Duration(milliseconds: 1), "B")
        .then((status) => print(status));
    print("C");
    await futurePrint(Duration(milliseconds: 2), "D")
        .then((status) => print(status))
        .catchError((err) => print(err));
    print("E");
  } catch (err) {
    /// The catch block won't run if there are no errors.
    /// This is extremely useful in asynchronous programming.
    print("Err!! -- $err");
  }
}

Thus far, everything I’ve covered in this post is a primer. The real meat of what’s important is working with streams. In general, you should remember a few concepts:

  1. Futures are the base of asynchronous programming in Dart.
  2. You can use Future.then or async / await to control if code should pause to wait for async code, or if it should simply keep going and worry about the Future when it’s done.
  3. catchError and try / catch are both used to handle errors in async Dart.
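To see the last two points side by side, here is a small sketch that handles the same failing future in both styles. The orderBurger function and its soldOut flag are hypothetical, invented only for this comparison.

```dart
import 'dart:async';

/// Hypothetical order function: fails when the kitchen is out of burgers.
Future<String> orderBurger({bool soldOut = false}) {
  return Future.delayed(Duration(milliseconds: 5), () {
    if (soldOut) throw StateError('sold out');
    return 'burger';
  });
}

/// Style 1: `then` plus `catchError`.
Future<String> withThen({bool soldOut = false}) {
  return orderBurger(soldOut: soldOut)
      .then((food) => 'ate $food')
      .catchError((Object err) => 'went hungry');
}

/// Style 2: `await` plus `try` / `catch`.
Future<String> withAwait({bool soldOut = false}) async {
  try {
    final food = await orderBurger(soldOut: soldOut);
    return 'ate $food';
  } catch (err) {
    return 'went hungry';
  }
}

Future<void> main() async {
  print(await withThen()); // ate burger
  print(await withAwait(soldOut: true)); // went hungry
}
```

Both functions behave the same from the caller's point of view; which one you pick is mostly a matter of readability.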

Sinks and Streams (and StreamControllers)

Streams are a big part of Dart programming. One of the biggest concerns in building UIs, it seems, is how to handle data asynchronously, such as rendering a list of information even though you need to fetch the information with an HTTP request first. Streams are the way the Flutter team has decided to handle rendering data from the internet.

Streams are an asynchronous programming pattern, often called observables in other languages. At a high level, streams provide a way for classes or objects to be notified when events happen.

NB: Stream is actually a specific class, and only one piece of the observable pattern. In general, though, “stream” is the word used to describe the concept as a whole.

The observer pattern reflects real life more than any other architecture pattern around. Throughout the day, how often are you given an update by email, from apps, or from real life conversations, and react to it? (This is a hint as to why it’s called reactive programming.)

In these real-life examples, you aren’t being proactive. In many cases, you send an email as a response to an email you’ve received. You’re reacting to an email. This means that you don’t have to worry about constantly asking all your colleagues, “Hey, should I send you an email?”. Rather, you don’t even think about it until you have to.

In code, the observer pattern is just that. There’ll be one object that’s passively waiting to be notified by another object. Whenever it is updated, it takes appropriate action.

To expand on the burger future example, a subscriber might be the cook that’s actually making the burgers. That cook’s job is completely reactive. She knows how to make a burger when one is ordered, but she doesn’t actively seek out burger-eaters to make burgers for them. She just sits in the kitchen, waiting to be told what to cook.

The active part of this relationship is handled by the server behind the register. This server is given orders from customers, and then he turns around and passes (or emits) that information to the cook.

stream comic

There are three pieces of the observer pattern:

  • Sinks

  • Streams

  • Subscribers (also called: listeners, observers)

  • Sinks are the first stop for events in the observer pattern. A sink is the piece of the puzzle that you feed data into. It’s kind of the ‘central source’ for the whole process. In Dart, Sink is an abstract class that more specific types of sinks implement, the most common of which is the StreamController.

  • Streams are how events leave the sink. A StreamController exposes its stream as a property, and when new events are added to the controller, listeners are notified via that stream.

  • Subscribers are the external classes or objects that are waiting to be notified. This is done by “listening” to streams.
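Before getting to the burger stand, here is a bare-bones sketch of those three pieces working together. The takeOrders function is a name I made up for this illustration; StreamController, add, listen and close are the real dart:async API.

```dart
import 'dart:async';

/// Hypothetical helper: pushes every order through a controller and
/// returns what the subscriber received.
Future<List<String>> takeOrders(List<String> orders) async {
  // The sink: a StreamController is where events are fed in.
  final controller = StreamController<String>();

  // The subscriber: a callback listening to the controller's stream.
  final received = <String>[];
  controller.stream.listen(received.add);

  // Feed the events into the sink.
  orders.forEach(controller.add);

  // Closing ends the stream; the returned future completes once the
  // listener has been told the stream is done.
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await takeOrders(['Burger', 'Fries'])); // [Burger, Fries]
}
```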

Implementing streams

This is basically the boilerplate you’d need for any stream (using the burger example).

import 'dart:async';

class BurgerStand {
  /// A stream controller is an implementation of a Sink, 
  /// with extra functionality. It has a `stream` property.
  StreamController _controller = new StreamController();
  /// This getter exposes the stream, which other objects can listen to.
  Stream get onNewOrder => _controller.stream; 
  
  /// This cook class is just to make the example clearer.
  /// A burger stand would have Cooks in OOP.
  Cook cook = new Cook();

  /// This method needs to be called once, 
  /// which will open up the conversation between the stream and the cook.
  void deliverOrderToCook() {
    /// `listen` is the simplest method to listen to streams. Whenever there's 
    /// a new value added to the sink this callback will be called.
    onNewOrder.listen((newOrder) {
      cook.prepareOrder(newOrder);
    });
  }

  void newOrder(String order) {
    /// StreamController.add(value) (or Sink) is the method used to tell 
    /// the controller about a new event / data. 
    /// It will start the process of delivering new information 
    /// to a subscriber (aka listener).
    _controller.add(order);
  }
}

class Cook {
  void prepareOrder(newOrder) {
    print("preparing $newOrder");
  }
}

main() {
  var burgerStand = new BurgerStand();
  burgerStand.deliverOrderToCook();

  burgerStand.newOrder("Burger");
  burgerStand.newOrder("Fries");
  burgerStand.newOrder("Fries, Animal Style");
  burgerStand.newOrder("Chicken nugs");
}

stream comic annotated
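Calling listen with a callback isn't the only way to consume a stream. Inside an async function you can also loop over it with await for, which reads a lot like a normal for loop. This is a small sketch; prepareAll is a hypothetical function, and Stream.fromIterable is used only to get a stream that closes by itself.

```dart
import 'dart:async';

/// Hypothetical cook: handles each order as it arrives on the stream.
Future<List<String>> prepareAll(Stream<String> orders) async {
  final prepared = <String>[];
  // `await for` handles events one at a time and exits when the stream closes.
  await for (final order in orders) {
    prepared.add('preparing $order');
  }
  return prepared;
}

Future<void> main() async {
  final orders = Stream.fromIterable(['Burger', 'Fries']);
  final log = await prepareAll(orders);
  log.forEach(print);
  // preparing Burger
  // preparing Fries
}
```

Keep in mind that an await for loop only finishes when the stream is closed, so it suits streams that end. For a stream that stays open for the life of the app, listen is usually the better fit.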

Broadcast streams

In our above burger code, the cook is probably getting pretty flustered. She’s working the burger station, the fries station and the chicken nugget stand. That’s too many jobs. This burger place probably needs to hire a second cook, and split the responsibility.

In the code, that means there will be two cooks listening to the server, and reacting based on the information. But streams, by default, can only be listened to once, which is where broadcast streams come in. StreamController.broadcast() is a constructor that returns a controller whose stream can be listened to by multiple subscribers.
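You can check the difference yourself with a quick sketch. The helper acceptsTwoListeners is hypothetical; it relies on the fact that a single-subscription stream throws a StateError when it's listened to a second time.

```dart
import 'dart:async';

/// Hypothetical helper: returns true if `stream` allows a second listener.
bool acceptsTwoListeners(Stream<String> stream) {
  stream.listen((_) {});
  try {
    stream.listen((_) {});
    return true;
  } on StateError {
    // A single-subscription stream has already been listened to.
    return false;
  }
}

void main() {
  final single = StreamController<String>();
  final broadcast = StreamController<String>.broadcast();

  print(acceptsTwoListeners(single.stream)); // false
  print(acceptsTwoListeners(broadcast.stream)); // true
}
```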

broadcast stream comic

I updated the burger example a bit to showcase this. The changes have been annotated.

import 'dart:async';

class Cook {
  void prepareOrder(newOrder) {
    print("preparing $newOrder");
  }
}

/// First, I made three classes that will let us be sure the
/// right type of data is being passed into the controller,
/// but also let us determine what sub-type of `Order` it is.
class Order {}
class Burger extends Order {}
class Fries extends Order {}


class BurgerStand {
  StreamController<Order> _controller = new StreamController.broadcast();
  /// Now, there are two cooks, one for each cooking device.
  Cook grillCook = new Cook();
  Cook fryCook = new Cook();

  /// This is where the interesting part starts. Both of these getters
  /// are derived from the same stream, but each one only emits the
  /// events that have the matching type.
  /// It's explained in more detail below.
  Stream<Order> get onNewBurgerOrder =>
      _controller.stream.where((Order order) => (order is Burger));
  Stream<Order> get onNewFryOrder =>
      _controller.stream.where((Order order) => (order is Fries));

  /// In this method, there are now _two_ listeners on the same
  /// stream, but each event is only passed to the correct cook.
  void deliverOrderToCook() {
    onNewBurgerOrder.listen((newOrder) {
      grillCook.prepareOrder(newOrder);
    });

    onNewFryOrder.listen((newOrder) {
      fryCook.prepareOrder(newOrder);
    });
  }

  void newOrder(Order order) {
    _controller.add(order);
  }
}

main() {
  var burgerStand = new BurgerStand();
  burgerStand.deliverOrderToCook();
  
  /// Now, the `newOrder` method expects an `Order` type (or subtype).
  burgerStand.newOrder(new Burger()); 
  burgerStand.newOrder(new Fries());
}

The important part of all this is that both of the getters are referring to the same stream. So, when the two “listeners” are created in the deliverOrderToCook method, they’re being called on two different references to the stream (onNewBurgerOrder and onNewFryOrder), but they’re listening to the same stream. This wouldn’t be possible on a standard stream, only broadcast streams.

broadcast stream comic

Also, it’s worth noting that where is one of many methods that streams share with lists, which are used to perform some action on each element. More common methods of the same type are forEach and map. Like List.where, Stream.where basically filters out any element if the callback argument doesn’t return true.

In this case, onNewBurgerOrder is saying “I only care about the elements of this stream where the type of the element is Burger.” It works because both Burger and Fries subclass Order.
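Here is the filtering on its own, as a minimal sketch outside the burger stand. The countBurgers function is a made-up name for this illustration, and Stream.fromIterable is used just to build a stream from a fixed list of orders.

```dart
import 'dart:async';

class Order {}

class Burger extends Order {}

class Fries extends Order {}

/// Hypothetical helper: counts how many orders on the stream are burgers.
Future<int> countBurgers(List<Order> orders) async {
  final burgers = await Stream.fromIterable(orders)
      // Only events for which the callback returns true are passed along.
      .where((order) => order is Burger)
      .toList();
  return burgers.length;
}

Future<void> main() async {
  print(await countBurgers([Burger(), Fries(), Burger()])); // 2
}
```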

Higher Order Streams

Since streams are so common in Dart and Flutter, it’s not long until you’ll want to get a stream of data, but perform an action on every new piece of data emitted from the stream. A stream that itself returns a stream is called a higher-order stream. (Similarly, higher-order functions are functions that return new functions. That’s where the name comes from.)

Consider the hamburger shop again. Everything is ordered via meal numbers, like this:

  1: Burger, Drink
  2: Cheeseburger, Drink
  3: etc…

But they serve roughly 10,000 items. Oh and, the cooks are actually robots that only understand binary. Which means the human server needs to emit information in a stream, but that stream needs to be fed into a translator, which will manipulate the data and then output a new stream with the same information in binary.

ho stream comic

That might look like this in the code:

import 'dart:async';

class BurgerStand {
  Cook cook = new Cook();
  StreamController _controller = new StreamController.broadcast();
  Stream get onNewBurgerOrder => _controller.stream;

  void turnOnTranslator() {
    /// The `Stream.transform` method accepts a `StreamTransformer`,
    /// which changes data based on logic you provide.
    /// `Stream.transform` itself emits a stream, and can be listened to.
    onNewBurgerOrder
        .transform(new BeepBoopTranslator())
        .listen((data) => cook.prepareOrder(data));
  }

  void newOrder(int orderNum) {
    /// Add data to the `_controller` like in the last example. 
    /// It still has the same entry point.
    _controller.add(orderNum);
  }
}

And the StreamTransformer looks like this:

/// Extend `StreamTransformerBase`, which wraps a transformer and its functionality.
class BeepBoopTranslator<S, T> extends StreamTransformerBase<S, T> {
  /// This class should have a `StreamTransformer`
  final StreamTransformer<S, T> transformer;

  /// Initialize the `transformer` with this static method on create.
  BeepBoopTranslator() : transformer = createTranslator();

  /// This is a required override, because `Stream.transform`
  /// will call it internally. It only needs to call its own transformer's `bind`.
  @override
  Stream<T> bind(Stream<S> stream) => transformer.bind(stream);

  /// This method is where the goods are.
  /// A new `StreamTransformer` takes a callback, which is
  /// automatically passed the `inputStream`
  /// (the base stream being transformed)
  /// and the listener's `cancelOnError` flag.
  static StreamTransformer<S, T> createTranslator<S, T>() =>
      new StreamTransformer<S, T>((Stream<S> inputStream, bool cancelOnError) {
        /// `late` lets the callbacks below refer to these before they're assigned.
        late StreamController<T> controller;
        late StreamSubscription<S> subscription;

        /// Within this callback, you create a new, inner `StreamController`.
        /// This controller's stream will be returned _after_ the data is transformed.
        controller = new StreamController<T>(
          /// On listen, you can take the base input stream (`inputStream`),
          /// listen to _that_, and use its callback to manipulate data,
          /// and then emit it to the _new_ controller. That is complicated.
          /// In the plainest language, the stream controller (when listened to),
          /// is turning around, listening to the original stream, using its
          /// callback to transform data, and then emitting that new data.
          onListen: () {
            subscription = inputStream.listen(
                (data) => controller.add(binaryNum(data as int) as T),
                onDone: controller.close,
                onError: controller.addError,
                cancelOnError: cancelOnError);
          },
          onPause: ([Future<dynamic>? resumeSignal]) =>
              subscription.pause(resumeSignal),
          onResume: () => subscription.resume(),
          onCancel: () => subscription.cancel(),
        );

        /// Finally, listen to the new stream and return that subscription.
        return controller.stream.listen(null);
      });

  /// Convert the number into binary. This body is one possible
  /// implementation (an assumption): it returns an int whose decimal
  /// digits spell the binary form, so 5 becomes 101.
  static int binaryNum(int tenBased) {
    return int.parse(tenBased.toRadixString(2));
  }
}

I realize that that code may have made it harder, but passing around and manipulating streams is hard. Streams are something that you just have to get used to by seeing them repeatedly.
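If the full StreamTransformer feels like a lot, it may help to see the same idea in its smallest form. For a simple one-to-one translation like this, Stream.map also returns a new stream built from the original one. This is a swapped-in, simpler technique and not the transformer class above; toBinary and translateOrders are hypothetical names.

```dart
import 'dart:async';

/// Hypothetical translator: renders an order number as binary digits.
String toBinary(int orderNum) => orderNum.toRadixString(2);

/// Returns a new stream that emits every order number in binary.
Stream<String> translateOrders(Stream<int> orders) => orders.map(toBinary);

Future<void> main() async {
  final controller = StreamController<int>();

  // The robot cook listens to the translated stream, not the original.
  translateOrders(controller.stream)
      .listen((bits) => print('preparing $bits'));

  controller.add(5); // preparing 101
  controller.add(12); // preparing 1100
  await controller.close();
}
```

A custom StreamTransformer earns its keep when you need more control than map gives you, such as emitting several events for one input or managing pause and resume yourself.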
