POMDPPlanners.environments package

POMDP Environment Implementations.

This package contains concrete implementations of various POMDP environments used for testing and benchmarking planning algorithms. Each environment implements the core Environment interface with specific state spaces, action spaces, observation models, and reward functions.

Available Environments:

TigerPOMDP: Classic tiger problem with discrete states and observations CartPolePOMDP: Pole balancing task with continuous states, discrete actions MountainCarPOMDP: Car climbing hill task with continuous state space PushPOMDP: Object manipulation task with spatial reasoning SafeAntVelocityPOMDP: Safety-constrained ant navigation SanityPOMDP: Simple test environment for debugging DiscreteLightDarkPOMDP: Grid-based light-dark navigation ContinuousLightDarkPOMDP: Continuous light-dark navigation problem LaserTagPOMDP: Pursuit-evasion problem with robot tagging opponent RockSamplePOMDP: Rock sampling problem with sensor-based rock quality evaluation

Factory Functions:

get_environment: Create environment instances by name with parameters

class POMDPPlanners.environments.CartPolePOMDP(discount_factor, noise_cov, state_transition_cov=None, name='CartPolePOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

CartPole balancing task formulated as a POMDP.

This environment simulates the classic cart-pole balancing problem where an agent must apply left or right forces to keep a pole balanced on a moving cart. The challenge comes from noisy observations of the cart-pole state.

Problem Structure: - State: [cart_position, cart_velocity, pole_angle, pole_velocity] (continuous) - Actions: [left_force, right_force] (discrete) - Observations: Noisy state measurements (continuous) - Rewards: +1.0 per time step alive, 0.0 when terminated - Termination: Pole falls beyond angle threshold or cart moves too far

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> noise_cov = np.diag([0.1, 0.1, 0.1, 0.1])
>>> env = CartPolePOMDP(discount_factor=0.99, noise_cov=noise_cov)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
DEFAULT_STATE_TRANSITION_COV = array([[1.0e-04, 0.0e+00, 0.0e+00, 0.0e+00],        [0.0e+00, 1.0e-04, 0.0e+00, 0.0e+00],        [0.0e+00, 0.0e+00, 2.5e-05, 0.0e+00],        [0.0e+00, 0.0e+00, 0.0e+00, 1.0e-04]])
compute_metrics(histories)[source]

Compute CartPole POMDP specific metrics from simulation histories.

Parameters:

histories (List[History]) – List of simulation histories

Return type:

List[MetricValue]

Returns:

List of MetricValue objects containing the computed metrics

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[int]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

get_metric_names()[source]

Get names of CartPole POMDP specific metrics.

Returns:

goal_reaching_rate

Return type:

List[str]

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (ndarray) – First observation to compare

  • observation2 (ndarray) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (int) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (int) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.ContinuousLaserTagPOMDP(discount_factor, name='ContinuousLaserTagPOMDP', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]

Bases: Environment

Continuous LaserTag POMDP with continuous [dx, dy, tag_flag] actions.

A pursuit-evasion problem in continuous 2-D space where a robot must navigate to tag an opponent. The robot receives noisy 8-direction laser range observations.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> # Initialize environment
>>> env = ContinuousLaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> # Sample complete step
>>> action = np.array([1.0, 0.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
  • discount_factor (float)

  • name (str)

  • grid_size (Tuple[float, float])

  • walls (Optional[List[Tuple[float, float, float, float]]])

  • robot_radius (float)

  • opponent_radius (float)

  • tag_radius (float)

  • tag_reward (float)

  • tag_penalty (float)

  • step_cost (float)

  • measurement_noise (float)

  • robot_transition_cov_matrix (np.ndarray)

  • opponent_transition_cov_matrix (np.ndarray)

  • pursuit_speed (float)

  • dangerous_areas (Optional[List[Tuple[float, float]]])

  • dangerous_area_radius (float)

  • dangerous_area_penalty (float)

  • output_dir (Optional[Path])

  • debug (bool)

  • use_queue_logger (bool)

  • initial_state (Optional[np.ndarray])

cache_visualization(history, cache_path)[source]

Cache visualization data for an episode history.

This method can be overridden by subclasses to provide environment-specific visualization caching capabilities.

Parameters:
  • history (List[StepData]) – List of step data from an episode

  • cache_path (Path) – Path where visualization data should be cached

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_metric_names()[source]

Get names of environment-specific metrics.

This method returns the names of custom metrics that this environment computes in the compute_metrics() method. It enables users to discover what metrics are available for hyperparameter optimization.

Return type:

List[str]

Returns:

List of metric names that this environment produces. Default implementation returns empty list for environments without custom metrics.

Note

Subclasses that override compute_metrics() should also override this method to return the names of metrics they produce. Use an Enum to ensure consistency between the names returned here and the names used in compute_metrics().

property grid_size: ndarray
initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (Any) – First observation to compare

  • observation2 (Any) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (ndarray) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (ndarray) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

property walls: ndarray
class POMDPPlanners.environments.ContinuousLaserTagPOMDPDiscreteActions(discount_factor, name='ContinuousLaserTagPOMDPDiscreteActions', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]

Bases: ContinuousLaserTagPOMDP, DiscreteActionsEnvironment

Continuous LaserTag POMDP with discrete string actions.

Actions: "up", "down", "right", "left", "tag".

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousLaserTagPOMDPDiscreteActions(discount_factor=0.95)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
Parameters:
  • discount_factor (float)

  • name (str)

  • grid_size (Tuple[float, float])

  • walls (Optional[List[Tuple[float, float, float, float]]])

  • robot_radius (float)

  • opponent_radius (float)

  • tag_radius (float)

  • tag_reward (float)

  • tag_penalty (float)

  • step_cost (float)

  • measurement_noise (float)

  • robot_transition_cov_matrix (np.ndarray)

  • opponent_transition_cov_matrix (np.ndarray)

  • pursuit_speed (float)

  • dangerous_areas (Optional[List[Tuple[float, float]]])

  • dangerous_area_radius (float)

  • dangerous_area_penalty (float)

  • output_dir (Optional[Path])

  • debug (bool)

  • use_queue_logger (bool)

  • initial_state (Optional[np.ndarray])

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[str]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (Any) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (Any) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.ContinuousLightDarkPOMDP(discount_factor, name='ContinuousLightDarkPOMDP', state_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), observation_cov_matrix=array([[0.05, 0.], [0., 0.05]]), beacons=[(0, 0), (0, 5), (0, 10), (5, 0), (5, 5), (5, 10), (10, 0), (10, 5), (10, 10)], goal_state=array([10, 5]), start_state=array([0, 5]), obstacles=[(3, 7), (5, 5)], obstacle_hit_probability=0.2, obstacle_reward=-10.0, goal_reward=10.0, fuel_cost=2.0, grid_size=11, goal_state_radius=1.5, beacon_radius=1.0, obstacle_radius=1.5, reward_model_type=RewardModelType.STANDARD, observation_model_type=ObservationModelType.NORMAL_NOISE, penalty_decay=1.0, is_obstacle_hit_terminal=True)[source]

Bases: BaseLightDarkPOMDP

Continuous Light-Dark POMDP environment with continuous actions.

This environment extends the base Light-Dark problem to continuous 2D space with continuous action vectors. The agent navigates toward a goal while dealing with position-dependent observation noise and optional obstacles.

Key features: - Continuous 2D state and action spaces - Light beacons reduce observation noise when nearby - Multiple observation models available (normal noise, normal noise with no observation in dark) - Multiple reward models available (standard, decaying hit probability, dangerous states) - Optional obstacles with configurable hit penalties - Terminal conditions for goal reaching, obstacle hits, and boundary violations

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = ContinuousLightDarkPOMDP(
...     discount_factor=0.95,
...     goal_state=np.array([10, 5]),
...     start_state=np.array([0, 5])
... )
>>>
>>> # Get initial state
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> # Sample complete step (action must be provided based on environment type)
>>> action = np.array([1.0, 0.0])  # Move right
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_metric_names()[source]

Get names of Continuous Light-Dark POMDP specific metrics.

Returns:

goal_reaching_rate, obstacle_hit_rate, avg_obstacle_hit_counter, out_of_grid_rate, and avg_dangerous_states_counter

Return type:

List[str]

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (ndarray) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (ndarray) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.ContinuousLightDarkPOMDPDiscreteActions(discount_factor, state_transition_cov_matrix=array([[1., 0.], [0., 1.]]), observation_cov_matrix=array([[1., 0.], [0., 1.]]), obstacle_hit_probability=0.2, obstacle_reward=-10.0, goal_reward=10.0, fuel_cost=2.0, grid_size=11, goal_state_radius=1.5, beacon_radius=1.0, obstacle_radius=1.5, name='ContinuousLightDarkPOMDPDiscreteActions', beacons=[(0, 0), (0, 5), (0, 10), (5, 0), (5, 5), (5, 10), (10, 0), (10, 5), (10, 10)], goal_state=array([10, 5]), start_state=array([0, 5]), obstacles=[(3, 7), (5, 5)], reward_model_type=RewardModelType.STANDARD, observation_model_type=ObservationModelType.NORMAL_NOISE, penalty_decay=1.0, is_obstacle_hit_terminal=True)[source]

Bases: ContinuousLightDarkPOMDP, DiscreteActionsEnvironment

Continuous Light-Dark POMDP environment with discrete actions.

This variant of the Continuous Light-Dark POMDP uses discrete directional actions (up, down, left, right) instead of continuous action vectors. The continuous state space and observation model are preserved.

Actions are mapped to unit vectors: - “up”: [0, 1] - “down”: [0, -1] - “right”: [1, 0] - “left”: [-1, 0]

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = ContinuousLightDarkPOMDPDiscreteActions(
...     discount_factor=0.95,
...     goal_state=np.array([10, 5]),
...     start_state=np.array([0, 5])
... )
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[Any]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (Any) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (Any) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.DiscreteLightDarkPOMDP(discount_factor, name='DiscreteLightDarkPOMDP', transition_error_prob=0.05, observation_error_prob=0.05, beacons=[(0, 0), (0, 5), (0, 10), (5, 0), (5, 5), (5, 10), (10, 0), (10, 5), (10, 10)], goal_state=array([10, 5]), start_state=array([0, 5]), obstacles=[(3, 7), (5, 5)], obstacle_hit_probability=0.2, obstacle_reward=-10.0, goal_reward=10.0, beacon_radius=1.0, fuel_cost=2.0, grid_size=11, is_stochastic_reward=True, observation_model_type=ObservationModelType.NORMAL)[source]

Bases: BaseLightDarkPOMDPDiscreteActions, DiscreteActionsEnvironment

Discrete Light-Dark POMDP Environment for Robot Navigation with Observation Uncertainty.

This environment implements a discretized version of the classic Light-Dark POMDP problem, where a robot must navigate from a start position to a goal position in a grid world with beacons and obstacles. The key challenge is that the robot’s observation quality depends on its distance from beacons - closer to beacons means more accurate observations.

Problem Description: The robot operates in a discrete grid world where it can move in four cardinal directions. The environment includes: - Beacons: Fixed positions that provide location reference with varying accuracy - Obstacles: Grid cells that incur penalties when hit - Goal: Target position that provides high reward when reached - Observation uncertainty: Decreases with proximity to beacons (light areas)

Key Features: - Discrete state space: Robot positions are restricted to grid cells - Discrete action space: North, South, East, West movements - Multiple observation models available (normal, no observation in dark) - Distance-dependent observation accuracy: Closer to beacons = better observations - Stochastic transitions: Actions may fail with configurable probability - Obstacle avoidance: Penalties for hitting obstacles during navigation - Configurable environment parameters: Grid size, beacon positions, obstacles

State Space: - 2D grid coordinates (x, y) representing robot position - Bounded by grid_size parameter (default: 11x11 grid)

Action Space: - Discrete actions: [‘North’, ‘South’, ‘East’, ‘West’] - Each action moves robot one grid cell in the corresponding direction - Boundary conditions: Actions that would move outside grid are blocked

Observation Space: - Discrete observations based on beacon proximity and noise - Observation accuracy improves with proximity to beacons - Stochastic observation errors controlled by observation_error_prob

Reward Structure: - Goal reward: Large positive reward for reaching the goal state - Obstacle penalty: Negative reward for hitting obstacles - Fuel cost: Small negative reward for each movement action - Distance-based penalties: Encourage efficient navigation

Parameters:
transition_error_prob

Probability that an action fails (results in different movement)

observation_error_prob

Probability of observation noise/error

is_stochastic_reward

Whether rewards include stochastic components

beacons

List of (x, y) beacon positions that provide navigation references

goal_state

Target position (x, y) that robot should reach

start_state

Initial robot position (x, y)

obstacles

List of (x, y) obstacle positions to avoid

grid_size

Dimension of the square grid world

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = DiscreteLightDarkPOMDP(
...     discount_factor=0.95,
...     transition_error_prob=0.1,
...     observation_error_prob=0.15,
...     beacons=[(1, 1), (2, 2)],
...     grid_size=11
... )
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False

References: - Platt, R., et al. “Belief space planning assuming maximum likelihood observations.” (2010) - Kurniawati, H., et al. “SARSOP: Efficient point-based POMDP planning by approximating optimally reachable belief spaces.” (2008) - Light-Dark domain: Classic POMDP benchmark for testing observation uncertainty

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_metric_names()[source]

Get names of Discrete Light-Dark POMDP specific metrics.

Returns:

goal_reaching_rate, obstacle_hit_rate, avg_obstacle_hit_counter, out_of_grid_rate, and avg_dangerous_states_counter

Return type:

List[str]

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (Any) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (str) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

sample_next_step(state, action)[source]

Sample a complete state transition step.

This convenience method combines state transition, observation generation, and reward calculation in a single operation.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action to execute

Returns:

  • next_state: Sampled next state

  • next_observation: Sampled observation

  • reward: Immediate reward

Return type:

Tuple[Any, Any, float]

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.LaserTagPOMDP(discount_factor, name='LaserTagPOMDP', floor_shape=(11, 7), walls={(1, 2), (3, 0), (3, 4), (5, 0), (6, 4), (9, 1), (9, 4), (10, 6)}, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, dangerous_areas={(2, 5), (5, 3), (7, 1)}, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None, transition_error_prob=0.0)[source]

Bases: DiscreteActionsEnvironment

LaserTag POMDP environment implementation.

This is a pursuit-evasion problem where a robot must navigate a grid to tag an opponent. The robot receives noisy observations of the opponent’s position and must decide when and where to attempt tagging.

Problem Structure: - States: numpy array [robot_row, robot_col, opp_row, opp_col, terminal] - Actions: North(0), South(1), East(2), West(3), Tag(4) - Observations: 8-directional laser measurements (N,NE,E,SE,S,SW,W,NW) - Rewards: Tag success(+10), Tag failure(-10), Movement(-1)

Parameters:
floor_shape

Grid dimensions as (rows, cols)

walls

Set of wall positions as (row, col) tuples

tag_reward

Reward for successful tagging

tag_penalty

Penalty for unsuccessful tagging

step_cost

Cost per movement action

measurement_noise

Standard deviation of observation noise

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = LaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
cache_visualization(history, cache_path)[source]

Cache visualization of the LaserTag episode as an animated GIF.

Creates an animated visualization showing: - Robot movement (red circle) - Opponent movement (blue circle) - Walls (black squares) - Dangerous areas (red circles) - Action arrows showing robot’s intended movement - Laser measurements (green rays from robot position) - Belief particles (if available) showing robot’s belief about opponent location - Grid boundaries and coordinate system

Parameters:
  • history (List[StepData]) – The history of states, actions, and observations from an episode

  • cache_path (Path) – Path where to save the visualization GIF

Raises:
  • ValueError – If history is empty or contains invalid data

  • TypeError – If cache_path is not a Path object or doesn’t end with .gif

Return type:

None

compute_metrics(histories)[source]

Compute LaserTag POMDP specific metrics from simulation histories.

Return type:

List[MetricValue]

Parameters:

histories (List[History])

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[int]

get_metric_names()[source]

Get names of LaserTag POMDP specific metrics.

Returns:

tag_success_rate, average_episode_length, average_failed_tag_attempts, average_obstacle_collisions, average_dangerous_area_steps, and average_all_dangerous_encounters

Return type:

List[str]

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Observations are 8-dimensional laser measurements or terminal observations.

Return type:

bool

Parameters:
  • observation1 (Any)

  • observation2 (Any)

is_terminal(state)[source]

Check if a state is terminal.

Return type:

bool

Parameters:

state (ndarray)

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Return type:

ObservationModel

Parameters:
reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Return type:

float

Parameters:
state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Return type:

StateTransitionModel

Parameters:
class POMDPPlanners.environments.MountainCarPOMDP(discount_factor, state_transition_cov=None, name='MountainCarPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Mountain Car problem formulated as a POMDP.

This environment simulates an underpowered car trying to reach the top of a steep mountain. The car must build momentum by oscillating back and forth to gain enough energy to reach the goal, with noisy observations of its state.

Problem Structure: - State: [position, velocity] (continuous, position ∈ [-1.2, 0.6], velocity ∈ [-0.07, 0.07]) - Actions: [-1 (reverse), 0 (neutral), 1 (forward)] (discrete) - Observations: Noisy state measurements (continuous) - Rewards: 0 for reaching goal (position ≥ 0.5), -1 per time step otherwise - Goal: Drive car to position ≥ 0.5 (top of mountain)

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = MountainCarPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
DEFAULT_STATE_TRANSITION_COV = array([[2.5e-05, 0.0e+00],        [0.0e+00, 1.0e-06]])
cache_visualization(history, cache_path)[source]

Cache visualization data for an episode history.

This method can be overridden by subclasses to provide environment-specific visualization caching capabilities.

Parameters:
  • history (List[StepData]) – List of step data from an episode

  • cache_path (Path) – Path where visualization data should be cached

Return type:

None

compute_metrics(histories)[source]

Compute Mountain Car POMDP specific metrics from simulation histories.

Parameters:

histories (List[History]) – List of simulation histories

Return type:

List[MetricValue]

Returns:

List of MetricValue objects containing the computed metrics

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[Any]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

get_metric_names()[source]

Get names of Mountain Car POMDP specific metrics.

Returns:

goal_reaching_rate

Return type:

List[str]

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (Tuple[float, float]) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (Tuple[float, float]) – The resulting state after taking an action

  • action (int) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (Tuple[float, float]) – Current state

  • action (int) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (int) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.PacManPOMDP(maze_size=(7, 7), walls=None, initial_pellets=None, initial_pacman_pos=(0, 0), num_ghosts=1, initial_ghost_positions=None, initial_ghost_pos=None, pellet_reward=10.0, ghost_collision_penalty=-100.0, step_penalty=-1.0, win_reward=100.0, ghost_aggressiveness=2.0, ghost_coordination='independent', ghost_strategies=None, observation_noise_factor=0.3, max_observation_noise=1.5, discount_factor=0.95, name='PacManPOMDP', output_dir=None, debug=False)[source]

Bases: DiscreteActionsEnvironment

PacMan POMDP environment inspired by the classic arcade game.

This environment implements a simplified PacMan game where PacMan must collect pellets while avoiding a single ghost. The ghost position is only partially observable through noisy sensor readings.

Parameters:
maze_size

Grid dimensions as (rows, cols)

walls

Set of wall positions as (row, col) tuples

initial_pellets

List of initial pellet positions

pellet_reward

Reward for collecting a pellet

ghost_collision_penalty

Penalty for collision with ghost

step_penalty

Cost per action

win_reward

Reward for collecting all pellets

ghost_aggressiveness

Temperature parameter for ghost movement policy

observation_noise_factor

Multiplier for observation noise based on distance

max_observation_noise

Maximum noise standard deviation

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = PacManPOMDP(maze_size=(7, 7))
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
array_to_observation(arr)[source]

Convert a flat numpy array back to a PacMan observation tuple.

Parameters:

arr (ndarray) – 1-D array of shape (2 * num_ghosts,).

Return type:

Tuple[Tuple[int, int], ...]

Returns:

Observation as tuple of (row, col) tuples.

array_to_state(arr)[source]

Convert a numpy array back to a PacManState.

Parameters:

arr (ndarray) – 1-D array of shape (self._state_dim,) produced by state_to_array().

Return type:

PacManState

Returns:

Reconstructed PacManState.

cache_visualization(history, cache_path)[source]

Cache visualization of episode history.

Parameters:
  • history (List[StepData]) – List of StepData objects representing the episode

  • cache_path (Path) – Path where the GIF should be saved

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics.

Return type:

List[MetricValue]

Parameters:

histories (List[History])

get_actions()[source]

Get all available actions.

Return type:

List[int]

get_metric_names()[source]

Get names of PacMan POMDP specific metrics.

Return type:

List[str]

Returns:

List containing metric names including standard metrics (win_rate, avg_pellets_collected, avg_episode_length, avg_pacman_closest_ghost_distance, avg_collision_encounters) and dynamically generated per-ghost distance metrics for multi-ghost scenarios (avg_pacman_ghost_0_distance, avg_pacman_ghost_1_distance, etc.)

property initial_ghost_pos: Tuple[int, int]

returns first ghost position.

Type:

Backward compatibility

initial_observation_dist()[source]

Get initial observation distribution.

Return type:

DiscreteDistribution

initial_state_dist()[source]

Get initial state distribution.

Return type:

DiscreteDistribution

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Return type:

bool

Parameters:
  • observation1 (Any)

  • observation2 (Any)

is_terminal(state)[source]

Check if state is terminal.

Return type:

bool

Parameters:

state (Any)

observation_model(next_state, action)[source]

Get observation model.

Return type:

PacManObservationModel

Parameters:
  • next_state (Any)

  • action (int)

observation_to_array(obs)[source]

Convert a PacMan observation tuple to a flat numpy array.

Parameters:

obs (Tuple[Tuple[int, int], ...]) – Observation as tuple of ghost (row, col) positions.

Return type:

ndarray

Returns:

1-D array of shape (2 * num_ghosts,).

reward(state, action)[source]

Calculate immediate reward.

Return type:

float

Parameters:
reward_batch(states, action)[source]

Calculate rewards for a batch of states.

Accepts either a 2-D numpy array of shape (N, state_dim) (vectorized path) or a sequence of PacManState objects (falls back to the loop-based default).

Computes deterministic reward components only: step penalty, pellet collection, and win bonus. Ghost collision penalty is excluded because it depends on stochastic ghost movement.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Array of shape (N, state_dim) or sequence of states.

  • action (int) – Discrete action index (0-3).

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_to_array(state)[source]

Convert a PacManState to a fixed-size numpy array.

The array layout is: [pac_row, pac_col, g0_row, g0_col, ..., pellet_mask[0..P-1], score, terminal]

Parameters:

state (PacManState) – A PacManState instance.

Return type:

ndarray

Returns:

1-D float array of shape (self._state_dim,).

state_transition_model(state, action)[source]

Get state transition model.

Return type:

PacManStateTransitionModel

Parameters:
states_to_array(states)[source]

Batch-convert a list of PacManState to a 2-D numpy array.

Parameters:

states (List[PacManState]) – List of PacManState instances.

Return type:

ndarray

Returns:

Array of shape (len(states), self._state_dim).

visualize_path(path, actions, cache_path)[source]

Visualize PacMan path through the maze using sprite-based rendering.

Parameters:
  • path (List[PacManState]) – List of states representing the path through the maze

  • actions (List[int]) – List of actions taken at each step

  • cache_path (Path) – Path where the GIF should be saved

class POMDPPlanners.environments.PushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, observation_noise=0.1, obstacles=None, obstacle_radius=0.5, obstacle_penalty=-10.0, initial_state=None, transition_error_prob=0.0, name='PushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Robotic push task formulated as a POMDP.

This environment simulates a robot that must push an object to a target location on a 2D grid. The robot can move in four directions and pushes objects when close enough, with partial observability through noisy object position measurements.

Problem Structure: - State: [robot_x, robot_y, object_x, object_y, target_x, target_y] (continuous) - Actions: [“up”, “down”, “left”, “right”] (discrete) - Observations: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y] - Rewards: -distance_to_target + 100 (when object reaches target) - Termination: Object within 0.5 units of target position

Key Features: - Physics-based pushing with configurable friction - Distance-based pushing threshold - Noisy observations of object position only - Dense reward signal based on object-target distance - Obstacle collision detection with configurable penalties - Obstacles prevent robot and object movement through them

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = PushPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
cache_visualization(history, cache_path)[source]

Cache animated visualization of the push episode.

Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[str]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

get_metric_names()[source]

Get names of Push POMDP specific metrics.

Return type:

List[str]

Returns:

List containing collision-related metric names

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (ndarray) – First observation to compare

  • observation2 (ndarray) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (str) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (str) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

sample_next_step(state, action)[source]

Sample a complete state transition step.

This convenience method combines state transition, observation generation, and reward calculation in a single operation.

Parameters:
  • state (Any) – Current state

  • action (Any) – Action to execute

Returns:

  • next_state: Sampled next state

  • next_observation: Sampled observation

  • reward: Immediate reward

Return type:

Tuple[Any, Any, float]

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (str) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.RockSamplePOMDP(map_size=(5, 5), rock_positions=None, init_pos=(0, 0), sensor_efficiency=10.0, bad_rock_penalty=-10.0, good_rock_reward=10.0, step_penalty=0.0, sensor_use_penalty=0.0, exit_reward=10.0, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, discount_factor=0.95, name='RockSample', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

RockSample POMDP environment

This environment implements the classic rock sampling problem where a robot must navigate a grid, use sensors to evaluate rocks, and decide which ones to sample while balancing exploration costs and sampling rewards.

Parameters:
map_size

Grid dimensions as (rows, cols)

rock_positions

List of rock positions as (row, col) tuples

init_pos

Initial robot position

sensor_efficiency

Sensor noise parameter (higher = less noise)

bad_rock_penalty

Penalty for sampling a bad rock

good_rock_reward

Reward for sampling a good rock

step_penalty

Cost for each action

sensor_use_penalty

Additional cost for using sensor

exit_reward

Reward for reaching the exit

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = RockSamplePOMDP(map_size=(5, 5), rock_positions=[(0, 0), (2, 2), (3, 3)])
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
cache_visualization(history, cache_path)[source]

Cache visualization of episode history.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics.

Return type:

List[MetricValue]

Parameters:

histories (List[History])

get_actions()[source]

Get all available actions.

Return type:

List[int]

get_metric_names()[source]

Get names of RockSample POMDP specific metrics.

Returns:

avg_rocks_sampled, exit_success_rate, and average_dangerous_area_steps

Return type:

List[str]

initial_observation_dist()[source]

Get initial observation distribution.

Return type:

DiscreteDistribution

initial_state_dist()[source]

Get initial state distribution.

Return type:

DiscreteDistribution

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Return type:

bool

Parameters:
  • observation1 (Any)

  • observation2 (Any)

is_terminal(state)[source]

Check if state is terminal.

Return type:

bool

Parameters:

state (ndarray)

observation_model(next_state, action)[source]

Get observation model.

Return type:

RockSampleObservationModel

Parameters:
reward(state, action)[source]

Calculate immediate reward.

Return type:

float

Parameters:
sample_next_step(state, action)[source]

Override to avoid reward() recomputing next state.

Return type:

Tuple[ndarray, str, float]

Parameters:
state_transition_model(state, action)[source]

Get state transition model.

Return type:

RockSampleStateTransitionModel

Parameters:
visualize_path(path, actions, cache_path)[source]

Visualize robot path through the environment.

Parameters:
  • path (List[ndarray]) – List of states representing the path

  • actions (List[int]) – List of actions taken at each state

  • cache_path (Path) – Path where to save the animation (must end with .gif)

Return type:

None

class POMDPPlanners.environments.SafeAntVelocityPOMDP(discount_factor, safe_velocity_threshold=2.0, max_force=1.0, dt=0.1, mass=1.0, damping=0.1, position_noise=0.1, velocity_noise=0.2, safety_violation_penalty=-100.0, movement_reward_scale=1.0, name='SafeVelocityPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Safety-critical velocity control task formulated as a POMDP.

This environment presents a safety-critical control problem where an agent must navigate while keeping velocity below a safety threshold. The challenge comes from balancing exploration rewards with safety constraints under noisy velocity observations.

Problem Structure: - State: [position_x, position_y, velocity_x, velocity_y] (continuous) - Actions: [0=no force, 1=small, 2=medium, 3=large force] (discrete) - Observations: Noisy position and velocity measurements (continuous) - Rewards: Movement reward - safety violation penalty (if unsafe) - Safety constraint: velocity magnitude ≤ safe_velocity_threshold - Termination: Velocity exceeds 1.5x safety threshold

Safety Features: - Tracks safety and critical violation rates - Heavy penalties for constraint violations - Configurable safety thresholds and penalties - Physics simulation with uncertainty in force direction

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = SafeAntVelocityPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
cache_visualization(history, cache_path)[source]

Cache animated visualization of the safety ant velocity episode.

Creates an animated GIF showing the ant’s movement trajectory with velocity vectors, safety zones, force applications, and safety constraint violations.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[int]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

get_metric_names()[source]

Get names of Safety Ant Velocity POMDP specific metrics.

Returns:

safety_violation_rate, critical_violation_rate, total_safety_violations, and total_critical_violations

Return type:

List[str]

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (ndarray) – First observation to compare

  • observation2 (ndarray) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (int) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (int) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

sample_next_step(state, action)[source]

Sample a complete state transition step.

This convenience method combines state transition, observation generation, and reward calculation in a single operation.

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action to execute

Returns:

  • next_state: Sampled next state

  • next_observation: Sampled observation

  • reward: Immediate reward

Return type:

Tuple[ndarray, ndarray, float]

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.SanityPOMDP(discount_factor=0.95, output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Simple sanity check POMDP environment for testing and debugging.

This environment provides the simplest possible POMDP formulation with deterministic dynamics and perfect observability. It serves as a baseline for testing POMDP algorithms and ensuring correctness.

Problem Structure: - States: 0 (good), 1 (bad) - Actions: 0 (choose good), 1 (choose bad) - Observations: Same as states (perfect observability) - Rewards: 1.0 for good state, 0.0 for bad state - Dynamics: Deterministic state transitions based on action

The optimal policy is trivial: always choose action 0 to stay in the good state.

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = SanityPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
  • discount_factor (float)

  • output_dir (Path | None)

  • debug (bool)

  • use_queue_logger (bool)

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[int]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

SanityInitialObservationDist

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

SanityInitialStateDist

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (int) – First observation to compare

  • observation2 (int) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (int) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (int) – The resulting state after taking an action

  • action (int) – The action that was executed

Return type:

SanityObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (int) – Current state

  • action (int) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (int) – Current state

  • action (int) – Action to be executed

Return type:

SanityStateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.TigerPOMDP(discount_factor, name='TigerPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Tiger POMDP environment implementation.

This is the classic Tiger problem where an agent must decide which door to open to find treasure while avoiding the tiger. The agent can listen for acoustic cues but receives noisy observations.

Problem Structure: - States: tiger_left, tiger_right (tiger’s location) - Actions: listen, open_left, open_right - Observations: hear_left, hear_right, hear_nothing - Rewards: listen(-1), correct_door(+10), wrong_door(-100)

Parameters:
states

List of possible states

actions

List of possible actions

observations

List of possible observations

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = tiger.initial_state_dist().sample()[0]
>>> actions = tiger.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = tiger.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> tiger.is_terminal(initial_state)
False
cache_history_artifacts(history, cache_path)[source]
Return type:

None

Parameters:
compute_metrics(histories)[source]

Compute Tiger POMDP specific metrics from simulation histories.

Parameters:

histories (List[History]) – List of simulation histories

Return type:

List[MetricValue]

Returns:

List of MetricValue objects containing the computed metrics

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[Any]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

get_metric_names()[source]

Get names of Tiger POMDP specific metrics.

Returns:

success_rate and average_listens

Return type:

List[str]

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (Any) – First observation to compare

  • observation2 (Any) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (str) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (str) – The resulting state after taking an action

  • action (str) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (str) – Current state

  • action (str) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (str) – Current state

  • action (str) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

Subpackages

Submodules

POMDPPlanners.environments.sanity_pomdp module

Sanity Check POMDP Environment Implementation.

This module implements a simple test environment used for debugging and sanity checking POMDP algorithms. The environment has deterministic dynamics and perfect observability, making it ideal for verifying algorithm correctness.

The Sanity POMDP features: - Two discrete states: 0 (good) and 1 (bad) - Two discrete actions: 0 (go to good state) and 1 (go to bad state) - Perfect observations: observation always equals the state - Simple reward structure: 1.0 for good state, 0.0 for bad state - No terminal states (infinite horizon)

This environment is primarily used for: - Testing POMDP algorithm implementations - Debugging belief updates and planning algorithms - Verifying that algorithms can solve trivial cases - Performance benchmarking baseline

Classes:

SanityStateTransitionModel: Deterministic state transitions SanityObservationModel: Perfect state observation SanityInitialStateDist: Always starts in good state SanityInitialObservationDist: Initial observation distribution SanityPOMDP: Main environment class for sanity testing

class POMDPPlanners.environments.sanity_pomdp.SanityInitialObservationDist[source]

Bases: Distribution

Initial observation distribution for Sanity POMDP.

This distribution always returns observation 0 (corresponding to the good state) as the initial observation, maintaining consistency with the initial state distribution and perfect observability property.

Example

Using the initial observation distribution:

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Create initial observation distribution
>>> initial_obs_dist = SanityInitialObservationDist()
>>>
>>> # Sample initial observation
>>> initial_obs = initial_obs_dist.sample()[0]  # Returns 0
>>> initial_obs == 0
True
>>>
>>> # Sample multiple observations
>>> observations = initial_obs_dist.sample(n_samples=3)  # Returns [0, 0, 0]
>>> len(observations) == 3
True
>>> all(obs == 0 for obs in observations)
True
>>>
>>> # Check observation probabilities
>>> prob = initial_obs_dist.probability([0])  # Returns [1.0]
>>> bool(prob[0] == 1.0)
True
probability(values)[source]

Calculate probabilities for given values.

Parameters:

values (List[int]) – List of values to calculate probabilities for

Return type:

ndarray

Returns:

Numpy array of probabilities corresponding to input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample initial observations.

Parameters:

n_samples (int) – Number of samples to return

Return type:

List[int]

Returns:

List of initial observations (always [0, 0, …])

class POMDPPlanners.environments.sanity_pomdp.SanityInitialStateDist[source]

Bases: Distribution

Initial state distribution for Sanity POMDP.

This distribution always returns state 0 (good state) as the initial state, providing a deterministic and predictable starting condition for testing.

Example

Using the initial state distribution:

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Create initial state distribution
>>> initial_dist = SanityInitialStateDist()
>>>
>>> # Sample initial state (always returns good state)
>>> initial_state = initial_dist.sample()[0]  # Returns 0
>>> initial_state == 0
True
>>>
>>> # Sample multiple initial states
>>> states = initial_dist.sample(n_samples=5)  # Returns [0, 0, 0, 0, 0]
>>> len(states) == 5
True
>>> all(state == 0 for state in states)
True
>>>
>>> # Check probability of initial states
>>> prob_good = initial_dist.probability([0])  # Returns [1.0]
>>> bool(prob_good[0] == 1.0)
True
>>> prob_bad = initial_dist.probability([1])   # Returns [0.0]
>>> bool(prob_bad[0] == 0.0)
True
probability(values)[source]

Calculate probabilities for given values.

Parameters:

values (List[int]) – List of values to calculate probabilities for

Return type:

ndarray

Returns:

Numpy array of probabilities corresponding to input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample initial states.

Parameters:

n_samples (int) – Number of samples to return

Return type:

List[int]

Returns:

List of initial states (always [0, 0, …])

class POMDPPlanners.environments.sanity_pomdp.SanityObservationModel(next_state, action)[source]

Bases: ObservationModel

Perfect observation model for Sanity POMDP.

This model provides perfect observability where the observation always exactly matches the state. This eliminates partial observability and makes the problem fully observable, which is ideal for testing algorithms in the simplest possible setting.

Parameters:
  • next_state (int)

  • action (int)

next_state

The state after action execution

action

The action that was taken (not used in observation generation)

Example

Using the observation model:

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Create observation model for good state
>>> obs_model = SanityObservationModel(next_state=0, action=0)
>>>
>>> # Sample observation (always matches state)
>>> observation = obs_model.sample()[0]  # Returns 0
>>> observation == 0
True
>>>
>>> # Check observation probabilities
>>> prob_correct = obs_model.probability([0])  # Returns [1.0]
>>> bool(prob_correct[0] == 1.0)
True
>>> prob_wrong = obs_model.probability([1])  # Returns [0.0]
>>> bool(prob_wrong[0] == 0.0)
True
probability(values)[source]

Calculate observation probabilities for given values.

Parameters:

values (List[int]) – List of observation values to calculate probabilities for

Return type:

ndarray

Returns:

Array of probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample observations from the observation model.

Parameters:

n_samples (int) – Number of observation samples to generate. Defaults to 1.

Return type:

List[int]

Returns:

List of sampled observations of length n_samples.

Note

Subclasses must implement this method according to their specific observation generation logic.

class POMDPPlanners.environments.sanity_pomdp.SanityPOMDP(discount_factor=0.95, output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Simple sanity check POMDP environment for testing and debugging.

This environment provides the simplest possible POMDP formulation with deterministic dynamics and perfect observability. It serves as a baseline for testing POMDP algorithms and ensuring correctness.

Problem Structure: - States: 0 (good), 1 (bad) - Actions: 0 (choose good), 1 (choose bad) - Observations: Same as states (perfect observability) - Rewards: 1.0 for good state, 0.0 for bad state - Dynamics: Deterministic state transitions based on action

The optimal policy is trivial: always choose action 0 to stay in the good state.

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = SanityPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
  • discount_factor (float)

  • output_dir (Path | None)

  • debug (bool)

  • use_queue_logger (bool)

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[int]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

SanityInitialObservationDist

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

SanityInitialStateDist

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (int) – First observation to compare

  • observation2 (int) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (int) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (int) – The resulting state after taking an action

  • action (int) – The action that was executed

Return type:

SanityObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (int) – Current state

  • action (int) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (int) – Current state

  • action (int) – Action to be executed

Return type:

SanityStateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.sanity_pomdp.SanityStateTransitionModel(state, action)[source]

Bases: StateTransitionModel

Deterministic state transition model for Sanity POMDP.

This model implements completely deterministic state transitions where: - Action 0 always leads to state 0 (good state) - Action 1 always leads to state 1 (bad state)

The deterministic nature makes this ideal for testing and debugging POMDP algorithms since the outcomes are predictable.

Parameters:
state

Current state (0 or 1)

action

Action to be executed (0 or 1)

Example

Using the state transition model:

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Create transition model from bad state with good action
>>> transition_model = SanityStateTransitionModel(state=1, action=0)
>>>
>>> # Sample next state (always deterministic)
>>> next_state = transition_model.sample()[0]  # Returns 0 (good state)
>>> next_state == 0
True
>>>
>>> # Check probability of specific outcomes
>>> prob = transition_model.probability([0])  # Returns [1.0]
>>> bool(prob[0] == 1.0)
True
>>> prob_wrong = transition_model.probability([1])  # Returns [0.0]
>>> bool(prob_wrong[0] == 0.0)
True
probability(values)[source]

Calculate transition probabilities for given next states.

Parameters:

values (List[int]) – List of next state values to calculate probabilities for

Return type:

ndarray

Returns:

Array of transition probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample next states from the transition model.

Parameters:

n_samples (int) – Number of next state samples to generate. Defaults to 1.

Return type:

List[int]

Returns:

List of sampled next states of length n_samples.

Note

Subclasses must implement this method according to their specific state transition dynamics.

POMDPPlanners.environments.tiger_pomdp module

Tiger POMDP Environment Implementation.

This module implements the classic Tiger problem, a benchmark POMDP environment where an agent must determine which of two doors conceals a treasure and which conceals a tiger, using only noisy acoustic observations.

The Tiger problem features: - Two doors (left and right) with a tiger behind one and treasure behind the other - Three actions: listen (to get information), open_left, open_right - Three observations: hear_left, hear_right, hear_nothing - Listening provides 85% accurate information about the tiger’s location - Opening the correct door yields +10 reward, opening wrong door yields -100 - Listening costs -1 per action

Classes:

TigerStateTransition: State transition model for the Tiger problem TigerObservation: Observation model with noisy acoustic feedback TigerPOMDP: Main environment class implementing the Tiger problem

class POMDPPlanners.environments.tiger_pomdp.TigerObservation(next_state, action)[source]

Bases: ObservationModel

Observation model for the Tiger POMDP.

Provides noisy acoustic feedback when listening, with 85% accuracy. When doors are opened, no meaningful observation is provided.

Parameters:
  • next_state (str)

  • action (str)

next_state

The state after action execution

action

The action that was taken

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Create observation model for listening when tiger is left
>>> obs_listen = TigerObservation(next_state="tiger_left", action="listen")
>>> observation = obs_listen.sample()[0]
>>> observation in ["hear_left", "hear_right"]  # Listen gives acoustic feedback
True
>>> # Create observation model for opening door
>>> obs_open = TigerObservation(next_state="tiger_left", action="open_left")
>>> observation_open = obs_open.sample()[0]
>>> observation_open == "hear_nothing"  # Opening always gives no sound
True
>>> # Check observation probabilities
>>> prob_correct = obs_listen.probability(["hear_left"])
>>> bool(prob_correct[0] == 0.85)  # Correct observation probability
True
>>> prob_wrong = obs_listen.probability(["hear_right"])
>>> bool(prob_wrong[0] == 0.15)  # Wrong observation probability
True
>>> prob_nothing = obs_open.probability(["hear_nothing"])
>>> bool(prob_nothing[0] == 1.0)  # Opening door always gives no sound
True
probability(values)[source]

Calculate observation probabilities for given values.

Parameters:

values (List[Any]) – List of observation values to calculate probabilities for

Return type:

ndarray

Returns:

Array of probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample observations from the observation model.

Parameters:

n_samples (int) – Number of observation samples to generate. Defaults to 1.

Return type:

List[str]

Returns:

List of sampled observations of length n_samples.

Note

Subclasses must implement this method according to their specific observation generation logic.

class POMDPPlanners.environments.tiger_pomdp.TigerPOMDP(discount_factor, name='TigerPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Tiger POMDP environment implementation.

This is the classic Tiger problem where an agent must decide which door to open to find treasure while avoiding the tiger. The agent can listen for acoustic cues but receives noisy observations.

Problem Structure: - States: tiger_left, tiger_right (tiger’s location) - Actions: listen, open_left, open_right - Observations: hear_left, hear_right, hear_nothing - Rewards: listen(-1), correct_door(+10), wrong_door(-100)

Parameters:
states

List of possible states

actions

List of possible actions

observations

List of possible observations

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = tiger.initial_state_dist().sample()[0]
>>> actions = tiger.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = tiger.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> tiger.is_terminal(initial_state)
False
cache_history_artifacts(history, cache_path)[source]
Return type:

None

Parameters:
compute_metrics(histories)[source]

Compute Tiger POMDP specific metrics from simulation histories.

Parameters:

histories (List[History]) – List of simulation histories

Return type:

List[MetricValue]

Returns:

List of MetricValue objects containing the computed metrics

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[Any]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

get_metric_names()[source]

Get names of Tiger POMDP specific metrics.

Returns:

success_rate and average_listens

Return type:

List[str]

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (Any) – First observation to compare

  • observation2 (Any) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (str) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (str) – The resulting state after taking an action

  • action (str) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (str) – Current state

  • action (str) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (str) – Current state

  • action (str) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.tiger_pomdp.TigerPOMDPMetrics(*values)[source]

Bases: Enum

Metric names for Tiger POMDP environment.

AVERAGE_LISTENS = 'average_listens'
SUCCESS_RATE = 'success_rate'
class POMDPPlanners.environments.tiger_pomdp.TigerStateTransition(state, action)[source]

Bases: StateTransitionModel

State transition model for the Tiger POMDP.

The state only changes when a door is opened, after which the tiger is randomly placed behind one of the two doors for the next episode.

Parameters:
state

Current state (tiger_left or tiger_right)

action

Action to be taken (listen, open_left, or open_right)

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Create transition model for listening action
>>> transition_listen = TigerStateTransition(state="tiger_left", action="listen")
>>> next_state_listen = transition_listen.sample()[0]
>>> next_state_listen == "tiger_left"  # No state change when listening
True
>>> # Create transition model for opening door
>>> transition_open = TigerStateTransition(state="tiger_left", action="open_left")
>>> next_state_open = transition_open.sample()[0]
>>> next_state_open in ["tiger_left", "tiger_right"]  # Random outcome
True
>>> # Check probabilities for different outcomes
>>> prob_same = transition_listen.probability(["tiger_left"])
>>> bool(prob_same[0] == 1.0)  # Probability remains same when listening
True
>>> prob_random = transition_open.probability(["tiger_left"])
>>> bool(prob_random[0] == 0.5)  # Equal probability when opening
True
probability(values)[source]

Calculate transition probabilities for given next states.

Parameters:

values (List[Any]) – List of next state values to calculate probabilities for

Return type:

ndarray

Returns:

Array of transition probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample next states from the transition model.

Parameters:

n_samples (int) – Number of next state samples to generate. Defaults to 1.

Return type:

List[str]

Returns:

List of sampled next states of length n_samples.

Note

Subclasses must implement this method according to their specific state transition dynamics.