如我们在前面所讲,JGroups是一个Java多播通信工具包,是对JDK的一种补充,所以本章主要讨论JGroups API,以及对API使用的简单示例。本章包括两部分,通道 API 及构建块 API,我们目的是通过本章的学习,使可以读者能够使用JGroups API构建自己的Java多播通信应用。
1.通道 API
这部分我们介绍构建可靠群组通信的 主要 类,我们主要集中于创建的使用通道。所有这些类位于 org.jgroups 包中。
1.1 工具类
org.jgroups.util.Util 包含许多实用的的方法,我们可以使用这些方法来简化我们的代码。
objectToByteBuffer(); objectFromByteBuffer();
第一种方法需要一个对象作为参数,序列化到一个字节缓冲区(该对象是可序列化或实现 externalizable 接口)。字节数组,然后返回。这种方法通常用于将对象序列化为字节缓冲区的消息。第二种方法返回一个重建的对象从一个缓冲区。这两种方法都抛出一个异常,如果该对象不能被序列化和反序列化。
objectToStream(); objectFromStream();
第一种方法需要一个对象,并将其写入到输出流。第二种方法需要输入流中读取对象。这两种方法都抛出一个异常,如果该对象不能被序列化和反序列化。
1.2 接口
接口被后面的一些实现类使用,所以我们先介绍 JGroups 通道接口。
1.2.1 MessageListener
MessageListener提供接收回调消息,获取和设定群组状态,通过流的形式获取和设定状态。
public interface MessageListener { void receive(Message msg); void getState(OutputStream output) throws Exception; void setState(InputStream input) throws Exception; }
当一个消息接收时 receive()方法被调用。getState()和 setState()方法用来获取群组状态和将自己的状态发送给群组,例如有成员加入到组或有成员离开组时需要重新设定状态。
1.2.2 MembershipListener
MembershipListener 接口与 MessageListener 接口类似:每当一个新的视图,怀疑消息,或块事件接收,对应的 MembershipListener 方法的实现将被调用。
public interface MembershipListener { void viewAccepted(View new_view); void suspect(Address suspected_mbr); void block(); void unblock(); }
通常情况回调实现类需要实现 viewAccepted() 方法,当新的成员加入群组,或已存在的成员离开群组或已存在的成员发生异常时通知接收者。当有成员被怀疑(发生异常)时 suspect() 方法被调运。
block() 方法用来通知组成员被阻止发送消息,这个具体是 FLUSH 协议完成的,比如在群组状态转换或视图在初始化过程中调运此方法来阻止成员发送消息。当 block() 方法被调运,所以成员的线程都被阻止发送消息,直到 FLUSH 调运 unblock() 方法来释放这些线程。
unblock() 方法用来通知组成员 FLUSH 协议已经完成,成员可以发送消息。
1.2.3 Receiver
public interface Receiver extends MessageListener, MembershipListener { }
Receiver 用来处理接收消息或视图发生变化等,当有消息接收到时 receive() 方法被调运,任何时候当视图发生变化时 viewAccepted() 方法被调运。
1.2.4 ReceiverAdapter
这个类实现了 Receiver 接口,所有实现方法为空,应用方法如果想实现回调我们需要继承 ReceiverAdapter,重写相关方法,通常我们实现receive() 方法和 viewAccepted() 方法。ReceiverAdapter 明细如下:
public class ReceiverAdapter implements Receiver { public void receive(Message msg) {} public void getState(OutputStream output) throws Exception {} public void setState(InputStream input) throws Exception {} public void viewAccepted(View view) {} public void suspect(Address mbr) {} public void block() {} public void unblock() {} }
1.2.5 ChannelListener
public interface ChannelListener { void channelConnected(Channel channel); void channelDisconnected(Channel channel); void channelClosed(Channel channel); }
一个类实现 ChannelListener 可以使用 Channel.addChannelListener()方法来注册获来获取通道状态的变化信息。当一个通道被关闭,断开或开启,相应的方法会被调用。
1.3 Address 接口
一个群组中的每个成员都有一个地址,它唯一标识成员。抽象这样的地址的接口是 Address,该地址需要具体的实现比较,排序等相关接口,JGroups的地址必须实现以下接口:
public interface Address extends Streamable, Comparable<Address>, Externalizable{ int size(); }
为了序列化及反序列化方便,size() 方法需要返回的地址实现的一个实例序列化形式占用的字节数。不要使用直接实现 Address,Address 作为一个不透明的集群节点的标识符!
实际上 Address 的实现需要依赖底层的通信协议(例如,UDP或TCP),这允许被使用的JGroups为所有可能的各种地址。由于一个地址唯一的标识一个通道,群组中的成员也是通过地址来发送消息到组中其他成员。
Address 的默认实现是 org.jgroups.util.UUID,它唯一标识节点,断开并重新连接到群集时,一个节点被赋予了新的 UUID。UUID 是不会直接显示出来,但通常显示为一个逻辑名称(之后会消息介绍逻辑名称)。这个名字可以通过用户或 JGroups,其唯一的目的是为了使日志输出的更具可读性。UUID 映射到 IP 地址和端口号,这些最终被传输协议用来来发送消息。
1.4 Message类
群组中成员之间以消息(org.jgroups.Message)的形式发送数据。一个成员可以发送一条消息给群组中的一个成员,也将这条消息发送给群组中的所有成员,群组中的所以成员共享一个通道。如下图为消息的结构示意图:
一条消息可以分为五个部分:
- 目的地址
- 源地址
- 标志符
- 有效负载
- 消息头
如下我们将对这五个部分依次做简单介绍。
1.4.1 目的地址
群组中接收成员的地址。如果为空,该消息将被发送到所有当前的群组成员。Message.getDest()返回消息的目的地地址。
1.4.2 源地址
发送者的地址。可以保留为空,如果为空,该字段将会由传输协议(如UDP)在将消息发送到网络上之前填写。
1.4.3 标识符
大小为一个字节。目前可识别的标志符有 OOB,DONT_BUNDLE,NO_FC,NO_RELIABILITY,NO_TOTAL_ORDER,NO_RELAY 和 RSVP。稍候我们将会做详细的讨论。
1.4.4 有效负载
实际的数据(以字节缓冲区的形式)。Message类包含方便的方法来设置一个序列化的对象,并检索了一遍,使用序列化将对象转换到/从一个字节缓冲区。如果缓冲区的子范围较大的缓冲区,一个消息会有一个偏移量和长度。
1.4.5 消息头
可以有一个队列的消息有连接到一个消息,任何不能够在有效负载中的信息可以作为消息头。方法putHeader(),getHeader()和removeHeader()的消息可以用于操纵控制消息头。需要声明,消息头是内部协议实现时做封装调运,应用程序代码不能够添加或删除消息头。
一条消息,是类似的一个IP包的有效载荷(一个字节缓冲器)和发送者和接收者(如地址)的地址组成。可以把网络上的任何消息路由到其目的地(接收地址),反馈消息可以返回到发送者的地址。
一条消息,通常并不需要填写发件人的地址发送邮件时,这是由协议栈将消息发送到网络上之前自动完成的。然而,可能存在这样的情况,当该消息的发送者希望得到一个从它自己的不同的地址,例如反馈消息发送到其他成员。
目标地址(接收器)可以是一个地址,表示一个成员的地址,例如确定从收到的消息,也可以是空的,这意味着该消息将被发送到该组的所有成员。一个典型的多播消息,发送字符串“Hello”的所有成员将看起来像这样:
Message msg=new Message(null, "Hello"); channel.send(msg);
1.5 消息头Header类
如上消息头是一个自定义的一个字节大小的信息,可以被添加到每个消息。JGroups的广泛使用消息头,例如添加序列号到每个消息(NAKACK 和 UNICAST),因此这些消息可以按照顺序发送。另外需要注意,消息头是被 JGroups 内部协议栈使用,外部应用程序不能够调运这些接口。
1.6 事件Event类
事件使 JGroups 协议栈之间可以相互交换信息,相比较消息,消息是群组中成员通过网络传输,而时间是在协议栈中由上到下或由下到上传输。类似消息头Header类,事件Event类是被 JGroups 内部协议栈使用,外部应用程序不能够调运这些接口。
1.7 视图View类
视图(org.jgroups.View)是一个群组中所有成员的列表。它由一个唯一标识自己的 ViewId 和所有成员的列表组成。每当有新的成员加入或现有的成员退出(或崩溃),视图就会被通道下层的协议栈自动重新安装。一个群组中的所有成员看到的视图是相同的。
需要注意,视图中所有成员在一个队列中,队列中第一个成员是协调者,因此群组中的成员可以很容易找到协调者而不需要与其他成员交互信息。任何情况视图中的第一个成员被认为是协调者。如果当前协调者因发生异常退出,视图队列更新,将当前异常协调者移除,这样群组中之前第二个成员变成第一个成员作为协调者。
下面的代码显示了如何发送(单播)消息给视图中的第一个成员:
View view = channel.getView(); Address first = view.getMembers().get(0); Message msg = new Message(first, "Hello world"); channel.send(msg);
1.7.1 ViewId
ViewId 是用来唯一标识视图,它包括的视图创建者的地址和一个序列号,ViewId 可以比较大小(实现了 equals()和hashCode())使用 HashMap 来保存。
1.7.2 MergeView
每当一组分裂成子类,例如由于网络分区,后面的子群合并到一起,这种情况一个 MergeView 代替 View 将被应用程序使用。MergeView 是 View 的一个子类,包含一些视图合并所需的变量,例如,视图 V1:(p,q,r,s,t) 分裂成 V2:(p,q,r)和 V2:(s,t),合并的视图可能是 V3:(p,q,r,s,t) ,在这种情况下 MergeView 包含变量 V2:(p,q,r)和 V2:(s,t)。
1.8 JChannel类
一个应用成员为了加入一个群组发送消息,首先它需要创建一个通道。通道是像一个插座。当客户端连接到一个通道,它给它想加入的群组一个名称。因此,通道(在其连接状态)总是与一个特定的组相关联。协议栈同样需要这个组的名字来找到彼此:当一个客户端连接到一个通道给定组的名称为 G,接着它会尝试找到具有相同名称的现有通道,并加入他们,这样一个新的视图是被安装(包含新的成员)。如果没有成员的存在,将创建一个新的组。下图为通道主要状态变化示意图:
当第一次创建一个通道,它是在未连接状态。如果通道在未连接状态执行某些特定方法(如发送/接收消息)会抛出异常。如果有客户端成功连接到通道,通道变为连接状态。在连接状态,通道可以接收/发送消息,同时也会收到通道中成员离开或新成员加入的通知消息。当通道断开,允许其返回到未连接状态。一个连接和未连接的通道都可以被关闭,这样设计也是为了通道的可重利用。当通道处于关闭状态,任何方法尝试操作通道都会抛出异常。当出于连接状态的通通执行关闭操作,首先通道变为未关闭状态,然后到关闭状态。
通道是构建可靠多播群组通信的最主要接口,接下来我们列出所有操作通道的方法,通过这些方法我们可以明细通道状态的变化。
1.8.1 创建通道
我们可以通过通道的构造方法创建一个通道。最被常用的方法如下:
public JChannel(String props) throws Exception;
props 参数指向一个 XML 文件,该文件定义了通道所使用的协议栈信息。如下为常见代码段:
JChannel ch = new JChannel("/home/kylin/udp.xml");
如果通道构造函数参数为null,默认的属性将被使用。如果无法创建通道,将会抛出一个异常。可能的原因包括协议中指定的属性参数,但没有被发现,或错误的协议参数。其他可选的构造方法如下:
public JChannel() throws Exception public JChannel(File properties) throws Exception public JChannel(Element properties) throws Exception public JChannel(URL properties) throws Exception public JChannel(ProtocolStackConfigurator configurator) throws Exception public JChannel(JChannel ch) throws Exception
如上这些构造函数的参数都定义了创建通道的协议栈。群组通信的各个成员通过通道彼此交换信息,而通道基于协议栈之上,所以我们需要在此处先简单描述一下协议栈,详细的描述将在下一章。如下为协议栈简单构成:
<config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <UDP mcast_port="${jgroups.udp.mcast_port:45588}"/> <PING/> <MERGE2 /> <FD_SOCK/> <FD_ALL/> <pbcast.NAKACK2/> <UNICAST /> <pbcast.STABLE /> <pbcast.GMS /> <UFC /> <MFC /> <FRAG2 /> <RSVP /> <pbcast.STATE_TRANSFER /> </config>
如上一个协议栈由<config></config>元素包裹,该元素列出所有协议,从最底部的协议(UDP)到顶部协议(STATE_TRANSFER)。每一个元素定义了一个协议。
每一种协议都由一个Java类实现。当创建基于上述 XML 配置的协议栈,第一个元素(“UDP”)成为最下层的协议,第二个元素被至于元素(“UDP”)之上,类似堆栈协议创建将从底部到顶部。
协议栈中的每一个元素都有一个 Java 类,这些类位于 org.jgroups.protocols 包中。请注意,只有基本名称必须符合,而不是完全指定的类名(UDP,而非 org.jgroups.protocols.UDP)。如果没有找到,协议类的 JGroups 假设给定的名称是一个完全合格的类名,因此将尝试实例化这个类。如果这不起作用抛出一个异常。这使得协议类可以存在于不同的包,例如:一个有效的协议的名称可能是 com.sun.eng.protocols.reliable.UCAST。
每一层可以有零个或多个参数,这些参数在协议名称后括号中括号中以 key/value 的形式指定。在上面的例子中,UDP被配置一些选项,其中之一是IP组播端口(mcast_port)被设置为45588(如果没有使用系统参数 jgroups.udp.mcast_port 指定)。上面的协议栈中我们没有明细所有协议的参数,这些参数我们在接下来的章节详细讨论。
另为在一个群组中的所有成员必须使用相同的协议栈。
通常情况下,通道是通过传递一个 XML 配置文件的名称给 JChannel()构造函数来创建。除本声明的配置,JGroups 提供的API以编程方式创建一个通道。以这种方式首先是创建一个 JChannel,然后实例化一个 ProtocolStack,然后添加所有所需的协议到 ProtocolStack,最后调用init()在堆栈中设置它。如下是一个使用 JGroups API 创建通道的例子:
public class ProgrammaticChat { static final String BIND_ADDR = "192.168.1.101" ; public static void main(String[] args) throws Exception { JChannel channel = new JChannel(false); ProtocolStack stack = new ProtocolStack(); channel.setProtocolStack(stack); stack.addProtocols(new UDP().setValue("bind_addr", InetAddress.getByName(BIND_ADDR))) .addProtocol(new PING()) .addProtocol(new MERGE2()) .addProtocol(new FD_SOCK()) .addProtocol(new FD_ALL().setValue("timeout", 12000).setValue("interval", 3000)) .addProtocol(new VERIFY_SUSPECT()).addProtocol(new BARRIER()) .addProtocol(new NAKACK()).addProtocol(new UNICAST2()) .addProtocol(new STABLE()).addProtocol(new GMS()) .addProtocol(new UFC()).addProtocol(new MFC()) .addProtocol(new FRAG2()); stack.init(); channel.setReceiver(new ReceiverAdapter(){ public void receive(Message msg) { Address sender=msg.getSrc(); System.out.println(msg.getObject() + " [" + sender + "]"); } public void viewAccepted(View view) { System.out.println("view: " + view); }}); channel.connect("ChatCluster"); for (;;) { String line = Util.readStringFromStdin(": "); channel.send(null, line); } } }
1.8.2 给通道一个逻辑名称
通道可以指定一个逻辑名称。如果 JGroups 的逻辑名称没有被设置,则会自动生成一个,使用主机名和一个随机数,例如 localhost-35655。逻辑名称设定可以通过如下方法设定:
public void setName(String logical_name);
设定逻辑名必须要在连接通道之前进行,逻辑名设定后知道通道被销毁一直存在,如下为 JGroups 启动时显示设定的通道逻辑名:
GMS: address=JBoss Cluster, cluster=JChannelLogicNameTest, physical address=192.168.1.101:47094
如上通道逻辑名为 JBoss Cluster,物理地址为 192.168.1.101:47094。
1.8.3 自定义地址
我们可以自定义地址,这意味着,应用程序可以决定它使用什么样的地址。默认地址类型的 UUID,因为一些协议使用的 UUID,可以通过继承 UUID 来自定义自己的地址。这可以用来传递额外的数据通过地址,例如我们可以传递节点位置通过自定义地址。需要注意父类 UUID 中 equals(), hashCode() 及 compare() 则不需要改变。
1.8.4 加入到一个群组
当客户端要加入集群,它连接到一个通道通过要加入群集的名称:
public void connect(String cluster) throws Exception;
群集名称是要加入群集的名称。所有通道,调用connect()具有相同的名称,组成一个集群。集群中的任何通道上发送的消息,所有成员都能够收到。connect()方法一旦连接成功将会返回。如果通道在关闭状态,调运 connect()方法将会抛出异常。如果没有其他成员,即没有其他的成员连接到集群使用这个名字,然后一个新的群集创建和加入它作为第一个成员。在群集的第一个成员成为小组的协调员。协调器是负责当群组成员发生变化时安装新的视图。
1.8.5 在加入集群时获取一个成员的状态
客户端还可以加入集群,并在一次操作中获取集群状态。最好的方式来概念化的连接和获取状态的连接方法是把它作为一个常规的connect()和有getstate()连续执行的方法。然而,使用的连接,并获取定期连接状态连接方法有几个优点。首先,底层的消息交换是高度优化的,特别是如果使用 FLUSH 协议。但更重要的是,从客户的角度来看,连接和获取状态操作成为一个原子操作。
public void connect(String cluster, Address target, long timeout) throws Exception;
就像在一个普通的connect(),群集名称代表要加入集群。target 参数表示要获取状态的集群成员,如果 target 参数为空则表示获取群集中协调者的状态。timeout 参数表示加入群集和获取状态的时间,如果超过此时间则抛出异常。
1.8.6 获取本地地址和集群的名称
通道的方法getAddress()返回通道的地址的。当通道处于未连接状态时,该方法不可用。
public Address getAddress();
通道的方法 getClusterName()方法返回群集的名称。
public String getClusterName();
1.8.7 获取当前集群视图
使用如下方法可以获取当前状态的视图:
public View getView();
该方法获得当前通道的视图,视图在当有新成员加入或现有成员离开时发生变化。如果通道出于关闭或未连接状态时调运此方法返回空值。
1.8.8 发送消息
一旦客户连接到通道,我们可以使用下面中的任何一个 send() 方法发送消息:
public void send(Message msg) throws Exception public void send(Address dst, Object obj) throws Exception public void send(Address dst, byte[] buf) throws Exception public void send(Address dst, byte[] buf, int offset, int length) throws Exception
第一send()方法只有一个参数,就是要发送的消息。消息的目的地即可以是接收器的地址(单播)或空(多播)。当目标为空,则该消息将被发送到群集中的所有成员(包括其本身)。其它send()方法即可以发送字节数组也可以发送实现了序列化的对象。如果通道状态为关闭或未连接状态,发送方法或抛出异常。
如果目的地址为空值意味着该消息将被发送到集群中的所有成员。
channel.send(null, data);
当然,我们也可以将消息发送到特定的成员:
Address receiver = channel.getView().getMembers().get(0); channel.send(receiver, "hello world");
如上我们给集群的协调者(视图中的第一个成员)发送 hello world 字符串。
通常多播通信给群组中的所有成员发送消息,包括自己。我们可以通过如下方法屏蔽不给自己发送消息:
JChannel.setDiscardOwnMessages(boolean flag)
虽然JGroups的担保,消息最终会被送到在所有非故障的成员,有时这可能需要一段时间。例如,如果我们有一个重传协议负责确认,并当发送最后一条消息丢失,那么接收器将不得不等待,直到通知该消息已丢失,然后才可以重发。这是可以通过设置 Message.RSVP 标志:遇到这个标志时,消息发送阻塞,直到所有成员都确认接收消息。这也可作为另一个目的:如果我们发送一个有 RSVP 标记的消息,接着当 send()方法返回,我们确保所有成员接收消息。请注意,因为 RSVP 消息是昂贵的,可能会阻止发送者一段时间,它应该被谨慎使用。
要使用RSVP,必须做两件事情:首先 首先,RSVP 协议要在协议栈的配置中,如下:
<config> ... <RSVP /> ... </config>
其次,我们想要得到的同步确认的消息必须有设定 RSVP 标记:
Message msg=new Message(null, null, "hello world"); msg.setFlag(Message.RSVP); ch.send(msg);
1.8.9 接收消息
通常,客户端代码重写 ReceiverAdapter(或Receiver)中的 receive() 方法来负责处理接收消息。
public void receive(Message msg);
接收器可以通过通道使用JChannel.setReceiver()来注册。所以接收消息,视图状态变化都可以通过重写 ReceiverAdapter(或Receiver)中方法来实现监听。如下代码段:
49 channel.setReceiver(new ReceiverAdapter(){ 50 51 public void receive(Message msg) { 52 Address sender=msg.getSrc(); 53 System.out.println(msg.getObject() + " [" + sender + "]"); 54 } 55 56 public void viewAccepted(View view) { 57 System.out.println("view: " + view); 58 }});
如上51 行为重写 ReceiverAdapter receive() 方法,当有消息接收到时,该方法被调运;56 行重写 viewAccepted() 方法,当视图发生改变(有成员加入或退出)时,该方法被调运。
1.8.10 获取集群的状态
新加入的成员在开始工作之前,可能要检索的集群状态。这可以通过 getstate():
public void getState(Address target, long timeout) throws Exception;
此方法返回一个成员(通常是最年长的成员,即协调者)的状态。如果 target 参数为空则表示获取群集中协调者的状态。timeout 参数表示获取状态的时间,如果超过此时间则抛出异常。如果 timeout 参数值设为 0 则表示一直等待直到操作完成。需要注意,getState() 方法没有任何返回,如果直接返回会破坏通道 FIFO 属性,状态变化将会发生错误。
如果客户端代码需要参与到集群状态的变化,客户端代码需要在 ReceiverAdapter (Receiver) 的 getState() 和 setState() 中做相应的处理。
public void getState(OutputStream output) throws Exception; public void setState(InputStream input) throws Exception;
方法 getState() 作用于状态提供者(协调者),它需要将状态写到输出流,当写操作完成后不能够关闭输出流,如果关闭 JGroups 运行会抛出异常。而方法 setState() 作用于状态请求者,比如客户端代码执行了 JChannel.getState(), 它就是一个状态请求者,它需要从输入流读取当前状态,同样我们不能够关闭输入流,如果关闭 JGroups 运行会出错。如果要使用操作群组状态变化,一些状态变化相关的协议必须添加在协议栈的配置文件中,这些协议可以是 STATE_TRANSFER, STATE, 或 STATE_SOCK。
- STATE_TRANSFER 这是最原始的状态变化协议,底层使用字符缓存,在 getState() 和 setState() 方法中将字符缓存转换成输入流和输出流。
- STATE JGroups 3.0 后引入,它直接发送状态实例在状态请求者和状态提供者之间。
- STATE_SOCK 类似上面的协议,只是底层使用了 TCP 协议。
1.8.11 断开通道连接
断开一个通道是通过使用下面的方法来完成:
public void disconnect();
如果通道已经在断开或闭合的状态下,它不会有任何效果。如果连接时,它会离开集群会通过发送离开请求到目前的协调员。后者也将随之从视图中删除离开的节点,并安装一个新的视图中所有剩余的成员。断开连接成功后,该通道将是在未连接的状态下,可能会在日后重新连接。
1.8.12 关闭通道
要关闭释放一个通道实例(销毁相关联的协议栈,并释放所有的资源),使用 close()方法:
public void close();
关闭处于连接状态的通道,先断开通道再关闭通道。close()方法使通道出于关闭状态,通道出于关闭状态时没有进一步的操作是允许的。在这种状态下,通道实例不考虑再被任何一个应用程序使用。最终 JVM 垃圾回收器会释放通道使用的内存。
至此通道API介绍部分结束,我们在示例-1 基于JGroups的群组聊天程序(见本章结束部分)中展示一个示例程序,通过此程序我们具体实践使用通道API。
2 构建块 API
构建块位于通道之上,是对通道 API 的更高层面的抽象,如果客户端应用需要更高层抽象接口时可以用来代替通道层 API。通道是是简单的socket结构,构建块可以提供一些更抽象的接口,在某些情况下,构建块提供途径访问底层的通道,这样构建块不需要提供某些功能而通道可以直接被访问。构建块 API 位于在org.jgroups.blocks包中。
2.1 MessageDispatcher
通道是简单的模式,以异步方式发送和接收消息。然而,一个显着的群组通信模式的特性是同步通信。例如,发送方想将消息发送到组中,等待所有接收方响应。或其应用程序想将消息发送到组中,等待,直到大部分的接收器发送回响应,或直到发生超时。
MessageDispatcher提供阻塞(非阻塞)请求发送和响应的相互通信模式。它提供了同步(和异步)消息发送请求 - 响应的相互通信模式,如匹配一个或多个与原始请求的响应。使用这个类的一个例子是请求消息发送到所有群集成员,阻止,直到所有的响应已经收到,或直到超时已过。
与随后讨论的 RpcDispatcher 相反,MessageDispatcher 发送发送消息及得到相关接收者的响应,而 RpcDispatcher 调用远程方法并等待相关联的响应。RpcDispatcher扩展了 MessageDispatcher,并提供更高级别的抽象。RpcDispatcher 本质上是跨集群来执行远程方法调运。
MessageDispatcher 和 RpcDispatcher 都位于通道之上,因此创建一个实例 MessageDispatcher 需要一个通道作为参数。现在它可以被用来扮演客户端和服务器的双重角色:客户端发送请求和接收响应和服务器接收请求和发送响应,MessageDispatcher 允许一个应用程序在同一时间扮演这两个角色。为了能够提供服务器角色,执行服务器端的逻辑 RequestHandler.handle()方法被执行:
Object handle(Message msg) throws Exception;
每当接收到一个请求的handle()方法被调用。它必须返回一个值(必须是可序列化的,但不能为空),或者抛出一个异常。返回的值将被发送给请求发送者,同样异常也会传播给请求发送者。
2.1.1 RequestOptions
RequestOptions 用来保存执行运程方法调运的相关参数,使用 MessageDispatcher 发送消息或者通过 RpcDispatcher 调运远程节点成员的方法我们需要 RequestOptions 作为参数传递。RequestOptions 所携带的参数包括:
- 响应模式,决定调运是否阻塞,如果是,阻塞多长时间,具体模式包括:
GET_FIRST 阻塞直到第一个响应接收到
GET_ALL 阻塞直到所有响应接收到
GET_MAJORITY 阻塞知道大多数成员响应接收到
GET_NONE 非阻塞
- 超时时间,单位为毫秒,阻塞的最长时间,如果在这个时间里响应还没有接收到,TimeoutException 会被抛出;如果值为 0,则永远阻塞;如果响应模式为 GET_NONE 此参数无效被忽略
- 使用单播,如果为 true 表示我们将使用单播发送消息而不是发送多播消息
- 结果过滤,此属性用来过滤响应结果,或用户自己定义终止调运。比如,我们想得到 10 个成员的响应,但是当我们接收到 3 个非空的响应后我们就结束调运,在这种情况下我们可以通过此属性完成,稍候我们将做进一步的介绍
- 作用范围,并发消息处理时使用
- 标识符,标识消息,随后会有详细介绍
- 不包括列表,如果我们不想将消息发送给某一个成员,我们可以将他加入此列表。
2.1.2 MessageDispatcher 主要方法
public <T> RspList<T> castMessage(final Collection<Address> dests, Message msg, RequestOptions options) throws Exception public <T> NotifyingFuture<RspList<T>> castMessageWithFuture(final Collection<Address> dests, Message msg, RequestOptions options) throws Exception public <T> T sendMessage(Message msg, RequestOptions opts) throws Exception public <T> NotifyingFuture<T> sendMessageWithFuture(Message msg, RequestOptions options) throws Exception
由上之下,castMessage() 方法发送一个消息到一组成员,成员定义在 dests 中,如果 dests 为空则消息发送到当前群组中的所有成员。当然我们可以在消息中定义自己的目的地,如果消息中设定了自己的目的低则此项被重写。如果消息是同步模式(options.mode 定义),则 options.timeout 必须设定一个超时时间等待响应,超时后会抛出异常。castMessage() 返回一个 RspList 对象,该对象包括所有返回的信息,以目的成员地址/返回的键值对的形式保存所以返回。
执行 castMessageWithFuture() 方法立即返回且返回一个 NotifyingFuture 对象,我们可以给此对象注自己的 FutureListener 来处理响应信息。我们将在随后介绍此项用法。
sendMessage() 方法允许应用程序发送单播消息给群组中的某一个成员并等待该成员的响应。使用此方法时,我们必须在消息中定义接收者地址,否则 JGroups 会抛出异常。由于单播消息,响应模式属性可以忽略。
和castMessageWithFuture() 方法一样,sendMessageWithFuture() 方法也立即返回,返回一个 NotifyingFuture 对象,我们可以给此对象注自己的 FutureListener 来处理响应信息。
构建块接口的一个特点的加入容错能力,如果直接使用通道 API,如果远程出错则程序运行发生异常,但是如果使用构建块接口,如果远程调运出错,它同样会有结果返回,只是返回的消息标记为失败。RspList 应用方法处理各种情况:
public class RspList<T> implements Map<Address,Rsp> { public boolean isReceived(Address sender); public int numSuspectedMembers(); public List<T> getResults(); public List<Address> getSuspectedMembers(); public boolean isSuspected(Address sender); public Object get(Address sender); public int size(); }
getResults() 返回所有响应结果,numSuspectedMembers() 为所以失败的节点成员数,get() 返回特定成员的响应,isReceived() 用来检测某一成员的响应是否成功。
2.2 RpcDispatcher
RpcDispatcher 继承了 MessageDispatcher,它允许程序员调用远程方法,在所有(或单一)的集群成员选择等待的返回响应。应用程序通常会创建一个通道,然后基于该通道创建一个 RpcDispatcher。RpcDispatcher可以用来调用远程方法(客户端角色),并在同一时间可以被其他成员调运(服务器角色)。
相比 MessageDispatcher,RpcDispatcher 没有 handle()方法需要实现。相反要调用的方法可以直接定义在类上,且使用常规的方法定义(见实验三中例子)。底层使用反射来完成方法调运。要调用远程方法调用(单播和多播)使用下面的方法:
public <T> RspList<T> callRemoteMethods(Collection<Address> dests, String method_name, Object[] args, Class[] types, RequestOptions options) throws Exception public <T> RspList<T> callRemoteMethods(Collection<Address> dests, MethodCall method_call, RequestOptions options) throws Exception public <T> NotifyingFuture<RspList<T>> callRemoteMethodsWithFuture(Collection<Address> dests, MethodCall method_call, RequestOptions options) throws Exception public <T> T callRemoteMethod(Address dest, String method_name, Object[] args, Class[] types, RequestOptions options) throws Exception public <T> T callRemoteMethod(Address dest, MethodCall call, RequestOptions options) throws Exception public <T> NotifyingFuture<T> callRemoteMethodWithFuture(Address dest, MethodCall call, RequestOptions options) throws Exception
callRemoteMethods()方法参数 dests 指的是接收地址的列表。如果为null,所有的集群成员(包括发送者自己)的方法都被调运,该方法通常还包括一个字符串形的变量表示要调运的运程方法的名字和 RequestOptions (如前面所描述,用来指定包括响应模式,超时时间,是否单播调运等)。调运远程方法底层实现是 Java 反射技术,所以 callRemoteMethods()方法除了将远程方法名作为参数外,还需要传递远程方法的参数类型和参数值。JGroups 提供了 MethodCall 对象,用来封装反射调运所需要的方法名,方法参数及方法参数类型。和 MessageDispatcher 定义的方法类似,callRemoteMethods()可返回一个 RspList 对象或 NotifyingFuture 对象。RspList 包括所有成员的响应结果,包括出错异常(如果调运某一成员方法时发生异常,则返回异常)。NotifyingFuture 可用来注册 FutureListener 异步处理响应信息。
callRemoteMethod() 方法和 callRemoteMethods()方法相似,只是接收者为唯一的地址,该方法用来一对一发送消息。该方法的可以返回任意类型(与远程方法返回结果相同),同意也可以异步调运返回 NotifyingFuture 对象。
2.3 使用构建块 API 进行异步调运
当调用一个同步调用,调用线程被阻塞,直到响应收到为止。Future 对象允许方法调运立即返回,异步处理响应结果。如前面所描述 MessageDispatcher 和 RpcDispatcher 都提供了异步调运的方法,这些方法都会返回一个 NotifyingFuture 对象,我们可以给此对象注自己的 FutureListener 来处理响应信息。
2.3.1 NotifyingFuture
NotifyingFuture 接口继承了 java.util.concurrent.Future,提供了异步调运相关的方法,如 isDone(),isCancelled(),get()等,如下:
public interface NotifyingFuture<T> extends Future<T> { NotifyingFuture setListener(FutureListener<T> listener); }
setListener()方法用来注册监听器,处理响应结果。
2.3.2 FutureListener
public interface FutureListener<T> { void futureDone(Future<T> future); }
如上 FutureListener 接口定义了一个方法,当远程响应给方法被调运,我们只需要实现此方法来异步处理响应结果,Future 对象 get() 方法可以返回 RspList 对象。
2.4 返回过滤策略
响应返回过滤器允许应用程序代码挂接到接收群集成员的响应,可以让请求 - 响应的执行和相关代码知道响应是可以接受的,是否需要更多的答复,或是否调用(如果阻塞)可以返回。 RspFilter 接口如下所示:
public interface RspFilter { boolean isAcceptable(Object response, Address sender); boolean needMoreResponses(); }
isAcceptable()方法参数包括响应值和响应的发送者,并需要决定响应发送者是否添加到响应列表,响应发送者添加到响应列表返回true,否则返回false。 needMoreResponses()方法判断是否调用是否返回。
2.5 ReplicatedHashMap 与 ReplCache
JGroups 包中的 ReplicatedHashMap 与 ReplCache 位于 org.jgroups.blocks 包中,这两个类都是基于底层通道 API 来实现共享缓存的方案,具体前者在所有节点上共享资源,而后者是在指定个数的节点上共享资源。资源以键值对的方式保存在共享的缓存中,两个类都提供了 get(),put()等方法,集群中成员可以通过这些方法改变缓存。特别 ReplCache 提供分布式缓存相关的特性,它提供类似 put(K,V,R) 的方法,该方法中 R 表示 键值对资源在集群中保存几份,如果为 2 表示在集群中复制两份。
Comments