强化学习(on-policy)同步并行采样(on-line)的并行化效率分析

在强化学习中(on-line)的算法如果是on-policy的算法都是需要较大的采样样本的,因此采样的效率往往对整个算法运行效率有着自关重要的影响,在deepmind(Google)公司的强化学习的并行采样设计中往往使用带有timeout的带有时间过期的队列(queue)来进行并行采样的多进程间的同步和通信,但是这种设计往往十分复杂,编程难度也极大,不过也正因如此也比较吸引人们的关注,对此我也是如此,不过我一直在考虑这种timeout的同步通信方式是否真的可以提高运行效率,如果不能的话或者提高的效率有限的话,那么搞这么复杂的设计是不是自找苦吃,也或者只是一种炫技的操作呢。

为了分析强化学习的并行采样到底应该如何设计,或者说不同种类的on-line的on-policy的同步并行采样应该如何设计,为此在项目:

https://openi.pcl.ac.cn/devilmaycry812839668/pytorch-maml-rl

基础上修改出了下面的实验项目:

https://openi.pcl.ac.cn/devilmaycry812839668/rl-sampling-experiments

其实,这个新改出的项目其实就是改了两个文件,即,multi_task_sampler.py 和 sync_vector_env_steps.py

给出 multi_task_sampler.py 的代码:

点击查看代码
  ########################################   import os import time import signal import sys import copy import numpy as np from queue import Empty from multiprocessing import Queue from multiprocessing.sharedctypes import RawArray from typing import List, Callable, Tuple, Dict from gymnasium import Env  ################################## from ctypes import c_uint, c_float, c_double, c_int64  import torch   from maml_rl.envs.utils.sync_vector_env_steps import ChildEnv from maml_rl.samplers.sampler import make_env  NUMPY_TO_C_DTYPE = {np.float32: c_float,                      np.float64: c_double,                      np.uint8: c_uint,                     np.int64: c_int64,}    class MultiTaskSampler():     def __init__(self,                  env_name,                  env_kwargs,                  batch_size,                  policy,                  baseline,                  seed=None,                  num_workers=8):         self.seed = seed         self.batch_size = batch_size         self.num_workers = num_workers         self.policy = policy         self.baseline = baseline          env_fns = [make_env(env_name, env_kwargs=env_kwargs) for _ in range(batch_size*num_workers)]         self.vector_env = SynVectorEnv(env_fns, num_workers)         self.env = make_env(env_name, env_kwargs=env_kwargs)()          self.first_init = True      def close(self):         self.vector_env.close()      def sample(self, max_length, params=None):         self.done_times = np.array([max_length+1, ]*self.num_workers)         if self.first_init:             self.first_init = False             seed = self.seed         else:             seed = None          # self.task_results = [{"obs":[[],], "action":[[],], "reward":[[],], "terminated":[[],],          #                       "truncated":[[],], "length":0, "episodes":0}          #                         for _ in range(self.batch_size*self.num_workers)]         self.task_results = [[] for _ in range(self.num_workers)]          ####################################### reset_task         tasks_profile = self.env.unwrapped.sample_tasks(self.batch_size*self.num_workers)         self.vector_env.reset_task(tasks_profile)                 ####################################### reset         # obss, infos = self.vector_env.reset(seed=seed)         data_dict = self.vector_env.reset(seed=seed)           while data_dict:             print(len(data_dict))             remove_list = []             for i, (index, data) in enumerate(data_dict.items()):                 # data[1]   ### infos                 self.task_results[index].append(data[0])                 self.done_times[index] -= 1                  if self.done_times[index] == 0:                     remove_list.append(index)              for it in remove_list:                 data_dict.pop(it)             if len(data_dict) == 0:                 data_dict = self.vector_env.step_recv()                 continue               obs_numpy = np.concatenate([data_dict[index][0]["next_obs"] for index in data_dict.keys()], dtype=np.float32)              observations_tensor = torch.from_numpy(obs_numpy)             pi = self.policy(observations_tensor, params=params)             actions_tensor = pi.sample()             actions = actions_tensor.cpu().numpy()              for i, (index, data) in enumerate(data_dict.items()):                 # data[1]  ### infos                 # self.task_results[index][-1]["next_actions"] = actions[i*self.batch_size:(i+1)*self.batch_size]                  self.vector_env.step_send(index, actions[i*self.batch_size:(i+1)*self.batch_size])              data_dict = self.vector_env.step_recv()      """         next_obs         next_obs_truncated         next_actions          rewards         terminated         truncated           self.shared_data["actions"] = _actions         self.shared_data["rewards"] = _rewards         self.shared_data["next_obs"] = _next_obs         self.shared_data["next_obs_truncated"] = _next_obs_truncated         self.shared_data["terminated"] = _terminated         self.shared_data["truncated"] = _truncated      """     def collect(self):         for i, t_result in enumerate(self.task_results):             task_obs = []             task_actions = []             task_rewards = []             for j, step_dict in enumerate(t_result):                 if j == 0:                     obs = [step_dict["next_obs"],]                     actions = []                     rewards = []                     continue                 elif step_dict["terminated"]:                     actions.append(step_dict["actions"])                     rewards.append(step_dict["rewards"])                      # obs.append(step_dict["next_obs"])                     task_obs.append(obs)                     task_actions.append(actions)                     task_rewards.append(rewards)                      obs = [step_dict["next_obs"],]                     actions = []                     rewards = []                 elif step_dict["truncated"]:                     actions.append(step_dict["actions"])                     rewards.append(step_dict["rewards"])                      obs.append(step_dict["next_obs_truncated"])                      task_obs.append(obs)                     task_actions.append(actions)                     task_rewards.append(rewards)                     obs = [step_dict["next_obs"],]                     actions = []                     rewards = []                 else:                     actions.append(step_dict["actions"])                     rewards.append(step_dict["rewards"])                      obs.append(step_dict["next_obs"])             task_episodes = {}         for i, result in enumerate(self.task_results):             for j in range(self.batch_size):                 # result[j]   self.batch_size*i+j                 x = 0                 task_episodes[self.batch_size*i+j]={x:{"obs":[],}, }             obs = [[], ]             actions = [[], ]             rewards = [[], ]             terminated = [[], ]             truncated = [[], ]     #########################################1   class SynVectorEnv(object):     def __init__(self, env_fns:List[Callable[[], Env]], n_process:int):         self.n_process = n_process         self.env_fns = env_fns         assert len(env_fns)%n_process == 0, "env_fns must be divided by n_process"          self.notify_queues = [Queue() for _ in range(n_process)]         self.barrier = Queue()          # self.shared_data = {"actions":None,          #                     "rewards":None,         #                     "next_obs":None,         #                     "next_obs_truncated":None,         #                     "terminated":None,         #                     "truncated":None,}         self.shared_data = dict()         self.init_shared_data(env_fns)         self.sub_shared_data = []          self.runners = []         for i, _s_env_fns in enumerate(np.split(np.asarray(env_fns), n_process)):             _s_shared_data = dict()             for _name, _data in self.shared_data.items():                 _s_shared_data[_name] = np.split(_data, n_process)[i]             self.runners.append(ChildEnv(i, list(_s_env_fns), _s_shared_data, self.notify_queues[i], self.barrier))               self.sub_shared_data.append(_s_shared_data)          for r in self.runners:  ### 启动子进程             r.start()         self._running_flag = True         self.clean()      def init_shared_data(self, env_fns:List[Callable[[], Env]]):         N = len(env_fns)         _env:Env = env_fns[0]()         _obs, _ = _env.reset()          if _env.action_space.sample().size == 1:             _actions = np.zeros(N, dtype=_env.action_space.dtype)         else:             _actions = np.asarray([np.zeros(_env.action_space.sample().size, dtype=_env.action_space.dtype) for _ in range(N)])         _rewards = np.zeros(N, dtype=np.float32)         _next_obs = np.asarray([ _obs for env in env_fns ])         _next_obs_truncated = np.asarray([ _obs for env in env_fns ])         _terminated = np.zeros(N, dtype=np.float32)         _truncated = np.zeros(N, dtype=np.float32)          _actions = self._get_shared(_actions)         _rewards = self._get_shared(_rewards)         _next_obs = self._get_shared(_next_obs)         _next_obs_truncated = self._get_shared(_next_obs_truncated)         _terminated = self._get_shared(_terminated)         _truncated = self._get_shared(_truncated)          self.shared_data["actions"] = _actions         self.shared_data["rewards"] = _rewards         self.shared_data["next_obs"] = _next_obs         self.shared_data["next_obs_truncated"] = _next_obs_truncated         self.shared_data["terminated"] = _terminated         self.shared_data["truncated"] = _truncated         self.running_num = 0      def _get_shared(self, array:np.ndarray):         """         Returns a RawArray backed numpy array that can be shared between processes.         :param array: the array to be shared         :return: the RawArray backed numpy array         """         dtype = NUMPY_TO_C_DTYPE[array.dtype.type]          shared_array = RawArray(dtype, array.size)         return np.frombuffer(shared_array, dtype).reshape(array.shape)      """         instruction:             None: 结束进程             0 -> "reset": 重置环境, "seed": 设置随机种子             1 -> "reset_task": 重置环境任务             2 -> "step": 执行step操作     """     def reset(self, seed:int|None=None):         results = {}         for i, _queue in enumerate(self.notify_queues):             if seed is None:                 _queue.put((0, seed))             else:                 _queue.put((0, seed+i*len(self.env_fns)//self.n_process))          for _ in range(self.n_process):             _index, _instruction, _info = self.barrier.get()             if _instruction != 0:                 raise ValueError("instruction must be 0, reset operation")             # results[_index] = [np.copy(self.sub_shared_data[i]["next_obs"]), _info]             results[_index] = [copy.deepcopy(self.sub_shared_data[i]), _info]         return results      def reset_task(self, tasks):         for i, _queue in enumerate(self.notify_queues):             _queue.put((1, tasks[i*(self.n_process):(i+1)*(self.n_process)]))      def step_send(self, index:int, actions:np.ndarray):         self.sub_shared_data[index]["actions"][:] = actions         self.notify_queues[index].put((2, None))  ### None 只是为了占位用         self.running_num += 1      def step_recv(self):         if self.running_num == 0:             return None          _index, _instruction, _info = self.barrier.get()         assert _instruction == 2, "instruction must be 2, step operation"         results ={_index: [copy.deepcopy(self.sub_shared_data[_index]), _info]}         for _ in range(self.running_num-1):             try:                 _index, _instruction, _info = self.barrier.get(timeout=0.0001)                 # _index, _instruction, _info = self.barrier.get()                 assert _instruction == 2, "instruction must be 2, step operation"                  results[_index] = [copy.deepcopy(self.sub_shared_data[_index]), _info]             except Empty as e:                 # print(e)                 pass         self.running_num -= len(results)         return results      def close(self):         for queue in self.notify_queues:             queue.put((None, None))      def get_shared_variables(self) -> Dict[str, np.ndarray]:         return self.shared_data      def clean(self):         main_process_pid = os.getpid()         def signal_handler(signal, frame):             if os.getpid() == main_process_pid:                 print('Signal ' + str(signal) + ' detected, cleaning up.')                 if self._running_flag:                     self._running_flag = False                     self.close()                 print('Cleanup completed, shutting down...')                 sys.exit(0)         signal.signal(signal.SIGTERM, signal_handler)         signal.signal(signal.SIGINT, signal_handler) 

以及 sync_vector_env_steps.py 的代码实现:

点击查看代码
from multiprocessing import Process from multiprocessing import Queue import numpy as np from numpy import ndarray from typing import Callable, Dict, List, Tuple  """目前主要支持gymnasium环境,gym环境未成功支持""" from gymnasium import Env # try: #     import gymnasium as gym # except ImportError: #     import gym  # from gym import Env   """     多进程实现:         以子进程的方式运行游戏环境并与主进程交互数据 """ class ChildEnv(Process):      def __init__(self, index:int, env_fns:List[Callable[[], Env]], shared_data:Dict[str, ndarray],                   notify_queue:Queue, barrier:Queue):         super(ChildEnv, self).__init__()         self.index = index    # 子进程的编号         self.envs = [env_fn() for env_fn in env_fns]          self.actions = shared_data["actions"]         self.rewards = shared_data["rewards"]         self.next_obs = shared_data["next_obs"]         self.next_obs_truncated = shared_data["next_obs_truncated"]         self.terminated = shared_data["terminated"]         self.truncated = shared_data["truncated"]          self.notify_queue = notify_queue         self.barrier = barrier          # Check that all sub-environments are meta-learning environments         for env in self.envs:             if not hasattr(env.unwrapped, 'reset_task'):                 raise ValueError('The environment provided is not a '                                  'meta-learning environment. It does not have '                                  'the method `reset_task` implemented.')      def run(self):         super(ChildEnv, self).run()          """             instruction:                 None: 结束进程                 0 -> "reset": 重置环境, "seed": 设置随机种子                 1 -> "reset_task": 重置环境任务                 2 -> "step": 执行step操作         """         while True:             instruction, data = self.notify_queue.get()             if instruction == 2: # step操作                 # _actions = data  # return: (self.index, instruction, _infos)                 # _actions = self.actions                 pass             elif instruction == 0: # reset操作                 _seed = data if data else np.random.randint(2 ** 16 - 1)                 _infos = []                 for i, env in enumerate(self.envs):                     self.next_obs[i], _info = env.reset(seed=_seed+i)                     _infos.append(_info)                 self.barrier.put((self.index, instruction, _infos)) # 通知主进程, reset, observations初始化                 # self.barrier.put((self.index, instruction, None)) # 通知主进程, reset, observations初始化                 continue             elif instruction == 1: # reset_task操作                 _tasks = data                 for env, _task in zip(self.envs, _tasks):                     env.unwrapped.reset_task(_task)                 continue             elif instruction is None:                 break   ### exit sub_process             else:                 raise ValueError('Unknown instruction: {}'.format(instruction))              _infos = []                          for i, (env, action) in enumerate(zip(self.envs, self.actions)):                 next_obs, reward, terminated, truncated, info = env.step(action)                 if terminated:                     self.next_obs[i], _sub_info = env.reset()                     info["terminated"] = {"info":_sub_info, }                 elif truncated:                     self.next_obs_truncated[i] = next_obs                     self.next_obs[i], _sub_info = env.reset()                     info["truncated"] = {"info":_sub_info,                                          #  "next_obs":self.next_obs_truncated[i]                                          }                  self.rewards[i] = reward                 self.terminated[i] = terminated                 self.truncated[i] = truncated                  _infos.append(info)              # self.barrier.put((self.index, instruction, _infos))             self.barrier.put((self.index, instruction, None))  # 减少发送内容大小       """     # def step_wait(self):     def step(self, actions):         self._actions = actions          observations_list, infos = [], []         batch_ids, j = [], 0         num_actions = len(self._actions)         rewards = np.zeros((num_actions,), dtype=np.float32)         for i, env in enumerate(self.envs):             if self._dones[i]:                 continue              action = self._actions[j]             observation, rewards[j], self._done, _truncation, info = env.step(action)             batch_ids.append(i)              self._dones[i] = self._done or _truncation             if not self._dones[i]:                 observations_list.append(observation)                 infos.append(info)             j += 1         assert num_actions == j          if observations_list:             observations = create_empty_array(self.single_observation_space,                                               n=len(observations_list),                                               fn=np.zeros)             concatenate(self.single_observation_space,                         observations_list,                         observations)             observations = np.asarray(observations, dtype=np.float32)         else:             observations = None                   return (observations, rewards, np.copy(self._dones),                 {'batch_ids': batch_ids, 'infos': infos})     """ 

其中,sync_vector_env_steps.py 文件的实现是比较常见的,就是多进程同步并行,然后每个进程下环境串行,以此来增加当次的通信数据量并提高吞吐效率,最终实现提高并行运算的效率。

该实现中主进程和子进程间的通信使用共享内存的方式进行,同步的信号通过队列的方式进行传递,该中方式被认为是on-policy并行中效率最高的方式,具体可以参见gymnasium中的更详细的实现方式。

其实,最为关键的实现代码是在 文件 multi_task_sampler.py 中。

在这里设计 step_send 函数,该函数接受的是主进程发送给子进程的 actions数据,其中,index 代表发送给的目标子进程的ID编号。

    def step_send(self, index:int, actions:np.ndarray):         self.sub_shared_data[index]["actions"][:] = actions         self.notify_queues[index].put((2, None))  ### None 只是为了占位用         self.running_num += 1 

step_recv 函数代表着从子进程中接受传递给主进程的消息信号,其中,_index 代表着发送方的子进程的ID号,收到这个信号就说明这个子进程对应的共享内存的数据已经被子进程写好了,子进程只需要从这块共享数据中将数据取走即可。因为这块共享内存未来还是要被下次的存放操作所覆盖的,因此这里需要对这块共享内存中的数据就行deepcopy,以保证这部分内存的数据在后期不会被改变。

    def step_recv(self):         if self.running_num == 0:             return None          _index, _instruction, _info = self.barrier.get()         assert _instruction == 2, "instruction must be 2, step operation"         results ={_index: [copy.deepcopy(self.sub_shared_data[_index]), _info]}         for _ in range(self.running_num-1):             try:                 _index, _instruction, _info = self.barrier.get(timeout=0.0001)                 # _index, _instruction, _info = self.barrier.get()                 assert _instruction == 2, "instruction must be 2, step operation"                  results[_index] = [copy.deepcopy(self.sub_shared_data[_index]), _info]             except Empty as e:                 # print(e)                 pass         self.running_num -= len(results)         return results 

而其中体现出同步通信的队列的timeout的超时操作的实现如下:

_index, _instruction, _info = self.barrier.get(timeout=0.0001)

可以看到,如果timeout时间过小,那么主进程就不会等待所有子进程都完成一个step的操作,而是将部分进程的单步step数据传递给上一层的操作;同理,如果timeout的时间过大,那么也就意味着主进程会等待所有子进程的单步step的操作数据,然后一并上传递给上一层操作。

也正是这一点的不同才是Google的实现与其他实现的不同所在,在Google实现中是要把这个timeout设置为一个比较小的数值,以保证主进程不会一直堵塞在等待子进程数据的操作上,这样来提高算法的采样效率,而我一直疑问的也就是这个地方,这个timeout是否应该存在,存在的意义在哪,或者更应该说到底哪类应用中该设置这个timeout.

给出入口文件的部分代码,如下:

    if args.seed is not None:         torch.manual_seed(args.seed)         torch.cuda.manual_seed_all(args.seed)      env = gym.make(config['env-name'], **config.get('env-kwargs', {}))     env.close()      # Policy     policy = get_policy_for_env(env,                                 hidden_sizes=config['hidden-sizes'],                                 nonlinearity=config['nonlinearity'])     policy.to(args.device)      print(MultiTaskSampler)     xxx = MultiTaskSampler(env_name="HalfCheetahDir-v5", env_kwargs={}, batch_size=20, policy=policy, baseline=None, seed=None)      print("...")     import time     _a = time.time()     xxx.sample(100)     _b = time.time()     print(_b-_a)     xxx.close()     print("...") 

当 batch_size=20 时,8个子进程,也就是8个进程并行运行20x8=160个 mujoco 环境,当timeout=0.0001s时,运行结果如下:

强化学习(on-policy)同步并行采样(on-line)的并行化效率分析

强化学习(on-policy)同步并行采样(on-line)的并行化效率分析

当out=0.1s时,运行结果如下:

强化学习(on-policy)同步并行采样(on-line)的并行化效率分析

其中上面的运行命令如下:

time python x.py --config configs/maml/halfcheetah-dir.yaml --output-folder maml-halfcheetah-dir444 --seed 1 --num-workers 8 --use-cuda







给出另一种情况,当 batch_size=4000 时,8个子进程,也就是8个进程并行运行4000x8=32000个 mujoco 环境,当timeout=0.0001s时,运行结果如下:

强化学习(on-policy)同步并行采样(on-line)的并行化效率分析

当 batch_size=4000 时,8个子进程,也就是8个进程并行运行4000x8=32000个 mujoco 环境,当timeout=0.1s时,运行结果如下:

强化学习(on-policy)同步并行采样(on-line)的并行化效率分析

由上面的数据可以看到,在并行的环境数较少时,比如160个环境,timeout=0.0001s,这样大致每次收集到主进程的子进程数据大约为4个,当timeout=0.1s,收集到主进程的子进程数据大约为8个,这种情况下使用timeout=0.0001s这种情况其实性能并不比不设置timeout的强,该种情况下主进程完成对所有子进程数据的收集往往性能更高,因为这样可以较少进程间数据交互的次数也能减少下一步神经网络计算的次数;而当环境数较多的情况下,比如32000个环境时,这时二者的差距不大,因此从上面的数据中我们可以看到,使用timeout这种并行同步的设置并没有体现出任何的优势,反而凭空增加的代码复杂度。

可以知道,尤其是在并行环境数较少的情况下,在上面的实验中往往不设置timeout,也就是父进程等待所有子进程的数据然后再进行下一步的操作会有更好的性能体现。不过需要注意的是,这里的神经网络的体量比较小,一个神经网络就只有10几个MB或者几十个MB的参数,因此该种情况下使用timeout来只等待部分数据就开始计算的方式并不难提高运行效率,甚至会因为增加了与神经网络的交互次数反而增加了运行时长,需要知道的是在GPU算力可以cover的情况下,一个神经网络运行计算一个数据的时间和运行几百个甚至几千个数据的运行时间是一致的,而上面的情况即属于这种,因为这个神经网络参数少,因此GPU完成一次正反传的计算只与计算次数有关而和计算的数据量无关,而这种情况下使用timeout就会导致出现与GPU交互次数增加的问题,因此在环境数较少的情况下使用timeout并不难提高运行效率,甚至会导致运行效率。换句话来说,那就是在并行环境数较少的情况下是不建议使用timeout的并行同步方式。

不过也有特殊情况,其实就是不满足与GPU交互的时间不区分输入数据的个数的情况,比如神经网络参数比较多,这个情况即使是使用全部子进程的数据进行计算也不见得提高算法,因为该种情况下GPU交互次数的减少并不能提高算法性能,比如说此时GPU计算一次正反传播计算,100个数据需要0.001秒,而200个数据需要0.002秒,而与GPU进行数据交互的用时为0.00001秒,那么这种情况下使用timeout是可以加速运算效率的,但是在比较小的神经网络的情况下,比如上面的例子中,GPU计算一次正反传播计算,100个数据需要0.000001秒,而200个数据同样需要0.000001秒,那么这种情况下减少与GPU的数据交互次数才能有效的提高算法运行效率。





为了补充刚才的实验,下面给出batch_size=4000时,1000次循环计算的用时,此时timeout=0.0001时用时为:314.17秒,而timeout=0.1秒时,用时为:341.86秒。

根据这次的实验可以知道之前关于batch_size=4000的实验存在一定的误差,而以最后的实验数据为准。从这个实验可以知道当环境数足够多时,使用timeout才会有性能提升,之所以这样是因为当环境数足够多时每个子进程的单步step运行时间才会出现显著的差异,而这个差异是可以弥补掉增加与GPU交互次数而增加的运行时长的。我们假设每个环境完成单个step的运算时长为服从某个正太分布,(X_1、X_2、...... X_n)分别为不同环境的运行时长,那么(E(X_1、X_2、...... X_n)=E(X_1)+E(X_2)+......+ E(X_n)),有了期望的计算公式后我们知道根据方差的计算可以知道,随着环境数的增加,不同进程完成单step的用时的差别也会显著的增加,而出现各子进程完成单步计算的用时出现显著差异时使用timeout的方式使用GPU在单次数据交互时完成部分子进程的数据才会提高运行效率。

为了形象说明单进程环境数量增加会导致个子进程完成单step的用时的不同,给出下面的例子:

强化学习(on-policy)同步并行采样(on-line)的并行化效率分析

从这个例子可以看到,随着size的增加,两个队列的数据的sum的差别也就越大。

PS:

虽然使用timeout的设计方法可以提高(1)神经网络参数及大并且数据量极大的情况(比如大型的CNN或Tansformer神经网络结构,那种使用H100显卡,batch_size=32s时都会使用100%GPU,并且GPU温度直接上升到90摄氏度以上的情况);(2)单进程中环境数量极大情况(上面例子中为单进程运行4000个mujoco环境),否则使用这种timeout的设计是并不会起到什么实际作用的,并不会有什么运行效率的显著提升的。

这就和TensorFlow设计的多进程磁盘数据读取和特定的特制的磁盘系统和文件系统一样,只能对 1% 情况的应用有作用,在实际应用中在绝大多数情况下还比不上pytorch直接调用python自带库运行的效率高的“搞笑”想象。由于经常阅读Google的项目源代码,发现Google经常喜欢搞这种炫技的操作,虽然有的时候确实有神来一笔的妙用,不过绝大多数情况下都是要人不知所以的累赘操作。

发表评论

您必须 [ 登录 ] 才能发表留言!

相关文章