当前位置: 首页 > 游戏攻略> 正文

复现PPO算法在MiniGrid环境中的实践与经验总结

来源:网络 作者:趣玩小编 发布时间:2024-11-22 10:17:39

最近我在尝试复现PPO算法在MiniGrid环境中的运行,并记录下了一些经验和总结。

我选择了Empty-5x5和8x8这两个简单环境,主要是为了验证PPO算法的实现是否正确。

01 Proximal policy Optimization(PPO)

首先,策略梯度方法的梯度形式是...

然而,传统策略梯度方法容易一步走的太多,以至于越过了中间比较好的点(在参考知乎博客里称为overshooting)。一个直观的想法是限制策略每次不要更新太多,比如去约束新策略旧策略之间的KL散度(公式是 plog(p/q))...

我们把这个约束进行拉格朗日松弛,将它变成一个惩罚项...

然后再使用一些数学近似技巧,可以得到自然策略梯度(NPG)算法。

NPG算法貌似还有种种问题,比如KL散度的约束太紧,导致每次更新后的策略性能没有提升。我们希望每次策略更新后都带来性能提升,因此计算新策略旧策略之间预期回报的差异。这里采用计算advantage的方式...

其中优势函数(advantage)的定义是...

在公式(4)中,我们计算的advantage是在新策略的期望下的。但是,在新策略下蒙特卡洛采样(rollout)来算advantage期望太麻烦了,因此我们在原策略下rollout,并进行importance sampling,假装计算的是新策略下的advantage。这个advantage被称为替代优势(surrogate advantage)...

所产生的近似误差,貌似可以用两种策略之间最坏情况的KL散度表示...

其中C是一个常数。这貌似就是TRPO的单调改进定理,即,如果我们改进下限RHS,我们也会将目标LHS改进至少相同的量。

基于TRPO算法,我们可以得到PPO算法。PPO Penalty跟TRPO比较相近...

其中,KL散度惩罚的β是启发式确定的:PPO会设置一个目标散度\(\delta\),如果最终更新的散度超过目标散度的1.5倍,则下一次迭代我们将加倍β来加重惩罚。相反,如果更新太小,我们将β减半,从而扩大信任域。

接下来是PPO Clip,这貌似是目前最常用的PPO。PPO Penalty用β来惩罚策略变化,而PPO Clip与此不同,直接限制策略可以改变的范围。我们重新定义surrogate advantage...

其中,\(\rho_{t}\)为重要性采样的ratio...

公式(9)中,min括号里的第一项是ratio和advantage相乘,代表新策略下的advantage;min括号里的第二项是对ration进行的clip与advantage的相乘。这个min貌似可以限制策略变化不要太大。

02 如何复现PPO(参考stable baselines3和clean RL)

stable baselines3的PPO: https://github.com/DLR-RM/stable-baselines3/blob/master/stable_baselines3/ppo/ppo.py

clean RL的PPO: https://github.com/vwxyzjn/cleanrl/blob/master/cleanrl/ppo.py

代码主要结构如下,以stable baselines3为例:(仅保留主要结构,相当于伪代码,不保证正确性)

import torch
import torch.nn.functional as F
import numpy as np

# 1. collect rollout
self.policy.eval()
rollout_buffer.reset()
while not done:
    actions, values, log_probs = self.policy(self._last_obs)
    new_obs, rewards, dones, infos = env.step(clipped_actions)
    rollout_buffer.add(
        self._last_obs, actions, rewards,
        self._last_episode_starts, values, log_probs,
    )
    self._last_obs = new_obs
    self._last_episode_starts = dones

with torch.no_grad():
    # Compute value for the last timestep
    values = self.policy.predict_values(obs_as_tensor(new_obs, self.device)) 

rollout_buffer.compute_returns_and_advantage(last_values=values, dones=dones)


# 2. policy optimization
for rollout_data in self.rollout_buffer.get(self.batch_size):
    actions = rollout_data.actions
    values, log_prob, entropy = self.policy.evaluate_actions(rollout_data.observations, actions)
    advantages = rollout_data.advantages
    # Normalize advantage
    if self.normalize_advantage and len(advantages) > 1:
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

    # ratio between old and new policy, should be one at the first iteration
    ratio = torch.exp(log_prob - rollout_data.old_log_prob)

    # clipped surrogate loss
    policy_loss_1 = advantages * ratio
    policy_loss_2 = advantages * torch.clamp(ratio, 1 - clip_range, 1 + clip_range)
    policy_loss = -torch.min(policy_loss_1, policy_loss_2).mean()

    # Value loss using the TD(gae_lambda) target
    value_loss = F.mse_loss(rollout_data.returns, values_pred)

    # Entropy loss favor exploration
    entropy_loss = -torch.mean(entropy)

    loss = policy_loss + self.ent_coef * entropy_loss + self.vf_coef * value_loss

    # Optimization step
    self.policy.optimizer.zero_grad()
    loss.backward()
    # Clip grad norm
    torch.nn.utils.clip_grad_norm_(self.policy.parameters(), self.max_grad_norm)
    self.policy.optimizer.step()

大致流程:收集当前策略的rollout → 计算advantage → 策略优化。

计算advantage是由rollout_buffer.compute_returns_and_advantage函数实现的:

rb = rollout_buffer
last_gae_lam = 0
for step in reversed(range(buffer_size)):
    if step == buffer_size - 1:
        next_non_terminal = 1.0 - dones.astype(np.float32)
        next_values = last_values
    else:
        next_non_terminal = 1.0 - rb.episode_starts[step + 1]
        next_values = rb.values[step + 1]
    delta = rb.rewards[step] + gamma * next_values * next_non_terminal - rb.values[step]  # (1)
    last_gae_lam = delta + gamma * gae_lambda * next_non_terminal * last_gae_lam  # (2)
    rb.advantages[step] = last_gae_lam
rb.returns = rb.advantages + rb.values

其中,

  • (1) 行通过类似于TD error的形式(A = r + γV(s') - V(s)),计算当前t时刻的advantage;
  • (2) 行则是把t+1时刻的advantage乘gamma和gae_lambda传递过来。

03 记录一些踩坑经历

  1. PPO在收集rollout的时候,要在分布里采样,而非采用argmax动作,否则没有exploration。(PPO在分布里采样action,这样来保证探索,而非使用epsilon greedy等机制;听说epsilon greedy机制是value-based方法用的)
  2. 如果policy网络里有(比如说)batch norm,rollout时应该把policy开eval模式,这样就不会出错。
  3. (但是,不要加batch norm,加batch norm性能就不好了。听说RL不能加batch norm)
  4. minigrid简单环境,RNN加不加貌似都可以(?)
  5. 在算entropy loss的时候,要用真entropy,从Categorical分布里得到的entropy;不要用-logprob近似的,不然会导致策略分布熵变得很小炸掉。


相关攻略 更多 +
玩家最喜欢 更多 +
热门攻略 更多 +
热搜
查看完整榜单