ESFramework 4.0 快速上手 属于ESFramework的傻瓜式应用,对于开发一般的通信程序已经足够了,但是如果要构建更高性能更灵活更具扩展性的应用,还是必须要从基础开始了解ESFramework的内核机制。本篇是为进入ESFramework内核的第一步。)

 

需要交互的分布式系统之间通过消息来传递有意义的信息。消息是通信框架的核心。离开了消息,再谈通信框架就没有任何意义,所以,消息是ESFramework中一个最核心的概念。  

一. 消息的类别

        在具体的应用中,我们需要对消息的类别进行定义,这有助于我们分析和讨论问题。消息大致可以分为4个类别:请求消息、回复消息、报告、通知、P2P消息。

       Client/Server模式中,出现最多的便是请求消息和回复消息。这两种类别的消息非常容易理解。

       报告指的是Client/Server模式中客户端发送给服务端的消息。但这种消息不需要服务端的回复。比如,客户端可能需要将自己的当前状态上报给服务端,这就可以通过“报告”消息来传递。

       通知指的是Client/Server模式中服务端主动发送给客户端的消息。通常,这种消息用于告诉客户端它所关心的服务器上的某个状态发生了变化。比如,即时通讯应用中,某个好友上线了,服务端就可以发一个通知来提醒当前用户。

       P2P消息指的是Client/Server模式中客户端与客户端之间通信的消息。这种消息可能通过服务器中转,也可能直接经过P2P通道发送。 

       对于P2P消息,可以进行更进一步的分类,因为P2P消息中也可能存在请求、回复、通知、报告等不同的更具体的类型。通信框架并不关心P2P消息的更详细分类,但是我们的应用程序中应该认真地区分它们。       

 

二. 消息的两种形态

消息的两种形态指的是消息在网络上进行传输的形态和消息在通信框架内部(或应用程序中)的形态。

在网络上,消息表现为一串字节流;而在基于面向对象的通信框架内部,消息通常表示为一个对象。为了使消息能在分布式系统之间进行传递和处理,则要求消息能在这两种形态之间进行转换,而且这种转换必须是无损的和可逆的。所谓“无损”,指的是在转换的过程中不会遗失任何信息;所谓“可逆”,指的是消息从一种形态转换到另一种形态,然后再从另一种形态转换到原来的形态,所得到的结果应该跟初始状态完全相同。

我们将消息对象转换为字节流的过程称为“消息序列化”,将字节流转换为消息对象的过程称为“消息解析”。

在通信框架内部识别一个消息非常容易,因为它们就是一个个的对象实例;而在网络上识别一个消息就要复杂很多,你需要判断出哪个字节是一个消息的开始,又到哪个字节是一个消息的结束。

 

三. 消息协议

通过网络相互通信的系统之间要想正常交互,它们必须有“共同的语言”,这种语言就是消息协议。遵守消息协议的消息才能被我们的系统所理解。

由于系统的底层通信会被ESFramework通信框架所解决,所以框架会给出一个抽象的消息协议(通过接口来展现),其最主要的目的是使通信框架能非常清晰的识别一个单独、完整的网络上的消息。系统中的消息必须遵循这个消息协议。

消息协议主要可以分为两类:文本协议(如基于xml的协议)和流协议(又称为二进制协议)。

1. 文本协议

“文本协议”采用起始标志符和结束标志符的方法来标志一个完整的消息。比如,规定所有的消息都以“#”开始,并且以“#”结束。

 如你所想,“文本协议”方案有着天生的缺陷,无论你采用哪个字符(或多个字符)作为起始结束标志,在消息的中间都可能出现同样的字符,如果出现这样的消息,就会导致该消息的识别发生错误,而且如果是在同一个Tcp连接上传输,就很可能导致后续所有消息的识别都出错。所以,如果使用文本协议,一定要保证消息的内容中不要包含标识符,一般可以使用复杂的标识符(连续地多个字符)或者转义符来解决这个问题。

采用标志符来识别消息还有一个缺陷,那就是效率低下。因为我们需要逐个扫描每一个字符来判断它是否是指定的标志符。

幸运的是,我们有更好的办法来解决这个问题,那就是使用“流协议”。

2. 流协议

流协议规定网络上传递的任何一个消息必须符合以下规则:

(1) 消息由“消息头”(Message Header)和“消息体”(Message Body)构成,消息体可以为空。

(2) 如果有消息体,则消息体必须紧接在消息头的尾部。

(3) 消息头中至少包含了一个信息,那就是消息体的长度。

(4) 同一个应用中所有的消息的消息头的长度都是固定的。

我们来分析一下,流协议是如何解决字符协议的缺陷的。当通信框架接收到一个网络消息字节流时,首先判断这个流的长度,如果长度小于应用指定的消息头的固定长度,则丢弃这个消息(或等待后续的字节流进行消息重组),否则解析消息头,并从消息头中获取消息体的长度,如果发现接收的字节流中消息体的长度小于预期的长度时,则丢弃这个消息(或等待后续的字节流进行消息重组),否则就可以对消息体进行解析了。

在采用基于非连接的通信协议(如Udp)时,我们可以直接丢弃不完整的消息;而在采用基于连接的通信协议(如Tcp)时,我们则可能需要对过长的消息进行分裂和对不完整的消息进行重组。由于类似Tcp这样的通信协议会保证数据正确无误地按顺序传送,所以依据上述的解析字节流的过程,通信框架会从字节流中正确地分解出一个个完整的消息,而不会发生错漏。 

四.ESFramework中的消息定义 

ESFramework即支持文本协议,也支持流协议,并且ESFramework提供了这两种类型的消息的消息头的实现、以及消息的序列化及消息的解析。使用ESFramework提供的实现,你就再不用为消息的封装、字节数组到消息的相互转换、消息粘包、消息断裂、消息重组等等这些事情而伤脑筋了。

我们先来看看消息接口的抽象定义IMessage:

    /// <summary>
    
/// 在网络上传递的消息的基础接口。    
    
/// </summary>    
    public interface IMessage
    {
        
/// <summary>
        
/// 发送消息的源地址。任何一个NetEngine都会保证,在接收完一条消息的时候,会为该字段正确地赋值。
        
/// </summary>
        IUserAddress SourceAddress { get; }

        
/// <summary>
        
/// 消息头。
        
/// </summary>
        IMessageHeader Header { get; }

        
/// <summary>
        
/// 获取整个消息的长度。如果为文本消息,则为整个文本的长度;如果为流消息,则为消息转化为流之后流的长度。
        
/// </summary>       
        int GetMessageLength();

        
/// <summary>
        
/// 将消息转化为字节流,以备发送或持久化。
        
/// </summary>        
        byte[] ToStream();
    }

  GetMessageLength方法,用于获取将当前消息对象转换为字节流后的长度。通常,在将消息对象转换成字节流之前,我们先得分配内存空间来存储结果字节流,通过这个方法,我们可以知道需要分配的内存空间有多大。
  ToStream方法用于将当前消息对象序列化为字节流。
  ESFramework规定每个消息必须有消息头IMessageHeader,消息头不能为null,ESFramework框架中的很多组件都需要根据消息头中的相关信息对消息进行相应的处理,比如加解密、分派、转发等。IMessageHeader定义如下:

    /// <summary>
    
/// 消息头接口,规定了消息头中至少包含的信息:发送者、接收者、目录类型、消息类型、消息ID。
    
/// zhuweisky 2005.12.07 
    
/// </summary>
    public interface IMessageHeader :ICloneable
    {
        
/// <summary>
        
/// 发出本消息的用户编号。注意,普通用户编号只能由数字和字母组成,并且其最大长度会由IMessageHeader的实现类给出具体规定。
        
/// </summary>
        string UserID { get;set;}

        
/// <summary>
        
/// 接收消息的目标用户编号。注意,普通用户编号只能由数字和字母组成,并且其最大长度会由IMessageHeader的实现类给出具体规定。        
        
/// </summary>
        string DestUserID { get;set;}  

        
/// <summary>
        
/// 消息的类型。
        
/// </summary>
        int MessageType { get;set;}

        
/// <summary>
        
/// 每个消息实例的唯一标志(也可用于将功能请求与其回复一一对应起来)
        
/// </summary>
        int MessageID { get;set;}        

        
IStringEncoder StringEncoder { get; }
    }    

  UserID属性指示了该消息是由哪个用户发出的,通信框架通过UserID对在线用户进行管理或对目标用户的在线活动进行跟踪。通常,用户都通过客户端登录到服务器,如果消息是从服务器直接发出的,那么该消息的UserID的值取为ESFramework.NetServer.SystemUserID,这是一个常量值。
  DestUserID属性仅在P2P消息或通知类型的消息中有效,它表示这个P2P消息将被发送给哪个指定的用户。对于P2P消息,服务端会根据消息头中的DestUserID将其转发给目标用户。当然,并不是所有的P2P消息都需要经过服务器进行转发,如果P2P通道可用,P2P消息将直接通过用户之间的P2P通道进行传送。

  MessageType属性表名了这条消息的具体类型,不同的ServiceKey标志着不同的消息类型。比如这是一个请求当前天气温度的请求消息,还是一个发给在线好友的聊天文字消息。消息分派器会根据消息的MessageType来将其分派到合适的消息处理器上进行处理。

  特别要指出的是,在具体设计MessageType时,对于一些简单的情况,可以将一对请求/回复消息的MessageType取同一个值。而对于复杂的情况,比如,P2P消息中的一对请求/回复消息,则MessageType必须取不同的值,否则,通信框架将无法依据MessageType判断收到的P2P消息到底是一个请求、还是一个回复。因为在客户端与客户端之间进行P2P通信时,请求和回复消息是双向的,即每一方都可以发送同样类型的请求和给出回复。而在客户端与服务端进行通信时,通常是客户端发送请求,服务端给出回复,在这种情况下,服务端发送同样类型的请求给客户端的可能性非常小,如果出现这种情况,仍然需要为一对请求/回复消息的MessageType取不同的值。

  MessageID属性是每个消息的唯一标志,也就是说框架中处理的任何两个消息的MessageID是不一样的。除了唯一标志每个消息外,在ESFramework中MessageID还有两个额外的用途。第一个用途是在基于非连接的通信中,消息监控器可以根据MessageID过滤掉重复的消息。第二个用途是对于请求/回复消息,使用MessageID可以将请求消息和回复消息一一对应起来。

  

  ESFramework内置了文本消息和流消息的基本封装,如下所示:

  

  从ESFramework的帮助文档“ESFramework4.0.chm”中可以了解更多关于StreamMessage和TextMessage的内容,这里不再赘述了。  

  实际上,ESFramework根本不用关心消息体对象如何解析(如StreamMessage的Body属性就是一个byte[],框架不需要解析它),对于消息体的解析是交给应用来完成的,毕竟,消息体的具体内容都是与应用紧密相关的,通信框架不会尝试去了解消息体中包含了哪些信息。通信框架只关心消息头这一个协议对象,消息头中包含了足够的信息让通信框架来完成分派、监控、转换、和处理等任务。

  应用层如何构造和解析消息了?ESFramework通过IContractHelper接口来进行规范,这个接口由应用程序去实现。

    /// <summary>
    
/// 与消息协议相关的决策。
    
/// 2005.10.07
    
/// </summary>
    public interface IContractHelper :IStringEncoder 
    {
        
/// <summary>
        
/// 获取消息的类型Common/P2PMessage/Broadcast。
        
/// 对于Passive来说,如果是P2PMessage/Broadcast,则可以通过P2PChannel发送。
        
/// 对于Server/Passive来说,如果是P2PMessage/Broadcast,可以决定MessageFilter是否捕获它(避免无谓的加密、解密等)。
        
/// 是否为P2PMessage/Broadcast,并不对消息处理器的选择产生任何影响,消息处理器仍然是根据messageType来选择的。
        
/// </summary>       
        MessageStyle GetMessageStyle(int messageType);      
        
        
/// <summary>
        
/// 解析消息体。
        
/// </summary>
        
/// <typeparam name="TBody">消息体的类型</typeparam>
        
/// <param name="msg">被解析的消息</param>
        
/// <returns>解析得到的消息体对象</returns>
        TBody ParseBody<TBody>(IMessage msg) where TBody :class ,new();

        
/// <summary>
        
/// 创建消息。实现该方法时,需要根据body的长度来设置header中MessageBodyLength属性。通常给Server端使用。
        
/// </summary>       
        IMessage CreateMessage<TBody>(IMessageHeader header, TBody body) where TBody : class;   
    }

   IStringEncoder接口用于编码和解码字符串,比如,你可以使用Unicode或者UTF-8对字符串进行编解码。

    /// <summary>
    
/// 字符串编解码器接口。
    
/// </summary>
    public interface IStringEncoder
    {
        
/// <summary>
        
/// 将字节流解码为字符串。
        
/// </summary>     
        string GetStrFromStream(byte[] stream, int offset, int len);

        
/// <summary>
        
/// 对字符串进行编码返回字节数组。
        
/// </summary>       
        byte[] GetBytesFromStr(string ss);
    }

   ESFramework使用MessageStyle对消息进行一个基本的分类:

    /// <summary>
    
/// 消息分类,Common/P2PMessage/Broadcast。
    
/// 对于Passive来说,如果是P2PMessage/Broadcast,则可以通过P2PChannel发送。
    
/// 对于Server/Passive来说,如果是P2PMessage/Broadcast,可以决定MessageFilter是否捕获它(避免无谓的加密、解密等)。
    
/// 是否为P2PMessage/Broadcast,并不对消息处理器的选择产生任何影响,消息处理器仍然是根据messageType来选择的。   
    
/// </summary>
    public enum MessageStyle
    {
        
/// <summary>
        
/// 普通消息。由Passive发出最终由Server接收的消息,以及由Server发出而由Passive接收的消息。
        
/// </summary>
        Common,
        
/// <summary>
        
/// 由Passive发出最终由另外一个Passive接收的消息。
        
/// </summary>
        P2P,
        
/// <summary>
        
/// 广播消息。由Passive发出最终由多个Passive接收的消息。
        
/// </summary>
        Broadcast
    }

 

 

五.ESPlus提供了默认消息头和IContractHelper的默认实现

  如果应用程序没有特殊的要求,可以直接使用ESPlus中提供的消息头ESPlus.Core命名空间中的TextMessageHeader和StreamMessageHeader以及对应的TextPlusContractHelper和StreamPlusContractHelper ,而免去了自己定义消息头和构造解析消息的工作。        

   TextMessageHeader的定义如下:

    /// <summary>
    
/// 常用的基于文本的消息头定义。
    
/// </summary>
    [Serializable]
    
public class TextMessageHeader :IMessageHeader
    {
        
#region Ctor  
        
public TextMessageHeader() { }
        
public TextMessageHeader( string _userID, int _messageType, string _destID ,int _messageID)
        {            
            
this.MessageType = _messageType;            
            
this.UserID = _userID;
            
this.messageID = _messageID;
            
this.DestUserID = _destID;
        }    
        
#endregion

        
#region IMessageHeader 成员   
        
#region UserID
        
private string userID;
        
public string UserID
        {
            
get
            {
                
return this.userID;
            }
            
set
            {
                
this.userID = value;
            }
        }
        
#endregion

        
#region DestUserID
        
private string destUserID = "";
        
public string DestUserID
        {
            
get { return destUserID; }
            
set { destUserID = value; }
        }
        
#endregion
       
        
#region MessageType
        
private int messageType;
        
public int MessageType
        {
            
get { return messageType; }
            
set { messageType = value; }
        }
        
#endregion       

        
#region MessageID
        
private int messageID = 0;
        
public int MessageID
        {
            
get { return messageID; }
            
set { messageID = value; }
        }
        
#endregion        

        
#region StringEncoder
        
public IStringEncoder StringEncoder
        {
            
get
            {
                
return DefaultStringEncoder.Instance;
            }
        }
        
#endregion 
        
#endregion       
    
        
#region ICloneable 成员

        
public object Clone()
        {
            
return this.MemberwiseClone();
        }

        
#endregion
    }

 

  StreamMessageHeader的定义稍微复杂些,如下所示:

    /// <summary>
    
/// 常用的基于流的消息头定义。固定长度36 ,UserID允许最大长度为11。MessageType取值为0~65535
    
/// </summary>
    [Serializable]
    
public class StreamMessageHeader : IStreamMessageHeader
    {
        
public static readonly int MessageHeaderLength = 36 ;
        
public static readonly ushort MessageStartToken = 0xFFFF;

        
#region Ctor
        
public StreamMessageHeader() { }
        
public StreamMessageHeader(string _userID, int _messageType, int _bodyLen, string _destID, int _messageID)
        {           
            
this.MessageType = _messageType;            
            
this.MessageBodyLength = _bodyLen;
            
this.UserID = _userID;
            
this.DestUserID = _destID;
            
this.messageID = _messageID;
        }    
        
#endregion

        
#region IMessageHeader 成员
        
#region Token
        
private ushort startToken = StreamMessageHeader.MessageStartToken;
        
public ushort StartToken
        {
            
get { return startToken; }
            
set { startToken = value; }
        } 
        
#endregion

        
#region UserID
        
private string userID = "";
        
public string UserID
        {
            
get
            {
                
return this.userID;
            }
            
set
            {
                
this.userID = value;               
            }
        }  
        
#endregion

        
#region DestUserID
        
private string destUserID = "";
        
public string DestUserID
        {
            
get
            {
                
return this.destUserID;
            }
            
set
            {
                
this.destUserID = value;               
            }
        }  
        
#endregion       

        
#region MessageType
        
private ushort messageType = 0;
        
public int MessageType
        {
            
get
            {
                
return this.messageType;
            }
            
set
            {
                
this.messageType = (ushort)value;
            }
        } 
        
#endregion

        
#region MessageID
        
private int messageID = 0;
        
public int MessageID
        {
            
get
            {
                
return this.messageID;
            }
            
set
            {
                
this.messageID = value;
            }
        } 
        
#endregion

        
#region StringEncoder
        
public IStringEncoder StringEncoder
        {
            
get
            {
                
return DefaultStringEncoder.Instance;
            }
        } 
        
#endregion

        
#endregion        

        
#region Clone
        
public object Clone()
        {
            
return new StreamMessageHeader(this.userID, this.messageType, this.messageBodyLength, this.destUserID, this.messageID);  
        }
        
#endregion

        
#region IStreamMessageHeader 成员

        
#region MessageBodyLength
        
private int messageBodyLength = 0;
        
public int MessageBodyLength
        {
            
get
            {
                
return this.messageBodyLength;
            }
            
set
            {
                
this.messageBodyLength = value;
            }
        }
        
        
#endregion

        
#region HeaderLength
        
public int HeaderLength
        {
            
get
            {
                
return StreamMessageHeader.MessageHeaderLength;
            }
        } 
        
#endregion

        
#region ParseHeader
        
public static StreamMessageHeader ParseHeader(byte[] data, int offset)
        {
            
try
            {
                StreamMessageHeader header 
= new StreamMessageHeader();
                
int curOffset = offset;
                header.startToken 
= BitConverter.ToUInt16(data, curOffset);
                curOffset 
+= 2;
                header.messageType 
= BitConverter.ToUInt16(data, curOffset);
                curOffset 
+= 2;
                header.messageID 
= BitConverter.ToInt32(data, curOffset);
                curOffset 
+= 4;
                header.messageBodyLength 
= BitConverter.ToInt32(data, curOffset);
                curOffset 
+= 4;
                
byte userIDLen = data[curOffset];
                curOffset 
+= 1;
                header.userID 
= DefaultStringEncoder.Instance.GetStrFromStream(data, curOffset, userIDLen);
                curOffset 
+= 11;
                
byte destUserIDLen = data[curOffset];
                curOffset 
+= 1;
                header.destUserID 
= DefaultStringEncoder.Instance.GetStrFromStream(data, curOffset, destUserIDLen);
                curOffset 
+= 11;

                
return header;
            }
            
catch
            {
                
return null;
            }
        }
        
#endregion

        
#region ToStream
        
public byte[] ToStream()
        {           
            
byte[] stream = new byte[this.HeaderLength];
            
int offset = 0;

            
#region FillFields   
            
byte[] bStartToken = BitConverter.GetBytes(this.startToken);
            
for (int i = 0; i < bStartToken.Length; i++)
            {
                stream[offset 
+ i] = bStartToken[i];
            }
            offset 
+= 2;     

            
byte[] bMessageType = BitConverter.GetBytes(this.messageType);
            
for (int i = 0; i < bMessageType.Length; i++)
            {
                stream[offset 
+ i] = bMessageType[i];
            }
            offset 
+= 2;         

            
byte[] bRandomNum = BitConverter.GetBytes(this.messageID);
            
for (int i = 0; i < bRandomNum.Length; i++)
            {
                stream[offset 
+ i] = bRandomNum[i];
            }
            offset 
+= 4;

            
byte[] bMessageBodyLength = BitConverter.GetBytes(this.messageBodyLength);
            
for (int i = 0; i < bMessageBodyLength.Length; i++)
            {
                stream[offset 
+ i] = bMessageBodyLength[i];
            }
            offset 
+= 4;

            
#region UserID
            
byte[] bUserID = DefaultStringEncoder.Instance.GetBytesFromStr(this.userID);
            stream[offset] 
= (byte)bUserID.Length;
            offset 
+= 1;

            
for (int i = 0; i < bUserID.Length; i++)
            {
                stream[offset 
+ i] = bUserID[i];
            }
            offset 
+= 11
            
#endregion

            
#region DestUserID
            
byte[] bDestUserID = DefaultStringEncoder.Instance.GetBytesFromStr(this.destUserID);
            stream[offset] 
= (byte)bDestUserID.Length;
            offset 
+= 1;

            
for (int i = 0; i < bDestUserID.Length; i++)
            {
                stream[offset 
+ i] = bDestUserID[i];
            }
            offset 
+= 11;     
            
#endregion       

            
#endregion
            
return stream;
        }
        
#endregion

        
#region ToStream2
        
public void ToStream(byte[] buff, int offset)
        {
             
byte[] stream = this.ToStream();
             Buffer.BlockCopy(stream, 
0, buff, offset, stream.Length);           
        }
        
#endregion
        
#endregion
    }    

 

  大家要特别注意:

(1)消息头转化为流后的长度必须是固定的,即36字节。所以ToStream方法得到的byte[]的长度一定是36。

(2)MessageBodyLength属性指明了消息体的长度,ESFramwork依据该属性判断一条完整的消息是否接收完毕。

(3)如果使用ESPlus定义的流消息头,则应用程序必须保证UserID的长度不超过11位。否则,需要应用程序定义自己的消息头,以容纳更大长度的UserID。

(4)字符串编解码采用UTF-8。

 

   StreamPlusContractHelper采用了紧凑的序列化器ESBasic.Serialization.ICompactSerializer来序列化消息体(后面再详加拆解),而TextPlusContractHelper采用XML来进行文本消息的封装。

   为了使通信的消息个头更小,我们建议使用流协议 -- 即使用StreamMessage和对应的StreamPlusContractHelper,这样可以节省大量的带宽,而且消息解析/构造的性能也不会降低。

 

  关于消息的介绍就到这里,下篇我们将介绍ESFramework中的网络引擎部分。 

 

关于ESFramework 4.0的链接:

ESFramework 4.0 概述

ESFramework 4.0 快速上手

ESFramework 4.0 性能测试

 

作者: zhuweisky 发表于 2010-12-24 10:14 原文链接

推荐.NET配套的通用数据层ORM框架:CYQ.Data 通用数据层框架
新浪微博粉丝精灵,刷粉丝、刷评论、刷转发、企业商家微博营销必备工具"