POMDPPlanners.environments.safety_ant_velocity_pomdp package

Safety Ant Velocity POMDP Environment Package.

This package implements a safety-critical velocity control task where an agent must navigate while avoiding unsafe velocities.

class POMDPPlanners.environments.safety_ant_velocity_pomdp.SafeAntVelocityObservation(next_state, action, position_noise=0.1, velocity_noise=0.2)[source]

Bases: ObservationModel

Noisy observation model for Safety Ant Velocity POMDP.

This model adds Gaussian noise to both position and velocity measurements, creating partial observability that makes velocity estimation challenging. Higher noise in velocity measurements reflects the difficulty of measuring velocity precisely in practice.

Parameters:
  • next_state (ndarray) – True state after action execution

  • action (int) – Action that was taken (not used in observation generation)

  • position_noise (float) – Standard deviation of Gaussian noise for position

  • velocity_noise (float) – Standard deviation of Gaussian noise for velocity

Attributes:
  • position – True position [x, y]

  • velocity – True velocity [vx, vy]

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # True state after physics simulation
>>> true_state = np.array([0.6, -0.1, 1.2, 0.8])  # [x, y, vx, vy]
>>> action = 2
>>>
>>> # Create observation model
>>> obs_model = SafeAntVelocityObservation(
...     next_state=true_state,
...     action=action,
...     position_noise=0.1,
...     velocity_noise=0.2
... )
>>>
>>> # Sample noisy observation
>>> observation = obs_model.sample()[0]
>>> # Returns [noisy_x, noisy_y, noisy_vx, noisy_vy]
>>> # Position noise std: 0.1, velocity noise std: 0.2
>>>
>>> # Calculate observation probability
>>> prob = obs_model.probability([observation])

probability(values)[source]

Calculate observation probabilities for given values.

Parameters:

values (List[Any]) – List of observation values to calculate probabilities for

Return type:

ndarray

Returns:

Array of probabilities corresponding to the input values

Note

For this model, each probability is the Gaussian likelihood of the observed values given the true state, using the configured position and velocity noise levels.
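
A minimal sketch of such a Gaussian likelihood, assuming independent noise per dimension (gaussian_obs_likelihood is a hypothetical helper, not part of the package):

>>> import numpy as np
>>> def gaussian_obs_likelihood(observation, true_state,
...                             position_noise=0.1, velocity_noise=0.2):
...     # Per-dimension standard deviations for [x, y, vx, vy]
...     stds = np.array([position_noise, position_noise,
...                      velocity_noise, velocity_noise])
...     z = (np.asarray(observation) - np.asarray(true_state)) / stds
...     densities = np.exp(-0.5 * z ** 2) / (stds * np.sqrt(2.0 * np.pi))
...     return float(np.prod(densities))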

sample(n_samples=1)[source]

Sample observations from the observation model.

Parameters:

n_samples (int) – Number of observation samples to generate. Defaults to 1.

Return type:

List[Any]

Returns:

List of sampled observations of length n_samples.

Note

This model samples observations by adding independent Gaussian noise to the true position and velocity.
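
Conceptually, sampling amounts to perturbing the true state with independent Gaussian noise, roughly as follows (a sketch, not necessarily the exact implementation):

>>> import numpy as np
>>> true_state = np.array([0.6, -0.1, 1.2, 0.8])
>>> stds = np.array([0.1, 0.1, 0.2, 0.2])  # position noise for x, y; velocity noise for vx, vy
>>> noisy_observation = true_state + np.random.normal(0.0, stds)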

class POMDPPlanners.environments.safety_ant_velocity_pomdp.SafeAntVelocityPOMDP(discount_factor, safe_velocity_threshold=2.0, max_force=1.0, dt=0.1, mass=1.0, damping=0.1, position_noise=0.1, velocity_noise=0.2, safety_violation_penalty=-100.0, movement_reward_scale=1.0, name='SafeVelocityPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]

Bases: DiscreteActionsEnvironment

Safety-critical velocity control task formulated as a POMDP.

This environment presents a safety-critical control problem where an agent must navigate while keeping velocity below a safety threshold. The challenge comes from balancing exploration rewards with safety constraints under noisy velocity observations.

Problem Structure:

  • State: [position_x, position_y, velocity_x, velocity_y] (continuous)

  • Actions: [0=no force, 1=small, 2=medium, 3=large force] (discrete)

  • Observations: Noisy position and velocity measurements (continuous)

  • Rewards: Movement reward minus safety violation penalty (if unsafe)

  • Safety constraint: velocity magnitude ≤ safe_velocity_threshold

  • Termination: Velocity exceeds 1.5× the safety threshold

Safety Features:

  • Tracks safety and critical violation rates

  • Heavy penalties for constraint violations

  • Configurable safety thresholds and penalties

  • Physics simulation with uncertainty in force direction

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = SafeAntVelocityPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
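
A short rollout loop using only the interface documented below (random action selection, purely illustrative):

>>> state = initial_state
>>> total_reward = 0.0
>>> for _ in range(20):
...     action = np.random.choice(env.get_actions())
...     state, observation, reward = env.sample_next_step(state, action)
...     total_reward += reward
...     if env.is_terminal(state):
...         break
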
Parameters:
  • discount_factor (float) – Discount factor for future rewards

  • safe_velocity_threshold (float) – Maximum safe velocity magnitude. Defaults to 2.0.

  • max_force (float) – Maximum force magnitude. Defaults to 1.0.

  • dt (float) – Time step for physics integration. Defaults to 0.1.

  • mass (float) – Mass of the agent. Defaults to 1.0.

  • damping (float) – Damping coefficient. Defaults to 0.1.

  • position_noise (float) – Standard deviation of Gaussian position noise. Defaults to 0.1.

  • velocity_noise (float) – Standard deviation of Gaussian velocity noise. Defaults to 0.2.

  • safety_violation_penalty (float) – Penalty added to the reward on safety violations. Defaults to -100.0.

  • movement_reward_scale (float) – Scaling factor for movement rewards. Defaults to 1.0.

  • name (str) – Environment name. Defaults to 'SafeVelocityPOMDP'.

  • output_dir – Directory for output artifacts such as cached visualizations. Defaults to None.

  • debug (bool) – Enable debug logging. Defaults to False.

  • use_queue_logger (bool) – Use a queue-based logger. Defaults to False.
cache_visualization(history, cache_path)[source]

Cache animated visualization of the safety ant velocity episode.

Creates an animated GIF showing the ant’s movement trajectory with velocity vectors, safety zones, force applications, and safety constraint violations.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object

Return type:

None
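
Usage requires a pathlib.Path ending in .gif (history here stands for a previously recorded episode):

>>> from pathlib import Path
>>> env.cache_visualization(history, Path("episode.gif"))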

compute_metrics(histories)[source]

Compute environment-specific metrics from episode histories.

For this environment, this adds the safety metrics listed by get_metric_names() to the standard return and episode-length metrics.

Parameters:

histories (List[History]) – List of episode histories to analyze

Return type:

List[MetricValue]

Returns:

List of computed metrics with confidence intervals

get_actions()[source]

Get all possible actions in the discrete action space.

Return type:

List[int]

Returns:

List containing all valid actions that can be executed

Note

For this environment, this returns the four discrete force actions [0, 1, 2, 3]. Planning algorithms use it to iterate over actions.

get_metric_names()[source]

Get names of Safety Ant Velocity POMDP specific metrics.

Returns:

List of metric names: safety_violation_rate, critical_violation_rate, total_safety_violations, and total_critical_violations

Return type:

List[str]
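
As an illustration, the rate metrics plausibly reduce to the fraction of visited states whose speed exceeds the relevant threshold (a sketch over raw state arrays; safety_violation_rate_sketch is hypothetical):

>>> import numpy as np
>>> def safety_violation_rate_sketch(states, safe_velocity_threshold=2.0):
...     # states: array of shape (N, 4) holding [x, y, vx, vy] per step
...     speeds = np.linalg.norm(np.asarray(states)[:, 2:4], axis=1)
...     return float(np.mean(speeds > safe_velocity_threshold))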

initial_observation_dist()[source]

Get the initial observation distribution.

Return type:

Distribution

Returns:

Distribution over initial observations

Note

Implemented by this environment to define the distribution over initial (noisy) observations.

initial_state_dist()[source]

Get the initial state distribution.

Return type:

Distribution

Returns:

Distribution over initial states

Note

Implemented by this environment to define the distribution over starting states.

is_equal_observation(observation1, observation2)[source]

Check if two observations are equal.

Parameters:
  • observation1 (ndarray) – First observation to compare

  • observation2 (ndarray) – Second observation to compare

Return type:

bool

Returns:

True if observations are considered equal, False otherwise

Note

Defines observation equality for this environment. Since observations here are continuous, equality is typically approximate rather than exact.
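
For continuous observations an approximate, element-wise check is the natural choice (a sketch; the tolerance actually used by the environment is not documented here):

>>> import numpy as np
>>> np.allclose(observation1, observation2)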

is_terminal(state)[source]

Check if a state is terminal.

Parameters:

state (ndarray) – State to check for terminal condition

Return type:

bool

Returns:

True if the state is terminal, False otherwise

Note

For this environment, a state is terminal when its velocity magnitude exceeds 1.5× the safe velocity threshold.
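
Based on the termination rule above, the check plausibly reduces to the following (is_critical_sketch is hypothetical):

>>> import numpy as np
>>> def is_critical_sketch(state, safe_velocity_threshold=2.0):
...     # Terminal once speed exceeds 1.5x the safety threshold
...     speed = np.linalg.norm(state[2:4])
...     return speed > 1.5 * safe_velocity_threshold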

observation_model(next_state, action)[source]

Get the observation model for a given next state and action.

Parameters:
  • next_state (ndarray) – The resulting state after taking an action

  • action (int) – The action that was executed

Return type:

ObservationModel

Returns:

Observation model that can sample observations

Note

For this environment, this returns a SafeAntVelocityObservation configured with the environment's position and velocity noise levels.

reward(state, action)[source]

Calculate the immediate reward for a state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action executed from the state

Return type:

float

Returns:

Immediate reward value

Note

For this environment, the reward combines a movement reward (scaled by movement_reward_scale) with safety_violation_penalty whenever the velocity constraint is violated.
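
A sketch consistent with that description, assuming the movement reward is proportional to speed (the exact shaping is not documented here; reward_sketch is hypothetical):

>>> import numpy as np
>>> def reward_sketch(state, safe_velocity_threshold=2.0,
...                   movement_reward_scale=1.0, safety_violation_penalty=-100.0):
...     speed = np.linalg.norm(state[2:4])
...     movement_reward = movement_reward_scale * speed  # assumption: reward grows with speed
...     if speed > safe_velocity_threshold:
...         return movement_reward + safety_violation_penalty  # penalty is negative
...     return movement_reward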

reward_batch(states, action)[source]

Calculate rewards for a batch of states given a single action.

Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.

Parameters:
  • states (Union[ndarray, Sequence[Any]]) – Sequence of states of length N.

  • action (int) – Action executed from each state.

Return type:

ndarray

Returns:

1-D array of reward values with shape (N,).
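
A vectorized override in the spirit of the note above could look like this (built on the same assumptions as the hypothetical reward_sketch):

>>> import numpy as np
>>> def reward_batch_sketch(states, safe_velocity_threshold=2.0,
...                         movement_reward_scale=1.0, safety_violation_penalty=-100.0):
...     states = np.asarray(states)
...     speeds = np.linalg.norm(states[:, 2:4], axis=1)  # shape (N,)
...     rewards = movement_reward_scale * speeds
...     rewards[speeds > safe_velocity_threshold] += safety_violation_penalty
...     return rewards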

sample_next_step(state, action)[source]

Sample a complete state transition step.

This convenience method combines state transition, observation generation, and reward calculation in a single operation.

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action to execute

Returns:

  • next_state: Sampled next state

  • next_observation: Sampled observation

  • reward: Immediate reward

Return type:

Tuple[ndarray, ndarray, float]

state_transition_model(state, action)[source]

Get the state transition model for a given state-action pair.

Parameters:
  • state (ndarray) – Current state

  • action (int) – Action to be executed

Return type:

StateTransitionModel

Returns:

State transition model that can sample next states

Note

For this environment, this returns a SafeAntVelocityStateTransition configured with the environment's physics parameters (dt, mass, damping, max_force).

class POMDPPlanners.environments.safety_ant_velocity_pomdp.SafeAntVelocityStateTransition(state, action, dt=0.1, mass=1.0, damping=0.1, max_force=1.0)[source]

Bases: StateTransitionModel

Physics-based state transition model for Safety Ant Velocity POMDP.

This model simulates simplified physics with force application, damping, and random force directions. The agent can choose different force magnitudes but cannot control the direction, creating uncertainty in the outcomes.

Physics equations:

  • acceleration = (force - damping * velocity) / mass

  • velocity += acceleration * dt

  • position += velocity * dt

Parameters:
  • state (ndarray) – Current state [position_x, position_y, velocity_x, velocity_y]

  • action (int) – Force magnitude index (0=no force, 1=small, 2=medium, 3=large)

  • dt (float) – Time step for physics integration. Defaults to 0.1.

  • mass (float) – Mass of the agent (affects acceleration). Defaults to 1.0.

  • damping (float) – Damping coefficient (opposes velocity). Defaults to 0.1.

  • max_force (float) – Maximum force magnitude. Defaults to 1.0.

Attributes:
  • force_scales – Force scaling factors for each action [0.0, 0.33, 0.67, 1.0]

  • position – Current position [x, y]

  • velocity – Current velocity [vx, vy]

Example

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Define current state [pos_x, pos_y, vel_x, vel_y]
>>> state = np.array([0.5, -0.2, 1.0, 0.5])
>>> action = 2  # Apply medium force
>>>
>>> # Create transition model
>>> transition = SafeAntVelocityStateTransition(
...     state=state,
...     action=action,
...     dt=0.1,
...     mass=1.0,
...     damping=0.1,
...     max_force=1.0
... )
>>>
>>> # Simulate physics step with random force direction
>>> next_state = transition.sample()[0]
>>> # Returns new [pos_x, pos_y, vel_x, vel_y] after physics
>>> new_pos = next_state[:2]
>>> new_vel = next_state[2:4]
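
Internally, one step likely follows the physics equations above with a uniformly random force direction. A self-contained sketch (physics_step_sketch is hypothetical):

>>> def physics_step_sketch(state, action, dt=0.1, mass=1.0,
...                         damping=0.1, max_force=1.0, rng=None):
...     rng = np.random.default_rng() if rng is None else rng
...     force_scales = np.array([0.0, 0.33, 0.67, 1.0])
...     position, velocity = state[:2], state[2:4]
...     # Direction is uniform over [-pi, pi]; magnitude is set by the action
...     theta = rng.uniform(-np.pi, np.pi)
...     force = force_scales[action] * max_force * np.array([np.cos(theta), np.sin(theta)])
...     acceleration = (force - damping * velocity) / mass
...     velocity = velocity + acceleration * dt
...     position = position + velocity * dt
...     return np.concatenate([position, velocity])
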
probability(values)[source]

Calculate transition probabilities for given next states.

Since the force direction is uniformly random over [-π, π], the probability distribution is continuous and depends on the distance from expected dynamics. We approximate this using a mixture of Gaussians representing the random force direction uncertainty.

Parameters:

values (List[Any]) – List of potential next states

Return type:

ndarray

Returns:

Array of (unnormalized) probabilities for each state
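
One way to realize the stated mixture-of-Gaussians approximation is to evaluate the deterministic dynamics on a grid of force directions and place a Gaussian kernel at each resulting mean (the kernel width is an assumption; transition_probability_sketch is hypothetical):

>>> import numpy as np
>>> def transition_probability_sketch(values, state, action, n_directions=16,
...                                   dt=0.1, mass=1.0, damping=0.1,
...                                   max_force=1.0, kernel_std=0.05):
...     force_scales = np.array([0.0, 0.33, 0.67, 1.0])
...     position, velocity = state[:2], state[2:4]
...     probs = np.zeros(len(values))
...     for theta in np.linspace(-np.pi, np.pi, n_directions, endpoint=False):
...         direction = np.array([np.cos(theta), np.sin(theta)])
...         force = force_scales[action] * max_force * direction
...         v_next = velocity + ((force - damping * velocity) / mass) * dt
...         p_next = position + v_next * dt
...         mean = np.concatenate([p_next, v_next])
...         for i, s in enumerate(values):
...             d2 = np.sum((np.asarray(s) - mean) ** 2)
...             probs[i] += np.exp(-d2 / (2.0 * kernel_std ** 2))  # isotropic kernel
...     return probs / n_directions  # unnormalized mixture density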

sample(n_samples=1)[source]

Sample next states from the transition model.

Parameters:

n_samples (int) – Number of next state samples to generate. Defaults to 1.

Return type:

List[Any]

Returns:

List of sampled next states of length n_samples.

Note

For this model, sampling draws a uniformly random force direction and integrates the physics equations above for one time step.

class POMDPPlanners.environments.safety_ant_velocity_pomdp.SafeAntVelocityVisualizer(env)[source]

Bases: object

Visualizer for Safety Ant Velocity POMDP episodes.

This class creates animated visualizations showing the ant’s movement trajectory, velocity vectors, force applications, safety zones, and safety constraint violations.

Attributes:
  • env – The SafeAntVelocityPOMDP environment instance

  • safe_velocity_threshold – Maximum safe velocity magnitude

  • max_force – Maximum force that can be applied

create_animation(history, cache_path)[source]

Create animated visualization of the safety ant velocity episode.

Creates an animated GIF showing the ant’s movement trajectory with velocity vectors, safety zones, force applications, and safety constraint violations.

Parameters:
  • history (List[StepData]) – Episode history containing states, actions, and rewards

  • cache_path (Path) – Path where to save the visualization (must end with .gif)

Raises:
  • ValueError – If history is empty or cache_path doesn’t end with .gif

  • TypeError – If cache_path is not a Path object

Return type:

None

Submodules

POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp module

Safety Ant Velocity POMDP Environment Implementation.

This module implements a safety-critical velocity control task where an agent must navigate while avoiding unsafe velocities. The challenge is balancing exploration and movement rewards with safety constraints under partial observability.

The Safety Ant Velocity POMDP features:

  • Continuous 4D state space: [position_x, position_y, velocity_x, velocity_y]

  • Discrete action space: [0 (no force), 1 (small), 2 (medium), 3 (large force)]

  • Physics-based dynamics with force application and damping

  • Noisy observations of both position and velocity

  • Safety constraints on maximum velocity magnitude

  • Safety-focused metrics tracking violation rates

Key aspects:

  • Rewards encourage movement but heavily penalize safety violations

  • Episode terminates if velocity becomes critically high

  • Force direction is randomized to create uncertainty

  • Safety metrics track violation rates over episodes

Classes:

  • SafeAntVelocityStateTransition: Physics simulation with force control

  • SafeAntVelocityObservation: Noisy position and velocity observations

  • SafeAntVelocityPOMDP: Main safety-critical velocity control environment

  • SafeAntVelocityPOMDPMetrics: Enum of metric names for safety tracking

The first three classes are re-exported at the package level and documented in full above; the metrics enum is documented below.

class POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp.SafeAntVelocityPOMDPMetrics(*values)[source]

Bases: Enum

Metric names for Safety Ant Velocity POMDP environment.

CRITICAL_VIOLATION_RATE = 'critical_violation_rate'
SAFETY_VIOLATION_RATE = 'safety_violation_rate'
TOTAL_CRITICAL_VIOLATIONS = 'total_critical_violations'
TOTAL_SAFETY_VIOLATIONS = 'total_safety_violations'
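
Enum members map directly to the metric-name strings reported by get_metric_names():

>>> SafeAntVelocityPOMDPMetrics.SAFETY_VIOLATION_RATE.value
'safety_violation_rate'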

POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_visualizer module

Visualization utilities for Safety Ant Velocity POMDP Environment.

This module provides visualization capabilities for the Safety Ant Velocity POMDP, creating animated GIF visualizations of episode trajectories with safety zones, velocity vectors, and safety constraint violations.

Classes:

  • SafeAntVelocityVisualizer: Creates animated visualizations of episodes (re-exported and documented in full above under the package namespace)
