知道了The ONE消息创建(详情见博文《The ONE使用笔记:深入源码理解消息创建过程》)和转发(详情见博文《The ONE使用笔记:深入源码理解消息转发过程》),消息接收就相对简单了。
1. 接收消息
在ActiveRouter.update
会检查消息是否传输完毕,相关源代码如下:
//ActiveRouter.java public void update() { super.update(); if (con.isMessageTransferred()) { //消息是否传输完毕,即是否过了(1.0*m.getSize())/this.speed没 if (con.getMessage() != null) { transferDone(con); con.finalizeTransfer(); //完成消息传输 } } }
2.完成传输
2.1 con.finalizeTransfer
//Connection.java public void finalizeTransfer() { assert this.msgOnFly != null : "Nothing to finalize in " + this; assert msgFromNode != null : "msgFromNode is not set"; this.bytesTransferred += msgOnFly.getSize(); getOtherNode(msgFromNode).messageTransferred(this.msgOnFly.getId(), msgFromNode); //处理该传输完成的消息 clearMsgOnFly(); }
2.2 messageTransferred
messageTransferred
完成消息接收,将接收到消息根据情况放入相应的缓冲区,其调用关系是:DTNHost.messageTransferred --> ActiveRouter.messageTransferred --> MessageRouter.messageTransferred
。相关源代码如下:
//DTNHost.java public void messageTransferred(String id, DTNHost from) { this.router.messageTransferred(id, from); } //ActiveRouter.java public Message messageTransferred(String id, DTNHost from) { Message m = super.messageTransferred(id, from); /*** 相对于MessageRouter.messageTransferred, 增加了对response消息的处理 ***/ if (m.getTo() == getHost() && m.getResponseSize() > 0) { Message res = new Message(this.getHost(),m.getFrom(), RESPONSE_PREFIX+m.getId(), m.getResponseSize()); //generate a response message this.createNewMessage(res); this.getMessage(RESPONSE_PREFIX+m.getId()).setRequest(m); } return m; } //MessageRouter.java public Message messageTransferred(String id, DTNHost from) { Message incoming = removeFromIncomingBuffer(id, from); //将消息从incomingMessages删除 boolean isFinalRecipient; //消息传递到目的节点 isFinalRecipient = aMessage.getTo() == this.host; boolean isFirstDelivery; //消息传递到目的节点,且第一次 isFirstDelivery = isFinalRecipient && !isDeliveredMessage(aMessage); incoming.setReceiveTime(SimClock.getTime()); //设置消息接收时间 /*** 将消息交给应用层处理(如果有的话), 有些应用会丢弃该消息 ***/ Message outgoing = incoming; for (Application app : getApplications(incoming.getAppID())) { outgoing = app.handle(outgoing, this.host); if (outgoing == null) break; // Some app wanted to drop the message } Message aMessage = (outgoing==null)?(incoming):(outgoing); /*** 将消息根据实际情况放入相应的缓冲区 ***/ if (!isFinalRecipient && outgoing!=null) { addToMessages(aMessage, false); //incomingMessages --> messages } else if (isFirstDelivery) { this.deliveredMessages.put(id, aMessage); //incomingMessages --> deliveredMessages } else if (outgoing == null) { this.blacklistedMessages.put(id, null); // Blacklist messages that an app wants to drop. } return aMessage; } //MessageRouter.java Adds a message to the message buffer and informs message listeners protected void addToMessages(Message m, boolean newMessage) { this.messages.put(m.getId(), m); //放到消息缓冲区messages中 if (newMessage) { for (MessageListener ml : this.mListeners) { ml.newMessage(m); } } }
至此,消息接收完毕。
注:对于消息外部事件MessageRelayEvent
,当处理该事件processEvent
时,会调用messageTransferred
完成上述的过程。
微信赞赏
支付宝赞赏
请问在Epidemic路由中,源节点产生了一条消息时设置了一个TTL时间,中间节点复制转发该消息时,TTL时间是会重新赋一个跟开始源节点所设置相同的TTL值,还是原TTL值减去仿真已经进行的时间?或者是其他什么值?
跟消息创建的TTL值一样。
恩,看了源代码确实如此,不过我很奇怪,为什么ONE没有实现比较简单的Two hop relay的代码?你手头有相关代码,能够参考一下吗?
不懂你说的”two hop relay”是什么意思。
如果想要在获得节点的消息数量,并设计计数器进行统计来判断节点选择不同的缓存管理算法,那么这段代码应在哪里进行改动和添加呢?跪求回答
updated()
大神,是在active router中的update更改吗?还需要更改别的类了吗?
请问如果我要在消息发送完成之后,将该消息从发送节点中的缓存中删除,该怎样做
在
messageTransferred()
添加你的代码。感谢,还有一个问题:假如某个消息已经到达目的节点,那么这个消息的其他副本会继续在网络中传播吗
会的。
谢谢
节点在通信范围时会不断地建立连接,那么如何计算节点的相遇间隔时间呢
相遇间隔时间是从连接建立到连接断开的间隔。
Pingback: The ONE使用笔记:DirectDelivery路由 | | Spark & Shine
Pingback: The ONE使用笔记:每个updateInterval都做些什么(world.update) | | Spark & Shine
请问这里的isFirstDelivery 是表示消息第一次传输就到了目的节点的意思吗?
是的。
Pingback: The ONE使用笔记:目录 | Spark & Shine