পরিচিতি

আর এক্স বা রিএক্টিভ এক্স (Rx)  বর্তমান সময়ের সবচেয়ে আলোচিত API বা লাইব্রেরির এর মাঝে একটি। অবজারভার ,ইটারেটর প্যাটার্ন এবং ফাংশনাল প্রোগ্রামিং এর বৈশিষ্ট্য গুলো নিয়েই আর Rx উৎপত্তি। রিএক্ট মানে হল কোন নির্দিষ্ট ফাংশনালিটির এর উপর ভিত্তি করে রেসপন্স অথবা অউটপুট প্রদান করা ঠিক তেমনি রিএক্টিভ প্রোগ্রামিংও যখন কোন ডাটা পায় তখন ওই ডাটার উপর কাজ করে। নেটওয়ার্ক থেকে আমরা সবাই আসিংক্রনাস টাস্ক এর মাধ্যমে ডাটা পড়ে থাকি আর  রিএক্টিভ এই আসিংক্রনাস ডাটা স্ট্রিম এর উপর কাজ করে।Rx এর বিভিন্ন ইমপ্লিমেন্টেশন এর মাঝে আর এক্স জাভা (Rx java) একটি।

Rx এর প্রয়োজন

উদাহরন দিয়েই Rx এর প্রয়োজন বুঝানো যাক। ধরে নিলাম আমার দুইটা পোস্ট রিকুয়েস্ট পাঠাতে হবে যথাক্রমে “ক” এবং “খ”। “ক” পোস্ট রিকুয়েস্ট একটি করে ইমেজ আপ্লোড করে এবং সার্ভার ইমেজ এর পাথ রিটার্ন করে।“খ” পোস্ট রিকুয়েস্টটি “ক” এর রেস্পন্স নিয়ে আরেকটি পোস্ট রিকুয়েস্ট পাঠায় ডাটা সেভ করার জন্য। যদি এক এর অধিক কেউ ইমেজ আপ্লোড করে তাহলে “ক” এর সব কয়টি রিকুয়েস্ট এর রেস্পন্স পাওয়ার পর “খ” এর রিকুয়েস্ট পাঠানো লাগবে। এখন এই কাজটি যদি আমি আ্যসিঙ্ক টাস্ক দিয়ে করি তবে আমার মাল্টিপল আ্যসিঙ্ক টাস্ক ব্যবহার করা লাগবে এবং সেই সাথে এটাও খেয়াল রাখা লাগবে কখন সবগুলো ইমেজ আপ্লোড হয়েছে কারন এর পর ডাটা সেভ করা লাগবে কিন্তু এই কাজটি আমরা Rx Java র অভজারভেবলস, অভসারভার দিয়ে খুব সহজেই করে ফেলতে পারি। অভজারভেবলস, অভসারভার আবার কি? যদি এই প্রশ্নটাই আপনার মাথাই আসে তবে ধৈর্য ধরে আর একটু আমার সাথে থাকেন কারন আজ বেসিক Rx Java নিয়েই কিছু কথা বলব।

Dependency: কাজের সুবিধার্থে Rx java র gradle এবং maven এর dependency নিচে দেয়া হল। প্রয়োজনানুসারে প্রোজেক্ট এ add করে নিবেন।

Gradle: compile ‘io.reactivex:rxjava: 2.1.10’

Maven:

io.reactivex.rxjava2
rxjava
2.1.10

Observables – Observers:

অবজারভেবলস(Observable) হল ডাটার উৎপত্তিস্থল আর অবসারভার(Observer) হল একটা কম্পোনেন্ট বা এন্টিটি যা অবজারভেবলের ক্রিয়ার উপর প্রতিক্রিয়া করে।  ।যখনি কোন অবজারভার লিসেন করা শুরু করে তখনি অবজারভেবল শূন্য থেকে যেকোনো নাম্বার এর আইটেম (ডাটা ) প্রদান করে।তাহলে অবজারভার হল সেই এন্টিটি, যে কোন অবজারভেবলকে অবজারভ করে। একটি অবজারভেবলের একের অধিক অবজারভার থাকতে পারে, যখন অবজারভেবল কোন নতুন ডাটা পায় তখন অবজারভেবল onNext() মেথডের মাধ্যমে সকল রেজিস্টারড অবজারভারকে জানিয়ে দেয়। ঠিক একিভাবে সাকসেস বা এরর onComplete() এবং onError() মেথডের মাধ্যমে জানিয়ে দেয়। নিচের ছবিটি অবজারভেবল এবং অবজারভার এর রিলেশন প্রকাশ করে।

এই ছবিটি থেকে পরিষ্কার বুঝা যাচ্ছে যে অবজারভেবল এর ডাটা অবজারভার পড়ছে এবং যদি প্রয়োজন হয় তবে অবজারভার পড়ার আগেই পরিবর্তন বা পরিবর্ধন করে নেয়া হচ্ছে।

উদাহরণঃ

Observer observer;
List list = Arrays.asList(“JUG”, “BD”);

observer = new Observer() {
@Override
public void onSubscribe(@NonNull Disposable disposable) {
}

@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onError(Throwable throwable) {
}

@Override
public void onComplete() {
}
};

Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter observableEmitter) throws Exception {
for (String str : list) {
observer.onNext(str);
}

observer.onComplete();
}
});

observable.subscribe(observer);

Output:
JUG
BD

এই উধাহরণ খেয়াল করলে দেখব একটি স্ট্রিং লিস্ট,অবজারভেবল এবং অবজারভার ইনিশিয়ালাইজ করা হয়েছে। অবজারভেবল create() মেথড ইনপুট হিসেবে ObservableOnSubscribe এর ইমপ্লিমেন্টেশন নেয় এবং subscribe মেথড এর মাধ্যমে সকল অবজারভারকে একটি করে স্ট্রিং ইমিট করে ও  অবজারভার সেই স্ট্রিং কনসোলের অউটপুট হিসেবে দেখায়। “observable.subscribe(observer)” এই লাইনটি খুব ইম্পরট্যান্ট কারন এই লাইন এক্সিকিউট না হওয়া পর্যন্ত অবজারভেবল স্ট্রিং ইমিট করবে না।

নিচের উধাহরন হল উপরের উদাহরণের শর্ট ফর্ম ঃ

List list = Arrays.asList(“JUG”,”BD”);
Observable.fromIterable(list).subscribe(System.out::println);

Output:
JUG
BD

Rx Java তে বিভিন্ন ধরনের অবজারভেবল রয়েছেঃ Observable,Flowable,Single,MaybeCompletable

ঠিক একি ভাবে বিভিন্ন ধরনের অবজারভার আছেঃ Observer,SingleObserver,MaybeObserver,CompletableObserve

Subscriber:

সাবস্ক্রাইবার হল মূলত অবজারভার এর ইমপ্লেমেন্টেশন। সাবস্ক্রাইবারও অবজারভেবল অবজারভ করে এবং অবজারভেবলের ইমিটেড ডাটার উপর কাজ করে।কিন্তু সাবস্ক্রাইবার অবজারভেবলকে আনসাবস্ক্রাইব করতে পারে। যদি কখন এমন রিকুয়ারমেন্ট আসে যে যখন কোন সাবস্ক্রাইবার আনসাবস্ক্রাইব করলে কোন ইভেন্ট ফায়ার করা লাগবে তখন সাবস্ক্রিপশন এর এই ফিচার খুব কাজে আসবে। তছাড়া সাবস্ক্রাইবার “BackPressure” সাপোর্ট করে । যখন কোন টাস্ক পাইপলাইনে থাকে এবং আসিংক্রনাস স্টেজ তা ঠিক ভাবে প্রসেস করতে পারে না তখন সাবস্ক্রাইবার “BackPressure”  এর মাধ্যমে আপস্ট্রীম প্রডিউসারকে ধীর গতিতে ডাটা পাঠাতে বলে।

Operators: 
       অপারেটর হল ফাংশন বা মেথড যা ভ্যালু পরিবর্তন বা পরিশোধন এর মাধ্যমে ডাটার কারেকশন এবং এপি আই কলের সমস্যা সমাধান করে ।  “create ()” নিজেও একটি অপারেটর যা অবজারভেবল এর উপরে কাজ করে অবজারভেবল তৈরি করে। এই লিঙ্ক এ মুটামুটি সব অপেরাটর নিয়েই কথা বলা হয়েছে। নিচে map অপারেটর pictorial view এবং উধাহরণ দেয়া হলঃ

List words = Arrays.asList(“Jug_”,”Bd_”,”Festival_”);

Observable.fromIterable(words)
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s.replaceAll(“_”,””);
}
}).subscribe(System.out::println);
[/av_codeblock]

Output:
Jug
Bd
Festival

এই উধাহরণে স্ট্রিং ইনপুট হিসেবে নেয়া হয়েছে এবং map অপারেটর ইউজ করে নির্দিষ্ট character রিপ্লেস করে কনসোলে প্রিন্ট করা হয়েছে।

Rx Java র আরো গুরুত্বপূর্ণ ফিচার গুলোর মাঝে আছে Scheduler, Backpressure, Connectable Observable,different operators ইত্যাদি।

আজকের মত এই পর্যন্তই। ভবিষ্যতে নতুন কোন বিষয় নিয়ে কথা হবে। Happy Coding!!!

References:
http://reactivex.io/

https://github.com/ReactiveX/RxJava/wiki

http://www.vogella.com/tutorials/RxJava/article.html

https://www.infoq.com/articles/rxjava-by-example