# 深入理解JChannel

本文基于JGroups 4.x版本和JDK 8。

# 从Protocol开始

以JGroups自带的udp.xml为例:

<!--
  Default stack using IP multicasting. It is similar to the "udp"
  stack in stacks.xml, but doesn't use streaming state transfer and flushing
  author: Bela Ban
-->
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"
        >
    <UDP
         mcast_port="${jgroups.udp.mcast_port:45588}"
         ip_ttl="4"
         tos="8"
         ucast_recv_buf_size="5M"
         ucast_send_buf_size="5M"
         mcast_recv_buf_size="5M"
         mcast_send_buf_size="5M"
         max_bundle_size="64K"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         thread_pool.min_threads="0"
         thread_pool.max_threads="20"
         thread_pool.keep_alive_time="30000"/>

    <PING />
    <MERGE3 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK2 xmit_interval="500"
                    xmit_table_num_rows="100"
                    xmit_table_msgs_per_row="2000"
                    xmit_table_max_compaction_time="30000"
                    use_mcast_xmit="false"
                    discard_delivered_msgs="true"/>
    <UNICAST3 xmit_interval="500"
              xmit_table_num_rows="100"
              xmit_table_msgs_per_row="2000"
              xmit_table_max_compaction_time="60000"
              conn_expiry_timeout="0"/>
    <pbcast.STABLE desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="2000"/>
    <UFC max_credits="10M"
         min_threshold="0.4"/>
    <MFC max_credits="10M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <RSVP resend_interval="2000" timeout="10000"/>
    <pbcast.STATE_TRANSFER />
</config>

config下的每一个标签对应一个实际的Protocol实现类(默认在org.jgroups.protocols这个包下面)。解析配置时,每个标签首先会被转化为一个ProtocolConfiguration对象。

// ProtocolStack.setup
/**
 * Builds the protocol chain from the parsed configurations and attaches this stack to its top.
 * A stack whose top protocol is already set is left untouched.
 *
 * @param configs the protocol configurations to turn into protocol instances
 * @throws Exception if building or initializing the chain fails
 */
public void setup(List<ProtocolConfiguration> configs) throws Exception {
    if(top_prot != null)
        return; // already set up

    // The Configurator hands back the topmost protocol of the freshly linked chain
    top_prot=new Configurator(this).setupProtocolStack(configs);

    // Wire the stack itself above the top protocol, in both directions
    top_prot.setUpProtocol(this);
    setDownProtocol(top_prot);

    bottom_prot=getBottomProtocol();
    initProtocolStack();
}

/**
 * Creates one protocol per configuration, points the last one at the stack and links them all.
 *
 * @param protocol_configs the configurations to instantiate
 * @param st the stack that becomes the up protocol of the last created protocol
 * @return whatever {@code connectProtocols} returns for the created list
 * @throws Exception if creating or connecting the protocols fails
 */
public static Protocol setupProtocolStack(List<ProtocolConfiguration> protocol_configs, ProtocolStack st) throws Exception {
    List<Protocol> layers=createProtocolsAndInitializeAttrs(protocol_configs, st);

    // Fixes NPE with concurrent channel creation when using a shared stack (https://issues.jboss.org/browse/JGRP-1488)
    layers.get(layers.size() - 1).setUpProtocol(st);

    return connectProtocols(layers);
}

/**
 * Links every pair of adjacent list entries: the later entry gets the earlier one as its down
 * protocol, and the earlier entry gets the later one as its up protocol.
 *
 * @param protocol_list the protocols to link, in list order
 * @return the last entry of the list, or {@code null} when the list is empty
 * @throws Exception declared for the sanity check performed after linking
 */
public static Protocol connectProtocols(List<Protocol> protocol_list) throws Exception {
    final int count=protocol_list.size();

    for(int idx=1; idx < count; idx++) {
        Protocol lower=protocol_list.get(idx - 1);
        Protocol upper=protocol_list.get(idx);
        upper.setDownProtocol(lower);
        lower.setUpProtocol(upper);
    }

    Protocol last=count == 0? null : protocol_list.get(count - 1);

    // basic protocol sanity check
    sanityCheck(protocol_list);
    return last;
}

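从ProtocolStack的setup可以看出,包装好的ProtocolConfiguration列表最终被转换为一个ProtocolStack。从最后调用的connectProtocols可以看出,由配置生成的Protocol列表被串联成了一条链:每个Protocol通过up_prot和down_prot分别指向相邻的上层和下层。由于列表中后一个Protocol(next)的下层(down)是前一个Protocol(current),所以这是一个自下而上的stack构建方式:列表的第一个元素位于栈底,最后一个元素位于栈顶,connectProtocols返回的正是栈顶的Protocol。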

我们再来看一下单个Protocol是怎么构建的。


/**
 * Instantiates the protocol named by {@code config} through its no-argument constructor.
 *
 * @param stack the stack to set on the new protocol; may be {@code null}, in which case the
 *              protocol is returned without a stack
 * @param config the configuration carrying the protocol name
 * @return the new protocol, or {@code null} when the configuration has no protocol name
 * @throws Exception if the class cannot be loaded or instantiated
 */
protected static Protocol createLayer(ProtocolStack stack, ProtocolConfiguration config) throws Exception {
    String protocol_name=config.getProtocolName();
    if(protocol_name == null)
        return null;

    Class<? extends Protocol> clazz=Util.loadProtocolClass(protocol_name, stack != null? stack.getClass() : null);

    // Warn (but carry on) when the class is marked as outdated
    if(clazz.isAnnotationPresent(Deprecated.class))
        log.warn("%s has been deprecated; please upgrade to a newer version of the protocol", clazz.getSimpleName());
    if(clazz.isAnnotationPresent(RecommendedForUpgrade.class))
        log.warn("there are more recent versions of %s present; " +
                   "please upgrade to a newer version of the protocol", clazz.getSimpleName());

    try {
        Protocol retval=clazz.getDeclaredConstructor().newInstance();
        if(stack != null)
            retval.setProtocolStack(stack);
        return retval;
    }
    catch(InstantiationException inst_ex) {
        InstantiationException wrapped=new InstantiationException(String.format(Util.getMessage("ProtocolCreateError"),
                                                                                protocol_name, inst_ex.getLocalizedMessage()));
        // InstantiationException has no (String, Throwable) constructor, so attach the
        // original failure explicitly to keep its stack trace
        wrapped.initCause(inst_ex);
        throw wrapped;
    }
}

// Package prefix that loadProtocolClass (below) puts in front of a protocol name; it is read there as Global.PREFIX.
public static final String PREFIX="org.jgroups.protocols.";

/**
 * Resolves a protocol name to its class. The name is first tried with {@code Global.PREFIX}
 * in front of it and, if that yields nothing, as given.
 *
 * @param protocol_name the protocol name from the configuration
 * @param cl the class passed through to {@code Util.loadClass}
 * @return the loaded protocol class, never {@code null}
 * @throws Exception if neither lookup produces a class
 */
public static Class<? extends Protocol> loadProtocolClass(String protocol_name, Class<?> cl) throws Exception {
    String defaultProtocolName=Global.PREFIX + protocol_name;
    Class<? extends Protocol> found=null;

    // First attempt: prefixed name
    try {
        found=Util.loadClass(defaultProtocolName, cl);
    }
    catch(ClassNotFoundException ignored) {
        // fall through to the second attempt
    }
    if(found != null)
        return found;

    // Second attempt: the name exactly as configured
    try {
        found=Util.loadClass(protocol_name, cl);
    }
    catch(ClassNotFoundException ignored) {
        // reported below through the generic load error
    }
    if(found != null)
        return found;

    throw new Exception(String.format(Util.getMessage("ProtocolLoadError"), protocol_name, defaultProtocolName));
}

可见,对于每一个Protocol name,先用Global.PREFIX(默认包名)加上该名称拼接出完整类名并尝试加载;如果加载不到,再把该名称本身当作完整类名加载。拿到Class之后,通过反射调用无参构造函数创建Protocol实例。

# 一个Protocol下的链式调用

Protocol的设计十分简洁。最主要的方法为“上浮”up和“下沉”down两个。下沉是把事件或消息交给栈中的下一层处理(发送方向),而上浮是把下层产生的事件或消息交给上一层处理(接收方向)。

/**
 * Base class of every layer in the stack (abridged excerpt).
 * The implementations shown here do no work of their own: each one hands the event or message
 * to the neighbouring layer and returns whatever that layer returns.
 * NOTE(review): the down_prot / up_prot fields are not declared in this excerpt.
 */
public abstract class Protocol {
    // Passes an event to the layer below.
    public Object down(Event evt) {
        return down_prot.down(evt);
    }
    // Passes a message to the layer below.
    public Object down(Message msg) {
        return down_prot.down(msg);
    }
    // Passes an event to the layer above.
    public Object up(Event evt) {
        return up_prot.up(evt);
    }
    // Passes a message to the layer above.
    public Object up(Message msg) {
        return up_prot.up(msg);
    }
}

一些具体的Protocol的实现根据需要做处理,或者传递。以伪代码为例:

public class SomeProtocol extends Protocol{
    public Object down(Event evt){
        if(evt.getType().equals(SOME_TYPE)){
            Object res;
            // do something
            return res; // 或者down_prot.down(evt) 继续处理。
        }else{
            down_prot.down(evt);
        }
    }
}

虽然ProtocolStack可以看作一个Protocol的栈,但其实ProtocolStack本身也继承自Protocol这个抽象类。

/**
 * Connector between a JChannel and the protocol chain (abridged excerpt): traffic going up is
 * handed to the channel, traffic going down is handed to the top protocol.
 */
public class ProtocolStack extends Protocol {
    protected Protocol            top_prot;
    protected Protocol            bottom_prot;
    protected JChannel            channel;
    protected volatile boolean    stopped=true;


    /** Sets the top protocol and returns this stack for chaining. */
    public ProtocolStack topProtocol(Protocol top) {
        this.top_prot=top;
        return this;
    }

    /** Sets the bottom protocol and returns this stack for chaining. */
    public ProtocolStack bottomProtocol(Protocol bottom) {
        this.bottom_prot=bottom;
        return this;
    }

    /** Hands an event arriving from below to the channel. */
    public Object up(Event evt) {return channel.up(evt);}

    /** Hands a message arriving from below to the channel. */
    public Object up(Message msg) {
        return channel.up(msg);
    }

    /** Hands a message batch arriving from below to the channel. */
    public void up(MessageBatch batch) {channel.up(batch);}

    /** Sends an event into the top protocol; yields {@code null} when there is no top protocol. */
    public Object down(Event evt) {
        return top_prot != null? top_prot.down(evt) : null;
    }

    /** Sends a message into the top protocol; yields {@code null} when there is no top protocol. */
    public Object down(Message msg) {
        return top_prot != null? top_prot.down(msg) : null;
    }
}

唯一不同的是,ProtocolStack可以看作是JChannel和栈顶的第一个Protocol的“连接器”。JChannel发送来的消息从ProtocolStack发送给真正的Protocol栈,而Protocol栈下层上浮的数据由ProtocolStack交给JChannel。

有了上面的基础,其实JChannel的操作就比较好理解了。

# JChannel的主动连接

连接的时候创建一个连接的Event,并将其下沉到ProtocolStack中。

/**
 * Connects the channel to a cluster by sending a connect event down the stack.
 * When {@code _preConnect} returns false nothing further happens.
 *
 * @param cluster_name the cluster name carried by the connect event
 * @param useFlushIfPresent selects {@code Event.CONNECT_USE_FLUSH} instead of {@code Event.CONNECT}
 * @return this channel
 * @throws Exception if the connect event cannot be processed
 */
protected synchronized JChannel connect(String cluster_name, boolean useFlushIfPresent) throws Exception {
    if(_preConnect(cluster_name)) {
        _connect(new Event(useFlushIfPresent? Event.CONNECT_USE_FLUSH : Event.CONNECT, cluster_name));
        state=State.CONNECTED;
        notifyChannelConnected(this);
    }
    return this;
}

/**
 * Sends the given connect event down the stack; on failure {@code cleanup()} runs before the
 * exception is rethrown.
 *
 * @param evt the connect event
 * @return this channel
 * @throws Exception the failure raised while the event was processed
 */
protected JChannel _connect(Event evt) throws Exception {
    try {
        down(evt);
    }
    catch(Exception ex) {
        cleanup();
        throw ex;
    }
    return this;
}

/**
 * Passes an event to the protocol stack.
 *
 * @param evt the event; {@code null} is ignored
 * @return the stack's result, or {@code null} for a {@code null} event
 */
public Object down(Event evt) {
    return evt == null? null : prot_stack.down(evt);
}

# JChannel的主动发送

与主动连接同理。

/**
 * Sends a message by passing it down the stack, after {@code checkClosedOrNotConnected()} has run.
 *
 * @param msg the message to send
 * @return this channel
 * @throws NullPointerException if {@code msg} is null
 * @throws Exception declared for failures raised while checking the channel or sending
 */
public JChannel send(Message msg) throws Exception {
    java.util.Objects.requireNonNull(msg, "msg is null");
    checkClosedOrNotConnected();
    down(msg);
    return this;
}

/**
 * Passes a message to the protocol stack, counting it first when {@code stats} is on.
 *
 * @param msg the message; {@code null} is ignored
 * @return the stack's result, or {@code null} for a {@code null} message
 */
public Object down(Message msg) {
    if(msg != null) {
        if(stats) {
            sent_msgs++;
            sent_bytes+=msg.getLength();
        }
        return prot_stack.down(msg);
    }
    return null;
}