<div id="app">
  <h2 class="time"></h2>
  <button id="theOnlyButton">Start timer</button>
</div>
// ========= Observable factory =========
function interval(delay) {
  return Observable(function(observer) {
    var counter   = 0;
    var callback  = () => observer.next(counter++);
    var _interval = setInterval(callback, delay);

    observer.setUnsubscribe(() => clearInterval(_interval));

    return observer.unsubscribe;
  });
}

// ========= Operators =========
function map(transformFn) {
  return source$ => Observable(observer => 
    source$.subscribe(value => observer.next(
      transformFn(value)
    ))
  );
}

function take(number) {
  return source$ => Observable(function(observer) {
    var count = 0;
    var unsubscribeSource = source$.subscribe(function(value) {
      count++;
      observer.next(value);        
      if (count === number) {
        observer.complete();
        unsubscribeSource();
      }
    });
  });
}

// ========= Demo =========
var textDisplay = document.querySelector('.time');
var button      = document.getElementById('theOnlyButton');
var plusOne     = num => num + 1;
var showText    = function(text) {
  textDisplay.textContent = text;
}

button.addEventListener('click', function() {
  var time$ = interval(1000).pipe(
    map(plusOne),
    map(readableTime),
    take(5)
  );

  time$.subscribe({
    next: showText,
    complete: () => showText("Time's out")
  });
});

// ========= Helpers =========
function paddedNumber(num) {
  return num.toString().padStart(2, '0');
}

function readableTime(time) {
  var minutes = Math.floor((time / 60) % 60);
  var seconds = Math.floor(time % 60);

  return paddedNumber(minutes) + ':' + paddedNumber(seconds);
}

// ========= same old thing in here =========

function Observable (subscriber) {
  var observable = {
    subscribe: observer => subscriber(SafeObserver(observer)),
    pipe: function (...fns) {
      return fns.reduce((source, fn) => fn(source), observable);
    }
  }
  
  return observable;
}

function SafeObserver (observer) {
  var safeObserver = {};
  var isSubscribed = true; //this will track our subscribtion state
  var _unsubscribe;

  // this is for convinience.
  // so we can subscribe like this: 
  // stream.subscribe(val => { ...some code })
  if(typeof observer === 'function'){
    observer = { next: observer };
  }

  safeObserver.next = function(value) {
    if (!isSubscribed || !observer.next) {
      return;
    }

    try {
      observer.next(value);
    } catch (e) {
      // we want to unsubscribe only if there is an error
      safeObserver.unsubscribe();
    }
  }

  safeObserver.error = function(err) {
    if (!isSubscribed || !observer.error) {
      return;
    }

    try {
      observer.error(err);
    } catch (e) {}

    // we will unsubscribe no matter what happens
    safeObserver.unsubscribe();
  }

  safeObserver.complete = function(err) {
    if (!isSubscribed || !observer.complete) {
      return;
    }

    try {
      observer.complete();
    } catch (e) {}

    // we will unsubscribe no matter what happens
    safeObserver.unsubscribe();
  }

  safeObserver.unsubscribe = function() {
    isSubscribed = false;

    if(_unsubscribe) {
      _unsubscribe();
    }
  }

  safeObserver.setUnsubscribe = function(unsub) {
    _unsubscribe = unsub;
  }

  return safeObserver;
}

External CSS

This Pen doesn't use any external CSS resources.

External JavaScript

This Pen doesn't use any external JavaScript resources.