Or an adventure on reading data reactively from Cassandra.
Overview
Let’s first try to define what reactive means from programming point of view.
Functional reactive programming is programming paradigm for reactive programming using the building blocks of functional programming.
Functional programming is a programming paradigm, a style of building the structure and the elements of computer programs, that treats computation, as the evaluation of mathematical functions thats avoids state and mutable data. Functional programming emphasises functions that produce results that depend only on their inputs and not on program state.
How can we do functional programming in Java? Java is Object Oriented Programming language where mutable state is present everywhere.
Any java developer around the world have used any of the interfaces: java.lang.Runnable, java.util.Comparator, java.util.concurrent.Callable or java.awt.event.ActionListener. All of this interfaces are having only single method declared. These interfaces are known as Single Abstract Methods or SAM. A popular way as these are used is by creating Anonymous inner classes.
public class RunnableTest {
public static void main(Sting[] args){
new Thread(new Runnable(){
@Override
public void run(){
System.out.println("A new thread is running ...");
}
}).start();
}
}
Functional Programming in Java is hard since function is not included in the language specification. It will become simpler in Java 8 with introduction of “lambda’s”. But how can we do functional programming in Java?
Let’s see a simple example.
@FunctionalInterface
public interface Worker {
public void doWork();
}
public class FunctionalWorker {
public static void main(String[] args){
// anonymous inner class way
execute( new Worker(){
@Override
public void doWork() {
System.out.println ("working ...");
}
});
// lambda's way
execute(() -> System.out.println("working in lambda's way ..."));
}
public static void execute(Worker worker){
worker.doWork();
}
}
Reactive programming is a programming paradigm oriented around data flows and the propagation of changes. For example, in imperative programming setting, a := b+c, would mean that a is being assigned the result of b +c in the instant the expression is evaluated. Later values of b or c can be changed without effect on a. In reactive programming, the value of a will be automatically updated based on the new values.
So, we should have a good understanding of what Functional Reactive Programming is, so let’s go and build a prototype…
Reading data reactively from Cassandra
Cassandra is one of the NoSql storage out there is quite popular.
Let’s imagine that we have to build an Avatar service. This service will store avatars meta information and the content directly in cassandra.
The java driver that we are using is providing us support to query cassandra asynchronous, through the executeAsync() method. The invocation of this method will return a Future. As we all know java Futures are block-able and could not be composed.
Ok, so we have async support but we still need a way to be able to read it reactively…
Netflix built and later open sourced the RxJava library that is providing Functional Reactive Programming for Java (plus other JVM languages).
- The ability for the producer to signal to the consumer that there is no more data available.
- The ability for the producer to signal to the consumer that an error has occurred.
package net.devsprint.reactive.cassandra;
import java.util.concurrent.ExecutorService;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
/**
* Wraps an async execution of Datastax Java driver into an observable.
*
*/
public class ObservableCassandra {
public static Observable executeAsync(final Session execution,
final String query, final ExecutorService executorService) {
return Observable.create(new Observable.OnSubscribeFunc() {
@Override
public Subscription onSubscribe(final Observer super ResultSet> observer) {
try {
Futures.addCallback(execution.executeAsync(query),
new FutureCallback() {
@Override
public void onSuccess(ResultSet result) {
observer.onNext(result);
observer.onCompleted();
}
@Override
public void onFailure(Throwable t) {
observer.onError(t);
}
}, executorService);
} catch (Throwable e) {
// If any Throwable can be thrown from
// executeAsync
observer.onError(e);
}
return Subscriptions.empty();
}
});
}
}
A simple service could be implemented as following:
package net.devsprint.reactive.cassandra;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import rx.Observable;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
public class AvatarService {
private static final String QUERY = "select * avatars";
private static final ExecutorService executorService = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private final Session session;
public AvatarService(Session session) {
this.session = session;
}
Observable getAvatars() {
return ObservableCassandra
.executeAsync(session, QUERY, executorService);
}
}
Assuming that the query is heavy, we are providing a separate execution context where the callback will be executed.
With these two classes we have an Avatar service that could be started but it will not do any thing. It will start fetching the data from Cassandra only when there will be at least one subscriber.
A complete example could be found at Reactive Cassandra.
Meta: this post is part of the Java Advent Calendar and is licensed under the Creative Commons 3.0 Attribution license. If you like it, please spread the word by sharing, tweeting, FB, G+ and so on!