Kalin Maldzhanski
we want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems.Reactive Manifesto
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Publisher<T> {
public void subscribe(org.reactivestreams.Subscriber<? super T> s);
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
final Outlet<Integer> A = builder.add(Source.single(0)).out();
final UniformFanOutShape<Integer, Integer> B = builder.add(Broadcast.create(2));
final UniformFanInShape<Integer, Integer> C = builder.add(Merge.create(2));
final FlowShape<Integer, Integer> D =
builder.add(Flow.of(Integer.class).map(i -> i + 1));
final UniformFanOutShape<Integer, Integer> E = builder.add(Balance.create(2));
final UniformFanInShape<Integer, Integer> F = builder.add(Merge.create(2));
final Inlet<Integer> G = builder.add(Sink.<Integer> foreach(System.out::println)).in();
builder.from(F).toFanIn(C);
builder.from(A).viaFanOut(B).viaFanIn(C).toFanIn(F);
builder.from(B).via(D).viaFanOut(E).toFanIn(F);
builder.from(E).toInlet(G);
Observable.range(1, 100).map(i -> i + 33)
.subscribe(System.out::println);
Flowable.range(1, 100).map(i -> i + 33)
.subscribe(System.out::println);
Flux.range(1, 100).map(i -> i + 33)
.subscribe(System.out::println);
Source.range(1, 100).map(i -> i + 33)
.runForeach(System.out::println, materializer);
Observable.just(8);
Observable.from(new Integer[]{8});
Observable.fromCallable(() -> 8);
Observable.empty();
Observable.never();
Observable.error(new IllegalStateException("why 8"));
Observable.create(subscriber -> {
subscriber.onNext(8);
subscriber.onCompleted();
});
|
|
Flux.just(8);
Flux.fromArray(new Integer[]{8});
Flux.from(new IntPublisher(8));
Flux.fromIterable(Arrays.asList(new Integer[]{8}));
Flux.fromStream(Stream.of(8));
Flux.empty();
Flux.never();
Flux.error(new IllegalStateException("why 8"));
Flux.create(e -> {
e.onDispose(()->{});
e.next(8);
e.complete();
}, FluxSink.OverflowStrategy.BUFFER);
TestSubscriber<Integer> ts = Flowable.range(1, 5).test();
TestObserver<Integer> to = Observable.range(1, 5).test();
TestObserver<Integer> tso = Single.just(1).test();
TestObserver<Integer> tmo = Maybe.just(1).test();
TestObserver<Integer> tco = Completable.complete().test();