Skip to content

Commit ae0cdf3

Browse files
Merge pull request #1 from ramakrishnajoshi/tasks/task_concurrent_api_calls
Tasks/task concurrent api calls
2 parents 36a3b81 + c5f4170 commit ae0cdf3

12 files changed

+429
-63
lines changed

app/build.gradle

+1-4
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,4 @@ dependencies {
5050
implementation 'com.squareup.okhttp3:logging-interceptor:3.6.0'
5151
//An Adapter for adapting RxJava 2.x types.
5252
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'
53-
54-
55-
56-
}
53+
}

app/src/main/AndroidManifest.xml

+9-14
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,15 @@
1515
tools:ignore="GoogleAppIndexingWarning"
1616
android:networkSecurityConfig="@xml/network_security_config">
1717

18-
<activity android:name=".retrofit_api_calls.RetrofitAPICallActivity">
19-
<!--<intent-filter>
20-
<action android:name="android.intent.action.MAIN" />
21-
<category android:name="android.intent.category.LAUNCHER" />
22-
</intent-filter>-->
23-
</activity>
24-
25-
<activity android:name=".MainActivity">
26-
<!--<intent-filter>-->
27-
<!--<action android:name="android.intent.action.MAIN" />-->
28-
<!--<category android:name="android.intent.category.LAUNCHER" />-->
29-
<!--</intent-filter>-->
30-
</activity>
31-
<activity android:name=".rx_api_calls.RxAPICallActivity" >
18+
<activity android:name=".retrofit_api_calls.RetrofitAPICallActivity"/>
19+
<activity android:name=".MainActivity"/>
20+
<activity android:name=".rx_api_calls.RxAPICallActivity"/>
21+
<!--<activity android:name="._operators.Rx_b_BufferAndDebounce" />-->
22+
<activity android:name="._operators.Rx_c_MergeOperator" />
23+
<activity android:name="._operators.Rx_d_ConcatOperator"/>
24+
<activity android:name=".case_studies.Rx_NonUIThreadCanAccessViews" />
25+
<activity android:name="._operators.Rx_a_CreateOperator" />
26+
<activity android:name="._operators.Rx_b_BufferAndDebounce" >
3227
<intent-filter>
3328
<action android:name="android.intent.action.MAIN" />
3429
<category android:name="android.intent.category.LAUNCHER" />
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.example.reactiveprogrammingusingrxjava2;
2+
3+
import android.support.v7.app.AppCompatActivity;
4+
import android.util.Log;
5+
import io.reactivex.Observable;
6+
import io.reactivex.Observer;
7+
8+
public class BaseActivity extends AppCompatActivity implements RxBasics{
9+
public void logMessage(String message){
10+
Log.e(this.getPackageName(), message);
11+
}
12+
13+
@Override
14+
public Observable getObservable() {
15+
return null;
16+
}
17+
18+
@Override
19+
public Observer getObserver() {
20+
return null;
21+
}
22+
}
23+
24+
interface RxBasics<T>{
25+
Observable<T> getObservable();
26+
Observer<T> getObserver();
27+
}
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.example.reactiveprogrammingusingrxjava2;
22

3+
import android.content.Context;
34
import android.support.v7.app.AppCompatActivity;
45
import android.os.Bundle;
6+
import android.util.Log;
57
import android.widget.Toast;
68

79
import java.util.concurrent.TimeUnit;
@@ -27,78 +29,79 @@ protected void onCreate(Bundle savedInstanceState) {
2729
@Override
2830
protected void onResume() {
2931
super.onResume();
30-
System.out.println("MainActivity.onResume threadId : " + Thread.currentThread().getId());
32+
logMessage("MainActivity.onResume threadId : " + Thread.currentThread().getId());
3133
justOperatorWorking();
3234
}
3335

3436
private void justOperatorWorking() {
37+
//Emit 0,1,2,3... after 2 seconds from calling this function and with a gap of 3s in-between emissions.
3538
Observable<Long> observable = Observable.interval(2, 3, TimeUnit.SECONDS);
3639

3740
Observer observer = new Observer() {
3841
@Override
3942
public void onSubscribe(Disposable d) {
40-
System.out.println("MainActivity.onSubscribe");
43+
logMessage("MainActivity.onSubscribe");
4144
}
4245

4346
@Override
4447
public void onNext(Object o) {
45-
System.out.println("onNext threadId : " + Thread.currentThread().getId());
46-
System.out.println("o = [" + o.toString() + "]");
48+
logMessage("onNext threadId : " + Thread.currentThread().getId() + " item:" + o.toString());
4749
Toast.makeText(MainActivity.this, " onNext" + o.toString() , Toast.LENGTH_SHORT).show();
4850
}
4951

5052
@Override
5153
public void onError(Throwable e) {
52-
System.out.println("e = [" + e + "]");
54+
logMessage("onError = [" + e + "]");
55+
Toast.makeText(MainActivity.this, " onError" + e.getLocalizedMessage() , Toast.LENGTH_LONG).show();
5356
}
5457

5558
@Override
5659
public void onComplete() {
57-
System.out.println("MainActivity.onComplete");
60+
logMessage("MainActivity.onComplete");
5861
}
5962
};
6063

6164
observable
62-
.subscribeOn(Schedulers.io())//whatever is executed downstream would be run on thread where observable is run (non-ui thread)
63-
//.map(x -> x+1) //using lambda
64-
.map(new Function<Long, Object>() {
65+
.subscribeOn(Schedulers.io()) //from here code is run on a new io thread(non-ui thread).
66+
.map(new Function<Long, Long>() {
6567
@Override
66-
public Object apply(Long aLong) {
67-
System.out.println("map operator .apply ThreadId : " + Thread.currentThread().getId());
68-
return aLong + 1;
69-
}
70-
})
71-
// .map(new TimeConverter<Long, Object>())
72-
.filter(new Predicate<Object>() {
73-
@Override
74-
public boolean test(Object o) throws Exception {
75-
if ((Long)o % 2 == 0)
76-
return true;
77-
else
78-
return false;
79-
80-
}
81-
})
82-
.distinctUntilChanged()
83-
.observeOn(AndroidSchedulers.mainThread())
84-
.map(new Function<Object, Object>() {
85-
@Override
86-
public Object apply(Object o) {
87-
System.out.println("map operator2 .apply ThreadId : " + Thread.currentThread().getId());
88-
return o;
68+
public Long apply(Long aLong) throws Exception {
69+
logMessage(aLong.toString());
70+
return aLong /*+ 1*/;
8971
}
9072
})
73+
//.map(new TimeConverter<Long, Long>())
74+
// .filter(new Predicate<Object>() {
75+
// @Override
76+
// public boolean test(Object o) throws Exception {
77+
// if ((Long)o % 2 == 0)
78+
// return true;
79+
// else
80+
// return false;
81+
// }
82+
// })
83+
// .filter((Predicate<Object>) o -> (Long) o % 2 == 0) //same thing as above but using lambda
84+
// .distinctUntilChanged()
85+
.observeOn(AndroidSchedulers.mainThread()) //from here the control is switched to ui-thread and code is run on ui thread.
86+
.map(x -> x/* * x*/)
87+
.skipLast(5)
9188
.subscribe(observer);
9289
}
9390

94-
class TimeConverter<Long, Object> implements Function<Long, Object> {
91+
/*
92+
* This class and it's function does nothing. IT has been written just to demonstrate various
93+
* ways in which value can be transformed and returned back.
94+
* */
95+
class TimeConverter implements Function<Long, Long> {
9596

9697
@Override
97-
public Object apply(Long aLong) {
98-
System.out.println("map operator .apply ThreadId : " + Thread.currentThread().getId());
99-
return (Object) aLong;
98+
public Long apply(Long aLong) {
99+
logMessage("map operator .apply ThreadId : " + Thread.currentThread().getId());
100+
return aLong;
100101
}
101102
}
102103

103-
104+
void logMessage(String message){
105+
Log.e(this.getPackageName(), message);
106+
}
104107
}

app/src/main/java/com/example/reactiveprogrammingusingrxjava2/RxConcept1Just.java

-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ private static void rangeOperatorWorking() {
2222
Disposable disposable = new Disposable() {
2323
@Override
2424
public void dispose() {
25-
2625
}
2726

2827
@Override
@@ -65,7 +64,6 @@ private static void justOperatorWorking() {
6564
Observer observer = new Observer() {
6665
@Override
6766
public void onSubscribe(Disposable d) {
68-
6967
}
7068

7169
@Override
@@ -75,12 +73,10 @@ public void onNext(Object o) {
7573

7674
@Override
7775
public void onError(Throwable e) {
78-
7976
}
8077

8178
@Override
8279
public void onComplete() {
83-
8480
}
8581
};
8682

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.example.reactiveprogrammingusingrxjava2._operators;
2+
3+
import android.os.Bundle;
4+
import android.support.annotation.Nullable;
5+
import android.view.View;
6+
import com.example.reactiveprogrammingusingrxjava2.BaseActivity;
7+
import com.example.reactiveprogrammingusingrxjava2.R;
8+
import io.reactivex.Observable;
9+
import io.reactivex.ObservableEmitter;
10+
import io.reactivex.ObservableOnSubscribe;
11+
import io.reactivex.Observer;
12+
import io.reactivex.android.schedulers.AndroidSchedulers;
13+
import io.reactivex.disposables.Disposable;
14+
import io.reactivex.schedulers.Schedulers;
15+
16+
public class Rx_a_CreateOperator extends BaseActivity {
17+
18+
Observable<Integer> observable;
19+
20+
@Override
21+
protected void onCreate(@Nullable Bundle savedInstanceState) {
22+
super.onCreate(savedInstanceState);
23+
setContentView(R.layout.activity_create_operator);
24+
25+
findViewById(R.id.button).setOnClickListener(new View.OnClickListener() {
26+
@Override
27+
public void onClick(View v) {
28+
//A new observer would be immediately notified of the latest data from the LiveData.
29+
observable
30+
.subscribeOn(Schedulers.computation())
31+
.observeOn(AndroidSchedulers.mainThread())
32+
.subscribe(getObserver());
33+
}
34+
});
35+
36+
getObservable()
37+
.subscribeOn(Schedulers.computation())
38+
.observeOn(AndroidSchedulers.mainThread()/*, true*/) //you can use observeOn(scheduler, true) to ensure onNext isn't skipped if there is a call to onError
39+
.subscribe(getObserver());
40+
}
41+
42+
/**
43+
* Expected Behavior -> Calls to emitter.onNext(1) , emitter.onNext(2), emitter.onError(new Throwable()), emitter.onNext(3), emitter.onComplete() would result
44+
* in the Observable emitting 1, 2, 3, error_message and onComplete. But actual behavior is when onError gets emitted, the remaining items like
45+
* onNext(3) etc are ignored/not emitted.
46+
* More here : https://github.com/ReactiveX/RxJava/issues/2887 and here https://github.com/ReactiveX/RxJava/issues/2887#issuecomment-345299685
47+
*
48+
* Note to future self because I know I'm going to forget this (again): you can use `.observeOn(Scheduler scheduler, boolean delayError)` to
49+
* ensure onNext isn't skipped by onError.
50+
*/
51+
@Override
52+
public Observable<Integer> getObservable() {
53+
observable = Observable.create(new ObservableOnSubscribe<Integer>() {
54+
@Override
55+
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
56+
emitter.onNext(1);
57+
emitter.onNext(2);
58+
emitter.onError(new Throwable()); //you can use .observeOn(scheduler, true) to ensure that onNext isn't skipped if there is a call to onError
59+
emitter.onNext(3);
60+
emitter.onComplete();
61+
62+
// emitter.onError(new Throwable()); would give below error as onComplete has already been called before calling onError
63+
// The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere
64+
// to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.Throwable
65+
}
66+
});
67+
return observable;
68+
}
69+
70+
@Override
71+
public Observer<Integer> getObserver() {
72+
Observer observer = new Observer<Integer>() {
73+
@Override
74+
public void onSubscribe(Disposable d) {
75+
logMessage("onSubscribe");
76+
}
77+
78+
@Override
79+
public void onNext(Integer o) {
80+
logMessage("onNext" + o);
81+
}
82+
83+
@Override
84+
public void onError(Throwable e) {
85+
logMessage("onError");
86+
}
87+
88+
@Override
89+
public void onComplete() {
90+
logMessage("onComplete");
91+
}
92+
};
93+
return observer;
94+
}
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.example.reactiveprogrammingusingrxjava2._operators;
2+
3+
import android.os.Bundle;
4+
import android.support.annotation.Nullable;
5+
import com.example.reactiveprogrammingusingrxjava2.BaseActivity;
6+
import java.util.List;
7+
import java.util.concurrent.TimeUnit;
8+
import io.reactivex.Observable;
9+
import io.reactivex.Observer;
10+
import io.reactivex.android.schedulers.AndroidSchedulers;
11+
import io.reactivex.disposables.Disposable;
12+
import io.reactivex.schedulers.Schedulers;
13+
14+
public class Rx_b_BufferAndDebounce extends BaseActivity {
15+
16+
@Override
17+
protected void onCreate(@Nullable Bundle savedInstanceState) {
18+
super.onCreate(savedInstanceState);
19+
20+
/*
21+
* Buffer gathers items emitted by an Observable into batches and emit the batch instead of emitting one item at a time.
22+
* */
23+
getObservable()
24+
.subscribeOn(Schedulers.io())
25+
.buffer(6, TimeUnit.SECONDS)
26+
.observeOn(AndroidSchedulers.mainThread())
27+
.subscribe(getObserver());
28+
}
29+
30+
@Override
31+
public Observable<Integer> getObservable() {
32+
return Observable.range(1, 99);
33+
// return Observable
34+
// .interval(0, 3, TimeUnit.SECONDS);
35+
//.cast(Integer.class); gives error as Long can't be converted/casted to Integer class.
36+
}
37+
38+
@Override
39+
public Observer<List<Integer>> getObserver() {
40+
return new Observer<List<Integer>>() {
41+
@Override
42+
public void onSubscribe(Disposable d) {
43+
logMessage("onSubscribe");
44+
}
45+
46+
@Override
47+
public void onNext(List<Integer> o) {
48+
logMessage(this.getClass().getEnclosingMethod().getName() + o.toString());
49+
}
50+
51+
/*
52+
* Control reaches onError when there is an error.
53+
* Ex: When Observable emits Long Integers and you try to cast to normal Integer class by using cast() operator then
54+
* there is an error as Long can't be converted to Integer
55+
* */
56+
@Override
57+
public void onError(Throwable e) {
58+
logMessage("onError" + e.getLocalizedMessage());
59+
}
60+
61+
@Override
62+
public void onComplete() {
63+
logMessage("onComplete");
64+
}
65+
};
66+
}
67+
}

0 commit comments

Comments
 (0)