# JChannel初探

此处使用JGroup 4.x版本,基于JDK 8

聊到JGroup,其连接、发送、接收数据的最小单元,在JGroup中被抽象为JChannel。其实JGroup中的组(Group)概念,具体就表现为一组相同名字的JChannel的集合。应用进程想加入JGroup中的某个组(Group),就需要创建或者连接到一个指定名字的JChannel,成为组员(Member)。

JChannel和众多抽象为“连接”功能的对象一样,拥有自己的生命周期。一个直观的生命周期转换如下:(截取自官方手册)

2021-09-02T142903

# hello world

在这里我们写一个Demo。在本地建立两个JChannel,分别取名为A和B。将A和B连接到同一个Group组下面(组名就叫“test group”)。然后程序开启一个循环监听,监听来自标准输入的信息,并使用JChannelA发送到组中,默认发送给所有的组员。


import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.util.Util;

public class Demo {

    public static void main(String[] args) throws Exception {

        // 创建两个JChannel,使用默认配置,并命名
        JChannel channelA = new JChannel().name("channelA");
        JChannel channelB = new JChannel().name("channelB");

        // 给两个JChannel添加Receiver
        channelA.setReceiver(new ReceiverAdapter(){
            @Override
            public void viewAccepted(View new_view) {
                System.out.println("A view: " + new_view);
            }

            @Override
            public void receive(Message msg) {
                System.out.println("A<< " + msg.getObject() + " [" + msg.getSrc() + "]");
            }
        });

        channelB.setReceiver(new ReceiverAdapter(){
            @Override
            public void viewAccepted(View new_view) {
                System.out.println("B view: " + new_view);
            }

            @Override
            public void receive(Message msg) {
                System.out.println("B<< " + msg.getObject() + " [" + msg.getSrc() + "]");
            }
        });

        // 连接到同一个group中
        channelA.connect("test group");
        channelB.connect("test group");

        // 监听键盘输入
        for (;;){
            String line = Util.readStringFromStdin("");
            channelA.send(null, line);
        }

    }
}

我们运行程序,首先会在控制台看到连接到group的过程中打印出的信息:

-------------------------------------------------------------------
GMS: address=channelA, cluster=test group, physical address=192.168.120.238:55465
-------------------------------------------------------------------
-------------------------------------------------------------------
GMS: address=channelB, cluster=test group, physical address=192.168.120.238:57971
-------------------------------------------------------------------
A view: [channelA|1] (2) [channelA, channelB]
B view: [channelA|1] (2) [channelA, channelB]

可见此处JGroup新建了两个JChannel,并且地址名设置为我们自定义的名字。同时集群名(组名)为connect时连接的参数名。每个JChannel还绑定了一个物理地址。接下来,A和B获得了整个组的view信息,也就是说每个JChannel中都保存了整个组组员的列表。

我们在标准输入中键入Hello world!

Hello world!
A<< Hello world! [channelA]
B<< Hello world! [channelA]

可见,由channelA发出的信息,在组内每个组员都会被接收到。

# JChannel配置

以JGroup的jar中udp.xml为例,一个JGroup的配置xml文件以config标签为起始。

<!--
  Default stack using IP multicasting. It is similar to the "udp"
  stack in stacks.xml, but doesn't use streaming state transfer and flushing
  author: Bela Ban
-->
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"
        >
    <UDP
         mcast_port="${jgroups.udp.mcast_port:45588}"
         ip_ttl="4"
         tos="8"
         ucast_recv_buf_size="5M"
         ucast_send_buf_size="5M"
         mcast_recv_buf_size="5M"
         mcast_send_buf_size="5M"
         max_bundle_size="64K"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         thread_pool.min_threads="0"
         thread_pool.max_threads="20"
         thread_pool.keep_alive_time="30000"/>

    <PING />
    <MERGE3 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK2 xmit_interval="500"
                    xmit_table_num_rows="100"
                    xmit_table_msgs_per_row="2000"
                    xmit_table_max_compaction_time="30000"
                    use_mcast_xmit="false"
                    discard_delivered_msgs="true"/>
    <UNICAST3 xmit_interval="500"
              xmit_table_num_rows="100"
              xmit_table_msgs_per_row="2000"
              xmit_table_max_compaction_time="60000"
              conn_expiry_timeout="0"/>
    <pbcast.STABLE desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="2000"/>
    <UFC max_credits="10M"
         min_threshold="0.4"/>
    <MFC max_credits="10M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <RSVP resend_interval="2000" timeout="10000"/>
    <pbcast.STATE_TRANSFER />
</config>

这里每一个标签名代表的是org.jgroups.protocols这个包下面的类的名字。例如UDP类的路径为org.jgroups.protocols.UDP,pbcast.GMS路径为org.jgroups.protocols.pbcast.GMS。

这里的配置项都继承了Protocol抽象类,因此这里定义的就是Protocol Stack了。之所以叫栈(Stack),是因为实际layer的顺序是按照压栈顺序进行的。也就是说UDP会成为最后一个layer,而pbcast.STATE_TRANSFER会成为最上面的第一个layer。

protocal stack也可以在代码中定义。例如

Protocol[] prot_stack={
          new UDP().setValue("bind_addr", InetAddress.getByName("127.0.0.1")), 2
          new PING(),
          new MERGE3(),
          new FD_SOCK(),
          new FD_ALL(),
          new VERIFY_SUSPECT(),
          new BARRIER(),
          new NAKACK2(),
          new UNICAST3(),
          new STABLE(),
          new GMS(),
          new UFC(),
          new MFC(),
          new FRAG2()};
JChannel ch = new JChannel(prot_stack);

这里数组中的顺序和xml中顺序一样,也是bottom-first的。

# JChannel加入cluster/group的细节

connect()调用成功的时候,实际的组员才会以connected的形式加入到组里。也就是说假设一个JChannel调用connect方法时,当前组名中没有其他组员,这时这个组才会在逻辑上被创建。每个组的第一个组员(first member)会充当协调人(coordinator)的角色。协调人负责在组员发生变化时,生成新的view。因此实际上每个组员在获取view的时候,可以轻易地定位到协调人并从协调人那里获取view,而不需要和其他组员进行交流。

加入或创建组通常是使用connect接口直接完成的。

public synchronized JChannel connect(String cluster) throws Exception;

但有时在成功加入组的同时,我们往往会使用getState()方法判断集群或某个节点的状态。这就带来一个并发问题。如果当前节点先调用connect()方法,然后调用getState()方法,是不能保证getState()方法一定能获取到connect()连接时的状态的。因此JChannel设计了接口来解决这样的问题,目的是即避免了数据发送的分散,并且保证了二者操作的原子性。

public synchronized JChannel connect(String cluster_name, Address target, long timeout) throws Exception;

getState方法的返回值也要通过ReceiverAdapter中的回掉函数获取。原因在于state的message本身也是message,不能破坏JGroup的FIFO原则,因此也要当作message的一员返回。getState和setState接口参数不需要显示关闭,这个由JGroup来完成。假设A调用JChannel.getState(B),这里A的JChannel会触发setState回调,而B会触发getState回调。(触发顺序先B后A)

public void getState(OutputStream output) throws Exception;
public void setState(InputStream input) throws Exception;

# JChannel发送message的细节

channel的send()操作可以是异步的,而且因为JGroup有重传机制,在传对象、尤其是buffer的时候,要注意传的buffer不能被其他线程修改。

使用JChannel.setDiscardOwnMessages(boolean flag)来确定消息是否传给节点自己。

设置RSVP可以保证消息发送具有同步机制。当一个Message设置flag为RSVP,那么在所有的组员(不包括崩溃的节点)在确认接收到信息之前,发送者都会阻塞。并且这个flag的另一个作用是,可以保证当前节点在这个消息之前的消息,都被所有节点接收到了。比如P节点发送了1-10的消息,10标记为RSVP,那么当10消息的send成功返回时,P默认为所有节点都收到了1-10的消息。使用这个flag要保证协议栈中定义了重传和超时的layer,避免一直阻塞。

也可以设置RSVP_NB,达到上述要求的同时非阻塞。

# JChannel接收message的细节

这里有个注意点,我们定义接收的callback函数时,有如下接口:

public void receive(Message msg) {
    // do something
}

这里我们不能直接将msg中的buffer作为一个后备参数传给别的函数,并返回。因为实际上msg可能涉及到重用问题,也就是说这个receive返回后,msg中的内容会发生变化。因此使用时要在函数内将msg的内容解析并开辟新的内存转换为自己想用的内容(比如反序列化,或者copy)。

MessageBatch函数默认将Batch遍历并调用receive。

# 参考

http://www.jgroups.org/manual4/index.html#JChannel