I’ve discussed Reactive Extensions with a few people after my Rx talk last week and thought I’d clear up some confusion by clarifying on this blog…
One thing to know about Reactive Extensions is that simply going from pull model (enumerating) to push model (observing) doesn’t give you immediate background thread processing – everything is still happening on the same thread.
For example, let’s look at the following code and put a few debugger breakpoints in it:
Right at the top, there’s an array of image names I’m adding to the ListBox (AddImage() method). In this WPF app, the OnLoaded() method handles the Loaded event, where foreach loop iterates through the array and calls the AddImage() method.
Red balls indicate the places where I’d put the breakpoints, and the number in it tells the order in which they would be hit. Following the track, it is obvious that the OnLoaded() method is blocking until all images in the array is added into the ListBox.
To transform this code to use the observer/pull model, the following changes are made:
- The array (now an IObservable<string>) is generated with the Generate() Observable constructor.
- foreach loop is removed from the OnLoaded() method and replaced with the Observable subscription, providing the methods for OnNext and OnCompleted.
The Subscribe() method looks harmless sitting alone on now shortened OnLoaded() method, but guess what – it’s still blocking it! Take a look at the execution path: the code would enter the method, and upon subscription, starts calling the AddImage() method. After that, it calls OnCompleted() and then, only then, continues the execution in the OnLoaded() method and finally exits.
The thing to be careful about here is the fact that even if the images.Subscribe() method looks innocent and instant, it may be executing a long(er) time, if an observable sequence is very long. Which also means that the current thread will be blocked for that time also. And we don’t want the UI thread blocked.
What we can do to achieve the behavior we want and expected (not blocking the UI) is forcing subscription on a new – background – thread and expect the observable to call us back on the UI thread. Note that the AddImage() method has to be called on the UI thread because the code accesses the ListBox and if called from a non-UI thread, the call would fail. The code below exercises these changes and results in a whole different flow: upon subscription, the UI thread exits the OnLoaded() method immediately, while the subscription processing is done in a new thread. ObserveOnDispatcher() operator ensures that for each element in the observable sequence, AddImage() method is safely called on the UI thread.