# Building Blocks使用
此处使用JGroup 4.x版本,基于JDK 8
Building Blocks是针对JChannel的应用更上层的抽象。一般来说,JChannel作为发送信息(send、receive)的最小单元,而Building Blocks是基于JChannel提供更实用的功能。
下面以最基础的组员间消息分发为例,可以使用发送消息(消息分发)和RPC方式的方法调用两种。RPC方式是基于消息分发进行的扩展。
# 使用消息分发
消息分发使用MessageDispatcher,MessageDispatcher需要绑定JChannel和RequestHandler,前者用于发送消息,后者用于处理接收到的消息。
public class DispatchWithBuildingBlocksTest implements RequestHandler {
public void startInMessage() throws Exception {
// 创建两个JChannel,使用默认配置,并命名
JChannel channelA = new JChannel().name("channelA");
JChannel channelB = new JChannel().name("channelB");
MessageDispatcher dispA = new MessageDispatcher(channelA, this);
MessageDispatcher dispB = new MessageDispatcher(channelB, this);
// 连接到同一个group中
channelA.connect("test group");
channelB.connect("test group");
for(int i = 0; i < 10; i++){
Util.sleep(100);
System.out.println("Casting message #" + i);
byte[] payload = ("number #" + i).getBytes();
RspList list = dispA.castMessage(null, payload, 0, payload.length, RequestOptions.SYNC());
System.out.println("Responses: \n" + list);
}
Util.close(dispA, dispB, channelA, channelB);
}
@Override
public Object handle(Message message) throws Exception {
System.out.println("handle():" + message);
return "Success!";
}
}
这里使用RspList list = dispA.castMessage(null, payload, 0, payload.length, RequestOptions.SYNC());命令发送消息,表示发送给所有组员,且使用同步方式。此处同步方式封装了实际的RequestOptions的创建方式,其实内部设置为等待所有请求返回才返回。
默认的实现是使用同步的方式。也可以改用MessageDispatcher.setAsynDispatching(true);来开启异步状态。
# 使用RPC调用
RPC调用使用反射的方式。这种方式更灵活,注册JChannel和需要调用的方法的对象(Rpc对象,但对象类型不需要做额外声明),底层采用反射的方式就可以针对对象进行方法调用。
public class DispatchWithBuildingBlocksTest{
public void startInRpc() throws Exception{
RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 5000);
JChannel channelA = new JChannel().name("channelA");
JChannel channelB = new JChannel().name("channelB");
MethodCall call = new MethodCall(getClass().getMethod("print", int.class));
RpcDispatcher dispA = new RpcDispatcher(channelA, this);
RpcDispatcher dispB = new RpcDispatcher(channelB, this);
channelA.connect("test group");
channelB.connect("test group");
for(int i = 0; i < 10; i++){
Util.sleep(100);
call.setArgs(i);
RspList rspList = dispA.callRemoteMethods(null, call, opts);
System.out.println("Responses: " + rspList);
}
Util.close(dispA, dispB, channelA, channelB);
}
public int print(int number) throws Exception{
return number * 2;
}
}
这里也可以看到,RequestOptions对象的创建实际上是包含了ResponseMode,这里使用的GET_ALL含义是方法阻塞到获取所有的组员的response后,再返回。
# 用MethodLookUp接口提高反射性能
实际上频繁地调用Rpc的默认接口是有性能损耗的,原因在于每次定义了方法名,JGroups底层都是根据反射方式在注册的Rpc对象中去寻找方法,因此每次都是一次反射调用。JGroups的作者针对这种场景,优化为可以我们直接将反射调用的方法保存下来,每次调用的时候直接按照一个id值去快速找到指定的方法对象,省去了反射寻找的性能消耗。
public interface MethodLookup {
Method findMethod(short var1);
}
MethodLookUp接口是在RcpDispatcher中含有的一个属性,但官方并没有实现,这其实是交给用户去自定义的接口。这里我自定义一个根据Hashmap来做的简单实现。
public class MethodMap implements MethodLookup {
private Map<Short, Method> methodMap = new HashMap<>();
public void put(short i, Method method) {
this.methodMap.put(i, method);
}
@Override
public Method findMethod(short i) {
return this.methodMap.get(i);
}
}
按照此接口实现的RPC调用:
// 定义需要rpc调用的方法
public class Function {
private int t = 0;
public void add(int i){
t += i;
}
public int get(){
return t;
}
}
public class RpcAdvanced {
public static void main(String[] args){
try {
new RpcAdvanced().methodLookup();
} catch (Exception e) {
e.printStackTrace();
}
}
public void methodLookup() throws Exception {
RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 5000);
Function function = new Function();
JChannel channelA = new JChannel().name("channelA");
JChannel channelB = new JChannel().name("channelB");
RpcDispatcher dispA = new RpcDispatcher(channelA, function);
RpcDispatcher dispB = new RpcDispatcher(channelB, function);
MethodMap methodMap = new MethodMap();
methodMap.put((short) 0, function.getClass().getMethod("add", int.class));
methodMap.put((short)1, function.getClass().getMethod("get"));
dispA.setMethodLookup(methodMap);
dispB.setMethodLookup(methodMap);
channelA.connect("test group");
channelB.connect("test group");
MethodCall methodCall = new MethodCall((short)0, 2);
dispA.callRemoteMethods(null, methodCall, opts);
System.out.println(dispB.callRemoteMethods(null, new MethodCall((short)1), opts));
Util.close(dispA, dispB, channelA, channelB);
}
}
# 用MethodInvoker自定义方法调用
针对另一种更极端的场景,假设考虑到不允许使用反射的项目,直接使用反射可能会违反规定。这个时候JGroups开放了MethodInvoker接口。
public interface MethodInvoker {
Object invoke(Object var1, short var2, Object[] var3) throws Exception;
}
以下是具体实现。
public class MyMethodInvoker implements MethodInvoker {
@Override
public Object invoke(Object o, short i, Object[] objects) throws Exception {
// 假设这里传过来的都是Function
Function f = (Function) o;
switch (i){
case 0:
f.add((int)objects[0]);
return null;
case 1:
return f.get();
default:
throw new UnsupportedOperationException();
}
}
}
public class RpcSelfDefined {
public static void main(String[] args) throws Exception {
new RpcSelfDefined().methodInvokeTest();
}
public void methodInvokeTest() throws Exception {
RequestOptions opt = new RequestOptions(ResponseMode.GET_ALL, 5000);
JChannel channelA = new JChannel().name("channelA");
JChannel channelB = new JChannel().name("channelB");
Function function = new Function();
RpcDispatcher dispA = new RpcDispatcher(channelA, function);
RpcDispatcher dispB = new RpcDispatcher(channelB, function);
MethodInvoker methodInvoker = new MyMethodInvoker();
dispA.setMethodInvoker(methodInvoker);
dispB.setMethodInvoker(methodInvoker);
MethodCall methodCall = new MethodCall((short)0, 1);
dispA.callRemoteMethods(null, methodCall, opt);
MethodCall methodCall1 = new MethodCall((short)1);
System.out.println(dispB.callRemoteMethods(null, methodCall1, opt));
Util.close(dispA, dispB, channelA, channelB);
}
}