The ONE使用笔记:深入源码理解消息转发过程

博文《The ONE使用笔记:深入源码理解消息创建过程》讲述了MessageEventGenerator如何创建消息。消息创建后,至于怎么发送(将哪些消息发送到哪些邻居),取决于路由策略。本文忽略路由策略,只介绍消息在The ONE是如何发送的。

1. 概述

1.1 消息流动

The ONE的消息缓冲区是messages,除此之外,还提供其他辅助缓冲区,如下,最后一个是供应用程序使用的,先不管它。

private HashMap<String, Message> incomingMessages;   //正在发送的消息
private HashMap<String, Message> messages;           //消息缓冲区,包括新创建的消息和接收到的消息
private HashMap<String, Message> deliveredMessages;  //已成功投递的消息

private HashMap<String, Object> blacklistedMessages; //应用层删除的消息  The messages that Applications on this router have blacklisted

The ONE消息的转换如下图所示。新创建的消息放在messages,正在传输的消息放在incomingMessages;传输成功的消息若为目的节点则放在deliverredMessages,否则放在messages

image

1.2 消息发送时机

MessageEventGenerator创建的消息(确切的说,是由其产生消息创建事件,而后处理该事件,创建消息)放在节点的缓冲区(确切的说,是MessageRouter.javaprivate HashMap<String, Message> messages)。消息创建后,至于怎么发送(将哪些消息发送到哪些邻居),取决于路由策略。

自己写个路由MyRouter,继承ActiveRouterActiveRouter又是继承MessageRouter,其类图关系如下:

MessageRouter_class_diagram

通常在自己的路由MyRouter重写update()方法,此时update调用层次是这样的:MyRouter.update –> ActiveRouter.update –> MessageRouter.update。在update调用自己实现的tryOtherMessages,在tryOtherMessages通过相应的路由策略将要发送的消息以及发送到哪个节点(即消息-连接对)收集起来,最后调用tryMessagesForConnected发送消息。主要代码如下:

private Tuple<Message, Connection> tryOtherMessages() {
    List<Tuple<Message, Connection>> messages = new ArrayList<Tuple<Message, Connection>>();
    messages.add(new Tuple<Message, Connection>(m,con)); //将要发送的<Message, Connection>收集起来

    return tryMessagesForConnected(messages);    // try to send messages
}

值得注意的是,tryMessagesForConnected只处理一个消息的传送,而不是处理Tuple中的所有消息,因为一有消息发送,信道就被占用了。

//ActiveRouter.java
protected Tuple<Message, Connection> tryMessagesForConnected(List<Tuple<Message, Connection>> tuples) {
    for (Tuple<Message, Connection> t : tuples) {
        Message m = t.getKey();
        Connection con = t.getValue();

        if (startTransfer(m, con) == RCV_OK) {  //REV_OK相当于链路空闲,可以使用
            return t;                            //只要有一个连接在传送就返回了
        }
    }
}

2. 开始传输

ActiveRouter.javastartTransfer开始传输(类似于网络层),调用Connection.javastartTransfer(类似于链路层)。

2.1 ActiveRouter.startTransfer

//ActiveRouter.java
protected int startTransfer(Message m, Connection con) {
    int retVal;

    if (!con.isReadyForTransfer()) {  //connection is up and there is no message being transferred
        return TRY_LATER_BUSY;
    }

    if (!policy.acceptSending(getHost(), con.getOtherNode(getHost()), con, m)) { //默认情况,所有消息都接受。 支持三种策略:simple policy, Hop Count, ModuleCommunicationBus(MCB)
        return MessageRouter.DENIED_POLICY;
    }

    retVal = con.startTransfer(getHost(), m);  //相当于用物理层链路开始传输
    if (retVal == RCV_OK) { // started transfer
        addToSendingConnections(con);   //add connection(s) that are currently used for sending
    } else if (deleteDelivered && retVal == DENIED_OLD && m.getTo() == con.getOtherNode(this.getHost())) {
        this.deleteMessage(m.getId(), false);  //final recipient has already received the msg -> delete it
    }

    return retVal;
}

2.2 Connection.startTransfer

ActiveRouter.startTransfer调用Connection.startTransferConnection.startTransfer()是抽象方法。Connection有两个子类:CBRConnection (A constant bit-rate connection)和VBRConnection (The transmission speed is updated every round)。以CBRConnection为例,其startTransfer源码如下:

public int startTransfer(DTNHost from, Message m) {
    //判断没有消息在该connection传输,no message being transferred
    assert this.msgOnFly == null : "Already transferring " + this.msgOnFly + " from " + this.msgFromNode + " to " + this.getOtherNode(this.msgFromNode) + ". Can't " +     "start transfer of " + m + " from " + from;

    this.msgFromNode = from;
    Message newMessage = m.replicate(); //复制消息
    int retVal = getOtherNode(from).receiveMessage(newMessage, from); //Start receiving a message from another host

    if (retVal == MessageRouter.RCV_OK) {
        this.msgOnFly = newMessage;
        this.transferDoneTime = SimClock.getTime() + (1.0*m.getSize()) / this.speed;
    }

    return retVal;
}

3. 开始接收消息

开始接收消息,把消息放到incomingMessages,而不是直接放到messages,因为传输需要时间(消息大小除以速度)。调用过程如下:DTNHost.receiveMessage –> ActiveRouter.receiveMessage –> MessageRouter.receiveMessage,其源代码如下:

//DTNHost.java Start receiving a message from another host
public int  (Message m, DTNHost from) {
    int retVal = this.router.receiveMessage(m, from);

    if (retVal == MessageRouter.RCV_OK) {
        m.addNodeOnPath(this);    // add this node on the messages path
    }

    return retVal;
}

//ActiveRouter.java重写了MessageRouter.java的receiveMessage,增加了判断
public int receiveMessage(Message m, DTNHost from) {
    int recvCheck = checkReceiving(m, from);  //router isn't transferring, doesn't have the message and has room for it
    if (recvCheck != RCV_OK) {
        return recvCheck;
    }

    return super.receiveMessage(m, from); //调用MessageRouter.java的receiveMessage
}

//MessageRouter.java  Try to start receiving a message from another host.
public int receiveMessage(Message m, DTNHost from) {
    Message newMessage = m.replicate();

    this.putToIncomingBuffer(newMessage, from);    //才开始接收,还没接收完,先放在incomingMessages
    newMessage.addNodeOnPath(this.host);

    for (MessageListener ml : this.mListeners) {
        ml.messageTransferStarted(newMessage, from, getHost());
    }

    return RCV_OK; // superclass always accepts messages
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注

20 thoughts on “The ONE使用笔记:深入源码理解消息转发过程

  • 2017年06月07日 星期三 at 11:08上午
    Permalink

    您好,我想请问如果想要获取特定两个节点之间的connection应该怎么写代码啊?期待收到您的回复。

    Reply
  • 2016年07月05日 星期二 at 10:23上午
    Permalink

    你好,我最近一直研究机会网络,刚开始仿真阶段,通过对ONE仿真器的学习,我发现仿真器中路由阶段同一时刻只能有一个节点转发信息,是这样的吗?也就是仿真器是单线程的?

    Reply
    • 2016年07月05日 星期二 at 04:53下午
      Permalink

      的确,不过在每个updateInterval,都会遍历所有节点,所以相同的updateInterval,可以有多个节点同时转发消息。

      Reply
      • 2016年07月09日 星期六 at 11:22上午
        Permalink

        您好,请问一下,计算每个信息的效用值时会用到全局参数,这个是在仿真场景中的变量,信息可以怎么获取到这个变量,有没有什么方法?

      • 2016年07月10日 星期日 at 10:48下午
        Permalink

        Which global variable did you mean?

      • 2016年07月11日 星期一 at 09:11上午
        Permalink

        比如仿真场景中的节点数?

      • 2016年07月11日 星期一 at 04:15下午
        Permalink

        Try this,

        nrof_ hosts=SimScenario.getInstance().getHosts().size();

  • 2016年05月30日 星期一 at 09:57下午
    Permalink

    Spark如果我判断有哪些节点属于同一个节点的通讯范围内,one里有这样的方法可以调用吗?还是先要通过获取坐标计算距离之后再做判断呢?

    Reply
    • 2016年05月31日 星期二 at 09:49下午
      Permalink

      这样够了吗?
      for (Connection con : getConnections()) {
      DTNHost other = con.getOtherNode(getHost());
      }

      Reply
  • Pingback: The ONE使用笔记:每个updateInterval都做些什么(world.update) | | Spark & Shine

  • 2016年01月01日 星期五 at 03:44下午
    Permalink

    刚开始学习THE ONE,感谢大神分享。
    仔细看了一下源码,这里每次复制实际上Message的uniqueID都是不一样的(但是对于消息来说id和其他的属性是一致的)。是不是要理解为对于要传输的消息来说,只需要用id来区分,但是ONE有别的机制需要用uniqueID区分(比如链路中的和router中的消息)?所以要复制两次?

    Reply
    • 2016年01月04日 星期一 at 06:12下午
      Permalink

      我现在回过头来看,复制两次的原因是用于区分不同阶段(startTransfer, receive)的消息(通过指定不同的uniqueID)。

      Reply
      • 2016年01月04日 星期一 at 09:55下午
        Permalink

        谢谢博主~这两天又多看了一点,感觉ONE里面的实现机制和现实相比有点别扭。。。比如机会网络中的epidemic路由,在ONE中就是通过不停地update来实现泛洪的,而原论文中负责相遇节点感知的IMEP协议,还有hello、echo、sv交换和request这些全部都被拿掉了。如果想实现广播机制,ONE是否就不适用了?难道别人都没有这方面的需求和进一步研究吗?:(

      • 2016年01月04日 星期一 at 10:54下午
        Permalink

        是的,我最开始也很纠结仿真器与现实的差距,后面从仿真的角度去理解,也就不纠结了。

        其实很多人都有广播的需求,包括我自己,在这里可以查看邮件组关于broadcast的讨论。

        广播特性得自己实现。至于换不换仿真器,我个人看法是这样的,几乎没有一个仿真器可以满足你所有需求(比如ns2/3,是有广播机制,但不支持不含有位置节点的数据集),所以修改仿真器以适应自身仿真需求,可以视为你工作的一部分。

      • 2016年04月24日 星期日 at 03:55下午
        Permalink

        Spark您好!您把广播特性用代码实现过吗?实现的代码能发我一份参考一下吗?不胜感激。

      • 2016年04月26日 星期二 at 12:18上午
        Permalink

        抱歉,没有直接给源码的习惯。

      • 2016年04月26日 星期二 at 09:43上午
        Permalink

        我自己写了个,可是总是有错误,我又找不到,很是烦躁。

      • 2016年04月25日 星期一 at 03:28下午
        Permalink

        您好朋友,您将广播特性实现了吗在ONE里面?

  • Pingback: The ONE使用笔记:目录 | Spark & Shine

  • Pingback: The ONE使用笔记:深入源码理解消息接收过程 | Spark & Shine