项目要求

  1. 使用 Dijkstra 算法(dial 实现)计算任意两点间的最短路;
  2. 使用最短路由配置任意两点间的通信业务;
  3. 将所有业务在可视化平台上进行展示。

Dijkstra 算法原理

基本思想

Dijkstra 算法使用了广度优先搜索解决赋权有向图或者无向图的单源最短路径问题,采用的是一种贪心的策略,构造一个循环,每次循环都增加一个顶点到最短路径树上,所选择的顶点即为在所有与树邻接的顶点中和源点最近的顶点, 其中就隐含了广度优先搜索的思想,逐步地发展最短路径树,直到它覆盖所有顶点。其中,记录每一个顶点的前继节点可以实现最短路的重建,对距离的更新可以保证现有路径为当前最优解。

基本步骤

  1. 初始化:使用 d(i)来表示顶点 i 的距离标记,初始化为无穷大;使用 p(i)来表示顶点 i 在最短路径树上的前继点,初始化为 null;使用 S 表示最短路径树上的顶点集合,初始化时加入源点 s,同时将 s 点的距离标记初始化为 0, 前继点为 null
  2. 对 s 点做更新,更新操作为:对需要更新的点 i 的所有邻接点,如果 i 点到源点的距离和 i 点到邻接点的权重之和小于邻接点到源点的的距离,由贪心算法求局部最优解的要求,将该邻接点到源点的距离标记更新为 i 点到源点的距离和 i 点到邻接点的权重之和,此时邻接点的前继点为 i。
  3. 循环扩张最小路径树,直到所有顶点都在该树上,循环内的操作为: 找出已经永久标记的顶点集合和未被标记的顶点集合之间的权重最小割边,将该割边上未被探索的顶点加入已被标记的顶点集合,同时更新对该顶点做更新,更新操作如(2)。
  4. 以上方法为单源多宿最短路的求解,若要求解单源单宿最短路,可以增加一个判断模块,当宿点被永久标记时终止循环。永久标记条件:当一点的所 有入度的另一端都已经被永久标记时,该点即可被永久标记。

示例

示例拓扑图

上图是一个无向有权图,若源点为 a,最小路径树的生成过程为:

  1. a->b:0+1< ∞ ; a −> c: 0 + 6 < ∞. 所以 d[b] = 1, d[c] = 6,前继点为 a,进入 while 循环,选择最小割边,将 b 加入最小路径树,更新 b;
  2. 对 b 的邻接点做更新,1+2 < d[d] ,1+5 < d[e] ,所以d[d] = d[b] + w[b,d] = 3,d[e] = d[b] +w[b,e] = 6 ,将 d,e 的前继点设为 b,进入下一次循环,选择最小割边, 将 d 加入最小路径树,更新 d;
  3. 一直进行如上的循环,直到所有顶点都在最短路径树上。

算法实现

引进新的数据结构——桶

特殊之处

根据距离值的最大值留出足够数量的桶,桶的标记即为距离值,根据顶点的距离标记将顶点加入不同的桶中,执行 FindMin 操作时直接从 标记最小的桶读起,节约了比较的时间。同时,距离标记更新后要将顶点转移到 新的桶里面,

桶的数量

令 C 表示边权的最大值,则距离的最大值为 nC,同时还有 一个距离为无穷大的桶,所以总共需要 nC+1 个桶。

缺陷

nC 数量级的桶太消耗空间了,有大量的桶一直闲置。

改进:循环桶

由于边界点的距离标记不会超过 d[i]+C(i 为当前标记点,C 为权重最大值), 对顶点 x 的标号为 d[x]mod(C+1),最多需要 C+1 个桶。

对于同一个桶里面的实际上距离标记不相同的点不必做额外处理,因为第一次永久标记的节点距离标记肯定是最短的,该永久标记点的邻接点不可能在同一个桶中(边权重值最大为 C,桶的编号为距离标记对 C+1 取余后的值),除非两个邻接点和该点之间的边权值相同,即使如此,选其中的任意一个也没有问题。 考虑是否标记邻接点时只需要判断邻接点是否已经被永久标记,是则不再标记, 否则永久标记。

每永久标记一次,桶的头部都要往后一个桶移动,并且保证改变桶的头部之 前先把头部桶里面的所有节点清除出桶,然后永久标记。

实现方式比较

数组(队列)实现

此方法为最通用的算法实现方法,也是对读者来说最容易理解的一种实现方法。核心思想为用一个数据结构来维护所有的边界点,相当于所有边界边被分成不同的小集合,与同一个边界点相关联的那些边只记录一个最小值。相比与直接简单粗暴地每次都搜索所有边再筛选后进行更新距离的 Dijkstra 的最直接实现算法,此算法的运行时间从 O(nm) 优化到了 O(n*n+m)。

堆实现

此方法逻辑上和 1 类似,只是用堆来维护 V-X 集合。堆本身可以利用 Python 中的 heapq 模块较为简单地实现各种操作,如存储时就给元素进行排序,在调用弹出元素时会自行选择最大或最小的元素进行弹出。距离值更新后要将元素先从堆中删除,再插入。这减少了算法中的代码编写和算法的复杂度。此方法下总 RT 为 O(mlogn)。

桶实现

此方法逻辑上和 1 类似,只是用桶来维护 V-X 集合。对普通桶,令 C 表示边权的最大值,则桶的数目上限为 nC+1。此时 RT 为 O(m+nC)。循环桶则是减小了桶的用量,但是 RT 不变。因为每个桶被检查的数目依旧会增大。从多次实验的结果来看,此方法在 RT 上实际上优于方法2。

最终,我们选择使用循环桶来实现 Dijkstra 算法。

循环桶实现 dijkstra 算法

步骤

  1. 初始化桶,把源节点加入到桶中。
  2. 弹出最小距离标记节点。
  3. 将最小距离标记节点永久标记。
  4. 将永久标记节点的所有邻接节点入桶。
  5. 重复 bcd 操作,直到所有桶空,算法结束。

示例

上图为一个有向有权图。若源点为 1 号点,则最短路径树的生成过程为

  1. 首先初始化循环桶,由于最大边权重为 15,所以 C 为 15。将点 1 放入桶内, 因为此时将点 1 的距离设为了 0,因此放入桶头(初始化时设置桶头为第一个桶),如图 2.5 所示。

桶状态示意图

  1. 进入循环,弹出最小距离标记节点。此时先从桶头开始搜索,按顺序寻找第 一个不为空的桶。在此范例中顺序为顺时针。找到该桶后开始弹出此桶内的元素。 在此范例中弹出的元素是 1 号;
  2. 之后开始检索,首先判断 1 号是否已经被永久标记,判断结果为 1 号未被永 久标记,则此时将 1 号永久标记。标记的同时永久记录此时的记录值,不再更改。
  3. 标记后搜索与 1 号相连的节点,将相应节点按照此时对应的距离对 16 取余后 的结果加入桶中。在本例中,加入 2 号(入 1 桶),再加入 3 号(入 12 桶)。如图 2.6 所示;
  4. 此时一轮循环已经结束,开始进行下一轮循环。桶头开始按顺时针方向移动, 直到找到下一个非空桶。在本例中它会发现 1 桶。开始清空此时的 1 桶。如图 2.7 所示;
  5. 2 号点被弹出,判断后进行永久标记并开始搜索和 2 号点相连的点。只要点未被永久标记,就按照 d 中规则将它们按照对应的距离值入相应的桶。需要注意的是,此时桶内会有两个 3 号点,因为它的距离值被更新过。但由于优先被弹出并且标记的 3 号点对应的距离一定更小,所以这种加入方式等效于让 3 号点移动到了能更快被弹出的桶内。见图 2.8;
  6. 重复上述过程,直到整个桶内无空桶,理论上整个图就完成了遍历,按照永久标记时记录的距离值可生成最小生成树。

桶状态示意图

代码实现

dijkstra 算法的实现

(1)初始化,将距离标记置为 99999,前继节点为空,源主机的距离标记赋为 0 。

1
2
3
4
5
6
7
8
9
def dijkstra(self,src_dpid,dst_dpid,src_port,dst_port): 2. c = 29
adjacent = self.get_adjacent(self.links)
buckets = BucketSet(c+1)
node_s = (0,src_dpid)
d = {}
for u in self.switches:
d[u] = 99999
d[src_dpid] = 0
pre = {}

(2)用循环桶来维护未标记点的集合,其中循环桶是自定义的一个类,需在前面中引用:

1
from BucketSet import BucketSet

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
buckets.add_thing(node_s)
flag = 1
while not buckets.SetEmpty() and flag == 1:
min_list = buckets.pop_min() #将桶中距离标记最小的点加入最短路径列表,实际上就是将已标记集合和未标记集合之间的最短割边加入最短路径树,此时还未永久标记,即 find_min 操作
while not len(min_list) == 0:
min_node = min_list.pop() #取出该点,与目的主机的进行比对,如果已经探测到目的主机,则跳出循环
if(min_node[1] == dst_dpid):
flag = 0
break
for u in self.switches: #取该点的邻接点,进行距离标记的更新,如果该点的下一点的距离标记比该点的距离标记和两点之间权值之和大,则将该点设为下一点的前继节点,将该点的距离标记和两点之间权值之和设为新的距离标记,即 update 操作
if(u in adjacent[min_node[1]]):
for link in self.links:
if (link['src_dpid'] == min_node[1]) and (link['dst_dpid']== u): #如果要寻找的最短路之间仅有一条边,这条边的权重即为最短路的权重
delay = link['delay']
break
if(d[min_node[1]]+delay < d[u]): #update 操作,权值更新
pre[u] = min_node[1] #前继节点更新
d[u] = d[min_node[1]] + delay #距离标记更新
buckets.add_thing((d[u],u)) #将 u 点的邻接点加入桶中

(3)重建最短路径

1
2
3
4
5
6
7
8
9
getpath = []
s = dst_dpid
while s != src_dpid: #从目的主机到源主机,将最短路径上的点加入 getpath 列表
getpath.append(s)
s = pre[s]
getpath.append(src_dpid)
getpath.reverse()#由于上面的操作存储的最短路径是反向的,需要使用 reverse 函数将列表中的序列进行反向

self.minpath = getpath#得到最终的最短路

(4)提供 ryu 配置最短路所需的端口信息,使流表的下发能够一步到位

1
2
3
4
5
6
7
8
9
10
11
12
ryu_path = []
in_port = src_port#从源主机到与它相连的交换机的端口号
for s1,s2 in zip(getpath[:-1],getpath[1:]):#分别获得从源交换机到主机的路径上的各个出端口和各个入端口
for link in self.links:
if (link['src_dpid'] == s1) and (link['dst_dpid'] == s2):
out_port = link['src_port_no']
ryu_path.append((s1,in_port,out_port))
in_port = link['dst_port_no']
if s2 == dst_dpid:
ryu_path.append((s2,in_port,dst_port))
break
return ryu_path

(5)求最短路径上的时延

1
2
3
4
5
6
7
def sum_delay(self,path,links): 
delay = 0
for j in range(len(path)-1):
for link in links:
if (link['src_dpid'] == path[j]) and (link['dst_dpid'] == path[j+1]):
delay += link['delay']
return delay

(6)获取邻接点

1
2
3
4
5
6
7
8
9
def get_adjacent(self,links): 
adjacent = {}
for switch in self.switches:
adjacent.setdefault(switch,[]) #邻接点列表初始化
for switch in self.switches:
for link in links:
if link['src_dpid'] == switch:
adjacent[switch].append(link['dst_dpid']) #以交换机作为 key,以邻接交换机作为 value
return adjacent

桶类的实现

(1)首先创建循环桶类,并对一些基础变量进行初始化

1
2
3
4
class BucketSet(object): 
header_loc = 0 #初始化桶头指针
things_number = 0 #初始化循环桶中节点的数目

(2)定义用于初始化的函数

1
2
3
4
5
6
7
8
9
def __init__(self,buc_num): 
self.bucket_num = buc_num #初始化小桶数目
self.buckets = [None]*buc_num #初始化循环桶整体,它由指定数目的小桶组成
self.init_buckets(buc_num)
self.header = self.buckets[0] #初始化头桶

def init_buckets(self,bucket_num):# 利用小桶类的初始化函树进行初始化
for i in range(bucket_num):
self.buckets[i] = BucketSet.Bucket(i)

(3)定义用于使头桶指针后移的函数。因为每次对桶的元素取出都是针对头桶, 所以在头桶空后需要调用此函数实现对后续桶的搜索

1
2
3
def move_header(self):# 头桶指针后移一位
self.header_loc = ((self.header_loc + 1 )% self.bucket_num)# 因为是循环桶所以需要用取余操作保证尾部和头部的衔接
self.header = self.buckets[self.header_loc]

(4)定义用于计算入桶的节点应该去哪个桶的函数

1
2
def Length_cal(self,length): 
return (length % self.bucket_num) #因为是循环桶所以需要用取余保证所得数字的正确

(5)定义用于处理节点加入桶的函数,作为和外部的关键接口

1
2
3
4
5
def add_thing(self,thing): 
length = thing[0] #输入此函数的节点数据中,第一项为它到 src 的距离
id = self.Length_cal(length) #计算该节点该去哪个桶
self.buckets[id].add_thing(thing) #把节点加入对应的桶
self.things_number += 1 #节点数加一

(6)定义判断函数,用于判断整个循环桶是否全空,也就是确定整个 Dijkstra 算法何时结束

1
2
def SetEmpty(self): 
return self.things_number == 0

(7)定义用于弹出桶中需要被永久标记的最短距离点的函数

1
2
3
4
5
6
def pop_min(self): 
while self.header.is_empty(): #寻找非空桶
self.move_header()
min_lists = self.header.pop_out() #将头桶内节点弹出
self.things_number = self.things_number - len(min_lists) #节点数减少
return

(8)在循环桶类内部定义桶类,即用于组成循环桶的小桶。依旧从初始化函数开始定义

1
2
3
4
5
6
class Bucket(object): 
things_amount = 0 #计数,记录每个小桶内节点数目
def __init__(self,bucketId): #初始化小桶,利用 list(),每个桶有对应的编号,从 0 到 C
self.bucket = list()
self.id = bucketId

(9)定义往小桶内加入节点的函数

1
2
3
def add_thing(self,thing): 
self.bucket.append(thing) #加入节点,利用 list()性质
self.things_amount += 1 #计数加

(10)定义弹出小桶内节点数据的函数

1
2
3
4
5
def pop_out(self):
things = self.bucket.copy() #things 记录了桶内现有节点
self.bucket.clear() #清空桶
self.things_amount = 0 #计数清 0
return thing

(11)定义判断小桶是否为空的函数

1
2
def is_empty(self): 
return self.things_amount == 0

RYU通信

(1)初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
class Kruskaltest(app_manager.RyuApp): 
OFP_VERSIONS=[ofproto_v1_3.OFP_VERSION]#确定控制器的 openflow 版本

def __init__(self,*args,**kwargs):
super(Kruskaltest,self).__init__(*args,**kwargs)#继承自己,方便后面的初始化步骤
self.mac_to_port={}#arp 列表初始化
self.topology_api_app=self
self.sleep_interval = 0.5#程序暂停的时间为 0.5 秒,以使 mininet 和 ryu 的拓扑建立保持同步,防止链路丢失
self.switches = []#交换机列表初始化
self.links = []#链路信息初始化
self.mst_links = []#最小生成树上的链路初始化
self.pathes = {}#路径初始化
self.banport = {}#被禁止的端口初始化

(2)流表配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER)
def switch_features_handler(self,ev):
datapath=ev.msg.datapath
ofproto=datapath.ofproto
ofp_parser=datapath.ofproto_parser
match=ofp_parser.OFPMatch()
actions=[ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUF
FER)]
self.add_flow(datapath,0,match,actions)#配置缺省流表,match 域为空,对每一个交换机都添加该流表,当交换机无法处理收到的数据包时,执行 actions,把数据包向控制器转发
def add_flow(self,datapath,priority,match,actions):#控制器向交换机发流表的方法
ofproto=datapath.ofproto
ofp_parser=datapath.ofproto_parser
inst=[ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)]#收到后立即执行 actions
#以上内容为流表信息的获取
mod=ofp_parser.OFPFlowMod(datapath=datapath,priority=priority,match=match,instructions=inst)#流表内容定义,第一个为流表从哪一个交换机发送过来,第二个为权重,第三个为匹配域,当收到的数据包和匹配域匹配上时执行 instructions
datapath.send_msg(mod)#发送流表

(3)拓扑发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@set_ev_cls(event.EventSwitchEnter,[CONFIG_DISPATCHER,MAIN_DISPATCHER])
def switch_enter_handler(self,ev):
print('==================================')
print('switch entered...')
self.switches,self.links = self.get_topology() #通过后面的 get_topology 方法来获取交换机和链路信息
self.banport = self.find_banport(self.links[:]) #找到被禁止的端口,因为洪泛时不能有圈,否则会陷入死循环
for link in self.mst_links:
print("{} {}".format(link['src_dpid'],link['dst_dpid']))#打印最小生成树
print('==================================')

time.sleep(self.sleep_interval) #ryu 每发现一个交换机之后都要暂停一段时间,让mininetd 建立拓扑的速度能跟上,否则会造成链路丢失

def get_topology(self):
switch_list = get_switch(self.topology_api_app,None)
switches = [switch.dp.id for switch in switch_list]
print('switches:{}'.format(switches)) #得到并打印交换机列表
for switch in switches:
self.banport.setdefault(switch,[]) #初始化被禁止的端口列表
print('------------------------------------')
print("banport first_list is{}".format(self.banport))
link_list = get_link(self.topology_api_app,None)
links = [{'src_dpid':link.src.dpid,'src_port_no':link.src.port_no, 12. 'dst_dpid':link.dst.dpid,'dst_port_no':link.dst.port_no}
for link in link_list]
print("the getted link_list is{}".format(links)) #得到并打印链路信息,其中包含了源交换机的 dpid 和对应的端口号以及目的交换机的 dpid 和对应的端口号

#file = open("./topo24.json","r")
file = open("./topo7.json","r") #由于原来的 24 个交换机拓扑太大,故用 7 个交换机来作为测试,读取 topo7.json 里的信息
topoinfo = json.load(file)
for link in links:
for edge in topoinfo['links']:
if(["s"+str(link["src_dpid"]),"s"+str(link["dst_dpid"])] == edge["vertexs"] or ["s"+str(link["dst_dpid"]),"s"+str(link["src_dpid"])] == edge["vertexs"]):
link['delay'] = edge['delay']
break

(4)ryu 配置最短路

1
2
3
4
5
6
7
8
9
10
11
12
def configure_path(self,path,src,dst,datapath): 
print("configure the shortest path")
path_print=src
for switch,in_port,out_port in path: #下发流表
ofp_parser=datapath.ofproto_parser
match=ofp_parser.OFPMatch(in_port=in_port,eth_src=src,eth_dst=dst)
actions=[ofp_parser.OFPActionOutput(out_port)]
self.add_flow(datapath,1,match,actions)
path_print+="-->{}-[{}]-{}".format(in_port,switch,out_port) #更新打印的最短路序列
delay = self.sum_delay(self.minpath,self.links)
path_print+="-->"+dst
print("the shortest path is:{}".format(path_print),'delay:',delay)

(5)对 packetin 的修改

1
2
path=self.dijkstra(src_switch,dst_switch,first_port,final_port) #调用dijkstra,获取配置的路径信息
self.configure_path(path,src,dst,datapath) #配置最短路

成果展示

拓扑的构建

(1)mininet 侧

拓扑构建

(2)RYU 侧

RYU侧拓扑发现

(3)二十四节点网络拓扑可视化

拓扑示意图

Dijkstra 算法实现

(1)mininet 侧:连通性良好

pingall情况

(2)时延现象(以 h1 ping h19 为例)

mininet 侧时延信息

(3)ryu 侧

RYU 侧最短路输出

(4)最短路可视化 (黄色路径为最短路,黑色路径为未经过路径)

最短路可视化