POMDPPlanners.environments package
POMDP Environment Implementations.
This package contains concrete implementations of various POMDP environments used for testing and benchmarking planning algorithms. Each environment implements the core Environment interface with specific state spaces, action spaces, observation models, and reward functions.
- Available Environments:
TigerPOMDP: Classic tiger problem with discrete states and observations
CartPolePOMDP: Pole balancing task with continuous states, discrete actions
MountainCarPOMDP: Car climbing hill task with continuous state space
PushPOMDP: Object manipulation task with spatial reasoning
SafeAntVelocityPOMDP: Safety-constrained ant navigation
SanityPOMDP: Simple test environment for debugging
DiscreteLightDarkPOMDP: Grid-based light-dark navigation
ContinuousLightDarkPOMDP: Continuous light-dark navigation problem
LaserTagPOMDP: Pursuit-evasion problem with robot tagging opponent
RockSamplePOMDP: Rock sampling problem with sensor-based rock quality evaluation
- Factory Functions:
get_environment: Create environment instances by name with parameters
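A minimal usage sketch, assuming get_environment takes the environment name and forwards keyword arguments to the matching constructor (the exact signature is not shown on this page):

>>> from POMDPPlanners.environments import get_environment
>>> # Assumed: name plus constructor kwargs forwarded to the environment class
>>> env = get_environment("TigerPOMDP", discount_factor=0.95)
>>> actions = env.get_actions()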
- class POMDPPlanners.environments.CartPolePOMDP(discount_factor, noise_cov, state_transition_cov=None, name='CartPolePOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
CartPole balancing task formulated as a POMDP.
This environment simulates the classic cart-pole balancing problem where an agent must apply left or right forces to keep a pole balanced on a moving cart. The challenge comes from noisy observations of the cart-pole state.
Problem Structure:
- State: [cart_position, cart_velocity, pole_angle, pole_velocity] (continuous)
- Actions: [left_force, right_force] (discrete)
- Observations: Noisy state measurements (continuous)
- Rewards: +1.0 per time step alive, 0.0 when terminated
- Termination: Pole falls beyond angle threshold or cart moves too far
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> noise_cov = np.diag([0.1, 0.1, 0.1, 0.1])
>>> env = CartPolePOMDP(discount_factor=0.99, noise_cov=noise_cov)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
- DEFAULT_STATE_TRANSITION_COV = array([[1.0e-04, 0.0e+00, 0.0e+00, 0.0e+00], [0.0e+00, 1.0e-04, 0.0e+00, 0.0e+00], [0.0e+00, 0.0e+00, 2.5e-05, 0.0e+00], [0.0e+00, 0.0e+00, 0.0e+00, 1.0e-04]])
- compute_metrics(histories)[source]
Compute CartPole POMDP specific metrics from simulation histories.
- Parameters:
- Return type:
- Returns:
List of MetricValue objects containing the computed metrics
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
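For CartPole's continuous observations, identical vectors should compare equal regardless of the tolerance used internally; a small sketch:

>>> import numpy as np
>>> obs = np.zeros(4)
>>> env.is_equal_observation(obs, obs.copy())
True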
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
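As an illustration of such an override, the hypothetical subclass below vectorizes CartPole's alive/terminated reward over an (N, 4) state array; the 0.2095 rad angle threshold is an assumption for the sketch, not the class's documented constant:

import numpy as np

class VectorizedCartPole(CartPolePOMDP):
    def reward_batch(self, states, action):
        # Vectorized sketch: +1.0 for every state whose pole is still upright.
        states = np.asarray(states)            # shape (N, 4)
        alive = np.abs(states[:, 2]) < 0.2095  # column 2 is pole_angle (assumed threshold)
        return alive.astype(float)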
- class POMDPPlanners.environments.ContinuousLaserTagPOMDP(discount_factor, name='ContinuousLaserTagPOMDP', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]
Bases: Environment
Continuous LaserTag POMDP with continuous [dx, dy, tag_flag] actions.
A pursuit-evasion problem in continuous 2-D space where a robot must navigate to tag an opponent. The robot receives noisy 8-direction laser range observations.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> # Initialize environment
>>> env = ContinuousLaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> # Sample complete step
>>> action = np.array([1.0, 0.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
name (str)
robot_radius (float)
opponent_radius (float)
tag_radius (float)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
robot_transition_cov_matrix (np.ndarray)
opponent_transition_cov_matrix (np.ndarray)
pursuit_speed (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Optional[Path])
debug (bool)
use_queue_logger (bool)
initial_state (Optional[np.ndarray])
- cache_visualization(history, cache_path)[source]
Cache visualization data for an episode history.
This method can be overridden by subclasses to provide environment-specific visualization caching capabilities.
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_metric_names()[source]
Get names of environment-specific metrics.
This method returns the names of custom metrics that this environment computes in the compute_metrics() method. It enables users to discover what metrics are available for hyperparameter optimization.
- Return type:
- Returns:
List of metric names that this environment produces. Default implementation returns empty list for environments without custom metrics.
Note
Subclasses that override compute_metrics() should also override this method to return the names of metrics they produce. Use an Enum to ensure consistency between the names returned here and the names used in compute_metrics().
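A hypothetical sketch of the Enum pattern this note recommends (class and metric names invented for illustration):

from enum import Enum

class MyEnvMetrics(str, Enum):
    SUCCESS_RATE = "success_rate"
    AVG_STEPS = "avg_steps"

class MyEnv:  # stand-in for an Environment subclass
    def get_metric_names(self):
        # Both get_metric_names() and compute_metrics() draw from one Enum,
        # so the names cannot drift apart.
        return [m.value for m in MyEnvMetrics]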
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.ContinuousLaserTagPOMDPDiscreteActions(discount_factor, name='ContinuousLaserTagPOMDPDiscreteActions', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]
Bases: ContinuousLaserTagPOMDP, DiscreteActionsEnvironment
Continuous LaserTag POMDP with discrete string actions.
Actions: "up", "down", "right", "left", "tag".
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousLaserTagPOMDPDiscreteActions(discount_factor=0.95)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
name (str)
robot_radius (float)
opponent_radius (float)
tag_radius (float)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
robot_transition_cov_matrix (np.ndarray)
opponent_transition_cov_matrix (np.ndarray)
pursuit_speed (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Optional[Path])
debug (bool)
use_queue_logger (bool)
initial_state (Optional[np.ndarray])
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.ContinuousLightDarkPOMDP(discount_factor, name='ContinuousLightDarkPOMDP', state_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), observation_cov_matrix=array([[0.05, 0.], [0., 0.05]]), beacons=[(0, 0), (0, 5), (0, 10), (5, 0), (5, 5), (5, 10), (10, 0), (10, 5), (10, 10)], goal_state=array([10, 5]), start_state=array([0, 5]), obstacles=[(3, 7), (5, 5)], obstacle_hit_probability=0.2, obstacle_reward=-10.0, goal_reward=10.0, fuel_cost=2.0, grid_size=11, goal_state_radius=1.5, beacon_radius=1.0, obstacle_radius=1.5, reward_model_type=RewardModelType.STANDARD, observation_model_type=ObservationModelType.NORMAL_NOISE, penalty_decay=1.0, is_obstacle_hit_terminal=True)[source]
Bases: BaseLightDarkPOMDP
Continuous Light-Dark POMDP environment with continuous actions.
This environment extends the base Light-Dark problem to continuous 2D space with continuous action vectors. The agent navigates toward a goal while dealing with position-dependent observation noise and optional obstacles.
Key features:
- Continuous 2D state and action spaces
- Light beacons reduce observation noise when nearby
- Multiple observation models available (normal noise, normal noise with no observation in dark)
- Multiple reward models available (standard, decaying hit probability, dangerous states)
- Optional obstacles with configurable hit penalties
- Terminal conditions for goal reaching, obstacle hits, and boundary violations
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = ContinuousLightDarkPOMDP(
...     discount_factor=0.95,
...     goal_state=np.array([10, 5]),
...     start_state=np.array([0, 5])
... )
>>>
>>> # Get initial state
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> # Sample complete step (action must be provided based on environment type)
>>> action = np.array([1.0, 0.0])  # Move right
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
name (str)
state_transition_cov_matrix (ndarray)
observation_cov_matrix (ndarray)
goal_state (ndarray)
start_state (ndarray)
obstacle_hit_probability (float)
obstacle_reward (float)
goal_reward (float)
fuel_cost (float)
grid_size (int)
goal_state_radius (float)
beacon_radius (float)
obstacle_radius (float)
reward_model_type (RewardModelType)
observation_model_type (ObservationModelType)
penalty_decay (float)
is_obstacle_hit_terminal (bool)
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.ContinuousLightDarkPOMDPDiscreteActions(discount_factor, state_transition_cov_matrix=array([[1., 0.], [0., 1.]]), observation_cov_matrix=array([[1., 0.], [0., 1.]]), obstacle_hit_probability=0.2, obstacle_reward=-10.0, goal_reward=10.0, fuel_cost=2.0, grid_size=11, goal_state_radius=1.5, beacon_radius=1.0, obstacle_radius=1.5, name='ContinuousLightDarkPOMDPDiscreteActions', beacons=[(0, 0), (0, 5), (0, 10), (5, 0), (5, 5), (5, 10), (10, 0), (10, 5), (10, 10)], goal_state=array([10, 5]), start_state=array([0, 5]), obstacles=[(3, 7), (5, 5)], reward_model_type=RewardModelType.STANDARD, observation_model_type=ObservationModelType.NORMAL_NOISE, penalty_decay=1.0, is_obstacle_hit_terminal=True)[source]
Bases: ContinuousLightDarkPOMDP, DiscreteActionsEnvironment
Continuous Light-Dark POMDP environment with discrete actions.
This variant of the Continuous Light-Dark POMDP uses discrete directional actions (up, down, left, right) instead of continuous action vectors. The continuous state space and observation model are preserved.
Actions are mapped to unit vectors:
- "up": [0, 1]
- "down": [0, -1]
- "right": [1, 0]
- "left": [-1, 0]
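In code, this mapping amounts to a small lookup table of unit vectors (a sketch of the idea; the class may store it differently internally):

import numpy as np

# Assumed representation of the discrete-to-continuous action map.
ACTION_VECTORS = {
    "up": np.array([0.0, 1.0]),
    "down": np.array([0.0, -1.0]),
    "right": np.array([1.0, 0.0]),
    "left": np.array([-1.0, 0.0]),
}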
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = ContinuousLightDarkPOMDPDiscreteActions(
...     discount_factor=0.95,
...     goal_state=np.array([10, 5]),
...     start_state=np.array([0, 5])
... )
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
state_transition_cov_matrix (ndarray)
observation_cov_matrix (ndarray)
obstacle_hit_probability (float)
obstacle_reward (float)
goal_reward (float)
fuel_cost (float)
grid_size (int)
goal_state_radius (float)
beacon_radius (float)
obstacle_radius (float)
name (str)
goal_state (ndarray)
start_state (ndarray)
reward_model_type (RewardModelType)
observation_model_type (ObservationModelType)
penalty_decay (float)
is_obstacle_hit_terminal (bool)
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.DiscreteLightDarkPOMDP(discount_factor, name='DiscreteLightDarkPOMDP', transition_error_prob=0.05, observation_error_prob=0.05, beacons=[(0, 0), (0, 5), (0, 10), (5, 0), (5, 5), (5, 10), (10, 0), (10, 5), (10, 10)], goal_state=array([10, 5]), start_state=array([0, 5]), obstacles=[(3, 7), (5, 5)], obstacle_hit_probability=0.2, obstacle_reward=-10.0, goal_reward=10.0, beacon_radius=1.0, fuel_cost=2.0, grid_size=11, is_stochastic_reward=True, observation_model_type=ObservationModelType.NORMAL)[source]
Bases: BaseLightDarkPOMDPDiscreteActions, DiscreteActionsEnvironment
Discrete Light-Dark POMDP Environment for Robot Navigation with Observation Uncertainty.
This environment implements a discretized version of the classic Light-Dark POMDP problem, where a robot must navigate from a start position to a goal position in a grid world with beacons and obstacles. The key challenge is that the robot’s observation quality depends on its distance from beacons - closer to beacons means more accurate observations.
Problem Description: The robot operates in a discrete grid world where it can move in four cardinal directions. The environment includes:
- Beacons: Fixed positions that provide location reference with varying accuracy
- Obstacles: Grid cells that incur penalties when hit
- Goal: Target position that provides high reward when reached
- Observation uncertainty: Decreases with proximity to beacons (light areas)
Key Features:
- Discrete state space: Robot positions are restricted to grid cells
- Discrete action space: North, South, East, West movements
- Multiple observation models available (normal, no observation in dark)
- Distance-dependent observation accuracy: Closer to beacons = better observations
- Stochastic transitions: Actions may fail with configurable probability
- Obstacle avoidance: Penalties for hitting obstacles during navigation
- Configurable environment parameters: Grid size, beacon positions, obstacles
State Space:
- 2D grid coordinates (x, y) representing robot position
- Bounded by grid_size parameter (default: 11x11 grid)
Action Space:
- Discrete actions: ['North', 'South', 'East', 'West']
- Each action moves robot one grid cell in the corresponding direction
- Boundary conditions: Actions that would move outside grid are blocked
Observation Space:
- Discrete observations based on beacon proximity and noise
- Observation accuracy improves with proximity to beacons
- Stochastic observation errors controlled by observation_error_prob
Reward Structure:
- Goal reward: Large positive reward for reaching the goal state
- Obstacle penalty: Negative reward for hitting obstacles
- Fuel cost: Small negative reward for each movement action
- Distance-based penalties: Encourage efficient navigation
- Parameters:
discount_factor (float)
name (str)
transition_error_prob (float)
observation_error_prob (float)
goal_state (ndarray)
start_state (ndarray)
obstacle_hit_probability (float)
obstacle_reward (float)
goal_reward (float)
beacon_radius (float)
fuel_cost (float)
grid_size (int)
is_stochastic_reward (bool)
observation_model_type (ObservationModelType)
- transition_error_prob
Probability that an action fails (results in different movement)
- observation_error_prob
Probability of observation noise/error
- is_stochastic_reward
Whether rewards include stochastic components
- beacons
List of (x, y) beacon positions that provide navigation references
- goal_state
Target position (x, y) that robot should reach
- start_state
Initial robot position (x, y)
- obstacles
List of (x, y) obstacle positions to avoid
- grid_size
Dimension of the square grid world
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = DiscreteLightDarkPOMDP(
...     discount_factor=0.95,
...     transition_error_prob=0.1,
...     observation_error_prob=0.15,
...     beacons=[(1, 1), (2, 2)],
...     grid_size=11
... )
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
References:
- Platt, R., et al. "Belief space planning assuming maximum likelihood observations." (2010)
- Kurniawati, H., et al. "SARSOP: Efficient point-based POMDP planning by approximating optimally reachable belief spaces." (2008)
- Light-Dark domain: Classic POMDP benchmark for testing observation uncertainty
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- sample_next_step(state, action)[source]
Sample a complete state transition step.
This convenience method combines state transition, observation generation, and reward calculation in a single operation.
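Conceptually the method chains the three models; a minimal sketch, assuming the distribution objects expose sample() as in the doctest examples above (the actual implementation may differ):

def sample_next_step(self, state, action):
    # Sketch: transition, then observe, then score the step.
    next_state = self.state_transition_model(state, action).sample()[0]
    observation = self.observation_model(next_state, action).sample()[0]
    reward = self.reward(state, action)
    return next_state, observation, reward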
- class POMDPPlanners.environments.LaserTagPOMDP(discount_factor, name='LaserTagPOMDP', floor_shape=(11, 7), walls={(1, 2), (3, 0), (3, 4), (5, 0), (6, 4), (9, 1), (9, 4), (10, 6)}, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, dangerous_areas={(2, 5), (5, 3), (7, 1)}, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None, transition_error_prob=0.0)[source]
Bases: DiscreteActionsEnvironment
LaserTag POMDP environment implementation.
This is a pursuit-evasion problem where a robot must navigate a grid to tag an opponent. The robot receives noisy observations of the opponent’s position and must decide when and where to attempt tagging.
Problem Structure:
- States: numpy array [robot_row, robot_col, opp_row, opp_col, terminal]
- Actions: North(0), South(1), East(2), West(3), Tag(4)
- Observations: 8-directional laser measurements (N, NE, E, SE, S, SW, W, NW)
- Rewards: Tag success (+10), Tag failure (-10), Movement (-1)
- Parameters:
discount_factor (float)
name (str)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
initial_state (ndarray | None)
transition_error_prob (float)
- floor_shape
Grid dimensions as (rows, cols)
- walls
Set of wall positions as (row, col) tuples
- tag_reward
Reward for successful tagging
- tag_penalty
Penalty for unsuccessful tagging
- step_cost
Cost per movement action
- measurement_noise
Standard deviation of observation noise
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = LaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- cache_visualization(history, cache_path)[source]
Cache visualization of the LaserTag episode as an animated GIF.
Creates an animated visualization showing:
- Robot movement (red circle)
- Opponent movement (blue circle)
- Walls (black squares)
- Dangerous areas (red circles)
- Action arrows showing robot's intended movement
- Laser measurements (green rays from robot position)
- Belief particles (if available) showing robot's belief about opponent location
- Grid boundaries and coordinate system
- Parameters:
- Raises:
ValueError – If history is empty or contains invalid data
TypeError – If cache_path is not a Path object or doesn’t end with .gif
- Return type:
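A typical call, assuming history is an episode record collected during simulation (the variable name is illustrative):

>>> from pathlib import Path
>>> env.cache_visualization(history, Path("episode.gif"))  # path must end with .gif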
- compute_metrics(histories)[source]
Compute LaserTag POMDP specific metrics from simulation histories.
- Return type:
- Parameters:
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
Observations are 8-dimensional laser measurements or terminal observations.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Return type:
- Parameters:
- class POMDPPlanners.environments.MountainCarPOMDP(discount_factor, state_transition_cov=None, name='MountainCarPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Mountain Car problem formulated as a POMDP.
This environment simulates an underpowered car trying to reach the top of a steep mountain. The car must build momentum by oscillating back and forth to gain enough energy to reach the goal, with noisy observations of its state.
Problem Structure:
- State: [position, velocity] (continuous, position ∈ [-1.2, 0.6], velocity ∈ [-0.07, 0.07])
- Actions: [-1 (reverse), 0 (neutral), 1 (forward)] (discrete)
- Observations: Noisy state measurements (continuous)
- Rewards: 0 for reaching goal (position ≥ 0.5), -1 per time step otherwise
- Goal: Drive car to position ≥ 0.5 (top of mountain)
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = MountainCarPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
- DEFAULT_STATE_TRANSITION_COV = array([[2.5e-05, 0.0e+00], [0.0e+00, 1.0e-06]])
- cache_visualization(history, cache_path)[source]
Cache visualization data for an episode history.
This method can be overridden by subclasses to provide environment-specific visualization caching capabilities.
- compute_metrics(histories)[source]
Compute Mountain Car POMDP specific metrics from simulation histories.
- Parameters:
- Return type:
- Returns:
List of MetricValue objects containing the computed metrics
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (Tuple[float, float]) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.PacManPOMDP(maze_size=(7, 7), walls=None, initial_pellets=None, initial_pacman_pos=(0, 0), num_ghosts=1, initial_ghost_positions=None, initial_ghost_pos=None, pellet_reward=10.0, ghost_collision_penalty=-100.0, step_penalty=-1.0, win_reward=100.0, ghost_aggressiveness=2.0, ghost_coordination='independent', ghost_strategies=None, observation_noise_factor=0.3, max_observation_noise=1.5, discount_factor=0.95, name='PacManPOMDP', output_dir=None, debug=False)[source]
Bases: DiscreteActionsEnvironment
PacMan POMDP environment inspired by the classic arcade game.
This environment implements a simplified PacMan game where PacMan must collect pellets while avoiding one or more ghosts (a single ghost by default). Ghost positions are only partially observable through noisy sensor readings.
- Parameters:
num_ghosts (int)
pellet_reward (float)
ghost_collision_penalty (float)
step_penalty (float)
win_reward (float)
ghost_aggressiveness (float)
ghost_coordination (str)
observation_noise_factor (float)
max_observation_noise (float)
discount_factor (float)
name (str)
output_dir (Path | None)
debug (bool)
- maze_size
Grid dimensions as (rows, cols)
- walls
Set of wall positions as (row, col) tuples
- initial_pellets
List of initial pellet positions
- pellet_reward
Reward for collecting a pellet
- ghost_collision_penalty
Penalty for collision with ghost
- step_penalty
Cost per action
- win_reward
Reward for collecting all pellets
- ghost_aggressiveness
Temperature parameter for ghost movement policy
- observation_noise_factor
Multiplier for observation noise based on distance
- max_observation_noise
Maximum noise standard deviation
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = PacManPOMDP(maze_size=(7, 7))
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- array_to_state(arr)[source]
Convert a numpy array back to a PacManState.
- Parameters:
arr (ndarray) – 1-D array of shape (self._state_dim,) produced by state_to_array().
- Return type:
- Returns:
Reconstructed PacManState.
- get_metric_names()[source]
Get names of PacMan POMDP specific metrics.
- Return type:
- Returns:
List containing metric names including standard metrics (win_rate, avg_pellets_collected, avg_episode_length, avg_pacman_closest_ghost_distance, avg_collision_encounters) and dynamically generated per-ghost distance metrics for multi-ghost scenarios (avg_pacman_ghost_0_distance, avg_pacman_ghost_1_distance, etc.)
- property initial_ghost_pos: Tuple[int, int]
Backward compatibility: returns the first ghost position.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states.
Accepts either a 2-D numpy array of shape (N, state_dim) (vectorized path) or a sequence of PacManState objects (falls back to the loop-based default).
Computes deterministic reward components only: step penalty, pellet collection, and win bonus. Ghost collision penalty is excluded because it depends on stochastic ghost movement.
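Both accepted input forms, sketched with a hypothetical states_list of PacManState objects:

>>> import numpy as np
>>> # Vectorized path: a 2-D array built with states_to_array()
>>> rewards = env.reward_batch(env.states_to_array(states_list), action)
>>> # Fallback path: the PacManState sequence itself
>>> rewards = env.reward_batch(states_list, action)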
- state_to_array(state)[source]
Convert a PacManState to a fixed-size numpy array.
The array layout is: [pac_row, pac_col, g0_row, g0_col, ..., pellet_mask[0..P-1], score, terminal]
- Parameters:
state (PacManState) – A PacManState instance.
- Return type:
- Returns:
1-D float array of shape (self._state_dim,).
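Together with array_to_state(), this gives a lossless round trip (sketch, with state a PacManState):

>>> arr = env.state_to_array(state)
>>> restored = env.array_to_state(arr)  # reconstructs an equivalent PacManState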
- states_to_array(states)[source]
Batch-convert a list of PacManState to a 2-D numpy array.
- Parameters:
states (List[PacManState]) – List of PacManState instances.
- Return type:
- Returns:
Array of shape (len(states), self._state_dim).
- visualize_path(path, actions, cache_path)[source]
Visualize PacMan path through the maze using sprite-based rendering.
- Parameters:
path (List[PacManState]) – List of states representing the path through the maze
cache_path (Path) – Path where the GIF should be saved
- class POMDPPlanners.environments.PushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, observation_noise=0.1, obstacles=None, obstacle_radius=0.5, obstacle_penalty=-10.0, initial_state=None, transition_error_prob=0.0, name='PushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Robotic push task formulated as a POMDP.
This environment simulates a robot that must push an object to a target location on a 2D grid. The robot can move in four directions and pushes objects when close enough, with partial observability through noisy object position measurements.
Problem Structure:
- State: [robot_x, robot_y, object_x, object_y, target_x, target_y] (continuous)
- Actions: ["up", "down", "left", "right"] (discrete)
- Observations: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y]
- Rewards: -distance_to_target + 100 (when object reaches target)
- Termination: Object within 0.5 units of target position
Key Features:
- Physics-based pushing with configurable friction
- Distance-based pushing threshold
- Noisy observations of object position only
- Dense reward signal based on object-target distance
- Obstacle collision detection with configurable penalties
- Obstacles prevent robot and object movement through them
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = PushPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
grid_size (int)
push_threshold (float)
friction_coefficient (float)
observation_noise (float)
obstacle_radius (float)
obstacle_penalty (float)
initial_state (ndarray | None)
transition_error_prob (float)
name (str)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
- cache_visualization(history, cache_path)[source]
Cache animated visualization of the push episode.
Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type:
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- sample_next_step(state, action)[source]
Sample a complete state transition step.
This convenience method combines state transition, observation generation, and reward calculation in a single operation.
- class POMDPPlanners.environments.RockSamplePOMDP(map_size=(5, 5), rock_positions=None, init_pos=(0, 0), sensor_efficiency=10.0, bad_rock_penalty=-10.0, good_rock_reward=10.0, step_penalty=0.0, sensor_use_penalty=0.0, exit_reward=10.0, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, discount_factor=0.95, name='RockSample', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
RockSample POMDP environment.
This environment implements the classic rock sampling problem where a robot must navigate a grid, use sensors to evaluate rocks, and decide which ones to sample while balancing exploration costs and sampling rewards.
- Parameters:
sensor_efficiency (float)
bad_rock_penalty (float)
good_rock_reward (float)
step_penalty (float)
sensor_use_penalty (float)
exit_reward (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
discount_factor (float)
name (str)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
- map_size
Grid dimensions as (rows, cols)
- rock_positions
List of rock positions as (row, col) tuples
- init_pos
Initial robot position
- sensor_efficiency
Sensor noise parameter (higher = less noise)
- bad_rock_penalty
Penalty for sampling a bad rock
- good_rock_reward
Reward for sampling a good rock
- step_penalty
Cost for each action
- sensor_use_penalty
Additional cost for using sensor
- exit_reward
Reward for reaching the exit
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = RockSamplePOMDP(map_size=(5, 5), rock_positions=[(0, 0), (2, 2), (3, 3)])
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- class POMDPPlanners.environments.SafeAntVelocityPOMDP(discount_factor, safe_velocity_threshold=2.0, max_force=1.0, dt=0.1, mass=1.0, damping=0.1, position_noise=0.1, velocity_noise=0.2, safety_violation_penalty=-100.0, movement_reward_scale=1.0, name='SafeVelocityPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Safety-critical velocity control task formulated as a POMDP.
This environment presents a safety-critical control problem where an agent must navigate while keeping velocity below a safety threshold. The challenge comes from balancing exploration rewards with safety constraints under noisy velocity observations.
Problem Structure:
- State: [position_x, position_y, velocity_x, velocity_y] (continuous)
- Actions: [0=no force, 1=small, 2=medium, 3=large force] (discrete)
- Observations: Noisy position and velocity measurements (continuous)
- Rewards: Movement reward - safety violation penalty (if unsafe)
- Safety constraint: velocity magnitude ≤ safe_velocity_threshold
- Termination: Velocity exceeds 1.5x safety threshold
Safety Features:
- Tracks safety and critical violation rates
- Heavy penalties for constraint violations
- Configurable safety thresholds and penalties
- Physics simulation with uncertainty in force direction
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = SafeAntVelocityPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
- cache_visualization(history, cache_path)[source]
Cache animated visualization of the safety ant velocity episode.
Creates an animated GIF showing the ant’s movement trajectory with velocity vectors, safety zones, force applications, and safety constraint violations.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type:
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- sample_next_step(state, action)[source]
Sample a complete state transition step.
This convenience method combines state transition, observation generation, and reward calculation in a single operation.
- class POMDPPlanners.environments.SanityPOMDP(discount_factor=0.95, output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Simple sanity check POMDP environment for testing and debugging.
This environment provides the simplest possible POMDP formulation with deterministic dynamics and perfect observability. It serves as a baseline for testing POMDP algorithms and ensuring correctness.
Problem Structure:
- States: 0 (good), 1 (bad)
- Actions: 0 (choose good), 1 (choose bad)
- Observations: Same as states (perfect observability)
- Rewards: 1.0 for good state, 0.0 for bad state
- Dynamics: Deterministic state transitions based on action
The optimal policy is trivial: always choose action 0 to stay in the good state.
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = SanityPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
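A quick numeric check of the "always choose action 0" claim: staying in the good state earns 1.0 per step, so the discounted return tends to 1 / (1 - 0.95) = 20. A rollout sketch under those assumptions:

>>> total, discount = 0.0, 1.0
>>> state = 0  # good state
>>> for _ in range(200):
...     state, _, reward = env.sample_next_step(state, 0)  # always action 0
...     total += discount * reward
...     discount *= 0.95
>>> # total approaches 1 / (1 - 0.95) = 20.0 under the stated reward structure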
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (int) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- class POMDPPlanners.environments.TigerPOMDP(discount_factor, name='TigerPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Tiger POMDP environment implementation.
This is the classic Tiger problem where an agent must decide which door to open to find treasure while avoiding the tiger. The agent can listen for acoustic cues but receives noisy observations.
Problem Structure:
- States: tiger_left, tiger_right (tiger's location)
- Actions: listen, open_left, open_right
- Observations: hear_left, hear_right, hear_nothing
- Rewards: listen (-1), correct_door (+10), wrong_door (-100)
- Parameters:
- states
List of possible states
- actions
List of possible actions
- observations
List of possible observations
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = tiger.initial_state_dist().sample()[0]
>>> actions = tiger.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = tiger.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> tiger.is_terminal(initial_state)
False
- compute_metrics(histories)[source]
Compute Tiger POMDP specific metrics from simulation histories.
- Parameters:
- Return type:
- Returns:
List of MetricValue objects containing the computed metrics
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (str) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
Subpackages
- POMDPPlanners.environments.cartpole_pomdp package
CartPoleInitialStateDistribution, CartPoleObservation, CartPolePOMDP, CartPolePOMDP.DEFAULT_STATE_TRANSITION_COV, CartPolePOMDP.compute_metrics(), CartPolePOMDP.get_actions(), CartPolePOMDP.get_metric_names(), CartPolePOMDP.initial_observation_dist(), CartPolePOMDP.initial_state_dist(), CartPolePOMDP.is_equal_observation(), CartPolePOMDP.is_terminal(), CartPolePOMDP.observation_model(), CartPolePOMDP.reward(), CartPolePOMDP.reward_batch(), CartPolePOMDP.state_transition_model()
CartPolePOMDPMetrics, CartPoleStateTransition, CartPoleStateTransition.state, CartPoleStateTransition.action, CartPoleStateTransition.force_mag, CartPoleStateTransition.total_mass, CartPoleStateTransition.polemass_length, CartPoleStateTransition.gravity, CartPoleStateTransition.length, CartPoleStateTransition.kinematics_integrator, CartPoleStateTransition.tau, CartPoleStateTransition.masspole, CartPoleStateTransition.probability(), CartPoleStateTransition.sample()
- Submodules
- POMDPPlanners.environments.cartpole_pomdp.cartpole_pomdp module
CartPoleInitialStateDistribution, CartPoleObservation, CartPolePOMDP, CartPolePOMDP.DEFAULT_STATE_TRANSITION_COV, CartPolePOMDP.compute_metrics(), CartPolePOMDP.get_actions(), CartPolePOMDP.get_metric_names(), CartPolePOMDP.initial_observation_dist(), CartPolePOMDP.initial_state_dist(), CartPolePOMDP.is_equal_observation(), CartPolePOMDP.is_terminal(), CartPolePOMDP.observation_model(), CartPolePOMDP.reward(), CartPolePOMDP.reward_batch(), CartPolePOMDP.state_transition_model()
CartPolePOMDPMetrics, CartPoleStateTransition, CartPoleStateTransition.state, CartPoleStateTransition.action, CartPoleStateTransition.force_mag, CartPoleStateTransition.total_mass, CartPoleStateTransition.polemass_length, CartPoleStateTransition.gravity, CartPoleStateTransition.length, CartPoleStateTransition.kinematics_integrator, CartPoleStateTransition.tau, CartPoleStateTransition.masspole, CartPoleStateTransition.probability(), CartPoleStateTransition.sample()
- POMDPPlanners.environments.cartpole_pomdp.cartpole_pomdp_beliefs module
CartPoleVectorizedUpdater, CartPoleVectorizedUpdater.state_transition_dist, CartPoleVectorizedUpdater.obs_dist, CartPoleVectorizedUpdater.force_mag, CartPoleVectorizedUpdater.gravity, CartPoleVectorizedUpdater.masscart, CartPoleVectorizedUpdater.masspole, CartPoleVectorizedUpdater.total_mass, CartPoleVectorizedUpdater.length, CartPoleVectorizedUpdater.polemass_length, CartPoleVectorizedUpdater.tau, CartPoleVectorizedUpdater.kinematics_integrator, CartPoleVectorizedUpdater.batch_observation_log_likelihood(), CartPoleVectorizedUpdater.batch_transition(), CartPoleVectorizedUpdater.config_id, CartPoleVectorizedUpdater.from_environment()
create_cartpole_belief()
- POMDPPlanners.environments.cartpole_pomdp.cartpole_pomdp_gaussian_beliefs module
- POMDPPlanners.environments.laser_tag_pomdp package
ContinuousLaserTagPOMDP (cache_visualization(), compute_metrics(), get_metric_names(), grid_size, initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), reward_batch(), state_transition_model(), walls)
ContinuousLaserTagPOMDPDiscreteActions, LaserTagObservation, LaserTagPOMDP (floor_shape, walls, tag_reward, tag_penalty, step_cost, measurement_noise, cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), state_transition_model())
LaserTagStateTransition
- Subpackages
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs package
ContinuousLaserTagVectorizedUpdater, LaserTagVectorizedUpdater, create_continuous_laser_tag_belief(), create_laser_tag_belief()
- Submodules
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs.continuous_laser_tag_belief_factory module
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs.continuous_laser_tag_vectorized_updater module
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs.laser_tag_belief_factory module
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs.laser_tag_vectorized_updater module
- Submodules
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry module
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp module
ContinuousLaserTagObservationModel, ContinuousLaserTagPOMDP (cache_visualization(), compute_metrics(), get_metric_names(), grid_size, initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), reward_batch(), state_transition_model(), walls)
ContinuousLaserTagPOMDPDiscreteActions, ContinuousLaserTagPOMDPMetrics (AVERAGE_ALL_DANGEROUS_ENCOUNTERS, AVERAGE_DANGEROUS_AREA_STEPS, AVERAGE_EPISODE_LENGTH, AVERAGE_FAILED_TAG_ATTEMPTS, AVERAGE_WALL_COLLISIONS, GOAL_REACHING_RATE, TAG_SUCCESS_RATE)
ContinuousLaserTagStateTransitionModel
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_visualizer module
ContinuousLaserTagVisualizer (grid_size, walls, robot_radius, opponent_radius, dangerous_areas, dangerous_area_radius, create_visualization())
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp module
LaserTagObservation, LaserTagPOMDP (floor_shape, walls, tag_reward, tag_penalty, step_cost, measurement_noise, cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), state_transition_model())
LaserTagPOMDPMetrics (AVERAGE_ALL_DANGEROUS_ENCOUNTERS, AVERAGE_DANGEROUS_AREA_STEPS, AVERAGE_EPISODE_LENGTH, AVERAGE_FAILED_TAG_ATTEMPTS, AVERAGE_OBSTACLE_COLLISIONS, GOAL_REACHING_RATE, TAG_SUCCESS_RATE)
LaserTagStateTransition (state, action, floor_shape, walls, action_directions, probability(), sample())
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_visualizer module
- POMDPPlanners.environments.light_dark_pomdp package
- Subpackages
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_beliefs package
ContinuousLightDarkDistanceBasedVectorizedUpdater, ContinuousLightDarkNoObsInDarkVectorizedUpdater, ContinuousLightDarkVectorizedUpdater, DiscreteLightDarkDistanceBasedVectorizedUpdater, DiscreteLightDarkNoObsInDarkVectorizedUpdater, DiscreteLightDarkVectorizedUpdater, GaussianBeliefUpdaterType, create_continuous_light_dark_belief(), create_continuous_light_dark_gaussian_belief(), create_discrete_light_dark_belief()
- Submodules
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_beliefs.continuous_light_dark_belief_factory module
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_beliefs.continuous_light_dark_gaussian_beliefs module
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_beliefs.continuous_light_dark_vectorized_updater module
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_beliefs.discrete_light_dark_belief_factory module
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_beliefs.discrete_light_dark_vectorized_updater module
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_utils package
- Submodules
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_utils.base_light_dark_pomdp module
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_utils.light_dark_observation_models module
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_utils.light_dark_reward_models module
- POMDPPlanners.environments.light_dark_pomdp.light_dark_pomdp_utils.light_dark_visualizer module
- Submodules
- POMDPPlanners.environments.light_dark_pomdp.continuous_light_dark_pomdp module
- POMDPPlanners.environments.light_dark_pomdp.discrete_light_dark_pomdp module
DiscreteLightDarkPOMDP (transition_error_prob, observation_error_prob, is_stochastic_reward, beacons, goal_state, start_state, obstacles, grid_size, compute_metrics(), get_metric_names(), is_terminal(), observation_model(), reward(), reward_batch(), sample_next_step(), state_transition_model())
DiscreteLightDarkPOMDPMetrics, ObservationModelType
- POMDPPlanners.environments.mountain_car_pomdp package
MountainCarObservation, MountainCarPOMDP (DEFAULT_STATE_TRANSITION_COV, cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), reward_batch(), state_transition_model())
MountainCarPOMDPMetrics, MountainCarTransition
- Submodules
- POMDPPlanners.environments.mountain_car_pomdp.mountain_car_pomdp module
- POMDPPlanners.environments.mountain_car_pomdp.mountain_car_pomdp_beliefs module
MountainCarVectorizedUpdater (state_transition_dist, obs_dist, power, gravity, max_speed, min_position, max_position, batch_observation_log_likelihood(), batch_transition(), config_id, from_environment())
create_mountain_car_belief()
- POMDPPlanners.environments.mountain_car_pomdp.mountain_car_pomdp_gaussian_beliefs module
- POMDPPlanners.environments.pacman_pomdp package
PacManObservationModel, PacManPOMDP (maze_size, walls, initial_pellets, pellet_reward, ghost_collision_penalty, step_penalty, win_reward, ghost_aggressiveness, observation_noise_factor, max_observation_noise, array_to_observation(), array_to_state(), cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_ghost_pos, initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), observation_to_array(), reward(), reward_batch(), state_to_array(), state_transition_model(), states_to_array(), visualize_path())
PacManState, PacManStateTransitionModel, PacManVectorizedUpdater (maze_size, num_ghosts, num_pellets, state_dim, ghost_aggressiveness, ghost_coordination, ghost_strategies, observation_noise_factor, max_observation_noise, batch_observation_log_likelihood(), batch_transition(), config_id, from_environment())
create_pacman_belief(), create_simple_maze_pacman()
- Subpackages
- POMDPPlanners.environments.pacman_pomdp.pacman_pomdp_beliefs package
PacManVectorizedUpdater, create_pacman_belief()
- Submodules
- POMDPPlanners.environments.pacman_pomdp.pacman_pomdp_beliefs.pacman_belief_factory module
- POMDPPlanners.environments.pacman_pomdp.pacman_pomdp_beliefs.pacman_grid_utils module
- POMDPPlanners.environments.pacman_pomdp.pacman_pomdp_beliefs.pacman_vectorized_updater module
- Submodules
- POMDPPlanners.environments.pacman_pomdp.pacman_pomdp module
PacManObservationModel, PacManPOMDP (maze_size, walls, initial_pellets, pellet_reward, ghost_collision_penalty, step_penalty, win_reward, ghost_aggressiveness, observation_noise_factor, max_observation_noise, array_to_observation(), array_to_state(), cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_ghost_pos, initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), observation_to_array(), reward(), reward_batch(), state_to_array(), state_transition_model(), states_to_array(), visualize_path())
PacManPOMDPMetrics, PacManState, PacManStateTransitionModel, create_simple_maze_pacman()
- POMDPPlanners.environments.pacman_pomdp.pacman_visualizer module
- POMDPPlanners.environments.push_pomdp package
ContinuousPushObservationModel, ContinuousPushPOMDP (cache_visualization(), compute_metrics(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), reward_batch(), state_transition_model())
ContinuousPushPOMDPDiscreteActions, ContinuousPushPOMDPVisualizer, ContinuousPushStateTransitionModel (state, action, grid_size, push_threshold, friction_coefficient, max_push, obstacles, robot_radius, probability(), sample())
PushObservation, PushPOMDP (cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), sample_next_step(), state_transition_model())
PushPOMDPVisualizer, PushStateTransition (state, action, grid_size, push_threshold, friction_coefficient, robot_pos, object_pos, target_pos, probability(), sample())
- Subpackages
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs package
ContinuousPushVectorizedUpdater, PushVectorizedUpdater, create_continuous_push_belief(), create_push_belief()
- Submodules
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs.continuous_push_belief_factory module
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs.continuous_push_vectorized_updater module
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs.push_belief_factory module
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs.push_vectorized_updater module
- Submodules
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry module
- POMDPPlanners.environments.push_pomdp.continuous_push_pomdp module
ContinuousPushObservationModel, ContinuousPushPOMDP (cache_visualization(), compute_metrics(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), reward_batch(), state_transition_model())
ContinuousPushPOMDPDiscreteActions, ContinuousPushPOMDPMetrics (GOAL_REACHING_RATE, OBJECT_OBSTACLE_COLLISION_RATE, ROBOT_OBSTACLE_COLLISION_RATE, TOTAL_ALL_OBSTACLE_COLLISIONS, TOTAL_OBJECT_OBSTACLE_COLLISIONS, TOTAL_OBSTACLE_COLLISION_RATE, TOTAL_ROBOT_OBSTACLE_COLLISIONS)
ContinuousPushStateTransitionModel (state, action, grid_size, push_threshold, friction_coefficient, max_push, obstacles, robot_radius, probability(), sample())
- POMDPPlanners.environments.push_pomdp.continuous_push_pomdp_visualizer module
- POMDPPlanners.environments.push_pomdp.push_pomdp module
FixedStateDistribution, PushObservation, PushPOMDP (cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), sample_next_step(), state_transition_model())
PushPOMDPMetrics (GOAL_REACHING_RATE, OBJECT_OBSTACLE_COLLISION_RATE, ROBOT_OBSTACLE_COLLISION_RATE, TOTAL_ALL_OBSTACLE_COLLISIONS, TOTAL_OBJECT_OBSTACLE_COLLISIONS, TOTAL_OBSTACLE_COLLISION_RATE, TOTAL_ROBOT_OBSTACLE_COLLISIONS)
PushStateTransition (state, action, grid_size, push_threshold, friction_coefficient, robot_pos, object_pos, target_pos, probability(), sample())
RandomInitialStateDistribution
- POMDPPlanners.environments.push_pomdp.push_pomdp_visualizer module
- POMDPPlanners.environments.rock_sample_pomdp package
RockSampleObservationModel, RockSamplePOMDP (map_size, rock_positions, init_pos, sensor_efficiency, bad_rock_penalty, good_rock_reward, step_penalty, sensor_use_penalty, exit_reward, cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), sample_next_step(), state_transition_model(), visualize_path())
RockSampleState, RockSampleStateTransitionModel, RockSampleVectorizedUpdater (map_rows, map_cols, num_rocks, rock_positions, sensor_efficiency, batch_observation_log_likelihood(), batch_transition(), config_id, from_environment())
RockSampleVisualizer (env, map_size, rock_positions, action_names, action_to_vector, dangerous_areas, dangerous_area_radius, create_visualization(), visualize_path())
create_random_rock_sample(), create_rock_sample_state(), create_rocksample_belief(), get_robot_pos(), get_rocks(), states_equal()
- Submodules
- POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp module
RockSamplePOMDPMetrics, RockSampleStateTransitionModel, create_random_rock_sample(), create_rock_sample_state(), get_robot_pos(), get_rocks(), states_equal()
- POMDPPlanners.environments.rock_sample_pomdp.rock_sample_visualizer module
- POMDPPlanners.environments.safety_ant_velocity_pomdp package
SafeAntVelocityObservation (next_state, action, position_noise, velocity_noise, position, velocity, probability(), sample())
SafeAntVelocityPOMDP (cache_visualization(), compute_metrics(), get_actions(), get_metric_names(), initial_observation_dist(), initial_state_dist(), is_equal_observation(), is_terminal(), observation_model(), reward(), reward_batch(), sample_next_step(), state_transition_model())
SafeAntVelocityStateTransition (state, action, dt, mass, damping, max_force, force_scales, position, velocity, probability(), sample())
SafeAntVelocityVisualizer
- Subpackages
- POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp_beliefs package
SafetyAntVelocityVectorizedUpdater, create_safety_ant_velocity_belief()
- Submodules
- POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp_beliefs.safety_ant_velocity_belief_factory module
- POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp_beliefs.safety_ant_velocity_vectorized_updater module
- Submodules
- POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp module
SafeAntVelocityPOMDPMetrics, SafeAntVelocityStateTransition (state, action, dt, mass, damping, max_force, force_scales, position, velocity, probability(), sample())
- POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_visualizer module
Submodules
POMDPPlanners.environments.sanity_pomdp module
Sanity Check POMDP Environment Implementation.
This module implements a simple test environment used for debugging and sanity checking POMDP algorithms. The environment has deterministic dynamics and perfect observability, making it ideal for verifying algorithm correctness.
The Sanity POMDP features:
- Two discrete states: 0 (good) and 1 (bad)
- Two discrete actions: 0 (go to good state) and 1 (go to bad state)
- Perfect observations: observation always equals the state
- Simple reward structure: 1.0 for good state, 0.0 for bad state
- No terminal states (infinite horizon)
This environment is primarily used for:
- Testing POMDP algorithm implementations
- Debugging belief updates and planning algorithms
- Verifying that algorithms can solve trivial cases
- Performance benchmarking baseline
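As a concrete illustration, the following minimal rollout (a sketch, not library code) exercises the public methods documented below; with the optimal action 0 chosen every step, each transition lands in the rewarded state:

env = SanityPOMDP(discount_factor=0.95)
state = env.initial_state_dist().sample()[0]
total_reward = 0.0
for t in range(10):
    action = 0  # stand-in for a planner's choice; 0 is optimal here
    state, observation, reward = env.sample_next_step(state, action)
    total_reward += reward
# Perfect observability: the observation always equals the state.
assert observation == state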
- Classes:
SanityStateTransitionModel: Deterministic state transitions
SanityObservationModel: Perfect state observation
SanityInitialStateDist: Always starts in good state
SanityInitialObservationDist: Initial observation distribution
SanityPOMDP: Main environment class for sanity testing
- class POMDPPlanners.environments.sanity_pomdp.SanityInitialObservationDist[source]
Bases: Distribution
Initial observation distribution for Sanity POMDP.
This distribution always returns observation 0 (corresponding to the good state) as the initial observation, maintaining consistency with the initial state distribution and perfect observability property.
Example
Using the initial observation distribution:
>>> import numpy as np
>>> np.random.seed(42) # For reproducible results
>>> # Create initial observation distribution
>>> initial_obs_dist = SanityInitialObservationDist()
>>>
>>> # Sample initial observation
>>> initial_obs = initial_obs_dist.sample()[0] # Returns 0
>>> initial_obs == 0
True
>>>
>>> # Sample multiple observations
>>> observations = initial_obs_dist.sample(n_samples=3) # Returns [0, 0, 0]
>>> len(observations) == 3
True
>>> all(obs == 0 for obs in observations)
True
>>>
>>> # Check observation probabilities
>>> prob = initial_obs_dist.probability([0]) # Returns [1.0]
>>> bool(prob[0] == 1.0)
True
- probability(values)[source]
Calculate probabilities for given values.
- Parameters:
values (List[int]) – List of values to calculate probabilities for
- Return type:
- Returns:
Numpy array of probabilities corresponding to input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- class POMDPPlanners.environments.sanity_pomdp.SanityInitialStateDist[source]
Bases: Distribution
Initial state distribution for Sanity POMDP.
This distribution always returns state 0 (good state) as the initial state, providing a deterministic and predictable starting condition for testing.
Example
Using the initial state distribution:
>>> import numpy as np
>>> np.random.seed(42) # For reproducible results
>>> # Create initial state distribution
>>> initial_dist = SanityInitialStateDist()
>>>
>>> # Sample initial state (always returns good state)
>>> initial_state = initial_dist.sample()[0] # Returns 0
>>> initial_state == 0
True
>>>
>>> # Sample multiple initial states
>>> states = initial_dist.sample(n_samples=5) # Returns [0, 0, 0, 0, 0]
>>> len(states) == 5
True
>>> all(state == 0 for state in states)
True
>>>
>>> # Check probability of initial states
>>> prob_good = initial_dist.probability([0]) # Returns [1.0]
>>> bool(prob_good[0] == 1.0)
True
>>> prob_bad = initial_dist.probability([1]) # Returns [0.0]
>>> bool(prob_bad[0] == 0.0)
True
- probability(values)[source]
Calculate probabilities for given values.
- Parameters:
values (List[int]) – List of values to calculate probabilities for
- Return type:
- Returns:
Numpy array of probabilities corresponding to input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- class POMDPPlanners.environments.sanity_pomdp.SanityObservationModel(next_state, action)[source]
Bases: ObservationModel
Perfect observation model for Sanity POMDP.
This model provides perfect observability where the observation always exactly matches the state. This eliminates partial observability and makes the problem fully observable, which is ideal for testing algorithms in the simplest possible setting.
- next_state
The state after action execution
- action
The action that was taken (not used in observation generation)
Example
Using the observation model:
>>> import numpy as np
>>> np.random.seed(42) # For reproducible results
>>> # Create observation model for good state
>>> obs_model = SanityObservationModel(next_state=0, action=0)
>>>
>>> # Sample observation (always matches state)
>>> observation = obs_model.sample()[0] # Returns 0
>>> observation == 0
True
>>>
>>> # Check observation probabilities
>>> prob_correct = obs_model.probability([0]) # Returns [1.0]
>>> bool(prob_correct[0] == 1.0)
True
>>> prob_wrong = obs_model.probability([1]) # Returns [0.0]
>>> bool(prob_wrong[0] == 0.0)
True
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[int]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.sanity_pomdp.SanityPOMDP(discount_factor=0.95, output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Simple sanity check POMDP environment for testing and debugging.
This environment provides the simplest possible POMDP formulation with deterministic dynamics and perfect observability. It serves as a baseline for testing POMDP algorithms and ensuring correctness.
Problem Structure:
- States: 0 (good), 1 (bad)
- Actions: 0 (choose good), 1 (choose bad)
- Observations: Same as states (perfect observability)
- Rewards: 1.0 for good state, 0.0 for bad state
- Dynamics: Deterministic state transitions based on action
The optimal policy is trivial: always choose action 0 to stay in the good state.
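Since the optimal policy earns reward 1.0 on every step, the optimal discounted return is a geometric series, which gives a handy reference value when validating a planner on this environment (a quick check, assuming the default discount factor):

# Optimal return: sum over t of gamma**t * 1.0 = 1 / (1 - gamma)
gamma = 0.95
optimal_return = 1.0 / (1.0 - gamma)  # 20.0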
Example
>>> import numpy as np
>>> np.random.seed(42) # For reproducible results
>>>
>>> # Initialize environment
>>> env = SanityPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (int) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- class POMDPPlanners.environments.sanity_pomdp.SanityStateTransitionModel(state, action)[source]
Bases: StateTransitionModel
Deterministic state transition model for Sanity POMDP.
This model implements completely deterministic state transitions where:
- Action 0 always leads to state 0 (good state)
- Action 1 always leads to state 1 (bad state)
The deterministic nature makes this ideal for testing and debugging POMDP algorithms since the outcomes are predictable.
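The whole model boils down to a few lines. The sketch below is illustrative only (the actual class derives from StateTransitionModel and follows its interface):

import numpy as np

class DeterministicTransitionSketch:
    """Illustrative stand-in: the next state always equals the action."""

    def __init__(self, state: int, action: int):
        self.state = state
        self.action = action

    def sample(self, n_samples: int = 1) -> list:
        # Action 0 -> state 0, action 1 -> state 1, regardless of state.
        return [self.action] * n_samples

    def probability(self, values: list) -> np.ndarray:
        # All probability mass sits on the state selected by the action.
        return np.array([1.0 if v == self.action else 0.0 for v in values])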
- state
Current state (0 or 1)
- action
Action to be executed (0 or 1)
Example
Using the state transition model:
>>> import numpy as np
>>> np.random.seed(42) # For reproducible results
>>> # Create transition model from bad state with good action
>>> transition_model = SanityStateTransitionModel(state=1, action=0)
>>>
>>> # Sample next state (always deterministic)
>>> next_state = transition_model.sample()[0] # Returns 0 (good state)
>>> next_state == 0
True
>>>
>>> # Check probability of specific outcomes
>>> prob = transition_model.probability([0]) # Returns [1.0]
>>> bool(prob[0] == 1.0)
True
>>> prob_wrong = transition_model.probability([1]) # Returns [0.0]
>>> bool(prob_wrong[0] == 0.0)
True
- probability(values)[source]
Calculate transition probabilities for given next states.
- Parameters:
values (List[int]) – List of next state values to calculate probabilities for
- Return type:
- Returns:
Array of transition probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.
POMDPPlanners.environments.tiger_pomdp module
Tiger POMDP Environment Implementation.
This module implements the classic Tiger problem, a benchmark POMDP environment where an agent must determine which of two doors conceals a treasure and which conceals a tiger, using only noisy acoustic observations.
The Tiger problem features:
- Two doors (left and right) with a tiger behind one and treasure behind the other
- Three actions: listen (to get information), open_left, open_right
- Three observations: hear_left, hear_right, hear_nothing
- Listening provides 85% accurate information about the tiger’s location
- Opening the correct door yields +10 reward; opening the wrong door yields -100
- Listening costs -1 per action
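The 85% listen accuracy drives a simple Bayesian belief update, sketched below for intuition (illustrative only; the function name and belief representation are not part of this module):

def updated_belief_tiger_left(belief_left, heard, accuracy=0.85):
    """Bayes update of P(tiger_left) after one listen observation."""
    p_obs_given_left = accuracy if heard == "hear_left" else 1.0 - accuracy
    p_obs_given_right = 1.0 - accuracy if heard == "hear_left" else accuracy
    numerator = p_obs_given_left * belief_left
    return numerator / (numerator + p_obs_given_right * (1.0 - belief_left))

# From a uniform prior, one "hear_left" moves P(tiger_left) from 0.5 to
# 0.85; a second consistent observation moves it to roughly 0.97.
b = updated_belief_tiger_left(0.5, "hear_left")  # 0.85
b = updated_belief_tiger_left(b, "hear_left")    # ~0.97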
- Classes:
TigerStateTransition: State transition model for the Tiger problem
TigerObservation: Observation model with noisy acoustic feedback
TigerPOMDP: Main environment class implementing the Tiger problem
- class POMDPPlanners.environments.tiger_pomdp.TigerObservation(next_state, action)[source]
Bases: ObservationModel
Observation model for the Tiger POMDP.
Provides noisy acoustic feedback when listening, with 85% accuracy. When doors are opened, no meaningful observation is provided.
- next_state
The state after action execution
- action
The action that was taken
Example
>>> import numpy as np
>>> np.random.seed(42) # For reproducible results
>>> # Create observation model for listening when tiger is left
>>> obs_listen = TigerObservation(next_state="tiger_left", action="listen")
>>> observation = obs_listen.sample()[0]
>>> observation in ["hear_left", "hear_right"] # Listen gives acoustic feedback
True

>>> # Create observation model for opening door
>>> obs_open = TigerObservation(next_state="tiger_left", action="open_left")
>>> observation_open = obs_open.sample()[0]
>>> observation_open == "hear_nothing" # Opening always gives no sound
True

>>> # Check observation probabilities
>>> prob_correct = obs_listen.probability(["hear_left"])
>>> bool(prob_correct[0] == 0.85) # Correct observation probability
True
>>> prob_wrong = obs_listen.probability(["hear_right"])
>>> bool(prob_wrong[0] == 0.15) # Wrong observation probability
True
>>> prob_nothing = obs_open.probability(["hear_nothing"])
>>> bool(prob_nothing[0] == 1.0) # Opening door always gives no sound
True
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[Any]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.tiger_pomdp.TigerPOMDP(discount_factor, name='TigerPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Tiger POMDP environment implementation.
This is the classic Tiger problem where an agent must decide which door to open to find treasure while avoiding the tiger. The agent can listen for acoustic cues but receives noisy observations.
Problem Structure:
- States: tiger_left, tiger_right (tiger’s location)
- Actions: listen, open_left, open_right
- Observations: hear_left, hear_right, hear_nothing
- Rewards: listen(-1), correct_door(+10), wrong_door(-100)
A myopic break-even calculation for these rewards is sketched after the attribute list below.
- Parameters:
- states
List of possible states
- actions
List of possible actions
- observations
List of possible observations
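Given the reward structure above, a quick myopic (one-step) calculation shows why an agent keeps listening until it is quite confident. This is a sketch for intuition, not library code:

def open_right_value(b):
    """Myopic value of opening the right door when P(tiger_left) = b:
    treasure (+10) with probability b, tiger (-100) otherwise."""
    return 10.0 * b - 100.0 * (1.0 - b)

# Break-even against the -1 listen cost: 10b - 100(1 - b) = -1
# => 110b = 99 => b = 0.9. Below 90% confidence that the door is
# safe, listening is the better one-step choice under these rewards.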
Example
>>> import numpy as np
>>> np.random.seed(42) # For reproducible results
>>>
>>> # Initialize environment
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = tiger.initial_state_dist().sample()[0]
>>> actions = tiger.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = tiger.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> tiger.is_terminal(initial_state)
False
- compute_metrics(histories)[source]
Compute Tiger POMDP specific metrics from simulation histories.
- Parameters:
- Return type:
- Returns:
List of MetricValue objects containing the computed metrics
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (str) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- class POMDPPlanners.environments.tiger_pomdp.TigerPOMDPMetrics(*values)[source]
Bases: Enum
Metric names for Tiger POMDP environment.
- AVERAGE_LISTENS = 'average_listens'
- SUCCESS_RATE = 'success_rate'
- class POMDPPlanners.environments.tiger_pomdp.TigerStateTransition(state, action)[source]
Bases: StateTransitionModel
State transition model for the Tiger POMDP.
The state only changes when a door is opened, after which the tiger is randomly placed behind one of the two doors for the next episode.
- state
Current state (tiger_left or tiger_right)
- action
Action to be taken (listen, open_left, or open_right)
Example
>>> import numpy as np
>>> np.random.seed(42) # For reproducible results
>>> # Create transition model for listening action
>>> transition_listen = TigerStateTransition(state="tiger_left", action="listen")
>>> next_state_listen = transition_listen.sample()[0]
>>> next_state_listen == "tiger_left" # No state change when listening
True

>>> # Create transition model for opening door
>>> transition_open = TigerStateTransition(state="tiger_left", action="open_left")
>>> next_state_open = transition_open.sample()[0]
>>> next_state_open in ["tiger_left", "tiger_right"] # Random outcome
True

>>> # Check probabilities for different outcomes
>>> prob_same = transition_listen.probability(["tiger_left"])
>>> bool(prob_same[0] == 1.0) # Probability remains same when listening
True
>>> prob_random = transition_open.probability(["tiger_left"])
>>> bool(prob_random[0] == 0.5) # Equal probability when opening
True
- probability(values)[source]
Calculate transition probabilities for given next states.
- Parameters:
values (List[Any]) – List of next state values to calculate probabilities for
- Return type:
- Returns:
Array of transition probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.