How to implement a new algorithm

In this section, we will walk through the implementation of the Deep Deterministic Policy Gradient (DDPG) algorithm, available at baconian/algo/ddpg.py. It utilizes many functionalities provided by the framework, which we will describe below.

  • The ModelFreeAlgo and OffPolicyAlgo Classes

For the algorithms in the Baconian project, we have written many abstract classes in baconian/algo/rl_algo.py to indicate the characteristics of an algorithm. The DDPG class inherits from the ModelFreeAlgo and OffPolicyAlgo classes. ModelFreeAlgo, OffPolicyAlgo and the other classes in baconian/algo/rl_algo.py inherit from the Algo class and are used to categorize DRL algorithms.

from baconian.algo.algo import Algo
from baconian.algo.dynamics.dynamics_model import DynamicsModel
from baconian.core.core import EnvSpec
from baconian.common.logging import record_return_decorator
import numpy as np


class ModelFreeAlgo(Algo):
    """
    Category base class for model-free algorithms.

    It adds no behavior of its own: the constructor only forwards its arguments to Algo.
    """

    def __init__(self, env_spec: EnvSpec, name: str = 'model_free_algo', warm_up_trajectories_number=0):
        """
        Constructor

        :param env_spec: environment specifications
        :type env_spec: EnvSpec
        :param name: name of the algorithm
        :type name: str
        :param warm_up_trajectories_number: passed through unchanged to Algo
        """
        super().__init__(env_spec, name, warm_up_trajectories_number)


class OnPolicyAlgo(Algo):
    """
    Category marker for on-policy algorithms; defines nothing beyond what Algo provides.
    """


class OffPolicyAlgo(Algo):
    """
    Category marker for off-policy algorithms; defines nothing beyond what Algo provides.
    """


class ValueBasedAlgo(Algo):
    """
    Category marker for value-based algorithms; defines nothing beyond what Algo provides.
    """


class PolicyBasedAlgo(Algo):
    """
    Category marker for policy-based algorithms; defines nothing beyond what Algo provides.
    """


class ModelBasedAlgo(Algo):
    """
    Category base class for model-based algorithms.

    Keeps a reference to a dynamics model together with the environment object obtained from it
    (``dynamics_env``), and offers a helper to compare the model's predictions with a real environment.
    """

    def __init__(self, env_spec, dynamics_model: DynamicsModel, name: str = 'model_based_algo'):
        """
        Constructor

        :param env_spec: environment specifications
        :param dynamics_model: dynamics model used by the algorithm
        :type dynamics_model: DynamicsModel
        :param name: name of the algorithm
        :type name: str
        """
        super().__init__(env_spec, name)
        self._dynamics_model = dynamics_model
        # Environment-style handle on the dynamics model, as returned by the model itself.
        self.dynamics_env = self._dynamics_model.return_as_env()

    def train_dynamics(self, *args, **kwargs):
        """
        Hook for training the dynamics model. Does nothing in this base class.
        """
        pass

    @record_return_decorator(which_recorder='self')
    def test_dynamics(self, env, sample_count, *args, **kwargs):
        """
        Compare one-step predictions of the dynamics model against a real environment.

        For ``sample_count`` steps a random action is sampled, the dynamics model is reset to the
        current real state, and both the dynamics environment and the real environment are stepped
        with that action. The resulting next states are collected and compared.

        :param env: real environment to test against; it is switched to 'TEST' status and reset
        :param sample_count: number of steps to sample
        :type sample_count: int
        :return: dict with keys ``dynamics_test_l1_error`` and ``dynamics_test_l2_error``
        :rtype: dict
        """
        self.set_status('TEST')
        env.set_status('TEST')
        st = env.reset()
        real_state_list = []
        dynamics_state_list = []
        for _ in range(sample_count):
            ac = self.env_spec.action_space.sample()
            # Start the model from the same state the real environment is currently in.
            self._dynamics_model.reset_state(state=st)
            new_state_dynamics, _, _, _ = self.dynamics_env.step(action=ac)
            new_state_real, _, done, _ = env.step(action=ac)
            real_state_list.append(new_state_real)
            dynamics_state_list.append(new_state_dynamics)
            # Truthiness test (not ``is True``) so numpy booleans are honoured as well.
            if done:
                # The episode ended: continue from the fresh initial state returned by reset(),
                # otherwise the model would be reset to a terminal state the env has already left.
                st = env.reset()
            else:
                st = new_state_real
        state_error = np.array(real_state_list) - np.array(dynamics_state_list)
        # NOTE(review): if states are vectors, state_error is 2-D and np.linalg.norm treats it as a
        # matrix (ord=1: max absolute column sum, ord=2: largest singular value) rather than an
        # element-wise L1/L2 norm. Left unchanged so reported values stay comparable - confirm intent.
        l1_loss = np.linalg.norm(state_error, ord=1)
        l2_loss = np.linalg.norm(state_error, ord=2)
        return dict(dynamics_test_l1_error=l1_loss, dynamics_test_l2_error=l2_loss)

    def set_terminal_reward_function_for_dynamics_env(self, terminal_func, reward_func):
        """
        Forward the terminal function and reward function to the dynamics environment.

        :param terminal_func: terminal function handed to ``dynamics_env``
        :param reward_func: reward function handed to ``dynamics_env``
        """
        self.dynamics_env.set_terminal_reward_func(terminal_func, reward_func)

Each new algorithm should implement the methods and attributes defined in the Algo class (baconian/algo/algo.py).

from baconian.core.core import Basic, EnvSpec, Env
from baconian.core.status import StatusWithSubInfo
import abc
from typeguard import typechecked
from baconian.common.logging import Recorder
from baconian.core.parameters import Parameters
from baconian.common.sampler.sample_data import TrajectoryData


class Algo(Basic):
    """
    Abstract base class that every algorithm derives from.
    """
    STATUS_LIST = ['CREATED', 'INITED', 'TRAIN', 'TEST']
    INIT_STATUS = 'CREATED'

    @typechecked
    def __init__(self, env_spec: EnvSpec, name: str = 'algo', warm_up_trajectories_number=0):
        """
        Constructor: stores the environment specification, an empty parameter set, a recorder and
        the warm-up setting.

        :param env_spec: environment specifications
        :type env_spec: EnvSpec
        :param name: name of the algorithm
        :type name: str
        :param warm_up_trajectories_number: number of trajectories used to warm up the training
        :type warm_up_trajectories_number: int
        """
        initial_status = StatusWithSubInfo(obj=self)
        super().__init__(status=initial_status, name=name)
        self.env_spec = env_spec
        self.parameters = Parameters(dict())
        self.recorder = Recorder(default_obj=self)
        self.warm_up_trajectories_number = warm_up_trajectories_number

    def init(self):
        """
        Initialize the algorithm (e.g., random initialization of networks in Tensorflow) and
        switch the status to 'INITED'.

        :return:
        """
        self._status.set_status('INITED')

    def warm_up(self, trajectory_data: TrajectoryData):
        """
        Warm the algorithm up with some data, e.g., to compute the mean/std-dev of the state for
        normalization. Data used for warm-up is not added into the memory.
        The base implementation does nothing.

        :param trajectory_data: TrajectoryData object
        :type trajectory_data: TrajectoryData

        :return: None
        """
        pass

    def train(self, *arg, **kwargs) -> dict:
        """
        Training API; the concrete arguments are defined by each algorithm itself.
        The base implementation only switches the status to 'TRAIN'.

        :return: training results, e.g., loss (empty in the base implementation)
        :rtype: dict
        """
        self._status.set_status('TRAIN')
        return {}

    def test(self, *arg, **kwargs) -> dict:
        """
        Testing API. Most evaluation can be done by the agent rather than the algorithm, so this
        API can be skipped. The base implementation only switches the status to 'TEST'.

        :return: test results, e.g., rewards (empty in the base implementation)
        :rtype: dict
        """
        self._status.set_status('TEST')
        return {}

    @abc.abstractmethod
    def predict(self, *arg, **kwargs):
        """
        Return the action for a given observation. The observation is read as the first argument
        passed into this API, like algo.predict(obs=x, ...)

        :return: predicted action
        :rtype: np.ndarray
        """
        raise NotImplementedError

    @abc.abstractmethod
    def append_to_memory(self, *args, **kwargs):
        """
        For off-policy algorithms: append data into the replay buffer. The samples are read as the
        first argument passed into this API, like algo.append_to_memory(samples=x, ...)

        """
        raise NotImplementedError

    @property
    def is_training(self):
        """
        Whether the algorithm is currently in training status

        :return: True if in training
        :rtype: bool
        """
        current_status = self.get_status()['status']
        return current_status == 'TRAIN'

    @property
    def is_testing(self):
        """
        Whether the algorithm is currently in testing status

        :return: True if in testing
        :rtype: bool
        """
        current_status = self.get_status()['status']
        return current_status == 'TEST'
  • The MultiPlaceholderInput Class

The algorithms in the Baconian project are mostly implemented with TensorFlow, so they share a similar process for saving and loading parameters. Hence, parameters are stored as TensorFlow variables by the PlaceholderInput and MultiPlaceholderInput classes.

class DDPG(ModelFreeAlgo, OffPolicyAlgo, MultiPlaceholderInput):

    # ...

    @record_return_decorator(which_recorder='self')
    def save(self, global_step, save_path=None, name=None, **kwargs):
        """
        Save a checkpoint through MultiPlaceholderInput.save.

        :param global_step: global step passed on to the underlying save call
        :param save_path: target path; falls back to GlobalConfig().DEFAULT_MODEL_CHECKPOINT_PATH when empty
        :param name: checkpoint name; falls back to this algorithm's name when empty
        :return: dict describing the save path, global step and name that were used
        :rtype: dict
        """
        target_path = save_path or GlobalConfig().DEFAULT_MODEL_CHECKPOINT_PATH
        checkpoint_name = name or self.name
        MultiPlaceholderInput.save(self, save_path=target_path, global_step=global_step,
                                   name=checkpoint_name, **kwargs)
        return {
            'check_point_save_path': target_path,
            'check_point_save_global_step': global_step,
            'check_point_save_name': checkpoint_name,
        }

    @record_return_decorator(which_recorder='self')
    def load(self, path_to_model, model_name, global_step=None, **kwargs):
        """
        Load a checkpoint through MultiPlaceholderInput.load.

        :param path_to_model: path of the stored model
        :param model_name: name of the stored model
        :param global_step: global step passed on to the underlying load call, None by default
        :return: dict describing the load path, global step and model name that were used
        :rtype: dict
        """
        MultiPlaceholderInput.load(self, path_to_model, model_name, global_step, **kwargs)
        return {
            'check_point_load_path': path_to_model,
            'check_point_load_global_step': global_step,
            'check_point_load_name': model_name,
        }
  • Constructor
class DDPG(ModelFreeAlgo, OffPolicyAlgo, MultiPlaceholderInput):
    required_key_dict = DictConfig.load_json(file_path=GlobalConfig().DEFAULT_DDPG_REQUIRED_KEY_LIST)

    @typechecked()
    def __init__(self,
                 env_spec: EnvSpec,
                 config_or_config_dict: (DictConfig, dict),
                 value_func: MLPQValueFunction,
                 policy: DeterministicMLPPolicy,
                 schedule_param_list=None,
                 name='ddpg',
                 replay_buffer=None):
        """
        Build the DDPG actor/critic pair, their target copies, the replay buffer and the training ops.

        :param env_spec: environment specifications, like action space or observation space
        :param config_or_config_dict: configuration (DictConfig or plain dict); read here for the
            'REPLAY_BUFFER_SIZE' and 'GAMMA' entries
        :param value_func: Q-value function, used as the critic
        :param policy: deterministic policy, used as the actor
        :param schedule_param_list: forwarded as ``to_scheduler_param_tuple`` to the algorithm's parameters
        :param name: name of algorithm class instance
        :param replay_buffer: replay buffer instance (a BaseReplayBuffer); when None, a
            UniformRandomReplayBuffer of size 'REPLAY_BUFFER_SIZE' is created
        :raises TypeError: if ``replay_buffer`` is given but is not a BaseReplayBuffer instance
        """
        ModelFreeAlgo.__init__(self, env_spec=env_spec, name=name)
        config = construct_dict_config(config_or_config_dict, self)

        self.config = config
        self.actor = policy
        self.target_actor = self.actor.make_copy(name_scope='{}_target_actor'.format(self.name),
                                                 name='{}_target_actor'.format(self.name),
                                                 reuse=False)
        self.critic = value_func
        self.target_critic = self.critic.make_copy(name_scope='{}_target_critic'.format(self.name),
                                                   name='{}_target_critic'.format(self.name),
                                                   reuse=False)

        self.state_input = self.actor.state_input

        # Explicit None check: a supplied buffer must not be discarded merely because it evaluates
        # as falsy (e.g. an empty buffer, should the buffer class define __len__).
        if replay_buffer is not None:
            # The buffer is stored and used as an object, so validate the instance, not a class.
            if not isinstance(replay_buffer, BaseReplayBuffer):
                raise TypeError('replay_buffer must be an instance of BaseReplayBuffer, got {}'.format(
                    type(replay_buffer).__name__))
            self.replay_buffer = replay_buffer
        else:
            self.replay_buffer = UniformRandomReplayBuffer(limit=self.config('REPLAY_BUFFER_SIZE'),
                                                           action_shape=self.env_spec.action_shape,
                                                           observation_shape=self.env_spec.obs_shape)

        # self.parameters contains all the parameters (variables) of the algorithm; its TensorFlow
        # variable list starts empty and is filled in once the training ops below have been built.
        self.parameters = ParametersWithTensorflowVariable(tf_var_list=[],
                                                           rest_parameters=dict(),
                                                           to_scheduler_param_tuple=schedule_param_list,
                                                           name='ddpg_param',
                                                           source_config=config,
                                                           require_snapshot=False)
        # Copy of the critic (reuse=True) that takes the actor's action tensor as its action input.
        self._critic_with_actor_output = self.critic.make_copy(reuse=True,
                                                               name='actor_input_{}'.format(self.critic.name),
                                                               state_input=self.state_input,
                                                               action_input=self.actor.action_tensor)
        # Copy of the target critic (reuse=True) fed with the target actor's action tensor.
        self._target_critic_with_target_actor_output = self.target_critic.make_copy(reuse=True,
                                                                                    name='target_critic_with_target_actor_output_{}'.format(
                                                                                        self.critic.name),
                                                                                    action_input=self.target_actor.action_tensor)

        with tf.variable_scope(name):
            self.reward_input = tf.placeholder(shape=[None, 1], dtype=tf.float32)
            self.next_state_input = tf.placeholder(shape=[None, self.env_spec.flat_obs_dim], dtype=tf.float32)
            self.done_input = tf.placeholder(shape=[None, 1], dtype=tf.bool)
            self.target_q_input = tf.placeholder(shape=[None, 1], dtype=tf.float32)
            done = tf.cast(self.done_input, dtype=tf.float32)
            # Reward plus the GAMMA-discounted target Q value; the (1 - done) factor zeroes the
            # discounted term wherever done_input is True.
            self.predict_q_value = (1. - done) * self.config('GAMMA') * self.target_q_input + self.reward_input
            with tf.variable_scope('train'):
                self.critic_loss, self.critic_update_op, self.target_critic_update_op, self.critic_optimizer, \
                self.critic_grads = self._setup_critic_loss()
                self.actor_loss, self.actor_update_op, self.target_actor_update_op, self.action_optimizer, \
                self.actor_grads = self._set_up_actor_loss()

        # Variables created under '<name>/train' plus both optimizers' own variables;
        # de-duplicated and sorted by name before being registered as the algorithm's variables.
        var_list = get_tf_collection_var_list(
            '{}/train'.format(name)) + self.critic_optimizer.variables() + self.action_optimizer.variables()
        self.parameters.set_tf_var_list(tf_var_list=sorted(list(set(var_list)), key=lambda x: x.name))
        MultiPlaceholderInput.__init__(self,
                                       sub_placeholder_input_list=[dict(obj=self.target_actor,
                                                                        attr_name='target_actor',
                                                                        ),
                                                                   dict(obj=self.actor,
                                                                        attr_name='actor'),
                                                                   dict(obj=self.critic,
                                                                        attr_name='critic'),
                                                                   dict(obj=self.target_critic,
                                                                        attr_name='target_critic')
                                                                   ],
                                       parameters=self.parameters)