深度强化学习笔记——DDPG原理及实现(pytorch)

深度强化学习笔记——DDPG原理及实现(pytorch)概要 DDPG 算法原理 DeepDetermin DDPG 算法实现

概要

DDPG算法原理(Deep Deterministic Policy Gradient)

DDPG算法是基于DPG算法所提出的,属于无模型中的actor-critic方法中的off-policy算法(因为动作不是直接在交互的过程中更新的),之后学者又在此基础上提出了适合于多智能体环境的MADDPG (Multi Agent DDPG)算法。

可以说DDPG是在DQN算法的基础之上进行改进的,DQN存在的问题就在于它只能解决含有离散和低维度的动作空间的问题。而一般的物理问题或控制问题中,动作空间是连续的,比如控制一个倒立摆问题(gym里对应的是”Pendulum-v0″)。学者在借鉴DQN算法之后提出了DDPG算法,该算法的元素有以下几个:

  1. actor和critic这两个部分分别由训练的网络和目标网络构成,相当于是总共含有4个网络。
  2. 与DQN中一样,DDPG中也引入了experience buffer的机制,用于存储agent与环境交互的数据( ( s t , a t , r t , s t + 1 ) (s_{t},a_{t},r_{t}, s_{t+1}) (st,at,rt,st+1)
  3. 与DQN目标网络之间延迟复制现有网络不同的是,DDPG中采用soft update, 也就是缓慢地更新两个目标网络中的参数
  4. 在神经网络中加入batch normalization的技巧(以上几点说明了算法模型如何学习的过程,除此之外,在强化学习中,还必须要包括智能体如何进行探索的方法)
  5. DDPG算法采用向动作网络的输出中添加随机噪声的方式实现exploration。

下图是DDPG的伪代码示意:

在这里插入图片描述

在实际编程的过程中也是用的该损失函数,而不是之间将伪代码中的该函数的梯度进行实现。

而critic网络的参数更新方式是与DQN算法一样的,就是通过最小化目标网络与现有网络之间的均方误差来更新现有网络的参数,只不过唯一不同的地方在于目标网络的参数在DDPG算法中是缓慢更新的,而不是在DQN中每隔N步将现有网络的参数直接复制过来。

DDPG算法实现

本节对DDPG的算法进行实现,简化了一下此repo中的DDPG代码。此代码是用gym环境中的倒立摆环境,让agent学会如何让摆静止在倒挂的状态。该程序中我已经加入了必要的注释,目前训练结果不是很好,还需要继续调参和调试。

import torch import torch.nn as nn import torch.nn.functional as F import numpy as np import gym import time  hyper parameters  EPISODES = 200 EP_STEPS = 200 LR_ACTOR = 0.001 LR_CRITIC = 0.002 GAMMA = 0.9 TAU = 0.01 MEMORY_CAPACITY = 10000 BATCH_SIZE = 32 RENDER = False ENV_NAME = 'Pendulum-v0'  DDPG Framework # class ActorNet(nn.Module): # define the network structure for actor and critic def __init__(self, s_dim, a_dim): super(ActorNet, self).__init__() self.fc1 = nn.Linear(s_dim, 30) self.fc1.weight.data.normal_(0, 0.1) # initialization of FC1 self.out = nn.Linear(30, a_dim) self.out.weight.data.normal_(0, 0.1) # initilizaiton of OUT def forward(self, x): x = self.fc1(x) x = F.relu(x) x = self.out(x) x = torch.tanh(x) actions = x * 2 # for the game "Pendulum-v0", action range is [-2, 2] return actions class CriticNet(nn.Module): def __init__(self, s_dim, a_dim): super(CriticNet, self).__init__() self.fcs = nn.Linear(s_dim, 30) self.fcs.weight.data.normal_(0, 0.1) self.fca = nn.Linear(a_dim, 30) self.fca.weight.data.normal_(0, 0.1) self.out = nn.Linear(30, 1) self.out.weight.data.normal_(0, 0.1) def forward(self, s, a): x = self.fcs(s) y = self.fca(a) actions_value = self.out(F.relu(x+y)) return actions_value class DDPG(object): def __init__(self, a_dim, s_dim, a_bound): self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32) self.pointer = 0 # serves as updating the memory data  # Create the 4 network objects self.actor_eval = ActorNet(s_dim, a_dim) self.actor_target = ActorNet(s_dim, a_dim) self.critic_eval = CriticNet(s_dim, a_dim) self.critic_target = CriticNet(s_dim, a_dim) # create 2 optimizers for actor and critic self.actor_optimizer = torch.optim.Adam(self.actor_eval.parameters(), lr=LR_ACTOR) self.critic_optimizer = torch.optim.Adam(self.critic_eval.parameters(), lr=LR_CRITIC) # Define the loss function for critic network update self.loss_func = nn.MSELoss() def store_transition(self, s, a, r, s_): # how to store the episodic data to buffer transition = np.hstack((s, a, [r], s_)) index = self.pointer % MEMORY_CAPACITY # replace the old data with new data  self.memory[index, :] = transition self.pointer += 1 def choose_action(self, s): # print(s) s = torch.unsqueeze(torch.FloatTensor(s), 0) return self.actor_eval(s)[0].detach() def learn(self): # softly update the target networks for x in self.actor_target.state_dict().keys(): eval('self.actor_target.' + x + '.data.mul_((1-TAU))') eval('self.actor_target.' + x + '.data.add_(TAU*self.actor_eval.' + x + '.data)') for x in self.critic_target.state_dict().keys(): eval('self.critic_target.' + x + '.data.mul_((1-TAU))') eval('self.critic_target.' + x + '.data.add_(TAU*self.critic_eval.' + x + '.data)') # sample from buffer a mini-batch data indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE) batch_trans = self.memory[indices, :] # extract data from mini-batch of transitions including s, a, r, s_ batch_s = torch.FloatTensor(batch_trans[:, :self.s_dim]) batch_a = torch.FloatTensor(batch_trans[:, self.s_dim:self.s_dim + self.a_dim]) batch_r = torch.FloatTensor(batch_trans[:, -self.s_dim - 1: -self.s_dim]) batch_s_ = torch.FloatTensor(batch_trans[:, -self.s_dim:]) # make action and evaluate its action values a = self.actor_eval(batch_s) q = self.critic_eval(batch_s, a) actor_loss = -torch.mean(q) # optimize the loss of actor network self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() # compute the target Q value using the information of next state a_target = self.actor_target(batch_s_) q_tmp = self.critic_target(batch_s_, a_target) q_target = batch_r + GAMMA * q_tmp # compute the current q value and the loss q_eval = self.critic_eval(batch_s, batch_a) td_error = self.loss_func(q_target, q_eval) # optimize the loss of critic network self.critic_optimizer.zero_grad() td_error.backward() self.critic_optimizer.step() # Training  # Define the env in gym env = gym.make(ENV_NAME) env = env.unwrapped env.seed(1) s_dim = env.observation_space.shape[0] a_dim = env.action_space.shape[0] a_bound = env.action_space.high a_low_bound = env.action_space.low ddpg = DDPG(a_dim, s_dim, a_bound) var = 3 # the controller of exploration which will decay during training process t1 = time.time() for i in range(EPISODES): s = env.reset() ep_r = 0 for j in range(EP_STEPS): if RENDER: env.render() # add explorative noise to action a = ddpg.choose_action(s) a = np.clip(np.random.normal(a, var), a_low_bound, a_bound) s_, r, done, info = env.step(a) ddpg.store_transition(s, a, r / 10, s_) # store the transition to memory if ddpg.pointer > MEMORY_CAPACITY: var *= 0.9995 # decay the exploration controller factor ddpg.learn() s = s_ ep_r += r if j == EP_STEPS - 1: print('Episode: ', i, ' Reward: %i' % (ep_r), 'Explore: %.2f' % var) if ep_r > -300 : RENDER = True break print('Running time: ', time.time() - t1) 

在这里插入图片描述

以下内容请跳过,这里只是记录一下训练结果不好的代码

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim # from tensorboardX import SummaryWriter import gym import numpy as np import random from torch.distributions import Normal from itertools import count # Define hyperparameters ENV_NAME = "Pendulum-v0" # gym env BATCH_SIZE = 100 # mini-batch size when sampled from buffer MEM_CAPACTIY = 10000 # Replay buffer size EPISODES = 200 STEPS = 200 GAMMA = 0.9 # discount factor LEARNING_RATE = 1e-3 # learning rate of optimizer TAU = 0.01 # update the target net parameter smoothly RANDOM_SEED = 9527 # fix the random seed # SAMPLE_FREQ = 2000  NOISE_VAR = 0.1 RENDER = False device = 'cuda' if torch.cuda.is_available() else 'cpu' # use GPU to train print(device) env = gym.make(ENV_NAME) ACTION_DIM = env.action_space.shape[0] #dim=1 STATE_DIM = env.observation_space.shape[0] # dim=3 ACTION_BOUND = env.action_space.high[0] # action interval [-2,2] np.random.seed(RANDOM_SEED) # fix the random seed directory = '.\\exp\\' class ReplayBuffer(): def __init__(self, max_size=MEM_CAPACTIY): self.storage = [] #empty list self.max_size = max_size self.pointer= 0 def store_transition(self, transition): if len(self.storage) == self.max_size: # replace the old data self.storage[self.pointer] = transition self.pointer = (self.pointer + 1) % self.max_size # point to next position else: self.storage.append(transition) def sample(self, batch_size): # Define the array of indices for random sampling from storage # the size of this array equals to batch_size ind_array = np.random.randint(0, len(self.storage),size=batch_size) s, a, r, s_, d = [], [], [], [], [] for i in ind_array: S, A, R, S_, D = self.storage[i] s.append(np.array(S, copy=False)) a.append(np.array(A, copy=False)) r.append(np.array(R, copy=False)) s_.append(np.array(S_, copy=False)) d.append(np.array(D, copy=False)) return np.array(s), np.array(a), np.array(r).reshape(-1, 1), np.array(s_), np.array(d).reshape(-1, 1) class Actor(nn.Module): def __init__(self, state_dim, action_dim, max_action): super(Actor, self).__init__() self.l1 = nn.Linear(state_dim, 30) self.l1.weight.data.normal_(0, 0.3) # initialization self.l2 = nn.Linear(30, action_dim) self.l2.weight.data.normal_(0, 0.3) # initialization self.max_action = max_action def forward(self, x): x = F.relu(self.l1(x)) x = self.max_action * torch.tanh(self.l2(x)) # the range of tanh is [-1, 1] return x class Critic(nn.Module): def __init__(self, state_dim, action_dim): super(Critic,self).__init__() self.l1 = nn.Linear(state_dim + action_dim, 30) self.l1.weight.data.normal_(0, 0.3) # initialization self.l2 = nn.Linear(30, 1) self.l2.weight.data.normal_(0, 0.3) # initialization def forward(self, x, a): x = F.relu(self.l1(torch.cat([x, a], 1))) x = self.l2(x) return x class DDPG(object): def __init__(self, state_dim, action_dim, max_action): # network, optimizer for actor self.actor = Actor(state_dim, action_dim, max_action).to(device) self.actor_target = Actor(state_dim, action_dim, max_action).to(device) self.actor_target.load_state_dict(self.actor.state_dict()) self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=LEARNING_RATE) # network, optimizer for critic self.critic = Critic(state_dim, action_dim).to(device) self.critic_target = Critic(state_dim, action_dim).to(device) self.critic_target.load_state_dict(self.critic.state_dict()) self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=LEARNING_RATE) # create replay buffer object self.replay_buffer = ReplayBuffer() def select_action(self, state): # select action based on actor network and add some noise on it for exploration state = torch.FloatTensor(state.reshape(1, -1)).to(device) action = self.actor(state).cpu().data.numpy().flatten() noise = np.random.normal(0, NOISE_VAR, size=ACTION_DIM).clip( env.action_space.low, env.action_space.high) action = action + noise return action def update(self): for i in range(EPISODES): s, a, r, s_, d = self.replay_buffer.sample(BATCH_SIZE) # transfer these tensors to GPU state = torch.FloatTensor(s).to(device) action = torch.FloatTensor(a).to(device) reward = torch.FloatTensor(r).to(device) next_state = torch.FloatTensor(s_).to(device) done = torch.FloatTensor(d).to(device) # compute the target Q value target_Q = self.critic_target(next_state, self.actor_target(next_state)) target_Q = reward + (done * GAMMA * target_Q).detach() # Get the current Q value current_Q = self.critic(state, action) # compute critic loss by MSE critic_loss = F.mse_loss(current_Q, target_Q) # use optimizer to update the critic network self.critic_optimizer.zero_grad() critic_loss.backward() self.critic_optimizer.step() # compute the actor loss and its gradient to update the parameters actor_loss = -self.critic(state,self.actor(state)).mean() self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() # update the target network of actor and critic # zip() constructs tuple from iterable object for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()): target_param.data.copy_(TAU * param.data + (1-TAU) * target_param.data) for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()): target_param.data.copy_(TAU * param.data + (1-TAU) * target_param.data) def save(self): torch.save(self.actor.state_dict(), directory + 'actor.pth') torch.save(self.critic.state_dict(), directory + 'critic.pth') def load(self): self.actor.load_state_dict(torch.load(directory + 'actor.pth')) self.critic.load_state_dict(torch.load(directory + 'critic.pth')) def main(): agent = DDPG(STATE_DIM, ACTION_DIM, ACTION_BOUND) ep_r = 0 total_step = 0 for i in range(EPISODES): total_reward = 0 step = 0 state = env.reset() for t in count(): if RENDER == True and i > 100: env.render() # Render is unnecessary action = agent.select_action(state) # get the next transition by using current action next_state, reward, done, info = env.step(action) # store the transition to the buffer agent.replay_buffer.store_transition((state, action, reward / 10, next_state, np.float(done))) state = next_state if done: break step += 1 total_reward += reward total_step += step+1 print("Total T:{} Episode: \t{} Total Reward: \t{:0.2f}".format(total_step, i, total_reward)) agent.update() #NOISE_VAR *= 0.99 env.close() if __name__ == '__main__': main() 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/225890.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 上午8:18
下一篇 2026年3月17日 上午8:19


相关推荐

  • 白盒测试和黑盒测试的区别

    白盒测试和黑盒测试的区别白盒测试 是指实际运行被测程序 通过程序的源代码进行测试而不使用用户界面 这种类型的测试需要从代码句法发现内部代码在算法 溢出 路径和条件等方面的缺点或者错误 进而加以修正 白盒测试把测试对象看作一个打开的盒子 黑盒测试 又称功能测试 数据驱动测试或基于规格说明的测试 是通过使用整个软件或某种软件功能来严格地测试 而并没有通过检查程序的源代码 或者很清楚地了解该软件的源代码程序具体是

    2025年8月10日
    4
  • 学习BoundsChecker

    对C++程序不熟悉,但是因为工作需要,要对一些程序进行测试分析,找出是否有内存泄露情况。在网上找到了大家比较推崇的BoundsChecker,安装后开始看软件自带的手册,为了让更多的同事一起学习,要将全英文的手册翻译成中文,在这个学习过程中,也把成果不断贴出来大家分享吧!今天先贴手册的目录                                                Boun

    2022年4月10日
    44
  • 网口调试步骤_万兆光口和千兆光口对接

    网口调试步骤_万兆光口和千兆光口对接千兆网口、光口调试总结配置6096端:工作模式的配置方式:1、 硬件配置,通过电阻上下拉确定;6096的硬件配置不可以错,其在portstatus寄存器状态中有相应的寄存器位体现硬件配置的工作模式。2、 软件配置,主要是配置链路层的工作模式。主要是设置PCS(Physicalcodingsublayer)寄存器。3、 Marvell的PHY芯片有个特性,叫P

    2025年11月11日
    6
  • IDEA全局搜索快捷键失效不起作用(Ctrl+Shift+F)

    IDEA全局搜索快捷键失效不起作用(Ctrl+Shift+F)原因 这种情况一般是快捷键冲突导致的 系统的快捷键或其他软件的快捷键和 IDEA 的快捷键冲突了 我们的搜狗输入法中 简繁切换的快捷键也是 Ctrl Shift F 导致我们 IDEA 的全局搜索失效了解决方式 1 在输入框点击右键 弹出的窗口点击更多设置 或有的版本是属性 2 选择属性设置 gt 按键 gt 系统功能快捷键设置 每个版本不同 总之就是找到输入法的快捷键设置的地方就 ok 3 找到简繁切换快捷键 将对钩取消就可以了

    2026年3月26日
    2
  • 网站优化网络推广怎么做_网站推广公司

    网站优化网络推广怎么做_网站推广公司如何优化网站,网站推广优化一般流程

    2022年4月21日
    59
  • 基于亚马逊云科技 Mac 实例部署 OpenClaw,深度苹果生态自动化的最佳选择

    基于亚马逊云科技 Mac 实例部署 OpenClaw,深度苹果生态自动化的最佳选择

    2026年3月13日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号