Reactive Mesh

Kalin Maldzhanski

Reactive Mesh

What is Reactive?

What is Reactive?

we want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems.
Reactive Manifesto

What is Reactive?

React on Messages as they come
Essentially processing live data in a non-blocking potentially async manner

Fault Tolerant

Backpressure

What about ...?

Why Reactive?

Reactive Libraries

Reactive Enabled Libraries


Many

RxJava 1.x

RxJava 2.x

Reactor Core

Akka Streams

Reactive Streams


public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Publisher<T> {
    public void subscribe(org.reactivestreams.Subscriber<? super T> s);
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Reactive Streams

PublisherSubscriberSubscriptionSubscriptionOnSubscribeSubscribe()OnNext()Request()Cancel()

Why Reactive Streams?


Interoperability

Reactive Streams TCK

Marbles

http://rxmarbles.com/

Akka Runnable Graph

Akka Runnable Graph


  final Outlet<Integer> A = builder.add(Source.single(0)).out();
  final UniformFanOutShape<Integer, Integer> B = builder.add(Broadcast.create(2));
  final UniformFanInShape<Integer, Integer> C = builder.add(Merge.create(2));
  final FlowShape<Integer, Integer> D =
    builder.add(Flow.of(Integer.class).map(i -> i + 1));
  final UniformFanOutShape<Integer, Integer> E = builder.add(Balance.create(2));
  final UniformFanInShape<Integer, Integer> F = builder.add(Merge.create(2));
  final Inlet<Integer> G = builder.add(Sink.<Integer> foreach(System.out::println)).in();

  builder.from(F).toFanIn(C);
  builder.from(A).viaFanOut(B).viaFanIn(C).toFanIn(F);
  builder.from(B).via(D).viaFanOut(E).toFanIn(F);
  builder.from(E).toInlet(G);

Create Source RxJava 1.x


        Observable.just(8);
        Observable.from(new Integer[]{8});
        Observable.fromCallable(() -> 8);
        Observable.empty();
        Observable.never();
        Observable.error(new IllegalStateException("why 8"));
        Observable.create(subscriber -> {
            subscriber.onNext(8);
            subscriber.onCompleted();
        });
    

Create Source RxJava 2.x


Observable.just(8);
Observable.fromArray(new Integer[]{8});
Observable.fromCallable(() -> 8);
Observable.fromFuture(new IntFuture(8));
Observable.fromIterable(Arrays.asList(new Integer[]{8}));
Observable.fromPublisher(new IntPublisher(8));
Observable.empty();
Observable.never();
Observable.error(new IllegalStateException("why 8"));
Observable.create(e -> {
    e.setCancellable(() -> {});
    e.onNext(8);
    e.onComplete();});
    

Flowable.just(8);
Flowable.fromArray(8);
Flowable.fromCallable(() -> 8);
Flowable.fromFuture(new IntFuture(8));
Flowable.fromIterable(Arrays.asList(new Integer[]{8}));
Flowable.fromPublisher(new IntPublisher(8));
Flowable.empty();
Flowable.never();
Flowable.error(new IllegalStateException("why 8"));
Flowable.create(emitter -> {
    emitter.setCancellable(() -> {});
    emitter.onNext(8);
    emitter.onComplete();}, BackpressureStrategy.BUFFER);
    

Create Source Reactor


        Flux.just(8);
        Flux.fromArray(new Integer[]{8});
        Flux.from(new IntPublisher(8));
        Flux.fromIterable(Arrays.asList(new Integer[]{8}));
        Flux.fromStream(Stream.of(8));
        Flux.empty();
        Flux.never();
        Flux.error(new IllegalStateException("why 8"));
        Flux.create(e -> {
            e.onDispose(()->{});
            e.next(8);
            e.complete();
        }, FluxSink.OverflowStrategy.BUFFER);
    

Testing Reactive Code

Testing RxJava


TestSubscriber<Integer> ts = Flowable.range(1, 5).test();

TestObserver<Integer> to = Observable.range(1, 5).test();

TestObserver<Integer> tso = Single.just(1).test();

TestObserver<Integer> tmo = Maybe.just(1).test();

TestObserver<Integer> tco = Completable.complete().test();
            

Performance


http://akarnokd.blogspot.bg/2016/12/the-reactive-scrabble-benchmarks.html

Fusion

Fusion?

http://akarnokd.blogspot.bg/2016/03/operator-fusion-part-1.html

Macro Fusion

Micro Fusion

Don't Block, React !

Thank you!

Cheers!


Kalin Maldzhanski @djodjoni
https://djodjoni.github.io/prezz/2017-05-31-jprime-reactive-mesh

Powered by Shower