
@Crystark

I added an OnSubscribe to deal with reading lines while the InputStream is being written to (e.g. for a large download). I hope this is useful.

@akarnokd
Member

/cc @davidmoten

@abersnaze
Contributor

How is it better than byLine(decode(from(input), "UTF-8"))? If you are just looking for convenience, maybe wrapping that line of code would be good enough. I would like to avoid adding custom OnSubscribeInputStreamToLines code that needs to be tested and maintained.

@davidmoten
Contributor

I agree with @abersnaze on this one. And I suppose an alternative is split(from(new BufferedReader(new InputStreamReader(input, "UTF-8"))), "\n"). A benchmark might tell us which to use.

Incidentally, split and decode don't support backpressure. That would be a nice improvement. RxJavaString has been inactive for so long that I've implemented those things with backpressure in my own libraries, but it would be nice to see that functionality here under proper supervision.

@abersnaze
Contributor

@davidmoten if you've got the code, what are you waiting for? Submit a PR!

@Crystark
Author

@abersnaze @davidmoten

byLine(decode(from(input), "UTF-8")) was the first thing I tried:

    @Test
    public void testDownload() throws InterruptedException {
        String uri = ...
        Observable
            .defer(() -> {
                try {
                    InputStream is = getAwsInputStream(uri);
                    if (is != null) {
                        return StringObservable.byLine(StringObservable.decode(StringObservable.from(is), "UTF-8"));
                    }
                } catch (Throwable t) {
                    return Observable.error(t);
                }
                return Observable.empty();
            })
            .doOnSubscribe(() -> L.warn("doOnSubscribe"))
            .doOnNext((t) -> L.warn("doOnNext - " + t))
            .subscribe();
    }

This prints out doOnSubscribe and then hangs, while I can see that my network is active and the file is being downloaded.

However, if I use OnSubscribeInputStreamToLines:

    @Test
    public void testDownload() throws InterruptedException {
        String uri = ...
        Observable
            .defer(() -> {
                try {
                    InputStream is = getAwsInputStream(uri);
                    if (is != null) {
                        return Observable.create(new OnSubscribeInputStreamToLines(is));
                    }
                } catch (Throwable t) {
                    return Observable.error(t);
                }
                return Observable.empty();
            })
            .doOnSubscribe(() -> L.warn("doOnSubscribe"))
            .doOnNext((t) -> L.warn("doOnNext - " + t))
            .subscribe();
    }

I see my doOnNext being printed as the file is being downloaded. Maybe I just missed something?

@Crystark
Author

I just tested the following, which works:

    StringObservable.split(StringObservable.from(new BufferedReader(new InputStreamReader(is, Charsets.UTF_8))), "\n")

So I retried, removing byLine and replacing it with:

StringObservable.split(StringObservable.decode(StringObservable.from(is), "UTF-8"), "\n");

And that works. The problem was actually just the line separator. So yeah, it wasn't working because byLine doesn't handle all kinds of line separators (CR, LF, and CRLF). In my OnSubscribe I was using readLine, which handles that:

Reads a line of text. A line is considered to be terminated by any one of a line feed ('\n'), a carriage return ('\r'), or a carriage return followed immediately by a linefeed. 
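To illustrate the readLine behavior quoted above, here is a minimal sketch using only the JDK. The class name ReadLineDemo and the sample input are made up for illustration and are not part of this PR:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJavaString.
class ReadLineDemo {
    // Collects every line that BufferedReader.readLine() returns for the given text.
    static List<String> readLines(String text) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Mixed terminators in one input: CRLF, then CR, then LF.
        System.out.println(readLines("a\r\nb\rc\nd")); // prints [a, b, c, d]
    }
}
```

All three terminators end a line here, regardless of the platform's line.separator value.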

I'll close this PR. Maybe it is worth opening an issue about byLine? And maybe also an issue to tackle backpressure on decode and split, as @davidmoten suggested? For now I'll use my OnSubscribe, as it works fine and has backpressure support.

@abersnaze
Contributor

Strange, byLine() is literally this:

    public static Observable<String> byLine(Observable<String> source) {
        return split(source, System.getProperty("line.separator"));
    }

and this code confirms System.getProperty("line.separator") and "\n" (on my machine) are the same.

    System.out.println("System.getProperty(\"line.separator\") = " + Arrays.toString(System.getProperty("line.separator").getBytes()));
    System.out.println("\"\\n\" = " + Arrays.toString("\n".getBytes()));

outputs this

    System.getProperty("line.separator") = [10]
    "\n" = [10]

@Crystark Could you report what the middle snippet of code outputs on your system?

@Crystark
Author

I'm running my tests on Windows, so yeah, the line separator is CRLF:

    System.getProperty("line.separator") = [13, 10]
    "\n" = [10]
    "\r" = [13]

IMO, the System's line separator should be ignored and byLine should be able to split any kind of line separator, whatever the system.
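As a rough sketch of what separator-agnostic splitting could look like, the following plain-JDK class accepts text in chunks and emits a line on CR, LF, or CRLF, including a CRLF that straddles two chunks. The class AnySeparatorSplitter and its methods are hypothetical names for illustration; this is not the RxJavaString implementation and it ignores backpressure entirely:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
class AnySeparatorSplitter {
    private final StringBuilder pending = new StringBuilder();
    private boolean lastWasCr = false;

    // Feeds one chunk and returns the lines that this chunk completed.
    List<String> feed(String chunk) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < chunk.length(); i++) {
            char c = chunk.charAt(i);
            if (c == '\n' && lastWasCr) {
                // Second half of a CRLF: the line was already emitted at the CR.
                lastWasCr = false;
                continue;
            }
            lastWasCr = (c == '\r');
            if (c == '\r' || c == '\n') {
                lines.add(pending.toString());
                pending.setLength(0);
            } else {
                pending.append(c);
            }
        }
        return lines;
    }

    // Returns the trailing text that had no terminator, or null if there is none.
    String finish() {
        if (pending.length() == 0) {
            return null;
        }
        String rest = pending.toString();
        pending.setLength(0);
        return rest;
    }

    public static void main(String[] args) {
        // Splitting LF-terminated text on a fixed CRLF separator finds nothing to split on.
        System.out.println("a\nb\nc".split(Pattern.quote("\r\n")).length); // prints 1

        AnySeparatorSplitter splitter = new AnySeparatorSplitter();
        System.out.println(splitter.feed("a\r"));    // prints [a]
        System.out.println(splitter.feed("\nb\nc")); // prints [b]
        System.out.println(splitter.finish());       // prints c
    }
}
```

The first print shows the mismatch from my test: with a CRLF separator and LF-terminated content, nothing is ever split, so nothing can be emitted until the stream ends.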

@abersnaze
Contributor

@Crystark check out #31 to see if it works for you.
