How to implement a new dynamics model
A new dynamics model in the Baconian project is expected to implement the methods and attributes defined in the
DynamicsModel class (baconian/algo/dynamics/dynamics_model.py).
# Status names this model can be in. ``init()`` moves the model to 'INITED'.
# NOTE(review): presumably consumed by the status machinery of the base class,
# which is not visible in this listing -- confirm.
STATUS_LIST = ('CREATED', 'INITED')
# Status a model starts in; must be one of STATUS_LIST.
INIT_STATUS = 'CREATED'
def __init__(self, env_spec: EnvSpec, parameters: Parameters = None, init_state=None, name='dynamics_model',
             state_input_scaler: DataScaler = None,
             action_input_scaler: DataScaler = None,
             state_output_scaler: DataScaler = None):
    """
    Store the environment specification, parameters and scalers of the dynamics model.

    :param env_spec: environment specifications, such as observation space and action space
    :type env_spec: EnvSpec
    :param parameters: parameters, stored as given
    :type parameters: Parameters
    :param init_state: initial state of the dynamics model, stored as the current state
    :param name: name of instance, 'dynamics_model' by default
    :type name: str
    :param state_input_scaler: data preprocessing scaler of state input; when falsy, an
                               IdenticalDataScaler sized by ``env_spec.flat_obs_dim`` is used
    :type state_input_scaler: DataScaler
    :param action_input_scaler: data preprocessing scaler of action input; when falsy, an
                                IdenticalDataScaler sized by ``env_spec.flat_action_dim`` is used
    :type action_input_scaler: DataScaler
    :param state_output_scaler: data preprocessing scaler of state output; when falsy, an
                                IdenticalDataScaler sized by ``env_spec.flat_obs_dim`` is used
    :type state_output_scaler: DataScaler
    """
    super().__init__(name=name)

    self.env_spec = env_spec
    self.state = init_state
    self.parameters = parameters

    # Placeholders; nothing in this constructor populates them.
    self.state_input = None
    self.action_input = None
    self.new_state_output = None

    self.recorder = Recorder(flush_by_split_status=False, default_obj=self)
    self._status = StatusWithSingleInfo(obj=self)

    # A default scaler is only built (and env_spec only queried) when the caller supplied none.
    self.state_input_scaler = state_input_scaler or IdenticalDataScaler(dims=env_spec.flat_obs_dim)
    self.action_input_scaler = action_input_scaler or IdenticalDataScaler(dims=env_spec.flat_action_dim)
    self.state_output_scaler = state_output_scaler or IdenticalDataScaler(dims=env_spec.flat_obs_dim)
def init(self, *args, **kwargs):
    """
    Switch the model status to 'INITED' and draw a state from the observation space.

    Positional and keyword arguments are accepted but not used by this implementation.
    """
    self.set_status('INITED')
    sampled_state = self.env_spec.obs_space.sample()
    self.state = sampled_state
@register_counter_info_to_status_decorator(increment=1, info_key='step_counter')
def step(self, action: np.ndarray, state=None, allow_clip=False, **kwargs_for_transit):
    """
    State transition function (only supports a single sample transition, not batch data).

    :param action: action to be taken
    :type action: np.ndarray
    :param state: current state, if None, the stored state (saved from the last transition) is used
    :type state: np.ndarray
    :param allow_clip: clip state, action and new state into their spaces before checking bounds,
                       default False
    :type allow_clip: bool
    :param kwargs_for_transit: extra kwargs forwarded to ``_state_transit``; these typically depend on
                               the specific model in use
    :type kwargs_for_transit:
    :return: new state after step
    :rtype: np.ndarray
    :raises StateOrActionOutOfBoundError: if the action, the current state or the new state is not
                                          contained in its space
    """
    state = np.array(state).reshape(self.env_spec.obs_shape) if state is not None else self.state
    action = action.reshape(self.env_spec.action_shape)
    # Kept as an identity test so that only a literal True enables clipping, as before.
    if allow_clip is True:
        if state is not None:
            state = self.env_spec.obs_space.clip(state)
        action = self.env_spec.action_space.clip(action)
    # Bound checks use truthiness rather than ``is False``: a NumPy boolean result is never the
    # ``False`` singleton, so an identity test would let out-of-bound values through silently.
    if not self.env_spec.action_space.contains(action):
        raise StateOrActionOutOfBoundError(
            'action {} out of bound of {}'.format(action, self.env_spec.action_space.bound()))
    if not self.env_spec.obs_space.contains(state):
        raise StateOrActionOutOfBoundError(
            'state {} out of bound of {}'.format(state, self.env_spec.obs_space.bound()))
    new_state = self._state_transit(state=state, action=self.env_spec.flat_action(action),
                                    **kwargs_for_transit)
    if allow_clip is True:
        new_state = self.env_spec.obs_space.clip(new_state)
    if not self.env_spec.obs_space.contains(new_state):
        raise StateOrActionOutOfBoundError(
            'new state {} out of bound of {}'.format(new_state, self.env_spec.obs_space.bound()))
    # Remember the result so the next call can omit ``state``.
    self.state = new_state
    return new_state
@abc.abstractmethod
def _state_transit(self, state, action, **kwargs) -> np.ndarray:
    """
    Compute the state that follows ``state`` when ``action`` is taken.

    Abstract hook: this base version only raises. ``step`` calls it with the current state,
    the flattened action and any extra keyword arguments it received.

    :param state: original state
    :type state: np.ndarray
    :param action: action taken by agent
    :type action: np.ndarray
    :param kwargs: extra keyword arguments forwarded by the caller
    :type kwargs:
    :return: new state after transition
    :rtype: np.ndarray
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
def copy_from(self, obj) -> bool:
    """
    Check that ``obj`` has a compatible type to be copied from.

    This implementation performs only the type check; no attribute is copied here.

    :param obj: object to copy from
    :type obj:
    :return: True if ``obj`` is an instance of this object's type
    :rtype: bool
    :raises TypeError: if ``obj`` is not an instance of ``type(self)``
    """
    expected_type = type(self)
    if isinstance(obj, expected_type):
        return True
    raise TypeError('Wrong type of obj %s to be copied, which should be %s' % (type(obj), expected_type))
def make_copy(self):
    """
    Make a copy of parameters and environment specifications.

    This base version provides no implementation and always raises.

    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
def reset_state(self, state=None):
    """
    Overwrite the stored state, either with a given state or with a fresh sample.

    Nothing is returned; the result is written to ``self.state``.

    :param state: state to store; if None, a sample drawn from the observation space is stored instead
    :type state: np.ndarray
    :raises AssertionError: if ``state`` is given but not contained in the observation space
                            (the check is an ``assert``, so it is skipped when Python runs with -O)
    """
    obs_space = self.env_spec.obs_space
    if state is None:
        self.state = obs_space.sample()
        return
    assert obs_space.contains(state)
    self.state = state
def return_as_env(self) -> Env:
    """
    Wrap this dynamics model so that it can be used as an environment.

    The wrapper is named after this model, with '_env' appended.

    :return: an environment with this dynamics model
    :rtype: DynamicsEnvWrapper
    """
    wrapped_env = DynamicsEnvWrapper(dynamics=self, name=self._name + '_env')
    return wrapped_env
|
Similar to algorithms, dynamics models are categorized in baconian/algo/dynamics/dynamics_model.py,
such as GlobalDynamicsModel and DifferentiableDynamics.