58. * <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param notificationHandler
* receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @param scheduler
* the {@link Scheduler} to emit the items on
* @return the source Observable modified with repeat logic
* @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
*/
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>()
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Void>() {
@Override
public Void call(Notification<?> notification) {
return null;
}
}));
}
};
return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler, scheduler);
}
/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
* a {@code void} item to the Observable provided as an argument to the {@code notificationHandler}
* function. If that Observable calls {@code onComplete} or {@code onError} then {@code repeatWhen} will
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
* resubscribe to the source observable.
* <p>
* <img width="640" height="430" src=/slideshow/codefest2015-reactive-streams/46430538/"https:/raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @param notificationHandler
* receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @return the source Observable modified with repeat logic
* @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
*/
public final Observable<T> repeatWhen(final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) {
Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> dematerializedNotificationHandler = new Func1<Observable<? extends Notification<?>>, Observable<?>>()
@Override
public Observable<?> call(Observable<? extends Notification<?>> notifications) {
return notificationHandler.call(notifications.map(new Func1<Notification<?>, Void>() {
@Override
public Void call(Notification<?> notification) {
return null;
}
}));
}
};
return OnSubscribeRedo.repeat(this, dematerializedNotificationHandler);
}
/**
* An Observable that never sends any information to an {@link Observer}.
* This Observable is useful primarily for testing purposes.
*
* @param <T>
* the type of item (not) emitted by the Observable
*/
private static class NeverObservable<T> extends Observable<T> {
public NeverObservable() {
super(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> observer) {
// do nothing
}
});
}
}
/**
* An Observable that invokes {@link Observer#onError onError} when the {@link Observer} subscribes to it.
*
* @param <T>
* the type of item (ostensibly) emitted by the Observable
*/
private static class ThrowObservable<T> extends Observable<T> {
public ThrowObservable(final Throwable exception) {
super(new OnSubscribe<T>() {
/**
* Accepts an {@link Observer} and calls its {@link Observer#onError onError} method.
*
* @param observer
* an {@link Observer} of this Observable
*/
@Override
/**
* Operator function for lifting into an Observable.
*/
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
/**
* Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
* the values of the current Observable through the Operator function.
* <p>
* In other words, this allows chaining Observers together on an Observable for acting on the values within
* the Observable.
* <p> {@code
* observable.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()
* }
* <p>
* If the operator you are creating is designed to act on the individual items emitted by a source
* Observable, use {@code lift}. If your operator is designed to transform the source Observable as a whole
* (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param lift the Operator that implements the Observable-operating function to be applied to the source
* Observable
* @return an Observable that is the result of applying the lifted Operator to the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(lift).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
/**
* Transform an Observable by applying a particular Transformer function to it.
* <p>
* This method operates on the Observable itself whereas {@link #lift} operates on the Observable's
* Subscribers or Observers.
* <p>
* If the operator you are creating is designed to act on the individual items emitted by a source
* Observable, use {@link #lift}. If your operator is designed to transform the source Observable as a whole
* (for instance, by applying a particular set of existing RxJava operators to it) use {@code compose}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code compose} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param transformer implements the function that transforms the source Observable
* @return the source Observable, transformed by the transformer function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
@SuppressWarnings("unchecked")
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
return ((Transformer<T, R>) transformer).call(this);
}
/**
* Transformer function used by {@link #compose}.
* @warn more complete description needed
*/
public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
// cover for generics insanity
}
/*
* Operators Below Here
*
61. 61
Source
val iterableSource = Source(1 to 50)
val tickSource = Source(1 second, 1 second, "Tick")
val singleSource = Source.single("CodeFest")
val emptySource = Source.empty()
val zmqSource = ???
62. 62
Sink
val blackhole = Sink.ignore
val onComplete = Sink.onComplete { result =>
System.exit(0)
}
val foreach = Sink.foreach(println)
val firstElement = Sink.head[Int]
63. 63
Flow
implicit val as = ActorSystem("CodeFest")
implicit val materializer = ActorFlowMaterializer()
val source = Source(1 to 50)
val sink = Sink.foreach[Int](println)
val flow = source.to(sink)
flow.run()
64. 64
Flow
val flow2 = source
.map { x => x * 2 }
.filter { x => x % 3 == 0 }
.to(sink)
flow2.run()
65. 65
Flow
val source = Source(1 to 50)
val sink = Sink.foreach[String](println)
val flow2 = source
.map { x => x.toString }
.map { x => x / 13 }
.to(sink)
flow2.run()