项目要求

  1. 配置一个广播通信业务,假设主机0 向所有其它主机进行广播。
  2. 使用Kruskal 算法计算广播使用的最小生成树。
  3. 将广播业务在可视化平台上进行展示。

基本原理

Kruskal 算法

最小生成树概念

引入一个实际问题:要在n 个城市中建立一个通信网络,则连通这n 个城市需要布置n-1 一条通信线路,这个时候我们需要考虑如何在成本最低的情况下建立这个通信网?

将问题抽象出来,即n 个城市就是图上的n 个顶点,然后,边表示两个城市的通信线路,每条边上的权重就是我们搭建这条线路所需要的成本,所以现在我们有n 个顶点的连通网可以建立不同的生成树,每一颗生成树都可以作为一个通信网,当我们构造这个连通网所花的成本最小时,搭建该连通网的生成树,就称为最小生成树。

具体概念:在一给定的无向图G = (V, E) 中,(u,v)(u,v) 代表连接顶点u 与顶点v 的边,而 ω(u,v)\omega (u,v)代表此边的权重,若存在 t 为 E 的子集且为无循环图,使得ω(t)=(u,v)tω(u,v)\omega (t) = \sum_{(u,v)\in t}^{} \omega (u,v)ω(t)\omega (t) 最小,则此T 为G 的最小生成树(最小权重生成树)。

基本思想

将所有边按照权值的大小进行升序排序,然后从小到大一一判断,条件为:如果这个边不会与之前选择的所有边组成回路,就可以作为最小生成树的一部分;反之,舍去。直到具有n 个顶点的连通网筛选出来n-1 条边为止。筛选出来的边和所有的顶点构成此连通网的最小生成树。

关键点

Kruskal 算法基本分为排序和成环检测两大环节,其中排序算法发展多年,已经将其复杂度压榨到了很小,尤其对于非线性时间比较类排序,其时间复杂度更是不能突破nlogn。

因此,kruskal 算法的关键点在于成环检测算法的优化。

最直接的实现方式是将图用邻接链表存储,调用广度优先算法(Breadth First Search,BFS),如果遍历到向相同的点,则说明有环,否则说明无环。引入新的数据结构——UNION-FIND,主要操作是UNION(CiC_i,CjC_j):将集合CiC_iCjC_j合并为一个集合,并用FIND(x)返回元素x 所在集合的名字,记为leader。若一条边的两顶点的leader 不同,说明该边是一条割边,即不会成环。

拓扑可视化

可视化思路

利用mininet 的net 操作打印出所读取的拓扑的网络节点关系信息,并且修改mininet 文件中的相关代码使得net 操作打印出的信息能直接进入用于存储信息的txt 文件中。之后利用python 中的networkx 库读取txt 文件内容并且利用Matplotlib 的绘图功能即可画出相应的拓扑图。

NetworkX 原理

NetworkX 是一款python 的软件包,它内置了常用的图和网络分析算法。但是在本项目中我们只使用它的创建简单图(有向图或无向图)的功能,再导入Matplotlib 的绘图接口绘出所需拓扑图的图像。

代码实现

拓扑建立

主体代码

1
2
3
4
5
6
7
8
9
10
11
with open("topo24.json",'r') as load_f:
topoinfo=json.load(load_f)

for h in range(1, topoinfo["host_no"]+1):
self.addHost("h"+str(h))

for s in range(1, topoinfo["switch_no"]+1):
self.addSwitch("s"+str(s))

for i in range(0,17):
self.addLink(topoinfo["links"][i]["vertexs"][0],topoinfo["links"][i]["vertexs"][1],delay=str(topoinfo["links"][i]["delay"])+'ms',bw=int(topoinfo["links"][i]["bw"]))

读取topo24.json 文件里的信息,存入topoinfo 中。通过addHost 读取并存储主机信息,通过addSwitch 读取并存储交换机信息,通过addLink 读取交换机和主机之间以及交换机之间的连接信息和时延带宽信息。

库函数说明

1
2
3
4
5
6
7
8
from mininet.topo import Topo #该类里面包含了addHost、addSwitch 和addLink 方法
from mininet.net import Mininet #该类提供了实现控制台pingall 等命令的方法、将搭建好的拓扑连接到Ryu 控制器的方法以及给拓扑中的主机和交换机分配MAC 地址以及IP 地址的方法
from mininet.node import CPULimitedHost #给控制器和交换机之间的接口设置IP 地址,并且对虚拟主机的CPU 划定带宽
from mininet.link import TCLink #引用TClink 类,设置虚拟链路的带宽和时延
from mininet.util import dumpNodeConnections #引用dumpNodeConnections 方法,断开和某一个节点的连接
from mininet.log import setLogLevel #引用setLogLevel,输出mininet 运行的状态
import os
import json

其中sys 和os 都是python 的系统操作模块,sys 提供了一组非常实用的服务,os 模块负责程序和操作系统的交互,提供了访问操作系统底层的接口。json 提供了读取json 文件的方法。

RYU通信

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Kruskaltest(app_manager.RyuApp):

OFP_VERSIONS=[ofproto_v1_3.OFP_VERSION] #确定控制器的openflow 版本

def __init__(self,*args,**kwargs):
super(Kruskaltest,self).__init__(*args,**kwargs) #继承自己,方便后面的初始化步骤
self.mac_to_port={} #arp 列表初始化
self.topology_api_app=self
self.sleep_interval = 0.5 #程序暂停的时间为0.5 秒,以使mininet 和ryu 的拓扑建立保持同步,防止链路丢失
self.switches = [] #交换机列表初始化
self.links = [] #链路信息初始化
self.mst_links = [] #最小生成树上的链路初始化
self.pathes = {} #路径初始化
self.banport = {} #被禁止的端口初始化

流表配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER)
def switch_features_handler(self,ev):
datapath=ev.msg.datapath
ofproto=datapath.ofproto
ofp_parser=datapath.ofproto_parser

match=ofp_parser.OFPMatch()
actions=[ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)]
self.add_flow(datapath,0,match,actions) #配置缺省流表,match 域为空,对每一个交换机都添加该流表,当交换机无法处理收到的数据包时,执行actions,把数据包向控制器转发
def add_flow(self,datapath,priority,match,actions):#控制器向交换机发流表的方法
ofproto=datapath.ofproto
ofp_parser=datapath.ofproto_parser

inst=[ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)]#收到后立即执行actions
#以上内容为流表信息的获取

mod=ofp_parser.OFPFlowMod(datapath=datapath,priority=priority,match=match,instructions=inst)#流表内容定义,第一个为流表从哪一个交换机发送过来,第二个为权重,第三个为匹配域,当收到的数据包和匹配域匹配上时执行instructions
datapath.send_msg(mod)#发送流表

拓扑发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@set_ev_cls(event.EventSwitchEnter,[CONFIG_DISPATCHER,MAIN_DISPATCHER])
def switch_enter_handler(self,ev):
print('==================================')
print('switch entered...')
self.switches,self.links = self.get_topology() #通过后面的get_topology 方法来获取交换机和链路信息
#self.mst_links = self.kruskal(self.links[:])
self.banport = self.find_banport(self.links[:]) #找到被禁止的端口,因为洪泛时不能有圈,否则会陷入死循环
for link in self.mst_links:
print("{} {}".format(link['src_dpid'],link['dst_dpid'])) #打印最小生成树
print('==================================')

time.sleep(self.sleep_interval) #ryu 每发现一个交换机之后都要暂停一段时间,让mininetd 建立拓扑的速度能跟上,否则会造成链路丢失

def get_topology(self):
switch_list = get_switch(self.topology_api_app,None)
switches = [switch.dp.id for switch in switch_list]
print('switches:{}'.format(switches))#得到并打印交换机列表
for switch in switches:
self.banport.setdefault(switch,[])#初始化被禁止的端口列表
print('------------------------------------')
print("banport first_list is{}".format(self.banport))
link_list = get_link(self.topology_api_app,None)

links = [{'src_dpid':link.src.dpid,'src_port_no':link.src.port_no,'dst_dpid':link.dst.dpid,'dst_port_no':link.dst.port_no}
for link in link_list]
print("the getted link_list is{}".format(links)) #得到并打印链路信息,其中包含了源交换机的dpid 和对应的端口号以及目的交换机的dpid 和对应的端口号

#add 'delay' in link
#file = open("./topo24.json","r")
file = open("./topo7.json","r")#由于原来的24 个交换机拓扑太大,故用7 个交换机来作为测试,读取topo7.json 里的信息
topoinfo = json.load(file)
for link in links:
for edge in topoinfo['links']:
if(["s"+str(link["src_dpid"]),"s"+str(link["dst_dpid"])] == edge["vertexs"] or ["s"+str(link["dst_dpid"]),"s"+str(link["src_dpid"])] == edge["vertexs"]):
link['delay'] = edge['delay']
break

控制器对收到的数据包的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER) #监视器,当有packetin 消息进来时,
执行下面的代码
def packet_in_handler(self,ev):
print("==========================================================")
print('it has entered packet_in_handler func')
msg=ev.msg

datapath=msg.datapaths
ofproto=datapath.ofproto
ofp_parser=datapath.ofproto_parser

dpid=datapath.id #id from switch
#以上是对收到的数据包进行解析

self.mac_to_port.setdefault(dpid,{}) # 对控制器里面的mac_to_port 列表初始化
pkt=packet.Packet(msg.data) #读取需要传输的数据
eth=pkt.get_protocol(ethernet.ethernet) #读取以太网地址的数据包,里面包含了src 和dst 以及ethertype 等信息
in_port=msg.match['in_port'] #读取匹配域中的进入交换机的端口号
print("in_port is ",in_port)
src = eth.src
self.mac_to_port[dpid][src]=in_port #将源交换机的MAC 地址和源交换机进入该交换机的端口号对应起来
dst=eth.dst #目的主机的MAC 地址
print('the src is ', src)
print('the dst is ', dst)
print('the dpid is ', dpid)

print("self.mac_to_port:", self.mac_to_port)
pathes={} #路径初始化

#如果目的主机的MAC地址已经在控制器的mac_to_port列表里面有记录,直接下发流表
if dst in self.mac_to_port[dpid]:
print('dst mac is known')
out_port=self.mac_to_port[dpid][dst]
actions=[ofp_parser.OFPActionOutput(out_port)]
match=ofp_parser.OFPMatch(in_port=in_port,eth_dst=dst,eth_src=src)
#以上为对流表内容的定义

self.add_flow(datapath,1,match,actions) #权值为1,表示优先级先于缺省流表

print("add flow")

self.pathes.setdefault(src,{})
self.pathes[src].setdefault(dst,[]) #对从这个主机开始的路径做初始化

self.pathes[src][dst].append(dpid) #往路径上添加交换机

print("path appending:{}".format(self.pathes[src][dst])) #打印路径上的交换机
else: #如果控制器的mac_to_port 列表里面没有目的主机的mac 地址,执行以下代码
print('dst mac is unknown')
print('the dpid is ',dpid)
actions=[] #actions 初始化
for port in datapath.ports: #对于向控制器发送packetin 消息的交换机中的每一个端口,执行下面的代码
if(port not in self.banport[dpid]) and (port != in_port): #如果端口不是禁止端口并且不是数据包进入的端口,将端口加入执行洪泛操作的端口列表里面
actions.append(ofp_parser.OFPActionOutput(port))
out=ofp_parser.OFPPacketOut(datapath=datapath,buffer_id=msg.buffer_id,in_port=in_port,actions=actions,data=msg.data) #向出端口发送的数据包内容
datapath.send_msg(out)#发送数据包

库函数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from ryu.base import app_manager #ryu.base 是ryu 的管理中心,起到加载各种ryu 应用、为ryu应用提供环境和提供不同ryu 应用之间的连接的作用,从ryu.base 里面引用的app_manager 类里面包含管理所有ryu 程序的方法,包括启动ryu、加载ryu 应用和关闭ryu 等
from ryu.topology.api import get_link,get_switch
from ryu.base import app_manager
from ryu.ofproto import ofproto_v1_3 #引入ofproto_v1_3 协议,该协议是本ryu 控制器遵守的协议
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import CONFIG_DISPATCHER,MAIN_DISPATCHER #引入了一个事件监视器,下面的语句引入了两种状态config 为配置状态,main 为运行状态
from ryu.lib.packet import packet #将收到的数据包里信息转换成特定格式
from ryu.lib.packet import ethernet #引入ethernet 类,将数据包的MAC 地址组织成特定格式
from ryu.topology import event #event 里面定义了后续使用的EventSwitchEnter 等类
from ryu.topology.api import get_switch,get_link #用于实现交换机和控制器的连接和通过LLDP协议读取交换机的拓扑
#import networkx as nx
from ryu.controller import ofp_event #ofp_event 类包含了之后需要使用的msg 和msg.datapath方法,前一个用于读取信息,后一个用于读取数据包的来自哪一个交换机
import time
import json
import string

kruskal 算法运用

概述

在基于SDN 的实际网络拓扑中,每个交换机和所属主机可以看成一个顶点,各交换机相连接的信道看成一条边,其权重则为每条信道的时延。对于交换机,还有一个重要的属性——端口,即交换机连接其他网络设备的接口。为了洪泛简便,我们采用逆向kruskal 算法的原理,即找出最小生成树不在的端口,记为banport。只要数据包在banport 停止传输,只在允许通行的端口传输,那么一定不会成环,必然是在最小生成树上。

变量说明

  • src_dpid:源交换机id

  • dst_dpid:目的交换机id

  • Links:边集,特点:双向;包含源交换机id 和端口,目的交换机id 和端口,权重

代码实现

1
links.sort(key = lambda x:x['delay']) #升序排序

对于排序,我们使用python 自带的对列表的排序,将关键字设为delay,即按时延为权重升序排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#成环检测(UNION-FIND 思想)
group = [[i] for i in range(1,len(self.switches)+1)] #创建包含交换机数量子集合的集合

for link in links:
if link['src_dpid']<link['dst_dpid']: #遍历每一条边,且每一条边只遍历一次
for i in range(len(group)):
if link["dst_dpid"] in group[i]:
m = i
if link["src_dpid"] in group[i]:
n = i #将源交换机和目的交换机所在的集合序号记录在m,n 中

#如果m,n 相等,即该边不是一条割边,则将其端口禁掉
if m == n and link['dst_port_no'] not in banport[link["dst_dpid"]]:
banport[link["dst_dpid"]].append(link['dst_port_no'])
banport[link["src_dpid"]].append(link['src_port_no'])
#如果不相等,则将两边端点并入一个集合
else:
group[m] = group[m] + group[n]
group[n] = []

对于成环检测的实现,我们采用UNION-FIND 的思想,每个集合的leader 即为集合的序号,如果源交换机和目的交换机的leader 的相同,说明二者在同一个集合,即会成环,则将二者连接的两个端口禁掉;若二者的leader 不同,说明二者不会成环,则更新集合,将二者集合合并。由于边集是升序排列,因此遍历时也从小到大遍历,当最终所有交换机都在同一集合中时,说明最小生成树已经形成。

成果展示

拓扑构建

(一)mininet侧

mininet侧拓扑信息

(二)RYU侧

RYU侧拓扑信息

(三)拓扑可视化

拓扑示意图

kruskal 算法实现

(一)禁止端口

禁止端口信息

(二)最小生成树可视化

最小生成树示意图

连通性测试

(一)随机ping测试

mininet侧,可以看到花费了0.499毫秒ping通

mininet侧打印信息

RYU侧,可以看到逐渐从2号主机沿着最小生成树扩展到达19号

RYU侧打印信息

(二)pingall测试

下图可见全部ping通,连通性良好。

全部ping通图