# JChannel初探
此处使用JGroup 4.x版本,基于JDK 8
聊到JGroup,其连接、发送、接收数据的最小单元,在JGroup中被抽象为JChannel。其实JGroup中的组(Group)概念,具体就表现为一组相同名字的JChannel的集合。应用进程想加入JGroup中的某个组(Group),就需要创建或者连接到一个指定名字的JChannel,成为组员(Member)。
JChannel和众多抽象为“连接”功能的对象一样,拥有自己的生命周期。一个直观的生命周期转换如下:(截取自官方手册)
# hello world
在这里我们写一个Demo。在本地建立两个JChannel,分别取名为A和B。将A和B连接到同一个Group组下面(组名就叫“test group”)。然后程序开启一个循环监听,监听来自标准输入的信息,并使用JChannelA发送到组中,默认发送给所有的组员。
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.util.Util;
public class Demo {
public static void main(String[] args) throws Exception {
// 创建两个JChannel,使用默认配置,并命名
JChannel channelA = new JChannel().name("channelA");
JChannel channelB = new JChannel().name("channelB");
// 给两个JChannel添加Receiver
channelA.setReceiver(new ReceiverAdapter(){
@Override
public void viewAccepted(View new_view) {
System.out.println("A view: " + new_view);
}
@Override
public void receive(Message msg) {
System.out.println("A<< " + msg.getObject() + " [" + msg.getSrc() + "]");
}
});
channelB.setReceiver(new ReceiverAdapter(){
@Override
public void viewAccepted(View new_view) {
System.out.println("B view: " + new_view);
}
@Override
public void receive(Message msg) {
System.out.println("B<< " + msg.getObject() + " [" + msg.getSrc() + "]");
}
});
// 连接到同一个group中
channelA.connect("test group");
channelB.connect("test group");
// 监听键盘输入
for (;;){
String line = Util.readStringFromStdin("");
channelA.send(null, line);
}
}
}
我们运行程序,首先会在控制台看到连接到group的过程中打印出的信息:
-------------------------------------------------------------------
GMS: address=channelA, cluster=test group, physical address=192.168.120.238:55465
-------------------------------------------------------------------
-------------------------------------------------------------------
GMS: address=channelB, cluster=test group, physical address=192.168.120.238:57971
-------------------------------------------------------------------
A view: [channelA|1] (2) [channelA, channelB]
B view: [channelA|1] (2) [channelA, channelB]
可见此处JGroup新建了两个JChannel,并且地址名设置为我们自定义的名字。同时集群名(组名)为connect时连接的参数名。每个JChannel还绑定了一个物理地址。接下来,A和B获得了整个组的view信息,也就是说每个JChannel中都保存了整个组组员的列表。
我们在标准输入中键入Hello world!
Hello world!
A<< Hello world! [channelA]
B<< Hello world! [channelA]
可见,由channelA发出的信息,在组内每个组员都会被接收到。
# JChannel配置
以JGroup的jar中udp.xml为例,一个JGroup的配置xml文件以config标签为起始。
<!--
Default stack using IP multicasting. It is similar to the "udp"
stack in stacks.xml, but doesn't use streaming state transfer and flushing
author: Bela Ban
-->
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"
>
<UDP
mcast_port="${jgroups.udp.mcast_port:45588}"
ip_ttl="4"
tos="8"
ucast_recv_buf_size="5M"
ucast_send_buf_size="5M"
mcast_recv_buf_size="5M"
mcast_send_buf_size="5M"
max_bundle_size="64K"
enable_diagnostics="true"
thread_naming_pattern="cl"
thread_pool.min_threads="0"
thread_pool.max_threads="20"
thread_pool.keep_alive_time="30000"/>
<PING />
<MERGE3 max_interval="30000"
min_interval="10000"/>
<FD_SOCK/>
<FD_ALL/>
<VERIFY_SUSPECT timeout="1500" />
<BARRIER />
<pbcast.NAKACK2 xmit_interval="500"
xmit_table_num_rows="100"
xmit_table_msgs_per_row="2000"
xmit_table_max_compaction_time="30000"
use_mcast_xmit="false"
discard_delivered_msgs="true"/>
<UNICAST3 xmit_interval="500"
xmit_table_num_rows="100"
xmit_table_msgs_per_row="2000"
xmit_table_max_compaction_time="60000"
conn_expiry_timeout="0"/>
<pbcast.STABLE desired_avg_gossip="50000"
max_bytes="4M"/>
<pbcast.GMS print_local_addr="true" join_timeout="2000"/>
<UFC max_credits="10M"
min_threshold="0.4"/>
<MFC max_credits="10M"
min_threshold="0.4"/>
<FRAG2 frag_size="60K" />
<RSVP resend_interval="2000" timeout="10000"/>
<pbcast.STATE_TRANSFER />
</config>
这里每一个标签名代表的是org.jgroups.protocols这个包下面的类的名字。例如UDP类的路径为org.jgroups.protocols.UDP,pbcast.GMS路径为org.jgroups.protocols.pbcast.GMS。
这里的配置项都继承了Protocol抽象类,因此这里定义的就是Protocol Stack了。之所以叫栈(Stack),是因为实际layer的顺序是按照压栈顺序进行的。也就是说UDP会成为最后一个layer,而pbcast.STATE_TRANSFER会成为最上面的第一个layer。
protocal stack也可以在代码中定义。例如
Protocol[] prot_stack={
new UDP().setValue("bind_addr", InetAddress.getByName("127.0.0.1")), 2
new PING(),
new MERGE3(),
new FD_SOCK(),
new FD_ALL(),
new VERIFY_SUSPECT(),
new BARRIER(),
new NAKACK2(),
new UNICAST3(),
new STABLE(),
new GMS(),
new UFC(),
new MFC(),
new FRAG2()};
JChannel ch = new JChannel(prot_stack);
这里数组中的顺序和xml中顺序一样,也是bottom-first的。
# JChannel加入cluster/group的细节
connect()调用成功的时候,实际的组员才会以connected的形式加入到组里。也就是说假设一个JChannel调用connect方法时,当前组名中没有其他组员,这时这个组才会在逻辑上被创建。每个组的第一个组员(first member)会充当协调人(coordinator)的角色。协调人负责在组员发生变化时,生成新的view。因此实际上每个组员在获取view的时候,可以轻易地定位到协调人并从协调人那里获取view,而不需要和其他组员进行交流。
加入或创建组通常是使用connect接口直接完成的。
public synchronized JChannel connect(String cluster) throws Exception;
但有时在成功加入组的同时,我们往往会使用getState()方法判断集群或某个节点的状态。这就带来一个并发问题。如果当前节点先调用connect()方法,然后调用getState()方法,是不能保证getState()方法一定能获取到connect()连接时的状态的。因此JChannel设计了接口来解决这样的问题,目的是即避免了数据发送的分散,并且保证了二者操作的原子性。
public synchronized JChannel connect(String cluster_name, Address target, long timeout) throws Exception;
getState方法的返回值也要通过ReceiverAdapter中的回掉函数获取。原因在于state的message本身也是message,不能破坏JGroup的FIFO原则,因此也要当作message的一员返回。getState和setState接口参数不需要显示关闭,这个由JGroup来完成。假设A调用JChannel.getState(B),这里A的JChannel会触发setState回调,而B会触发getState回调。(触发顺序先B后A)
public void getState(OutputStream output) throws Exception;
public void setState(InputStream input) throws Exception;
# JChannel发送message的细节
channel的send()操作可以是异步的,而且因为JGroup有重传机制,在传对象、尤其是buffer的时候,要注意传的buffer不能被其他线程修改。
使用JChannel.setDiscardOwnMessages(boolean flag)来确定消息是否传给节点自己。
设置RSVP可以保证消息发送具有同步机制。当一个Message设置flag为RSVP,那么在所有的组员(不包括崩溃的节点)在确认接收到信息之前,发送者都会阻塞。并且这个flag的另一个作用是,可以保证当前节点在这个消息之前的消息,都被所有节点接收到了。比如P节点发送了1-10的消息,10标记为RSVP,那么当10消息的send成功返回时,P默认为所有节点都收到了1-10的消息。使用这个flag要保证协议栈中定义了重传和超时的layer,避免一直阻塞。
也可以设置RSVP_NB,达到上述要求的同时非阻塞。
# JChannel接收message的细节
这里有个注意点,我们定义接收的callback函数时,有如下接口:
public void receive(Message msg) {
// do something
}
这里我们不能直接将msg中的buffer作为一个后备参数传给别的函数,并返回。因为实际上msg可能涉及到重用问题,也就是说这个receive返回后,msg中的内容会发生变化。因此使用时要在函数内将msg的内容解析并开辟新的内存转换为自己想用的内容(比如反序列化,或者copy)。
MessageBatch函数默认将Batch遍历并调用receive。
# 参考
http://www.jgroups.org/manual4/index.html#JChannel