How to implement a new dynamics model

A new dynamics model in the Baconian project is expected to implement the methods and attributes defined in the DynamicsModel class (baconian/algo/dynamics/dynamics_model.py).

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
    # Status names declared for this class; init() switches the status to 'INITED'.
    STATUS_LIST = ('CREATED', 'INITED')
    # NOTE(review): presumably the status a fresh instance starts in; it is consumed by
    # status machinery that is not visible in this excerpt — confirm.
    INIT_STATUS = 'CREATED'

    def __init__(self, env_spec: EnvSpec, parameters: Parameters = None, init_state=None, name='dynamics_model',
                 state_input_scaler: DataScaler = None,
                 action_input_scaler: DataScaler = None,
                 state_output_scaler: DataScaler = None):
        """
        Set up the attributes shared by every dynamics model.

        Any scaler that is omitted (or evaluates as false) is replaced by an
        ``IdenticalDataScaler`` sized from ``env_spec``: the flat observation dimension for
        the two state scalers and the flat action dimension for the action scaler.

        :param env_spec: environment specifications, such as observation space and action space
        :type env_spec: EnvSpec
        :param parameters: parameters kept on the instance as ``self.parameters``
        :type parameters: Parameters
        :param init_state: initial state of the dynamics model, stored as ``self.state``
        :type init_state: presumably np.ndarray (earlier docs said str) -- TODO confirm
        :param name: name of instance, 'dynamics_model' by default
        :type name: str
        :param state_input_scaler: data preprocessing scaler of state input
        :type state_input_scaler: DataScaler
        :param action_input_scaler: data preprocessing scaler of action input
        :type action_input_scaler: DataScaler
        :param state_output_scaler: data preprocessing scaler of state output
        :type state_output_scaler: DataScaler
        """
        super().__init__(name=name)

        # Plain bookkeeping attributes.
        self.env_spec = env_spec
        self.state = init_state
        self.parameters = parameters

        # Placeholders; nothing in this class assigns them a value other than None.
        self.state_input = None
        self.action_input = None
        self.new_state_output = None

        # Both helpers receive a reference to this instance.
        self.recorder = Recorder(flush_by_split_status=False, default_obj=self)
        self._status = StatusWithSingleInfo(obj=self)

        # ``or`` only builds the identity scaler when no usable scaler was supplied.
        self.state_input_scaler = state_input_scaler or IdenticalDataScaler(dims=env_spec.flat_obs_dim)
        self.action_input_scaler = action_input_scaler or IdenticalDataScaler(dims=env_spec.flat_action_dim)
        self.state_output_scaler = state_output_scaler or IdenticalDataScaler(dims=env_spec.flat_obs_dim)

    def init(self, *args, **kwargs):
        """
        Mark the model as initialised and draw a fresh starting state.

        The status is set to 'INITED' and ``self.state`` is overwritten with a sample from
        the observation space. Positional and keyword arguments are accepted but unused here.
        """
        self.set_status('INITED')
        observation_space = self.env_spec.obs_space
        self.state = observation_space.sample()

    @register_counter_info_to_status_decorator(increment=1, info_key='step_counter')
    def step(self, action: np.ndarray, state=None, allow_clip=False, **kwargs_for_transit):
        """
        State transition function (only supports a single-sample transition, not batch data).

        The action (and the state, when given) is reshaped to the shape declared by
        ``env_spec``, optionally clipped, checked against its space, and then passed to
        ``_state_transit``. The resulting state is optionally clipped, checked, stored as
        ``self.state`` and returned.

        :param action: action to be taken; any array-like accepted by ``np.asarray``
        :type action: np.ndarray
        :param state: current state; if None, the stored state (saved from the last transition) is used
        :type state: np.ndarray
        :param allow_clip: if true, clip state, action and new state into their spaces before the
                           bound checks, default False
        :type allow_clip: bool
        :param kwargs_for_transit: extra kwargs forwarded to ``_state_transit``; these are typically
                                   specific to the concrete model being used
        :type kwargs_for_transit:
        :return: new state after step
        :rtype: np.ndarray
        :raises StateOrActionOutOfBoundError: if the action, the state or the new state is not
                                              contained in its space
        """
        env_spec = self.env_spec
        if state is not None:
            state = np.array(state).reshape(env_spec.obs_shape)
        else:
            state = self.state
        # Coerce first so that lists/tuples are accepted, mirroring how ``state`` is handled.
        action = np.asarray(action).reshape(env_spec.action_shape)
        if allow_clip:
            if state is not None:
                state = env_spec.obs_space.clip(state)
            action = env_spec.action_space.clip(action)
        # Use truthiness rather than ``is False``: a ``contains`` implementation that returns a
        # numpy boolean is never the ``False`` singleton, which would silently disable the checks.
        if not env_spec.action_space.contains(action):
            raise StateOrActionOutOfBoundError(
                'action {} out of bound of {}'.format(action, env_spec.action_space.bound()))
        if not env_spec.obs_space.contains(state):
            raise StateOrActionOutOfBoundError(
                'state {} out of bound of {}'.format(state, env_spec.obs_space.bound()))
        new_state = self._state_transit(state=state, action=env_spec.flat_action(action),
                                        **kwargs_for_transit)
        if allow_clip:
            new_state = env_spec.obs_space.clip(new_state)
        if not env_spec.obs_space.contains(new_state):
            raise StateOrActionOutOfBoundError(
                'new state {} out of bound of {}'.format(new_state, env_spec.obs_space.bound()))
        # Only a validated state is remembered for the next transition.
        self.state = new_state
        return new_state

    @abc.abstractmethod
    def _state_transit(self, state, action, **kwargs) -> np.ndarray:
        """

        :param state: original state
        :type state: np.ndarray
        :param action:  action taken by agent
        :type action: np.ndarray
        :param kwargs:
        :type kwargs:
        :return: new state after transition
        :rtype: np.ndarray
        """
        raise NotImplementedError

    def copy_from(self, obj) -> bool:
        """
        Check that ``obj`` is a valid source to copy from.

        This implementation only validates the type of ``obj`` against the type of this
        instance; it does not transfer any data itself.

        :param obj: object to copy from
        :type obj:
        :return: True if ``obj`` is an instance of this object's type
        :rtype: bool
        :raises TypeError: if ``obj`` is not an instance of this object's type
        """
        expected_type = type(self)
        if isinstance(obj, expected_type):
            return True
        raise TypeError('Wrong type of obj %s to be copied, which should be %s' % (type(obj), expected_type))

    def make_copy(self):
        """
        Make a copy of parameters and environment specifications.

        Not provided here: this implementation always raises, so a model that supports
        copying has to override it.

        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError()

    def reset_state(self, state=None):
        """
        Overwrite the stored state.

        When ``state`` is None a random sample from the observation space is stored;
        otherwise ``state`` itself is stored after checking that the observation space
        contains it.

        :param state: state to store, or None to sample one from the observation space
        :type state: np.ndarray
        :return: None, the result is written to ``self.state``
        :rtype: None
        :raises AssertionError: if ``state`` is given but not contained in the observation space
        """
        if state is None:
            self.state = self.env_spec.obs_space.sample()
            return
        observation_space = self.env_spec.obs_space
        # NOTE: this validation is an ``assert``, so it is skipped when Python runs with -O.
        assert observation_space.contains(state)
        self.state = state

    def return_as_env(self) -> Env:
        """
        Wrap this dynamics model so it can be used as an environment.

        The wrapper is named after this instance with an ``'_env'`` suffix.

        :return: an environment with this dynamics model
        :rtype: DynamicsEnvWrapper
        """
        env_name = self._name + '_env'
        return DynamicsEnvWrapper(dynamics=self, name=env_name)


As with algorithms, dynamics models are grouped into categories, such as GlobalDynamicsModel and DifferentiableDynamics, which are defined in baconian/algo/dynamics/dynamics_model.py.