# Building Blocks使用
此处使用JGroup 4.x版本,基于JDK 8
Building Blocks是针对JChannel的应用更上层的抽象。一般来说,JChannel作为发送信息(send、receive)的最小单元,而Building Blocks是基于JChannel提供更实用的功能。
下面以最基础的组员间消息分发为例,可以使用发送消息(消息分发)和RPC方式的方法调用两种。RPC方式是基于消息分发进行的扩展。
# 使用消息分发
消息分发使用MessageDispatcher,MessageDispatcher需要绑定JChannel和RequestHandler,前者用于发送消息,后者用于处理接收到的消息。
public class DispatchWithBuildingBlocksTest implements RequestHandler {
    public void startInMessage() throws Exception {
        // 创建两个JChannel,使用默认配置,并命名
        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");
        MessageDispatcher dispA = new MessageDispatcher(channelA, this);
        MessageDispatcher dispB = new MessageDispatcher(channelB, this);
        // 连接到同一个group中
        channelA.connect("test group");
        channelB.connect("test group");
        for(int i = 0; i < 10; i++){
            Util.sleep(100);
            System.out.println("Casting message #" + i);
            byte[] payload = ("number #" + i).getBytes();
            RspList list = dispA.castMessage(null, payload, 0, payload.length, RequestOptions.SYNC());
            System.out.println("Responses: \n" + list);
        }
        Util.close(dispA, dispB, channelA, channelB);
    }
    @Override
    public Object handle(Message message) throws Exception {
        System.out.println("handle():" + message);
        return "Success!";
    }
}
这里使用RspList list = dispA.castMessage(null, payload, 0, payload.length, RequestOptions.SYNC());命令发送消息,表示发送给所有组员,且使用同步方式。此处同步方式封装了实际的RequestOptions的创建方式,其实内部设置为等待所有请求返回才返回。
默认的实现是使用同步的方式。也可以改用MessageDispatcher.setAsynDispatching(true);来开启异步状态。
# 使用RPC调用
RPC调用使用反射的方式。这种方式更灵活,注册JChannel和需要调用的方法的对象(Rpc对象,但对象类型不需要做额外声明),底层采用反射的方式就可以针对对象进行方法调用。
public class DispatchWithBuildingBlocksTest{
    public void startInRpc() throws Exception{
        RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 5000);
        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");
        MethodCall call = new MethodCall(getClass().getMethod("print", int.class));
        RpcDispatcher dispA = new RpcDispatcher(channelA, this);
        RpcDispatcher dispB = new RpcDispatcher(channelB, this);
        channelA.connect("test group");
        channelB.connect("test group");
        for(int i = 0; i < 10; i++){
            Util.sleep(100);
            call.setArgs(i);
            RspList rspList = dispA.callRemoteMethods(null, call, opts);
            System.out.println("Responses: " + rspList);
        }
        Util.close(dispA, dispB, channelA, channelB);
    }
    public int print(int number) throws Exception{
        return number * 2;
    }
}
这里也可以看到,RequestOptions对象的创建实际上是包含了ResponseMode,这里使用的GET_ALL含义是方法阻塞到获取所有的组员的response后,再返回。
# 用MethodLookUp接口提高反射性能
实际上频繁地调用Rpc的默认接口是有性能损耗的,原因在于每次定义了方法名,JGroups底层都是根据反射方式在注册的Rpc对象中去寻找方法,因此每次都是一次反射调用。JGroups的作者针对这种场景,优化为可以我们直接将反射调用的方法保存下来,每次调用的时候直接按照一个id值去快速找到指定的方法对象,省去了反射寻找的性能消耗。
public interface MethodLookup {
    Method findMethod(short var1);
}
MethodLookUp接口是在RcpDispatcher中含有的一个属性,但官方并没有实现,这其实是交给用户去自定义的接口。这里我自定义一个根据Hashmap来做的简单实现。
public class MethodMap implements MethodLookup {
    private Map<Short, Method> methodMap = new HashMap<>();
    public void put(short i, Method method) {
        this.methodMap.put(i, method);
    }
    @Override
    public Method findMethod(short i) {
        return this.methodMap.get(i);
    }
}
按照此接口实现的RPC调用:
// 定义需要rpc调用的方法
public class Function {
    private int t = 0;
    public void add(int i){
        t += i;
    }
    public int get(){
        return t;
    }
}
public class RpcAdvanced {
    public static void main(String[] args){
        try {
            new RpcAdvanced().methodLookup();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void methodLookup() throws Exception {
        RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 5000);
        
        Function function = new Function();
        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");
        RpcDispatcher dispA = new RpcDispatcher(channelA, function);
        RpcDispatcher dispB = new RpcDispatcher(channelB, function);
        MethodMap methodMap = new MethodMap();
        methodMap.put((short) 0, function.getClass().getMethod("add", int.class));
        methodMap.put((short)1, function.getClass().getMethod("get"));
        dispA.setMethodLookup(methodMap);
        dispB.setMethodLookup(methodMap);
        channelA.connect("test group");
        channelB.connect("test group");
        MethodCall methodCall = new MethodCall((short)0, 2);
        dispA.callRemoteMethods(null, methodCall, opts);
        System.out.println(dispB.callRemoteMethods(null, new MethodCall((short)1), opts));
        Util.close(dispA, dispB, channelA, channelB);
    }
}
# 用MethodInvoker自定义方法调用
针对另一种更极端的场景,假设考虑到不允许使用反射的项目,直接使用反射可能会违反规定。这个时候JGroups开放了MethodInvoker接口。
public interface MethodInvoker {
    Object invoke(Object var1, short var2, Object[] var3) throws Exception;
}
以下是具体实现。
public class MyMethodInvoker implements MethodInvoker {
    @Override
    public Object invoke(Object o, short i, Object[] objects) throws Exception {
        // 假设这里传过来的都是Function
        Function f = (Function) o;
        switch (i){
            case 0:
                f.add((int)objects[0]);
                return null;
            case 1:
                return f.get();
            default:
                throw new UnsupportedOperationException();
        }
    }
}
public class RpcSelfDefined {
    public static void main(String[] args) throws Exception {
        new RpcSelfDefined().methodInvokeTest();
    }
    public void methodInvokeTest() throws Exception {
        RequestOptions opt = new RequestOptions(ResponseMode.GET_ALL, 5000);
        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");
        Function function = new Function();
        RpcDispatcher dispA = new RpcDispatcher(channelA, function);
        RpcDispatcher dispB = new RpcDispatcher(channelB, function);
        MethodInvoker methodInvoker = new MyMethodInvoker();
        dispA.setMethodInvoker(methodInvoker);
        dispB.setMethodInvoker(methodInvoker);
        MethodCall methodCall = new MethodCall((short)0, 1);
        dispA.callRemoteMethods(null, methodCall, opt);
        MethodCall methodCall1 = new MethodCall((short)1);
        System.out.println(dispB.callRemoteMethods(null, methodCall1, opt));
        Util.close(dispA, dispB, channelA, channelB);
    }
}