最近,做到一个实验,需要每隔一段时间,就改变数据的传输路径,如下图所示,主机之间的数据传输需要经过s1和s2两个交换机,交换机之间的链路有两条,实验是在这两条链路之间每隔10秒切换一次,也就是相当于ECMP的轮询算法(当然,若想以流为单位切换链路,比如说:上一个流经链路1下一条流经链路2,这种方式的轮询利用集线器的程序改变一下就可以实现)。
- 轮询,即各个流在多条路径之间轮询传输。

拓扑的端口对应关系:

接下来,重点说明程序如何写,不再赘述一些关于拓扑构建和轮询的一些概念。
一、程序
SDN架构下的网络数据传输均是通过流表进行的,也就是说,要想在两条链路之间进行切换,就需要对流表进行改变,这里的思想:
- 获取SDN交换机对象和SDN交换机的dpid,也就是s1和s2;
- 向s1,s2添加流表项,规则为走上面一条链路;
- 设定的时间间隔后,删除s1,s2的流表项,重新添加流表项,规则为走下面一条链路;
- 循环执行2,3过程。
首先,创建RRSwitchChange类,进行初始化,并初始化交换机字典,用来存储SDN交换机对象和id。
class RRSwitchChange(app_manager.RyuApp): def __init__(self, *args, kwargs): super(RRSwitchChange, self).__init__(*args, kwargs) # 用于存储交换机对象 self.datapaths = {}
接下来,向类中添加增加和删除流表项的方法。
# 删除流表项的方法,方法直接清除交换机的所有流表项 def del_flow(self, datapath, match): ofp = datapath.ofproto ofp_parser = datapath.ofproto_parser req = ofp_parser.OFPFlowMod(datapath=datapath, command=ofp.OFPFC_DELETE, out_port=ofp.OFPP_ANY, out_group=ofp.OFPG_ANY, match=match) datapath.send_msg(req) # 添加流表项的方法 def add_flow(self, datapath, priority, match, actions): ofp = datapath.ofproto ofp_parser = datapath.ofproto_parser command = ofp.OFPFC_ADD inst = [ofp_parser.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions)] req = ofp_parser.OFPFlowMod(datapath=datapath, command=command, priority=priority, match=match, instructions=inst) datapath.send_msg(req)
同时,由于删除和添加流表项的多项操作均相同,也就是如果单纯的调用添加和删除流表项的方法会显得程序冗余杂乱,所以,这里本实验添加了一个方法match_flow(),用于简化添加和删除流表项的重复调用的冗余操作。
def match_flow(self, dpid, in_port, out_port, priority, add_del): ofp_parser = self.datapaths[dpid].ofproto_parser ofp = self.datapaths[dpid].ofproto actions = [ofp_parser.OFPActionOutput(out_port)] # add_del变量用来判断是该执行删除还是添加操作,如果是1则执行添加,如果为0执行删除 if add_del == 1: match = ofp_parser.OFPMatch(in_port=in_port) self.add_flow(datapath=self.datapaths[dpid], priority=priority, match=match, actions=actions) if add_del == 0: match = ofp_parser.OFPMatch() self.del_flow(datapath=self.datapaths[dpid], match=match)
接下来,是整个程序的关键部分,即处理流表的删除与添加的函数。
def rr_link_change(self): # flag用于标记每一段时间间隔的流表项下发规则 # 例如flag=1表示走上面一条链路,flag=2表示走下面一条链路 flag = 1 # 一致循环的执行流表项的清除添加工作 while True: print('datapath:', self.datapaths) try: if flag == 1: print('flag:', flag) flag = 2 # 添加流表项之前先删除交换机s1,s2中存在的流表项 self.match_flow(dpid=1, in_port=1, out_port=1, priority=1, add_del=0) self.match_flow(dpid=2, in_port=1, out_port=1, priority=1, add_del=0) # 向交换机s1中添加流表项 self.match_flow(dpid=1, in_port=1, out_port=2, priority=1, add_del=1) self.match_flow(dpid=1, in_port=2, out_port=1, priority=1, add_del=1) # 向交换机s2中添加流表项 self.match_flow(dpid=2, in_port=1, out_port=2, priority=1, add_del=1) self.match_flow(dpid=2, in_port=2, out_port=1, priority=1, add_del=1) elif flag == 2: print('flag:', flag) flag = 1 self.match_flow(dpid=1, in_port=1, out_port=1, priority=1, add_del=0) self.match_flow(dpid=2, in_port=1, out_port=1, priority=1, add_del=0) self.match_flow(dpid=1, in_port=1, out_port=3, priority=1, add_del=1) self.match_flow(dpid=1, in_port=3, out_port=1, priority=1, add_del=1) self.match_flow(dpid=2, in_port=1, out_port=3, priority=1, add_del=1) self.match_flow(dpid=2, in_port=3, out_port=1, priority=1, add_del=1) except Exception as info: print('info:', info) hub.sleep(10) # 间隔10秒切换一次
最后,需要将方法 rr_link_change()进行调用,同时由于上述步骤用到了交换机的对象,所以需要获取交换机,将其写入字典中,这里使用交换机状态变化的函数,获取完成后在初始化中调用 rr_link_change()进行流表项的下发,为了持续的执行方法,而且不对程序的其他部分产生影响,这里利用协程进行方法的执行,代码如下。
@set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER]) def _state_change_handler(self, ev): datapath = ev.datapath if ev.state == MAIN_DISPATCHER: if datapath.id not in self.datapaths: self.datapaths[datapath.id] = datapath elif ev.state == DEAD_DISPATCHER: if datapath.id in self.datapaths: del self.datapaths[datapath.id]
def __init__(self, *args, kwargs): super(RRSwitchChange, self).__init__(*args, kwargs) self.datapaths = {} self.link_change = hub.spawn(self.rr_link_change)
至此,程序编写完成,接下来,在Ubuntu终端命令行执行程序。
二、实验
实验拓扑如下:

在程序执行前,查看s1和s2的流表项,流表项为空。接下来,输入命令执行程序,如下图所示。

然后,查看s1和s2的流表项,可以间隔时间查一次,就可以发现流表项每隔10秒更改一次,如下所示。

此时,h1 ping h2,就可以正常的进行通信了。

3、代码附录
from ryu.controller import ofp_event from ryu.lib import hub from ryu.base import app_manager from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER from ryu.controller.handler import set_ev_cls class RRSwitchChange(app_manager.RyuApp): def __init__(self, *args, kwargs): super(RRSwitchChange, self).__init__(*args, kwargs) self.datapaths = {} self.link_change = hub.spawn(self.rr_link_change) @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER]) def _state_change_handler(self, ev): datapath = ev.datapath if ev.state == MAIN_DISPATCHER: if datapath.id not in self.datapaths: self.datapaths[datapath.id] = datapath elif ev.state == DEAD_DISPATCHER: if datapath.id in self.datapaths: del self.datapaths[datapath.id] def rr_link_change(self): flag = 1 while True: print('datapath:', self.datapaths) try: if flag == 1: print('flag:', flag) flag = 2 self.match_flow(dpid=1, in_port=1, out_port=1, priority=1, add_del=0) self.match_flow(dpid=2, in_port=1, out_port=1, priority=1, add_del=0) self.match_flow(dpid=1, in_port=1, out_port=2, priority=1, add_del=1) self.match_flow(dpid=1, in_port=2, out_port=1, priority=1, add_del=1) self.match_flow(dpid=2, in_port=1, out_port=2, priority=1, add_del=1) self.match_flow(dpid=2, in_port=2, out_port=1, priority=1, add_del=1) elif flag == 2: print('flag:', flag) flag = 1 self.match_flow(dpid=1, in_port=1, out_port=1, priority=1, add_del=0) self.match_flow(dpid=2, in_port=1, out_port=1, priority=1, add_del=0) self.match_flow(dpid=1, in_port=1, out_port=3, priority=1, add_del=1) self.match_flow(dpid=1, in_port=3, out_port=1, priority=1, add_del=1) self.match_flow(dpid=2, in_port=1, out_port=3, priority=1, add_del=1) self.match_flow(dpid=2, in_port=3, out_port=1, priority=1, add_del=1) except Exception as info: print('info:', info) hub.sleep(10) def match_flow(self, dpid, in_port, out_port, priority, add_del): ofp_parser = self.datapaths[dpid].ofproto_parser ofp = self.datapaths[dpid].ofproto actions = [ofp_parser.OFPActionOutput(out_port)] if add_del == 1: match = ofp_parser.OFPMatch(in_port=in_port) self.add_flow(datapath=self.datapaths[dpid], priority=priority, match=match, actions=actions) if add_del == 0: match = ofp_parser.OFPMatch() self.del_flow(datapath=self.datapaths[dpid], match=match) def del_flow(self, datapath, match): ofp = datapath.ofproto ofp_parser = datapath.ofproto_parser req = ofp_parser.OFPFlowMod(datapath=datapath, command=ofp.OFPFC_DELETE, out_port=ofp.OFPP_ANY, out_group=ofp.OFPG_ANY, match=match) datapath.send_msg(req) def add_flow(self, datapath, priority, match, actions): ofp = datapath.ofproto ofp_parser = datapath.ofproto_parser command = ofp.OFPFC_ADD inst = [ofp_parser.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, actions)] req = ofp_parser.OFPFlowMod(datapath=datapath, command=command, priority=priority, match=match, instructions=inst) datapath.send_msg(req)
github地址:https://github.com/Yang-Jianlin/ryu/blob/master/ryu/app/rr_switch_link_yjl.py
如有问题,请各位留言或者私信指出,谢谢啦。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/214020.html原文链接:https://javaforall.net
