Guice Integration in Hazelcast
posted at April 18, 2017 with tags guice, hazelcast, java

For many occassions I find the distributed ExecutorService of Hazelcast (aka. IExecutorService) pretty convenient to turn a set of nodes into a tamed cluster waiting for orders. You just submit an either Runnable or Callable<T> and Hazelcast takes care of the rest – executing the task on remote members, acknowledging the response(s) back, etc. Though note that since the method and its response will be delivered over the wire, it is no surprise that they all need to be Serializable.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;
import com.hazelcast.core.MultiExecutionCallback;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public enum HzGuiceDemo {;

    public static class ProcessorCountTask implements Serializable, Callable<Integer> {

        @Override
        public Integer call() {
            return Runtime.getRuntime().availableProcessors();
        }

    }

    public static void main(String[] args) throws Throwable {
        HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
        IExecutorService hzExecutorService = hzInstance.getExecutorService("ballpark");
        CompletableFuture<Integer> totalProcessorCountFuture = new CompletableFuture<>();
        hzExecutorService.submitToAllMembers(
                new ProcessorCountTask(),
                new MultiExecutionCallback() {

                    @Override
                    public void onResponse(Member member, Object value) {
                        // Ignored.
                    }

                    @Override
                    public void onComplete(Map<Member, Object> values) {
                        int totalProcessorCount = values
                            .values()
                            .stream()
                            .mapToInt(object -> (int) object)
                            .sum();
                        totalProcessorCountFuture.complete(totalProcessorCount);
                    }

                });
        int totalProcessorCount = totalProcessorCountFuture.get(10, TimeUnit.SECONDS);
        System.out.format("there are %d processors in total%n", totalProcessorCount);
        hzInstance.shutdown();
    }

}

Unfortunately many of our tasks are not isolated from the rest of the application state (i.e., stateless) as ProcessorCountTask given above. Most of the time the functional requirements necessitate access to the remote node state that is available through beans provided by the underlying dependency injection framework. Consider the following stateful PizzaService that is responsible for cooking pizzas to its users.

import javax.inject.Singleton;

import static com.google.common.base.Preconditions.checkArgument;

@Singleton
public static class PizzaService {

    private volatile int totalPizzaCount = 0;

    public synchronized int cook(int amount) {
        checkArgument(amount > 0, "expecting: amount > 0, found: %s", amount);
        availablePizzaCount += amount;
        System.out.format("🍕 cooking %d pizza(s)%n", amount);
    }

}

We further have a task class to remotely command PizzaService to cook:

import java.io.Serializable;

public static class PizzaCookTask implements Serializable, Runnable {

    @Inject
    private PizzaService pizzaService;

    private final int amount;

    public PizzaMakeTask(int amount) {
        this.amount = amount;
    }

    @Override
    public void run() {
        pizzaService.cook(amount);
    }

}

A naive approach to run this task on an IExecutorService would result in the following code:

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public enum HzGuiceDemo {;

    public static void main(String[] args) throws Throwable {
        HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
        IExecutorService hzExecutorService = hzInstance.getExecutorService("ballpark");
        hzExecutorService.executeOnAllMembers(new PizzaCookTask(1));
        hzInstance.shutdown();
    }

}

which fails with a sweet NullPointerException as follows:

Exception in thread "main" java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
  at com.vlkan.hzguicedemo.HzGuiceDemo.main(HzGuiceDemo.java:??)
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at com.hazelcast.executor.DistributedExecutorService$CallableProcessor.run(DistributedExecutorService.java:189)
  at com.hazelcast.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:186)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
  at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76)
  at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92)
Caused by: java.lang.NullPointerException
  at com.vlkan.hzguicedemo.HzGuiceDemo$PizzaCookTask.call(HzGuiceDemo.java:??)
  at com.vlkan.hzguicedemo.HzGuiceDemo$PizzaCookTask.call(HzGuiceDemo.java:??)

What is really happening here is that Hazelcast does not have a magical ball to guess the dependency injection framework you are using to process the @Inject-annotated properties of the PizzaCookTask. Though Hazelcast has something else: ManagedContext. In a nutshell, ManagedContext provides means to intercept class instantiation at deserialization. We can leverage this functionality to come up with a ManagedContext implementation that bakes Guice dependency injection into the Hazelcast class instantiation process.

import com.google.inject.Injector;
import com.hazelcast.core.ManagedContext;

import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
public class HazelcastGuiceManagedContext implements ManagedContext {

    private final Injector injector;

    @Inject
    public HazelcastGuiceManagedContext(Injector injector) {
        this.injector = injector;
    }

    @Override
    public Object initialize(Object instance) {
        injector.injectMembers(instance);
        return instance;
    }

}

Next all you need to do is to use this ManagedContext while creating your HazelcastInstance:

Injector injector = Guice.createInjector();
HazelcastGuiceManagedContext guiceManagedContext = injector.getInstance(HazelcastGuiceManagedContext.class);
Config hzConfig = new Config();
hzConfig.setManagedContext(guiceManagedContext);
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(hzConfig);

While I have provided an example for Guice, this method is applicable to any dependency injection framework that provides an equivalent to Injector#injectMembers() of Guice. Needless to say, but Spring folks are already covered by SpringManagedContext shipped with Hazelcast.