diff --git a/rxjava-contrib/rxjava-string/build.gradle b/rxjava-contrib/rxjava-string/build.gradle
new file mode 100644
index 0000000000..5c578ae04d
--- /dev/null
+++ b/rxjava-contrib/rxjava-string/build.gradle
@@ -0,0 +1,30 @@
+apply plugin: 'osgi'
+
+sourceCompatibility = JavaVersion.VERSION_1_6
+targetCompatibility = JavaVersion.VERSION_1_6
+
+dependencies {
+ compile project(':rxjava-core')
+ testCompile project(":rxjava-core").sourceSets.test.output
+ provided 'junit:junit-dep:4.10'
+ provided 'org.mockito:mockito-core:1.8.5'
+}
+
+javadoc {
+ options {
+ doclet = "org.benjchristensen.doclet.DocletExclude"
+ docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
+ stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
+ windowTitle = "RxJava Javadoc ${project.version}"
+ }
+ options.addStringOption('top').value = '
RxJava
'
+}
+
+jar {
+ manifest {
+ name = 'rxjava-string'
+ instruction 'Bundle-Vendor', 'Netflix'
+ instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
+ instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
+ }
+}
diff --git a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java
new file mode 100644
index 0000000000..dfe7ca3e68
--- /dev/null
+++ b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java
@@ -0,0 +1,255 @@
+package rx.observables;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import rx.Observable;
+import rx.Observer;
+import rx.Subscription;
+import rx.Observable.OnSubscribeFunc;
+import rx.util.functions.Func1;
+import rx.util.functions.Func2;
+
+public class StringObservable {
+ /**
+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
+ *
+ * @param src
+ * @param charsetName
+ * @return
+ */
+ public static Observable decode(Observable src, String charsetName) {
+ return decode(src, Charset.forName(charsetName));
+ }
+
+ /**
+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
+ *
+ * @param src
+ * @param charset
+ * @return
+ */
+ public static Observable decode(Observable src, Charset charset) {
+ return decode(src, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE));
+ }
+
+ /**
+ * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
+ * This method allows for more control over how malformed and unmappable characters are handled.
+ *
+ * @param src
+ * @param charsetDecoder
+ * @return
+ */
+ public static Observable decode(final Observable src, final CharsetDecoder charsetDecoder) {
+ return Observable.create(new OnSubscribeFunc() {
+ @Override
+ public Subscription onSubscribe(final Observer super String> observer) {
+ return src.subscribe(new Observer() {
+ private ByteBuffer leftOver = null;
+
+ @Override
+ public void onCompleted() {
+ if (process(null, leftOver, true))
+ observer.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ if (process(null, leftOver, true))
+ observer.onError(e);
+ }
+
+ @Override
+ public void onNext(byte[] bytes) {
+ process(bytes, leftOver, false);
+ }
+
+ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
+ ByteBuffer bb;
+ if (last != null) {
+ if (next != null) {
+ // merge leftover in front of the next bytes
+ bb = ByteBuffer.allocate(last.remaining() + next.length);
+ bb.put(last);
+ bb.put(next);
+ bb.flip();
+ }
+ else { // next == null
+ bb = last;
+ }
+ }
+ else { // last == null
+ if (next != null) {
+ bb = ByteBuffer.wrap(next);
+ }
+ else { // next == null
+ return true;
+ }
+ }
+
+ CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * charsetDecoder.averageCharsPerByte()));
+ CoderResult cr = charsetDecoder.decode(bb, cb, endOfInput);
+ cb.flip();
+
+ if (cr.isError()) {
+ try {
+ cr.throwException();
+ }
+ catch (CharacterCodingException e) {
+ observer.onError(e);
+ return false;
+ }
+ }
+
+ if (bb.remaining() > 0) {
+ leftOver = bb;
+ }
+ else {
+ leftOver = null;
+ }
+
+ String string = cb.toString();
+ if (!string.isEmpty())
+ observer.onNext(string);
+
+ return true;
+ }
+ });
+ }
+ });
+ }
+
+ /**
+ * Encodes a possible infinite stream of strings into a Observable of byte arrays.
+ *
+ * @param src
+ * @param charsetName
+ * @return
+ */
+ public static Observable encode(Observable src, String charsetName) {
+ return encode(src, Charset.forName(charsetName));
+ }
+
+ /**
+ * Encodes a possible infinite stream of strings into a Observable of byte arrays.
+ *
+ * @param src
+ * @param charset
+ * @return
+ */
+ public static Observable encode(Observable src, Charset charset) {
+ return encode(src, charset.newEncoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE));
+ }
+
+ /**
+ * Encodes a possible infinite stream of strings into a Observable of byte arrays.
+ * This method allows for more control over how malformed and unmappable characters are handled.
+ *
+ * @param src
+ * @param charsetEncoder
+ * @return
+ */
+ public static Observable encode(Observable src, final CharsetEncoder charsetEncoder) {
+ return src.map(new Func1() {
+ @Override
+ public byte[] call(String str) {
+ CharBuffer cb = CharBuffer.wrap(str);
+ ByteBuffer bb;
+ try {
+ bb = charsetEncoder.encode(cb);
+ } catch (CharacterCodingException e) {
+ throw new RuntimeException(e);
+ }
+ return Arrays.copyOfRange(bb.array(), bb.position(), bb.limit());
+ }
+ });
+ }
+
+ /**
+ * Gather up all of the strings in to one string to be able to use it as one message. Don't use this on infinite streams.
+ *
+ * @param src
+ * @return
+ */
+ public static Observable stringConcat(Observable src) {
+ return src.aggregate(new Func2() {
+ public String call(String a, String b) {
+ return a + b;
+ }
+ });
+ }
+
+ /**
+ * Rechunks the strings based on a regex pattern and works on infinite stream.
+ *
+ * resplit(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
+ * resplit(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
+ *
+ * See {@link Pattern}
+ *
+ * @param src
+ * @param regex
+ * @return
+ */
+ public static Observable split(final Observable src, String regex) {
+ final Pattern pattern = Pattern.compile(regex);
+ return Observable.create(new OnSubscribeFunc() {
+ @Override
+ public Subscription onSubscribe(final Observer super String> observer) {
+ return src.subscribe(new Observer() {
+ private String leftOver = null;
+
+ @Override
+ public void onCompleted() {
+ output(leftOver);
+ observer.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ output(leftOver);
+ observer.onError(e);
+ }
+
+ @Override
+ public void onNext(String segment) {
+ String[] parts = pattern.split(segment, -1);
+
+ if (leftOver != null)
+ parts[0] = leftOver + parts[0];
+ for (int i = 0; i < parts.length - 1; i++) {
+ String part = parts[i];
+ output(part);
+ }
+ leftOver = parts[parts.length - 1];
+ }
+
+ private int emptyPartCount = 0;
+ /**
+ * when limit == 0 trailing empty parts are not emitted.
+ * @param part
+ */
+ private void output(String part) {
+ if (part.isEmpty()) {
+ emptyPartCount++;
+ }
+ else {
+ for(; emptyPartCount>0; emptyPartCount--)
+ observer.onNext("");
+ observer.onNext(part);
+ }
+ }
+ });
+ }
+ });
+ }
+}
diff --git a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java
new file mode 100644
index 0000000000..8ced455f63
--- /dev/null
+++ b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java
@@ -0,0 +1,115 @@
+package rx.observables;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.MalformedInputException;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.observables.BlockingObservable;
+import rx.observables.StringObservable;
+import rx.util.AssertObservable;
+
+public class StringObservableTest {
+
+ @Test
+ public void testMultibyteSpanningTwoBuffers() {
+ Observable src = Observable.from(new byte[] { (byte) 0xc2 }, new byte[] { (byte) 0xa1 });
+ String out = StringObservable.decode(src, "UTF-8").toBlockingObservable().single();
+
+ assertEquals("\u00A1", out);
+ }
+
+ @Test
+ public void testMalformedAtTheEndReplace() {
+ Observable src = Observable.from(new byte[] { (byte) 0xc2 });
+ String out = StringObservable.decode(src, "UTF-8").toBlockingObservable().single();
+
+ // REPLACEMENT CHARACTER
+ assertEquals("\uFFFD", out);
+ }
+
+ @Test
+ public void testMalformedInTheMiddleReplace() {
+ Observable src = Observable.from(new byte[] { (byte) 0xc2, 65 });
+ String out = StringObservable.decode(src, "UTF-8").toBlockingObservable().single();
+
+ // REPLACEMENT CHARACTER
+ assertEquals("\uFFFDA", out);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testMalformedAtTheEndReport() {
+ Observable src = Observable.from(new byte[] { (byte) 0xc2 });
+ CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
+ StringObservable.decode(src, charsetDecoder).toBlockingObservable().single();
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testMalformedInTheMiddleReport() {
+ Observable src = Observable.from(new byte[] { (byte) 0xc2, 65 });
+ CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
+ StringObservable.decode(src, charsetDecoder).toBlockingObservable().single();
+ }
+
+ @Test
+ public void testPropogateError() {
+ Observable src = Observable.from(new byte[] { 65 });
+ Observable err = Observable.error(new IOException());
+ CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
+ try {
+ StringObservable.decode(Observable.concat(src, err), charsetDecoder).toList().toBlockingObservable().single();
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals(IOException.class, e.getCause().getClass());
+ }
+ }
+
+ @Test
+ public void testPropogateErrorInTheMiddleOfMultibyte() {
+ Observable src = Observable.from(new byte[] { (byte) 0xc2 });
+ Observable err = Observable.error(new IOException());
+ CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
+ try {
+ StringObservable.decode(Observable.concat(src, err), charsetDecoder).toList().toBlockingObservable().single();
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals(MalformedInputException.class, e.getCause().getClass());
+ }
+ }
+
+ @Test
+ public void testEncode() {
+ assertArrayEquals(
+ new byte[] { (byte) 0xc2, (byte) 0xa1 },
+ StringObservable.encode(Observable.just("\u00A1"), "UTF-8").toBlockingObservable().single());
+ }
+
+ @Test
+ public void testSplitOnCollon() {
+ testSplit("boo:and:foo", ":", 0, "boo", "and", "foo");
+ }
+ @Test
+ public void testSplitOnOh() {
+ testSplit("boo:and:foo", "o", 0, "b", "", ":and:f");
+ }
+
+ public void testSplit(String str, String regex, int limit, String... parts) {
+ testSplit(str, regex, 0, Observable.from(str), parts);
+ for (int i = 0; i < str.length(); i++) {
+ String a = str.substring(0, i);
+ String b = str.substring(i, str.length());
+ testSplit(a+"|"+b, regex, limit, Observable.from(a, b), parts);
+ }
+ }
+
+ public void testSplit(String message, String regex, int limit, Observable src, String... parts) {
+ Observable act = StringObservable.split(src, regex);
+ Observable exp = Observable.from(parts);
+ AssertObservable.assertObservableEqualsBlocking("when input is "+message+" and limit = "+ limit, exp, act);
+ }
+}
diff --git a/rxjava-core/src/test/java/rx/util/AssertObservable.java b/rxjava-core/src/test/java/rx/util/AssertObservable.java
new file mode 100644
index 0000000000..1bd34fcd23
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/util/AssertObservable.java
@@ -0,0 +1,139 @@
+package rx.util;
+
+import rx.Notification;
+import rx.Observable;
+import rx.util.functions.Func1;
+import rx.util.functions.Func2;
+
+public class AssertObservable {
+ /**
+ * Asserts that two Observables are equal. If they are not, an {@link AssertionError} is thrown
+ * with the given message. If expecteds
and actuals
are
+ * null
, they are considered equal.
+ *
+ * @param expected
+ * Observable with expected values.
+ * @param actual
+ * Observable with actual values
+ */
+ public static void assertObservableEqualsBlocking(Observable expected, Observable actual) {
+ assertObservableEqualsBlocking(null, expected, actual);
+ }
+
+ /**
+ * Asserts that two Observables are equal. If they are not, an {@link AssertionError} is thrown
+ * with the given message. If expected
and actual
are
+ * null
, they are considered equal.
+ *
+ * @param message
+ * the identifying message for the {@link AssertionError} (null
okay)
+ * @param expected
+ * Observable with expected values.
+ * @param actual
+ * Observable with actual values
+ */
+ public static void assertObservableEqualsBlocking(String message, Observable expected, Observable actual) {
+ assertObservableEquals(expected, actual).toBlockingObservable().lastOrDefault(null);
+ }
+
+ /**
+ * Asserts that two {@link Observable}s are equal and returns an empty {@link Observable}. If
+ * they are not, an {@link Observable} is returned that calls onError with an
+ * {@link AssertionError} when subscribed to. If expected
and actual
+ * are null
, they are considered equal.
+ *
+ * @param message
+ * the identifying message for the {@link AssertionError} (null
okay)
+ * @param expected
+ * Observable with expected values.
+ * @param actual
+ * Observable with actual values
+ */
+ public static Observable assertObservableEquals(Observable expected, Observable actual) {
+ return assertObservableEquals(null, expected, actual);
+ }
+
+ /**
+ * Asserts that two {@link Observable}s are equal and returns an empty {@link Observable}. If
+ * they are not, an {@link Observable} is returned that calls onError with an
+ * {@link AssertionError} when subscribed to with the given message. If expected
+ * and actual
are null
, they are considered equal.
+ *
+ * @param message
+ * the identifying message for the {@link AssertionError} (null
okay)
+ * @param expected
+ * Observable with expected values.
+ * @param actual
+ * Observable with actual values
+ */
+ public static Observable assertObservableEquals(final String message, Observable expected, Observable actual) {
+ if (actual == null && expected != null) {
+ return Observable.error(new AssertionError((message != null ? message + ": " : "") + "Actual was null and expected was not"));
+ }
+ if (actual != null && expected == null) {
+ return Observable.error(new AssertionError((message != null ? message + ": " : "") + "Expected was null and actual was not"));
+ }
+ if (actual == null && expected == null) {
+ return Observable.empty();
+ }
+
+ Func2 super Notification, ? super Notification, Notification> zipFunction = new Func2, Notification, Notification>() {
+ @Override
+ public Notification call(Notification expectedNotfication, Notification actualNotification) {
+ if (expectedNotfication.equals(actualNotification)) {
+ StringBuilder message = new StringBuilder();
+ message.append(expectedNotfication.getKind());
+ if (expectedNotfication.hasValue())
+ message.append(" ").append(expectedNotfication.getValue());
+ if (expectedNotfication.hasThrowable())
+ message.append(" ").append(expectedNotfication.getThrowable());
+ return new Notification("equals " + message.toString());
+ }
+ else {
+ StringBuilder error = new StringBuilder();
+ error.append("expected:<").append(expectedNotfication.getKind());
+ if (expectedNotfication.hasValue())
+ error.append(" ").append(expectedNotfication.getValue());
+ if (expectedNotfication.hasThrowable())
+ error.append(" ").append(expectedNotfication.getThrowable());
+ error.append("> but was:<").append(actualNotification.getKind());
+ if (actualNotification.hasValue())
+ error.append(" ").append(actualNotification.getValue());
+ if (actualNotification.hasThrowable())
+ error.append(" ").append(actualNotification.getThrowable());
+ error.append(">");
+
+ return new Notification(new AssertionError(error.toString()));
+ }
+ }
+ };
+
+ Func2, Notification, Notification> accumulator = new Func2, Notification, Notification>() {
+ @Override
+ public Notification call(Notification a, Notification b) {
+ String message = a.isOnError() ? a.getThrowable().getMessage() : a.getValue();
+ boolean fail = a.isOnError();
+
+ message += "\n\t" + (b.isOnError() ? b.getThrowable().getMessage() : b.getValue());
+ fail |= b.isOnError();
+
+ if (fail)
+ return new Notification(new AssertionError(message));
+ else
+ return new Notification(message);
+ }
+ };
+
+ Observable outcomeObservable = Observable.zip(expected.materialize(), actual.materialize(), zipFunction).aggregate(accumulator).map(new Func1, Notification>() {
+ @Override
+ public Notification call(Notification outcome) {
+ if (outcome.isOnError()) {
+ String fullMessage = (message != null ? message + ": " : "") + "Observables are different\n\t" + outcome.getThrowable().getMessage();
+ return new Notification(new AssertionError(fullMessage));
+ }
+ return new Notification();
+ }
+ }).dematerialize();
+ return outcomeObservable;
+ }
+}
diff --git a/rxjava-core/src/test/java/rx/util/AssertObservableTest.java b/rxjava-core/src/test/java/rx/util/AssertObservableTest.java
new file mode 100644
index 0000000000..f2182bd8cf
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/util/AssertObservableTest.java
@@ -0,0 +1,28 @@
+package rx.util;
+
+import org.junit.Test;
+
+import rx.Observable;
+
+public class AssertObservableTest {
+
+ @Test
+ public void testPassNotNull() {
+ AssertObservable.assertObservableEqualsBlocking("foo", Observable.from(1, 2), Observable.from(1, 2));
+ }
+
+ @Test
+ public void testPassNull() {
+ AssertObservable.assertObservableEqualsBlocking("foo", null, null);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testFailNotNull() {
+ AssertObservable.assertObservableEqualsBlocking("foo", Observable.from(1, 2), Observable.from(1));
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testFailNull() {
+ AssertObservable.assertObservableEqualsBlocking("foo", Observable.from(1, 2), null);
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index 01cf356da0..176e9150c5 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -7,4 +7,5 @@ include 'rxjava-core', \
'language-adaptors:rxjava-kotlin', \
'rxjava-contrib:rxjava-swing', \
'rxjava-contrib:rxjava-android', \
-'rxjava-contrib:rxjava-apache-http'
+'rxjava-contrib:rxjava-apache-http', \
+'rxjava-contrib:rxjava-string'