Hello. While thinking about changing the rollout function mentioned in #34, I realized that the model needs a terminal state so that the POMCP planner does not accumulate any more value after a decision is taken within a trial. However, after implementing such a state through the reward and transition models, I found that the POMCP planning tree always commits to a decision after the first observation, even when the belief in the corresponding state is very low. Since this was not happening before, I am wondering whether I implemented the goal state incorrectly.
First, this is how the tree looks for a given trial:
TRIAL 0 (true state s_3-t_0)
--------------------
STEP 0 (6 steps remaining)
Current belief (based on 0s of data):
s_0-t_0 -> 0.096
s_1-t_0 -> 0.075
s_2-t_0 -> 0.065
s_3-t_0 -> 0.062
s_4-t_0 -> 0.079
s_5-t_0 -> 0.066
s_6-t_0 -> 0.089
s_7-t_0 -> 0.1
s_8-t_0 -> 0.085
s_9-t_0 -> 0.087
s_10-t_0 -> 0.099
s_11-t_0 -> 0.097
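For reference, the tree dumps below were produced with pomdp_py's TreeDebugger from a pdb session, roughly like this (a minimal sketch, assuming the planning agent is in scope as agent):

from pomdp_py.utils import TreeDebugger

dd = TreeDebugger(agent.tree)  # wrap the agent's current search tree
dd                             # printing shows the root VNode and its QNode children
dd[12]                         # descend into child 12 (here, a_wait)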
TreeDebugger@
_VNodePP(n=14603, v=-26.635)
- [0] a_0: QNode(n=1, v=-100.000)
- [1] a_1: QNode(n=7, v=-84.286)
- [2] a_10: QNode(n=7, v=-84.286)
- [3] a_11: QNode(n=7, v=-84.286)
- [4] a_2: QNode(n=1, v=-100.000)
- [5] a_3: QNode(n=7, v=-84.286)
- [6] a_4: QNode(n=1, v=-100.000)
- [7] a_5: QNode(n=1, v=-100.000)
- [8] a_6: QNode(n=1, v=-100.000)
- [9] a_7: QNode(n=1, v=-100.000)
- [10] a_8: QNode(n=1, v=-100.000)
- [11] a_9: QNode(n=1, v=-100.000)
- [12] a_wait: QNode(n=14567, v=-26.635)
(Pdb) dd[12]
a_wait⟶_QNodePP(n=14567, v=-26.635)
- [0] o_0: VNodeParticles(n=1567, v=-24.875)
- [1] o_1: VNodeParticles(n=880, v=-29.977)
- [2] o_10: VNodeParticles(n=1942, v=-24.316)
- [3] o_11: VNodeParticles(n=1496, v=-30.452)
- [4] o_2: VNodeParticles(n=1201, v=-26.077)
- [5] o_3: VNodeParticles(n=788, v=-56.256)
- [6] o_4: VNodeParticles(n=756, v=-36.302)
- [7] o_5: VNodeParticles(n=1165, v=-31.310)
- [8] o_6: VNodeParticles(n=1238, v=-24.548)
- [9] o_7: VNodeParticles(n=986, v=-28.266)
- [10] o_8: VNodeParticles(n=1150, v=-31.324)
- [11] o_9: VNodeParticles(n=1386, v=-29.629)
Now suppose we receive o_3, the observation corresponding to the correct state:
(Pdb) dd[12][5]
o_3⟶_VNodePP(n=788, v=-56.256)
- [0] a_0: QNode(n=1, v=-100.000)
- [1] a_1: QNode(n=1, v=-100.000)
- [2] a_10: QNode(n=1, v=-100.000)
- [3] a_11: QNode(n=1, v=-100.000)
- [4] a_2: QNode(n=1, v=-100.000)
- [5] a_3: QNode(n=772, v=-56.256)
- [6] a_4: QNode(n=1, v=-100.000)
- [7] a_5: QNode(n=1, v=-100.000)
- [8] a_6: QNode(n=1, v=-100.000)
- [9] a_7: QNode(n=1, v=-100.000)
- [10] a_8: QNode(n=1, v=-100.000)
- [11] a_9: QNode(n=1, v=-100.000)
- [12] a_wait: QNode(n=5, v=-62.203)
What surprises me is the high number of simulations going into a_3 here, given that the only thing this action does is lead to the terminal state:
(Pdb) dd[12][5][5]
a_3⟶_QNodePP(n=772, v=-56.256)
- [0] o_term: VNodeParticles(n=771, v=0.000)
Meanwhile, there are only 5 simulations for the 'wait' action, even though it would collect more information and has several possible observations:
(Pdb) dd[12][5][12]
a_wait⟶_QNodePP(n=5, v=-62.203)
- [0] o_3: VNodeParticles(n=0, v=0.000)
- [1] o_4: VNodeParticles(n=0, v=0.000)
- [2] o_5: VNodeParticles(n=0, v=0.000)
- [3] o_6: VNodeParticles(n=0, v=0.000)
- [4] o_8: VNodeParticles(n=0, v=0.000)
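For reference, POMCP selects actions inside simulations with UCB1, i.e. Q + c * sqrt(ln N(h) / N(ha)); plugging the final counts from the o_3 node above into that rule gives (a back-of-the-envelope sketch of the standard POMCP formula, not pomdp_py's exact internals):

import math

def ucb1(q, n_parent, n_action, c=50):
    # Standard POMCP action score: estimated value plus exploration bonus.
    return q + c * math.sqrt(math.log(n_parent) / n_action)

print(ucb1(-56.256, 788, 772))  # a_3:    ~ -51.6
print(ucb1(-62.203, 788, 5))    # a_wait: ~  -4.5, far higher, yet it got only 5 visits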
I don't think this behavior is normal, and it did not change when I varied the exploration constant between the default value and 50. The terminal state is implemented as a state with id 'term' (i.e., s_term). Here are my transition and reward models:
import itertools

class TDTransitionModel(TransitionModel):
    def __init__(self, n_targets, n_steps):
        super().__init__(n_targets)
        if not isinstance(n_steps, int):
            raise TypeError(f"Invalid number of steps: {n_steps}. It must be an integer.")
        self.n_steps = n_steps
        self.max_t = self.n_steps - 1  # To handle 0-indexing of states and time steps

    def probability(self, next_state, state, action):
        """Returns the probability p(s'|s, a)."""
        if "term" in state.name:
            # The terminal state transitions to itself with probability 1
            return 1.0 if "term" in next_state.name else 0.0
        if state.t == self.max_t or "wait" not in action.name:
            # Last time step or a decision: deterministic transition to the terminal state
            return 1.0 if "term" in next_state.name else 0.0
        # Wait action on any time step other than the last one
        if next_state.t == state.t + 1:
            if next_state.id == state.id:
                return 1.0 - 1e-9
            return 1e-9  # Other states in the next time step
        return 0.0  # Can't travel through time... yet

    def sample(self, state, action):
        """Randomly samples the next state according to the transition model."""
        if "term" in state.name or state.t == self.max_t or "wait" not in action.name:
            # Stay in (or move to) the terminal state; t=0 is used consistently
            # for the terminal state so that it matches get_all_states()
            return TDState("term", 0)
        # Wait action on any time step other than the last one
        return TDState(state.id, state.t + 1)

    def get_all_states(self, t_step=None):
        """Returns a list of all states."""
        if t_step is not None:  # States for a given time step
            return [TDState(s, t_step) for s in range(self.n_targets)]
        # All states, all time steps, plus the terminal state
        all_states = [TDState(s, d)
                      for s, d in itertools.product(range(self.n_targets), range(self.n_steps))]
        all_states.append(TDState("term", 0))
        return all_states
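To rule out an unnormalized transition function, here is a quick consistency check (a sketch only: n_targets=12 and n_steps=6 match the trial above, and SimpleNamespace objects are hypothetical stand-ins for the real action class):

from types import SimpleNamespace

model = TDTransitionModel(n_targets=12, n_steps=6)
wait = SimpleNamespace(name="a_wait")       # hypothetical stand-ins for actions
decide = SimpleNamespace(name="a_3", id=3)

for s in model.get_all_states():
    for a in (wait, decide):
        total = sum(model.probability(s2, s, a) for s2 in model.get_all_states())
        # Loose tolerance: the 1e-9 smoothing makes the wait rows sum to
        # 1 + (n_targets - 2) * 1e-9 rather than exactly 1.
        assert abs(total - 1.0) < 1e-6, (s.name, a.name, total)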
class RewardModel(pomdp_py.RewardModel):
    def __init__(self, hit_reward=10, miss_cost=-100, wait_cost=-1):
        if not all(isinstance(attr, int) for attr in [hit_reward, miss_cost, wait_cost]):
            raise TypeError("All cost/reward values must be integers.")
        self.hit_reward = hit_reward
        self.miss_cost = miss_cost
        self.wait_cost = wait_cost

    def _reward_func(self, state, action):
        """
        The correct action is assumed to be the one that shares its ID (i.e., number) with a
        given state, since we assume each flicker is embedded in a button or actuator. Any
        action taken in the terminal state gives a reward of 0.
        """
        if 'term' in state.name:
            return 0
        if 'wait' in action.name:
            return self.wait_cost
        return self.hit_reward if action.id == state.id else self.miss_cost  # HIT vs MISS

    def sample(self, state, action, next_state):
        """The reward is deterministic."""
        return self._reward_func(state, action)
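And a tiny check of the reward semantics, again with hypothetical SimpleNamespace stand-ins for the state/action classes (the key property being that nothing accrues once the terminal state is reached):

from types import SimpleNamespace

rm = RewardModel()
s3 = SimpleNamespace(name="s_3", id=3)            # hypothetical stand-in for TDState
term = SimpleNamespace(name="s_term", id="term")
a3 = SimpleNamespace(name="a_3", id=3)            # hypothetical stand-ins for actions
a5 = SimpleNamespace(name="a_5", id=5)
wait = SimpleNamespace(name="a_wait", id=None)

assert rm.sample(s3, a3, None) == 10     # HIT: action id matches state id
assert rm.sample(s3, a5, None) == -100   # MISS
assert rm.sample(s3, wait, None) == -1   # waiting costs 1
assert rm.sample(term, a3, None) == 0    # terminal state: no further reward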
Here is also my observation model, in case it helps:
import numpy as np

class TDObservationModel(ObservationModel):
    def __init__(self, features):
        self.observation_matrix = self._make_obs_matrix(features)
        self.n_steps, self.n_states, self.n_obs = self.observation_matrix.shape

    def probability(self, observation, next_state, action):
        """Returns the probability p(o|s', a)."""
        if "term" in observation.name:
            # The terminal observation is emitted iff we transition to the terminal state
            return 1.0 if "term" in next_state.name or "wait" not in action.name else 0.0
        # A non-terminal observation is impossible on a transition to the terminal state
        if "term" in next_state.name or "wait" not in action.name:
            return 0.0
        obs_idx = observation.id
        state_idx = next_state.id
        state_step = next_state.t - 1  # observation_matrix[0] corresponds to next_state.t == 1
        return self.observation_matrix[state_step][state_idx][obs_idx]

    def sample(self, next_state, action):
        if "term" in next_state.name or "wait" not in action.name:
            return BCIObservation('term')  # Transition to the terminal state
        # Other transitions
        state_idx = next_state.id
        state_step = next_state.t - 1  # observation_matrix[0] corresponds to next_state.t == 1
        obs_p = self.observation_matrix[state_step][state_idx]
        return np.random.choice(self.get_all_observations(include_terminal=False), p=obs_p)
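Finally, a one-line check that the observation matrix rows are proper distributions (assuming model is a TDObservationModel constructed from the same features used above):

import numpy as np

model = TDObservationModel(features)
# observation_matrix[t][s] is the distribution over observations for state s
# at step t, so every row should sum to 1.
assert np.allclose(model.observation_matrix.sum(axis=-1), 1.0)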