交通运输系统模型与方法——FW法配流
- Wardrop第一平衡——用户均衡:当网络达到平衡状态时, 每个 OD 对的各条被使用的径路具有相等而且最小的行驶时间; 没有被使用的径路的行驶时间大于或等于最小行驶时间。
- 以Beckman模型表达平衡的概念与定义
- Frank-Wolfe算法:在每次迭代中, 将目标函数 f(x)线性化, 通过解线性规划来求得可行下降方向, 进而沿此方向在可行域内作一维搜索以得到新的迭代点。
苏福尔斯网络不被视为现实网络。 然而, 该网络在许多出版物中都使用。 它有利于代码调试。
1.SiouxFalls_net:主要包含计算阻抗信息的相关量
2.SiouxFalls_node:主要为整个网络各个节点位置信息
3.SiouxFalls_trips:主要包含整个网络各个节点 OD 信息
4.Sioux-Falls-Network: 主要为网络图
5.阻抗函数:link travel time=freeflowtime
(1+B*(flow/capacity)^power)*
> https://github.com/bstabler/TransportationNetworks
# coding=utf-8
import math
import re
import torch
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import os
from datetime import datetime
import copy
import pylab as pl
matplotlib.rc('font', family='MicroSoft YaHei', weight='bold')
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
np.set_printoptions(suppress=True)
# 阻抗函数:link travel time=freeflowtime*(1+B*(flow/capacity)^power)
# xn1 = (1-a)*xn + a*yn
infinity = float("inf")
file_path_net = 'SiouxFalls_net_1.txt'
file_path_trips = 'SiouxFalls_trips_1.txt'
class Section():
# 定义路段类
def __init__(self, list, flow):
self.init_node = list[0]
self.term_node = list[1]
self.capacity = float(list[2])
self.length = int(list[3])
self.freeflowtime = int(list[4])
self.B = float(list[5])
self.power = int(list[6])
self.flow = flow
f_c_p = math.pow(self.flow / self.capacity, self.power)
self.travel_time = round(self.freeflowtime * (1 + self.B * f_c_p), 3)
def update_flow_time(self, flow_new): # 流量更新方法
self.flow = flow_new
f_c_p = math.pow(self.flow / self.capacity, self.power)
self.travel_time = round(self.freeflowtime * (1 + self.B * f_c_p), 3)
def build_section(filePath):
# 建立有路长、有时间花费的原始图,利用路网获取、更新、保存每个路段。
section_list = []
with open(filePath, 'r', encoding='utf-8') as f:
for line in f:
line_list = re.split(r'\t', line.strip(' ; \t \n'))
section_i = Section(line_list, 0)
section_list.append(section_i)
# print(len(section_list))
for sec in section_list:
# print(sec.__dict__)
pass
return section_list
def build_pointpair(filePath):
# 建立有OD点对之间流量需求的原始图,利用点对获取每个路段、路径的流量。
time_list = []
sim_list = []
dou_list = []
num = 0
with open(filePath, 'r', encoding='utf-8') as f:
for line in f:
line_list = re.findall(r':( .*?);', line)
for i in line_list:
t = i.strip(' ')
time_list.append(int(float(t)))
for time in time_list:
sim_list.append(time)
num += 1
while num >= 24:
dou_list.append(sim_list)
num = 0
sim_list = []
# print(dou_list)
return dou_list
def update_section(section_list, flow_list):
# 原地更新流量!!!已修改
# 根据新的输入流量,更新成该状态下的路网(即路段列表)
new_section_list = []
for i in range(len(section_list)):
flow_tmp = section_list[i].flow
section_list[i].update_flow_time(flow_list[i])
sec_tmp = copy.deepcopy(section_list) # 临时路段
new_section_list.append(sec_tmp[i])
section_list[i].update_flow_time(flow_tmp)
for sec in new_section_list:
# print(sec.__dict__)
pass
return new_section_list
def get_flow(section_list):
# 路网各路段流量
flow_list = []
for sec in section_list:
flow_list.append(sec.flow)
# print(flow_list)
return flow_list
def get_time(section_list):
# 路网各路段阻抗时间
time_list = []
for sec in section_list:
time_list.append(sec.travel_time)
# print(time_list)
return time_list
def cal_cost(section_list):
# 计算花费
cost = 0
for sec in section_list:
cost += sec.flow * sec.travel_time
# print(cost)
return cost
def search_best_way(section_list, o, d):
# 在上一迭代流量的阻抗下,Floyd算法寻找最短路
way_list = [o]
dis = []
for hang in range(24):
hang_jl_list = []
for gezi in range(24):
if hang == gezi:
hang_jl_list.append(0)
continue
for sec in section_list:
if hang == int(sec.init_node) - 1 and gezi == int(sec.term_node) - 1:
hang_jl_list.append(sec.travel_time)
if len(hang_jl_list) < gezi + 1:
hang_jl_list.append(infinity)
dis.append(hang_jl_list)
# print(dis)
prior = [[0 for i in range(1, 25)] for j in range(1, 25)] # 初始化一个空矩阵用来存放所经过的节点
for i in range(24):
for j in range(24):
prior[i][j] = j + 1
for m in range(24): # m相当于是中间点
for i in range(24): # i相当于是起始点
for j in range(24): # j相当于是终点
if (dis[i][j] > dis[i][m] + dis[m][j]):
dis[i][j] = dis[i][m] + dis[m][j]
prior[i][j] = prior[i][m]
else:
continue
# print(prior)
if prior[o - 1][d - 1] == d:
way_list.append(d)
else:
x = prior[o - 1][d - 1]
mid = [x] # 创建一个空列表装载路径中间点
while (prior[x - 1][d - 1] != d):
x = prior[x - 1][d - 1]
mid.append(x)
way_list.extend(mid)
way_list.append(d)
# print(dis)
# print(way_list, '==>总路程:', dis[o - 1][d - 1])
return way_list
def product_flow_list(way_list, section_list, flow):
# 根据最短路全有全无,分配路径流量至各路段
sec_tmp = copy.deepcopy(section_list)
for sec in sec_tmp:
change = 0
for i in range(len(way_list) - 1):
if sec.init_node == str(way_list[i]) and sec.term_node == str(way_list[i + 1]):
sec.update_flow_time(flow)
change = 1
if change == 0:
sec.update_flow_time(0)
# print(sec.__dict__)
new_flow = get_flow(sec_tmp)
# print(new_flow)
return new_flow
def graph_new_flow(section_list):
#将全部OD点对的流量分配至某一路网
flow_new = flow_init
for o in range(1, 25):
for d in range(1, 25):
way_list = search_best_way(section_list, o, d) # 根据上次迭代的阻抗,获得最短路径
flow_new = np.array(flow_new) + np.array(
product_flow_list(way_list, section_list, flow_given[o - 1][d - 1])) # 获得新的流量分配
flow_new.tolist()
# print(flow_new)
return flow_new
def find_best_alpha(flow_1, flow_2):
c = infinity
for alpha in np.arange(0.01, 1, 0.01):
flow_y_x = (np.array(flow_2) - np.array(flow_1)).tolist()
flow_a_y_x = [alpha * i for i in flow_y_x]
flow_n1 = (np.array(flow_1) + np.array(flow_a_y_x)).tolist()
section_list_1 = update_section(section_list_0, flow_n1) # 获得新的路段对象
time_list = get_time(section_list_1) # 获得新的路段阻抗
all = zip(flow_y_x, time_list)
cost_n1 = 0
for i, j in all:
cost_n1 += i * j
if abs(cost_n1) < c:
c = abs(cost_n1)
a = alpha
print(a)
# print(c)
return a
# ---------------------主程序----------------------------
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
flow_given = build_pointpair(file_path_trips)
flow_init = [0 for i in range(76)] # 获得初始流量分配
# print(flow_init)
section_list_0 = build_section(file_path_net) # 获得路段对象
get_flow(section_list_0)
get_time(section_list_0)
cal_cost(section_list_0)
flow_1 = graph_new_flow(section_list_0)
print(flow_1)
n = 1
cost_list = []
for i in range(10):
print('第%d次迭代' % (i + 1))
section_list_1 = update_section(section_list_0, flow_1) # 获得新的路段对象
time_1 = get_time(section_list_1) # 获得新的路段阻抗
cost_1 = cal_cost(section_list_1)
# print(flow_1) # flow_1为xn
# print(time_1) # time_1为t(xn)
# print(cost_1)
# 搜索可行方向
flow_2 = graph_new_flow(section_list_1)
way_list = search_best_way(section_list_1, 2, 8)
# print(flow_2) # flow_2为yn
alpha = find_best_alpha(flow_1, flow_2) # 寻找迭代步长
# alpha = 0.618 #0.618法,可进行尝试
x = [alpha * i for i in flow_1]
y = [alpha * i for i in flow_2]
flow_n1 = (np.array(flow_1) + np.array(y) - np.array(x)).tolist()
print(flow_n1) # flow_n1为xn+1
section_list_2 = update_section(section_list_1, flow_n1) # 获得新的路段对象
cost_n1 = cal_cost(section_list_2)
for sec in section_list_1:
# print(sec.__dict__)
pass
# print(get_time(section_list_2))
print(cost_n1)
cost_list.append(cost_n1)
# 迭代,令n=n+1
section_list_0 = update_section(section_list_2, flow_n1)
flow_1 = flow_n1
# print(get_flow(section_list_0))
print('################################################################')
print(cost_list)
aaa = [i for i in range(len(cost_list))]
plt.plot(aaa, cost_list)
ax = pl.gca()
ax.get_yaxis().get_major_formatter().set_scientific(False)
plt.xlabel('迭代次数')
plt.ylabel('路网总花费')
plt.title('平衡Frank-Wolfe算法30步')
plt.show()
str = '\n'
f = open("baojy1.txt", "w")
f.write(float.join(flow_n1))
f.write(float.join(cost_n1))
f.close()
版权声明:本文为BaoJY_0915原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。