POMDPPlanners.environments.safety_ant_velocity_pomdp package
Safety Ant Velocity POMDP Environment Package.
This package implements a safety-critical velocity control task where an agent must navigate while avoiding unsafe velocities.
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.SafeAntVelocityObservation(next_state, action, position_noise=0.1, velocity_noise=0.2)[source]
Bases: ObservationModel
Noisy observation model for Safety Ant Velocity POMDP.
This model adds Gaussian noise to both position and velocity measurements, creating partial observability that makes velocity estimation challenging. Higher noise in velocity measurements reflects the difficulty of measuring velocity precisely in practice.
- next_state
True state after action execution
- action
Action that was taken (not used in observation generation)
- position_noise
Standard deviation of Gaussian noise for position
- velocity_noise
Standard deviation of Gaussian noise for velocity
- position
True position [x, y]
- velocity
True velocity [vx, vy]
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # True state after physics simulation
>>> true_state = np.array([0.6, -0.1, 1.2, 0.8])  # [x, y, vx, vy]
>>> action = 2
>>>
>>> # Create observation model
>>> obs_model = SafeAntVelocityObservation(
...     next_state=true_state,
...     action=action,
...     position_noise=0.1,
...     velocity_noise=0.2
... )
>>>
>>> # Sample noisy observation
>>> observation = obs_model.sample()[0]
>>> # Returns [noisy_x, noisy_y, noisy_vx, noisy_vy]
>>> # Position noise: ±0.1, velocity noise: ±0.2
>>>
>>> # Calculate observation probability
>>> prob = obs_model.probability([observation])
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[Any]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.SafeAntVelocityPOMDP(discount_factor, safe_velocity_threshold=2.0, max_force=1.0, dt=0.1, mass=1.0, damping=0.1, position_noise=0.1, velocity_noise=0.2, safety_violation_penalty=-100.0, movement_reward_scale=1.0, name='SafeVelocityPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Safety-critical velocity control task formulated as a POMDP.
This environment presents a safety-critical control problem where an agent must navigate while keeping velocity below a safety threshold. The challenge comes from balancing exploration rewards with safety constraints under noisy velocity observations.
Problem Structure:
- State: [position_x, position_y, velocity_x, velocity_y] (continuous)
- Actions: [0=no force, 1=small, 2=medium, 3=large force] (discrete)
- Observations: Noisy position and velocity measurements (continuous)
- Rewards: Movement reward - safety violation penalty (if unsafe)
- Safety constraint: velocity magnitude ≤ safe_velocity_threshold
- Termination: Velocity exceeds 1.5x safety threshold
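The reward and termination rules above can be sketched as a small standalone function. This is an illustrative sketch that assumes the movement reward is proportional to speed; the environment's exact formula may differ:

```python
import numpy as np

def step_outcome(state, safe_velocity_threshold=2.0,
                 safety_violation_penalty=-100.0, movement_reward_scale=1.0):
    """Illustrative reward/termination logic for a state [x, y, vx, vy]."""
    speed = np.linalg.norm(state[2:4])                # velocity magnitude
    reward = movement_reward_scale * speed            # movement is rewarded
    if speed > safe_velocity_threshold:               # safety constraint violated
        reward += safety_violation_penalty            # heavy penalty
    terminal = speed > 1.5 * safe_velocity_threshold  # critical violation ends episode
    return reward, terminal
```

A safe state earns a positive reward, while a critically fast state both incurs the penalty and terminates the episode.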
Safety Features:
- Tracks safety and critical violation rates
- Heavy penalties for constraint violations
- Configurable safety thresholds and penalties
- Physics simulation with uncertainty in force direction
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = SafeAntVelocityPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
- cache_visualization(history, cache_path)[source]
Cache animated visualization of the safety ant velocity episode.
Creates an animated GIF showing the ant’s movement trajectory with velocity vectors, safety zones, force applications, and safety constraint violations.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type:
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
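As an illustration of the vectorized numpy override the docstring invites, a batch reward over an (N, 4) state array might look like the sketch below (the reward shape is hypothetical, shown only to contrast with the loop-based default):

```python
import numpy as np

def reward_batch_vectorized(states, safe_velocity_threshold=2.0,
                            safety_violation_penalty=-100.0,
                            movement_reward_scale=1.0):
    """Vectorized rewards for an (N, 4) array of [x, y, vx, vy] states."""
    speeds = np.linalg.norm(states[:, 2:4], axis=1)   # (N,) velocity magnitudes
    rewards = movement_reward_scale * speeds          # reward movement
    unsafe = speeds > safe_velocity_threshold         # boolean violation mask
    rewards[unsafe] += safety_violation_penalty       # penalize all violators at once
    return rewards

states = np.array([[0.0, 0.0, 1.0, 0.0],
                   [0.0, 0.0, 3.0, 0.0]])
rewards = reward_batch_vectorized(states)
```

Replacing the per-state loop with array operations like these typically pays off when planners evaluate thousands of particle states per step.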
- sample_next_step(state, action)[source]
Sample a complete state transition step.
This convenience method combines state transition, observation generation, and reward calculation in a single operation.
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.SafeAntVelocityStateTransition(state, action, dt=0.1, mass=1.0, damping=0.1, max_force=1.0)[source]
Bases: StateTransitionModel
Physics-based state transition model for Safety Ant Velocity POMDP.
This model simulates simplified physics with force application, damping, and random force directions. The agent can choose different force magnitudes but cannot control the direction, creating uncertainty in the outcomes.
Physics equations:
- acceleration = (force - damping * velocity) / mass
- velocity += acceleration * dt
- position += velocity * dt
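These equations amount to one semi-implicit Euler step with a uniformly random force direction. A minimal numpy sketch, using the force_scales table from the attribute list (a simplified illustration, not the package's exact implementation):

```python
import numpy as np

FORCE_SCALES = [0.0, 0.33, 0.67, 1.0]  # force scaling per action

def physics_step(state, action, dt=0.1, mass=1.0, damping=0.1,
                 max_force=1.0, rng=None):
    """One Euler step for state [x, y, vx, vy] with a random force direction."""
    rng = np.random.default_rng() if rng is None else rng
    pos, vel = state[:2], state[2:4]
    theta = rng.uniform(-np.pi, np.pi)       # direction is not controllable
    force = FORCE_SCALES[action] * max_force * np.array([np.cos(theta),
                                                         np.sin(theta)])
    accel = (force - damping * vel) / mass   # acceleration = (F - c*v) / m
    new_vel = vel + accel * dt               # velocity += a * dt
    new_pos = pos + new_vel * dt             # position += v * dt
    return np.concatenate([new_pos, new_vel])
```

With action 0 the force vanishes and damping alone slows the agent, which makes the step deterministic and easy to verify by hand.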
- state
Current state [position_x, position_y, velocity_x, velocity_y]
- action
Force magnitude index (0=no force, 1=small, 2=medium, 3=large)
- dt
Time step for physics integration
- mass
Mass of the agent (affects acceleration)
- damping
Damping coefficient (opposes velocity)
- max_force
Maximum force magnitude
- force_scales
Force scaling factors for each action [0.0, 0.33, 0.67, 1.0]
- position
Current position [x, y]
- velocity
Current velocity [vx, vy]
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Define current state [pos_x, pos_y, vel_x, vel_y]
>>> state = np.array([0.5, -0.2, 1.0, 0.5])
>>> action = 2  # Apply medium force
>>>
>>> # Create transition model
>>> transition = SafeAntVelocityStateTransition(
...     state=state,
...     action=action,
...     dt=0.1,
...     mass=1.0,
...     damping=0.1,
...     max_force=1.0
... )
>>>
>>> # Simulate physics step with random force direction
>>> next_state = transition.sample()[0]
>>> # Returns new [pos_x, pos_y, vel_x, vel_y] after physics
>>> new_pos = next_state[:2]
>>> new_vel = next_state[2:4]
- probability(values)[source]
Calculate transition probabilities for given next states.
Since the force direction is uniformly random over [-π, π], the probability distribution is continuous and depends on the distance from expected dynamics. We approximate this using a mixture of Gaussians representing the random force direction uncertainty.
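The mixture-of-Gaussians idea can be sketched by discretizing the force angle into K components, each centred on the deterministic next state for that direction. This is an illustration of the approach, not the package's exact formula; the component noise scale sigma is an assumption:

```python
import numpy as np

def transition_density(next_state, state, force_mag, dt=0.1, mass=1.0,
                       damping=0.1, n_components=16, sigma=0.05):
    """Approximate p(next_state | state) as an equal-weight Gaussian mixture,
    one component per discretized force direction."""
    pos, vel = state[:2], state[2:4]
    thetas = np.linspace(-np.pi, np.pi, n_components, endpoint=False)
    density = 0.0
    for theta in thetas:
        force = force_mag * np.array([np.cos(theta), np.sin(theta)])
        new_vel = vel + (force - damping * vel) / mass * dt
        new_pos = pos + new_vel * dt
        mean = np.concatenate([new_pos, new_vel])  # deterministic outcome for theta
        diff = next_state - mean
        # isotropic 4-D Gaussian component
        density += np.exp(-0.5 * diff @ diff / sigma**2) / (
            2 * np.pi * sigma**2) ** 2
    return density / n_components                  # equal mixture weights
```

A next state that is exactly reachable under some force angle receives a much higher density than one far from every component mean, which is the behaviour a particle-filter weight update needs.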
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.SafeAntVelocityVisualizer(env)[source]
Bases: object
Visualizer for Safety Ant Velocity POMDP episodes.
This class creates animated visualizations showing the ant’s movement trajectory, velocity vectors, force applications, safety zones, and safety constraint violations.
- env
The SafeAntVelocityPOMDP environment instance
- safe_velocity_threshold
Maximum safe velocity magnitude
- max_force
Maximum force that can be applied
- create_animation(history, cache_path)[source]
Create animated visualization of the safety ant velocity episode.
Creates an animated GIF showing the ant’s movement trajectory with velocity vectors, safety zones, force applications, and safety constraint violations.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type:
Subpackages
- POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp_beliefs package
- SafetyAntVelocityVectorizedUpdater
- SafetyAntVelocityVectorizedUpdater.obs_dist
- SafetyAntVelocityVectorizedUpdater.dt
- SafetyAntVelocityVectorizedUpdater.mass
- SafetyAntVelocityVectorizedUpdater.damping
- SafetyAntVelocityVectorizedUpdater.max_force
- SafetyAntVelocityVectorizedUpdater.force_scales
- SafetyAntVelocityVectorizedUpdater.batch_observation_log_likelihood()
- SafetyAntVelocityVectorizedUpdater.batch_transition()
- SafetyAntVelocityVectorizedUpdater.config_id
- SafetyAntVelocityVectorizedUpdater.from_environment()
- create_safety_ant_velocity_belief()
- Submodules
- POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp_beliefs.safety_ant_velocity_belief_factory module
- POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp_beliefs.safety_ant_velocity_vectorized_updater module
- SafetyAntVelocityVectorizedUpdater
- SafetyAntVelocityVectorizedUpdater.obs_dist
- SafetyAntVelocityVectorizedUpdater.dt
- SafetyAntVelocityVectorizedUpdater.mass
- SafetyAntVelocityVectorizedUpdater.damping
- SafetyAntVelocityVectorizedUpdater.max_force
- SafetyAntVelocityVectorizedUpdater.force_scales
- SafetyAntVelocityVectorizedUpdater.batch_observation_log_likelihood()
- SafetyAntVelocityVectorizedUpdater.batch_transition()
- SafetyAntVelocityVectorizedUpdater.config_id
- SafetyAntVelocityVectorizedUpdater.from_environment()
Submodules
POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp module
Safety Ant Velocity POMDP Environment Implementation.
This module implements a safety-critical velocity control task where an agent must navigate while avoiding unsafe velocities. The challenge is balancing exploration and movement rewards with safety constraints under partial observability.
The Safety Ant Velocity POMDP features:
- Continuous 4D state space: [position_x, position_y, velocity_x, velocity_y]
- Discrete action space: [0 (no force), 1 (small), 2 (medium), 3 (large force)]
- Physics-based dynamics with force application and damping
- Noisy observations of both position and velocity
- Safety constraints on maximum velocity magnitude
- Safety-focused metrics tracking violation rates
Key aspects:
- Rewards encourage movement but heavily penalize safety violations
- Episode terminates if velocity becomes critically high
- Force direction is randomized to create uncertainty
- Safety metrics track violation rates over episodes
- Classes:
SafeAntVelocityStateTransition: Physics simulation with force control
SafeAntVelocityObservation: Noisy position and velocity observations
SafeAntVelocityPOMDP: Main safety-critical velocity control environment
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp.SafeAntVelocityObservation(next_state, action, position_noise=0.1, velocity_noise=0.2)[source]
Bases: ObservationModel
Noisy observation model for Safety Ant Velocity POMDP.
This model adds Gaussian noise to both position and velocity measurements, creating partial observability that makes velocity estimation challenging. Higher noise in velocity measurements reflects the difficulty of measuring velocity precisely in practice.
- next_state
True state after action execution
- action
Action that was taken (not used in observation generation)
- position_noise
Standard deviation of Gaussian noise for position
- velocity_noise
Standard deviation of Gaussian noise for velocity
- position
True position [x, y]
- velocity
True velocity [vx, vy]
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # True state after physics simulation
>>> true_state = np.array([0.6, -0.1, 1.2, 0.8])  # [x, y, vx, vy]
>>> action = 2
>>>
>>> # Create observation model
>>> obs_model = SafeAntVelocityObservation(
...     next_state=true_state,
...     action=action,
...     position_noise=0.1,
...     velocity_noise=0.2
... )
>>>
>>> # Sample noisy observation
>>> observation = obs_model.sample()[0]
>>> # Returns [noisy_x, noisy_y, noisy_vx, noisy_vy]
>>> # Position noise: ±0.1, velocity noise: ±0.2
>>>
>>> # Calculate observation probability
>>> prob = obs_model.probability([observation])
- probability(values)[source]
Calculate observation probabilities for given values.
- Parameters:
values (List[Any]) – List of observation values to calculate probabilities for
- Return type:
- Returns:
Array of probabilities corresponding to the input values
- Raises:
NotImplementedError – This method is not implemented by default. Subclasses should override if probability calculation is needed.
- sample(n_samples=1)[source]
Sample observations from the observation model.
- Parameters:
n_samples (int) – Number of observation samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled observations of length n_samples.
Note
Subclasses must implement this method according to their specific observation generation logic.
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp.SafeAntVelocityPOMDP(discount_factor, safe_velocity_threshold=2.0, max_force=1.0, dt=0.1, mass=1.0, damping=0.1, position_noise=0.1, velocity_noise=0.2, safety_violation_penalty=-100.0, movement_reward_scale=1.0, name='SafeVelocityPOMDP', output_dir=None, debug=False, use_queue_logger=False)[source]
Bases: DiscreteActionsEnvironment
Safety-critical velocity control task formulated as a POMDP.
This environment presents a safety-critical control problem where an agent must navigate while keeping velocity below a safety threshold. The challenge comes from balancing exploration rewards with safety constraints under noisy velocity observations.
Problem Structure:
- State: [position_x, position_y, velocity_x, velocity_y] (continuous)
- Actions: [0=no force, 1=small, 2=medium, 3=large force] (discrete)
- Observations: Noisy position and velocity measurements (continuous)
- Rewards: Movement reward - safety violation penalty (if unsafe)
- Safety constraint: velocity magnitude ≤ safe_velocity_threshold
- Termination: Velocity exceeds 1.5x safety threshold
Safety Features:
- Tracks safety and critical violation rates
- Heavy penalties for constraint violations
- Configurable safety thresholds and penalties
- Physics simulation with uncertainty in force direction
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Initialize environment
>>> env = SafeAntVelocityPOMDP(discount_factor=0.99)
>>>
>>> # Get initial state and actions
>>> initial_state = env.initial_state_dist().sample()[0]
>>> actions = env.get_actions()
>>>
>>> # Sample complete step using convenience method
>>> action = actions[0]
>>> next_state, observation, reward = env.sample_next_step(initial_state, action)
>>>
>>> # Check terminal condition
>>> env.is_terminal(initial_state)
False
- Parameters:
- cache_visualization(history, cache_path)[source]
Cache animated visualization of the safety ant velocity episode.
Creates an animated GIF showing the ant’s movement trajectory with velocity vectors, safety zones, force applications, and safety constraint violations.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type:
- compute_metrics(histories)[source]
Compute environment-specific metrics from episode histories.
This method can be overridden by subclasses to provide custom metric calculations beyond standard return and episode length.
- get_actions()[source]
Get all possible actions in the discrete action space.
Note
Subclasses must implement this method to enumerate all possible actions. This is used by planning algorithms that need to iterate over actions.
- initial_observation_dist()[source]
Get the initial observation distribution.
- Return type:
- Returns:
Distribution over initial observations
Note
Subclasses must implement this method to define initial observations.
- initial_state_dist()[source]
Get the initial state distribution.
- Return type:
- Returns:
Distribution over initial states
Note
Subclasses must implement this method to define the starting distribution.
- is_equal_observation(observation1, observation2)[source]
Check if two observations are equal.
- Parameters:
- Return type:
- Returns:
True if observations are considered equal, False otherwise
Note
Subclasses must implement this method to define observation equality. This is particularly important for discrete observation spaces.
- is_terminal(state)[source]
Check if a state is terminal.
- Parameters:
state (ndarray) – State to check for terminal condition
- Return type:
- Returns:
True if the state is terminal, False otherwise
Note
Subclasses must implement this method to define terminal conditions.
- observation_model(next_state, action)[source]
Get the observation model for a given next state and action.
- Parameters:
- Return type:
- Returns:
Observation model that can sample observations
Note
Subclasses must implement this method to define observation generation.
- reward(state, action)[source]
Calculate the immediate reward for a state-action pair.
- Parameters:
- Return type:
- Returns:
Immediate reward value
Note
Subclasses must implement this method to define reward structure.
- reward_batch(states, action)[source]
Calculate rewards for a batch of states given a single action.
Provides a loop-based default that subclasses can override with vectorized numpy implementations for better performance.
- sample_next_step(state, action)[source]
Sample a complete state transition step.
This convenience method combines state transition, observation generation, and reward calculation in a single operation.
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp.SafeAntVelocityPOMDPMetrics(*values)[source]
Bases: Enum
Metric names for Safety Ant Velocity POMDP environment.
- CRITICAL_VIOLATION_RATE = 'critical_violation_rate'
- SAFETY_VIOLATION_RATE = 'safety_violation_rate'
- TOTAL_CRITICAL_VIOLATIONS = 'total_critical_violations'
- TOTAL_SAFETY_VIOLATIONS = 'total_safety_violations'
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_pomdp.SafeAntVelocityStateTransition(state, action, dt=0.1, mass=1.0, damping=0.1, max_force=1.0)[source]
Bases: StateTransitionModel
Physics-based state transition model for Safety Ant Velocity POMDP.
This model simulates simplified physics with force application, damping, and random force directions. The agent can choose different force magnitudes but cannot control the direction, creating uncertainty in the outcomes.
Physics equations:
- acceleration = (force - damping * velocity) / mass
- velocity += acceleration * dt
- position += velocity * dt
- state
Current state [position_x, position_y, velocity_x, velocity_y]
- action
Force magnitude index (0=no force, 1=small, 2=medium, 3=large)
- dt
Time step for physics integration
- mass
Mass of the agent (affects acceleration)
- damping
Damping coefficient (opposes velocity)
- max_force
Maximum force magnitude
- force_scales
Force scaling factors for each action [0.0, 0.33, 0.67, 1.0]
- position
Current position [x, y]
- velocity
Current velocity [vx, vy]
Example
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> # Define current state [pos_x, pos_y, vel_x, vel_y]
>>> state = np.array([0.5, -0.2, 1.0, 0.5])
>>> action = 2  # Apply medium force
>>>
>>> # Create transition model
>>> transition = SafeAntVelocityStateTransition(
...     state=state,
...     action=action,
...     dt=0.1,
...     mass=1.0,
...     damping=0.1,
...     max_force=1.0
... )
>>>
>>> # Simulate physics step with random force direction
>>> next_state = transition.sample()[0]
>>> # Returns new [pos_x, pos_y, vel_x, vel_y] after physics
>>> new_pos = next_state[:2]
>>> new_vel = next_state[2:4]
- probability(values)[source]
Calculate transition probabilities for given next states.
Since the force direction is uniformly random over [-π, π], the probability distribution is continuous and depends on the distance from expected dynamics. We approximate this using a mixture of Gaussians representing the random force direction uncertainty.
- sample(n_samples=1)[source]
Sample next states from the transition model.
- Parameters:
n_samples (int) – Number of next state samples to generate. Defaults to 1.
- Return type:
- Returns:
List of sampled next states of length n_samples.
Note
Subclasses must implement this method according to their specific state transition dynamics.
POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_visualizer module
Visualization utilities for Safety Ant Velocity POMDP Environment.
This module provides visualization capabilities for the Safety Ant Velocity POMDP, creating animated GIF visualizations of episode trajectories with safety zones, velocity vectors, and safety constraint violations.
- Classes:
SafeAntVelocityVisualizer: Creates animated visualizations of episodes
- class POMDPPlanners.environments.safety_ant_velocity_pomdp.safety_ant_velocity_visualizer.SafeAntVelocityVisualizer(env)[source]
Bases: object
Visualizer for Safety Ant Velocity POMDP episodes.
This class creates animated visualizations showing the ant’s movement trajectory, velocity vectors, force applications, safety zones, and safety constraint violations.
- env
The SafeAntVelocityPOMDP environment instance
- safe_velocity_threshold
Maximum safe velocity magnitude
- max_force
Maximum force that can be applied
- create_animation(history, cache_path)[source]
Create animated visualization of the safety ant velocity episode.
Creates an animated GIF showing the ant’s movement trajectory with velocity vectors, safety zones, force applications, and safety constraint violations.
- Parameters:
- Raises:
ValueError – If history is empty or cache_path doesn’t end with .gif
TypeError – If cache_path is not a Path object
- Return type: