0x00 目录

0x01 前言

Floodlight控制器中StatisticsCollector模块已经实现了用Port-Stats消息测量OpenFlow交换机端口的吞吐量,本文通过对StatisticsCollector模块进行扩展,并用Mininet搭建拓扑,实现在SDN网络中测量中链路丢包率。

0x02 基于OpenFlow的Port-Stats消息测量链路丢包率原理

请参考基于OpenFlow消息的网络测量方法

0x03 链路丢包率测量模块设计

StatisticsCollector模块分析

如图1所示,src/main/java目录下,net.floodlightcontroller.statistics包中StatisticsCollector.java文件定义了StatisticsCollector模块。该模块每隔10s创建多个线程轮询拓扑中所有交换机以获取Port-Stats响应消息,通过Port-Stats响应消息中的rx_bytes、tx_bytes、duration_sec和duration_nsec四个字段来测量端口吞吐量。因此,可以在该模块获取Port-Stats响应消息基础上,添加代码以实现链路丢包率的测量。

图1 net.floodlightcontroller.statistics包

StatisticsCollector模块中的PortStatsCollector内部类实现了端口吞吐量测量,如图2所示,它通过调用getSwitchStatistics方法来获取所有交换机的统计信息,然后根据上一次查询的Port-Stats消息以及本次查询的Port-Stats消息,分别提取rx_bytes、tx_bytes、duration_sec和duration_nsec字段,通过公式(2)和公式(3)即可计算出交换机端口发送吞吐量或接收吞吐量。

交换机端口吞吐量计算公式

图2 StatisticsCollector模块中PortStatsCollector类

StatisticsCollector模块中的getSwitchStatistics方法有两个重载版本,分别是

protected List<OFStatsReply> getSwitchStatistics(DatapathId switchId, OFStatsType statsType)

Map<DatapathId, List<OFStatsReply>> getSwitchStatistics(Set<DatapathId> dpids, OFStatsType statsType)

其中前者的作用是获取单台交换机的统计消息(以Stats结尾的消息,如Port-Stats消息)。该方法的实现如图3所示,首先构造一个Stats请求消息根据交换机DPID发送给指定交换机,然后接收交换机返回的Stats响应消息,并保存在List列表中。

图3 Statistics模块中getSwitchStatistics方法获取单台交换机的统计信息

后者的作用是创建多个线程,每个线程通过调用getSwitchStatistics(DatapathId switchId, OFStatsType statsType)方法负责获取一台交换机的统计消息,从而实现并行获取网络中所有交换机的统计消息。该方法的实现如图4所示。

图4 Statistics模块中getSwitchStatistics方法并行获取所有交换机的统计信息

linkdiscovery包分析

如图5所示,src/main/java目录下,net.floodlightcontroller.linkdiscovery包中包含了链路的定义Link.java,可获取拓扑中所有链路的接口ILinkDiscoveryService.java。
图5  net.floodlightcontroller.linkdiscovery包

如图6所示,在Floodlight中,由起始交换机DPID和端口ID以及终点交换机DPID和端口ID定义一条链路,可见链路是指交换机之间的连接关系,不包含主机与交换机之间的连接关系,同时链路是有方向的。

图6  Link类定义

如图7所示,ILinkDiscoveryService接口中定义了许多获取链路、链路类型等方法,其中

 public Map<Link, LinkInfo> getLinks();

方法可用于获取拓扑中所有的链路。

图7  ILinkDiscoveryService接口定义

链路丢包率测量模块设计思路

链路丢包率测量模块将在StatisticsCollector模块的基础上实现,其设计思路:

a.通过getSwitchStatistics方法发起查询,获取所有交换机的Port-Stats消息,并根据交换机DPID和其Port-Stats消息的映射关系存储在replies哈希表中
b.通过ILinkDiscoveryService接口获取所有链路,存储在links集合中
c.循环遍历links,分别从previousReplies和replies中获取上一次查询以及本次查询的Port-Stats消息,并从中提取tx_packets、rx_packets字段根据公式(1)计算当前遍历的链路的丢包率,将链路和链路丢包率的映射关系存储在linkLossResults哈希表中
d.用replies更新previousReplies
e.根据linkLossResults打印链路丢包率

链路丢包率计算公式

0x04 链路丢包率测量模块实现

  1. 在Eclipse中,src/main/java目录下,net.floodlightcontroller.statistics包中新建一个LinkLoss类,用于存储某条链路在查询间隙接收、发送数据包个数,以及丢包率,该类的实现代码如下:

     public class LinkLoss {
         private    Link link;    // 链路 = 起点(srcDPID + srcPort) + 终点(dstDPID + dstPort)
         private U64 txPackets;    // 两次查询间隙,链路起点发送的数据包个数
         private U64 rxPackets;    // 两次查询间隙,链路终点接收的数据包个数
         private U64 linkLoss;    // 两次查询间隙,链路丢包率
    
         public LinkLoss() {
         super();
         }
         public LinkLoss(Link link, U64 txPackets, U64 rxPackets,U64 linkLoss) {
             super();
             this.link = link;
             this.txPackets = txPackets;
             this.rxPackets = rxPackets;
             this.linkLoss = linkLoss;
         }
         public Link getLink() {
             return link;
         }
         public void setLink(Link link) {
             this.link = link;
         }
         public U64 getRxPackets() {
             return rxPackets;
         }
         public void setRxPackets(U64 rxPackets) {
             this.rxPackets = rxPackets;
         }
         public U64 getTxPackets() {
             return txPackets;
         }
         public void setTxPackets(U64 txPackets) {
             this.txPackets = txPackets;
         }
         public U64 getLinkLoss() {
             return linkLoss;
         }
         public void setLinkLoss(U64 linkLoss) {
             this.linkLoss = linkLoss;
         }
         @Override
         public String toString() {
             return "(" + link.getSrc() + "/" + link.getSrcPort() + ") -> (" + link.getDst() + "/" + link.getDstPort() + ") TxPackets:" + txPackets.getValue() + " RxPackets:" + rxPackets.getValue() + " LinkLoss:" + linkLoss.getValue() + "%";        
         }
     }
  1. 打开net.floodlightcontroller.statistics包中StatisticsCollector.java文件,在StatisticsCollector类中添加下列属性:

     // 获取链路的接口
     private static ILinkDiscoveryService linkDiscoveryService;
    
     // 保存上一次查询的Port-Stats消息
     private static HashMap<Link, List<OFPortStatsEntry>> priorLinkPortStats = new HashMap<Link, List<OFPortStatsEntry>>(); 
    
     // 保存本次查询的Port-Stats消息
     private static HashMap<Link, List<OFPortStatsEntry>> linkPortStats = new HashMap<Link, List<OFPortStatsEntry>>();
    
     // 保存上一次查询与本次查询间隙的链路丢包率
     private static HashMap<Link, LinkLoss> linkLossResults = new HashMap<Link, LinkLoss>();
    
     //保存查询次数
     private static int lookupCounter = 0;
  1. 为了保证ILinkDiscoveryService接口能够正常使用,还需要添加接口的依赖并进行初始化,StatisticsCollector.java的相应方法中添加下列带有ADD注释的代码:

     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
         Collection<Class<? extends IFloodlightService>> l =
                 new ArrayList<Class<? extends IFloodlightService>>();
         l.add(IOFSwitchService.class);
         l.add(IThreadPoolService.class);
         l.add(IRestApiService.class);
         // ADD:添加依赖
         l.add(ILinkDiscoveryService.class);
         return l;
     }
    
     @Override
     public void init(FloodlightModuleContext context)
             throws FloodlightModuleException {
         switchService = context.getServiceImpl(IOFSwitchService.class);
         threadPoolService = context.getServiceImpl(IThreadPoolService.class);
         restApiService = context.getServiceImpl(IRestApiService.class);
         // ADD:初始化接口
         linkDiscoveryService = context.getServiceImpl(ILinkDiscoveryService.class);
    
         Map<String, String> config = context.getConfigParams(this);
         if (config.containsKey(ENABLED_STR)) {
             try {
                 isEnabled = Boolean.parseBoolean(config.get(ENABLED_STR).trim());
             } catch (Exception e) {
                 log.error("Could not parse '{}'. Using default of {}", ENABLED_STR, isEnabled);
             }
         }
         log.info("Statistics collection {}", isEnabled ? "enabled" : "disabled");
    
         if (config.containsKey(INTERVAL_PORT_STATS_STR)) {
             try {
                 portStatsInterval = Integer.parseInt(config.get(INTERVAL_PORT_STATS_STR).trim());
             } catch (Exception e) {
                 log.error("Could not parse '{}'. Using default of {}", INTERVAL_PORT_STATS_STR, portStatsInterval);
             }
         }
         log.info("Port statistics collection interval set to {}s", portStatsInterval);
     }
  1. StatisticsCollector.java文件中将原来用于测量端口吞吐量的内部类PortStatsCollector的实现逻辑替换成以下代码:

     Private class PortStatsCollector implements Runnable {
         @Override
         // 计算丢包率
         public void run() {
             // 发起查询,获取所有交换机的Port-Stats消息,存储在replies中
             Map<DatapathId, List<OFStatsReply>> replies = getSwitchStatistics(switchService.getAllSwitchDpids(), OFStatsType.PORT);
             log.info("Replies Num:" + replies.size());
             // 查询次数计数器加一
             ++lookupCounter;
             log.info("Lookup Counter:" + lookupCounter);
    
             // 获取所有链路
             Set<Link> links = linkDiscoveryService.getLinks().keySet();
             log.info("Total Links:" + links.size());
    
             // 遍历链路
             for(Link l : links) {
                 log.info(l.toString());
                 // 从replies中获取本次查询中,与当前链路起点有关的Port-Stats消息,存储到HashMap<Link, List<OFPortStatsEntry>> linkPortStats中
                 List<OFStatsReply> srcPortStatsReplies = replies.get(l.getSrc());
                 boolean breakFlag = false;
                 for(OFStatsReply r : srcPortStatsReplies) {
                     OFPortStatsReply psr = (OFPortStatsReply) r;
                     for (OFPortStatsEntry pse : psr.getEntries()) {
                         if(pse.getPortNo().equals(l.getSrcPort())) {
                             List<OFPortStatsEntry> tmpList = linkPortStats.get(l);
                             if(tmpList == null || tmpList.isEmpty()) {
                                 tmpList = new ArrayList<OFPortStatsEntry>();
                                 tmpList.add(pse);
                             }
                             else {
                                 tmpList.add(pse);
                             }
                             linkPortStats.put(l, tmpList);
                             breakFlag = true;
                             break;
                         }
                     }
                     if(breakFlag)
                         break;
                 }
    
                 // 从replies中获取本次查询中,与当前链路终点有关的Port-Stats消息,存储到HashMap<Link, List<OFPortStatsEntry>> linkPortStats中
                 List<OFStatsReply> dstPortStatsReplies = replies.get(l.getDst());
                 breakFlag = false;
                 for(OFStatsReply r : dstPortStatsReplies) {
                     OFPortStatsReply psr = (OFPortStatsReply) r;
                     for (OFPortStatsEntry pse : psr.getEntries()) {
                         if(pse.getPortNo().equals(l.getDstPort())) {
                             List<OFPortStatsEntry> tmpList = linkPortStats.get(l);
                             if(tmpList == null || tmpList.isEmpty()) {
                                 tmpList = new ArrayList<OFPortStatsEntry>();
                                 tmpList.add(pse);
                             }
                             else {
                                 tmpList.add(pse);
                             }
                             linkPortStats.put(l, tmpList);
                             breakFlag = true;
                             break;
                         }
                     }
                     if(breakFlag)
                         break;
                 }
    
                 // 若只有一次查询,则不计算丢包率
                 if(lookupCounter == 1) {
                     continue;
                 }
    
                 // 获取上一次查询和本次查询的该链路的Port-Stats消息,计算该链路丢包率
                 OFPortStatsEntry srcPriorPSE = priorLinkPortStats.get(l).get(0);
                 OFPortStatsEntry dstPriorPSE = priorLinkPortStats.get(l).get(1);
    
                 OFPortStatsEntry srcCurrentPSE = linkPortStats.get(l).get(0);
                 OFPortStatsEntry dstCurrentPSE = linkPortStats.get(l).get(1);
    
                 U64 linkLoss = U64.ZERO;    // 丢包率范围:[0,1],linkloss中存储百分比
                 if(srcPriorPSE != null && dstPriorPSE != null && srcCurrentPSE != null && dstCurrentPSE != null) {
                     U64 srcTxPackets = U64.ofRaw(srcCurrentPSE.getTxPackets().getValue() - srcPriorPSE.getTxPackets().getValue() );
                     U64 dstRxPackets = U64.ofRaw(dstCurrentPSE.getRxPackets().getValue() - dstPriorPSE.getRxPackets().getValue() );
    
                     if(srcTxPackets.equals(U64.ZERO)) // 上一次查询与本次查询的间隙内,该链路上无数据包发送
                         linkLoss = U64.ZERO;
                     else if(srcTxPackets.compareTo(dstRxPackets) < 0)
                         linkLoss = U64.ofRaw(100);
                     else if(!srcTxPackets.equals(U64.ZERO))
                         linkLoss = U64.ofRaw(100 - (dstRxPackets.getValue() * 100 / srcTxPackets.getValue())) ;
                     linkLossResults.put(l, new LinkLoss(l, srcTxPackets, dstRxPackets, linkLoss));
                 }//if
             }//for
    
             // 更新priorLinkPortStats,并清空本次查询结果linkPortStats
             priorLinkPortStats =  new HashMap<Link, List<OFPortStatsEntry>>(linkPortStats);
             linkPortStats.clear();
    
             // 打印所有链路丢包率
             if(!linkLossResults.isEmpty()) {
                 String result = "";
                 for(Link l : linkLossResults.keySet()) {
                     result += linkLossResults.get(l).toString() + "\n";
                 }
                 log.info(result);
             }
         }//function
     }//class
  1. 在Floodlight源码src/main/resources目录下的floodlightdefault.properties文件中,修改代码设置StatisticsCollector模块随着Floodlight控制器一起启动。

     net.floodlightcontroller.statistics.StatisticsCollector.enable=TRUE

0x05 Mininet搭建拓扑测量链路丢包率

假设拓扑如图8所示,使用带有链路丢包率测量模块的Floodlight控制器测量拓扑中所有交换机之间链路的丢包率。
图8 链路丢包率测试拓扑

  1. 新开终端编译并启动Floodlight控制器。

     cd floodlight
     ant
     java –jar target/floodlight.jar
  2. 新开终端启动Mininet,构造如图8所示的测试拓扑。

     sudo mn --controller=remote,ip=127.0.0.1 --switch ovsk,protocols=OpenFlow13 --mac --topo=linear,2
  3. 在Mininet交互模式下打开主机H1的终端。

     mininet > xterm h1
  4. 在主机H1终端上使用hping3发包工具以500pps(packet per second)的速率发送数据包给主机H2,Floodlight控制器终端显示测量的S1与S2之间链路丢包率结果如图9所示。

     hping3 –i u2000 10.0.0.2

图9 S1与S2之间链路丢包率测量结果

0x06 参考



SDN     

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!