前言

在利用强化学习进行自动驾驶开发时,虽然目前已经有了CARLA、CARSIM、TORCS等一系列开发环境,但针对本硕等一些电脑配置不高的学生党来说,一个可编辑性高、上手难度不大、不吃配置的开发环境,用来进行算法验证是非常必要的。

环境的官方连接如下:

https://highway-env.readthedocs.io/en/latest/

优点

1、对电脑配置要求不高;

2、具有一定的车辆动力学模型;

3、可以按自己的需求绘制道路形状、改变车辆及道路的相关参数。

贡献

虽然官方文档给出了如何调用baseline库里的成熟模型的方法,但初学人员比较倾向于自己搭建调试网络。基于这一需求,结合目前主流的深度学习框架pytorch,利用DDPG算法给出了对车辆进行横、纵向控制的代码,相关代码仅搭建出可以跑通的结构,并未对网络参数进行详细调整,具体调参可根据自己的需要进行,这里仅给出跑通的框架。

代码

import math
import random
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import matplotlib.pyplot as plt
import pprint
import highway_envuse_cuda = torch.cuda.is_available()
print(use_cuda)
device   = torch.device("cuda" if use_cuda else "cpu")class ValueNetwork(nn.Module):def __init__(self, num_inputs, num_actions, hidden_size ,init_w = 3e-3):super(ValueNetwork, self).__init__()self.linear1 = nn.Linear(num_inputs + num_actions, hidden_size)self.linear2 = nn.Linear(hidden_size, hidden_size)self.linear3 = nn.Linear(hidden_size, 1)self.linear3.weight.data.uniform_(-init_w,init_w)self.linear3.bias.data.uniform_(-init_w,init_w)def forward(self, state, action):x = torch.cat([state, action], 1)x = F.relu(self.linear1(x))x = F.relu(self.linear2(x))x = self.linear3(x)return xclass PolicyNetwork(nn.Module):def __init__(self, num_inputs, num_actions, hidden_size, init_w = 3e-3):super(PolicyNetwork, self).__init__()self.linear1 = nn.Linear(num_inputs, hidden_size)self.linear2 = nn.Linear(hidden_size, hidden_size)self.linear3 = nn.Linear(hidden_size, num_actions)# uniform_将tensor用从均匀分布中抽样得到的值填充。参数初始化self.linear3.weight.data.uniform_(-init_w, init_w)#也用用normal_(0, 0.1) 来初始化的,高斯分布中抽样填充,这两种都是比较有效的初始化方式self.linear3.bias.data.uniform_(-init_w, init_w)#其意义在于我们尽可能保持 每个神经元的输入和输出的方差一致。#使用 RELU(without BN) 激活函数时,最好选用 He 初始化方法,将参数初始化为服从高斯分布或者均匀分布的较小随机数#使用 BN 时,减少了网络对参数初始值尺度的依赖,此时使用较小的标准差(eg:0.01)进行初始化即可#但是注意DRL中不建议使用BNdef forward(self, x):x = F.relu(self.linear1(x))x = F.relu(self.linear2(x))x = F.tanh(self.linear3(x))return xdef get_action(self, state):state = torch.FloatTensor(state).unsqueeze(0).to(device)action = self.forward(state)return action.detach().cpu().numpy()[0]class OUNoise(object):def __init__(self, action_space, mu=0.0, theta = 0.15, max_sigma = 0.3, min_sigma = 0.3, decay_period = 10000):#decay_period要根据迭代次数合理设置self.mu = muself.theta = thetaself.sigma = max_sigmaself.max_sigma = max_sigmaself.min_sigma = min_sigmaself.decay_period = decay_periodself.action_dim = action_space.shape[0]self.low = action_space.lowself.high = action_space.highself.reset()def reset(self):self.state = np.ones(self.action_dim) *self.mudef evolve_state(self):x = self.statedx = self.theta* (self.mu - x) + self.sigma * np.random.randn(self.action_dim)self.state = x + dxreturn self.statedef get_action(self, action, t=0):ou_state = self.evolve_state()self.sigma = self.max_sigma - (self.max_sigma - self.min_sigma) * min(1.0, t / self.decay_period)return np.clip(action + ou_state, self.low, self.high)class ReplayBuffer:def __init__(self, capacity):self.capacity = capacityself.buffer = []self.position = 0def push(self, state, action, reward, next_state, done):if len(self.buffer) < self.capacity:self.buffer.append(None)self.buffer[self.position] = (state, action, reward, next_state, done)self.position = (self.position + 1) % self.capacitydef sample(self, batch_size):batch = random.sample(self.buffer, batch_size)state, action, reward, next_state, done = map(np.stack, zip(*batch))return state, action, reward, next_state, donedef __len__(self):return len(self.buffer)class NormalizedActions(gym.ActionWrapper):def action(self, action):low_bound = self.action_space.lowupper_bound = self.action_space.highaction = low_bound + (action + 1.0) * 0.5 * (upper_bound - low_bound)#将经过tanh输出的值重新映射回环境的真实值内action = np.clip(action, low_bound, upper_bound)return actiondef reverse_action(self, action):low_bound = self.action_space.lowupper_bound = self.action_space.high#因为激活函数使用的是tanh,这里将环境输出的动作正则化到(-1,1)action = 2 * (action - low_bound) / (upper_bound - low_bound) - 1action = np.clip(action, low_bound, upper_bound)return actionclass DDPG(object):def __init__(self, action_dim, state_dim, hidden_dim):super(DDPG,self).__init__()self.action_dim, self.state_dim, self.hidden_dim = action_dim, state_dim, hidden_dimself.batch_size = 28self.gamma = 0.99self.min_value = -np.infself.max_value = np.infself.soft_tau = 2e-2self.replay_buffer_size = 8000self.value_lr = 5e-3self.policy_lr = 5e-4self.value_net = ValueNetwork(state_dim, action_dim, hidden_dim).to(device)self.policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim).to(device)self.target_value_net = ValueNetwork(state_dim, action_dim, hidden_dim).to(device)self.target_policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim).to(device)for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):target_param.data.copy_(param.data)for target_param, param in zip(self.target_policy_net.parameters(), self.policy_net.parameters()):target_param.data.copy_(param.data)self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=self.value_lr)self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=self.policy_lr)self.value_criterion = nn.MSELoss()self.replay_buffer = ReplayBuffer(self.replay_buffer_size)def ddpg_update(self):state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)state = torch.FloatTensor(state).to(device)next_state = torch.FloatTensor(next_state).to(device)action = torch.FloatTensor(action).to(device)reward = torch.FloatTensor(reward).unsqueeze(1).to(device)done = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(device)policy_loss = self.value_net(state, self.policy_net(state))policy_loss = -policy_loss.mean()next_action = self.target_policy_net(next_state)target_value = self.target_value_net(next_state, next_action.detach())expected_value = reward + (1.0 - done) * self.gamma * target_valueexpected_value = torch.clamp(expected_value, self.min_value, self.max_value)value = self.value_net(state, action)value_loss = self.value_criterion(value, expected_value.detach())self.policy_optimizer.zero_grad()policy_loss.backward()self.policy_optimizer.step()self.value_optimizer.zero_grad()value_loss.backward()self.value_optimizer.step()for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):target_param.data.copy_(target_param.data * (1.0 - self.soft_tau) + param.data * self.soft_tau)for target_param, param in zip(self.target_policy_net.parameters(), self.policy_net.parameters()):target_param.data.copy_(target_param.data * (1.0 - self.soft_tau) + param.data * self.soft_tau)env = gym.make("fei-v0")#自定义的环境,与自带的racetrack环境相同,目前在学习如何自定义自己的环境
env.configure(
{"observation": {"type": "OccupancyGrid","features": ['presence','on_road', "vx", "vy"],# "features_range": {# "x": [-100, 100],# "y": [-100, 100],# "vx": [-20, 20],# "vy": [-20, 20]},"grid_size": [[-6, 6], [-9, 9]],"grid_step": [3, 3],#每个网格的大小"as_image": False,"align_to_vehicle_axes": True},"action": {"type": "ContinuousAction","longitudinal": True,"lateral": True},"simulation_frequency": 15,"policy_frequency": 5,"duration": 500,"collision_reward": -10,"lane_centering_cost": 6,"action_reward": -0.3,"controlled_vehicles": 1,"other_vehicles": 5,"screen_width": 600,"screen_height": 600,"centering_position": [0.5, 0.5],"scaling": 7,"show_trajectories": False,"render_agent": True,"offscreen_rendering": False
})env.reset()
env = NormalizedActions(env)ou_noise = OUNoise(env.action_space)state_dim = env.observation_space.shape[2]*env.observation_space.shape[1]*env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
print("状态维度"+str(state_dim))
print("动作维度"+str(action_dim))
# print(env.action_space)
hidden_dim = 256ddpg = DDPG(action_dim, state_dim, hidden_dim)max_steps = 250
rewards = []
batch_size = 32
VAR = 1  # control explorationfor step in range(max_steps):print("================第{}回合======================================".format(step+1))state = env.reset()state = torch.flatten(torch.tensor(state))ou_noise.reset()episode_reward = 0done = Falsest=0while not done:action = ddpg.policy_net.get_action(state)# print(action)action[0] = np.clip(np.random.normal(action[0],VAR),-1,1) # 在动作选择上添加随机噪声action[1] = np.clip(np.random.normal(action[1],VAR),-1,1) # 在动作选择上添加随机噪声# action = ou_noise.get_action(action, st)next_state, reward, done, _ = env.step(action)#奖励函数的更改需要自行打开安装的库在本地的位置进行修改next_state = torch.flatten(torch.tensor(next_state))if reward == 0.0:#车辆出界,回合结束reward = -10done = Trueddpg.replay_buffer.push(state, action, reward, next_state, done)if len(ddpg.replay_buffer) > batch_size:VAR *= .9995    # decay the action randomnessddpg.ddpg_update()state = next_stateepisode_reward += rewardenv.render()st=st+1rewards.append(episode_reward)print("回合奖励为:{}".format(episode_reward))
env.close()plt.plot(rewards)
plt.savefig('C:/Users/Administrator/Desktop/episode_reward.jpg')

后记

肯定会有我没意识到的错误,欢迎大家批评指正,大家共同进步!

基于highway-env的DDPG-pytorch自动驾驶实现相关推荐

  1. 基于RGB和LiDAR融合的自动驾驶3D语义分割

    基于RGB和LiDAR融合的自动驾驶3D语义分割 论文 RGB and LiDAR fusion based 3D Semantic Segmentationfor Autonomous Drivin ...

  2. 基于车路协同的高等级自动驾驶数据交互内容

    基于车路协同的高等级自动驾驶数据交互内容 基于车路协同的高等级自动驾驶系统组成 1. 系统架构 2. 系统功能 2.1 RSS各组成单元功能 2.2 VSS各组成单元功能 3. 系统交互 基于车路协同 ...

  3. 基于OpenCV和深度学习的自动驾驶遥控小车

    阅读原文 项目中使用的技术 Python + OpenCV Neural Network + Haar-Cascade Classifiers 项目目标 改装后遥控小车完成三个任务:自动驾驶,识别信号 ...

  4. 基于模型参考自适应控制(MRAC)的自动驾驶方向盘(油门)控制方法

    实习还是能学到很多学校学不到的东西,总结下实习期间学到的一个自适应控制方法.最近比较忙,先大致写下原理的笔记供自己复习,后面有空再更下仿真.如有错误请不吝赐教~ 背景 举个例子,目前公司自动驾驶车队主 ...

  5. 基于骁龙820A系统的自动驾驶发展趋势

    目前,随着无线技术的加速创新和发展,全球汽车行业正经历着前所未有的技术创新加速.数据显示,全球已有超过2000万辆汽车搭载了骁龙调制解调器. 高通提供了神经网络处理引擎(SNPE).这款深度学习开发套 ...

  6. 基于神经网络预测车道行驶的自动驾驶

    数据定义先假定 首先定义我们在自动驾驶,并且可以获取前后左右各个车的数据.传感器和摄像头已经在车身上部署好.  假定我们在车道上行驶,根据跟驰模型数据我们能知道的是: 1 左车距离 2 右车距离 3 ...

  7. 环宇智行基于NVIDIA TX2的L4级自动驾驶方案

    参考: http://www.in-driving.com/product/showproduct.php?lang=cn&id=54#ad-image-0 http://zhidx.com/ ...

  8. 基于激光雷达+惯导+轮速计的自动驾驶融合定位方案

    高精度定位是当前自动驾驶的基础.随着自动驾驶的日益发展,依靠单一传感器进行定位常常会受到各种限制,比如GPS信号不稳定或者消失.长直走廊或隧道等场景下激光点云发生漂移.轮胎打滑等造成瞬时轮速计异常等. ...

  9. 观点PK | 自动驾驶传感器“一哥之争”,这事儿你怎么看?

    自动驾驶现已成为人工智能技术应用落地的热门领域,但随着无人车迈出的步伐越大,面临的安全性方面的挑战也越大.而近期不断发生的自动驾驶车辆事故也将自动驾驶安全性的问题再次推向风口浪尖.自动驾驶车辆的安全性 ...

  10. 自动驾驶,别再谈「接管」色变了

    编者按:过去两年里,国内 Robotaxi 的公开运营多点开花,这些 Robotaxi 所搭载的自动驾驶系统也都具备了处理常见场景的能力,比如识别红绿灯.避让行人.变道超车等--在这种背景下,零接管似 ...

最新文章

  1. SQLServer------插入数据时出现IDENTITY_INSERT错误
  2. 西里尔字 - 俄罗斯
  3. 设计模式之建造者模式(01)
  4. python 函数的调用的时候参数的传递_python定义函数时的参数调用函数时的传参...
  5. 近期低分纯生信友好的期刊简介
  6. Jquery跳出each循环
  7. 兼容IE678的placeholder
  8. videowriter最小的编码格式_cv2.VideoWriter() 指定写入视频帧编码格式
  9. javaScript命名规范
  10. 机器学习之神经网络学习及其模型
  11. android实现欢迎启动界面
  12. redhat6.2 下gcc安装
  13. JavaWeb框架之Struts2 ---- 系列学习
  14. cad自动填写页码lisp_CAD图纸页码的自动生成-农夫也玩CAD
  15. 用计算机谈我还是从前那个少年,我还是从前那个少年是什么歌-我还是从前那个少年抖音歌曲介绍...
  16. C#学习笔记8 事件
  17. 编译一个java源程序文件,会产生多少个字节码文件
  18. 网络架构及其演变过程
  19. 利用MATLAB仿真实现交通红绿灯识别的目的
  20. pytorch中批量归一化BatchNorm1d和BatchNorm2d函数

热门文章

  1. mac 重新安装系统 填坑之路
  2. 学计算机用苹果本,如何快速学会使用苹果电脑?
  3. 在Mac环境下的NEO4J的下载与配置
  4. Node.js 应用全链路追踪技术——全链路信息存储
  5. 如何用idea打jar包
  6. 17-分析Ajax请求--爬取易烊千玺微博【以及简单的数据分析】
  7. x210 uboot编译
  8. 如何在 C# 中以编程方式将 IGS/IGES 文件转换为 PDF?
  9. 【程序员读论文】推荐一款OCR软件,识别PDF论文上的文字
  10. 进销存源码下载 mysql_java 进销存管理源码(含mysql脚本)