POMDPPlanners.environments.push_pomdp package
Push POMDP Environment Module.
This module provides the Push POMDP environment implementation and related components for robotic manipulation tasks.
- Classes:
- PushPOMDP: Main POMDP environment for robotic push tasks
- PushStateTransition: State transition model with physics-based pushing
- PushObservation: Observation model with noisy position measurements
- PushPOMDPVisualizer: Visualization utilities for Push POMDP episodes
- ContinuousPushPOMDP: Continuous-action Push POMDP environment
- ContinuousPushPOMDPDiscreteActions: Discrete-action wrapper
- ContinuousPushStateTransitionModel: Continuous push state transition
- ContinuousPushObservationModel: Continuous push observation model
- class POMDPPlanners.environments.push_pomdp.ContinuousPushObservationModel(next_state, action, observation_noise, grid_size)[source]
Bases:
ObservationModel
Noisy observation model for Continuous Push POMDP.
Robot and target positions are observed exactly; object position is observed with additive Gaussian noise.
Observation format: [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]
- next_state
True state after action.
- action
Action taken (2D vector).
- observation_noise
Standard deviation of object position noise.
- grid_size
Grid dimension.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([3.0, 3.0, 2.8, 3.2, 8.0, 8.0])
>>> action = np.array([1.0, 0.0])
>>> obs_model = ContinuousPushObservationModel(
...     next_state=state, action=action,
...     observation_noise=0.1, grid_size=10,
... )
>>> obs = obs_model.sample()[0]
>>> len(obs) == 6
True
>>> bool(obs[0] == 3.0)  # Robot exact
True
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[Any]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.push_pomdp.ContinuousPushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, max_push=2.0, observation_noise=0.1, obstacles=None, obstacle_penalty=-10.0, robot_radius=0.3, state_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), initial_state=None, name='ContinuousPushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases:
Environment
Continuous-action Push POMDP environment.
A robot (circle) must push an object (point) to a target location on a 2D grid. The robot moves via continuous 2D action vectors with Gaussian noise; obstacles are axis-aligned squares.
State: [robot_x, robot_y, object_x, object_y, target_x, target_y]
Actions: 2D numpy vectors
Observations: [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousPushPOMDP(discount_factor=0.99)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> action = np.array([1.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
grid_size (int)
push_threshold (float)
friction_coefficient (float)
max_push (float)
observation_noise (float)
obstacle_penalty (float)
robot_radius (float)
state_transition_cov_matrix (ndarray)
initial_state (ndarray | None)
name (str)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
- cache_visualization(history, cache_path)[source]
Cache animated visualization of the continuous push episode.
Creates an animated GIF showing the robot pushing the object toward the target, with rectangular obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif.
TypeError – If cache_path is not a Path object.
- Return type:
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
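For continuous observation vectors such as this environment's, elementwise closeness is the natural equality test. A minimal sketch (the helper name and tolerance are illustrative assumptions, not the actual implementation):

```python
import numpy as np

def is_equal_observation_sketch(obs1, obs2, atol=1e-8):
    """Compare two continuous observation vectors elementwise.

    Hypothetical helper: exact equality is usually too strict for
    floating-point observations, so a small tolerance is used here.
    """
    return bool(np.allclose(obs1, obs2, atol=atol))

same = is_equal_observation_sketch(
    [3.0, 3.0, 2.8, 3.2, 8.0, 8.0],
    [3.0, 3.0, 2.8, 3.2, 8.0, 8.0],
)
```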
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
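The loop-based default can typically be replaced by a single vectorized numpy computation over the state batch. A minimal sketch, assuming the dense reward is the negative object-target distance (the documented reward shape for the discrete variant; the real method may add obstacle penalties and success bonuses, and the function name here is illustrative):

```python
import numpy as np

def reward_batch_vectorized(states: np.ndarray, action: np.ndarray) -> np.ndarray:
    """Vectorized reward for an (N, 6) batch of states under one action.

    Assumes reward = -||object - target|| per state; this is a sketch of
    the vectorization idea, not the environment's actual reward code.
    """
    obj = states[:, 2:4]     # object positions
    target = states[:, 4:6]  # target positions
    return -np.linalg.norm(obj - target, axis=1)

states = np.array([
    [2.0, 3.0, 2.5, 3.1, 8.0, 8.0],
    [1.0, 1.0, 8.0, 8.0, 8.0, 8.0],  # object already at target
])
rewards = reward_batch_vectorized(states, np.array([1.0, 0.0]))
```

Replacing the per-state loop with slicing and `np.linalg.norm(..., axis=1)` evaluates all states in one pass, which is the kind of override the docstring suggests.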
- class POMDPPlanners.environments.push_pomdp.ContinuousPushPOMDPDiscreteActions(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, max_push=2.0, observation_noise=0.1, obstacles=None, obstacle_penalty=-10.0, robot_radius=0.3, state_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), initial_state=None, name='ContinuousPushPOMDPDiscreteActions', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases:
ContinuousPushPOMDP, DiscreteActionsEnvironment
Discrete-action wrapper for the Continuous Push POMDP.
Maps string actions ["up", "down", "right", "left"] to unit vectors and delegates to the continuous parent.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousPushPOMDPDiscreteActions(discount_factor=0.99)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
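The string-to-vector mapping the wrapper performs might look like the following sketch (the exact sign convention, e.g. whether "up" is +y, is an assumption; only the four action names are documented):

```python
import numpy as np

# Hypothetical mapping from the documented string actions to unit
# vectors handed to the continuous parent environment.
ACTION_VECTORS = {
    "up": np.array([0.0, 1.0]),
    "down": np.array([0.0, -1.0]),
    "right": np.array([1.0, 0.0]),
    "left": np.array([-1.0, 0.0]),
}
```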
- Parameters:
discount_factor (float)
grid_size (int)
push_threshold (float)
friction_coefficient (float)
max_push (float)
observation_noise (float)
obstacle_penalty (float)
robot_radius (float)
state_transition_cov_matrix (ndarray)
initial_state (ndarray | None)
name (str)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.push_pomdp.ContinuousPushPOMDPVisualizer(env)[source]
Bases:
object
Handles visualization and animation for Continuous Push POMDP environments.
This class encapsulates all visualization logic for Continuous Push POMDP episodes, creating animated GIFs showing robot movement (with circular body), object pushing, rectangular obstacle collisions, and task completion.
- Parameters:
env (ContinuousPushPOMDP)
- env
Reference to the ContinuousPushPOMDP environment instance.
- grid_size
Size of the grid environment.
- push_threshold
Distance threshold for robot to push object.
- obstacles
Shape (M, 4) AABB array with rows (cx, cy, hx, hy).
- robot_radius
Radius of the robot body.
- create_visualization(history, cache_path)[source]
Create animated visualization of a Continuous Push POMDP episode.
Creates an animated GIF showing the robot pushing the object toward the target, with rectangular obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif.
TypeError – If cache_path is not a Path object.
- Return type:
- class POMDPPlanners.environments.push_pomdp.ContinuousPushStateTransitionModel(state, action, state_transition_dist, grid_size, push_threshold, friction_coefficient, max_push, obstacles, robot_radius)[source]
Bases:
StateTransitionModel
State transition model for Continuous Push POMDP.
Implements continuous robot movement with Gaussian noise and capped push mechanics. The robot is a circle; the object is a point. Both interact with axis-aligned square obstacles.
State representation: [robot_x, robot_y, object_x, object_y, target_x, target_y]
- Parameters:
- state
Current state vector.
- action
2D movement vector.
- grid_size
Grid dimension.
- push_threshold
Maximum robot-object distance for a push.
- friction_coefficient
Friction reducing push force (0-1).
- max_push
Maximum push magnitude.
- obstacles
Shape (M, 4) AABB array.
- robot_radius
Robot body radius.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
>>> action = np.array([1.0, 0.0])
>>> cov = np.eye(2) * 0.01
>>> dist = CovarianceParameterizedMultivariateNormal(cov)
>>> transition = ContinuousPushStateTransitionModel(
...     state=state, action=action,
...     state_transition_dist=dist, grid_size=10,
...     push_threshold=1.0, friction_coefficient=0.3,
...     max_push=2.0, obstacles=np.empty((0, 4)),
...     robot_radius=0.3,
... )
>>> next_state = transition.sample()[0]
>>> len(next_state) == 6
True
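The "capped push mechanics" with friction described above might be sketched as follows. The force model here is an assumption inferred from the parameter docs (friction in [0, 1] damps the push, max_push bounds its magnitude); the actual implementation may differ:

```python
import numpy as np

def capped_push_sketch(push_vector, friction_coefficient, max_push):
    """Damp a raw push by friction, then cap its magnitude at max_push.

    Assumed semantics only: friction_coefficient scales the force down
    linearly, and the resulting vector is rescaled if it exceeds max_push.
    """
    damped = np.asarray(push_vector, dtype=float) * (1.0 - friction_coefficient)
    magnitude = np.linalg.norm(damped)
    if magnitude > max_push:
        damped = damped * (max_push / magnitude)
    return damped

# A strong push gets damped (4.0 -> 2.8) and then capped at max_push.
push = capped_push_sketch([4.0, 0.0], friction_coefficient=0.3, max_push=2.0)
```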
- probability(values)[source]
Calculate transition probabilities for given next states.
- Parameters:
values (List[ndarray]) – List of next state values to calculate probabilities for
- Return type:
- Returns:
Array of transition probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.
- class POMDPPlanners.environments.push_pomdp.PushObservation(next_state, action, observation_noise, grid_size)[source]
Bases:
ObservationModel
Noisy observation model for Push POMDP.
This model provides partial observability by adding Gaussian noise to the object’s position while keeping robot and target positions fully observable. This creates uncertainty about the exact object location, making planning challenging.
Observation format: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y]
- next_state
True state after action execution
- action
Action that was taken (not used in observation generation)
- observation_noise
Standard deviation of Gaussian noise for object position
- grid_size
Size of the grid environment
- robot_pos
Robot position (observed exactly)
- object_pos
True object position (observed with noise)
- target_pos
Target position (observed exactly)
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # True state after robot movement
>>> true_state = np.array([3.0, 3.0, 2.8, 3.2, 8.0, 8.0])
>>> action = "right"
>>> # Create observation model
>>> obs_model = PushObservation(
...     next_state=true_state,
...     action=action,
...     observation_noise=0.1,
...     grid_size=10
... )
>>> # Sample noisy observation
>>> observation = obs_model.sample()[0]
>>> len(observation) == 6  # [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]
True
>>> bool(observation[0] == 3.0)  # Robot position exact
True
>>> bool(observation[1] == 3.0)  # Robot position exact
True
>>> bool(observation[4] == 8.0)  # Target position exact
True
>>> # Calculate observation probability
>>> prob = obs_model.probability([observation])
>>> len(prob) == 1
True
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[Any]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.push_pomdp.PushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, observation_noise=0.1, obstacles=None, obstacle_radius=0.5, obstacle_penalty=-10.0, initial_state=None, transition_error_prob=0.0, name='PushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases:
DiscreteActionsEnvironment
Robotic push task formulated as a POMDP.
This environment simulates a robot that must push an object to a target location on a 2D grid. The robot can move in four directions and pushes objects when close enough, with partial observability through noisy object position measurements.
Problem Structure:
- State: [robot_x, robot_y, object_x, object_y, target_x, target_y] (continuous)
- Actions: ["up", "down", "left", "right"] (discrete)
- Observations: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y]
- Rewards: -distance_to_target + 100 (when object reaches target)
- Termination: Object within 0.5 units of target position
Key Features:
- Physics-based pushing with configurable friction
- Distance-based pushing threshold
- Noisy observations of object position only
- Dense reward signal based on object-target distance
- Obstacle collision detection with configurable penalties
- Obstacles prevent robot and object movement through them
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = PushPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
grid_size (int)
push_threshold (float)
friction_coefficient (float)
observation_noise (float)
obstacle_radius (float)
obstacle_penalty (float)
initial_state (ndarray | None)
transition_error_prob (float)
name (str)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
- cache_visualization(history, cache_path)[source]
Cache animated visualization of the push episode.
Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type:
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- sample_next_step(state, action)[source]
Sample a complete state transition step.
This convenience method combines state transition, observation generation, and reward calculation in a single operation.
- class POMDPPlanners.environments.push_pomdp.PushPOMDPVisualizer(env)[source]
Bases:
object
Handles visualization and animation for Push POMDP environments.
This class encapsulates all visualization logic for Push POMDP episodes, creating animated GIFs showing robot movement, object pushing, obstacle collisions, and task completion.
- Parameters:
env (PushPOMDP)
- env
Reference to the PushPOMDP environment instance
- grid_size
Size of the grid environment
- push_threshold
Distance threshold for robot to push object
- obstacles
List of obstacle positions
- obstacle_radius
Radius of obstacles for collision detection
- create_visualization(history, cache_path)[source]
Create animated visualization of a Push POMDP episode.
Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type:
- class POMDPPlanners.environments.push_pomdp.PushStateTransition(state, action, grid_size, push_threshold, friction_coefficient, obstacles=None, obstacle_radius=0.5, transition_error_prob=0.0)[source]
Bases:
StateTransitionModel
State transition model for Push POMDP with physics-based pushing.
This model implements robot movement and object pushing dynamics on a 2D grid. The robot moves according to discrete actions, and can push objects when within the push threshold distance. Friction reduces push effectiveness.
State representation: [robot_x, robot_y, object_x, object_y, target_x, target_y]
- Parameters:
- state
Current state vector containing all entity positions
- action
Movement action (“up”, “down”, “left”, “right”)
- grid_size
Size of the grid environment
- push_threshold
Maximum distance for robot to push object
- friction_coefficient
Friction that reduces push force (0=no friction, 1=max friction)
- robot_pos
Current robot position [x, y]
- object_pos
Current object position [x, y]
- target_pos
Target position [x, y] (fixed)
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Define state: [robot_x, robot_y, object_x, object_y, target_x, target_y]
>>> state = np.array([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
>>> action = "right"  # Move robot right
>>> # Create transition model
>>> transition = PushStateTransition(
...     state=state,
...     action=action,
...     grid_size=10,
...     push_threshold=1.0,
...     friction_coefficient=0.3
... )
>>> # Simulate step
>>> next_state = transition.sample()[0]
>>> len(next_state) == 6
True
>>> isinstance(next_state, np.ndarray)
True
>>> bool(next_state[0] > state[0])  # Robot moved right
True
- probability(values)[source]
Calculate probability of transitioning to given next states.
Accounts for transition error probability. With probability (1-p), the intended action executes. With probability p, a random error action (excluding the intended one) is selected uniformly.
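The error mixture described above, restated as a sketch: the intended action keeps probability (1 - p), and the remaining mass p is split uniformly over the other actions (the helper name is illustrative, not the actual API):

```python
def action_probabilities(intended, actions, error_prob):
    """Probability that each candidate action actually executes.

    Sketch of the documented mixture: intended action with probability
    (1 - p); each of the n - 1 other actions with probability p / (n - 1).
    """
    n = len(actions)
    return {
        a: (1.0 - error_prob) if a == intended else error_prob / (n - 1)
        for a in actions
    }

probs = action_probabilities("right", ["up", "down", "left", "right"], 0.1)
```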
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.
Subpackages
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs package
- ContinuousPushVectorizedUpdater
  - obs_dist, grid_size, push_threshold, friction_coefficient, max_push, obstacles, robot_radius
  - batch_observation_log_likelihood(), batch_transition(), config_id, from_environment()
- PushVectorizedUpdater
  - obs_dist, grid_size, push_threshold, friction_coefficient, obstacles, obstacle_radius, transition_error_prob
  - ACTION_NAME_TO_INDEX, ACTION_VECTORS
  - batch_observation_log_likelihood(), batch_transition(), config_id, from_environment()
- create_continuous_push_belief()
- create_push_belief()
- Submodules
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs.continuous_push_belief_factory module
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs.continuous_push_vectorized_updater module
- ContinuousPushVectorizedUpdater
  - obs_dist, grid_size, push_threshold, friction_coefficient, max_push, obstacles, robot_radius
  - batch_observation_log_likelihood(), batch_transition(), config_id, from_environment()
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs.push_belief_factory module
- POMDPPlanners.environments.push_pomdp.push_pomdp_beliefs.push_vectorized_updater module
- PushVectorizedUpdater
  - obs_dist, grid_size, push_threshold, friction_coefficient, obstacles, obstacle_radius, transition_error_prob
  - ACTION_NAME_TO_INDEX, ACTION_VECTORS
  - batch_observation_log_likelihood(), batch_transition(), config_id, from_environment()
Submodules
POMDPPlanners.environments.push_pomdp.continuous_push_geometry module
Geometry utilities for the Continuous Push POMDP.
Provides circle-AABB overlap tests, point-in-AABB tests, collision resolution and grid clamping used by the continuous push environment and its vectorized belief updater.
Wall AABBs (obstacles) are stored as rows (cx, cy, hx, hy), where (cx, cy) is the center and (hx, hy) the half-extents. For square obstacles, hx == hy.
- Functions:
- circle_aabb_overlap: Boolean circle-AABB overlap test.
- point_inside_aabb: Boolean point-inside-AABB test.
- resolve_circle_wall_collision: Push a circle out of overlapping AABBs.
- clamp_circle_to_grid: Clamp a circle center so the circle stays in-bounds.
- clamp_point_to_grid: Clamp a point to [0, grid_size-1].
- batch_resolve_circle_wall_collision: Vectorized circle-AABB resolution.
- batch_clamp_circle_to_grid: Vectorized circle grid clamping.
- batch_point_inside_aabb: Vectorized point-inside-AABB test.
- batch_clamp_point_to_grid: Vectorized point grid clamping.
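The circle-AABB overlap test, given the (cx, cy, hx, hy) storage convention above, is typically the standard clamp-and-compare algorithm. A self-contained sketch (function name suffixed to mark it as illustrative, not the module's actual code):

```python
import numpy as np

def circle_aabb_overlap_sketch(center, radius, wall):
    """Standard circle-vs-AABB test: clamp the circle center onto the
    box, then compare the clamped distance against the radius.

    `wall` is (cx, cy, hx, hy): box center and half-extents, matching
    the storage convention documented for this module.
    """
    cx, cy, hx, hy = wall
    closest_x = np.clip(center[0], cx - hx, cx + hx)
    closest_y = np.clip(center[1], cy - hy, cy + hy)
    dx, dy = center[0] - closest_x, center[1] - closest_y
    return dx * dx + dy * dy <= radius * radius

wall = (5.0, 5.0, 1.0, 1.0)  # unit-half-extent square obstacle at (5, 5)
```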
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry.batch_clamp_circle_to_grid(positions, radius, grid_size)[source]
Clamp an array of circle centers so circles stay in-bounds.
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry.batch_clamp_point_to_grid(positions, grid_size)[source]
Clamp an array of points to [0, grid_size-1].
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry.batch_point_inside_aabb(points, walls)[source]
Test whether each point lies inside any AABB.
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry.batch_resolve_circle_wall_collision(positions, radius, walls)[source]
Resolve circle-AABB collisions for an array of positions.
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry.circle_aabb_overlap(center, radius, wall)[source]
Test whether a circle overlaps an axis-aligned bounding box.
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry.clamp_circle_to_grid(pos, radius, grid_size)[source]
Clamp a circle center so the full circle stays within [0, grid_size-1].
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry.clamp_point_to_grid(pos, grid_size)[source]
Clamp a point to [0, grid_size-1].
- POMDPPlanners.environments.push_pomdp.continuous_push_geometry.point_inside_aabb(point, wall)[source]
Test whether a point lies inside an axis-aligned bounding box.
POMDPPlanners.environments.push_pomdp.continuous_push_pomdp module
Continuous Push POMDP Environment Implementation.
This module implements a continuous-action variant of the Push POMDP where the robot moves via 2D action vectors with Gaussian noise, has a configurable radius, and obstacles are axis-aligned squares.
The Continuous Push POMDP features:
- Continuous 2D state space: [robot_x, robot_y, object_x, object_y, target_x, target_y]
- Continuous action space (2D movement vectors)
- Robot modelled as a circle with configurable radius
- Object modelled as a point
- Square obstacles defined as axis-aligned bounding boxes
- Gaussian transition noise on robot movement
- Capped push force with friction
- Noisy observations of object position
- Classes:
- ContinuousPushStateTransitionModel: Continuous movement with noise and push.
- ContinuousPushObservationModel: Noisy object position observations.
- ContinuousPushPOMDP: Main environment with continuous actions.
- ContinuousPushPOMDPDiscreteActions: Discrete action wrapper.
- class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushObservationModel(next_state, action, observation_noise, grid_size)[source]
Bases:
ObservationModel
Noisy observation model for Continuous Push POMDP.
Robot and target positions are observed exactly; object position is observed with additive Gaussian noise.
Observation format: [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]
- next_state
True state after action.
- action
Action taken (2D vector).
- observation_noise
Standard deviation of object position noise.
- grid_size
Grid dimension.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([3.0, 3.0, 2.8, 3.2, 8.0, 8.0])
>>> action = np.array([1.0, 0.0])
>>> obs_model = ContinuousPushObservationModel(
...     next_state=state, action=action,
...     observation_noise=0.1, grid_size=10,
... )
>>> obs = obs_model.sample()[0]
>>> len(obs) == 6
True
>>> bool(obs[0] == 3.0)  # Robot exact
True
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[Any]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, max_push=2.0, observation_noise=0.1, obstacles=None, obstacle_penalty=-10.0, robot_radius=0.3, state_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), initial_state=None, name='ContinuousPushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases:
Environment
Continuous-action Push POMDP environment.
A robot (circle) must push an object (point) to a target location on a 2D grid. The robot moves via continuous 2D action vectors with Gaussian noise; obstacles are axis-aligned squares.
State: [robot_x, robot_y, object_x, object_y, target_x, target_y]
Actions: 2D numpy vectors
Observations: [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousPushPOMDP(discount_factor=0.99)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> action = np.array([1.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
grid_size (int)
push_threshold (float)
friction_coefficient (float)
max_push (float)
observation_noise (float)
obstacle_penalty (float)
robot_radius (float)
state_transition_cov_matrix (ndarray)
initial_state (ndarray | None)
name (str)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
- cache_visualization(history, cache_path)[source]
Cache animated visualization of the continuous push episode.
Creates an animated GIF showing the robot pushing the object toward the target, with rectangular obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif.
TypeError – If cache_path is not a Path object.
- Return type:
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
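The vectorized override suggested above can be sketched as follows. This is a standalone illustration, not the environment's actual reward function: it assumes a batch of shape (N, 6) states and an illustrative reward of -||object - target||.

```python
import numpy as np

def reward_batch_loop(states, action, reward_fn):
    """Loop-based default: one reward() call per state."""
    return np.array([reward_fn(s, action) for s in states])

def reward_batch_vectorized(states, action):
    """Vectorized override: compute all rewards in one numpy pass.

    Assumes states has shape (N, 6) with columns
    [robot_x, robot_y, object_x, object_y, target_x, target_y]
    and an illustrative reward of -||object - target|| (hypothetical,
    for demonstration only).
    """
    states = np.asarray(states)
    obj = states[:, 2:4]
    target = states[:, 4:6]
    return -np.linalg.norm(obj - target, axis=1)

states = np.array([
    [2.0, 3.0, 2.5, 3.1, 8.0, 8.0],
    [3.0, 3.0, 8.0, 8.0, 8.0, 8.0],  # object already at target
])
action = np.array([1.0, 0.0])

# Both paths agree; the vectorized one avoids the Python-level loop.
loop = reward_batch_loop(
    states, action,
    lambda s, a: -np.linalg.norm(s[2:4] - s[4:6]),
)
vec = reward_batch_vectorized(states, action)
assert np.allclose(loop, vec)
```

The payoff grows with batch size: planners that evaluate thousands of particle states per step benefit from replacing the per-state Python calls with a single numpy expression.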
- class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushPOMDPDiscreteActions(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, max_push=2.0, observation_noise=0.1, obstacles=None, obstacle_penalty=-10.0, robot_radius=0.3, state_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), initial_state=None, name='ContinuousPushPOMDPDiscreteActions', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: ContinuousPushPOMDP, DiscreteActionsEnvironment
Discrete-action wrapper for the Continuous Push POMDP.
Maps string actions ["up", "down", "right", "left"] to unit vectors and delegates to the continuous parent.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousPushPOMDPDiscreteActions(discount_factor=0.99)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
grid_size (int)
push_threshold (float)
friction_coefficient (float)
max_push (float)
observation_noise (float)
obstacle_penalty (float)
robot_radius (float)
state_transition_cov_matrix (ndarray)
initial_state (ndarray | None)
name (str)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushPOMDPMetrics(*values)[source]
Bases: Enum
Metric names for Continuous Push POMDP environment.
- GOAL_REACHING_RATE = 'goal_reaching_rate'
- OBJECT_OBSTACLE_COLLISION_RATE = 'object_obstacle_collision_rate'
- ROBOT_OBSTACLE_COLLISION_RATE = 'robot_obstacle_collision_rate'
- TOTAL_ALL_OBSTACLE_COLLISIONS = 'total_all_obstacle_collisions'
- TOTAL_OBJECT_OBSTACLE_COLLISIONS = 'total_object_obstacle_collisions'
- TOTAL_OBSTACLE_COLLISION_RATE = 'total_obstacle_collision_rate'
- TOTAL_ROBOT_OBSTACLE_COLLISIONS = 'total_robot_obstacle_collisions'
- class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp.ContinuousPushStateTransitionModel(state, action, state_transition_dist, grid_size, push_threshold, friction_coefficient, max_push, obstacles, robot_radius)[source]
Bases: StateTransitionModel
State transition model for Continuous Push POMDP.
Implements continuous robot movement with Gaussian noise and capped push mechanics. The robot is a circle; the object is a point. Both interact with axis-aligned square obstacles.
State representation: [robot_x, robot_y, object_x, object_y, target_x, target_y]
- Parameters:
- state
Current state vector.
- action
2D movement vector.
- grid_size
Grid dimension.
- push_threshold
Maximum robot-object distance for a push.
- friction_coefficient
Friction reducing push force (0-1).
- max_push
Maximum push magnitude.
- obstacles
Shape (M, 4) AABB array.
- robot_radius
Robot body radius.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
>>> action = np.array([1.0, 0.0])
>>> cov = np.eye(2) * 0.01
>>> dist = CovarianceParameterizedMultivariateNormal(cov)
>>> transition = ContinuousPushStateTransitionModel(
...     state=state, action=action,
...     state_transition_dist=dist, grid_size=10,
...     push_threshold=1.0, friction_coefficient=0.3,
...     max_push=2.0, obstacles=np.empty((0, 4)),
...     robot_radius=0.3,
... )
>>> next_state = transition.sample()[0]
>>> len(next_state) == 6
True
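The circle-robot vs. axis-aligned-obstacle interaction can be sketched with a standard closest-point test. This is an illustrative standalone function, assuming the (cx, cy, hx, hy) center/half-extent row layout described for the visualizer's obstacle array; the model's internal collision logic may differ.

```python
import numpy as np

def circle_aabb_collision(center, radius, obstacles):
    """Check a circular robot against axis-aligned box obstacles.

    Assumes each obstacle row is (cx, cy, hx, hy): box center and
    half-extents. Returns a boolean array, one entry per obstacle.
    """
    obstacles = np.asarray(obstacles).reshape(-1, 4)
    box_center = obstacles[:, :2]
    half_extent = obstacles[:, 2:]
    # Closest point on each box to the circle center, via clamping.
    closest = np.clip(center, box_center - half_extent, box_center + half_extent)
    dist = np.linalg.norm(closest - center, axis=1)
    return dist <= radius

obstacles = np.array([[5.0, 5.0, 0.5, 0.5]])  # one unit box centered at (5, 5)
touching = circle_aabb_collision(np.array([4.3, 5.0]), 0.3, obstacles)  # grazes left face
clear = circle_aabb_collision(np.array([2.0, 2.0]), 0.3, obstacles)     # far away
```

Clamping the circle center to the box gives the nearest box point in one vectorized step, so a single call covers all M obstacles at once.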
- probability(values)[source]
Calculate transition probabilities for given next states.
- Parameters:
values (List[ndarray]) – List of next state values to calculate probabilities for
- Return type:
- Returns:
Array of transition probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.
POMDPPlanners.environments.push_pomdp.continuous_push_pomdp_visualizer module
Visualization module for Continuous Push POMDP Environment.
This module provides visualization capabilities for Continuous Push POMDP episodes, creating animated GIFs showing robot movement, object pushing, obstacle collisions, and task completion.
Obstacles are axis-aligned bounding boxes rendered as rectangles. The robot is drawn as a circle with its configured radius. Actions are displayed as formatted 2D vectors.
- Classes:
- ContinuousPushPOMDPVisualizer: Handles all visualization logic for
Continuous Push POMDP
- class POMDPPlanners.environments.push_pomdp.continuous_push_pomdp_visualizer.ContinuousPushPOMDPVisualizer(env)[source]
Bases: object
Handles visualization and animation for Continuous Push POMDP environments.
This class encapsulates all visualization logic for Continuous Push POMDP episodes, creating animated GIFs showing robot movement (with circular body), object pushing, rectangular obstacle collisions, and task completion.
- Parameters:
env (ContinuousPushPOMDP)
- env
Reference to the ContinuousPushPOMDP environment instance.
- grid_size
Size of the grid environment.
- push_threshold
Distance threshold for robot to push object.
- obstacles
Shape (M, 4) AABB array (cx, cy, hx, hy).
- robot_radius
Radius of the robot body.
- create_visualization(history, cache_path)[source]
Create animated visualization of a Continuous Push POMDP episode.
Creates an animated GIF showing the robot pushing the object toward the target, with rectangular obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif.
TypeError – If cache_path is not a Path object.
- Return type:
POMDPPlanners.environments.push_pomdp.push_pomdp module
Push POMDP Environment Implementation.
This module implements a robotic push task as a POMDP, where a robot must push an object to a target location on a 2D grid. The robot can move in four directions and pushes objects when within range, with noisy observations of the object’s position.
The Push POMDP features:
- Continuous 2D state space: [robot_x, robot_y, object_x, object_y, target_x, target_y]
- Discrete action space: [“up”, “down”, “left”, “right”]
- Noisy observations of object position (robot and target positions are known)
- Physics-based pushing mechanics with friction
- Distance-based rewards encouraging object movement toward target
Key mechanics:
- Robot must be within push_threshold distance to move objects
- Friction reduces the effectiveness of pushes
- Object position observations include Gaussian noise
- Episode terminates when object reaches target
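The mechanics above can be sketched in a few lines. This is a simplified standalone model, assuming the object is displaced along the robot's movement direction scaled by (1 - friction_coefficient) when the robot ends within push_threshold; the environment's exact dynamics may differ.

```python
import numpy as np

ACTIONS = {"up": (0.0, 1.0), "down": (0.0, -1.0),
           "left": (-1.0, 0.0), "right": (1.0, 0.0)}

def push_step(robot_pos, object_pos, action,
              push_threshold=1.0, friction_coefficient=0.3, grid_size=10):
    """Simplified push dynamics (illustrative, not the library's code):
    the robot moves one unit; if it ends within push_threshold of the
    object, the object is pushed along the same direction, attenuated
    by friction. Positions are clipped to the grid."""
    move = np.array(ACTIONS[action])
    new_robot = np.clip(robot_pos + move, 0.0, grid_size)
    if np.linalg.norm(new_robot - object_pos) <= push_threshold:
        object_pos = np.clip(
            object_pos + move * (1.0 - friction_coefficient), 0.0, grid_size)
    return new_robot, object_pos

robot = np.array([2.0, 3.0])
obj = np.array([2.5, 3.1])
robot, obj = push_step(robot, obj, "right")
# Robot advances a full unit; the nearby object is pushed only
# 0.7 units because friction_coefficient=0.3 absorbs the rest.
```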
- Classes:
PushStateTransition: Physics-based pushing dynamics PushObservation: Noisy object position observations PushPOMDP: Main push task environment with POMDP formulation
- class POMDPPlanners.environments.push_pomdp.push_pomdp.FixedStateDistribution(state)[source]
Bases: Distribution
Deterministic distribution that always returns the same fixed state.
- Parameters:
state (ndarray)
- sample(n_samples=1)[source]
Sample values from the distribution.
- Parameters:
n_samples (int) – Number of samples to return. Defaults to 1.
- Return type:
- Returns:
List of n_samples independent samples from the distribution
Note
Subclasses must implement this method according to their specific distribution type and parameters.
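The behavior documented above amounts to a degenerate (point-mass) distribution; a minimal standalone sketch of that idea (not the library class itself):

```python
import numpy as np

class FixedStateDistributionSketch:
    """Always returns copies of one fixed state — a sketch of the
    deterministic distribution described above, not the actual
    POMDPPlanners implementation."""

    def __init__(self, state):
        self.state = np.asarray(state)

    def sample(self, n_samples=1):
        # Return independent copies so callers may mutate samples safely.
        return [self.state.copy() for _ in range(n_samples)]

dist = FixedStateDistributionSketch([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
samples = dist.sample(3)
assert len(samples) == 3
assert all(np.array_equal(s, samples[0]) for s in samples)
```

Such a distribution is useful for reproducible experiments: passing a fixed initial state makes every episode start from the same configuration.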
- class POMDPPlanners.environments.push_pomdp.push_pomdp.PushObservation(next_state, action, observation_noise, grid_size)[source]
Bases: ObservationModel
Noisy observation model for Push POMDP.
This model provides partial observability by adding Gaussian noise to the object’s position while keeping robot and target positions fully observable. This creates uncertainty about the exact object location, making planning challenging.
Observation format: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y]
- next_state
True state after action execution
- action
Action that was taken (not used in observation generation)
- observation_noise
Standard deviation of Gaussian noise for object position
- grid_size
Size of the grid environment
- robot_pos
Robot position (observed exactly)
- object_pos
True object position (observed with noise)
- target_pos
Target position (observed exactly)
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # True state after robot movement
>>> true_state = np.array([3.0, 3.0, 2.8, 3.2, 8.0, 8.0])
>>> action = "right"
>>> # Create observation model
>>> obs_model = PushObservation(
...     next_state=true_state,
...     action=action,
...     observation_noise=0.1,
...     grid_size=10
... )
>>> # Sample noisy observation
>>> observation = obs_model.sample()[0]
>>> len(observation) == 6  # [robot_x, robot_y, noisy_obj_x, noisy_obj_y, target_x, target_y]
True
>>> bool(observation[0] == 3.0)  # Robot position exact
True
>>> bool(observation[1] == 3.0)  # Robot position exact
True
>>> bool(observation[4] == 8.0)  # Target position exact
True
>>> # Calculate observation probability
>>> prob = obs_model.probability([observation])
>>> len(prob) == 1
True
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[Any]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.push_pomdp.push_pomdp.PushPOMDP(discount_factor, grid_size=10, push_threshold=1.0, friction_coefficient=0.3, observation_noise=0.1, obstacles=None, obstacle_radius=0.5, obstacle_penalty=-10.0, initial_state=None, transition_error_prob=0.0, name='PushPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Robotic push task formulated as a POMDP.
This environment simulates a robot that must push an object to a target location on a 2D grid. The robot can move in four directions and pushes objects when close enough, with partial observability through noisy object position measurements.
Problem Structure:
- State: [robot_x, robot_y, object_x, object_y, target_x, target_y] (continuous)
- Actions: [“up”, “down”, “left”, “right”] (discrete)
- Observations: [robot_x, robot_y, noisy_object_x, noisy_object_y, target_x, target_y]
- Rewards: -distance_to_target + 100 (when object reaches target)
- Termination: Object within 0.5 units of target position
Key Features:
- Physics-based pushing with configurable friction
- Distance-based pushing threshold
- Noisy observations of object position only
- Dense reward signal based on object-target distance
- Obstacle collision detection with configurable penalties
- Obstacles prevent robot and object movement through them
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = PushPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
grid_size (int)
push_threshold (float)
friction_coefficient (float)
observation_noise (float)
obstacle_radius (float)
obstacle_penalty (float)
initial_state (ndarray | None)
transition_error_prob (float)
name (str)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
- cache_visualization(history, cache_path)[source]
Cache animated visualization of the push episode.
Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type:
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- sample_next_step(state, action)[source]
Sample a complete state transition step.
This convenience method combines state transition, observation generation, and reward calculation in a single operation.
- class POMDPPlanners.environments.push_pomdp.push_pomdp.PushPOMDPMetrics(*values)[source]
Bases: Enum
Metric names for Push POMDP environment.
- GOAL_REACHING_RATE = 'goal_reaching_rate'
- OBJECT_OBSTACLE_COLLISION_RATE = 'object_obstacle_collision_rate'
- ROBOT_OBSTACLE_COLLISION_RATE = 'robot_obstacle_collision_rate'
- TOTAL_ALL_OBSTACLE_COLLISIONS = 'total_all_obstacle_collisions'
- TOTAL_OBJECT_OBSTACLE_COLLISIONS = 'total_object_obstacle_collisions'
- TOTAL_OBSTACLE_COLLISION_RATE = 'total_obstacle_collision_rate'
- TOTAL_ROBOT_OBSTACLE_COLLISIONS = 'total_robot_obstacle_collisions'
- class POMDPPlanners.environments.push_pomdp.push_pomdp.PushStateTransition(state, action, grid_size, push_threshold, friction_coefficient, obstacles=None, obstacle_radius=0.5, transition_error_prob=0.0)[source]
Bases: StateTransitionModel
State transition model for Push POMDP with physics-based pushing.
This model implements robot movement and object pushing dynamics on a 2D grid. The robot moves according to discrete actions, and can push objects when within the push threshold distance. Friction reduces push effectiveness.
State representation: [robot_x, robot_y, object_x, object_y, target_x, target_y]
- Parameters:
- state
Current state vector containing all entity positions
- action
Movement action (“up”, “down”, “left”, “right”)
- grid_size
Size of the grid environment
- push_threshold
Maximum distance for robot to push object
- friction_coefficient
Friction that reduces push force (0=no friction, 1=max friction)
- robot_pos
Current robot position [x, y]
- object_pos
Current object position [x, y]
- target_pos
Target position [x, y] (fixed)
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Define state: [robot_x, robot_y, object_x, object_y, target_x, target_y]
>>> state = np.array([2.0, 3.0, 2.5, 3.1, 8.0, 8.0])
>>> action = "right"  # Move robot right
>>> # Create transition model
>>> transition = PushStateTransition(
...     state=state,
...     action=action,
...     grid_size=10,
...     push_threshold=1.0,
...     friction_coefficient=0.3
... )
>>> # Simulate step
>>> next_state = transition.sample()[0]
>>> len(next_state) == 6  # [robot_x, robot_y, object_x, object_y, target_x, target_y]
True
>>> isinstance(next_state, np.ndarray)
True
>>> bool(next_state[0] > state[0])  # Robot moved right
True
- probability(values)[source]
Calculate probability of transitioning to given next states.
Accounts for transition error probability. With probability (1-p), the intended action executes. With probability p, a random error action (excluding the intended one) is selected uniformly.
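The error model described above induces a simple mixture over executed actions; a standalone sketch of that calculation (illustrative helper, not the library's `probability` method):

```python
ACTIONS = ["up", "down", "left", "right"]

def action_execution_probs(intended, p):
    """Probability that each action is the one actually executed,
    given the error model above: the intended action with probability
    (1 - p), otherwise uniform over the remaining actions."""
    n_errors = len(ACTIONS) - 1
    return {a: (1.0 - p) if a == intended else p / n_errors
            for a in ACTIONS}

probs = action_execution_probs("right", p=0.1)
# Intended action gets 0.9; each of the three error actions gets 0.1 / 3.
assert abs(sum(probs.values()) - 1.0) < 1e-12
```

The transition probability of a candidate next state is then the sum of these weights over every action whose deterministic outcome matches that state.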
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.
- class POMDPPlanners.environments.push_pomdp.push_pomdp.RandomInitialStateDistribution(grid_size, target_pos, obstacles, obstacle_radius, parent)[source]
Bases: Distribution
Random initial state distribution for Push POMDP.
- Parameters:
- sample(n_samples=1)[source]
Sample values from the distribution.
- Parameters:
n_samples (int) – Number of samples to return. Defaults to 1.
- Return type:
- Returns:
List of n_samples independent samples from the distribution
Note
Subclasses must implement this method according to their specific distribution type and parameters.
POMDPPlanners.environments.push_pomdp.push_pomdp_visualizer module
Visualization module for Push POMDP Environment.
This module provides visualization capabilities for Push POMDP episodes, creating animated GIFs showing robot movement, object pushing, obstacle collisions, and task completion.
- Classes:
PushPOMDPVisualizer: Handles all visualization logic for Push POMDP
- class POMDPPlanners.environments.push_pomdp.push_pomdp_visualizer.PushPOMDPVisualizer(env)[source]
Bases: object
Handles visualization and animation for Push POMDP environments.
This class encapsulates all visualization logic for Push POMDP episodes, creating animated GIFs showing robot movement, object pushing, obstacle collisions, and task completion.
- Parameters:
env (PushPOMDP)
- env
Reference to the PushPOMDP environment instance
- grid_size
Size of the grid environment
- push_threshold
Distance threshold for robot to push object
- obstacles
List of obstacle positions
- obstacle_radius
Radius of obstacles for collision detection
- create_visualization(history, cache_path)[source]
Create animated visualization of a Push POMDP episode.
Creates an animated GIF showing the robot pushing the object toward the target, with obstacles, collision detection, distance indicators, and success feedback.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type: