POMDPPlanners.environments.laser_tag_pomdp package

LaserTag POMDP Environment Package.

This package implements the LaserTag pursuit-evasion POMDP environment in both discrete-grid and continuous-space variants.

Note

LaserTagState is now represented as a numpy array of shape (5,). See laser_tag_pomdp.py for documentation of the state vector structure.
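As a quick sketch of unpacking such a state vector (the field order is taken from the LaserTagPOMDP docstring in this package; the variable names here are illustrative):

```python
import numpy as np

# Documented (5,) state layout for the discrete environment:
# [robot_row, robot_col, opp_row, opp_col, terminal]
state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])

robot_pos = state[:2]        # (robot_row, robot_col)
opponent_pos = state[2:4]    # (opp_row, opp_col)
is_terminal = bool(state[4])
```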

class POMDPPlanners.environments.laser_tag_pomdp.ContinuousLaserTagPOMDP(discount_factor, name='ContinuousLaserTagPOMDP', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]

Bases: Environment

Continuous LaserTag POMDP with continuous [dx, dy, tag_flag] actions.

A pursuit-evasion problem in continuous 2-D space where a robot must navigate to tag an opponent. The robot receives noisy 8-direction laser range observations.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> # Initialize environment
>>> env = ContinuousLaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> # Sample complete step
>>> action = np.array([1.0, 0.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
  • discount_factor (float)

  • name (str)

  • grid_size (Tuple[float, float])

  • walls (Optional[List[Tuple[float, float, float, float]]])

  • robot_radius (float)

  • opponent_radius (float)

  • tag_radius (float)

  • tag_reward (float)

  • tag_penalty (float)

  • step_cost (float)

  • measurement_noise (float)

  • robot_transition_cov_matrix (np.ndarray)

  • opponent_transition_cov_matrix (np.ndarray)

  • pursuit_speed (float)

  • dangerous_areas (Optional[List[Tuple[float, float]]])

  • dangerous_area_radius (float)

  • dangerous_area_penalty (float)

  • output_dir (Optional[Path])

  • debug (bool)

  • use_queue_logger (bool)

  • initial_state (Optional[np.ndarray])

cache_visualization(history, cache_path)[source]

Cache visualization data for an episode history.

This method can be overridden by subclasses to provide environment-specific visualization caching capabilities.

Parameters:
  • history (List[StepData]) – List of step data from an episode

  • cache_path (Path) – Path where visualization data should be cached

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_metric_names()[source]

Get names of environment-specific metrics.

This method returns the names of custom metrics that this environment computes in the compute_metrics() method. It enables users to discover what metrics are available for hyperparameter optimization.

Return type:

List[str]

Returns:

List of metric names that this environment produces. Default implementation returns empty list for environments without custom metrics.

Note

Subclasses that override compute_metrics() should also override this method to return the names of metrics they produce. Use an Enum to ensure consistency between the names returned here and the names used in compute_metrics().
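A minimal sketch of the recommended Enum pattern, using two of the metric names the discrete environment reports (the Enum class name itself is hypothetical):

```python
from enum import Enum
from typing import List


class LaserTagMetricName(str, Enum):
    # Hypothetical Enum; the member values mirror names listed by
    # LaserTagPOMDP.get_metric_names() in this package.
    TAG_SUCCESS_RATE = "tag_success_rate"
    AVERAGE_EPISODE_LENGTH = "average_episode_length"


def get_metric_names() -> List[str]:
    # Override returning the same names used as keys in compute_metrics().
    return [m.value for m in LaserTagMetricName]
```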

property grid_size: ndarray
initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (Any) – First observation to compare

  • observation2 (Any) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (ndarray) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (ndarray) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).
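A sketch of what a vectorized override could look like, assuming the continuous state layout [robot_x, robot_y, opp_x, opp_y, terminal] and simplified reward constants; this is not the environment's exact reward logic:

```python
import numpy as np

# Assumed constants matching the documented defaults.
TAG_RADIUS, TAG_REWARD, TAG_PENALTY, STEP_COST = 0.5, 10.0, 10.0, 1.0


def reward_batch(states, action):
    # states: (N, 5) array; action: [dx, dy, tag_flag].
    states = np.asarray(states, dtype=float)
    dists = np.linalg.norm(states[:, :2] - states[:, 2:4], axis=1)
    if action[2] > 0.5:  # tag attempt: succeed only within the tag radius
        return np.where(dists <= TAG_RADIUS, TAG_REWARD, -TAG_PENALTY)
    return np.full(len(states), -STEP_COST)  # plain movement cost
```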

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

property walls: ndarray
class POMDPPlanners.environments.laser_tag_pomdp.ContinuousLaserTagPOMDPDiscreteActions(discount_factor, name='ContinuousLaserTagPOMDPDiscreteActions', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]

Bases: ContinuousLaserTagPOMDP, DiscreteActionsEnvironment

Continuous LaserTag POMDP with discrete string actions.

Actions: "up", "down", "right", "left", "tag".
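One plausible way to relate these string actions to the parent class's continuous [dx, dy, tag_flag] actions; the exact vectors and sign conventions here are assumptions, not taken from the source:

```python
import numpy as np

# Hypothetical mapping from discrete string actions to [dx, dy, tag_flag].
ACTION_VECTORS = {
    "up": np.array([0.0, 1.0, 0.0]),
    "down": np.array([0.0, -1.0, 0.0]),
    "right": np.array([1.0, 0.0, 0.0]),
    "left": np.array([-1.0, 0.0, 0.0]),
    "tag": np.array([0.0, 0.0, 1.0]),
}
```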

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousLaserTagPOMDPDiscreteActions(discount_factor=0.95)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
Parameters:
  • discount_factor (float)

  • name (str)

  • grid_size (Tuple[float, float])

  • walls (Optional[List[Tuple[float, float, float, float]]])

  • robot_radius (float)

  • opponent_radius (float)

  • tag_radius (float)

  • tag_reward (float)

  • tag_penalty (float)

  • step_cost (float)

  • measurement_noise (float)

  • robot_transition_cov_matrix (np.ndarray)

  • opponent_transition_cov_matrix (np.ndarray)

  • pursuit_speed (float)

  • dangerous_areas (Optional[List[Tuple[float, float]]])

  • dangerous_area_radius (float)

  • dangerous_area_penalty (float)

  • output_dir (Optional[Path])

  • debug (bool)

  • use_queue_logger (bool)

  • initial_state (Optional[np.ndarray])

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[str]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
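For illustration, the simplest planner-style use of an enumerable action space is a one-step greedy search; `reward_fn` is a stand-in for any state-action scoring function, not part of this API:

```python
def greedy_one_step(actions, state, reward_fn):
    # Return the action with the highest immediate score from `state`.
    return max(actions, key=lambda a: reward_fn(state, a))
```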

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (Any) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (Any) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.laser_tag_pomdp.LaserTagObservation(next_state, action, measurement_noise=1.0, floor_shape=(7, 11), walls=None)[source]

Bases: ObservationModel

Observation model for LaserTag POMDP.

Provides 8-directional laser range measurements from the robot’s position. Each measurement represents the number of clear cells in that direction before hitting a wall or boundary, with Gaussian noise.

Parameters:
next_state

The state after action execution as numpy array (shape (5,))

action

The action that was taken

measurement_noise

Standard deviation of Gaussian measurement noise

floor_shape

Grid dimensions as (rows, cols)

walls

Set of wall positions

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])  # Robot at (3,5), opponent at (2,4)
>>> obs_model = LaserTagObservation(
...     next_state=state,
...     action=0,
...     measurement_noise=1.0,
...     floor_shape=(7, 11),
...     walls=set()
... )
>>> observations = obs_model.sample(n_samples=3)
>>> probabilities = obs_model.probability(observations)
probability(values)[source]

Calculate observation probabilities for given values.

Return type:

ndarray

Parameters:

values (List[Any])

sample(n_samples=1)[source]

Sample observations from the observation model.

Return type:

List[Tuple[float, ...]]

Returns:

List of 8-tuple observations representing laser measurements in 8 directions

Parameters:

n_samples (int)

class POMDPPlanners.environments.laser_tag_pomdp.LaserTagPOMDP(discount_factor, name='LaserTagPOMDP', floor_shape=(11, 7), walls={(1, 2), (3, 0), (3, 4), (5, 0), (6, 4), (9, 1), (9, 4), (10, 6)}, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, dangerous_areas={(2, 5), (5, 3), (7, 1)}, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None, transition_error_prob=0.0)[source]

Bases: DiscreteActionsEnvironment

LaserTag POMDP environment implementation.

This is a pursuit-evasion problem where a robot must navigate a grid to tag an opponent. The robot receives noisy observations of the opponent’s position and must decide when and where to attempt tagging.

Problem Structure:
  • States: numpy array [robot_row, robot_col, opp_row, opp_col, terminal]

  • Actions: North(0), South(1), East(2), West(3), Tag(4)

  • Observations: 8-directional laser measurements (N, NE, E, SE, S, SW, W, NW)

  • Rewards: tag success (+10), tag failure (-10), movement (-1)

Parameters:
floor_shape

Grid dimensions as (rows, cols)

walls

Set of wall positions as (row, col) tuples

tag_reward

Reward for successful tagging

tag_penalty

Penalty for unsuccessful tagging

step_cost

Cost per movement action

measurement_noise

Standard deviation of observation noise

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = LaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
cache_visualization(history, cache_path)[source]

Cache visualization of the LaserTag episode as an animated GIF.

Creates an animated visualization showing:
  • Robot movement (red circle)

  • Opponent movement (blue circle)

  • Walls (black squares)

  • Dangerous areas (red circles)

  • Action arrows showing the robot's intended movement

  • Laser measurements (green rays from the robot's position)

  • Belief particles (if available) showing the robot's belief about the opponent's location

  • Grid boundaries and coordinate system

Parameters:
  • history (List[StepData]) – The history of states, actions, and observations from an episode

  • cache_path (Path) – Path where to save the visualization GIF

Raises:
  • ValueError – If history is empty or contains invalid data

  • TypeError – If cache_path is not a Path object or doesn’t end with .gif

Return type:

None

compute_metrics(histories)[source]

Compute LaserTag POMDP specific metrics from simulation histories.

Return type:

List[MetricValue]

Parameters:

histories (List[History])

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[int]

get_metric_names()[source]

Get names of LaserTag POMDP specific metrics.

Returns:

tag_success_rate, average_episode_length, average_failed_tag_attempts, average_obstacle_collisions, average_dangerous_area_steps, and average_all_dangerous_encounters

Return type:

List[str]

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Observations are 8-dimensional laser measurements or terminal observations.

Return type:

bool

Parameters:
  • observation1 (Any)

  • observation2 (Any)

is_terminal(state)[source]

Check if a state is terminal.

Return type:

bool

Parameters:

state (ndarray)

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Return type:

ObservationModel

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (int) – The action that was executed

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Return type:

float

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action executed from the state

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Return type:

StateTransitionModel

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action to be executed

class POMDPPlanners.environments.laser_tag_pomdp.LaserTagStateTransition(state, action, action_directions, floor_shape, walls, transition_error_prob=0.0)[source]

Bases: StateTransitionModel

State transition model for LaserTag POMDP.

Handles robot movement (deterministic based on action) and opponent movement (probabilistic, with tendency to move toward robot’s position).

Parameters:
state

Current state as numpy array (shape (5,))

action

Action to be executed (0=North, 1=South, 2=East, 3=West, 4=Tag)

floor_shape

Tuple of (rows, cols) for grid dimensions

walls

Set of wall positions

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])  # Robot at (3,5), opponent at (2,4)
>>> action_directions = {
...     0: (-1, 0),  # North (up)
...     1: (1, 0),   # South (down)
...     2: (0, 1),   # East (right)
...     3: (0, -1),  # West (left)
...     4: (0, 0),   # Tag (no movement)
... }
>>> transition = LaserTagStateTransition(
...     state=state,
...     action=0,  # North
...     action_directions=action_directions,
...     floor_shape=(7, 11),
...     walls=set()
... )
>>> next_states = transition.sample(n_samples=5)
>>> probabilities = transition.probability(next_states)
probability(values)[source]

Calculate transition probabilities for given next states.

Return type:

ndarray

Parameters:

values (List[Any])

sample(n_samples=1)[source]

Sample next states from the transition model.

Return type:

List[ndarray]

Parameters:

n_samples (int)

Submodules

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry module

Geometry utilities for the Continuous LaserTag POMDP.

Provides ray-AABB intersection, ray-circle intersection, wall collision resolution and grid clamping used by the continuous laser-tag environment and its vectorized belief updater.

Wall AABBs are stored as rows (cx, cy, hx, hy) where (cx, cy) is the center and (hx, hy) the half-extents. Entity radii are used for circle-AABB overlap tests during collision resolution.

Functions:
  • ray_aabb_distances: Vectorized ray-AABB slab intersection for multiple rays originating from a single point against an array of AABBs.

  • ray_circle_distance: Distance along a ray to the nearest intersection with a circle.

  • compute_laser_measurements: Full 8-direction laser scan from a position.

  • resolve_wall_collision: Push a circular entity out of overlapping AABBs.

  • clamp_to_grid: Clamp a 2-D position to the grid boundaries.
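For example, one wall in the documented (cx, cy, hx, hy) row format, with the corner coordinates it implies:

```python
import numpy as np

# One wall centered at (5.5, 3.5) with half-extents (1.0, 0.5):
# it spans x in [4.5, 6.5] and y in [3.0, 4.0].
walls = np.array([[5.5, 3.5, 1.0, 0.5]])
min_corner = walls[0, :2] - walls[0, 2:]
max_corner = walls[0, :2] + walls[0, 2:]
```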

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.batch_clamp_to_grid(positions, entity_radius, grid_size)[source]

Clamp an array of positions to the grid.

Parameters:
  • positions (ndarray) – Shape (N, 2).

  • entity_radius (float) – Entity body radius.

  • grid_size (ndarray) – Shape (2,) – (width, height).

Return type:

ndarray

Returns:

Shape (N, 2) clamped positions.

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.batch_laser_measurements(robot_positions, opponent_positions, opponent_radius, walls, grid_size)[source]

Compute 8-direction laser measurements for many particles.

Parameters:
  • robot_positions (ndarray) – Shape (N, 2).

  • opponent_positions (ndarray) – Shape (N, 2).

  • opponent_radius (float) – Opponent body radius.

  • walls (ndarray) – Shape (M, 4) – wall AABBs.

  • grid_size (ndarray) – Shape (2,) – (width, height).

Return type:

ndarray

Returns:

Shape (N, 8) measurement array.

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.batch_resolve_wall_collision(positions, entity_radius, walls)[source]

Resolve wall collisions for an array of positions.

Parameters:
  • positions (ndarray) – Shape (N, 2).

  • entity_radius (float) – Entity body radius.

  • walls (ndarray) – Shape (M, 4).

Return type:

ndarray

Returns:

Shape (N, 2) resolved positions.

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.clamp_to_grid(position, entity_radius, grid_size)[source]

Clamp a position so the entity circle stays within [0, w] x [0, h].

Parameters:
  • position (ndarray) – Shape (2,) – entity center.

  • entity_radius (float) – Entity body radius.

  • grid_size (ndarray) – Shape (2,) – (width, height) of the arena.

Return type:

ndarray

Returns:

Clamped position as shape (2,) array.
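The clamping contract reduces to an element-wise np.clip; a minimal sketch consistent with the documented behavior:

```python
import numpy as np


def clamp_to_grid(position, entity_radius, grid_size):
    # Keep an entity circle of the given radius fully inside
    # [0, width] x [0, height].
    return np.clip(position, entity_radius, np.asarray(grid_size) - entity_radius)
```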

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.compute_laser_measurements(robot_pos, opponent_pos, opponent_radius, walls, grid_size)[source]

Compute 8-direction laser measurements from the robot.

Each measurement is the distance to the nearest obstacle (wall AABB, opponent circle, or grid boundary) along the corresponding ray in LASER_DIRECTIONS.

Parameters:
  • robot_pos (ndarray) – Shape (2,) – robot (x, y).

  • opponent_pos (ndarray) – Shape (2,) – opponent (x, y).

  • opponent_radius (float) – Opponent body radius.

  • walls (ndarray) – Shape (M, 4) – wall AABBs.

  • grid_size (ndarray) – Shape (2,) – (width, height) of the arena.

Return type:

ndarray

Returns:

Shape (8,) array of distances.
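A sketch of the 8 ray directions as unit vectors, assuming the (N, NE, E, SE, S, SW, W, NW) ordering documented for the discrete environment; the actual LASER_DIRECTIONS constant may order or orient them differently:

```python
import numpy as np

# Compass ordering N, NE, E, SE, S, SW, W, NW as angles from the +x axis.
angles = np.deg2rad([90, 45, 0, -45, -90, -135, 180, 135])
LASER_DIRECTIONS = np.stack([np.cos(angles), np.sin(angles)], axis=1)
```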

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.ray_aabb_distances(origin, directions, walls)[source]

Compute distances from origin along each ray to the nearest wall AABB.

Uses the slab method. For each of the D directions the minimum positive intersection distance across all M walls is returned. If a ray does not hit any wall before _RAY_MAX the returned distance is _RAY_MAX.

Parameters:
  • origin (ndarray) – Shape (2,) – ray origin (x, y).

  • directions (ndarray) – Shape (D, 2) – unit direction vectors.

  • walls (ndarray) – Shape (M, 4) – AABBs (cx, cy, hx, hy).

Return type:

ndarray

Returns:

Shape (D,) array of nearest intersection distances (positive).
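The slab method described above can be sketched in vectorized numpy as follows; the sentinel value and zero-direction handling are assumptions, not the module's actual implementation:

```python
import numpy as np

_RAY_MAX = 1e6  # assumed "no hit" sentinel; the real constant may differ


def ray_aabb_distances(origin, directions, walls):
    # origin: (2,); directions: (D, 2) unit vectors; walls: (M, 4) rows
    # of (cx, cy, hx, hy).
    lo = walls[:, :2] - walls[:, 2:]               # (M, 2) min corners
    hi = walls[:, :2] + walls[:, 2:]               # (M, 2) max corners
    d = np.where(directions == 0.0, 1e-12, directions)
    inv = 1.0 / d                                  # safe reciprocal
    t1 = (lo[None, :, :] - origin) * inv[:, None, :]   # (D, M, 2)
    t2 = (hi[None, :, :] - origin) * inv[:, None, :]
    t_near = np.minimum(t1, t2).max(axis=2)        # (D, M) slab entry
    t_far = np.maximum(t1, t2).min(axis=2)         # (D, M) slab exit
    hit = (t_near <= t_far) & (t_far > 0.0)
    t_hit = np.where(hit, np.maximum(t_near, 0.0), _RAY_MAX)
    return t_hit.min(axis=1)                       # (D,) nearest wall per ray
```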

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.ray_circle_distance(origin, direction, center, radius)[source]

Distance along a ray to the nearest intersection with a circle.

Parameters:
  • origin (ndarray) – Shape (2,) – ray origin.

  • direction (ndarray) – Shape (2,) – unit direction.

  • center (ndarray) – Shape (2,) – circle center.

  • radius (float) – Circle radius.

Return type:

float

Returns:

Positive intersection distance, or np.inf if no hit.
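The standard ray-circle test solves a quadratic in the ray parameter t; a minimal sketch matching the documented contract (direction assumed unit length):

```python
import numpy as np


def ray_circle_distance(origin, direction, center, radius):
    # Solve |origin + t*direction - center|^2 = radius^2 for the
    # smallest positive root t.
    oc = origin - center
    b = np.dot(oc, direction)           # half the linear coefficient
    c = np.dot(oc, oc) - radius ** 2
    disc = b * b - c
    if disc < 0.0:
        return np.inf                   # ray misses the circle
    root = np.sqrt(disc)
    for t in (-b - root, -b + root):
        if t > 0.0:
            return t
    return np.inf                       # circle lies behind the ray
```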

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_geometry.resolve_wall_collision(position, entity_radius, walls)[source]

Push a circular entity out of any overlapping wall AABBs.

For each wall, if the entity circle overlaps the AABB, the entity is pushed along the axis of minimum penetration.

Parameters:
  • position (ndarray) – Shape (2,) – entity center.

  • entity_radius (float) – Entity body radius.

  • walls (ndarray) – Shape (M, 4) – wall AABBs.

Return type:

ndarray

Returns:

Resolved position as shape (2,) array.
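A sketch of the minimum-penetration push described above (per-wall, circle vs. inflated AABB); the exact tie-breaking and iteration order are assumptions:

```python
import numpy as np


def resolve_wall_collision(position, entity_radius, walls):
    # walls: (M, 4) rows of (cx, cy, hx, hy).
    pos = position.astype(float).copy()
    for cx, cy, hx, hy in walls:
        # Closest point on the AABB to the circle center.
        closest = np.clip(pos, [cx - hx, cy - hy], [cx + hx, cy + hy])
        if np.hypot(*(pos - closest)) >= entity_radius:
            continue  # no overlap with this wall
        # Penetration depth along each axis of the radius-inflated box.
        pen_x = hx + entity_radius - abs(pos[0] - cx)
        pen_y = hy + entity_radius - abs(pos[1] - cy)
        if pen_x < pen_y:
            pos[0] += np.sign(pos[0] - cx) * pen_x  # push along x
        else:
            pos[1] += np.sign(pos[1] - cy) * pen_y  # push along y
    return pos
```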

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp module

Continuous LaserTag POMDP Environment Implementation.

This module implements a continuous-space variant of the LaserTag pursuit-evasion POMDP where a robot must navigate to tag an opponent that moves stochastically through continuous 2-D space.

Two environment classes are provided: ContinuousLaserTagPOMDP (continuous actions) and ContinuousLaserTagPOMDPDiscreteActions (discrete actions).

State representation:

np.ndarray of shape (5,) – [robot_x, robot_y, opponent_x, opponent_y, terminal_flag]

Observation:

np.ndarray of shape (8,) – noisy 8-direction laser range measurements. The terminal observation is np.full(8, -1.0).

Classes:

  • ContinuousLaserTagStateTransitionModel: State transition model.

  • ContinuousLaserTagObservationModel: Observation model.

  • ContinuousLaserTagPOMDP: Continuous-action environment.

  • ContinuousLaserTagPOMDPDiscreteActions: Discrete-action variant.

class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagObservationModel(next_state, action, measurement_noise, walls, grid_size, opponent_radius)[source]

Bases: ObservationModel

Observation model for the Continuous LaserTag POMDP.

Provides 8-direction laser range measurements with Gaussian noise.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>> state = np.array([3.0, 3.0, 8.0, 5.0, 0.0])
>>> walls = np.empty((0, 4))
>>> grid_size = np.array([11.0, 7.0])
>>> model = ContinuousLaserTagObservationModel(
...     next_state=state, action=np.array([1.0, 0.0, 0.0]),
...     measurement_noise=1.0, walls=walls,
...     grid_size=grid_size, opponent_radius=0.3,
... )
>>> obs = model.sample(n_samples=2)
>>> len(obs)
2
Parameters:
  • next_state (np.ndarray)

  • action (np.ndarray)

  • measurement_noise (float)

  • walls (np.ndarray)

  • grid_size (np.ndarray)

  • opponent_radius (float)

probability(values)[source]

Calculate observation probabilities for given values.

Parameters:

values (List[Any]) – List of observation values to calculate probabilities for

Return type:

ndarray

Returns:

Array of probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample observations from the observation model.

Parameters:

n_samples (int) – Number of observation samples to generate. Defaults to 1.

Return type:

List[ndarray]

Returns:

List of sampled observations of length n_samples.

Note

Subclasses must implement this method according to their specific observation generation logic.

class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagPOMDP(discount_factor, name='ContinuousLaserTagPOMDP', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]

Bases: Environment

Continuous LaserTag POMDP with continuous [dx, dy, tag_flag] actions.

A pursuit-evasion problem in continuous 2-D space where a robot must navigate to tag an opponent. The robot receives noisy 8-direction laser range observations.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> # Initialize environment
>>> env = ContinuousLaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state
>>> initial_state = env.initial_state_dist().sample()[0]
>>>
>>> # Sample complete step
>>> action = np.array([1.0, 0.0, 0.0])
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
Parameters:
  • discount_factor (float)

  • name (str)

  • grid_size (Tuple[float, float])

  • walls (Optional[List[Tuple[float, float, float, float]]])

  • robot_radius (float)

  • opponent_radius (float)

  • tag_radius (float)

  • tag_reward (float)

  • tag_penalty (float)

  • step_cost (float)

  • measurement_noise (float)

  • robot_transition_cov_matrix (np.ndarray)

  • opponent_transition_cov_matrix (np.ndarray)

  • pursuit_speed (float)

  • dangerous_areas (Optional[List[Tuple[float, float]]])

  • dangerous_area_radius (float)

  • dangerous_area_penalty (float)

  • output_dir (Optional[Path])

  • debug (bool)

  • use_queue_logger (bool)

  • initial_state (Optional[np.ndarray])

cache_visualization(history, cache_path)[source]

Cache visualization data for an episode history.

This method can be overridden by subclasses to provide environment-specific visualization caching capabilities.

Parameters:
  • history (List[StepData]) – List of step data from an episode

  • cache_path (Path) – Path where visualization data should be cached

Return type:

None

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_metric_names()[source]

Get names of environment-specific metrics.

This method returns the names of custom metrics that this environment computes in the compute_metrics() method. It enables users to discover what metrics are available for hyperparameter optimization.

Return type:

List[str]

Returns:

List of metric names that this environment produces. Default implementation returns empty list for environments without custom metrics.

Note

Subclasses that override compute_metrics() should also override this method to return the names of metrics they produce. Use an Enum to ensure consistency between the names returned here and the names used in compute_metrics().

property grid_size: ndarray
initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Subclasses must implement this method to define initial observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Subclasses must implement this method to define the starting distribution.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (Any) – First observation to compare

  • observation2 (Any) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

Subclasses must implement this method to define terminal conditions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (ndarray) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (ndarray) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (ndarray) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

property walls: ndarray
class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagPOMDPDiscreteActions(discount_factor, name='ContinuousLaserTagPOMDPDiscreteActions', grid_size=(11.0, 7.0), walls=None, robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, robot_transition_cov_matrix=array([[0.1, 0.], [0., 0.1]]), opponent_transition_cov_matrix=array([[0.05, 0.], [0., 0.05]]), pursuit_speed=0.6, dangerous_areas=None, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None)[source]

Bases: ContinuousLaserTagPOMDP, DiscreteActionsEnvironment

Continuous LaserTag POMDP with discrete string actions.

Actions: "up", "down", "right", "left", "tag".
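One plausible mapping from these string actions to the parent class's continuous [dx, dy, tag_flag] vectors (a sketch only; the step length and axis orientation used by the package are assumptions):

```python
import numpy as np

STEP = 1.0  # assumed step length per discrete move

# Hypothetical mapping from string actions to [dx, dy, tag_flag] vectors.
ACTION_VECTORS = {
    "up":    np.array([0.0,  STEP, 0.0]),
    "down":  np.array([0.0, -STEP, 0.0]),
    "right": np.array([STEP,  0.0, 0.0]),
    "left":  np.array([-STEP, 0.0, 0.0]),
    "tag":   np.array([0.0,   0.0, 1.0]),
}

def to_continuous(action: str) -> np.ndarray:
    """Translate a discrete string action into a continuous action vector."""
    return ACTION_VECTORS[action]

print(to_continuous("right"))  # → [1. 0. 0.]
```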

Example

>>> import numpy as np
>>> np.random.seed(42)
>>>
>>> env = ContinuousLaserTagPOMDPDiscreteActions(discount_factor=0.95)
>>>
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> env.is_terminal(initial_state)
False
Parameters:
  • discount_factor (float)

  • name (str)

  • grid_size (Tuple[float, float])

  • walls (Optional[List[Tuple[float, float, float, float]]])

  • robot_radius (float)

  • opponent_radius (float)

  • tag_radius (float)

  • tag_reward (float)

  • tag_penalty (float)

  • step_cost (float)

  • measurement_noise (float)

  • robot_transition_cov_matrix (np.ndarray)

  • opponent_transition_cov_matrix (np.ndarray)

  • pursuit_speed (float)

  • dangerous_areas (Optional[List[Tuple[float, float]]])

  • dangerous_area_radius (float)

  • dangerous_area_penalty (float)

  • output_dir (Optional[Path])

  • debug (bool)

  • use_queue_logger (bool)

  • initial_state (Optional[np.ndarray])

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[str]

Returns:

List containing all valid actions that can be executed

Note

Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (Any) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

Subclasses must implement this method to define observation generation.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

Subclasses must implement this method to define reward structure.

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (Any) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (Any) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

Subclasses must implement this method to define state dynamics.

class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagPOMDPMetrics(*values)[source]

Bases: Enum

Metric names for Continuous LaserTag POMDP.

AVERAGE_ALL_DANGEROUS_ENCOUNTERS = 'average_all_dangerous_encounters'
AVERAGE_DANGEROUS_AREA_STEPS = 'average_dangerous_area_steps'
AVERAGE_EPISODE_LENGTH = 'average_episode_length'
AVERAGE_FAILED_TAG_ATTEMPTS = 'average_failed_tag_attempts'
AVERAGE_WALL_COLLISIONS = 'average_wall_collisions'
GOAL_REACHING_RATE = 'goal_reaching_rate'
TAG_SUCCESS_RATE = 'tag_success_rate'
class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_pomdp.ContinuousLaserTagStateTransitionModel(state, action, robot_transition_dist, opponent_transition_dist, pursuit_speed, walls, grid_size, robot_radius, opponent_radius, tag_radius)[source]

Bases: StateTransitionModel

State transition model for the Continuous LaserTag POMDP.

Robot movement: next_pos = pos + action[:2] + noise where noise is sampled from a 2-D Gaussian. Opponent pursues the robot stochastically.
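A minimal sketch of this update rule, assuming independent Gaussian noise and a simple clamp to the arena bounds (the package's wall-collision handling is more involved):

```python
import numpy as np

rng = np.random.default_rng(42)

def step_robot(pos, action, cov, grid_size):
    """next_pos = pos + action[:2] + noise, kept inside the arena."""
    noise = rng.multivariate_normal(np.zeros(2), cov)
    next_pos = pos + action[:2] + noise
    # Simple clamp to arena bounds; the real model also resolves wall AABBs.
    return np.clip(next_pos, 0.0, grid_size)

def step_opponent(opp_pos, robot_pos, pursuit_speed, cov, grid_size):
    """Opponent moves toward the robot at pursuit_speed, plus noise."""
    direction = robot_pos - opp_pos
    norm = np.linalg.norm(direction)
    if norm > 1e-9:
        direction = direction / norm
    noise = rng.multivariate_normal(np.zeros(2), cov)
    return np.clip(opp_pos + pursuit_speed * direction + noise, 0.0, grid_size)

grid_size = np.array([11.0, 7.0])
robot = step_robot(np.array([3.0, 3.0]), np.array([1.0, 0.0, 0.0]),
                   np.eye(2) * 0.1, grid_size)
opp = step_opponent(np.array([8.0, 5.0]), robot, 0.6,
                    np.eye(2) * 0.05, grid_size)
```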

Example

>>> import numpy as np
>>> np.random.seed(42)
>>> from POMDPPlanners.utils.multivariate_normal import (
...     CovarianceParameterizedMultivariateNormal,
... )
>>> state = np.array([3.0, 3.0, 8.0, 5.0, 0.0])
>>> action = np.array([1.0, 0.0, 0.0])
>>> robot_dist = CovarianceParameterizedMultivariateNormal(np.eye(2) * 0.1)
>>> opponent_dist = CovarianceParameterizedMultivariateNormal(np.eye(2) * 0.05)
>>> walls = np.empty((0, 4))
>>> grid_size = np.array([11.0, 7.0])
>>> model = ContinuousLaserTagStateTransitionModel(
...     state=state, action=action,
...     robot_transition_dist=robot_dist,
...     opponent_transition_dist=opponent_dist,
...     pursuit_speed=0.6,
...     walls=walls, grid_size=grid_size,
...     robot_radius=0.3, opponent_radius=0.3, tag_radius=0.5,
... )
>>> samples = model.sample(n_samples=3)
>>> len(samples)
3
Parameters:
  • state (ndarray) – Current state vector of shape (5,)

  • action (ndarray) – Action [dx, dy, tag_flag] to be executed

  • robot_transition_dist – Gaussian noise distribution for robot motion

  • opponent_transition_dist – Gaussian noise distribution for opponent motion

  • pursuit_speed (float) – Speed at which the opponent pursues the robot

  • walls (ndarray) – Shape (M, 4) wall AABB array

  • grid_size (ndarray) – Arena dimensions (width, height)

  • robot_radius (float) – Robot body radius

  • opponent_radius (float) – Opponent body radius

  • tag_radius (float) – Distance within which a tag attempt succeeds

probability(values)[source]

Calculate transition probabilities for given next states.

Parameters:

values (List[Any]) – List of next state values to calculate probabilities for

Return type:

ndarray

Returns:

Array of transition probabilities corresponding to the input values

Raises:

NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.

sample(n_samples=1)[source]

Sample next states from the transition model.

Parameters:

n_samples (int) – Number of next state samples to generate. Defaults to 1.

Return type:

List[ndarray]

Returns:

List of sampled next states of length n_samples.

Note

Subclasses must implement this method according to their specific state transition dynamics.

POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_visualizer module

Continuous LaserTag POMDP Visualization Module.

This module provides visualization for the continuous-space LaserTag environment, creating animated GIF visualizations of episodes.

class POMDPPlanners.environments.laser_tag_pomdp.continuous_laser_tag_visualizer.ContinuousLaserTagVisualizer(grid_size, walls, robot_radius, opponent_radius, dangerous_areas, dangerous_area_radius)[source]

Bases: object

Handles visualization for the Continuous LaserTag POMDP.

Creates animated GIF visualizations showing robot and opponent movement as rendered icons, rectangular walls, laser rays, belief particles, and tag indicators. The robot is shown as a red humanoid and the opponent as a blue wheeled rover.

Parameters:
grid_size

Arena dimensions (width, height) as ndarray.

walls

Shape (M, 4) wall AABB array.

robot_radius

Robot body radius.

opponent_radius

Opponent body radius.

dangerous_areas

Dangerous area centers as (x, y) tuples.

dangerous_area_radius

Radius of dangerous areas.

create_visualization(history, cache_path)[source]

Create animated GIF visualization of a Continuous LaserTag episode.

Parameters:
  • history (List[StepData]) – Episode step data list.

  • cache_path (Path) – Path to save the GIF.

Return type:

None

POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp module

LaserTag POMDP Environment Implementation.

This module implements the LaserTag problem, a pursuit-evasion POMDP environment where an agent must navigate a grid to tag an opponent that moves stochastically. The agent has noisy observations of the opponent’s location.

The LaserTag problem features:

  • A grid-based environment (default 7x11) with optional walls

  • Robot and opponent moving on discrete grid cells

  • 5 possible actions: North, South, East, West, Tag

  • 8-directional laser range measurements with Gaussian noise

  • Positive reward for successful tagging, negative reward for failed tag attempts

  • Step cost for each movement action

  • Opponent moves with 0.4 probability toward the robot in the x-direction, 0.4 probability toward the robot in the y-direction, and 0.2 probability of staying put

  • When aligned on an axis, the 0.4 budget is split equally (0.2/0.2) between both directions
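The opponent's move distribution described above can be sketched as follows (function and variable names are illustrative, not the package's API):

```python
import numpy as np

def opponent_move_probs(robot, opp):
    """Return {(drow, dcol): prob} for the opponent's next move.

    0.4 probability toward the robot along each axis, 0.2 to stay put;
    when already aligned on an axis, that axis's 0.4 is split 0.2/0.2.
    """
    probs = {(0, 0): 0.2}  # stay-put probability
    for axis in (0, 1):  # 0 = row, 1 = col
        diff = robot[axis] - opp[axis]
        if diff != 0:
            step = [0, 0]
            step[axis] = int(np.sign(diff))  # step toward the robot
            probs[tuple(step)] = probs.get(tuple(step), 0.0) + 0.4
        else:
            # Aligned on this axis: split the 0.4 budget both ways.
            for s in (-1, 1):
                step = [0, 0]
                step[axis] = s
                probs[tuple(step)] = probs.get(tuple(step), 0.0) + 0.2
    return probs

p = opponent_move_probs(robot=(3, 5), opp=(2, 4))
print(p)  # → {(0, 0): 0.2, (1, 0): 0.4, (0, 1): 0.4}
```

The probabilities always sum to 1, whether or not the opponent is axis-aligned with the robot.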

Classes:

LaserTagState: State representation with robot and opponent positions
LaserTagStateTransition: State transition model for robot and opponent movement
LaserTagObservation: Observation model with noisy opponent position measurements
LaserTagPOMDP: Main environment class implementing the LaserTag problem

class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp.LaserTagObservation(next_state, action, measurement_noise=1.0, floor_shape=(7, 11), walls=None)[source]

Bases: ObservationModel

Observation model for LaserTag POMDP.

Provides 8-directional laser range measurements from the robot’s position. Each measurement represents the number of clear cells in that direction before hitting a wall or boundary, with Gaussian noise.
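The noiseless part of one such measurement can be sketched as a grid ray-march (a simplified illustration; `walls` is a set of (row, col) cells as in the class's parameters):

```python
import numpy as np

# The 8 directions in (drow, dcol) form: N, NE, E, SE, S, SW, W, NW.
DIRECTIONS = [(-1, 0), (-1, 1), (0, 1), (1, 1),
              (1, 0), (1, -1), (0, -1), (-1, -1)]

def laser_ranges(robot, floor_shape, walls, noise_std=0.0, rng=None):
    """Count clear cells in each direction before a wall or boundary."""
    rows, cols = floor_shape
    ranges = []
    for drow, dcol in DIRECTIONS:
        r, c, clear = robot[0] + drow, robot[1] + dcol, 0
        while 0 <= r < rows and 0 <= c < cols and (r, c) not in walls:
            clear += 1
            r, c = r + drow, c + dcol
        ranges.append(float(clear))
    ranges = np.array(ranges)
    if noise_std > 0.0:  # add Gaussian measurement noise
        rng = rng or np.random.default_rng()
        ranges = ranges + rng.normal(0.0, noise_std, size=8)
    return ranges

print(laser_ranges((3, 5), floor_shape=(7, 11), walls={(1, 5)}))
# → [1. 3. 5. 3. 3. 3. 5. 3.]
```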

Parameters:
next_state

The state after action execution as numpy array (shape (5,))

action

The action that was taken

measurement_noise

Standard deviation of Gaussian measurement noise

floor_shape

Grid dimensions as (rows, cols)

walls

Set of wall positions

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])  # Robot at (3,5), opponent at (2,4)
>>> obs_model = LaserTagObservation(
...     next_state=state,
...     action=0,
...     measurement_noise=1.0,
...     floor_shape=(7, 11),
...     walls=set()
... )
>>> observations = obs_model.sample(n_samples=3)
>>> probabilities = obs_model.probability(observations)
probability(values)[source]

Calculate observation probabilities for given values.

Return type:

ndarray

Parameters:

values (List[Any])

sample(n_samples=1)[source]

Sample observations from the observation model.

Return type:

List[Tuple[float, ...]]

Returns:

List of 8-tuple observations representing laser measurements in 8 directions

Parameters:

n_samples (int)

class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp.LaserTagPOMDP(discount_factor, name='LaserTagPOMDP', floor_shape=(11, 7), walls={(1, 2), (3, 0), (3, 4), (5, 0), (6, 4), (9, 1), (9, 4), (10, 6)}, tag_reward=10.0, tag_penalty=10.0, step_cost=1.0, measurement_noise=1.0, dangerous_areas={(2, 5), (5, 3), (7, 1)}, dangerous_area_radius=1.0, dangerous_area_penalty=5.0, output_dir=None, debug=False, use_queue_logger=False, initial_state=None, transition_error_prob=0.0)[source]

Bases: DiscreteActionsEnvironment

LaserTag POMDP environment implementation.

This is a pursuit-evasion problem where a robot must navigate a grid to tag an opponent. The robot receives noisy observations of the opponent’s position and must decide when and where to attempt tagging.

Problem Structure:

  • States: numpy array [robot_row, robot_col, opp_row, opp_col, terminal]

  • Actions: North(0), South(1), East(2), West(3), Tag(4)

  • Observations: 8-directional laser measurements (N, NE, E, SE, S, SW, W, NW)

  • Rewards: Tag success (+10), Tag failure (-10), Movement (-1)

Parameters:
floor_shape

Grid dimensions as (rows, cols)

walls

Set of wall positions as (row, col) tuples

tag_reward

Reward for successful tagging

tag_penalty

Penalty for unsuccessful tagging

step_cost

Cost per movement action

measurement_noise

Standard deviation of observation noise

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = LaserTagPOMDP(discount_factor=0.95)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
cache_visualization(history, cache_path)[source]

Cache visualization of the LaserTag episode as an animated GIF.

Creates an animated visualization showing:

  • Robot movement (red circle)

  • Opponent movement (blue circle)

  • Walls (black squares)

  • Dangerous areas (red circles)

  • Action arrows showing the robot’s intended movement

  • Laser measurements (green rays from the robot’s position)

  • Belief particles (if available) showing the robot’s belief about the opponent’s location

  • Grid boundaries and coordinate system

Parameters:
  • history (List[StepData]) – The history of states, actions, and observations from an episode

  • cache_path (Path) – Path where to save the visualization GIF

Raises:
  • ValueError – If history is empty or contains invalid data

  • TypeError – If cache_path is not a Path object or doesn’t end with .gif

Return type:

None

compute_metrics(histories)[source]

Compute LaserTag POMDP specific metrics from simulation histories.

Return type:

List[MetricValue]

Parameters:

histories (List[History])

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[int]

get_metric_names()[source]

Get names of LaserTag POMDP specific metrics.

Returns:

tag_success_rate, average_episode_length, average_failed_tag_attempts, average_obstacle_collisions, average_dangerous_area_steps, and average_all_dangerous_encounters

Return type:

List[str]

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Observations are 8-dimensional laser measurements or terminal observations.
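A sketch of such a check for float-valued laser tuples (the terminal sentinel and tolerance value are assumptions, not the package's actual representation):

```python
import numpy as np

TERMINAL_OBS = "terminal"  # hypothetical sentinel for terminal observations

def is_equal_observation(obs1, obs2, atol=1e-9):
    """Compare two observations: terminal sentinels by equality,
    8-dimensional laser measurements element-wise within a tolerance."""
    if isinstance(obs1, str) or isinstance(obs2, str):
        return obs1 == obs2
    return bool(np.allclose(np.asarray(obs1), np.asarray(obs2), atol=atol))

print(is_equal_observation((1.0, 2.0), (1.0, 2.0)))  # → True
```

Comparing float measurements with a tolerance rather than `==` avoids spurious mismatches from floating-point round-off.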

Return type:

bool

Parameters:
  • observation1 (Any)

  • observation2 (Any)

is_terminal(state)[source]

Check if a state is terminal.

Return type:

bool

Parameters:

state (ndarray)

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Return type:

ObservationModel

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (int) – The action that was executed

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Return type:

float

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action executed from the state

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Return type:

StateTransitionModel

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action to be executed

class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp.LaserTagPOMDPMetrics(*values)[source]

Bases: Enum

Metric names for LaserTag POMDP environment.

AVERAGE_ALL_DANGEROUS_ENCOUNTERS = 'average_all_dangerous_encounters'
AVERAGE_DANGEROUS_AREA_STEPS = 'average_dangerous_area_steps'
AVERAGE_EPISODE_LENGTH = 'average_episode_length'
AVERAGE_FAILED_TAG_ATTEMPTS = 'average_failed_tag_attempts'
AVERAGE_OBSTACLE_COLLISIONS = 'average_obstacle_collisions'
GOAL_REACHING_RATE = 'goal_reaching_rate'
TAG_SUCCESS_RATE = 'tag_success_rate'
class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_pomdp.LaserTagStateTransition(state, action, action_directions, floor_shape, walls, transition_error_prob=0.0)[source]

Bases: StateTransitionModel

State transition model for LaserTag POMDP.

Handles robot movement (deterministic based on action) and opponent movement (probabilistic, with tendency to move toward robot’s position).

Parameters:
state

Current state as numpy array (shape (5,))

action

Action to be executed (0=North, 1=South, 2=East, 3=West, 4=Tag)

floor_shape

Tuple of (rows, cols) for grid dimensions

walls

Set of wall positions

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> state = np.array([3.0, 5.0, 2.0, 4.0, 0.0])  # Robot at (3,5), opponent at (2,4)
>>> action_directions = {
...     0: (-1, 0),  # North (up)
...     1: (1, 0),   # South (down)
...     2: (0, 1),   # East (right)
...     3: (0, -1),  # West (left)
...     4: (0, 0),   # Tag (no movement)
... }
>>> transition = LaserTagStateTransition(
...     state=state,
...     action=0,  # North
...     action_directions=action_directions,
...     floor_shape=(7, 11),
...     walls=set()
... )
>>> next_states = transition.sample(n_samples=5)
>>> probabilities = transition.probability(next_states)
action_directions: Dict[int, Tuple[int, int]]
floor_shape: Tuple[int, int]
probability(values)[source]

Calculate transition probabilities for given next states.

Return type:

ndarray

Parameters:

values (List[Any])

sample(n_samples=1)[source]

Sample next states from the transition model.

Return type:

List[ndarray]

Parameters:

n_samples (int)

walls: Set[Tuple[int, int]]

POMDPPlanners.environments.laser_tag_pomdp.laser_tag_visualizer module

LaserTag POMDP Visualization Module.

This module provides visualization functionality for LaserTag POMDP environments, creating animated GIF visualizations of episodes.

class POMDPPlanners.environments.laser_tag_pomdp.laser_tag_visualizer.LaserTagVisualizer(floor_shape, walls, dangerous_areas, dangerous_area_radius)[source]

Bases: object

Handles visualization for LaserTag POMDP environments.

Creates animated GIF visualizations showing robot movement, opponent movement, walls, laser measurements, belief particles, and action indicators.

Parameters:
floor_shape

Grid dimensions as (rows, cols)

walls

Set of wall positions as (row, col) tuples

dangerous_areas

List of dangerous area center positions

dangerous_area_radius

Radius around dangerous area centers

create_visualization(history, cache_path)[source]

Create animated GIF visualization of a LaserTag episode.

Creates an animated visualization showing:

  • Robot movement (red circle with path trail)

  • Opponent movement (blue circle with path trail)

  • Walls (black squares)

  • Dangerous areas (red circles)

  • Action arrows showing the robot’s intended movement

  • Laser measurements (green rays from the robot’s position)

  • Belief particles (if available) showing the robot’s belief about the opponent’s location

  • Grid boundaries and coordinate system

  • Step counter and action labels

Parameters:
  • history (List[StepData]) – The history of states, actions, and observations from an episode

  • cache_path (Path) – Path where to save the visualization GIF

Raises:
  • ValueError – If history is empty or contains invalid data

  • TypeError – If cache_path is not a Path object or doesn’t end with .gif

Return type:

None