Bacon.js

The observer pattern

The way FRP is explained often leads to confusion. Most people know what the P is, and the F seems fairly understandable, but the R can be misleading at times. Examples usually talk about the difference between expressions and statements. Rather than c = a + b setting a value right now, it is an expression which defines that c is always a plus b. It defines a relationship.

Reactive programming has its roots in the observer pattern: there's an observable subject which has a list of listeners, and the observable notifies all of them when it has something to publish.
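
As a rough illustration of that pattern, here is a minimal sketch in plain JavaScript. It is not how Bacon is implemented; createSubject, subscribe and publish are names made up for this example.

```javascript
// Toy observable subject: keeps a list of listeners and notifies each one.
// (createSubject, subscribe and publish are hypothetical names, not Bacon API.)
function createSubject() {
  const listeners = [];
  return {
    // Register a listener; returns a function that removes it again.
    subscribe(listener) {
      listeners.push(listener);
      return () => {
        const index = listeners.indexOf(listener);
        if (index !== -1) listeners.splice(index, 1);
      };
    },
    // Notify every current listener, in subscription order.
    publish(value) {
      listeners.slice().forEach((listener) => listener(value));
    },
  };
}

const subject = createSubject();
const received = [];
const unsubscribe = subject.subscribe((value) => received.push(`first:${value}`));
subject.subscribe((value) => received.push(`second:${value}`));

subject.publish('a');
unsubscribe(); // the first listener stops listening
subject.publish('b');

console.log(received); // [ 'first:a', 'second:a', 'second:b' ]
```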

  • Functional programming: synchronous lists in memory
  • Functional reactive programming: dynamic/async events happening over time

A stream is an async collection

In the context of FRP, streams can be modelled as async collections of events that "arrive" or happen over time. An event stream can be thought of as a pipeline that events travel through.

For example, this event listener and handler in jQuery:

  $('#button').on('click', function (event) {
  console.log(event.target);
});

can be baconified by doing:

  var clickStream = $('#button').asEventStream('click');
clickStream.onValue(function (event) {
  console.log(event.target);
});

Not a big difference so far, but Bacon provides a functional interface to manipulate and handle events (moving to ES6 from now on):

  clickStream
  .map(event => event.target)
  .onValue(element => console.log(element));

clickStream
  .skip(1)
  .take(4) // will only take the 2-5 click events
  .onValue(event => console.log(event.target));

clickStream
  .filter(event => event.type === 'click')
  .onValue(event => console.log(event.target))

Now try rewriting that using callbacks...

So, what exactly is FRP?

FRP is working with streams of values that change over time.

Perhaps an example would help. Imagine moving a mouse over your browser. It produces a stream of x and y values. Rather than using a callback for every mouse move, we can work with the mouse events as a single object over time: an event stream. Suppose we want to know when the mouse moves past a line on the screen at 100px. In regular code we could do this:

  $('body').on('mousemove', function (e) {
  if (e.pageX > 100) {
    console.log('we are over 100');
  }
});

With FRP we would create a stream based on mouse move, then filter it to only have X values over 100, like this:

  $('body').asEventStream('mousemove')
  .map((e) => e.pageX) // keep only the X coordinate of each event
  .filter((x) => x > 100)
  .onValue((x) => console.log(`we are over 100: ${x}`));

We have separated the action, printing a message, from the source of the stream and any filter operations. We can also add more operations to the stream if we want, and abstract the filters out further.

Subscribing

Streams are lazy: we need to call onValue or nothing happens.

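  const countriesStream = Bacon
    .once('/countries-visited')
    .flatMap(url => Bacon.fromPromise(fetch(url)))
    .flatMap(response => Bacon.fromPromise(response.json()));

The API hasn't been hit just yet. Because the fetch call sits inside flatMap, the request is only made once we subscribe to the stream. (Writing Bacon.fromPromise(fetch(...)) directly would call fetch straight away, since the promise is created as soon as that expression is evaluated; only the stream's subscription to the promise would be lazy.)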

  countriesStream.onValue(countries => console.log(countries));

Calling onValue on a stream (i.e. subscribing to the stream) triggers whatever source the stream is built on.
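
The idea of laziness can be sketched without Bacon. The toy lazyStream below is an assumption made for illustration, not Bacon's implementation: its producer function only runs when onValue is called, and it makes no attempt to share one subscription between several subscribers.

```javascript
// Toy lazy stream: `producer` is a hypothetical function that starts the
// underlying work and reports each value through the `emit` callback.
function lazyStream(producer) {
  return {
    onValue(handler) {
      producer(handler); // the source is only started here, on subscription
    },
    map(fn) {
      // Building a new stream does not start the source either.
      return lazyStream((emit) => producer((value) => emit(fn(value))));
    },
  };
}

let requests = 0;
const countries = lazyStream((emit) => {
  requests += 1; // stands in for hitting the API
  emit(['Australia', 'Canada']);
});
const upperCased = countries.map((list) => list.map((name) => name.toUpperCase()));

const requestsBeforeSubscribing = requests;
console.log(requestsBeforeSubscribing); // 0

const seen = [];
upperCased.onValue((list) => seen.push(list));
console.log(requests); // 1
```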

Map vs Flatmap

Given:

  const getUsersPromise = fetch('/get-users')
    .then(response => response.json());

and this API response:

  [
    { name: 'John Doe', age: 28 },
    { name: 'Jane Doe', age: 27 }
]

This will emit only once, with the whole array as its value:

  Bacon
    .fromPromise(getUsersPromise)
    .onValue(people => console.log(people));

Whereas this piece of code will emit once for each element of the array (once for John, and once again for Jane):

  Bacon
    .fromPromise(getUsersPromise)
    .flatMap(Bacon.fromArray)
    .onValue(person => console.log(person));

Map does pure data transformation and it's a synchronous operation;

  Bacon
    .fromPromise(getCountriesPromise)
    .map('blah');

will synchronously override the API response (an array of countries) with the string 'blah'.

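flatMap, on the other hand, expects its function to return a new stream for each event and flattens the events of all those streams into a single result stream, which makes it the tool for asynchronous operations.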

Both map and flatMap take in a projection function.

Finally, to understand flatMap, have a look at Ruby's flat_map method, which does the same thing for collections.
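
The same distinction can be seen on plain arrays, with no asynchrony involved. This small sketch uses the standard Array.prototype.map and Array.prototype.flatMap (ES2019); the responses array is a made-up stand-in for a stream that emitted one API response.

```javascript
const users = [
  { name: 'John Doe', age: 28 },
  { name: 'Jane Doe', age: 27 },
];

// One "event" whose value is the whole array, like the API response above.
const responses = [users];

// map keeps the nesting: still one element, which is the array of users.
const mapped = responses.map((list) => list);

// flatMap flattens one level: one element per user.
const flattened = responses.flatMap((list) => list);

console.log(mapped.length); // 1
console.log(flattened.length); // 2
console.log(flattened.map((user) => user.name)); // [ 'John Doe', 'Jane Doe' ]
```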

FlatMap example using jQuery and callbacks:

  $.ajax({ url: '/items' }).done(function (items) {
  if (items.error) {
    return handleError(items.error);
  }

  $.each(items, function (index, item) {
    $.ajax({ url: '/item/' + item.id }).done(function (item) {
      if (item.error) {
        return handleError(item.error);
      }

      renderItem(item, function (itemEl) {
        itemEl.children('.delete').click(function () {
          $.ajax({ url: '/item/' + item.id, type: 'DELETE' }).done(function (response) {
            if(response.error) {
              return handleError(response.error);
            }

            itemEl.remove();
          });
        });
      });
    });
  });
});

Tastier version with Bacon:

  var isError = function (serverResponse) {
  return typeof serverResponse.error !== 'undefined';
};

var isNotError = function (serverResponse) {
  return !isError(serverResponse);
};

var $allItems = Bacon.fromPromise($.ajax({ url: '/items' }));
var $items = $allItems
  .filter(isNotError)
  .flatMap(Bacon.fromArray) // emit one event per item in the list
  .flatMap(function (item) {
    return Bacon.fromPromise($.ajax({ url: '/item/' + item.id }));
  });

var $renderedItems = $items
  .filter(isNotError)
  .flatMap(function (item) {
    return Bacon.fromCallback(renderItem, item);
  });

var $renderedItemsDeleteClicks = $renderedItems
  .flatMap(function (renderedItem) {
    return Bacon.fromEventTarget(renderedItem.children('.delete'), 'click', function () {
      return renderedItem;
    });
  });

var $deleteItemRequest = $renderedItemsDeleteClicks
  .flatMap(function (renderedItem) {
    return Bacon
      .fromPromise($.ajax({ url: '/item/' + renderedItem.data('id'), type: 'DELETE' }))
      .map(function (response) {
        // keep the element next to the response so it can be removed on success
        return { error: response.error, element: renderedItem };
      });
  });

// merging returns a new stream, so the result has to be kept
var $errors = Bacon.mergeAll(
  $allItems.filter(isError),
  $items.filter(isError),
  $deleteItemRequest.filter(isError)
);

$errors.onValue(function (response) {
  handleError(response.error);
});
$deleteItemRequest.filter(isNotError).onValue(function (deleted) {
  deleted.element.remove();
});

Streams are cheap

Don't over-nest streams: they are very cheap to instantiate! Break them down into multiple streams instead, which makes them way easier to test and debug.

Also, note that every time we chain an operator to our stream, a new observable gets returned (an event stream, or a property in the case of scan):

  Bacon
    .fromEvent(document.getElementById('btn'), 'click') // s1
    .map(1) // s2
    .scan(0, (x, y) => x + y) // s3
    .filter(value => value % 2 === 0) // s4
    .map(value => 2 * value) // s5
    .onValue(value => console.log(value));
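
To see what each intermediate step carries, the chain can be traced on a plain array standing in for four clicks. This is an array analogue rather than Bacon code: the scan helper is written here for illustration and assumes, as Bacon's scan does, that the seed comes out first as the initial value.

```javascript
// Array analogue of scan: the seed, followed by every running result.
function scan(values, seed, accumulate) {
  const results = [seed];
  values.forEach((value) => {
    results.push(accumulate(results[results.length - 1], value));
  });
  return results;
}

const s1 = ['click', 'click', 'click', 'click']; // four button clicks
const s2 = s1.map(() => 1);                       // [1, 1, 1, 1]
const s3 = scan(s2, 0, (x, y) => x + y);          // [0, 1, 2, 3, 4]
const s4 = s3.filter((value) => value % 2 === 0); // [0, 2, 4]
const s5 = s4.map((value) => 2 * value);          // [0, 4, 8]

console.log(s5); // [ 0, 4, 8 ]
```

Each step produces a new array and leaves the previous one untouched, which mirrors how each operator hands back a new observable.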

Event Streams vs Properties

Both are observables, but properties hold onto their last emitted value:

  ---------(A)--------------(B)----->   Event Stream
                ^ nothing here

---------(A)--------------(B)----->   Property
                ^ still A

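A property is basically an event stream with a notion of state: it remembers the last value that went through it (the event object or a mapped value) and hands it to new subscribers. Two helpers are worth knowing here: scan, which folds the values of a stream into a property, and assign, which calls a method on a target object with each value of the property.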

  const buttonState = enable.merge(disable).toProperty(false); // initial state = false
buttonState.onValue((state) => $('#button').toggleClass('enable', state));

Different names for the same idea: Signal = Property = BehaviorSubject = Behaviour = Attribute. They all hold onto the latest event's value.
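
The difference between the two can be sketched with a toy observable in plain JavaScript. createObservable and its remembersLastValue flag are invented for this example and are not part of Bacon; the point is only what a late subscriber gets to see.

```javascript
// Toy observable. With remembersLastValue = true it behaves like a property:
// it stores the latest value and replays it to every new subscriber.
function createObservable(remembersLastValue) {
  let hasValue = false;
  let current;
  const listeners = [];
  return {
    push(value) {
      if (remembersLastValue) {
        hasValue = true;
        current = value;
      }
      listeners.slice().forEach((listener) => listener(value));
    },
    onValue(listener) {
      listeners.push(listener);
      if (hasValue) listener(current); // only a "property" has something to replay
    },
  };
}

const stream = createObservable(false);
const property = createObservable(true);

stream.push('A');
property.push('A');

// Both subscribers arrive after A was emitted.
const seenByStreamSubscriber = [];
const seenByPropertySubscriber = [];
stream.onValue((value) => seenByStreamSubscriber.push(value));
property.onValue((value) => seenByPropertySubscriber.push(value));

stream.push('B');
property.push('B');

console.log(seenByStreamSubscriber);   // [ 'B' ]
console.log(seenByPropertySubscriber); // [ 'A', 'B' ]
```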

Combining with different flavours

They are just alternative syntaxes for the same use case:

  • combineWith:
  const countriesPromise = fetch('//api.myjson.com/bins/ulhan')
  .then(response => response.json());
const languagesPromise = fetch('//api.myjson.com/bins/17p0lr')
  .then(response => response.json());

const countriesStream = Bacon.fromPromise(countriesPromise);
const languagesStream = Bacon.fromPromise(languagesPromise);

const combinedStream = Bacon.combineWith(
  countriesStream,
  languagesStream,
  (countriesData, languagesData) => {
    return [...countriesData.countries, ...languagesData.languages];
  }
);

combinedStream.onValue(data => console.log(data));

  • combine:
  const combinedStream = countriesStream
    .combine(languagesStream, (countriesData, languagesData) => {
        return [
            ...countriesData.countries,
            ...languagesData.languages
        ];
    });

  • Parsing JSON data with combineTemplate:
  Bacon
  .combineTemplate({
    startDate: new Date(),
    showHeader: true,
    countries: countriesStream.map(res => res.countries),
    languages: languagesStream.map(res => res.languages)
  })
  .onValue(data => console.log(data));

Finally, here instead of using console.log as our subscription action we could use ReactDOM's render method to render our component providing the stream's value as props:

  Bacon
  .combineTemplate({
    startDate: new Date(),
    showHeader: true,
    countries: countriesStream.map(res => res.countries),
    languages: languagesStream.map(res => res.languages)
  })
  .onValue(props => {
    // ReactDOM.render takes the element first and the container second
    ReactDOM.render(
      <App {...props} />,
      document.getElementById('root')
    );
  });

Here, App will receive startDate, showHeader, countries and languages as props.

Merging streams

Say we have two buttons, one for enabling something and one for bringing it back to a disabled state.

  const enable = $('#enable').asEventStream('click').map(true);
const disable = $('#disable').asEventStream('click').map(false);

enable
  .merge(disable)
  .onValue(state => console.log(state));

There is also Bacon.mergeAll, which merges any number of streams:

  const loadingStream = Bacon.mergeAll(
    userClicksStream.map(true),
    timerStream.map(false)
);

Buses

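A bus is an event stream that you can push values into and plug other streams into. It does not hold state by itself; turn it into a property when you need the latest value.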

bus$.toProperty() is undone by property$.changes().

Use properties only if within a when or update. // @TODO wat

Bacon.update = Bacon.when + startWith

Avoid pushing to buses in streams.

Bus examples:

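  var $deleteItem = new Bacon.Bus();
// plug in the elements whose delete button was clicked
$deleteItem.plug(Bacon.fromEventTarget($('.delete'), 'click').map(function (e) { return $(e.target); }));
// subscribe, otherwise nothing happens
$deleteItem.onValue(function ($el) { $el.remove(); });
// elements can also be pushed in manually
$deleteItem.push($('item1'));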

Hungry philosophers:

  var chopsticks = [new Bacon.Bus(), new Bacon.Bus(), new Bacon.Bus()]
var hungry     = [new Bacon.Bus(), new Bacon.Bus(), new Bacon.Bus()]
var eat = function(i) {
  return function() {
    setTimeout(function() {
      chopsticks[i].push({})
      chopsticks[(i+1) % 3].push({})
    }, 1000);
    return 'philosopher ' + i + ' eating'
  }
}

var dining = Bacon.when(
  [hungry[0], chopsticks[0], chopsticks[1]],  eat(0),
  [hungry[1], chopsticks[1], chopsticks[2]],  eat(1),
  [hungry[2], chopsticks[2], chopsticks[0]],  eat(2)
).log()

// make all chopsticks initially available
chopsticks[0].push({}); chopsticks[1].push({}); chopsticks[2].push({})
// make philosophers hungry in some way, in this case we just push to their bus
for (var i = 0; i < 3; i++) {
  hungry[0].push({}); hungry[1].push({}); hungry[2].push({})
}

Buses can be used as message queues:

  let messageQueue = new Bacon.Bus();

// plug event streams to queue
messageQueue.plug(enable.map({ type: 'enable' }));
messageQueue.plug(disable.map({ type: 'disable' }));

messageQueue.onValue((event) => console.log(event.type)); // listen and echo out event state

messageQueue.push({ type: 'disable' }); // push event manually, and log event
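
What push and plug boil down to can be sketched with a toy bus in plain JavaScript. createBus is a made-up helper, not Bacon's Bus: it subscribes to plugged sources immediately and has no notion of ending, errors or unplugging.

```javascript
// Toy bus: values can be pushed in by hand or forwarded from a plugged source.
function createBus() {
  const listeners = [];
  const bus = {
    onValue(listener) {
      listeners.push(listener);
    },
    push(value) {
      listeners.slice().forEach((listener) => listener(value));
    },
    // Forward everything the source emits into this bus.
    plug(source) {
      source.onValue(bus.push);
    },
  };
  return bus;
}

const enableClicks = createBus(); // stands in for a stream of button clicks
const queue = createBus();

queue.plug(enableClicks);

const log = [];
queue.onValue((event) => log.push(event.type));

enableClicks.push({ type: 'enable' }); // arrives through the plugged source
queue.push({ type: 'disable' });       // pushed manually

console.log(log); // [ 'enable', 'disable' ]
```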

Skip vs Filter

Given:

  ------(🇦🇺)----------(🇨🇦)----------> s1
--------------------(🇨🇦)----------> s1.skip(1)

s1.skip(1) means the Australian flag is not emitted (it gets skipped entirely). The Canadian flag is emitted normally. s1.skip(n) would skip the first n flags of the stream.

Filter, on the other hand, filters out events I'm not interested in, e.g. only hits to the "play" button, not other video events like pause, forward or stop.

  ------(⏩)---(▶️)------(⏸)-(⏹)-----> s1
            filter(event => event === ▶️)
-------------(▶️)--------------------> s1.filter(e => e === ▶️)

Filter takes in what is known as a predicate or test function.
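
On plain arrays the same two ideas look like this. It is only an analogy: slice stands in for skip and the built-in Array.prototype.filter for the stream's filter, and the string values are placeholders for the flags and video events.

```javascript
// skip(n) drops events by position, whatever their value is.
const flags = ['australia', 'canada'];
const afterSkip = flags.slice(1);

// filter drops events by value, wherever they occur, using a predicate.
const videoEvents = ['forward', 'play', 'pause', 'stop'];
const isPlay = (event) => event === 'play';
const onlyPlays = videoEvents.filter(isPlay);

console.log(afterSkip); // [ 'canada' ]
console.log(onlyPlays); // [ 'play' ]
```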

Combine vs Concat

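Given s1, an event stream that emits birds, and s2, which is a dinosaur event stream, here's what combine does:

  --(🦆)--------------(🦅)---(🦉)-----------> s1
----------(🦖)--------------------(🦕)----> s2
             combineWith(s1, s2, ...)
----------(🦆🦖)----(🦅🦖)-(🦉🦖)-(🦉🦕)----> s1.combine(s2, fn)

Note how the combined stream emits nothing until both streams have produced a value. From then on it emits on every new event from either stream, pairing that event with the latest value of the other one. Pairing events strictly one-to-one, (🦆🦖) then (🦅🦕), is what zip does instead.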
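
Bacon's combine works on the latest value of each source. A small sketch of that rule, replayed over a time-ordered list of events: the combineLatest helper and the timeline format are assumptions made for this example, not Bacon API.

```javascript
// Replays events in time order and collects what a "latest values" combine
// would emit: nothing until both sources have a value, then one output per event.
function combineLatest(events, combiner) {
  const latest = {};
  const output = [];
  events.forEach(({ source, value }) => {
    latest[source] = value;
    if ('s1' in latest && 's2' in latest) {
      output.push(combiner(latest.s1, latest.s2));
    }
  });
  return output;
}

const timeline = [
  { source: 's1', value: 'duck' },
  { source: 's2', value: 't-rex' },
  { source: 's1', value: 'eagle' },
  { source: 's1', value: 'owl' },
  { source: 's2', value: 'sauropod' },
];

const combined = combineLatest(timeline, (bird, dinosaur) => `${bird}+${dinosaur}`);
console.log(combined);
// [ 'duck+t-rex', 'eagle+t-rex', 'owl+t-rex', 'owl+sauropod' ]
```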

Concat, on the other hand, waits for the first stream to end before activating the second stream:

  --(🐠)--------------(🐟)-| s1
----(🍋)----(🍊)---| s2
--(🐠)--------------(🐟)-----(🍋)----(🍊)---| s1.concat(s2)

Combine vs When

Bacon.when works with join patterns: the first pattern whose sources all have a value fires, and events coming from event streams are consumed when they are used, whereas combine keeps reusing the latest value of each source.

  const isError = function (serverResponse) { return typeof serverResponse.error !== 'undefined'; };
const isNotError = function (serverResponse) { return !isError(serverResponse); };

const $items = Bacon.fromPromise($.ajax({ url: '/items' }));
const $renderedItems = $items.filter(isNotError).flatMap(function (item) {
  return Bacon.fromCallback(renderItem, item);
});
const $quantity = $renderedItems.flatMap(function (element) {
  return Bacon.fromEventTarget($(element).children('.quantity'), 'change').map(function (event) {
    // read the value from the input that changed, not from its container
    return [element, $(event.target).val()];
  });
});
const $price = $quantity.map(function (data) {
  return $(data[0]).data('price') * data[1];
});

const $refreshClick = Bacon.fromEventTarget($('#refresh_cart'), 'click');
Bacon.when(
  [$refreshClick, $quantity, $price], function (event, data, price) {
    $(data[0]).children('.internal-price').text(price);
    $(data[0]).children('.price').text(price);
  },
  [$quantity, $price], function (data, price) {
    $(data[0]).children('.internal-price').text(price);
  }
).onValue(function () {}); // subscribe, otherwise nothing runs

Debounce

stream.debounce(X) holds events back and only emits the latest one after X ms have passed without a new event.

  Bacon
    .fromEvent(inputField, 'keyup')
    .map(getInputValue)
    .map(parseAsInput)
    .filter(isNumber)
    .debounce(300);

The resulting stream doesn't fire an event on every key press; it waits until the user has paused for 300ms and then emits the latest value, so events are always at least 300ms apart.

This would prove super useful for a typeahead search, for instance, so that we don't spam the search endpoint.
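
The mechanism behind debouncing can be sketched in plain JavaScript with a timer that is restarted on every call. This is an illustration of the idea, not Bacon's implementation; the debounce helper and the replaceable timers argument are assumptions of this example.

```javascript
// Thin wrappers so that fake timers can be swapped in (e.g. in tests).
const realTimers = {
  set: (callback, ms) => setTimeout(callback, ms),
  clear: (id) => clearTimeout(id),
};

// Returns a function that forwards only the last value it received,
// once `waitMs` have passed without another call.
function debounce(handler, waitMs, timers = realTimers) {
  let pending = null;
  return function debounced(value) {
    if (pending !== null) timers.clear(pending); // a newer value arrived: restart the wait
    pending = timers.set(() => {
      pending = null;
      handler(value);
    }, waitMs);
  };
}

// Usage: three quick key presses result in a single search for "bac".
const search = debounce((query) => console.log(`searching for ${query}`), 300);
search('b');
search('ba');
search('bac');
```

Only the last call survives because each new call cancels the timer set by the previous one.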

FlatMapLatest: Ajax Search

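flatMapLatest works like flatMap, but it only listens to the stream created for the most recent event, so responses to earlier, outdated queries are dropped.

  // Stream of search queries
var $query = $('#search').asEventStream('keyup').map(function(event) {
  return $(event.target).val();
}).skipDuplicates();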

// Search result strings
var $results = $query.throttle(500).flatMapLatest(function(query) {
  return Bacon.fromPromise($.ajax('/search/' + query))
}).mapError('Search error');

// Render results
$results.onValue(function(result) {
  $('#results').html(result);
});

// Search status
var searchStatus = $query.map(true).merge($results.map(false)).skipDuplicates().toProperty(false);
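
The effect of only listening to the latest request can be sketched without Bacon by tagging each request with a token and ignoring results whose token is no longer the newest. latestOnly, begin and deliver are hypothetical names for this example; Bacon gets the same outcome by unsubscribing from the previous inner stream.

```javascript
// Wraps `handler` so that only results belonging to the most recently
// started request reach it.
function latestOnly(handler) {
  let latestToken = 0;
  // Call begin() when a request starts; it returns the callback to invoke
  // with that request's result.
  return function begin() {
    const token = ++latestToken;
    return function deliver(result) {
      if (token === latestToken) handler(result); // stale results are dropped
    };
  };
}

const rendered = [];
const begin = latestOnly((result) => rendered.push(result));

const deliverFirst = begin();  // request for "ba"
const deliverSecond = begin(); // request for "bacon", started before the first one returned

deliverSecond('results for bacon');
deliverFirst('results for ba'); // arrives late and is ignored

console.log(rendered); // [ 'results for bacon' ]
```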

Bacon Errors

  streamFromRetry(...)
    .flatMapError(error => {
        return new Bacon.Error('oops');
    });

e.g.

  export default function (sport, seriesId, apiKey) {
    let url = `${statsApi}/${sport}/series/${seriesId}/currentseason.json?userkey=${apiKey}`;

    return streamFromRetry({
        url: url,
        retries: 3,
        delay: function () { return 5 * 1000; }
    })
    .flatMap(function (response) {
        if (response.current_season) {
            return response.current_season;
        } else {
            // returning an Error from flatMap produces an error event
            return new bacon.Error('no current season data');
        }
    })
    .flatMapError(function (err) {
        console.error(`CurrentSeason Stream Error: ${err}`);
        return new bacon.Error(`CurrentSeason failed: ${err}`);
    });
}

Debugging Bacon streams

  Observable.log()
Observable.toString()
Observable.deps()
Observable.internalDeps()
Bacon.spy(f)

Usage with Node.js

  getInvoiceStream = (id) -> Bacon.fromNodeCallback Invoice, 'findOne', id: id
getInvoiceDataStream = ($invoice) -> $invoice.flatMap (invoice) ->
  Bacon.fromNodeCallback invoice, 'toDeepJSON'

# Load Invoice and its deps
get: (req, res) ->
  $invoice = getInvoiceStream req.param 'id'
  $invoiceData = getInvoiceDataStream $invoice
  $invoiceData.onValue _.bind(res.json, res)

  $errors = Bacon.mergeAll $invoice.errors(), $invoiceData.errors()
  $errors.onError _.bind(res.send, res, 500)

# Generate PDF export
pdf: (req, res) ->
  $invoice = getInvoiceStream req.param 'id'
  $invoiceData = getInvoiceDataStream $invoice
  $invoiceRender = $invoiceData.map renderInvoicePDF
  $invoiceRenderData = $invoiceRender.flatMap (pdf) -> Bacon.fromCallback pdf, 'output'
  $invoiceRenderData.onValue _.bind(res.end, res)

  $errors = Bacon.mergeAll $invoice.errors(), $invoiceData.errors()
  $errors.onError _.bind(res.send, res, 500)
