From e5da471776c961290870f6c8387ad437c9f3aa43 Mon Sep 17 00:00:00 2001 From: Jonas Adler Date: Tue, 1 Mar 2016 18:21:31 +0100 Subject: [PATCH 1/2] update rx dependency to 1.1.1 --- build.gradle | 2 +- .../operators/OnSubscribeInputStream.java | 27 +++++++++---------- .../internal/operators/OnSubscribeReader.java | 23 ++++++++-------- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/build.gradle b/build.gradle index cdf240a..f4f4a93 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ apply plugin: 'rxjava-project' apply plugin: 'java' dependencies { - compile 'io.reactivex:rxjava:1.0.12' + compile 'io.reactivex:rxjava:1.1.1' testCompile 'junit:junit-dep:4.10' testCompile 'org.mockito:mockito-core:1.8.5' } diff --git a/src/main/java/rx/internal/operators/OnSubscribeInputStream.java b/src/main/java/rx/internal/operators/OnSubscribeInputStream.java index 5f82221..0078911 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeInputStream.java +++ b/src/main/java/rx/internal/operators/OnSubscribeInputStream.java @@ -1,13 +1,13 @@ package rx.internal.operators; +import rx.Observer; +import rx.observables.SyncOnSubscribe; + import java.io.IOException; import java.io.InputStream; import java.util.Arrays; -import rx.Subscriber; -import rx.observables.AbstractOnSubscribe; - -public final class OnSubscribeInputStream extends AbstractOnSubscribe { +public final class OnSubscribeInputStream extends SyncOnSubscribe { private final InputStream is; private final int size; @@ -18,25 +18,24 @@ public OnSubscribeInputStream(InputStream is, int size) { } @Override - protected InputStream onSubscribe(Subscriber subscriber) { - return is; + protected InputStream generateState() { + return this.is; } @Override - protected void next(SubscriptionState state) { - - InputStream is = state.state(); + protected InputStream next(InputStream state, Observer observer) { byte[] buffer = new byte[size]; try { - int count = is.read(buffer); + int count = this.is.read(buffer); if (count == -1) - state.onCompleted(); + observer.onCompleted(); else if (count < size) - state.onNext(Arrays.copyOf(buffer, count)); + observer.onNext(Arrays.copyOf(buffer, count)); else - state.onNext(buffer); + observer.onNext(buffer); } catch (IOException e) { - state.onError(e); + observer.onError(e); } + return state; } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeReader.java b/src/main/java/rx/internal/operators/OnSubscribeReader.java index 75b5ee4..0d6b62d 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeReader.java +++ b/src/main/java/rx/internal/operators/OnSubscribeReader.java @@ -1,12 +1,12 @@ package rx.internal.operators; +import rx.Observer; +import rx.observables.SyncOnSubscribe; + import java.io.IOException; import java.io.Reader; -import rx.Subscriber; -import rx.observables.AbstractOnSubscribe; - -public final class OnSubscribeReader extends AbstractOnSubscribe { +public final class OnSubscribeReader extends SyncOnSubscribe { private final Reader reader; private final int size; @@ -17,23 +17,22 @@ public OnSubscribeReader(Reader reader, int size) { } @Override - protected Reader onSubscribe(Subscriber subscriber) { - return reader; + protected Reader generateState() { + return this.reader; } @Override - protected void next(SubscriptionState state) { - - Reader reader = state.state(); + protected Reader next(Reader state, Observer observer) { char[] buffer = new char[size]; try { int count = reader.read(buffer); if (count == -1) - state.onCompleted(); + observer.onCompleted(); else - state.onNext(String.valueOf(buffer, 0, count)); + observer.onNext(String.valueOf(buffer, 0, count)); } catch (IOException e) { - state.onError(e); + observer.onError(e); } + return reader; } } From f7e8244a894ab7d5a6e45090ab078a56a96e017b Mon Sep 17 00:00:00 2001 From: Jonas Adler Date: Wed, 2 Mar 2016 09:29:00 +0100 Subject: [PATCH 2/2] always use state param instead of field --- .../java/rx/internal/operators/OnSubscribeInputStream.java | 2 +- src/main/java/rx/internal/operators/OnSubscribeReader.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeInputStream.java b/src/main/java/rx/internal/operators/OnSubscribeInputStream.java index 0078911..8a531a3 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeInputStream.java +++ b/src/main/java/rx/internal/operators/OnSubscribeInputStream.java @@ -26,7 +26,7 @@ protected InputStream generateState() { protected InputStream next(InputStream state, Observer observer) { byte[] buffer = new byte[size]; try { - int count = this.is.read(buffer); + int count = state.read(buffer); if (count == -1) observer.onCompleted(); else if (count < size) diff --git a/src/main/java/rx/internal/operators/OnSubscribeReader.java b/src/main/java/rx/internal/operators/OnSubscribeReader.java index 0d6b62d..5c2cb19 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeReader.java +++ b/src/main/java/rx/internal/operators/OnSubscribeReader.java @@ -25,7 +25,7 @@ protected Reader generateState() { protected Reader next(Reader state, Observer observer) { char[] buffer = new char[size]; try { - int count = reader.read(buffer); + int count = state.read(buffer); if (count == -1) observer.onCompleted(); else @@ -33,6 +33,6 @@ protected Reader next(Reader state, Observer observer) { } catch (IOException e) { observer.onError(e); } - return reader; + return state; } }