POMDPPlanners.environments.rock_sample_pomdp package

RockSample POMDP Environment Module.

This module provides the RockSample POMDP environment implementation and related components for robot navigation and sampling tasks.

Classes:

RockSamplePOMDP: Main POMDP environment for rock sampling tasks

RockSampleState: State representation with robot position and rock qualities

RockSampleStateTransitionModel: State transition model for deterministic movements

RockSampleObservationModel: Observation model with distance-dependent sensor noise

RockSampleVisualizer: Visualization utilities for RockSample POMDP episodes

class POMDPPlanners.environments.rock_sample_pomdp.RockSampleObservationModel(next_state, action, pomdp)[source]

Bases: ObservationModel

Observation model for RockSample POMDP.

Parameters:
  • next_state (ndarray)

  • action (int)

  • pomdp (RockSamplePOMDP)

probability(values)[source]

Calculate observation probabilities.

Return type:

ndarray

Parameters:

values (List[str])

sample(n_samples=1)[source]

Sample observations.

Return type:

List[str]

Parameters:

n_samples (int)

class POMDPPlanners.environments.rock_sample_pomdp.RockSamplePOMDP(map_size=(5, 5), rock_positions=None, init_pos=(0, 0), sensor_efficiency=10.0, bad_rock_penalty=-10.0, good_rock_reward=10.0, step_penalty=0.0, sensor_use_penalty=0.0, exit_reward=10.0, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, discount_factor=0.95, name='RockSample', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

RockSample POMDP environment

This environment implements the classic rock sampling problem where a robot must navigate a grid, use sensors to evaluate rocks, and decide which ones to sample while balancing exploration costs and sampling rewards.

Parameters:
  • map_size – Grid dimensions as (rows, cols)

  • rock_positions – List of rock positions as (row, col) tuples

  • init_pos – Initial robot position

  • sensor_efficiency – Sensor noise parameter (higher = less noise)

  • bad_rock_penalty – Penalty for sampling a bad rock

  • good_rock_reward – Reward for sampling a good rock

  • step_penalty – Cost for each action

  • sensor_use_penalty – Additional cost for using the sensor

  • exit_reward – Reward for reaching the exit
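For intuition about sensor_efficiency: in the classic RockSample formulation, sensor accuracy decays exponentially with the robot's distance to the checked rock. A minimal sketch of that standard model, assuming this implementation follows it (the function name and exact formula are illustrative):

```python
import numpy as np

def sensor_correct_prob(robot_pos, rock_pos, sensor_efficiency=10.0):
    """Probability that the noisy sensor reports the rock's true quality.

    Classic model: efficiency eta = 2**(-d / d0) decays with Euclidean
    distance d, and the sensor is correct with probability 0.5 + eta / 2,
    i.e. perfect at d = 0 and a coin flip as d grows.
    """
    d = np.linalg.norm(np.asarray(robot_pos, dtype=float) - np.asarray(rock_pos, dtype=float))
    eta = 2.0 ** (-d / sensor_efficiency)
    return 0.5 + 0.5 * eta
```

With the default sensor_efficiency of 10.0, a check from the rock's own cell is always correct, while a check from 10 cells away is correct only 75% of the time.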

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = RockSamplePOMDP(map_size=(5, 5), rock_positions=[(0, 0), (2, 2), (3, 3)])
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
cache_visualization(history, cache_path)[source]

Cache visualization of episode history.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics.

Return type:

List[MetricValue]

Parameters:

histories (List[History])

get_actions()[source]

Get all available actions.

Return type:

List[int]

get_metric_names()[source]

Get names of RockSample POMDP specific metrics.

Returns:

avg_rocks_sampled, exit_success_rate, and average_dangerous_area_steps

Return type:

List[str]

initial_observation_dist()[source]

Get initial observation distribution.

Return type:

DiscreteDistribution

initial_state_dist()[source]

Get initial state distribution.

Return type:

DiscreteDistribution

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Return type:

bool

Parameters:
  • observation1 (Any)

  • observation2 (Any)

is_terminal(state)[source]

Check if state is terminal.

Return type:

bool

Parameters:

state (ndarray)

observation_model(next_state, action)[source]

Get observation model.

Return type:

RockSampleObservationModel

Parameters:
  • next_state (ndarray)

  • action (int)

reward(state, action)[source]

Calculate immediate reward.

Return type:

float

Parameters:
  • state (ndarray)

  • action (int)

sample_next_step(state, action)[source]

Overridden so that reward() does not recompute the next state.

Return type:

Tuple[ndarray, str, float]

Parameters:
  • state (ndarray)

  • action (int)

state_transition_model(state, action)[source]

Get state transition model.

Return type:

RockSampleStateTransitionModel

Parameters:
  • state (ndarray)

  • action (int)

visualize_path(path, actions, cache_path)[source]

Visualize robot path through the environment.

Parameters:
  • path (List[ndarray]) – List of states representing the path

  • actions (List[int]) – List of actions taken at each state

  • cache_path (Path) – Path where to save the animation (must end with .gif)

Return type:

None

POMDPPlanners.environments.rock_sample_pomdp.RockSampleState

alias of ndarray

class POMDPPlanners.environments.rock_sample_pomdp.RockSampleStateTransitionModel(state, action, pomdp)[source]

Bases: StateTransitionModel

State transition model for RockSample POMDP.

Parameters:
  • state (ndarray)

  • action (int)

  • pomdp (RockSamplePOMDP)

probability(values)[source]

Calculate transition probabilities for given next states.

Since RockSample has deterministic transitions, the probability is 1.0 for the correct next state and 0.0 for all others.

Parameters:

values (List[ndarray]) – List of next state values to calculate probabilities for

Return type:

ndarray

Returns:

Array of transition probabilities (1.0 for correct state, 0.0 otherwise)
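Because the transition is deterministic, this check reduces to comparing each candidate next state against the single true successor. A self-contained NumPy sketch of that idea (names are illustrative):

```python
import numpy as np

def transition_probabilities(next_state, values):
    """Return 1.0 for candidates equal to the deterministic successor, else 0.0."""
    next_state = np.asarray(next_state)
    values = np.asarray(values)  # shape (N, state_dim)
    # Row-wise equality against the single successor state.
    return np.all(values == next_state, axis=1).astype(float)
```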

sample(n_samples=1)[source]

Sample next states (deterministic transitions).

Return type:

List[ndarray]

Parameters:

n_samples (int)

class POMDPPlanners.environments.rock_sample_pomdp.RockSampleVectorizedUpdater(map_rows, map_cols, num_rocks, rock_positions, sensor_efficiency)[source]

Bases: VectorizedParticleBeliefUpdater

Vectorized particle belief updater for the RockSample POMDP.

Stores precomputed environment parameters and performs all-particle transitions and observation log-likelihood evaluations using NumPy operations. State layout per particle is [robot_row, robot_col, rock_0_quality, ..., rock_{R-1}_quality].

Parameters:
  • map_rows (int)

  • map_cols (int)

  • num_rocks (int)

  • rock_positions (np.ndarray)

  • sensor_efficiency (float)

map_rows

Number of grid rows.

map_cols

Number of grid columns.

num_rocks

Number of rocks in the environment.

rock_positions

Array of shape (R, 2) with rock (row, col) positions.

sensor_efficiency

Sensor noise parameter (higher = less noise).

batch_observation_log_likelihood(next_particles, action, observation)[source]

Compute observation log-likelihoods for all particles.

Parameters:
  • next_particles (ndarray) – Array of shape (N, 2 + num_rocks).

  • action (ndarray) – Scalar action index.

  • observation (ndarray) – Integer-encoded observation (0=none, 1=good, 2=bad).

Return type:

ndarray

Returns:

Log-likelihoods of shape (N,).
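A hedged sketch of how such a vectorized log-likelihood can be computed under the classic distance-dependent sensor model; the helper name, rock_index argument, and observation encoding are assumptions for illustration, not this library's exact API:

```python
import numpy as np

def batch_sensor_log_likelihood(particles, rock_pos, rock_index, observation,
                                sensor_efficiency=10.0):
    """Log-likelihood of a sensor reading for every particle at once.

    Assumed layout: particles[:, 0:2] is the robot position and
    particles[:, 2 + k] is rock k's quality; observation 1 means 'good'.
    Sensor model: correct with probability 0.5 + 0.5 * 2**(-d / d0).
    """
    d = np.linalg.norm(particles[:, :2] - np.asarray(rock_pos, dtype=float), axis=1)
    p_correct = 0.5 + 0.5 * 2.0 ** (-d / sensor_efficiency)
    says_good = observation == 1
    is_good = particles[:, 2 + rock_index] > 0.5
    # Reading matches the particle's rock quality -> p_correct, else 1 - p_correct.
    p = np.where(is_good == says_good, p_correct, 1.0 - p_correct)
    return np.log(p)
```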

batch_transition(particles, action)[source]

Transition all particles for the given action.

Parameters:
  • particles (ndarray) – Array of shape (N, 2 + num_rocks).

  • action (ndarray) – Scalar action index.

Return type:

ndarray

Returns:

Next-state particles of shape (N, 2 + num_rocks).
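A minimal sketch of one vectorized deterministic move, assuming a hypothetical action encoding (0=up, 1=down, 2=left, 3=right) with moves clipped to the grid; the real implementation's encoding and exit handling may differ:

```python
import numpy as np

# Hypothetical action encoding; this library's actual indices may differ.
MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}

def batch_move(particles, action, map_rows, map_cols):
    """Apply one deterministic move to all particles, clipped to the grid."""
    out = particles.copy()
    dr, dc = MOVES[int(action)]
    out[:, 0] = np.clip(out[:, 0] + dr, 0, map_rows - 1)
    out[:, 1] = np.clip(out[:, 1] + dc, 0, map_cols - 1)
    return out
```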

property config_id: str

Return a deterministic identifier for this updater configuration.

classmethod from_environment(env)[source]

Construct an updater from a RockSamplePOMDP instance.

Return type:

RockSampleVectorizedUpdater

Parameters:

env (RockSamplePOMDP)

class POMDPPlanners.environments.rock_sample_pomdp.RockSampleVisualizer(env)[source]

Bases: object

Handles visualization and animation for RockSample POMDP environments.

This class encapsulates all visualization logic for RockSample POMDP episodes, creating animated GIFs showing robot movement, rock sampling, sensor checks, dangerous areas, and exit behavior.

Parameters:

env (RockSamplePOMDP)

env

Reference to the RockSamplePOMDP environment instance

map_size

Grid dimensions as (rows, cols)

rock_positions

List of rock positions

action_names

Names of available actions

action_to_vector

Mapping from action indices to direction vectors

dangerous_areas

List of dangerous area center positions

dangerous_area_radius

Radius around dangerous area centers

create_visualization(history, cache_path)[source]

Create animated visualization of a RockSample POMDP episode.

Creates an animated GIF showing the robot navigating, sampling rocks, using sensors, and exiting the grid.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object or history is invalid

Return type:

None

visualize_path(path, actions, cache_path)[source]

Visualize robot path through the environment.

Parameters:
  • path (List[ndarray]) – List of states representing the path

  • actions (List[int]) – List of actions taken at each state

  • cache_path (Path) – Path where to save the animation

Return type:

None

POMDPPlanners.environments.rock_sample_pomdp.create_random_rock_sample(map_size=7, num_rocks=8, seed=None)[source]

Create a random RockSample instance.

Parameters:
  • map_size (int) – Size of square grid. Defaults to 7.

  • num_rocks (int) – Number of rocks to place. Defaults to 8.

  • seed (Optional[int]) – Random seed. Defaults to None.

Return type:

RockSamplePOMDP

Returns:

Randomly configured RockSample POMDP
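One way to implement such random placement is to draw distinct grid cells without replacement; a sketch under assumed semantics (distinct rock cells on a square grid, names illustrative):

```python
import numpy as np

def random_rock_positions(map_size=7, num_rocks=8, seed=None):
    """Choose num_rocks distinct (row, col) cells on a map_size x map_size grid."""
    rng = np.random.default_rng(seed)
    # Sample flat cell indices without replacement, then unflatten.
    cells = rng.choice(map_size * map_size, size=num_rocks, replace=False)
    return [(int(c // map_size), int(c % map_size)) for c in cells]
```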

POMDPPlanners.environments.rock_sample_pomdp.create_rock_sample_state(robot_pos, rocks)[source]

Create a RockSample state as a numpy array.

Parameters:
  • robot_pos (Tuple[int, int]) – Robot position as (row, col) tuple

  • rocks (Tuple[bool, ...]) – Tuple of booleans indicating rock quality (True=good, False=bad)

Returns:

[robot_row, robot_col, rock_0, rock_1, …, rock_n] where rock values are 1.0 for good (True) and 0.0 for bad (False)

Return type:

ndarray
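The documented layout is straightforward to reproduce; an illustrative equivalent (helper name is hypothetical):

```python
import numpy as np

def make_state(robot_pos, rocks):
    """Pack (row, col) and boolean rock qualities into one flat float array."""
    return np.array([robot_pos[0], robot_pos[1],
                     *[1.0 if good else 0.0 for good in rocks]])
```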

POMDPPlanners.environments.rock_sample_pomdp.create_rocksample_belief(env, belief_type=BeliefType.VECTORIZED_PARTICLE, n_particles=200, **kwargs)[source]

Create a belief object for the RockSample POMDP.

Parameters:
  • env (RockSamplePOMDP) – RockSample environment instance.

  • belief_type (BeliefType) – Desired belief representation. Supports PARTICLE and VECTORIZED_PARTICLE.

  • n_particles (int) – Number of particles. Defaults to 200.

  • **kwargs (Any) – Reserved for future use.

Return type:

Belief

Returns:

A configured belief object.

Raises:

ValueError – If belief_type is not supported.
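A particle belief for RockSample typically fixes the known robot start and samples rock qualities uniformly at random; a minimal sketch of that initialization (names and defaults are illustrative, not this library's code):

```python
import numpy as np

def init_particles(init_pos, num_rocks, n_particles=200, seed=None):
    """Particles share the known robot position; rock qualities are
    drawn uniformly (1.0 = good, 0.0 = bad) per particle."""
    rng = np.random.default_rng(seed)
    particles = np.empty((n_particles, 2 + num_rocks))
    particles[:, :2] = init_pos
    particles[:, 2:] = rng.integers(0, 2, size=(n_particles, num_rocks))
    return particles
```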

POMDPPlanners.environments.rock_sample_pomdp.get_robot_pos(state)[source]

Extract robot position from state array.

Parameters:

state (ndarray) – State array

Return type:

Tuple[int, int]

Returns:

Robot position as (row, col) tuple

POMDPPlanners.environments.rock_sample_pomdp.get_rocks(state)[source]

Extract rock qualities from state array.

Parameters:

state (ndarray) – State array

Return type:

Tuple[bool, ...]

Returns:

Tuple of booleans indicating rock quality

POMDPPlanners.environments.rock_sample_pomdp.states_equal(state1, state2)[source]

Check if two states are equal.

Parameters:
  • state1 (ndarray)

  • state2 (ndarray)

Return type:

bool

Returns:

True if states are equal
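These helpers all operate on the flat state array; self-contained sketches of their likely behavior (mirroring the documented signatures, not the library's actual code):

```python
import numpy as np

def robot_pos(state):
    """(row, col) from the first two entries of the state array."""
    return int(state[0]), int(state[1])

def rocks(state):
    """Rock qualities as booleans from the remaining entries."""
    return tuple(bool(q) for q in state[2:])

def states_equal(s1, s2):
    """Element-wise equality of two state arrays."""
    return np.array_equal(s1, s2)
```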

Submodules

POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp module

Module for RockSample POMDP environment.

This module provides the RockSample POMDP environment implementation based on the classic rock sampling problem.

The environment involves a robot navigating a grid world with rocks that are either good or bad. The robot must use a noisy sensor to determine rock quality and decide whether to sample them, balancing exploration and exploitation.

Classes:

RockSampleState: Represents the state of the environment

RockSamplePOMDP: The main POMDP environment implementation

class POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.RockSampleObservationModel(next_state, action, pomdp)[source]

Bases: ObservationModel

Observation model for RockSample POMDP.

Parameters:
  • next_state (ndarray)

  • action (int)

  • pomdp (RockSamplePOMDP)

probability(values)[source]

Calculate observation probabilities.

Return type:

ndarray

Parameters:

values (List[str])

sample(n_samples=1)[source]

Sample observations.

Return type:

List[str]

Parameters:

n_samples (int)

class POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.RockSamplePOMDP(map_size=(5, 5), rock_positions=None, init_pos=(0, 0), sensor_efficiency=10.0, bad_rock_penalty=-10.0, good_rock_reward=10.0, step_penalty=0.0, sensor_use_penalty=0.0, exit_reward=10.0, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, discount_factor=0.95, name='RockSample', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

RockSample POMDP environment

This environment implements the classic rock sampling problem where a robot must navigate a grid, use sensors to evaluate rocks, and decide which ones to sample while balancing exploration costs and sampling rewards.

Parameters:
  • map_size – Grid dimensions as (rows, cols)

  • rock_positions – List of rock positions as (row, col) tuples

  • init_pos – Initial robot position

  • sensor_efficiency – Sensor noise parameter (higher = less noise)

  • bad_rock_penalty – Penalty for sampling a bad rock

  • good_rock_reward – Reward for sampling a good rock

  • step_penalty – Cost for each action

  • sensor_use_penalty – Additional cost for using the sensor

  • exit_reward – Reward for reaching the exit

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = RockSamplePOMDP(map_size=(5, 5), rock_positions=[(0, 0), (2, 2), (3, 3)])
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
cache_visualization(history, cache_path)[source]

Cache visualization of episode history.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics.

Return type:

List[MetricValue]

Parameters:

histories (List[History])

get_actions()[source]

Get all available actions.

Return type:

List[int]

get_metric_names()[source]

Get names of RockSample POMDP specific metrics.

Returns:

avg_rocks_sampled, exit_success_rate, and average_dangerous_area_steps

Return type:

List[str]

initial_observation_dist()[source]

Get initial observation distribution.

Return type:

DiscreteDistribution

initial_state_dist()[source]

Get initial state distribution.

Return type:

DiscreteDistribution

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Return type:

bool

Parameters:
  • observation1 (Any)

  • observation2 (Any)

is_terminal(state)[source]

Check if state is terminal.

Return type:

bool

Parameters:

state (ndarray)

observation_model(next_state, action)[source]

Get observation model.

Return type:

RockSampleObservationModel

Parameters:
  • next_state (ndarray)

  • action (int)

reward(state, action)[source]

Calculate immediate reward.

Return type:

float

Parameters:
  • state (ndarray)

  • action (int)

sample_next_step(state, action)[source]

Overridden so that reward() does not recompute the next state.

Return type:

Tuple[ndarray, str, float]

Parameters:
  • state (ndarray)

  • action (int)

state_transition_model(state, action)[source]

Get state transition model.

Return type:

RockSampleStateTransitionModel

Parameters:
  • state (ndarray)

  • action (int)

visualize_path(path, actions, cache_path)[source]

Visualize robot path through the environment.

Parameters:
  • path (List[ndarray]) – List of states representing the path

  • actions (List[int]) – List of actions taken at each state

  • cache_path (Path) – Path where to save the animation (must end with .gif)

Return type:

None

class POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.RockSamplePOMDPMetrics(*values)[source]

Bases: Enum

Metric names for RockSample POMDP environment.

AVERAGE_DANGEROUS_AREA_STEPS = 'average_dangerous_area_steps'
AVG_ROCKS_SAMPLED = 'avg_rocks_sampled'
EXIT_SUCCESS_RATE = 'exit_success_rate'
class POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.RockSampleStateTransitionModel(state, action, pomdp)[source]

Bases: StateTransitionModel

State transition model for RockSample POMDP.

Parameters:
  • state (ndarray)

  • action (int)

  • pomdp (RockSamplePOMDP)

probability(values)[source]

Calculate transition probabilities for given next states.

Since RockSample has deterministic transitions, the probability is 1.0 for the correct next state and 0.0 for all others.

Parameters:

values (List[ndarray]) – List of next state values to calculate probabilities for

Return type:

ndarray

Returns:

Array of transition probabilities (1.0 for correct state, 0.0 otherwise)

sample(n_samples=1)[source]

Sample next states (deterministic transitions).

Return type:

List[ndarray]

Parameters:

n_samples (int)

POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.create_random_rock_sample(map_size=7, num_rocks=8, seed=None)[source]

Create a random RockSample instance.

Parameters:
  • map_size (int) – Size of square grid. Defaults to 7.

  • num_rocks (int) – Number of rocks to place. Defaults to 8.

  • seed (Optional[int]) – Random seed. Defaults to None.

Return type:

RockSamplePOMDP

Returns:

Randomly configured RockSample POMDP

POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.create_rock_sample_state(robot_pos, rocks)[source]

Create a RockSample state as a numpy array.

Parameters:
  • robot_pos (Tuple[int, int]) – Robot position as (row, col) tuple

  • rocks (Tuple[bool, ...]) – Tuple of booleans indicating rock quality (True=good, False=bad)

Returns:

[robot_row, robot_col, rock_0, rock_1, …, rock_n] where rock values are 1.0 for good (True) and 0.0 for bad (False)

Return type:

ndarray

POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.get_robot_pos(state)[source]

Extract robot position from state array.

Parameters:

state (ndarray) – State array

Return type:

Tuple[int, int]

Returns:

Robot position as (row, col) tuple

POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.get_rocks(state)[source]

Extract rock qualities from state array.

Parameters:

state (ndarray) – State array

Return type:

Tuple[bool, ...]

Returns:

Tuple of booleans indicating rock quality

POMDPPlanners.environments.rock_sample_pomdp.rock_sample_pomdp.states_equal(state1, state2)[source]

Check if two states are equal.

Parameters:
  • state1 (ndarray)

  • state2 (ndarray)

Return type:

bool

Returns:

True if states are equal

POMDPPlanners.environments.rock_sample_pomdp.rock_sample_visualizer module

Visualization module for RockSample POMDP Environment.

This module provides visualization capabilities for RockSample POMDP episodes, creating animated GIFs showing robot movement, rock sampling, sensor usage, and exit behavior.

Classes:

RockSampleVisualizer: Handles all visualization logic for RockSample POMDP

class POMDPPlanners.environments.rock_sample_pomdp.rock_sample_visualizer.RockSampleVisualizer(env)[source]

Bases: object

Handles visualization and animation for RockSample POMDP environments.

This class encapsulates all visualization logic for RockSample POMDP episodes, creating animated GIFs showing robot movement, rock sampling, sensor checks, dangerous areas, and exit behavior.

Parameters:

env (RockSamplePOMDP)

env

Reference to the RockSamplePOMDP environment instance

map_size

Grid dimensions as (rows, cols)

rock_positions

List of rock positions

action_names

Names of available actions

action_to_vector

Mapping from action indices to direction vectors

dangerous_areas

List of dangerous area center positions

dangerous_area_radius

Radius around dangerous area centers

create_visualization(history, cache_path)[source]

Create animated visualization of a RockSample POMDP episode.

Creates an animated GIF showing the robot navigating, sampling rocks, using sensors, and exiting the grid.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object or history is invalid

Return type:

None

visualize_path(path, actions, cache_path)[source]

Visualize robot path through the environment.

Parameters:
  • path (List[ndarray]) – List of states representing the path

  • actions (List[int]) – List of actions taken at each state

  • cache_path (Path) – Path where to save the animation

Return type:

None