# 深入理解JChannel
此处使用JGroup 4.x版本,基于JDK 8
# 从Protocol开始
以udp.xml为例,
<!--
Default stack using IP multicasting. It is similar to the "udp"
stack in stacks.xml, but doesn't use streaming state transfer and flushing
author: Bela Ban
-->
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"
>
<UDP
mcast_port="${jgroups.udp.mcast_port:45588}"
ip_ttl="4"
tos="8"
ucast_recv_buf_size="5M"
ucast_send_buf_size="5M"
mcast_recv_buf_size="5M"
mcast_send_buf_size="5M"
max_bundle_size="64K"
enable_diagnostics="true"
thread_naming_pattern="cl"
thread_pool.min_threads="0"
thread_pool.max_threads="20"
thread_pool.keep_alive_time="30000"/>
<PING />
<MERGE3 max_interval="30000"
min_interval="10000"/>
<FD_SOCK/>
<FD_ALL/>
<VERIFY_SUSPECT timeout="1500" />
<BARRIER />
<pbcast.NAKACK2 xmit_interval="500"
xmit_table_num_rows="100"
xmit_table_msgs_per_row="2000"
xmit_table_max_compaction_time="30000"
use_mcast_xmit="false"
discard_delivered_msgs="true"/>
<UNICAST3 xmit_interval="500"
xmit_table_num_rows="100"
xmit_table_msgs_per_row="2000"
xmit_table_max_compaction_time="60000"
conn_expiry_timeout="0"/>
<pbcast.STABLE desired_avg_gossip="50000"
max_bytes="4M"/>
<pbcast.GMS print_local_addr="true" join_timeout="2000"/>
<UFC max_credits="10M"
min_threshold="0.4"/>
<MFC max_credits="10M"
min_threshold="0.4"/>
<FRAG2 frag_size="60K" />
<RSVP resend_interval="2000" timeout="10000"/>
<pbcast.STATE_TRANSFER />
</config>
config下的每一个标签是一个实际的Protocol(org.jgroups.protocols这个包下面)。首先每个标签会被转化为一个ProtocolConfiguration对象。
// ProtocolStack.setup
public void setup(List<ProtocolConfiguration> configs) throws Exception {
if(top_prot == null) {
top_prot=new Configurator(this).setupProtocolStack(configs);
top_prot.setUpProtocol(this);
this.setDownProtocol(top_prot);
bottom_prot=getBottomProtocol();
initProtocolStack();
}
}
public static Protocol setupProtocolStack(List<ProtocolConfiguration> protocol_configs, ProtocolStack st) throws Exception {
List<Protocol> protocols=createProtocolsAndInitializeAttrs(protocol_configs, st);
// Fixes NPE with concurrent channel creation when using a shared stack (https://issues.jboss.org/browse/JGRP-1488)
Protocol top_protocol=protocols.get(protocols.size() - 1);
top_protocol.setUpProtocol(st);
return connectProtocols(protocols);
}
public static Protocol connectProtocols(List<Protocol> protocol_list) throws Exception {
Protocol current_layer=null, next_layer=null;
for(int i=0; i < protocol_list.size(); i++) {
current_layer=protocol_list.get(i);
if(i + 1 >= protocol_list.size())
break;
next_layer=protocol_list.get(i + 1);
next_layer.setDownProtocol(current_layer);
current_layer.setUpProtocol(next_layer);
}
// basic protocol sanity check
sanityCheck(protocol_list);
return current_layer;
}
从ProtocolStack的setup可以看出,包装好的ProtocolConfiguration最终转换为了一个ProtocolStack。从最终调用的connectProtocols可以看出,实际上转换成的Protocol list被转换为了链式调用的Protocol结构。其中因为每一个next protocol的下面(down)是current,因此这是一个从下而上的stack构建方式。
我们再来看下一个Protocol是怎么构建的。
protected static Protocol createLayer(ProtocolStack stack, ProtocolConfiguration config) throws Exception {
String protocol_name=config.getProtocolName();
if(protocol_name == null)
return null;
Class<? extends Protocol> clazz=Util.loadProtocolClass(protocol_name, stack != null? stack.getClass() : null);
if(clazz.getAnnotation(Deprecated.class) != null)
log.warn("%s has been deprecated; please upgrade to a newer version of the protocol", clazz.getSimpleName());
if(clazz.getAnnotation(RecommendedForUpgrade.class) != null)
log.warn("there are more recent versions of %s present; " +
"please upgrade to a newer version of the protocol", clazz.getSimpleName());
try {
Protocol retval=clazz.getDeclaredConstructor().newInstance();
if(stack != null)
retval.setProtocolStack(stack);
return retval;
}
catch(InstantiationException inst_ex) {
throw new InstantiationException(String.format(Util.getMessage("ProtocolCreateError"),
protocol_name, inst_ex.getLocalizedMessage()));
}
}
public static final String PREFIX="org.jgroups.protocols.";
public static Class<? extends Protocol> loadProtocolClass(String protocol_name, Class<?> cl) throws Exception {
String defaultProtocolName=Global.PREFIX + protocol_name;
Class<? extends Protocol> clazz=null;
try {
clazz=Util.loadClass(defaultProtocolName, cl);
}
catch(ClassNotFoundException e) {
}
if(clazz == null) {
try {
clazz=Util.loadClass(protocol_name, cl);
}
catch(ClassNotFoundException e) {
}
if(clazz == null)
throw new Exception(String.format(Util.getMessage("ProtocolLoadError"), protocol_name, defaultProtocolName));
}
return clazz;
}
可见针对每一个Protocol name, 由global value(包名)拼接而成类名,从反射构建而来。
# 一个Protocol下的链式调用
Protocol的设计十分简洁。最主要的方法为“上浮”up和“下沉”down两个。下沉是发送给Stack的操作,而上浮是从Stack下层返回值的操作。
public abstract class Protocol {
public Object down(Event evt) {
return down_prot.down(evt);
}
public Object down(Message msg) {
return down_prot.down(msg);
}
public Object up(Event evt) {
return up_prot.up(evt);
}
public Object up(Message msg) {
return up_prot.up(msg);
}
}
一些具体的Protocol的实现根据需要做处理,或者传递。以伪代码为例:
public class SomeProtocol extends Protocol{
public Object down(Event evt){
if(evt.getType().equals(SOME_TYPE)){
Object res;
// do something
return res; // 或者down_prot.down(evt) 继续处理。
}else{
down_prot.down(evt);
}
}
}
虽然ProtocolStack可以看作一个Protocol的栈,但其实ProtocolStack本身也继承自Protocol接口。
public class ProtocolStack extends Protocol {
protected Protocol top_prot;
protected Protocol bottom_prot;
protected JChannel channel;
protected volatile boolean stopped=true;
public ProtocolStack topProtocol(Protocol top) {this.top_prot=top; return this;}
public ProtocolStack bottomProtocol(Protocol bottom) {this.bottom_prot=bottom; return this;}
public Object up(Event evt) {
return channel.up(evt);
}
public Object up(Message msg) {return channel.up(msg);}
public void up(MessageBatch batch) {
channel.up(batch);
}
public Object down(Event evt) {
if(top_prot != null)
return top_prot.down(evt);
return null;
}
public Object down(Message msg) {
if(top_prot != null)
return top_prot.down(msg);
return null;
}
}
唯一不同的是,ProtocolStack可以看作是JChannel和栈顶的第一个Protocol的“连接器”。JChannel发送来的消息从ProtocolStack发送给真正的Protocol栈,而Protocol栈下层上浮的数据由ProtocolStack交给JChannel。
有了上面的基础,其实JChannel的操作就比较好理解了。
# JChannel的主动连接
连接的时候创建一个连接的Event,并将其下沉到ProtocolStack中。
protected synchronized JChannel connect(String cluster_name, boolean useFlushIfPresent) throws Exception {
if(!_preConnect(cluster_name))
return this;
Event connect_event=new Event(useFlushIfPresent? Event.CONNECT_USE_FLUSH : Event.CONNECT, cluster_name);
_connect(connect_event);
state=State.CONNECTED;
notifyChannelConnected(this);
return this;
}
protected JChannel _connect(Event evt) throws Exception {
try {
down(evt);
return this;
}
catch(Exception ex) {
cleanup();
throw ex;
}
}
public Object down(Event evt) {
if(evt == null) return null;
return prot_stack.down(evt);
}
# JChannel的主动发送
与主动连接同理。
public JChannel send(Message msg) throws Exception {
if(msg == null)
throw new NullPointerException("msg is null");
checkClosedOrNotConnected();
down(msg);
return this;
}
public Object down(Message msg) {
if(msg == null) return null;
if(stats) {
sent_msgs++;
sent_bytes+=msg.getLength();
}
return prot_stack.down(msg);
}