Skip to content

Commit

Permalink
Merge pull request #1615 from abersnaze/migrate-rx-string
Browse files Browse the repository at this point in the history
fix the usage of deprecated methods.
  • Loading branch information
benjchristensen committed Aug 19, 2014
2 parents d03e7d2 + 434f210 commit 2277236
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
Expand All @@ -38,7 +38,6 @@
import java.nio.charset.CodingErrorAction;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

public class StringObservable {
Expand All @@ -56,34 +55,9 @@ public static Observable<byte[]> from(final InputStream i) {
return from(i, 8 * 1024);
}

private static class CloseableResource<S extends Closeable> implements Subscription {
private final AtomicBoolean unsubscribed = new AtomicBoolean();
private S closable;

public CloseableResource(S closeable) {
this.closable = closeable;
}

@Override
public void unsubscribe() {
if (unsubscribed.compareAndSet(false, true)) {
try {
closable.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

@Override
public boolean isUnsubscribed() {
return unsubscribed.get();
}
}

/**
* Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations.
* @see StringObservable#from(UnsafeFunc0, UnsafeFunc1)
* @see StringObservable#using(UnsafeFunc0, Func1)
*
* @param <R>
*/
Expand All @@ -103,22 +77,27 @@ public static interface UnsafeFunc0<R> extends Callable<R> {
* @param observableFactory
* Converts the {@link Closeable} resource into a {@link Observable} with {@link #from(InputStream)} or {@link #from(Reader)}
* @return
* An {@link Observable} that automatically closes the resource when done.
*/
public static <R, S extends Closeable> Observable<R> using(final UnsafeFunc0<S> resourceFactory,
final Func1<S, Observable<R>> observableFactory) {
return Observable.using(new Func0<CloseableResource<S>>() {
return Observable.using(new Func0<S>() {
@Override
public CloseableResource<S> call() {
public S call() {
try {
return new CloseableResource<S>(resourceFactory.call());
return resourceFactory.call();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}, new Func1<CloseableResource<S>, Observable<R>>() {
}, observableFactory, new Action1<S>() {
@Override
public Observable<R> call(CloseableResource<S> t1) {
return observableFactory.call(t1.closable);
public void call(S resource) {
try {
resource.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
Expand Down Expand Up @@ -403,7 +382,7 @@ public StringBuilder call(StringBuilder a, String b) {
/**
* Maps {@link Observable}&lt;{@link Object}&gt; to {@link Observable}&lt;{@link String}&gt; by using {@link String#valueOf(Object)}
* @param src
* @return
* @return An {@link Observable} of only {@link String}s.
*/
public static Observable<String> toString(Observable<?> src) {
return src.map(new Func1<Object, String>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,51 +62,51 @@ public class StringObservableTest {

@Test
public void testMultibyteSpanningTwoBuffers() {
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2 }, new byte[] { (byte) 0xa1 });
String out = StringObservable.decode(src, "UTF-8").toBlockingObservable().single();
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2 }, new byte[] { (byte) 0xa1 });
String out = StringObservable.decode(src, "UTF-8").toBlocking().single();

assertEquals("\u00A1", out);
}

@Test
public void testMalformedAtTheEndReplace() {
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2 });
String out = decode(src, "UTF-8").toBlockingObservable().single();
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2 });
String out = decode(src, "UTF-8").toBlocking().single();

// REPLACEMENT CHARACTER
assertEquals("\uFFFD", out);
}

@Test
public void testMalformedInTheMiddleReplace() {
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2, 65 });
String out = decode(src, "UTF-8").toBlockingObservable().single();
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2, 65 });
String out = decode(src, "UTF-8").toBlocking().single();

// REPLACEMENT CHARACTER
assertEquals("\uFFFDA", out);
}

@Test(expected = RuntimeException.class)
public void testMalformedAtTheEndReport() {
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2 });
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2 });
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
decode(src, charsetDecoder).toBlockingObservable().single();
decode(src, charsetDecoder).toBlocking().single();
}

@Test(expected = RuntimeException.class)
public void testMalformedInTheMiddleReport() {
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2, 65 });
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2, 65 });
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
decode(src, charsetDecoder).toBlockingObservable().single();
decode(src, charsetDecoder).toBlocking().single();
}

@Test
public void testPropogateError() {
Observable<byte[]> src = Observable.from(new byte[] { 65 });
Observable<byte[]> src = Observable.just(new byte[] { 65 });
Observable<byte[]> err = Observable.error(new IOException());
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
try {
decode(Observable.concat(src, err), charsetDecoder).toList().toBlockingObservable().single();
decode(Observable.concat(src, err), charsetDecoder).toList().toBlocking().single();
fail();
} catch (RuntimeException e) {
assertEquals(IOException.class, e.getCause().getClass());
Expand All @@ -115,11 +115,11 @@ public void testPropogateError() {

@Test
public void testPropogateErrorInTheMiddleOfMultibyte() {
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2 });
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2 });
Observable<byte[]> err = Observable.error(new IOException());
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
try {
decode(Observable.concat(src, err), charsetDecoder).toList().toBlockingObservable().single();
decode(Observable.concat(src, err), charsetDecoder).toList().toBlocking().single();
fail();
} catch (RuntimeException e) {
assertEquals(MalformedInputException.class, e.getCause().getClass());
Expand All @@ -130,7 +130,7 @@ public void testPropogateErrorInTheMiddleOfMultibyte() {
public void testEncode() {
assertArrayEquals(
new byte[] { (byte) 0xc2, (byte) 0xa1 }, encode(Observable.just("\u00A1"), "UTF-8")
.toBlockingObservable().single());
.toBlocking().single());
}

@Test
Expand All @@ -144,11 +144,11 @@ public void testSplitOnOh() {
}

public void testSplit(String str, String regex, int limit, String... parts) {
testSplit(str, regex, 0, Observable.from(str), parts);
testSplit(str, regex, 0, Observable.just(str), parts);
for (int i = 0; i < str.length(); i++) {
String a = str.substring(0, i);
String b = str.substring(i, str.length());
testSplit(a + "|" + b, regex, limit, Observable.from(a, b), parts);
testSplit(a + "|" + b, regex, limit, Observable.just(a, b), parts);
}
}

Expand Down Expand Up @@ -176,7 +176,7 @@ public void testJoinMixed() {

@Test
public void testJoinWithEmptyString() {
Observable<String> source = Observable.from("", "b", "c");
Observable<String> source = Observable.just("", "b", "c");

Observable<String> result = join(source, ", ");

Expand All @@ -192,7 +192,7 @@ public void testJoinWithEmptyString() {

@Test
public void testJoinWithNull() {
Observable<String> source = Observable.from("a", null, "c");
Observable<String> source = Observable.just("a", null, "c");

Observable<String> result = join(source, ", ");

Expand All @@ -208,7 +208,7 @@ public void testJoinWithNull() {

@Test
public void testJoinSingle() {
Observable<String> source = Observable.from("a");
Observable<String> source = Observable.just("a");

Observable<String> result = join(source, ", ");

Expand Down Expand Up @@ -258,7 +258,7 @@ public void testJoinThrows() {
@Test
public void testFromInputStream() {
final byte[] inBytes = "test".getBytes();
final byte[] outBytes = from(new ByteArrayInputStream(inBytes)).toBlockingObservable().single();
final byte[] outBytes = from(new ByteArrayInputStream(inBytes)).toBlocking().single();
assertNotSame(inBytes, outBytes);
assertArrayEquals(inBytes, outBytes);
}
Expand All @@ -275,14 +275,14 @@ public synchronized int read(byte[] b, int off, int len) {
return super.read(b, off, len);
}
};
StringObservable.from(is).first().toBlockingObservable().single();
StringObservable.from(is).first().toBlocking().single();
assertEquals(1, numReads.get());
}

@Test
public void testFromReader() {
final String inStr = "test";
final String outStr = from(new StringReader(inStr)).toBlockingObservable().single();
final String outStr = from(new StringReader(inStr)).toBlocking().single();
assertNotSame(inStr, outStr);
assertEquals(inStr, outStr);
}
Expand All @@ -292,7 +292,7 @@ public void testByLine() {
String newLine = System.getProperty("line.separator");

List<Line> lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv")))
.toList().toBlockingObservable().single();
.toList().toBlocking().single();

assertEquals(Arrays.asList(new Line(0, "qwer"), new Line(1, "asdf"), new Line(2, "zxcv")), lines);
}
Expand Down

0 comments on commit 2277236

Please sign in to comment.