How to implement a new algorithm
In this section, we will walk through the implementation of the Deep Deterministic Policy Gradient (DDPG) algorithm, available at baconian/algo/ddpg.py. It utilizes many functionalities provided by the framework, which we will describe below.
- The ModelFreeAlgo and OffPolicyAlgo Classes
For the algorithms in the Baconian project, we have written many abstract classes to indicate the characteristics of the algorithm, in baconian/algo/rl_algo.py.
The DDPG class inherits from the ModelFreeAlgo and OffPolicyAlgo classes. ModelFreeAlgo, OffPolicyAlgo and the other classes in baconian/algo/rl_algo.py inherit from the Algo class to categorize DRL algorithms.
from baconian.algo.algo import Algo
from baconian.algo.dynamics.dynamics_model import DynamicsModel
from baconian.core.core import EnvSpec
from baconian.common.logging import record_return_decorator
import numpy as np
class ModelFreeAlgo(Algo):
    """
    Category class for model-free algorithms; it adds no behaviour of its own on top of ``Algo``.
    """

    def __init__(self, env_spec: EnvSpec, name: str = 'model_free_algo', warm_up_trajectories_number=0):
        """
        Forward every argument, unchanged and positionally, to the parent constructor.

        :param env_spec: environment specifications
        :type env_spec: EnvSpec
        :param name: name of the algorithm
        :type name: str
        :param warm_up_trajectories_number: passed through as-is to the parent constructor
        """
        super().__init__(env_spec, name, warm_up_trajectories_number)
class OnPolicyAlgo(Algo):
    """
    Category class marking an algorithm as on-policy; defines nothing beyond what it inherits from ``Algo``.
    """
class OffPolicyAlgo(Algo):
    """
    Category class marking an algorithm as off-policy; defines nothing beyond what it inherits from ``Algo``.
    """
class ValueBasedAlgo(Algo):
    """
    Category class marking an algorithm as value-based; defines nothing beyond what it inherits from ``Algo``.
    """
class PolicyBasedAlgo(Algo):
    """
    Category class marking an algorithm as policy-based; defines nothing beyond what it inherits from ``Algo``.
    """
class ModelBasedAlgo(Algo):
    """
    Category class for model-based algorithms.

    It keeps a dynamics model together with the environment object that the model returns from
    ``return_as_env()``, and offers a helper that compares the model against a real environment.
    """

    def __init__(self, env_spec, dynamics_model: DynamicsModel, name: str = 'model_based_algo'):
        """
        :param env_spec: environment specifications, forwarded to the parent constructor
        :param dynamics_model: dynamics model owned by this algorithm
        :type dynamics_model: DynamicsModel
        :param name: name of the algorithm
        :type name: str
        """
        super(ModelBasedAlgo, self).__init__(env_spec, name)
        self._dynamics_model = dynamics_model
        # Environment-style view of the dynamics model, as produced by the model itself.
        self.dynamics_env = self._dynamics_model.return_as_env()

    def train_dynamics(self, *args, **kwargs):
        """
        Hook for training the dynamics model; this base implementation does nothing.
        """
        pass

    @record_return_decorator(which_recorder='self')
    def test_dynamics(self, env, sample_count, *args, **kwargs):
        """
        Compare one-step predictions of the dynamics model with a real environment.

        For ``sample_count`` steps, a random action is sampled from ``env_spec.action_space``, the
        dynamics model is reset to the current real state, and both the dynamics environment and the
        real environment are stepped with that action. The resulting next states are collected and
        their difference is summarised with two norms.

        :param env: real environment to compare against; must offer ``set_status``, ``reset`` and ``step``
        :param sample_count: number of steps to sample
        :type sample_count: int
        :return: dict with the keys ``dynamics_test_l1_error`` and ``dynamics_test_l2_error``
        :rtype: dict
        """
        self.set_status('TEST')
        env.set_status('TEST')
        st = env.reset()
        real_state_list = []
        dynamics_state_list = []
        for _ in range(sample_count):
            ac = self.env_spec.action_space.sample()
            # Seed the model with the real state so that only the one-step error is measured.
            self._dynamics_model.reset_state(state=st)
            new_state_dynamics, _, _, _ = self.dynamics_env.step(action=ac)
            new_state_real, _, done, _ = env.step(action=ac)
            real_state_list.append(new_state_real)
            dynamics_state_list.append(new_state_dynamics)
            # When the episode ends, continue from the freshly reset state instead of the terminal
            # one. A truthiness test is used because `done is True` is never satisfied by a
            # numpy.bool_ flag.
            st = env.reset() if done else new_state_real
        state_error = np.array(real_state_list) - np.array(dynamics_state_list)
        # NOTE(review): for a 2-D error array np.linalg.norm treats ord=1 / ord=2 as matrix norms
        # (max column sum / largest singular value), not element-wise sums - confirm this is the
        # intended metric.
        l1_loss = np.linalg.norm(state_error, ord=1)
        l2_loss = np.linalg.norm(state_error, ord=2)
        return dict(dynamics_test_l1_error=l1_loss, dynamics_test_l2_error=l2_loss)

    def set_terminal_reward_function_for_dynamics_env(self, terminal_func, reward_func):
        """
        Install the terminal and reward functions on the dynamics environment.

        :param terminal_func: terminal function handed to ``dynamics_env.set_terminal_reward_func``
        :param reward_func: reward function handed to ``dynamics_env.set_terminal_reward_func``
        """
        self.dynamics_env.set_terminal_reward_func(terminal_func, reward_func)
Each new algorithm should implement the methods and attributes defined in the Algo class (baconian/algo/algo.py).
from baconian.core.core import Basic, EnvSpec, Env
from baconian.core.status import StatusWithSubInfo
import abc
from typeguard import typechecked
from baconian.common.logging import Recorder
from baconian.core.parameters import Parameters
from baconian.common.sampler.sample_data import TrajectoryData
class Algo(Basic):
    """
    Abstract base class shared by all algorithms
    """
    STATUS_LIST = ['CREATED', 'INITED', 'TRAIN', 'TEST']
    INIT_STATUS = 'CREATED'

    @typechecked
    def __init__(self, env_spec: EnvSpec, name: str = 'algo', warm_up_trajectories_number=0):
        """
        Constructor

        :param env_spec: environment specifications
        :type env_spec: EnvSpec
        :param name: name of the algorithm
        :type name: str
        :param warm_up_trajectories_number: number of trajectories used to warm up the training
        :type warm_up_trajectories_number: int
        """
        super().__init__(status=StatusWithSubInfo(obj=self), name=name)
        self.env_spec = env_spec
        # Starts out as an empty parameter set.
        self.parameters = Parameters({})
        self.recorder = Recorder(default_obj=self)
        self.warm_up_trajectories_number = warm_up_trajectories_number

    def init(self):
        """
        Initialization method, e.g., random initialization of the networks in Tensorflow.
        This base implementation only switches the status to 'INITED'.

        :return: None
        """
        self._status.set_status('INITED')

    def warm_up(self, trajectory_data: TrajectoryData):
        """
        Warm up the algorithm with some data, e.g., compute the mean/std-dev of the state for normalization.
        Data used in the warm up process will not be added into the memory.
        This base implementation does nothing.

        :param trajectory_data: TrajectoryData object
        :type trajectory_data: TrajectoryData
        :return: None
        """

    def train(self, *arg, **kwargs) -> dict:
        """
        Training API; the specific arguments should be defined by each algorithm itself.
        This base implementation switches the status to 'TRAIN' and returns an empty dict.

        :return: training results, e.g., loss
        :rtype: dict
        """
        self._status.set_status('TRAIN')
        return {}

    def test(self, *arg, **kwargs) -> dict:
        """
        Testing API; most of the evaluation can be done by the agent instead of the algorithm, so this API
        can be skipped. This base implementation switches the status to 'TEST' and returns an empty dict.

        :return: test results, e.g., rewards
        :rtype: dict
        """
        self._status.set_status('TEST')
        return {}

    @abc.abstractmethod
    def predict(self, *arg, **kwargs):
        """
        Predict function: given the obs as input, return the action. The obs is read as the first argument
        passed into this API, like algo.predict(obs=x, ...)

        :return: predicted action
        :rtype: np.ndarray
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    @abc.abstractmethod
    def append_to_memory(self, *args, **kwargs):
        """
        For off-policy algorithms, use this API to append data into the replay buffer. The samples are read
        as the first argument passed into this API, like algo.append_to_memory(samples=x, ...)

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    @property
    def is_training(self):
        """
        A boolean indicating whether the algorithm is in training status

        :return: True if in training
        :rtype: bool
        """
        current_status = self.get_status()['status']
        return current_status == 'TRAIN'

    @property
    def is_testing(self):
        """
        A boolean indicating whether the algorithm is in testing status

        :return: True if in testing
        :rtype: bool
        """
        current_status = self.get_status()['status']
        return current_status == 'TEST'
- The MultiPlaceholderInput Class
The algorithms in the Baconian project are mostly implemented with TensorFlow and follow a similar process for saving and loading parameters. Hence, parameters are stored in the format of TensorFlow variables by the PlaceholderInput and MultiPlaceholderInput classes.
class DDPG(ModelFreeAlgo, OffPolicyAlgo, MultiPlaceholderInput):
    # ...
    @record_return_decorator(which_recorder='self')
    def save(self, global_step, save_path=None, name=None, **kwargs):
        """
        Save the algorithm through ``MultiPlaceholderInput.save``.

        :param global_step: global step forwarded to the underlying save call
        :param save_path: target path; a falsy value selects ``GlobalConfig().DEFAULT_MODEL_CHECKPOINT_PATH``
        :param name: checkpoint name; a falsy value selects ``self.name``
        :return: the path, global step and name that were actually used
        :rtype: dict
        """
        save_path = save_path or GlobalConfig().DEFAULT_MODEL_CHECKPOINT_PATH
        name = name or self.name
        MultiPlaceholderInput.save(self, save_path=save_path, global_step=global_step, name=name, **kwargs)
        return {
            'check_point_save_path': save_path,
            'check_point_save_global_step': global_step,
            'check_point_save_name': name,
        }

    @record_return_decorator(which_recorder='self')
    def load(self, path_to_model, model_name, global_step=None, **kwargs):
        """
        Load the algorithm through ``MultiPlaceholderInput.load``.

        :param path_to_model: path forwarded to the underlying load call
        :param model_name: model name forwarded to the underlying load call
        :param global_step: global step forwarded to the underlying load call
        :return: the path, global step and model name that were passed in
        :rtype: dict
        """
        MultiPlaceholderInput.load(self, path_to_model, model_name, global_step, **kwargs)
        return {
            'check_point_load_path': path_to_model,
            'check_point_load_global_step': global_step,
            'check_point_load_name': model_name,
        }
- Constructor
class DDPG(ModelFreeAlgo, OffPolicyAlgo, MultiPlaceholderInput):
    required_key_dict = DictConfig.load_json(file_path=GlobalConfig().DEFAULT_DDPG_REQUIRED_KEY_LIST)

    @typechecked()
    def __init__(self,
                 env_spec: EnvSpec,
                 config_or_config_dict: (DictConfig, dict),
                 value_func: MLPQValueFunction,
                 policy: DeterministicMLPPolicy,
                 schedule_param_list=None,
                 name='ddpg',
                 replay_buffer=None):
        """
        Build the actor/critic pair, their target copies, the replay buffer and the training ops.

        :param env_spec: environment specifications, like action space or observation space
        :param config_or_config_dict: configuration dictionary, like learning rate or decay, if any
        :param value_func: value function, used as the critic
        :param policy: agent policy, used as the actor
        :param schedule_param_list: forwarded to ``ParametersWithTensorflowVariable`` as
                                    ``to_scheduler_param_tuple``
        :param name: name of algorithm class instance
        :param replay_buffer: replay buffer instance to use; when None, a ``UniformRandomReplayBuffer``
                              sized by the ``REPLAY_BUFFER_SIZE`` config entry is created
        :raises TypeError: if ``replay_buffer`` is given but is not a ``BaseReplayBuffer`` instance
        """
        ModelFreeAlgo.__init__(self, env_spec=env_spec, name=name)
        config = construct_dict_config(config_or_config_dict, self)
        self.config = config
        self.actor = policy
        self.target_actor = self.actor.make_copy(name_scope='{}_target_actor'.format(self.name),
                                                 name='{}_target_actor'.format(self.name),
                                                 reuse=False)
        self.critic = value_func
        self.target_critic = self.critic.make_copy(name_scope='{}_target_critic'.format(self.name),
                                                   name='{}_target_critic'.format(self.name),
                                                   reuse=False)
        self.state_input = self.actor.state_input
        # Compare with None rather than relying on truthiness, so that a supplied buffer is never
        # silently replaced (presumably an empty buffer could be falsy if it defines __len__).
        if replay_buffer is not None:
            # The buffer is stored and used as an object, so it has to be an instance;
            # issubclass() would itself raise TypeError when handed an instance.
            if not isinstance(replay_buffer, BaseReplayBuffer):
                raise TypeError('replay_buffer must be an instance of BaseReplayBuffer, got {}'.format(
                    type(replay_buffer).__name__))
            self.replay_buffer = replay_buffer
        else:
            self.replay_buffer = UniformRandomReplayBuffer(limit=self.config('REPLAY_BUFFER_SIZE'),
                                                           action_shape=self.env_spec.action_shape,
                                                           observation_shape=self.env_spec.obs_shape)
        # self.parameters contains all the parameters (variables) of the algorithm
        self.parameters = ParametersWithTensorflowVariable(tf_var_list=[],
                                                           rest_parameters=dict(),
                                                           to_scheduler_param_tuple=schedule_param_list,
                                                           name='ddpg_param',
                                                           source_config=config,
                                                           require_snapshot=False)
        # Copy of the critic (reuse=True) that takes the actor's action tensor as its action input.
        self._critic_with_actor_output = self.critic.make_copy(reuse=True,
                                                               name='actor_input_{}'.format(self.critic.name),
                                                               state_input=self.state_input,
                                                               action_input=self.actor.action_tensor)
        # Copy of the target critic (reuse=True) that takes the target actor's action tensor.
        self._target_critic_with_target_actor_output = self.target_critic.make_copy(
            reuse=True,
            name='target_critic_with_target_actor_output_{}'.format(self.critic.name),
            action_input=self.target_actor.action_tensor)

        with tf.variable_scope(name):
            self.reward_input = tf.placeholder(shape=[None, 1], dtype=tf.float32)
            self.next_state_input = tf.placeholder(shape=[None, self.env_spec.flat_obs_dim], dtype=tf.float32)
            self.done_input = tf.placeholder(shape=[None, 1], dtype=tf.bool)
            self.target_q_input = tf.placeholder(shape=[None, 1], dtype=tf.float32)
            done = tf.cast(self.done_input, dtype=tf.float32)
            # reward + GAMMA * target_q, with the target_q term zeroed where done is set.
            self.predict_q_value = (1. - done) * self.config('GAMMA') * self.target_q_input + self.reward_input
            with tf.variable_scope('train'):
                self.critic_loss, self.critic_update_op, self.target_critic_update_op, self.critic_optimizer, \
                    self.critic_grads = self._setup_critic_loss()
                self.actor_loss, self.actor_update_op, self.target_actor_update_op, self.action_optimizer, \
                    self.actor_grads = self._set_up_actor_loss()
        # Collect the variables under '<name>/train' plus both optimizers' variables; duplicates are
        # dropped and the result is ordered by variable name.
        var_list = get_tf_collection_var_list(
            '{}/train'.format(name)) + self.critic_optimizer.variables() + self.action_optimizer.variables()
        self.parameters.set_tf_var_list(tf_var_list=sorted(set(var_list), key=lambda x: x.name))
        MultiPlaceholderInput.__init__(self,
                                       sub_placeholder_input_list=[dict(obj=self.target_actor,
                                                                        attr_name='target_actor',
                                                                        ),
                                                                   dict(obj=self.actor,
                                                                        attr_name='actor'),
                                                                   dict(obj=self.critic,
                                                                        attr_name='critic'),
                                                                   dict(obj=self.target_critic,
                                                                        attr_name='target_critic')
                                                                   ],
                                       parameters=self.parameters)