博文《The ONE使用笔记:深入源码理解消息创建过程》讲述了MessageEventGenerator如何创建消息。消息创建后,至于怎么发送(将哪些消息发送到哪些邻居),取决于路由策略。本文忽略路由策略,只介绍消息在The ONE是如何发送的。
1. 概述
1.1 消息流动
The ONE的消息缓冲区是messages,除此之外,还提供其他辅助缓冲区,如下,最后一个是供应用程序使用的,先不管它。
private HashMap<String, Message> incomingMessages; //正在发送的消息 private HashMap<String, Message> messages; //消息缓冲区,包括新创建的消息和接收到的消息 private HashMap<String, Message> deliveredMessages; //已成功投递的消息 private HashMap<String, Object> blacklistedMessages; //应用层删除的消息 The messages that Applications on this router have blacklisted
The ONE消息的转换如下图所示。新创建的消息放在messages
,正在传输的消息放在incomingMessages
;传输成功的消息若为目的节点则放在deliverredMessages
,否则放在messages
。
1.2 消息发送时机
MessageEventGenerator
创建的消息(确切的说,是由其产生消息创建事件,而后处理该事件,创建消息)放在节点的缓冲区(确切的说,是MessageRouter.java
的private HashMap<String, Message> messages)
。消息创建后,至于怎么发送(将哪些消息发送到哪些邻居),取决于路由策略。
自己写个路由MyRouter
,继承ActiveRouter
,ActiveRouter
又是继承MessageRouter
,其类图关系如下:
通常在自己的路由MyRouter
重写update()
方法,此时update
调用层次是这样的:MyRouter.update –> ActiveRouter.update –> MessageRouter.update
。在update
调用自己实现的tryOtherMessages
,在tryOtherMessages
通过相应的路由策略将要发送的消息以及发送到哪个节点(即消息-连接对)收集起来,最后调用tryMessagesForConnected
发送消息。主要代码如下:
private Tuple<Message, Connection> tryOtherMessages() { List<Tuple<Message, Connection>> messages = new ArrayList<Tuple<Message, Connection>>(); messages.add(new Tuple<Message, Connection>(m,con)); //将要发送的<Message, Connection>收集起来 return tryMessagesForConnected(messages); // try to send messages }
值得注意的是,tryMessagesForConnected
只处理一个消息的传送,而不是处理Tuple中的所有消息,因为一有消息发送,信道就被占用了。
//ActiveRouter.java protected Tuple<Message, Connection> tryMessagesForConnected(List<Tuple<Message, Connection>> tuples) { for (Tuple<Message, Connection> t : tuples) { Message m = t.getKey(); Connection con = t.getValue(); if (startTransfer(m, con) == RCV_OK) { //REV_OK相当于链路空闲,可以使用 return t; //只要有一个连接在传送就返回了 } } }
2. 开始传输
ActiveRouter.java
的startTransfer
开始传输(类似于网络层),调用Connection.java
的startTransfer
(类似于链路层)。
2.1 ActiveRouter.startTransfer
//ActiveRouter.java protected int startTransfer(Message m, Connection con) { int retVal; if (!con.isReadyForTransfer()) { //connection is up and there is no message being transferred return TRY_LATER_BUSY; } if (!policy.acceptSending(getHost(), con.getOtherNode(getHost()), con, m)) { //默认情况,所有消息都接受。 支持三种策略:simple policy, Hop Count, ModuleCommunicationBus(MCB) return MessageRouter.DENIED_POLICY; } retVal = con.startTransfer(getHost(), m); //相当于用物理层链路开始传输 if (retVal == RCV_OK) { // started transfer addToSendingConnections(con); //add connection(s) that are currently used for sending } else if (deleteDelivered && retVal == DENIED_OLD && m.getTo() == con.getOtherNode(this.getHost())) { this.deleteMessage(m.getId(), false); //final recipient has already received the msg -> delete it } return retVal; }
2.2 Connection.startTransfer
ActiveRouter.startTransfer
调用Connection.startTransfer
,Connection.startTransfer()
是抽象方法。Connection
有两个子类:CBRConnection
(A constant bit-rate connection)和VBRConnection
(The transmission speed is updated every round)。以CBRConnection
为例,其startTransfer
源码如下:
public int startTransfer(DTNHost from, Message m) { //判断没有消息在该connection传输,no message being transferred assert this.msgOnFly == null : "Already transferring " + this.msgOnFly + " from " + this.msgFromNode + " to " + this.getOtherNode(this.msgFromNode) + ". Can't " + "start transfer of " + m + " from " + from; this.msgFromNode = from; Message newMessage = m.replicate(); //复制消息 int retVal = getOtherNode(from).receiveMessage(newMessage, from); //Start receiving a message from another host if (retVal == MessageRouter.RCV_OK) { this.msgOnFly = newMessage; this.transferDoneTime = SimClock.getTime() + (1.0*m.getSize()) / this.speed; } return retVal; }
3. 开始接收消息
开始接收消息,把消息放到incomingMessages
,而不是直接放到messages
,因为传输需要时间(消息大小除以速度)。调用过程如下:DTNHost.receiveMessage –> ActiveRouter.receiveMessage –> MessageRouter.receiveMessage
,其源代码如下:
//DTNHost.java Start receiving a message from another host public int (Message m, DTNHost from) { int retVal = this.router.receiveMessage(m, from); if (retVal == MessageRouter.RCV_OK) { m.addNodeOnPath(this); // add this node on the messages path } return retVal; } //ActiveRouter.java重写了MessageRouter.java的receiveMessage,增加了判断 public int receiveMessage(Message m, DTNHost from) { int recvCheck = checkReceiving(m, from); //router isn't transferring, doesn't have the message and has room for it if (recvCheck != RCV_OK) { return recvCheck; } return super.receiveMessage(m, from); //调用MessageRouter.java的receiveMessage } //MessageRouter.java Try to start receiving a message from another host. public int receiveMessage(Message m, DTNHost from) { Message newMessage = m.replicate(); this.putToIncomingBuffer(newMessage, from); //才开始接收,还没接收完,先放在incomingMessages newMessage.addNodeOnPath(this.host); for (MessageListener ml : this.mListeners) { ml.messageTransferStarted(newMessage, from, getHost()); } return RCV_OK; // superclass always accepts messages }赞赏
微信赞赏
支付宝赞赏
您好,我想请问如果想要获取特定两个节点之间的connection应该怎么写代码啊?期待收到您的回复。
你好,我最近一直研究机会网络,刚开始仿真阶段,通过对ONE仿真器的学习,我发现仿真器中路由阶段同一时刻只能有一个节点转发信息,是这样的吗?也就是仿真器是单线程的?
的确,不过在每个updateInterval,都会遍历所有节点,所以相同的updateInterval,可以有多个节点同时转发消息。
您好,请问一下,计算每个信息的效用值时会用到全局参数,这个是在仿真场景中的变量,信息可以怎么获取到这个变量,有没有什么方法?
Which global variable did you mean?
比如仿真场景中的节点数?
Try this,
nrof_ hosts=SimScenario.getInstance().getHosts().size();
Spark如果我判断有哪些节点属于同一个节点的通讯范围内,one里有这样的方法可以调用吗?还是先要通过获取坐标计算距离之后再做判断呢?
这样够了吗?
for (Connection con : getConnections()) {
DTNHost other = con.getOtherNode(getHost());
}
Pingback: The ONE使用笔记:每个updateInterval都做些什么(world.update) | | Spark & Shine
刚开始学习THE ONE,感谢大神分享。
仔细看了一下源码,这里每次复制实际上Message的uniqueID都是不一样的(但是对于消息来说id和其他的属性是一致的)。是不是要理解为对于要传输的消息来说,只需要用id来区分,但是ONE有别的机制需要用uniqueID区分(比如链路中的和router中的消息)?所以要复制两次?
我现在回过头来看,复制两次的原因是用于区分不同阶段(startTransfer, receive)的消息(通过指定不同的uniqueID)。
谢谢博主~这两天又多看了一点,感觉ONE里面的实现机制和现实相比有点别扭。。。比如机会网络中的epidemic路由,在ONE中就是通过不停地update来实现泛洪的,而原论文中负责相遇节点感知的IMEP协议,还有hello、echo、sv交换和request这些全部都被拿掉了。如果想实现广播机制,ONE是否就不适用了?难道别人都没有这方面的需求和进一步研究吗?:(
是的,我最开始也很纠结仿真器与现实的差距,后面从仿真的角度去理解,也就不纠结了。
其实很多人都有广播的需求,包括我自己,在这里可以查看邮件组关于broadcast的讨论。
广播特性得自己实现。至于换不换仿真器,我个人看法是这样的,几乎没有一个仿真器可以满足你所有需求(比如ns2/3,是有广播机制,但不支持不含有位置节点的数据集),所以修改仿真器以适应自身仿真需求,可以视为你工作的一部分。
Spark您好!您把广播特性用代码实现过吗?实现的代码能发我一份参考一下吗?不胜感激。
抱歉,没有直接给源码的习惯。
我自己写了个,可是总是有错误,我又找不到,很是烦躁。
您好朋友,您将广播特性实现了吗在ONE里面?
Pingback: The ONE使用笔记:目录 | Spark & Shine
Pingback: The ONE使用笔记:深入源码理解消息接收过程 | Spark & Shine