POMDPPlanners.environments.push_pomdp package

Push POMDP Environment Module.

This module provides the Push POMDP environment implementation and related components for robotic manipulation tasks.

Classes:

PushPOMDP: Main POMDP environment for robotic push tasks PushStateTransition: State transition model with physics-based pushing PushObservation: Observation model with noisy position measurements PushPOMDPVisualizer: Visualization utilities for Push POMDP episodes ContinuousPushPOMDP: Continuous-action Push POMDP environment ContinuousPushPOMDPDiscreteActions: Discrete-action wrapper ContinuousPushStateTransitionModel: Continuous push state transition ContinuousPushObservationModel: Continuous push observation model

class POMDPPlanners.environments.push_pomdp.ContinuousPushObservationModel(next_state, action, observation_noise, grid_size)[source]

Bases: ObservationModel

Noisy observation model for Continuous Push POMDP.

Robot and target positions are observed exactly; object position is observed with additive Gaussian noise.

Observation format: [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]

Parameters:
next_state

True state after action.

action

Action taken (2D vector).

observation_noise

Standard deviation of object position noise.

grid_size

Grid dimension.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([3.0, 3.0, 2.8, 3.2, 8.0, 8.0])
>>> action = np.array([1.0, 0.0])
>>> obs_model = ContinuousPushObservationModel(
...     next_state=state, action=action,
...     observation_noise=0.1, grid_size=10,
... )
>>> obs = obs_model.sample()[0]
>>> len(obs) == 6
True
>>> bool(obs[0] == 3.0)  # Robot exact
True
probability(values)[source]

Calculate observation probabilities for given values.

Parameters:

values (List[Any]) – List of observation values to calculate probabilities for

Return type:

ndarray

Returns:

Array of probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample observations from the observation model.

Parameters:

n_samples (int) – Number of observation samples to generate. Defaults to 1.

Return type:

List[ndarray]

Returns:

List of sampled observations of length n_samples.

Note

Subclasses must implement this method according to their specific observation generation logic.

class POMDPPlanners.environments.push_pomdp.ContinuousPushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, max_push=2.0, observation_noise=0.1, obstacles=None, obstacle_penalty=-10.0, robot_radius=0.3, state_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), initial_state=None, name='ContinuousPushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: Environment

Continuous-action Push POMDP environment.

A robot (circle) must push an object (point) to a target location on a 2D grid. The robot moves via continuous 2D action vectors with Gaussian noise; obstacles are axis-aligned squares.

State: [robot_x, robot_y, object_x, object_y, target_x, target_y] Actions: 2D numpy vectors Observations: [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousPushPOMDP(discount_factor=0.99)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> action = np.array([1.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
Parameters:
cache_visualization(history, cache_path)[source]

Cache animated visualization of the continuous push episode.

Creates an animated GIF showing the robot pushing the object toward the target, with rectangular obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards.

  • cache_path (Path) – Path where to save the visualization (must end with .gif).

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif.

  • TypeError – If cache_path is not a Path object.

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_metric_names()[source]

Get names of Continuous Push POMDP specific metrics.

Return type:

List[str]

Returns:

List of metric name strings.

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (ndarray) – First observation to compare

  • observation2 (ndarray) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (ndarray) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (ndarray) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.push_pomdp.ContinuousPushPOMDPDiscreteActions(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, max_push=2.0, observation_noise=0.1, obstacles=None, obstacle_penalty=-10.0, robot_radius=0.3, state_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), initial_state=None, name='ContinuousPushPOMDPDiscreteActions', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: ContinuousPushPOMDP, DiscreteActionsEnvironment

Discrete-action wrapper for the Continuous Push POMDP.

Maps string actions ["up", "down", "right", "left"] to unit vectors and delegates to the continuous parent.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousPushPOMDPDiscreteActions(discount_factor=0.99)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
Parameters:
get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[str]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (Any) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (Any) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.push_pomdp.ContinuousPushPOMDPVisualizer(env)[source]

Bases: object

Handles visualization and animation for Continuous Push POMDP environments.

This class encapsulates all visualization logic for Continuous Push POMDP episodes, creating animated GIFs showing robot movement (with circular body), object pushing, rectangular obstacle collisions, and task completion.

Parameters:

env (ContinuousPushPOMDP)

env

Reference to the ContinuousPushPOMDP environment instance.

grid_size

Size of the grid environment.

push_threshold

Distance threshold for robot to push object.

obstacles

Shape (M, 4) AABB array (cx, cy, hx, hy).

robot_radius

Radius of the robot body.

create_visualization(history, cache_path)[source]

Create animated visualization of a Continuous Push POMDP episode.

Creates an animated GIF showing the robot pushing the object toward the target, with rectangular obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards.

  • cache_path (Path) – Path where to save the visualization (must end with .gif).

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif.

  • TypeError – If cache_path is not a Path object.

Return type:

None

class POMDPPlanners.environments.push_pomdp.ContinuousPushStateTransitionModel(state, action, state_transition_dist, grid_size, push_threshold, friction_coefficient, max_push, obstacles, robot_radius)[source]

Bases: StateTransitionModel

State transition model for Continuous Push POMDP.

Implements continuous robot movement with Gaussian noise and capped push mechanics. The robot is a circle; the object is a point. Both interact with axis-aligned square obstacles.

State representation: [robot_x, robot_y, object_x, object_y, target_x, target_y]

Parameters:
state

Current state vector.

action

2D movement vector.

grid_size

Grid dimension.

push_threshold

Maximum robot-object distance for a push.

friction_coefficient

Friction reducing push force (0-1).

max_push

Maximum push magnitude.

obstacles

Shape (M, 4) AABB array.

robot_radius

Robot body radius.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
>>> action = np.array([1.0, 0.0])
>>> cov = np.eye(2) * 0.01
>>> dist = CovarianceParameterizedMultivariateNormal(cov)
>>> transition = ContinuousPushStateTransitionModel(
...     state=state, action=action,
...     state_transition_dist=dist, grid_size=10,
...     push_threshold=1.0, friction_coefficient=0.3,
...     max_push=2.0, obstacles=np.empty((0, 4)),
...     robot_radius=0.3,
... )
>>> next_state = transition.sample()[0]
>>> len(next_state) == 6
True
probability(values)[source]

Calculate transition probabilities for given next states.

Parameters:

values (List[ndarray]) – List of next state values to calculate probabilities for

Return type:

ndarray

Returns:

Array of transition probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample next states from the transition model.

Parameters:

n_samples (int) – Number of next state samples to generate. Defaults to 1.

Return type:

List[ndarray]

Returns:

List of sampled next states of length n_samples.

Note

Subclasses must implement this method according to their specific state transition dynamics.

class POMDPPlanners.environments.push_pomdp.PushObservation(next_state, action, observation_noise, grid_size)[source]

Bases: ObservationModel

Noisy observation model for Push POMDP.

This model provides partial observability by adding Gaussian noise to the object’s position while keeping robot and target positions fully observable. This creates uncertainty about the exact object location, making planning challenging.

Observation format: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y]

Parameters:
next_state

True state after action execution

action

Action that was taken (not used in observation generation)

observation_noise

Standard deviation of Gaussian noise for object position

grid_size

Size of the grid environment

robot_pos

Robot position (observed exactly)

object_pos

True object position (observed with noise)

target_pos

Target position (observed exactly)

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # True state after robot movement
>>> true_state = np.array([3.0, 3.0, 2.8, 3.2, 8.0, 8.0])
>>> action = "right"
>>> # Create observation model
>>> obs_model = PushObservation(
...     next_state=true_state,
...     action=action,
...     observation_noise=0.1,
...     grid_size=10
... )
>>> # Sample noisy observation
>>> observation = obs_model.sample()[0]
>>> len(observation) == 6  # [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]
True
>>> bool(observation[0] == 3.0)  # Robot position exact
True
>>> bool(observation[1] == 3.0)  # Robot position exact
True
>>> bool(observation[4] == 8.0)  # Target position exact
True
>>> # Calculate observation probability
>>> prob = obs_model.probability([observation])
>>> len(prob) == 1
True
probability(values)[source]

Calculate observation probabilities for given values.

Parameters:

values (List[Any]) – List of observation values to calculate probabilities for

Return type:

ndarray

Returns:

Array of probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample observations from the observation model.

Parameters:

n_samples (int) – Number of observation samples to generate. Defaults to 1.

Return type:

List[Any]

Returns:

List of sampled observations of length n_samples.

Note

Subclasses must implement this method according to their specific observation generation logic.

class POMDPPlanners.environments.push_pomdp.PushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, observation_noise=0.1, obstacles=None, obstacle_radius=0.5, obstacle_penalty=-10.0, initial_state=None, transition_error_prob=0.0, name='PushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Robotic push task formulated as a POMDP.

This environment simulates a robot that must push an object to a target location on a 2D grid. The robot can move in four directions and pushes objects when close enough, with partial observability through noisy object position measurements.

Problem Structure: - State: [robot_x, robot_y, object_x, object_y, target_x, target_y] (continuous) - Actions: [“up”, “down”, “left”, “right”] (discrete) - Observations: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y] - Rewards: -distance_to_target + 100 (when object reaches target) - Termination: Object within 0.5 units of target position

Key Features: - Physics-based pushing with configurable friction - Distance-based pushing threshold - Noisy observations of object position only - Dense reward signal based on object-target distance - Obstacle collision detection with configurable penalties - Obstacles prevent robot and object movement through them

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = PushPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
cache_visualization(history, cache_path)[source]

Cache animated visualization of the push episode.

Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[str]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

get_metric_names()[source]

Get names of Push POMDP specific metrics.

Return type:

List[str]

Returns:

List containing collision-related metric names

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (ndarray) – First observation to compare

  • observation2 (ndarray) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (str) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (str) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

sample_next_step(state, action)[source]

Sample a complete state transition step.

This convenience method combines state transition, observation generation, and reward calculation in a single operation.

Parameters:
  • state (Any) – Current state

  • action (Any) – Action to execute

Returns:

  • next_state: Sampled next state

  • next_observation: Sampled observation

  • reward: Immediate reward

Return type:

Tuple[Any, Any, float]

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (str) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.push_pomdp.PushPOMDPVisualizer(env)[source]

Bases: object

Handles visualization and animation for Push POMDP environments.

This class encapsulates all visualization logic for Push POMDP episodes, creating animated GIFs showing robot movement, object pushing, obstacle collisions, and task completion.

Parameters:

env (PushPOMDP)

env

Reference to the PushPOMDP environment instance

grid_size

Size of the grid environment

push_threshold

Distance threshold for robot to push object

obstacles

List of obstacle positions

obstacle_radius

Radius of obstacles for collision detection

create_visualization(history, cache_path)[source]

Create animated visualization of a Push POMDP episode.

Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object

Return type:

None

class POMDPPlanners.environments.push_pomdp.PushStateTransition(state, action, grid_size, push_threshold, friction_coefficient, obstacles=None, obstacle_radius=0.5, transition_error_prob=0.0)[source]

Bases: StateTransitionModel

State transition model for Push POMDP with physics-based pushing.

This model implements robot movement and object pushing dynamics on a 2D grid. The robot moves according to discrete actions, and can push objects when within the push threshold distance. Friction reduces push effectiveness.

State representation: [robot_x, robot_y, object_x, object_y, target_x, target_y]

Parameters:
state

Current state vector containing all entity positions

action

Movement action (“up”, “down”, “left”, “right”)

grid_size

Size of the grid environment

push_threshold

Maximum distance for robot to push object

friction_coefficient

Friction that reduces push force (0=no friction, 1=max friction)

robot_pos

Current robot position [x, y]

object_pos

Current object position [x, y]

target_pos

Target position [x, y] (fixed)

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Define state: [robot_x, robot_y, object_x, object_y, target_x, target_y]
>>> state = np.array([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
>>> action = "right"  # Move robot right
>>> # Create transition model
>>> transition = PushStateTransition(
...     state=state,
...     action=action,
...     grid_size=10,
...     push_threshold=1.0,
...     friction_coefficient=0.3
... )
>>> # Simulate step
>>> next_state = transition.sample()[0]
>>> len(next_state) == 6  # [robot_x, robot_y, object_x, object_y, target_x, target_y]
True
>>> isinstance(next_state, np.ndarray)
True
>>> bool(next_state[0] > state[0])  # Robot moved right
True
probability(values)[source]

Calculate probability of transitioning to given next states.

Accounts for transition error probability. With probability (1-p), the intended action executes. With probability p, a random error action (excluding the intended one) is selected uniformly.

Parameters:

values (List[Any]) – List of potential next states to evaluate

Return type:

ndarray

Returns:

Array of probabilities for each state in values

sample(n_samples=1)[source]

Sample next states from the transition model.

Parameters:

n_samples (int) – Number of next state samples to generate. Defaults to 1.

Return type:

List[Any]

Returns:

List of sampled next states of length n_samples.

Note

Subclasses must implement this method according to their specific state transition dynamics.

Subpackages

Submodules

POMDPPlanners.environments.push_pomdp.continuous_push_geometry module

Geometry utilities for the Continuous Push POMDP.

Provides circle-AABB overlap tests, point-in-AABB tests, collision resolution and grid clamping used by the continuous push environment and its vectorized belief updater.

Wall AABBs (obstacles) are stored as rows (cx, cy, hx, hy) where (cx, cy) is the center and (hx, hy) the half-extents. For square obstacles hx == hy.

Functions:

circle_aabb_overlap: Boolean circle-AABB overlap test. point_inside_aabb: Boolean point-inside-AABB test. resolve_circle_wall_collision: Push a circle out of overlapping AABBs. clamp_circle_to_grid: Clamp a circle center so the circle stays in-bounds. clamp_point_to_grid: Clamp a point to [0, grid_size-1]. batch_resolve_circle_wall_collision: Vectorized circle-AABB resolution. batch_clamp_circle_to_grid: Vectorized circle grid clamping. batch_point_inside_aabb: Vectorized point-inside-AABB test. batch_clamp_point_to_grid: Vectorized point grid clamping.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.batch_clamp_circle_to_grid(positions, radius, grid_size)[source]

Clamp an array of circle centers so circles stay in-bounds.

Parameters:
  • positions (ndarray) – Shape (N, 2).

  • radius (float) – Circle radius.

  • grid_size (float) – Grid dimension.

Return type:

ndarray

Returns:

Shape (N, 2) clamped positions.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.batch_clamp_point_to_grid(positions, grid_size)[source]

Clamp an array of points to [0, grid_size-1].

Parameters:
  • positions (ndarray) – Shape (N, 2).

  • grid_size (float) – Grid dimension.

Return type:

ndarray

Returns:

Shape (N, 2) clamped positions.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.batch_point_inside_aabb(points, walls)[source]

Test whether each point lies inside any AABB.

Parameters:
  • points (ndarray) – Shape (N, 2).

  • walls (ndarray) – Shape (M, 4) – AABBs.

Return type:

ndarray

Returns:

Shape (N,) boolean array – True where the point is inside at least one AABB.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.batch_resolve_circle_wall_collision(positions, radius, walls)[source]

Resolve circle-AABB collisions for an array of positions.

Parameters:
  • positions (ndarray) – Shape (N, 2).

  • radius (float) – Circle radius.

  • walls (ndarray) – Shape (M, 4) – wall AABBs.

Return type:

ndarray

Returns:

Shape (N, 2) resolved positions.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.circle_aabb_overlap(center, radius, wall)[source]

Test whether a circle overlaps an axis-aligned bounding box.

Parameters:
  • center (ndarray) – Shape (2,) – circle center (x, y).

  • radius (float) – Circle radius.

  • wall (ndarray) – Shape (4,) – AABB (cx, cy, hx, hy).

Return type:

bool

Returns:

True if the circle and AABB overlap.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.clamp_circle_to_grid(pos, radius, grid_size)[source]

Clamp a circle center so the full circle stays within [0, grid_size-1].

Parameters:
  • pos (ndarray) – Shape (2,) – circle center.

  • radius (float) – Circle radius.

  • grid_size (float) – Grid dimension (positions valid in [0, grid_size-1]).

Return type:

ndarray

Returns:

Clamped position as shape (2,) array.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.clamp_point_to_grid(pos, grid_size)[source]

Clamp a point to [0, grid_size-1].

Parameters:
  • pos (ndarray) – Shape (2,) – point position.

  • grid_size (float) – Grid dimension.

Return type:

ndarray

Returns:

Clamped position as shape (2,) array.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.point_inside_aabb(point, wall)[source]

Test whether a point lies inside an axis-aligned bounding box.

Parameters:
  • point (ndarray) – Shape (2,) – point (x, y).

  • wall (ndarray) – Shape (4,) – AABB (cx, cy, hx, hy).

Return type:

bool

Returns:

True if the point is inside the AABB.

POMDPPlanners.environments.push_pomdp.continuous_push_geometry.resolve_circle_wall_collision(pos, radius, walls)[source]

Push a circular entity out of any overlapping wall AABBs.

For each wall, if the entity circle overlaps the AABB, the entity is pushed along the axis of minimum penetration.

Parameters:
  • pos (ndarray) – Shape (2,) – entity center.

  • radius (float) – Entity body radius.

  • walls (ndarray) – Shape (M, 4) – wall AABBs.

Return type:

ndarray

Returns:

Resolved position as shape (2,) array.

POMDPPlanners.environments.push_pomdp.continuous_push_pomdp module

Continuous Push POMDP Environment Implementation.

This module implements a continuous-action variant of the Push POMDP where the robot moves via 2D action vectors with Gaussian noise, has a configurable radius, and obstacles are axis-aligned squares.

The Continuous Push POMDP features: - Continuous 2D state space: [robot_x, robot_y, object_x, object_y, target_x, target_y] - Continuous action space (2D movement vectors) - Robot modelled as a circle with configurable radius - Object modelled as a point - Square obstacles defined as axis-aligned bounding boxes - Gaussian transition noise on robot movement - Capped push force with friction - Noisy observations of object position

Classes:

ContinuousPushStateTransitionModel: Continuous movement with noise and push. ContinuousPushObservationModel: Noisy object position observations. ContinuousPushPOMDP: Main environment with continuous actions. ContinuousPushPOMDPDiscreteActions: Discrete action wrapper.

class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushObservationModel(next_state, action, observation_noise, grid_size)[source]

Bases: ObservationModel

Noisy observation model for Continuous Push POMDP.

Robot and target positions are observed exactly; object position is observed with additive Gaussian noise.

Observation format: [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]

Parameters:
next_state

True state after action.

action

Action taken (2D vector).

observation_noise

Standard deviation of object position noise.

grid_size

Grid dimension.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([3.0, 3.0, 2.8, 3.2, 8.0, 8.0])
>>> action = np.array([1.0, 0.0])
>>> obs_model = ContinuousPushObservationModel(
...     next_state=state, action=action,
...     observation_noise=0.1, grid_size=10,
... )
>>> obs = obs_model.sample()[0]
>>> len(obs) == 6
True
>>> bool(obs[0] == 3.0)  # Robot exact
True
probability(values)[source]

Calculate observation probabilities for given values.

Parameters:

values (List[Any]) – List of observation values to calculate probabilities for

Return type:

ndarray

Returns:

Array of probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample observations from the observation model.

Parameters:

n_samples (int) – Number of observation samples to generate. Defaults to 1.

Return type:

List[ndarray]

Returns:

List of sampled observations of length n_samples.

Note

Subclasses must implement this method according to their specific observation generation logic.

class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, max_push=2.0, observation_noise=0.1, obstacles=None, obstacle_penalty=-10.0, robot_radius=0.3, state_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), initial_state=None, name='ContinuousPushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: Environment

Continuous-action Push POMDP environment.

A robot (circle) must push an object (point) to a target location on a 2D grid. The robot moves via continuous 2D action vectors with Gaussian noise; obstacles are axis-aligned squares.

State: [robot_x, robot_y, object_x, object_y, target_x, target_y] Actions: 2D numpy vectors Observations: [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousPushPOMDP(discount_factor=0.99)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> action = np.array([1.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
Parameters:
cache_visualization(history, cache_path)[source]

Cache animated visualization of the continuous push episode.

Creates an animated GIF showing the robot pushing the object toward the target, with rectangular obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards.

  • cache_path (Path) – Path where to save the visualization (must end with .gif).

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif.

  • TypeError – If cache_path is not a Path object.

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_metric_names()[source]

Get names of Continuous Push POMDP specific metrics.

Return type:

List[str]

Returns:

List of metric name strings.

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (ndarray) – First observation to compare

  • observation2 (ndarray) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (ndarray) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (ndarray) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushPOMDPDiscreteActions(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, max_push=2.0, observation_noise=0.1, obstacles=None, obstacle_penalty=-10.0, robot_radius=0.3, state_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), initial_state=None, name='ContinuousPushPOMDPDiscreteActions', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: ContinuousPushPOMDP, DiscreteActionsEnvironment

Discrete-action wrapper for the Continuous Push POMDP.

Maps string actions ["up", "down", "right", "left"] to unit vectors and delegates to the continuous parent.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousPushPOMDPDiscreteActions(discount_factor=0.99)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
Parameters:
get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[str]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (Any) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (Any) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushPOMDPMetrics(*values)[source]

Bases: Enum

Metric names for Continuous Push POMDP environment.

GOAL_REACHING_RATE = 'goal_reaching_rate'
OBJECT_OBSTACLE_COLLISION_RATE = 'object_obstacle_collision_rate'
ROBOT_OBSTACLE_COLLISION_RATE = 'robot_obstacle_collision_rate'
TOTAL_ALL_OBSTACLE_COLLISIONS = 'total_all_obstacle_collisions'
TOTAL_OBJECT_OBSTACLE_COLLISIONS = 'total_object_obstacle_collisions'
TOTAL_OBSTACLE_COLLISION_RATE = 'total_obstacle_collision_rate'
TOTAL_ROBOT_OBSTACLE_COLLISIONS = 'total_robot_obstacle_collisions'
class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushStateTransitionModel(state, action, state_transition_dist, grid_size, push_threshold, friction_coefficient, max_push, obstacles, robot_radius)[source]

Bases: StateTransitionModel

State transition model for Continuous Push POMDP.

Implements continuous robot movement with Gaussian noise and capped push mechanics. The robot is a circle; the object is a point. Both interact with axis-aligned square obstacles.

State representation: [robot_x, robot_y, object_x, object_y, target_x, target_y]

Parameters:
state

Current state vector.

action

2D movement vector.

grid_size

Grid dimension.

push_threshold

Maximum robot-object distance for a push.

friction_coefficient

Friction reducing push force (0-1).

max_push

Maximum push magnitude.

obstacles

Shape (M, 4) AABB array.

robot_radius

Robot body radius.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
>>> action = np.array([1.0, 0.0])
>>> cov = np.eye(2) * 0.01
>>> dist = CovarianceParameterizedMultivariateNormal(cov)
>>> transition = ContinuousPushStateTransitionModel(
...     state=state, action=action,
...     state_transition_dist=dist, grid_size=10,
...     push_threshold=1.0, friction_coefficient=0.3,
...     max_push=2.0, obstacles=np.empty((0, 4)),
...     robot_radius=0.3,
... )
>>> next_state = transition.sample()[0]
>>> len(next_state) == 6
True
probability(values)[source]

Calculate transition probabilities for given next states.

Parameters:

values (List[ndarray]) – List of next state values to calculate probabilities for

Return type:

ndarray

Returns:

Array of transition probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample next states from the transition model.

Parameters:

n_samples (int) – Number of next state samples to generate. Defaults to 1.

Return type:

List[ndarray]

Returns:

List of sampled next states of length n_samples.

Note

Subclasses must implement this method according to their specific state transition dynamics.

POMDPPlanners.environments.push_pomdp.continuous_push_pomdp_visualizer module

Visualization module for Continuous Push POMDP Environment.

This module provides visualization capabilities for Continuous Push POMDP episodes, creating animated GIFs showing robot movement, object pushing, obstacle collisions, and task completion.

Obstacles are axis-aligned bounding boxes rendered as rectangles. The robot is drawn as a circle with its configured radius. Actions are displayed as formatted 2D vectors.

Classes:
ContinuousPushPOMDPVisualizer: Handles all visualization logic for

Continuous Push POMDP

class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp_visualizer.ContinuousPushPOMDPVisualizer(env)[source]

Bases: object

Handles visualization and animation for Continuous Push POMDP environments.

This class encapsulates all visualization logic for Continuous Push POMDP episodes, creating animated GIFs showing robot movement (with circular body), object pushing, rectangular obstacle collisions, and task completion.

Parameters:

env (ContinuousPushPOMDP)

env

Reference to the ContinuousPushPOMDP environment instance.

grid_size

Size of the grid environment.

push_threshold

Distance threshold for robot to push object.

obstacles

Shape (M, 4) AABB array (cx, cy, hx, hy).

robot_radius

Radius of the robot body.

create_visualization(history, cache_path)[source]

Create animated visualization of a Continuous Push POMDP episode.

Creates an animated GIF showing the robot pushing the object toward the target, with rectangular obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards.

  • cache_path (Path) – Path where to save the visualization (must end with .gif).

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif.

  • TypeError – If cache_path is not a Path object.

Return type:

None

POMDPPlanners.environments.push_pomdp.push_pomdp module

Push POMDP Environment Implementation.

This module implements a robotic push task as a POMDP, where a robot must push an object to a target location on a 2D grid. The robot can move in four directions and pushes objects when within range, with noisy observations of the object’s position.

The Push POMDP features: - Continuous 2D state space: [robot_x, robot_y, object_x, object_y, target_x, target_y] - Discrete action space: [“up”, “down”, “left”, “right”] - Noisy observations of object position (robot and target positions are known) - Physics-based pushing mechanics with friction - Distance-based rewards encouraging object movement toward target

Key mechanics: - Robot must be within push_threshold distance to move objects - Friction reduces the effectiveness of pushes - Object position observations include Gaussian noise - Episode terminates when object reaches target

Classes:

PushStateTransition: Physics-based pushing dynamics PushObservation: Noisy object position observations PushPOMDP: Main push task environment with POMDP formulation

class POMDPPlanners.environments.push_pomdp.push_pomdp.FixedStateDistribution(state)[source]

Bases: Distribution

Deterministic distribution that always returns the same fixed state.

Parameters:

state (ndarray)

sample(n_samples=1)[source]

Sample values from the distribution.

Parameters:

n_samples (int) – Number of samples to return. Defaults to 1.

Return type:

List[Any]

Returns:

List of n_samples independent samples from the distribution

Note

Subclasses must implement this method according to their specific distribution type and parameters.

class POMDPPlanners.environments.push_pomdp.push_pomdp.PushObservation(next_state, action, observation_noise, grid_size)[source]

Bases: ObservationModel

Noisy observation model for Push POMDP.

This model provides partial observability by adding Gaussian noise to the object’s position while keeping robot and target positions fully observable. This creates uncertainty about the exact object location, making planning challenging.

Observation format: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y]

Parameters:
next_state

True state after action execution

action

Action that was taken (not used in observation generation)

observation_noise

Standard deviation of Gaussian noise for object position

grid_size

Size of the grid environment

robot_pos

Robot position (observed exactly)

object_pos

True object position (observed with noise)

target_pos

Target position (observed exactly)

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # True state after robot movement
>>> true_state = np.array([3.0, 3.0, 2.8, 3.2, 8.0, 8.0])
>>> action = "right"
>>> # Create observation model
>>> obs_model = PushObservation(
...     next_state=true_state,
...     action=action,
...     observation_noise=0.1,
...     grid_size=10
... )
>>> # Sample noisy observation
>>> observation = obs_model.sample()[0]
>>> len(observation) == 6  # [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]
True
>>> bool(observation[0] == 3.0)  # Robot position exact
True
>>> bool(observation[1] == 3.0)  # Robot position exact
True
>>> bool(observation[4] == 8.0)  # Target position exact
True
>>> # Calculate observation probability
>>> prob = obs_model.probability([observation])
>>> len(prob) == 1
True
probability(values)[source]

Calculate observation probabilities for given values.

Parameters:

values (List[Any]) – List of observation values to calculate probabilities for

Return type:

ndarray

Returns:

Array of probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample observations from the observation model.

Parameters:

n_samples (int) – Number of observation samples to generate. Defaults to 1.

Return type:

List[Any]

Returns:

List of sampled observations of length n_samples.

Note

Subclasses must implement this method according to their specific observation generation logic.

class POMDPPlanners.environments.push_pomdp.push_pomdp.PushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, observation_noise=0.1, obstacles=None, obstacle_radius=0.5, obstacle_penalty=-10.0, initial_state=None, transition_error_prob=0.0, name='PushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Robotic push task formulated as a POMDP.

This environment simulates a robot that must push an object to a target location on a 2D grid. The robot can move in four directions and pushes objects when close enough, with partial observability through noisy object position measurements.

Problem Structure: - State: [robot_x, robot_y, object_x, object_y, target_x, target_y] (continuous) - Actions: [“up”, “down”, “left”, “right”] (discrete) - Observations: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y] - Rewards: -distance_to_target + 100 (when object reaches target) - Termination: Object within 0.5 units of target position

Key Features: - Physics-based pushing with configurable friction - Distance-based pushing threshold - Noisy observations of object position only - Dense reward signal based on object-target distance - Obstacle collision detection with configurable penalties - Obstacles prevent robot and object movement through them

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = PushPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
cache_visualization(history, cache_path)[source]

Cache animated visualization of the push episode.

Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[str]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

get_metric_names()[source]

Get names of Push POMDP specific metrics.

Return type:

List[str]

Returns:

List containing collision-related metric names

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (ndarray) – First observation to compare

  • observation2 (ndarray) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (str) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (str) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

sample_next_step(state, action)[source]

Sample a complete state transition step.

This convenience method combines state transition, observation generation, and reward calculation in a single operation.

Parameters:
  • state (Any) – Current state

  • action (Any) – Action to execute

Returns:

  • next_state: Sampled next state

  • next_observation: Sampled observation

  • reward: Immediate reward

Return type:

Tuple[Any, Any, float]

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (str) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.push_pomdp.push_pomdp.PushPOMDPMetrics(*values)[source]

Bases: Enum

Metric names for Push POMDP environment.

GOAL_REACHING_RATE = 'goal_reaching_rate'
OBJECT_OBSTACLE_COLLISION_RATE = 'object_obstacle_collision_rate'
ROBOT_OBSTACLE_COLLISION_RATE = 'robot_obstacle_collision_rate'
TOTAL_ALL_OBSTACLE_COLLISIONS = 'total_all_obstacle_collisions'
TOTAL_OBJECT_OBSTACLE_COLLISIONS = 'total_object_obstacle_collisions'
TOTAL_OBSTACLE_COLLISION_RATE = 'total_obstacle_collision_rate'
TOTAL_ROBOT_OBSTACLE_COLLISIONS = 'total_robot_obstacle_collisions'
class POMDPPlanners.environments.push_pomdp.push_pomdp.PushStateTransition(state, action, grid_size, push_threshold, friction_coefficient, obstacles=None, obstacle_radius=0.5, transition_error_prob=0.0)[source]

Bases: StateTransitionModel

State transition model for Push POMDP with physics-based pushing.

This model implements robot movement and object pushing dynamics on a 2D grid. The robot moves according to discrete actions, and can push objects when within the push threshold distance. Friction reduces push effectiveness.

State representation: [robot_x, robot_y, object_x, object_y, target_x, target_y]

Parameters:
state

Current state vector containing all entity positions

action

Movement action (“up”, “down”, “left”, “right”)

grid_size

Size of the grid environment

push_threshold

Maximum distance for robot to push object

friction_coefficient

Friction that reduces push force (0=no friction, 1=max friction)

robot_pos

Current robot position [x, y]

object_pos

Current object position [x, y]

target_pos

Target position [x, y] (fixed)

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Define state: [robot_x, robot_y, object_x, object_y, target_x, target_y]
>>> state = np.array([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
>>> action = "right"  # Move robot right
>>> # Create transition model
>>> transition = PushStateTransition(
...     state=state,
...     action=action,
...     grid_size=10,
...     push_threshold=1.0,
...     friction_coefficient=0.3
... )
>>> # Simulate step
>>> next_state = transition.sample()[0]
>>> len(next_state) == 6  # [robot_x, robot_y, object_x, object_y, target_x, target_y]
True
>>> isinstance(next_state, np.ndarray)
True
>>> bool(next_state[0] > state[0])  # Robot moved right
True
probability(values)[source]

Calculate probability of transitioning to given next states.

Accounts for transition error probability. With probability (1-p), the intended action executes. With probability p, a random error action (excluding the intended one) is selected uniformly.

Parameters:

values (List[Any]) – List of potential next states to evaluate

Return type:

ndarray

Returns:

Array of probabilities for each state in values

sample(n_samples=1)[source]

Sample next states from the transition model.

Parameters:

n_samples (int) – Number of next state samples to generate. Defaults to 1.

Return type:

List[Any]

Returns:

List of sampled next states of length n_samples.

Note

Subclasses must implement this method according to their specific state transition dynamics.

class POMDPPlanners.environments.push_pomdp.push_pomdp.RandomInitialStateDistribution(grid_size, target_pos, obstacles, obstacle_radius, parent)[source]

Bases: Distribution

Random initial state distribution for Push POMDP.

Parameters:
sample(n_samples=1)[source]

Sample values from the distribution.

Parameters:

n_samples (int) – Number of samples to return. Defaults to 1.

Return type:

List[Any]

Returns:

List of n_samples independent samples from the distribution

Note

Subclasses must implement this method according to their specific distribution type and parameters.

POMDPPlanners.environments.push_pomdp.push_pomdp_visualizer module

Visualization module for Push POMDP Environment.

This module provides visualization capabilities for Push POMDP episodes, creating animated GIFs showing robot movement, object pushing, obstacle collisions, and task completion.

Classes:

PushPOMDPVisualizer: Handles all visualization logic for Push POMDP

class POMDPPlanners.environments.push_pomdp.push_pomdp_visualizer.PushPOMDPVisualizer(env)[source]

Bases: object

Handles visualization and animation for Push POMDP environments.

This class encapsulates all visualization logic for Push POMDP episodes, creating animated GIFs showing robot movement, object pushing, obstacle collisions, and task completion.

Parameters:

env (PushPOMDP)

env

Reference to the PushPOMDP environment instance

grid_size

Size of the grid environment

push_threshold

Distance threshold for robot to push object

obstacles

List of obstacle positions

obstacle_radius

Radius of obstacles for collision detection

create_visualization(history, cache_path)[source]

Create animated visualization of a Push POMDP episode.

Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object

Return type:

None