POMDPPlanners.environments.laser_tag_pomdp package
LaserTag POMDP Environment Package.
This package implements the LaserTag pursuit-evasion POMDP environment in both discrete-grid and continuous-space variants.
Note
LaserTagState is now represented as numpy arrays with shape (5,). See laser_tag_pomdp.py for state vector structure documentation.
- class POMDPPlanners.environments.laser_tag_pomdp.ContinuousLaserTagPOMDP(discount_factor, name='ContinuousLaserTagPOMDP', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]
Bases: Environment
Continuous LaserTag POMDP with continuous [dx, dy, tag_flag] actions.
A pursuit-evasion problem in continuous 2-D space where a robot must navigate to tag an opponent. The robot receives noisy 8-direction laser range observations.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> # Initialize environment
>>> env = ContinuousLaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> # Sample complete step
>>> action = np.array([1.0, 0.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
name (str)
robot_radius (float)
opponent_radius (float)
tag_radius (float)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
robot_transition_cov_matrix (np.ndarray)
opponent_transition_cov_matrix (np.ndarray)
pursuit_speed (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Optional[Path])
debug (bool)
use_queue_logger (bool)
initial_state (Optional[np.ndarray])
- cache_visualization(history, cache_path)[source]
Cache visualization data for an episode history.
This method can be overridden by subclasses to provide environment-specific visualization caching capabilities.
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_metric_names()[source]
Get names of environment-specific metrics.
This method returns the names of custom metrics that this environment computes in the compute_metrics() method. It enables users to discover what metrics are available for hyperparameter optimization.
- Return type:
- Returns:
List of metric names that this environment produces. Default implementation returns empty list for environments without custom metrics.
Note
Subclasses that override compute_metrics() should also override this method to return the names of metrics they produce. Use an Enum to ensure consistency between the names returned here and the names used in compute_metrics().
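The note above recommends an Enum as the single source of truth for metric names. A minimal sketch of that pattern, using a hypothetical environment's metrics (the names and computations here are illustrative, not part of the package):

```python
from enum import Enum
from typing import Dict, List


class MyEnvMetrics(Enum):
    """Hypothetical metric names shared by get_metric_names() and compute_metrics()."""

    AVERAGE_EPISODE_LENGTH = "average_episode_length"
    TAG_SUCCESS_RATE = "tag_success_rate"


def get_metric_names() -> List[str]:
    # Both methods read from the Enum, so the names can never drift apart.
    return [metric.value for metric in MyEnvMetrics]


def compute_metrics(histories: List[list]) -> Dict[str, float]:
    # Keys come from the same Enum used by get_metric_names().
    return {
        MyEnvMetrics.AVERAGE_EPISODE_LENGTH.value:
            sum(len(h) for h in histories) / max(len(histories), 1),
        MyEnvMetrics.TAG_SUCCESS_RATE.value: 0.0,  # placeholder computation
    }
```

With this structure, every key returned by `compute_metrics()` is guaranteed to appear in `get_metric_names()`, which is what hyperparameter-optimization tooling relies on.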
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
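A minimal sketch of what a vectorized override might look like, assuming the continuous variant's reward structure (tag reward within tag_radius, tag penalty for a failed tag, step cost for movement). This is a hypothetical standalone function, not the package's implementation:

```python
import numpy as np


def reward_batch_vectorized(states: np.ndarray, action: np.ndarray,
                            tag_radius: float = 0.5, tag_reward: float = 10.0,
                            tag_penalty: float = 10.0, step_cost: float = 1.0) -> np.ndarray:
    """Vectorized reward over a batch of (N, 5) state vectors for one action.

    Assumed reward structure: a tag attempt (action[2] > 0.5) earns tag_reward
    when the robot is within tag_radius of the opponent and -tag_penalty
    otherwise; movement actions cost step_cost.
    """
    # Robot-opponent distances for all particles at once.
    dists = np.linalg.norm(states[:, 0:2] - states[:, 2:4], axis=1)
    if action[2] > 0.5:  # tag flag set
        return np.where(dists <= tag_radius, tag_reward, -tag_penalty)
    return np.full(states.shape[0], -step_cost)
```

Replacing the loop-based default with a single `np.linalg.norm`/`np.where` pass like this is the kind of speedup the docstring suggests for particle-heavy planners.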
- class POMDPPlanners.environments.laser_tag_pomdp.ContinuousLaserTagPOMDPDiscreteActions(discount_factor, name='ContinuousLaserTagPOMDPDiscreteActions', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]
Bases: ContinuousLaserTagPOMDP, DiscreteActionsEnvironment
Continuous LaserTag POMDP with discrete string actions.
Actions: "up", "down", "right", "left", "tag".
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousLaserTagPOMDPDiscreteActions(discount_factor=0.95)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
name (str)
robot_radius (float)
opponent_radius (float)
tag_radius (float)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
robot_transition_cov_matrix (np.ndarray)
opponent_transition_cov_matrix (np.ndarray)
pursuit_speed (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Optional[Path])
debug (bool)
use_queue_logger (bool)
initial_state (Optional[np.ndarray])
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.laser_tag_pomdp.LaserTagObservation(next_state, action, measurement_noise=1.0, floor_shape=(7, 11), walls=None)[source]
Bases: ObservationModel
Observation model for LaserTag POMDP.
Provides 8-directional laser range measurements from the robot’s position. Each measurement represents the number of clear cells in that direction before hitting a wall or boundary, with Gaussian noise.
- Parameters:
- next_state
The state after action execution as numpy array (shape (5,))
- action
The action that was taken
- measurement_noise
Standard deviation of Gaussian measurement noise
- floor_shape
Grid dimensions as (rows, cols)
- walls
Set of wall positions
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])  # Robot at (3,5), opponent at (2,4)
>>> obs_model = LaserTagObservation(
...     next_state=state,
...     action=0,
...     measurement_noise=1.0,
...     floor_shape=(7, 11),
...     walls=set()
... )
>>> observations = obs_model.sample(n_samples=3)
>>> probabilities = obs_model.probability(observations)
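The description above says each measurement is the number of clear cells in a direction before hitting a wall or boundary, plus Gaussian noise. A sketch of the noiseless core of that computation for one direction (a hypothetical helper illustrating the mechanics, not the class's own code):

```python
from typing import Set, Tuple


def clear_cells(robot: Tuple[int, int], direction: Tuple[int, int],
                floor_shape: Tuple[int, int], walls: Set[Tuple[int, int]]) -> int:
    """Count clear grid cells from the robot along one of the 8 unit
    directions, stopping at the first wall cell or the grid boundary."""
    rows, cols = floor_shape
    r, c = robot
    count = 0
    while True:
        r, c = r + direction[0], c + direction[1]  # step one cell outward
        if not (0 <= r < rows and 0 <= c < cols) or (r, c) in walls:
            return count
        count += 1
```

The observation model would run this for all 8 compass directions and add `N(0, measurement_noise**2)` noise to each count.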
- class POMDPPlanners.environments.laser_tag_pomdp.LaserTagPOMDP(discount_factor, name='LaserTagPOMDP', floor_shape=(11, 7), walls={(1, 2), (3, 0), (3, 4), (5, 0), (6, 4), (9, 1), (9, 4), (10, 6)}, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, dangerous_areas={(2, 5), (5, 3), (7, 1)}, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None, transition_error_prob=0.0)[source]
Bases: DiscreteActionsEnvironment
LaserTag POMDP environment implementation.
This is a pursuit-evasion problem where a robot must navigate a grid to tag an opponent. The robot receives noisy observations of the opponent’s position and must decide when and where to attempt tagging.
Problem Structure:
- States: numpy array [robot_row, robot_col, opp_row, opp_col, terminal]
- Actions: North(0), South(1), East(2), West(3), Tag(4)
- Observations: 8-directional laser measurements (N, NE, E, SE, S, SW, W, NW)
- Rewards: Tag success (+10), Tag failure (-10), Movement (-1)
- Parameters:
discount_factor (float)
name (str)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
initial_state (ndarray | None)
transition_error_prob (float)
- floor_shape
Grid dimensions as (rows, cols)
- walls
Set of wall positions as (row, col) tuples
- tag_reward
Reward for successful tagging
- tag_penalty
Penalty for unsuccessful tagging
- step_cost
Cost per movement action
- measurement_noise
Standard deviation of observation noise
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = LaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- cache_visualization(history, cache_path)[source]
Cache visualization of the LaserTag episode as an animated GIF.
Creates an animated visualization showing:
- Robot movement (red circle)
- Opponent movement (blue circle)
- Walls (black squares)
- Dangerous areas (red circles)
- Action arrows showing the robot's intended movement
- Laser measurements (green rays from the robot position)
- Belief particles (if available) showing the robot's belief about the opponent location
- Grid boundaries and coordinate system
- Parameters:
- Raises:
ValueError – If history is empty or contains invalid data
TypeError – If cache_path is not a Path object or doesn’t end with .gif
- Return type:
- compute_metrics(histories)[source]
Compute LaserTag POMDP specific metrics from simulation histories.
- Return type:
- Parameters:
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
Observations are 8-dimensional laser measurements or terminal observations.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Return type:
- Parameters:
- class POMDPPlanners.environments.laser_tag_pomdp.LaserTagStateTransition(state, action, action_directions, floor_shape, walls, transition_error_prob=0.0)[source]
Bases: StateTransitionModel
State transition model for LaserTag POMDP.
Handles robot movement (deterministic based on action) and opponent movement (probabilistic, with tendency to move toward robot’s position).
- Parameters:
- state
Current state as numpy array (shape (5,))
- action
Action to be executed (0=North, 1=South, 2=East, 3=West, 4=Tag)
- floor_shape
Tuple of (rows, cols) for grid dimensions
- walls
Set of wall positions
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])  # Robot at (3,5), opponent at (2,4)
>>> action_directions = {
...     0: (-1, 0),  # North (up)
...     1: (1, 0),   # South (down)
...     2: (0, 1),   # East (right)
...     3: (0, -1),  # West (left)
...     4: (0, 0),   # Tag (no movement)
... }
>>> transition = LaserTagStateTransition(
...     state=state,
...     action=0,  # North
...     action_directions=action_directions,
...     floor_shape=(7, 11),
...     walls=set()
... )
>>> next_states = transition.sample(n_samples=5)
>>> probabilities = transition.probability(next_states)
Subpackages
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs package
- ContinuousLaserTagVectorizedUpdater
  - ContinuousLaserTagVectorizedUpdater.walls
  - ContinuousLaserTagVectorizedUpdater.grid_size
  - ContinuousLaserTagVectorizedUpdater.robot_radius
  - ContinuousLaserTagVectorizedUpdater.opponent_radius
  - ContinuousLaserTagVectorizedUpdater.tag_radius
  - ContinuousLaserTagVectorizedUpdater.pursuit_speed
  - ContinuousLaserTagVectorizedUpdater.measurement_noise
  - ContinuousLaserTagVectorizedUpdater.batch_observation_log_likelihood()
  - ContinuousLaserTagVectorizedUpdater.batch_transition()
  - ContinuousLaserTagVectorizedUpdater.config_id
  - ContinuousLaserTagVectorizedUpdater.from_environment()
- LaserTagVectorizedUpdater
  - LaserTagVectorizedUpdater.floor_shape
  - LaserTagVectorizedUpdater.valid_cell
  - LaserTagVectorizedUpdater.wall_dist_table
  - LaserTagVectorizedUpdater.measurement_noise
  - LaserTagVectorizedUpdater.transition_error_prob
  - LaserTagVectorizedUpdater.batch_observation_log_likelihood()
  - LaserTagVectorizedUpdater.batch_transition()
  - LaserTagVectorizedUpdater.config_id
  - LaserTagVectorizedUpdater.from_environment()
- create_continuous_laser_tag_belief()
- create_laser_tag_belief()
- Submodules
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs.continuous_laser_tag_belief_factory module
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs.continuous_laser_tag_vectorized_updater module
- ContinuousLaserTagVectorizedUpdater
  - ContinuousLaserTagVectorizedUpdater.walls
  - ContinuousLaserTagVectorizedUpdater.grid_size
  - ContinuousLaserTagVectorizedUpdater.robot_radius
  - ContinuousLaserTagVectorizedUpdater.opponent_radius
  - ContinuousLaserTagVectorizedUpdater.tag_radius
  - ContinuousLaserTagVectorizedUpdater.pursuit_speed
  - ContinuousLaserTagVectorizedUpdater.measurement_noise
  - ContinuousLaserTagVectorizedUpdater.batch_observation_log_likelihood()
  - ContinuousLaserTagVectorizedUpdater.batch_transition()
  - ContinuousLaserTagVectorizedUpdater.config_id
  - ContinuousLaserTagVectorizedUpdater.from_environment()
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs.laser_tag_belief_factory module
- POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp_beliefs.laser_tag_vectorized_updater module
- LaserTagVectorizedUpdater
  - LaserTagVectorizedUpdater.floor_shape
  - LaserTagVectorizedUpdater.valid_cell
  - LaserTagVectorizedUpdater.wall_dist_table
  - LaserTagVectorizedUpdater.measurement_noise
  - LaserTagVectorizedUpdater.transition_error_prob
  - LaserTagVectorizedUpdater.batch_observation_log_likelihood()
  - LaserTagVectorizedUpdater.batch_transition()
  - LaserTagVectorizedUpdater.config_id
  - LaserTagVectorizedUpdater.from_environment()
Submodules
POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry module
Geometry utilities for the Continuous LaserTag POMDP.
Provides ray-AABB intersection, ray-circle intersection, wall collision resolution and grid clamping used by the continuous laser-tag environment and its vectorized belief updater.
Wall AABBs are stored as rows (cx, cy, hx, hy) where (cx, cy) is the center and (hx, hy) the half-extents. Entity radii are used for circle-AABB overlap tests during collision resolution.
- Functions:
- ray_aabb_distances: Vectorized ray-AABB slab intersection for multiple rays originating from a single point against an array of AABBs.
- ray_circle_distance: Distance along a ray to the nearest intersection with a circle.
- compute_laser_measurements: Full 8-direction laser scan from a position.
- resolve_wall_collision: Push a circular entity out of overlapping AABBs.
- clamp_to_grid: Clamp a 2-D position to the grid boundaries.
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.batch_clamp_to_grid(positions, entity_radius, grid_size)[source]
Clamp an array of positions to the grid.
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.batch_laser_measurements(robot_positions, opponent_positions, opponent_radius, walls, grid_size)[source]
Compute 8-direction laser measurements for many particles.
- Parameters:
- Return type:
- Returns:
Shape (N, 8) measurement array.
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.batch_resolve_wall_collision(positions, entity_radius, walls)[source]
Resolve wall collisions for an array of positions.
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.clamp_to_grid(position, entity_radius, grid_size)[source]
Clamp a position so the entity circle stays within [0, w] x [0, h].
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.compute_laser_measurements(robot_pos, opponent_pos, opponent_radius, walls, grid_size)[source]
Compute 8-direction laser measurements from the robot.
Each measurement is the distance to the nearest obstacle (wall AABB, opponent circle, or grid boundary) along the corresponding ray in LASER_DIRECTIONS.
- Parameters:
- Return type:
- Returns:
Shape (8,) array of distances.
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.ray_aabb_distances(origin, directions, walls)[source]
Compute distances from origin along each ray to the nearest wall AABB.
Uses the slab method. For each of the D directions the minimum positive intersection distance across all M walls is returned. If a ray does not hit any wall before _RAY_MAX, the returned distance is _RAY_MAX.
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.ray_circle_distance(origin, direction, center, radius)[source]
Distance along a ray to the nearest intersection with a circle.
- POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.resolve_wall_collision(position, entity_radius, walls)[source]
Push a circular entity out of any overlapping wall AABBs.
For each wall, if the entity circle overlaps the AABB, the entity is pushed along the axis of minimum penetration.
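A sketch of the minimum-penetration push for a single wall, using the expanded-AABB approximation (the box is inflated by the entity radius), which is slightly conservative near corners. A hypothetical helper for illustration, not the module's code:

```python
import numpy as np


def push_out_of_wall(position: np.ndarray, radius: float,
                     wall: np.ndarray) -> np.ndarray:
    """Push a circle at `position` out of one wall AABB (cx, cy, hx, hy)
    along the axis of minimum penetration."""
    cx, cy, hx, hy = wall
    dx, dy = position[0] - cx, position[1] - cy
    # Penetration depth along each axis; overlap requires both to be positive.
    pen_x = (hx + radius) - abs(dx)
    pen_y = (hy + radius) - abs(dy)
    if pen_x <= 0.0 or pen_y <= 0.0:
        return position  # no overlap, leave the position unchanged
    out = np.array(position, dtype=float)
    if pen_x < pen_y:
        out[0] += pen_x if dx >= 0 else -pen_x  # push along x (smaller penetration)
    else:
        out[1] += pen_y if dy >= 0 else -pen_y  # push along y
    return out
```

Resolving against multiple walls would repeat this per wall, as the docstring's "any overlapping wall AABBs" suggests.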
POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp module
Continuous LaserTag POMDP Environment Implementation.
This module implements a continuous-space variant of the LaserTag pursuit-evasion POMDP where a robot must navigate to tag an opponent that moves stochastically through continuous 2-D space.
Two environment classes are provided:
- ContinuousLaserTagPOMDP – continuous actions [dx, dy, tag_flag]
- ContinuousLaserTagPOMDPDiscreteActions – five string actions "up", "down", "right", "left", "tag"
- State representation:
  np.ndarray of shape (5,) – [robot_x, robot_y, opponent_x, opponent_y, terminal_flag]
- Observation:
  np.ndarray of shape (8,) – noisy 8-direction laser range measurements. Terminal observation is np.full(8, -1.0).
- Classes:
  ContinuousLaserTagStateTransitionModel: State transition model.
  ContinuousLaserTagObservationModel: Observation model.
  ContinuousLaserTagPOMDP: Continuous-action environment.
  ContinuousLaserTagPOMDPDiscreteActions: Discrete-action variant.
- class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagObservationModel(next_state, action, measurement_noise, walls, grid_size, opponent_radius)[source]
Bases: ObservationModel
Observation model for the Continuous LaserTag POMDP.
Provides 8-direction laser range measurements with Gaussian noise.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([3.0, 3.0, 8.0, 5.0, 0.0])
>>> walls = np.empty((0, 4))
>>> grid_size = np.array([11.0, 7.0])
>>> model = ContinuousLaserTagObservationModel(
...     next_state=state, action=np.array([1.0, 0.0, 0.0]),
...     measurement_noise=1.0, walls=walls,
...     grid_size=grid_size, opponent_radius=0.3,
... )
>>> obs = model.sample(n_samples=2)
>>> len(obs)
2
- Parameters:
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[Any]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagPOMDP(discount_factor, name='ContinuousLaserTagPOMDP', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]
Bases: Environment
Continuous LaserTag POMDP with continuous [dx, dy, tag_flag] actions.
A pursuit-evasion problem in continuous 2-D space where a robot must navigate to tag an opponent. The robot receives noisy 8-direction laser range observations.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> # Initialize environment
>>> env = ContinuousLaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> # Sample complete step
>>> action = np.array([1.0, 0.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
name (str)
robot_radius (float)
opponent_radius (float)
tag_radius (float)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
robot_transition_cov_matrix (np.ndarray)
opponent_transition_cov_matrix (np.ndarray)
pursuit_speed (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Optional[Path])
debug (bool)
use_queue_logger (bool)
initial_state (Optional[np.ndarray])
- cache_visualization(history, cache_path)[source]
Cache visualization data for an episode history.
This method can be overridden by subclasses to provide environment-specific visualization caching capabilities.
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_metric_names()[source]
Get names of environment-specific metrics.
This method returns the names of custom metrics that this environment computes in the compute_metrics() method. It enables users to discover what metrics are available for hyperparameter optimization.
- Return type:
- Returns:
List of metric names that this environment produces. Default implementation returns empty list for environments without custom metrics.
Note
Subclasses that override compute_metrics() should also override this method to return the names of metrics they produce. Use an Enum to ensure consistency between the names returned here and the names used in compute_metrics().
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagPOMDPDiscreteActions(discount_factor, name='ContinuousLaserTagPOMDPDiscreteActions', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]
Bases: ContinuousLaserTagPOMDP, DiscreteActionsEnvironment
Continuous LaserTag POMDP with discrete string actions.
Actions: "up", "down", "right", "left", "tag".
Example
>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousLaserTagPOMDPDiscreteActions(discount_factor=0.95)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
- Parameters:
discount_factor (float)
name (str)
robot_radius (float)
opponent_radius (float)
tag_radius (float)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
robot_transition_cov_matrix (np.ndarray)
opponent_transition_cov_matrix (np.ndarray)
pursuit_speed (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Optional[Path])
debug (bool)
use_queue_logger (bool)
initial_state (Optional[np.ndarray])
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagPOMDPMetrics(*values)[source]
Bases: Enum
Metric names for Continuous LaserTag POMDP.
- AVERAGE_ALL_DANGEROUS_ENCOUNTERS = 'average_all_dangerous_encounters'
- AVERAGE_DANGEROUS_AREA_STEPS = 'average_dangerous_area_steps'
- AVERAGE_EPISODE_LENGTH = 'average_episode_length'
- AVERAGE_FAILED_TAG_ATTEMPTS = 'average_failed_tag_attempts'
- AVERAGE_WALL_COLLISIONS = 'average_wall_collisions'
- GOAL_REACHING_RATE = 'goal_reaching_rate'
- TAG_SUCCESS_RATE = 'tag_success_rate'
- class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagStateTransitionModel(state, action, robot_transition_dist, opponent_transition_dist, pursuit_speed, walls, grid_size, robot_radius, opponent_radius, tag_radius)[source]
Bases: StateTransitionModel
State transition model for the Continuous LaserTag POMDP.
Robot movement: next_pos = pos + action[:2] + noise, where the noise is sampled from a 2-D Gaussian. The opponent pursues the robot stochastically.
Example
>>> import numpy as np
>>> np.random.seed(42)
>>> from POMDPPlanners.utils.multivariate_normal import (
...     CovarianceParameterizedMultivariateNormal,
... )
>>> state = np.array([3.0, 3.0, 8.0, 5.0, 0.0])
>>> action = np.array([1.0, 0.0, 0.0])
>>> robot_dist = CovarianceParameterizedMultivariateNormal(np.eye(2) * 0.1)
>>> opponent_dist = CovarianceParameterizedMultivariateNormal(np.eye(2) * 0.05)
>>> walls = np.empty((0, 4))
>>> grid_size = np.array([11.0, 7.0])
>>> model = ContinuousLaserTagStateTransitionModel(
...     state=state, action=action,
...     robot_transition_dist=robot_dist,
...     opponent_transition_dist=opponent_dist,
...     pursuit_speed=0.6,
...     walls=walls, grid_size=grid_size,
...     robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5,
... )
>>> samples = model.sample(n_samples=3)
>>> len(samples)
3
- Parameters:
state (np.ndarray)
action (np.ndarray)
robot_transition_dist (CovarianceParameterizedMultivariateNormal)
opponent_transition_dist (CovarianceParameterizedMultivariateNormal)
pursuit_speed (float)
walls (np.ndarray)
grid_size (np.ndarray)
robot_radius (float)
opponent_radius (float)
tag_radius (float)
- probability(values)[source]
Calculate transition probabilities for given next states.
- Parameters:
values (List[Any]) – List of next state values to calculate probabilities for
- Return type:
- Returns:
Array of transition probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.
POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_visualizer module
Continuous LaserTag POMDP Visualization Module.
This module provides visualization for the continuous-space LaserTag environment, creating animated GIF visualizations of episodes.
- class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_visualizer.ContinuousLaserTagVisualizer(grid_size, walls, robot_radius, opponent_radius, dangerous_areas, dangerous_area_radius)[source]
Bases: object
Handles visualization for the Continuous LaserTag POMDP.
Creates animated GIF visualizations showing robot and opponent movement as rendered icons, rectangular walls, laser rays, belief particles, and tag indicators. The robot is shown as a red humanoid and the opponent as a blue wheeled rover.
- Parameters:
- grid_size
Arena dimensions (width, height) as ndarray.
- walls
Shape (M, 4) wall AABB array.
- robot_radius
Robot body radius.
- opponent_radius
Opponent body radius.
- dangerous_areas
Dangerous area centers as (x, y) tuples.
- dangerous_area_radius
Radius of dangerous areas.
POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp module
LaserTag POMDP Environment Implementation.
This module implements the LaserTag problem, a pursuit-evasion POMDP environment where an agent must navigate a grid to tag an opponent that moves stochastically. The agent has noisy observations of the opponent’s location.
The LaserTag problem features:
- A grid-based environment (default 7x11) with optional walls
- Robot and opponent moving on discrete grid cells
- 5 possible actions: North, South, East, West, Tag
- 8-directional laser range measurements with Gaussian noise
- Positive reward for successful tagging, negative reward for failed tag attempts
- Step cost for each movement action
- Opponent moves with 0.4 prob toward robot in x-dir, 0.4 prob toward robot in y-dir, 0.2 prob stay
- When aligned on an axis, the 0.4 budget is split equally (0.2/0.2) between both directions
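The opponent-move distribution described above can be sketched as follows. This is a minimal illustration of the stated probabilities, not the package's implementation; the helper name and signature are hypothetical.

```python
def opponent_move_probs(opponent, robot):
    """Return {(d_row, d_col): probability} for the opponent's next move.

    Hypothetical helper illustrating the distribution described above:
    0.4 probability mass toward the robot along each axis and 0.2 mass
    on staying put; when opponent and robot are aligned on an axis,
    that axis's 0.4 budget is split 0.2/0.2 between both directions.
    """
    def sign(x):
        return (x > 0) - (x < 0)

    probs = {}
    d_row = sign(robot[0] - opponent[0])
    d_col = sign(robot[1] - opponent[1])
    if d_row != 0:
        probs[(d_row, 0)] = 0.4
    else:  # aligned on this axis: split the 0.4 budget equally
        probs[(1, 0)] = 0.2
        probs[(-1, 0)] = 0.2
    if d_col != 0:
        probs[(0, d_col)] = 0.4
    else:
        probs[(0, 1)] = 0.2
        probs[(0, -1)] = 0.2
    probs[(0, 0)] = probs.get((0, 0), 0.0) + 0.2  # stay put
    return probs
```

In every case the returned masses sum to 1.0, including the corner case where both agents occupy the same cell and both axes split their budgets.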
- Classes:
LaserTagState: State representation with robot and opponent positions
LaserTagStateTransition: State transition model for robot and opponent movement
LaserTagObservation: Observation model with noisy opponent position measurements
LaserTagPOMDP: Main environment class implementing the LaserTag problem
- class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp.LaserTagObservation(next_state, action, measurement_noise=1.0, floor_shape=(7, 11), walls=None)[source]
Bases: ObservationModel
Observation model for LaserTag POMDP.
Provides 8-directional laser range measurements from the robot’s position. Each measurement represents the number of clear cells in that direction before hitting a wall or boundary, with Gaussian noise.
- Parameters:
- next_state
The state after action execution as numpy array (shape (5,))
- action
The action that was taken
- measurement_noise
Standard deviation of Gaussian measurement noise
- floor_shape
Grid dimensions as (rows, cols)
- walls
Set of wall positions
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])  # Robot at (3,5), opponent at (2,4)
>>> obs_model = LaserTagObservation(
...     next_state=state,
...     action=0,
...     measurement_noise=1.0,
...     floor_shape=(7, 11),
...     walls=set()
... )
>>> observations = obs_model.sample(n_samples=3)
>>> probabilities = obs_model.probability(observations)
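The ray-count measurement described above (clear cells in each of the 8 directions, plus Gaussian noise) can be sketched as follows. This is an assumed reconstruction for illustration; the function name and signature are hypothetical, and the actual observation model in laser_tag_pomdp.py may differ in detail.

```python
import numpy as np

# The eight laser directions as (d_row, d_col), clockwise from North:
# N, NE, E, SE, S, SW, W, NW.
DIRECTIONS = [(-1, 0), (-1, 1), (0, 1), (1, 1),
              (1, 0), (1, -1), (0, -1), (-1, -1)]

def laser_ranges(robot, floor_shape, walls, noise_std=1.0, rng=None):
    """Count clear cells along each of the 8 rays, then add Gaussian noise.

    Hypothetical sketch: each ray advances cell by cell from the robot
    until it leaves the grid or hits a wall cell, and the clear-cell
    count is perturbed by zero-mean Gaussian noise.
    """
    rng = np.random.default_rng() if rng is None else rng
    rows, cols = floor_shape
    ranges = []
    for d_row, d_col in DIRECTIONS:
        r, c = robot
        count = 0
        while True:
            r, c = r + d_row, c + d_col
            if not (0 <= r < rows and 0 <= c < cols) or (r, c) in walls:
                break
            count += 1
        ranges.append(count)
    return np.asarray(ranges, dtype=float) + rng.normal(0.0, noise_std, size=8)
```

With noise_std=0.0 the function returns the exact clear-cell counts, which makes the ray-casting logic easy to verify against a known grid.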
- class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp.LaserTagPOMDP(discount_factor, name='LaserTagPOMDP', floor_shape=(11, 7), walls={(1, 2), (3, 0), (3, 4), (5, 0), (6, 4), (9, 1), (9, 4), (10, 6)}, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, dangerous_areas={(2, 5), (5, 3), (7, 1)}, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None, transition_error_prob=0.0)[source]
Bases: DiscreteActionsEnvironment
LaserTag POMDP environment implementation.
This is a pursuit-evasion problem where a robot must navigate a grid to tag an opponent. The robot receives noisy observations of the opponent’s position and must decide when and where to attempt tagging.
Problem Structure:
- States: numpy array [robot_row, robot_col, opp_row, opp_col, terminal]
- Actions: North(0), South(1), East(2), West(3), Tag(4)
- Observations: 8-directional laser measurements (N, NE, E, SE, S, SW, W, NW)
- Rewards: Tag success (+10), Tag failure (-10), Movement (-1)
- Parameters:
discount_factor (float)
name (str)
tag_reward (float)
tag_penalty (float)
step_cost (float)
measurement_noise (float)
dangerous_area_radius (float)
dangerous_area_penalty (float)
output_dir (Path | None)
debug (bool)
use_queue_logger (bool)
initial_state (ndarray | None)
transition_error_prob (float)
- floor_shape
Grid dimensions as (rows, cols)
- walls
Set of wall positions as (row, col) tuples
- tag_reward
Reward for successful tagging
- tag_penalty
Penalty for unsuccessful tagging
- step_cost
Cost per movement action
- measurement_noise
Standard deviation of observation noise
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = LaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
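The (5,) state vector layout described under Problem Structure can be unpacked with a small helper like the one below. The helper is hypothetical and not part of the package; it only illustrates which component lives at which index.

```python
import numpy as np

# State vector layout: [robot_row, robot_col, opp_row, opp_col, terminal]
def decode_state(state):
    """Unpack a (5,) LaserTag state array into named components."""
    robot = (int(state[0]), int(state[1]))
    opponent = (int(state[2]), int(state[3]))
    terminal = bool(state[4])
    return robot, opponent, terminal

state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])
robot, opponent, terminal = decode_state(state)
```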
- cache_visualization(history, cache_path)[source]
Cache visualization of the LaserTag episode as an animated GIF.
Creates an animated visualization showing:
- Robot movement (red circle)
- Opponent movement (blue circle)
- Walls (black squares)
- Dangerous areas (red circles)
- Action arrows showing the robot’s intended movement
- Laser measurements (green rays from the robot’s position)
- Belief particles (if available) showing the robot’s belief about the opponent’s location
- Grid boundaries and coordinate system
- Parameters:
- Raises:
ValueError – If history is empty or contains invalid data
TypeError – If cache_path is not a Path object or doesn’t end with .gif
- Return type:
- compute_metrics(histories)[source]
Compute LaserTag POMDP specific metrics from simulation histories.
- Return type:
- Parameters:
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
Observations are 8-dimensional laser measurements or terminal observations.
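A comparison along these lines can be sketched as follows. This is a hypothetical illustration: it assumes laser observations are numpy arrays and terminal observations are some non-array sentinel, which may not match the package's actual representation.

```python
import numpy as np

def observations_equal(obs1, obs2):
    """Hypothetical sketch of comparing two LaserTag observations.

    Laser observations are assumed to be length-8 numpy arrays and are
    compared element-wise with a tolerance; terminal observations are
    assumed to be non-array sentinels and are compared directly.
    """
    is_arr1 = isinstance(obs1, np.ndarray)
    is_arr2 = isinstance(obs2, np.ndarray)
    if is_arr1 and is_arr2:
        return obs1.shape == obs2.shape and bool(np.allclose(obs1, obs2))
    if not is_arr1 and not is_arr2:
        return obs1 == obs2
    return False  # one laser measurement, one terminal observation
```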
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Return type:
- Parameters:
- class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp.LaserTagPOMDPMetrics(*values)[source]
Bases: Enum
Metric names for LaserTag POMDP environment.
- AVERAGE_ALL_DANGEROUS_ENCOUNTERS = 'average_all_dangerous_encounters'
- AVERAGE_DANGEROUS_AREA_STEPS = 'average_dangerous_area_steps'
- AVERAGE_EPISODE_LENGTH = 'average_episode_length'
- AVERAGE_FAILED_TAG_ATTEMPTS = 'average_failed_tag_attempts'
- AVERAGE_OBSTACLE_COLLISIONS = 'average_obstacle_collisions'
- GOAL_REACHING_RATE = 'goal_reaching_rate'
- TAG_SUCCESS_RATE = 'tag_success_rate'
- class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp.LaserTagStateTransition(state, action, action_directions, floor_shape, walls, transition_error_prob=0.0)[source]
Bases: StateTransitionModel
State transition model for LaserTag POMDP.
Handles robot movement (deterministic based on action) and opponent movement (probabilistic, with tendency to move toward robot’s position).
- Parameters:
- state
Current state as numpy array (shape (5,))
- action
Action to be executed (0=North, 1=South, 2=East, 3=West, 4=Tag)
- floor_shape
Tuple of (rows, cols) for grid dimensions
- walls
Set of wall positions
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])  # Robot at (3,5), opponent at (2,4)
>>> action_directions = {
...     0: (-1, 0),  # North (up)
...     1: (1, 0),   # South (down)
...     2: (0, 1),   # East (right)
...     3: (0, -1),  # West (left)
...     4: (0, 0),   # Tag (no movement)
... }
>>> transition = LaserTagStateTransition(
...     state=state,
...     action=0,  # North
...     action_directions=action_directions,
...     floor_shape=(7, 11),
...     walls=set()
... )
>>> next_states = transition.sample(n_samples=5)
>>> probabilities = transition.probability(next_states)
POMDPPlanners.environments.laser_tag_pomdp.laser_tag_visualizer module
LaserTag POMDP Visualization Module.
This module provides visualization functionality for LaserTag POMDP environments, creating animated GIF visualizations of episodes.
- class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_visualizer.LaserTagVisualizer(floor_shape, walls, dangerous_areas, dangerous_area_radius)[source]
Bases: object
Handles visualization for LaserTag POMDP environments.
Creates animated GIF visualizations showing robot movement, opponent movement, walls, laser measurements, belief particles, and action indicators.
- Parameters:
- floor_shape
Grid dimensions as (rows, cols)
- walls
Set of wall positions as (row, col) tuples
- dangerous_areas
List of dangerous area center positions
- dangerous_area_radius
Radius around dangerous area centers
- create_visualization(history, cache_path)[source]
Create animated GIF visualization of a LaserTag episode.
Creates an animated visualization showing:
- Robot movement (red circle with path trail)
- Opponent movement (blue circle with path trail)
- Walls (black squares)
- Dangerous areas (red circles)
- Action arrows showing the robot’s intended movement
- Laser measurements (green rays from the robot’s position)
- Belief particles (if available) showing the robot’s belief about the opponent’s location
- Grid boundaries and coordinate system
- Step counter and action labels
- Parameters:
- Raises:
ValueError – If history is empty or contains invalid data
TypeError – If cache_path is not a Path object or doesn’t end with .gif
- Return type: