Using RxDart with Bloc - Part 1

In this post, we will be looking at how we can use RxDart with Bloc library. I've divided this post into two parts to keep it small. In this part, we will see how to use RxDart to transform your events to states inside a Bloc. In the 2nd part, we will look at how to handle a stream of data coming from your service or repository layer and convert it to a stream of states inside Bloc.

You might not have come across such a use case in your app yet but keep reading if you're curious 🙂

Note: In this post, I won't be talking about using RxDart with a Bloc pattern. There are some excellent posts already on the web you can refer to for that. This post is specifically about using RxDart with the Bloc library.

Why RxDart

Whenever your app has to deal with some continuous stream of data, such as events, states, etc., you can use Stream to represent it. Dart already has good support for Streams built in natively. You can use the dart:async package.

However, sometimes you may need to use some reactive extension methods, which RxDart provides on top of this native stream API to make things easier. We will look at a few examples of using it with the Bloc library.

Bloc Transformations

When you look at the Bloc library, a Bloc is nothing but a box which accepts Stream<Events> as input and emits Stream<States> as output based on the logic written inside mapEventToState method.

However, apart from using mapEventToState, you could override a couple of other methods in our Bloc to control your incoming Stream<Events> and outgoing Stream<States>.

Note: We will only see a few examples below about using transformEvents in this post. Similarly, you could use tranformTransitions to change Transition<Event, State> to control your outgoing state changes.

Transforming Events

If we look at the default implementation of transformEvents method inside Bloc, it looks like the following:

Stream<Transition<Event, State>> transformEvents(
    Stream<Event> events,
    TransitionFunction<Event, State> transitionFn,
  ) {
    return events.asyncExpand(transitionFn);
}

Looking at the line highlighted above, we can see that it applies asyncExpand operator on our Stream<Event>. It ensures all events are processed in the same order they are received by default.

Let's say we have a couple of different use cases in our app where we want to transform these events before they call our mapEventToState method.

Case #1 - Text Search / Form Validation

For the text search feature, we will fire events from UI every time the user inputs something.

add(SearchEvent(input : <textInputTyped>))

With the default behaviour of Bloc(as we just saw above), for each keystroke, our mapEventToState method will be called with SearchEvent consisting of the latest typed input.

No problem with this approach. However, in our scenario, we call REST API every time we receive a new SearchEvent like the one below.


Stream<SearchState> mapEventToState(SearchEvent event) async* {
  try {
    yield SearchState.loading();
    final places = await _placesService.findPlaces(event.input);
    yield SearchState.success(places);
  } catch (e) {
    yield SearchState.error(e.message);
  }
}

It will result in our API getting called every time whenever a user types something in TextField. Shouldn't we search once the user is done typing? 🤔 Can we make the Bloc call our mapEventToState method only after the user has stopped typing?

Yes, that is where the transformEvents method comes into the picture. Using it, we can control our stream of events added to Bloc. By overriding transformEvents inside the Bloc and applying RxDart operators like debounceTime and switchMap to Stream<Event> we can get the intended behaviour.


Stream<Transition<SearchEvent, SearchState>> transformEvents(
    Stream<SearchEvent> events, transitionFn) {
  return events
      .debounceTime(const Duration(milliseconds: 300))
      .switchMap(transitionFn);
}

Let's look at the individual RxDart operators we're applying here.

  • debounceTime
  • switchMap

debounceTime

By applying debounceTime on Stream<Event>, we're debouncing events within 300 ms duration. So, when the user is typing their search input, and 300 ms hasn't passed between 2 different SearchEvent, we don't call our logic in the mapEventToState method. It will help us prevent spamming our REST API with incomplete typed inputs. Makes sense! Why is the switchMap operator required?

switchMap

If you look at the documentation of switchMap in RxDart, applying this operator the newly created Stream will be listened to and begin emitting items, and any previously created Stream will stop emitting

Sorry, what?! 😅 In simple words, Sorry! I mean diagram.

Let's say that the user has typed "De" and is waiting for the results.

call_in_progress

While the above API call is still in progress, if the user decides they need more granular search results, they type one more letter "l", making search input "Del". In that case, we don't want our previous API call results to show up. Only the latest search results with input "Del" should do. The switchMap operator allows us to do exactly that. By applying it, we can ignore previous results and, as a result, states coming from old input.

api_call_cancelled

You might ask why to ignore results from previous input "De" and do not show them in UI.

Valid question! Consider the scenario where the search results for "De" are delayed and received after "Del" from the API, we might end up showing results for "De" instead of "Del". IMO, this results in a poor search experience.


Case #2 - Add To Cart / Remove From Cart

add_or_remove_from_cart

Let's say we've CartBloc to handle adding and removing items in the Cart. We can represent those events like the following:

abstract class CartEvent extends Equatable {
  const CartEvent();

  
  List<Object> get props => [];
}

class AddToCartEvent extends CartEvent {
  final String productId;
  final String cartId;

  AddToCartEvent({
    this.productId,
    this.cartId,
  });

  
  List<Object> get props => [productId, cartId];
}

class RemoveFromCartEvent extends CartEvent {
  final String productId;
  final String cartId;

  RemoveFromCartEvent({
    this.productId,
    this.cartId,
  });

  
  List<Object> get props => [productId, cartId];
}

We can handle those events inside our mapEventToState method like below


Stream<CartState> mapEventToState(CartEvent event) async* {
  if (event is AddToCartEvent) {
    yield* _mapAddToCartEventToState(event);
  } else if (event is RemoveFromCartEvent) {
    yield* _mapRemoveFromCartEventToState(event);
  }
}

Stream<CartState> _mapAddToCartEventToState(AddToCartEvent event) async* {
  try{
    yield CartState.addingToCart(event.productId);

    final itemId = await _cartService.addToCart(event.cartId, event.productId);
    yield CartState.addedToCart(event.productId);
  }
  catch (e) {
    yield CartState.addToCartError(event.productId, e.message);
  }
}

Stream<CartState> _mapRemoveFromCartEventToState(RemoveFromCartEvent event) async* {
  try{
    yield CartState.removingFromCart(event.productId);

    final itemId = await _cartService.removeFromCart(event.cartId, event.productId);
    yield CartState.removedFromCart(event.productId);
  }
  catch (e) {
    yield CartState.removeFromCartError(event.productId, e.message);
  }
}

Good enough? So, what's the problem here? As we saw earlier, the default behaviour of transformEvents will trigger mapEventToState for each AddToCartEvent and RemoveFromCartEvent, which is fine.

However, suppose the user accidentally clicks the "Add/Remove" button for a particular veggie multiple times. In that case, it will result in various add/remove API calls being made for the same veggie unnecessarily. We can have logic in place to disable the "add/remove" button when in addingToCart/removingFromCart states while the API call is in progress to avoid this scenario entirely. But we can also use transformEvents here and provide custom implementation to avoid this scenario entirely without relying on disabling buttons in the UI layer based on states.

Overriding transformEvents to prevent duplicate add/remove events like the below:


Stream<Transition<CartEvent, CartState>> transformEvents(
    Stream<CartEvent> events, transitionFn) {
  return super.transformEvents(
    event.distinct(),
    transitionFn,
  );
}

You could write it as below. Both apply the same transformation on the input stream of events.


Stream<Transition<CartEvent, CartState>> transformEvents(
    Stream<CartEvent> events, transitionFn) {
  return events.distinct().asyncExpand(transitionFn)
}

You could write it as below. Both apply the same transformation on the input stream of events.

Using the distinct operator on our Stream<Event> here will make sure that two successive AddToCartEvent or RemoveFromCart for the same productId and cartId combination will not be allowed. It won't result in duplicate API calls because our mapEventToState wouldn't allow the same events because of the distinct operator. Plus, in this case, we didn't need Rx at all and Dart's native stream API was sufficient 😁 #FakeBlogTitle

Note: We're only covering the case where we're adding/removing only a single qty of a particular product in the Cart. You need to include the qty field in the event classes shown above for multiple qty.

What if API calls fail? In that case, because of the distinct operator, users wouldn't be able to retry adding/removing the same product in the cart, no? 🤔. Yes, to prevent that scenario during the API failure, we will have to emit the event add(CartFailureEvent()) in catch blocks so that the user can retry those failed add/remove actions.

// ...
// ...
Stream<CartState> _mapAddToCartEventToState(AddToCartEvent event) async* {
  try{
    yield CartState.addingToCart(event.productId);

    final itemId = await _cartService.addToCart(event.cartId, event.productId);
    yield CartState.addedToCart(event.productId);
  }
  catch (e) {
    add(CartFailureEvent());
    yield CartState.addToCartError(event.productId, e.message);
  }
}

// ...
// ...
class CartFailureEvent extends CartEvent {}

TBH, it is not a valid case to use override transformEvents at all, no? 😂 Yeah, but it is a pretty good trade-off to handle everything inside Bloc vs having a mix of logic in UI(disabling buttons during some state) + Bloc. Whatever you prefer!

Closing Note

Apart from the above 2 cases, there could be a few other use cases where we could use RxDart with Bloc. I've learnt this from Felix's(Bloc library author) FlutterSaurus codebase and have been using it in a few of the Blocs. I hope that was useful.

You can check out next post, where we will see how we can handle the stream of data coming in from the service/repository layer and convert it to a stream of states inside your mapEventToState method.