Using RxDart with Bloc - Part 2

This is the second part of a two-part series on using RxDart with the Bloc library. In the first part, we covered using RxDart to transform events and states inside Bloc for specific use cases. In this post, we will see how we can handle a stream of data and convert it to a stream of states. Let's get started! 🚀

Futures vs Streams in Bloc

When getting data from an API or database, our Bloc usually talks to a layer such as a service or repository, which returns one-time async data as a Future. Here is how we typically consume a Future and convert it to states:

  1. Show some loading indicators.
  2. Fetch data from your service/repository.
  3. Based on the result from the previous step, show a success/error state.
Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    try {
      // Show loading indicator
      yield TodoState.loading();

      // Get the data from service
      final todos = await _service.getTodos();

      // Show data in UI
      yield TodoState.success(todos);
    } on GetTodosException catch (e) {
      yield TodoState.error(e.errorMessage);
    } catch (e) {
      yield TodoState.error(genericErrorMessage);
    }
  }
}

However, sometimes we receive a continuous stream of data.

For example: Firebase Realtime Database data, WebSockets, etc.

We can represent this continuous data as a Stream in Dart. Can we write similar logic when dealing with a Stream of data (Todos) inside our mapEventToState method? 🤔

Let's find out:

StreamSubscription _todosSubscription;
...
...
Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    _todosSubscription?.cancel();
    _todosSubscription = _service.todos().listen(
      (todos) {
        yield TodoState.success(todos);
      },
      onError: (e, _) {
        yield TodoState.error(e.errorMessage);
      },
    );
  }
}

Wait! That is not valid code. You cannot yield a state from inside the callbacks passed to listen(). We can only yield from directly inside an async generator body, and those callbacks are ordinary functions, not async generators. What do we do then? 🤔

A quick Google search told me we could add additional events from inside the data and onError callbacks of listen(), and then handle those events inside the mapEventToState method to yield states. What?? 😅

Something like:

StreamSubscription _todosSubscription;
...
...
Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    _todosSubscription?.cancel();
    _todosSubscription = _service.todos().listen(
      (todos) {
        add(TodosLoadedEvent(todos));
      },
      onError: (e, _) {
        add(TodosErrorEvent(e));
      },
    );
  } else if (event is TodosLoadedEvent) {
    yield TodoState.success(event.todos);
  } else if (event is TodosErrorEvent) {
    yield TodoState.error(event.errorMessage);
  }
}

We can make it more readable by using yield* and splitting the logic into a separate _map...ToState method for each event.

StreamSubscription _todosSubscription;
...
...
Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    yield* _mapGetTodosEventToState(event);
  } else if (event is TodosLoadedEvent) {
    yield* _mapTodosLoadedEventToState(event);
  } else if (event is TodosErrorEvent) {
    yield* _mapTodosErrorEventToState(event);
  }
}

Stream<TodoState> _mapGetTodosEventToState(GetTodosEvent event) async* {
  _todosSubscription?.cancel();
  _todosSubscription = _service.todos().listen(
    (todos) {
      add(TodosLoadedEvent(todos));
    },
    onError: (e, _) {
      add(TodosErrorEvent(e));
    },
  );
}

Stream<TodoState> _mapTodosLoadedEventToState(TodosLoadedEvent event) async* {
  yield TodoState.success(event.todos);
}

Stream<TodoState> _mapTodosErrorEventToState(TodosErrorEvent event) async* {
  yield TodoState.error(event.errorMessage);
}
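
The snippets above use TodosLoadedEvent and TodosErrorEvent without showing them. Here is a minimal sketch of what they might look like. The Todo model, the field names, and the errorMessage getter are assumptions made for illustration; the post itself does not define them.

```dart
// Hypothetical model; the real Todo class is not shown in this post.
class Todo {
  const Todo(this.title);
  final String title;
}

abstract class TodoEvent {
  const TodoEvent();
}

// Dispatched by the UI.
class GetTodosEvent extends TodoEvent {
  const GetTodosEvent();
}

// Dispatched by the bloc itself from the listen() data callback.
class TodosLoadedEvent extends TodoEvent {
  const TodosLoadedEvent(this.todos);
  final List<Todo> todos;
}

// Dispatched by the bloc itself from the onError callback.
class TodosErrorEvent extends TodoEvent {
  const TodosErrorEvent(this.error);
  final Object error;

  // Assumed helper: the snippets above read event.errorMessage.
  String get errorMessage => error.toString();
}

void main() {
  final loaded = TodosLoadedEvent([Todo('Write part 2')]);
  print(loaded.todos.first.title);
  print(TodosErrorEvent(StateError('offline')).errorMessage);
}
```

Only GetTodosEvent comes from the UI here. The other two exist purely so the bloc can talk to itself.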

However, it feels wrong for several reasons.

  • Additional events like TodosLoadedEvent and TodosErrorEvent need to be created just to convert incoming data from our stream into states.
  • These events are not linked to any UI action. They will quickly become a source of confusion for future maintainers.
  • There are too many hoops to jump through to understand the flow.

Is there any other way? Of course there is. Otherwise, what was the purpose of this blog post 😜

Stream and Async Generator

Before we discuss the new approach, let us first understand how streams and async generators work.

// ...
void main(List<String> arguments) async {
  final _intController = StreamController<int>.broadcast();

  // ...
  // Removed for brevity
  _intController.add(10);
  Future.delayed(Duration(seconds: 1), () {
    _intController.add(11);
  });
  //...
}
// ...
// ...

In the above snippet, we have a StreamController of type int. It emits two ints: 10 immediately and 11 after a 1s delay.

import 'dart:async';
import 'package:rxdart/rxdart.dart';

void main(List<String> arguments) async {
  final _intController = StreamController<int>.broadcast();

  final _subscription =
      _intController.stream.asyncExpand(_mapEventToState).listen(
    (value) {
      print('listen : $value');
    },
    onError: (e) {
      print('onError: ${e.message}');
    },
  );

  //...
  //...
}

Stream<int> _mapEventToState(int to) async* {
  if (to <= 10) {
    for (var i = 1; i <= to; i++) {
      yield i;
    }
  } else {
    yield* RangeStream(to, to + 9); // It is endInclusive
  }
}

We listen to the stream exposed by this StreamController and use asyncExpand to apply the _mapEventToState() method to each value received, before printing the resulting values to the console.
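
To see what asyncExpand does in isolation, here is a minimal sketch using only dart:async. The expand and expandAll functions and the sample inputs are made up for illustration. Each input value is turned into its own stream, and the resulting streams are emitted one after another, in order:

```dart
import 'dart:async';

// Hypothetical generator: turns the number n into the stream n.1, n.2, ... n.n
Stream<String> expand(int n) async* {
  for (var i = 1; i <= n; i++) {
    yield '$n.$i';
  }
}

// Applies expand to every input and flattens the results into one stream.
Future<List<String>> expandAll(List<int> inputs) =>
    Stream.fromIterable(inputs).asyncExpand(expand).toList();

Future<void> main() async {
  print(await expandAll([2, 3])); // [2.1, 2.2, 3.1, 3.2, 3.3]
}
```

Note that the values for 3 only start after the stream for 2 has completed.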

It is similar to how Bloc converts our Stream<Event> to Stream<State> using the logic we write inside the mapEventToState method. This is only an approximation of what happens inside bloc, but you get the gist. So, let's now focus on the _mapEventToState method here.

Stream<int> _mapEventToState(int to) async* {
  if (to <= 10) {
    for (var i = 1; i <= to; i++) {
      yield i;
    }
  } else {
    yield* RangeStream(to, to + 9); // It is endInclusive
  }
}

It is an async generator that turns each incoming int into a Stream<int>. Together with asyncExpand, this converts one Stream<int> into another Stream<int>.

If to <= 10, it goes through the for loop and emits ints one by one from 1 to to (1 to 10 in our example) using the yield keyword. It is similar to how we yield our states inside the mapEventToState method inside Bloc.

If to > 10, it uses yield* to delegate to RangeStream from RxDart, which is another stream that emits a sequence of integers within a specified range. Every value emitted by that stream is forwarded to our output stream.
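
The difference between yield and yield* can be seen without RxDart. The following is a minimal sketch using only dart:async; countTo and numbers are hypothetical helpers written for this illustration:

```dart
import 'dart:async';

// Hypothetical helper: any function that returns a Stream would do here.
Stream<int> countTo(int n) =>
    Stream.fromIterable([for (var i = 1; i <= n; i++) i]);

Stream<int> numbers() async* {
  yield 0; // emits a single value
  yield* countTo(3); // forwards every value of another stream
  yield 99; // runs only after countTo(3) has completed
}

Future<void> main() async {
  print(await numbers().toList()); // [0, 1, 2, 3, 99]
}
```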

The console output of the code would be:

listen : 1
listen : 2
listen : 3
listen : 4
listen : 5
listen : 6
listen : 7
listen : 8
listen : 9
listen : 10
listen : 11
listen : 12
listen : 13
listen : 14
listen : 15
listen : 16
listen : 17
listen : 18
listen : 19
listen : 20
Exited

For reference, here is the complete example we just discussed:

import 'dart:async';
import 'package:rxdart/rxdart.dart';

void main(List<String> arguments) async {
  final _intController = StreamController<int>.broadcast();

  final _subscription =
      _intController.stream.asyncExpand(_mapEventToState).listen(
    (value) {
      print('listen : $value');
    },
    onError: (e) {
      print('onError: ${e.message}');
    },
  );

  _intController.add(10);
  Future.delayed(Duration(seconds: 1), () {
    _intController.add(11);
  });
}

Stream<int> _mapEventToState(int to) async* {
  if (to <= 10) {
    for (var i = 1; i <= to; i++) {
      yield i;
    }
  } else {
    yield* RangeStream(to, to + 9); // It is endInclusive
  }
}

Final Solution

Stream<int> _mapEventToState(int to) async* {
  if (to <= 10) {
    for (var i = 1; i <= to; i++) {
      yield i;
    }
  } else {
    yield* RangeStream(to, to + 9); // It is endInclusive
  }
}

Notice that yield* forwards everything emitted by another stream. So using yield* directly on the source stream should work without creating additional events, as we did earlier. Let's try that out on the original example. Using the relation between the async generator and the stream we learned above, we can do the following:


Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    yield* _service.todos();
  }
}

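This is close, but the types do not match yet: _service.todos() emits todos, while mapEventToState must yield TodoState. We need to convert Stream<Todos> to Stream<TodoState> for our UI to render. We can do that by using map together with a couple of RxDart operators, onErrorReturnWith and startWith.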


Stream<TodoState> mapEventToState(TodoEvent event) async* {
  if (event is GetTodosEvent) {
    yield* _service
        .todos()
        .map<TodoState>((todos) => TodoState.success(todos))
        .onErrorReturnWith((err) => TodoState.error(err.message))
        .startWith(TodoState.loading());
  }
}
  • map => converts each list of todos to TodoState.success
  • onErrorReturnWith => converts an error emitted from the todos stream to TodoState.error
  • startWith => emits TodoState.loading first, as soon as the stream is subscribed to in response to GetTodosEvent
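
If the operator chain feels opaque, it may help to see roughly the same behaviour written as a plain async generator. This is only a sketch under stated assumptions: strings stand in for TodoState, fakeTodos is a made-up source that emits two lists and then fails, and toStates is a hypothetical name. It is not how RxDart implements these operators.

```dart
import 'dart:async';

// Strings stand in for TodoState to keep the sketch self-contained.
Stream<String> toStates(Stream<List<String>> todos) async* {
  yield 'loading'; // plays the role of startWith
  try {
    await for (final list in todos) {
      yield 'success(${list.length})'; // plays the role of map
    }
  } catch (e) {
    yield 'error($e)'; // plays the role of onErrorReturnWith
  }
}

// Hypothetical source: emits two lists, then fails.
Stream<List<String>> fakeTodos() async* {
  yield ['a'];
  yield ['a', 'b'];
  throw Exception('offline');
}

Future<void> main() async {
  print(await toStates(fakeTodos()).toList());
  // [loading, success(1), success(2), error(Exception: offline)]
}
```

As with onErrorReturnWith, the output stream ends after the error state has been emitted.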

In this way, we're connecting the entire stream chain inside the bloc like the following:

// This is roughly how it connects inside bloc
events.asyncExpand((event) async* {
  if (event is GetTodosEvent) {
    yield* _service
        .todos()
        .map<TodoState>((todos) => TodoState.success(todos))
        .onErrorReturnWith((err) => TodoState.error(err.message))
        .startWith(TodoState.loading());
  }
}).listen((transition) {
  // bloc emits transitions and states here internally,
  // which "BlocBuilder" and "BlocConsumer" listen to
});

I've created a sample repo in case you want to play with the small todo example we discussed in this post. I hope that was useful. Thanks for reading 🙂