POMDPPlanners.core.simulation package

class POMDPPlanners.core.simulation.CategoricalHyperParameter(choices, name)[source]

Bases: NamedTuple

Parameters:
choices: list[Any]

Alias for field number 0

id()[source]
Return type:

str

name: str

Alias for field number 1
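The field layout above can be sketched with a local stand-in (the real class lives in `POMDPPlanners.core.simulation`; the `id()` scheme shown is hypothetical, not the library's):

```python
from typing import Any, List, NamedTuple

# Local stand-in mirroring CategoricalHyperParameter's documented field
# order; the id() scheme below is illustrative only.
class CategoricalHyperParameter(NamedTuple):
    choices: List[Any]  # field 0: candidate values the tuner samples from
    name: str           # field 1: parameter name

    def id(self) -> str:
        return f"{self.name}={self.choices!r}"

param = CategoricalHyperParameter(choices=["ucb", "thompson"], name="exploration")
print(param.name)  # exploration
print(param.id())
```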

class POMDPPlanners.core.simulation.DataBaseInterface[source]

Bases: ABC

Abstract interface for database operations used by task managers.

This class defines the interface for caching simulation results, allowing different database implementations to be used interchangeably.

Examples

>>> class MockDatabase(DataBaseInterface):
...     def __init__(self):
...         self.data = {}
...
...     def get(self, key):
...         return self.data.get(key)
...
...     def is_key_in_cache(self, key):
...         return key in self.data
...
...     def set(self, key, value):
...         self.data[key] = value
...
...     def clear(self):
...         self.data.clear()
>>>
>>> db = MockDatabase()
>>> db.set("test_key", "test_value")
>>> db.is_key_in_cache("test_key")
True
>>> db.get("test_key")
'test_value'
abstractmethod clear()[source]

Clear all data from the database.

abstractmethod get(key)[source]

Retrieve a value from the database.

Parameters:

key (str) – The key to retrieve

Returns:

The stored value

Return type:

Any

abstractmethod is_key_in_cache(key)[source]

Check if a key exists in the database.

Parameters:

key (str) – The key to check

Returns:

True if key exists, False otherwise

Return type:

bool

abstractmethod set(key, value)[source]

Store a value in the database.

Parameters:
  • key (str) – The key to store under

  • value (Any) – The value to store

class POMDPPlanners.core.simulation.EnvironmentRunParams(environment, belief, policies, num_episodes, num_steps)[source]

Bases: object

Configuration parameters for environment evaluation runs.

This frozen dataclass contains all parameters needed to configure and execute an environment evaluation run. Input validation is performed at construction time to ensure all parameters are valid before execution begins.

environment

POMDP environment instance to evaluate policies in

belief

Initial belief state for the environment

policies

Sequence of policy instances to evaluate (must be non-empty)

num_episodes

Number of episodes to run per policy (must be positive)

num_steps

Maximum number of steps per episode (must be positive)

Raises:
  • ValueError – If any numerical parameter is non-positive or if policies list is empty

  • TypeError – If environment, belief, or any policy has incorrect type

Parameters:
belief: Belief
property config_id: str
environment: Environment
num_episodes: int
num_steps: int
policies: Sequence[Policy]
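The documented validation contract can be sketched as follows. This minimal frozen dataclass checks only the two rules named above; the real class additionally type-checks `environment`, `belief`, and each policy:

```python
from dataclasses import dataclass
from typing import Any, Sequence

# Minimal sketch of the documented checks; not the library implementation.
@dataclass(frozen=True)
class EnvironmentRunParams:
    environment: Any
    belief: Any
    policies: Sequence[Any]
    num_episodes: int
    num_steps: int

    def __post_init__(self):
        # Validation runs at construction time, before any episode executes.
        if not self.policies:
            raise ValueError("policies must be non-empty")
        if self.num_episodes <= 0 or self.num_steps <= 0:
            raise ValueError("num_episodes and num_steps must be positive")

try:
    EnvironmentRunParams("env", "belief", [], num_episodes=10, num_steps=50)
except ValueError as err:
    print(err)  # policies must be non-empty
```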
class POMDPPlanners.core.simulation.ExperimentVisualizer[source]

Bases: ABC

Strategy for rendering aggregated per-environment experiment artifacts.

An ExperimentVisualizer is invoked once per environment after the simulation phase completes. It receives the per-policy episode results and is responsible for writing visualization artifacts (plots, animations, summary files) into a caller-provided output directory.

Implementations are dispatched to worker processes via the simulator’s task manager, so they MUST be picklable and MUST NOT capture live execution state (live clients, open sockets, asyncio tasks, file handles, threading primitives).

Note

This is an abstract base class and cannot be instantiated directly.

abstractmethod render(env_name, environment, policy_results, policies, output_dir, cache_visualizations)[source]

Render aggregated artifacts for one environment.

Parameters:
  • env_name (str) – Name of the environment being visualized.

  • environment (Environment) – Environment instance whose results are being rendered.

  • policy_results (Dict[str, List[History]]) – Mapping from policy name to a list of histories produced by that policy on this environment.

  • policies (Sequence[Policy]) – Sequence of policy instances corresponding to policy_results keys.

  • output_dir (Path) – Directory under which artifacts are written. The caller guarantees the directory exists.

  • cache_visualizations (bool) – When True, implementations should also produce per-episode environment-specific caches (e.g. agent trajectory animations) under output_dir.

Return type:

Path

Returns:

Path to the directory containing the rendered artifacts (typically output_dir itself).
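A conforming implementation might look like the sketch below: it holds no live state (so instances pickle cleanly for worker dispatch) and returns the output directory per the `render` contract. The base class is not imported here, and the summary format is made up for illustration:

```python
import tempfile
from pathlib import Path

# Sketch honoring the render() contract; a real implementation would
# subclass ExperimentVisualizer. No sockets, handles, or clients are
# captured as attributes, so instances remain picklable.
class ReturnSummaryVisualizer:
    def render(self, env_name, environment, policy_results, policies,
               output_dir, cache_visualizations):
        lines = [f"{policy_name}: {len(histories)} episodes"
                 for policy_name, histories in policy_results.items()]
        (output_dir / f"{env_name}_summary.txt").write_text("\n".join(lines))
        return output_dir  # contract: path containing the rendered artifacts

out_dir = Path(tempfile.mkdtemp())
result = ReturnSummaryVisualizer().render(
    "tiger", None, {"pomcp": [object(), object()]}, [], out_dir, False)
print((result / "tiger_summary.txt").read_text())  # pomcp: 2 episodes
```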

class POMDPPlanners.core.simulation.History(history, discount_factor, average_state_sampling_time, average_action_time, average_observation_time, average_belief_update_time, average_reward_time, actual_num_steps, reach_terminal_state, policy_run_data)[source]

Bases: object

Complete history of a POMDP simulation episode.

This class stores the complete history of a simulation episode, including all step data, timing information, and metadata about the episode.

Parameters:
history

List of StepData objects representing each step

discount_factor

Discount factor used for reward calculation

average_state_sampling_time

Average time spent sampling states

average_action_time

Average time spent selecting actions

average_observation_time

Average time spent processing observations

average_belief_update_time

Average time spent updating beliefs

average_reward_time

Average time spent calculating rewards

actual_num_steps

Actual number of steps taken in the episode

reach_terminal_state

Whether the episode reached a terminal state

policy_run_data

Additional data from the policy execution

Examples

>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import WeightedParticleBelief
>>> from POMDPPlanners.core.policy import PolicyRunData
>>>
>>> env = TigerPOMDP(discount_factor=0.95)
>>> import numpy as np
>>> belief = WeightedParticleBelief(env.states, np.array([0.0, -0.1]))
>>> step = StepData("tiger_left", "listen", "tiger_left", "tiger_left", -1.0, belief)
>>> policy_data = PolicyRunData(info_variables=[])
>>>
>>> history = History(
...     history=[step],
...     discount_factor=0.95,
...     average_state_sampling_time=0.001,
...     average_action_time=0.01,
...     average_observation_time=0.002,
...     average_belief_update_time=0.005,
...     average_reward_time=0.001,
...     actual_num_steps=1,
...     reach_terminal_state=False,
...     policy_run_data=policy_data
... )
>>> history.discount_factor
0.95
>>> len(history.history)
1
>>> history.reach_terminal_state
False
actual_num_steps: int
average_action_time: float
average_belief_update_time: float
average_observation_time: float
average_reward_time: float
average_state_sampling_time: float
discount_factor: float
classmethod from_dict(data)[source]

Create a History instance from a dictionary.

Parameters:

data (dict) – Dictionary containing History data

Returns:

New History instance

Return type:

History

history: List[StepData]
policy_run_data: List[PolicyRunData]
reach_terminal_state: bool
to_dict()[source]

Convert History object to dictionary.

Returns:

Dictionary representation of the History object

Return type:

dict

class POMDPPlanners.core.simulation.HyperParameterRunParams(environment, belief, hyper_param_planner_config, num_episodes, num_steps, n_trials, parameters_to_optimize)[source]

Bases: object

Configuration parameters for hyperparameter optimization runs.

This frozen dataclass contains all parameters needed to configure and execute a hyperparameter optimization run. Input validation is performed at construction time to ensure all parameters are valid before optimization begins.

environment

POMDP environment instance to optimize policies for

belief

Initial belief state for the environment

hyper_param_planner_config

Configuration defining policy class, hyperparameters, and constant parameters for optimization

num_episodes

Number of episodes to run per trial (must be positive)

num_steps

Maximum number of steps per episode (must be positive)

n_trials

Number of optimization trials to execute (must be positive)

parameters_to_optimize

List of (metric_name, direction) tuples specifying which metrics to optimize and in which direction (maximize/minimize)

Raises:
  • ValueError – If any numerical parameter is non-positive, if hyperparameters or parameters_to_optimize are empty, or if metric names are invalid

  • TypeError – If environment, belief, or policy_cls have incorrect types

Parameters:
belief: Belief
property config_id: str
environment: Environment
hyper_param_planner_config: HyperParamPlannerConfig
n_trials: int
num_episodes: int
num_steps: int
parameters_to_optimize: List[Tuple[str, HyperParameterOptimizationDirection]]
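The shape of `parameters_to_optimize` can be sketched with a stand-in for the direction enum (its values match those documented under HyperParameterOptimizationDirection); the metric names here are illustrative, not a fixed list from the library:

```python
from enum import Enum

# Stand-in for HyperParameterOptimizationDirection with the documented values.
class HyperParameterOptimizationDirection(Enum):
    MAXIMIZE = "maximize"
    MINIMIZE = "minimize"

# Each entry pairs a metric name with the direction to optimize it in.
parameters_to_optimize = [
    ("discounted_return", HyperParameterOptimizationDirection.MAXIMIZE),
    ("average_action_time", HyperParameterOptimizationDirection.MINIMIZE),
]
for metric, direction in parameters_to_optimize:
    print(f"{metric} -> {direction.value}")
```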
class POMDPPlanners.core.simulation.MetricValue(name, value, lower_confidence_bound, upper_confidence_bound)[source]

Bases: NamedTuple

Parameters:
lower_confidence_bound: float

Alias for field number 2

name: str

Alias for field number 0

upper_confidence_bound: float

Alias for field number 3

value: float

Alias for field number 1
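The field order can be illustrated with a local stand-in; the metric name and numbers are made up:

```python
from typing import NamedTuple

# Stand-in mirroring MetricValue's documented field order.
class MetricValue(NamedTuple):
    name: str                      # field 0
    value: float                   # field 1
    lower_confidence_bound: float  # field 2
    upper_confidence_bound: float  # field 3

m = MetricValue("discounted_return", 12.4, 10.1, 14.7)
print(m.value)  # 12.4
# A well-formed metric keeps its point estimate inside the interval.
assert m.lower_confidence_bound <= m.value <= m.upper_confidence_bound
```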

class POMDPPlanners.core.simulation.NumericalHyperParameter(low, high, name)[source]

Bases: NamedTuple

Parameters:
high: int | float

Alias for field number 1

id()[source]
Return type:

str

low: int | float

Alias for field number 0

name: str

Alias for field number 2

class POMDPPlanners.core.simulation.ParallelizationLevel(*values)[source]

Bases: Enum

Level at which parallelization is applied during hyperparameter tuning.

OPTUNA_TRIALS

Parallelize across Optuna trials. Multiple trials run concurrently while episodes within each trial run sequentially.

EPISODES

Parallelize across episodes within each trial. Optuna trials run sequentially while episodes within each trial run concurrently.

EPISODES = 'episodes'
OPTUNA_TRIALS = 'optuna_trials'
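Since the members carry string values, a level can be recovered from a plain string (e.g. a CLI flag or config entry). Sketched with a local stand-in carrying the documented values:

```python
from enum import Enum

# Stand-in with ParallelizationLevel's documented values.
class ParallelizationLevel(Enum):
    OPTUNA_TRIALS = "optuna_trials"
    EPISODES = "episodes"

# Enum lookup by value turns a config string into a member.
level = ParallelizationLevel("episodes")
print(level is ParallelizationLevel.EPISODES)  # True
```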
class POMDPPlanners.core.simulation.SimulationTask[source]

Bases: ABC

Abstract base class for simulation tasks.

This class defines the interface that all simulation tasks must implement. A simulation task represents a unit of work that can be executed and cached.

Examples

>>> class MySimulationTask(SimulationTask):
...     def __init__(self, config_id):
...         self.config_id = config_id
...
...     def run(self):
...         return f"Result for {self.config_id}"
...
...     def get_config_id(self):
...         return self.config_id
>>>
>>> task = MySimulationTask("test_config")
>>> task.get_config_id()
'test_config'
>>> task.run()
'Result for test_config'
abstractmethod get_config_id()[source]

Get a unique identifier for this task’s configuration.

Returns:

Unique configuration identifier for caching

Return type:

str

abstractmethod run()[source]

Execute the simulation task.

Returns:

The result of the simulation task

Return type:

Any

class POMDPPlanners.core.simulation.StepData(state, action, next_state, observation, reward, belief)[source]

Bases: NamedTuple

Parameters:
action: Any

Alias for field number 1

belief: Belief

Alias for field number 5

next_state: Any

Alias for field number 2

observation: Any

Alias for field number 3

reward: float | None

Alias for field number 4

state: Any

Alias for field number 0
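The six-field layout can be sketched with a local stand-in; the tiger-domain values echo the History example above:

```python
from typing import Any, NamedTuple, Optional

# Stand-in mirroring StepData's documented field order.
class StepData(NamedTuple):
    state: Any               # field 0: state before the action
    action: Any              # field 1: action taken
    next_state: Any          # field 2: resulting state
    observation: Any         # field 3: observation received
    reward: Optional[float]  # field 4: immediate reward (may be None)
    belief: Any              # field 5: belief after the update

step = StepData("tiger_left", "listen", "tiger_left", "tiger_left", -1.0, None)
print(step.action, step.reward)  # listen -1.0
```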

class POMDPPlanners.core.simulation.TaskManager[source]

Bases: ABC

Abstract base class for task managers.

Task managers coordinate the execution of simulation tasks, handling caching, parallelization, and result collection.

Examples

>>> class SimpleTaskManager(TaskManager):
...     def run_tasks(self, tasks, task_identifiers):
...         results = []
...         identifiers = []
...         for task, identifier in zip(tasks, task_identifiers):
...             result = task.run()
...             results.append(result)
...             identifiers.append(identifier)
...         return results, identifiers
>>>
>>> class MyTask(SimulationTask):
...     def run(self): return "result"
...     def get_config_id(self): return "config"
>>>
>>> manager = SimpleTaskManager()
>>> tasks = [MyTask()]
>>> identifiers = ["task1"]
>>> results, ids = manager.run_tasks(tasks, identifiers)
>>> results[0]
'result'
>>> ids[0]
'task1'
abstractmethod run_tasks(tasks, task_identifiers)[source]

Execute a list of simulation tasks.

Parameters:
  • tasks (List[SimulationTask]) – List of simulation tasks to execute

  • task_identifiers (list) – List of identifiers for each task

Returns:

Results and successful task identifiers

Return type:

Tuple[List[Any], list]

class POMDPPlanners.core.simulation.TaskManagerExternalDB(cache_db, cache_dir=None, logger_debug=False, use_queue_logger=False, console_output=True, no_logs=False)[source]

Bases: TaskManager

Task manager that uses an external database for caching.

This task manager implements caching functionality using an external database interface, allowing simulation results to be cached and reused across runs.

Parameters:
cache_db

Database interface for caching results

cache_dir

Optional directory for logging and cache files

logger_debug

Whether to enable debug logging

use_queue_logger

Whether to use queue-based logging

Examples

>>> class MockDatabase(DataBaseInterface):
...     def __init__(self):
...         self.data = {}
...     def get(self, key): return self.data.get(key)
...     def is_key_in_cache(self, key): return key in self.data
...     def set(self, key, value): self.data[key] = value
...     def clear(self): self.data.clear()
>>>
>>> class MockTaskManager(TaskManagerExternalDB):
...     def _run_tasks(self, tasks):
...         return [task.run() for task in tasks]
>>>
>>> class MyTask(SimulationTask):
...     def run(self): return "cached_result"
...     def get_config_id(self): return "test_config"
>>>
>>> db = MockDatabase()
>>> manager = MockTaskManager(db)
>>> tasks = [MyTask()]
>>> identifiers = ["task1"]
>>> results, ids = manager.run_tasks(tasks, identifiers)
>>> results[0]
'cached_result'
>>> db.is_key_in_cache("test_config")
True
property logger: Logger

Get the logger instance for this task manager.

Returns:

Configured logger instance

Return type:

logging.Logger

run_tasks(tasks, task_identifiers)[source]

Execute tasks with caching support.

This method checks the cache for existing results before executing tasks, runs only uncached tasks, and stores new results in the cache.

Parameters:
  • tasks (List[SimulationTask]) – List of simulation tasks to execute

  • task_identifiers (list) – List of identifiers for each task

Returns:

Results and successful task identifiers

Return type:

Tuple[List[Any], list]

POMDPPlanners.core.simulation.history_to_discounted_return_value(history)[source]

Calculate the discounted return value from a simulation history.

This function computes the total discounted reward for an episode, where rewards are discounted by the discount factor raised to the power of the step index.

Parameters:

history (History) – The simulation history containing step data and discount factor

Returns:

The total discounted return value

Return type:

float
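The formula reduces to a single weighted sum. The sketch below reimplements it over a plain list of rewards for illustration; the real function pulls each reward from `History.history` (`StepData.reward`) and the factor from `History.discount_factor`:

```python
# Illustrative reimplementation: the reward at step t is weighted by
# discount_factor ** t, then all weighted rewards are summed.
def discounted_return(rewards, discount_factor):
    return sum(discount_factor ** t * r for t, r in enumerate(rewards))

# Two -1 listen penalties, then a +10 payoff two steps in:
# -1.0 + 0.95 * (-1.0) + 0.95**2 * 10.0
print(round(discounted_return([-1.0, -1.0, 10.0], 0.95), 3))  # 7.075
```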

Submodules

POMDPPlanners.core.simulation.history module

class POMDPPlanners.core.simulation.history.History(history, discount_factor, average_state_sampling_time, average_action_time, average_observation_time, average_belief_update_time, average_reward_time, actual_num_steps, reach_terminal_state, policy_run_data)[source]

Bases: object

Complete history of a POMDP simulation episode.

This class stores the complete history of a simulation episode, including all step data, timing information, and metadata about the episode.

Parameters:
history

List of StepData objects representing each step

discount_factor

Discount factor used for reward calculation

average_state_sampling_time

Average time spent sampling states

average_action_time

Average time spent selecting actions

average_observation_time

Average time spent processing observations

average_belief_update_time

Average time spent updating beliefs

average_reward_time

Average time spent calculating rewards

actual_num_steps

Actual number of steps taken in the episode

reach_terminal_state

Whether the episode reached a terminal state

policy_run_data

Additional data from the policy execution

Examples

>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import WeightedParticleBelief
>>> from POMDPPlanners.core.policy import PolicyRunData
>>>
>>> env = TigerPOMDP(discount_factor=0.95)
>>> import numpy as np
>>> belief = WeightedParticleBelief(env.states, np.array([0.0, -0.1]))
>>> step = StepData("tiger_left", "listen", "tiger_left", "tiger_left", -1.0, belief)
>>> policy_data = PolicyRunData(info_variables=[])
>>>
>>> history = History(
...     history=[step],
...     discount_factor=0.95,
...     average_state_sampling_time=0.001,
...     average_action_time=0.01,
...     average_observation_time=0.002,
...     average_belief_update_time=0.005,
...     average_reward_time=0.001,
...     actual_num_steps=1,
...     reach_terminal_state=False,
...     policy_run_data=policy_data
... )
>>> history.discount_factor
0.95
>>> len(history.history)
1
>>> history.reach_terminal_state
False
actual_num_steps: int
average_action_time: float
average_belief_update_time: float
average_observation_time: float
average_reward_time: float
average_state_sampling_time: float
discount_factor: float
classmethod from_dict(data)[source]

Create a History instance from a dictionary.

Parameters:

data (dict) – Dictionary containing History data

Returns:

New History instance

Return type:

History

history: List[StepData]
policy_run_data: List[PolicyRunData]
reach_terminal_state: bool
to_dict()[source]

Convert History object to dictionary.

Returns:

Dictionary representation of the History object

Return type:

dict

class POMDPPlanners.core.simulation.history.StepData(state, action, next_state, observation, reward, belief)[source]

Bases: NamedTuple

Parameters:
action: Any

Alias for field number 1

belief: Belief

Alias for field number 5

next_state: Any

Alias for field number 2

observation: Any

Alias for field number 3

reward: float | None

Alias for field number 4

state: Any

Alias for field number 0

POMDPPlanners.core.simulation.history.history_to_discounted_return_value(history)[source]

Calculate the discounted return value from a simulation history.

This function computes the total discounted reward for an episode, where rewards are discounted by the discount factor raised to the power of the step index.

Parameters:

history (History) – The simulation history containing step data and discount factor

Returns:

The total discounted return value

Return type:

float

POMDPPlanners.core.simulation.hyperparameter_tuning module

class POMDPPlanners.core.simulation.hyperparameter_tuning.CategoricalHyperParameter(choices, name)[source]

Bases: NamedTuple

Parameters:
choices: list[Any]

Alias for field number 0

id()[source]
Return type:

str

name: str

Alias for field number 1

class POMDPPlanners.core.simulation.hyperparameter_tuning.HyperParamPlannerConfig(policy_cls, hyper_parameters, constant_parameters, training_hyper_parameters=(), training_constant_parameters=None)[source]

Bases: object

Parameters:
property config_id: str
constant_parameters: Dict[str, Any]
hyper_parameters: Sequence[CategoricalHyperParameter | NumericalHyperParameter]
policy_cls: Type[Policy]
training_constant_parameters: Dict[str, Any] = None
training_hyper_parameters: Sequence[CategoricalHyperParameter | NumericalHyperParameter] = ()
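The config's shape can be sketched with local stand-ins; `MyPlanner`, the parameter names, and the values here are all illustrative:

```python
from typing import NamedTuple

# Stand-in for NumericalHyperParameter (documented order: low, high, name).
class NumericalHyperParameter(NamedTuple):
    low: float
    high: float
    name: str

class MyPlanner:  # placeholder for a Policy subclass
    pass

# A config pairs the policy class with tunable ranges (hyper_parameters)
# and fixed settings (constant_parameters); shown as a plain dict here.
config = {
    "policy_cls": MyPlanner,
    "hyper_parameters": [NumericalHyperParameter(0.1, 2.0, "exploration_constant")],
    "constant_parameters": {"num_simulations": 500},
}
print(config["hyper_parameters"][0].name)  # exploration_constant
```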
class POMDPPlanners.core.simulation.hyperparameter_tuning.HyperParamPlannerConfigGenerator[source]

Bases: ABC

abstractmethod generate(environment)[source]
Return type:

HyperParamPlannerConfig

Parameters:

environment (Environment)

abstractmethod get_planner_space_info()[source]
Return type:

PolicySpaceInfo

class POMDPPlanners.core.simulation.hyperparameter_tuning.HyperParameterOptimizationDirection(*values)[source]

Bases: Enum

MAXIMIZE = 'maximize'
MINIMIZE = 'minimize'
class POMDPPlanners.core.simulation.hyperparameter_tuning.HyperParameterRunParams(environment, belief, hyper_param_planner_config, num_episodes, num_steps, n_trials, parameters_to_optimize)[source]

Bases: object

Configuration parameters for hyperparameter optimization runs.

This frozen dataclass contains all parameters needed to configure and execute a hyperparameter optimization run. Input validation is performed at construction time to ensure all parameters are valid before optimization begins.

environment

POMDP environment instance to optimize policies for

belief

Initial belief state for the environment

hyper_param_planner_config

Configuration defining policy class, hyperparameters, and constant parameters for optimization

num_episodes

Number of episodes to run per trial (must be positive)

num_steps

Maximum number of steps per episode (must be positive)

n_trials

Number of optimization trials to execute (must be positive)

parameters_to_optimize

List of (metric_name, direction) tuples specifying which metrics to optimize and in which direction (maximize/minimize)

Raises:
  • ValueError – If any numerical parameter is non-positive, if hyperparameters or parameters_to_optimize are empty, or if metric names are invalid

  • TypeError – If environment, belief, or policy_cls have incorrect types

Parameters:
belief: Belief
property config_id: str
environment: Environment
hyper_param_planner_config: HyperParamPlannerConfig
n_trials: int
num_episodes: int
num_steps: int
parameters_to_optimize: List[Tuple[str, HyperParameterOptimizationDirection]]
class POMDPPlanners.core.simulation.hyperparameter_tuning.NumericalHyperParameter(low, high, name)[source]

Bases: NamedTuple

Parameters:
high: int | float

Alias for field number 1

id()[source]
Return type:

str

low: int | float

Alias for field number 0

name: str

Alias for field number 2

class POMDPPlanners.core.simulation.hyperparameter_tuning.OptimizedPolicyResult(environment, policy, chosen_hyper_parameters, num_episodes, num_steps, parameters_to_optimize, optimized_metric_values)[source]

Bases: object

Result of hyperparameter optimization containing the optimized policy and metrics.

This frozen dataclass contains all information about a completed hyperparameter optimization run, including the optimized policy, chosen hyperparameters, and achieved metric values. Input validation is performed at construction time.

environment

POMDP environment instance used for optimization

policy

Optimized policy instance with best hyperparameters

chosen_hyper_parameters

Dictionary of hyperparameter names to chosen values

num_episodes

Number of episodes run per trial (must be positive)

num_steps

Maximum number of steps per episode (must be positive)

parameters_to_optimize

List of (metric_name, direction) tuples that were optimized

optimized_metric_values

Dictionary mapping metric names to achieved values (None if metric value not found)

Raises:
  • ValueError – If num_episodes or num_steps are non-positive, if chosen_hyper_parameters or parameters_to_optimize are empty, or if metric names are invalid

  • TypeError – If environment, policy types are incorrect, or if data structures have wrong types

Parameters:
chosen_hyper_parameters: dict
environment: Environment
num_episodes: int
num_steps: int
optimized_metric_values: Dict[str, float | None]
parameters_to_optimize: List[Tuple[str, HyperParameterOptimizationDirection]]
policy: Policy
class POMDPPlanners.core.simulation.hyperparameter_tuning.ParallelizationLevel(*values)[source]

Bases: Enum

Level at which parallelization is applied during hyperparameter tuning.

OPTUNA_TRIALS

Parallelize across Optuna trials. Multiple trials run concurrently while episodes within each trial run sequentially.

EPISODES

Parallelize across episodes within each trial. Optuna trials run sequentially while episodes within each trial run concurrently.

EPISODES = 'episodes'
OPTUNA_TRIALS = 'optuna_trials'
class POMDPPlanners.core.simulation.hyperparameter_tuning.ParameterToOptimizeMapper[source]

Bases: ABC

abstractmethod generate(environment, policy_cls=None)[source]
Return type:

List[Tuple[str, HyperParameterOptimizationDirection]]

Parameters:
  • environment (Environment)

  • policy_cls (Type[Policy] | None)

POMDPPlanners.core.simulation.metrics module

class POMDPPlanners.core.simulation.metrics.MetricValue(name, value, lower_confidence_bound, upper_confidence_bound)[source]

Bases: NamedTuple

Parameters:
lower_confidence_bound: float

Alias for field number 2

name: str

Alias for field number 0

upper_confidence_bound: float

Alias for field number 3

value: float

Alias for field number 1

POMDPPlanners.core.simulation.simulation_configs module

class POMDPPlanners.core.simulation.simulation_configs.EnvironmentRunParams(environment, belief, policies, num_episodes, num_steps)[source]

Bases: object

Configuration parameters for environment evaluation runs.

This frozen dataclass contains all parameters needed to configure and execute an environment evaluation run. Input validation is performed at construction time to ensure all parameters are valid before execution begins.

environment

POMDP environment instance to evaluate policies in

belief

Initial belief state for the environment

policies

Sequence of policy instances to evaluate (must be non-empty)

num_episodes

Number of episodes to run per policy (must be positive)

num_steps

Maximum number of steps per episode (must be positive)

Raises:
  • ValueError – If any numerical parameter is non-positive or if policies list is empty

  • TypeError – If environment, belief, or any policy has incorrect type

Parameters:
belief: Belief
property config_id: str
environment: Environment
num_episodes: int
num_steps: int
policies: Sequence[Policy]
class POMDPPlanners.core.simulation.simulation_configs.EvaluationExperimentConfigCreator[source]

Bases: ABC

get_experiment_configs()[source]
Return type:

List[EnvironmentRunParams]

class POMDPPlanners.core.simulation.simulation_configs.HyperparameterOptimizationExperimentConfigCreator[source]

Bases: ABC

get_experiment_configs()[source]
Return type:

List[HyperParameterRunParams]

class POMDPPlanners.core.simulation.simulation_configs.PlannerGenerator[source]

Bases: ABC

abstractmethod generate(environment)[source]
Return type:

Policy

Parameters:

environment (Environment)

abstractmethod get_planner_space_info()[source]
Return type:

PolicySpaceInfo

POMDPPlanners.core.simulation.tasks module

class POMDPPlanners.core.simulation.tasks.DataBaseInterface[source]

Bases: ABC

Abstract interface for database operations used by task managers.

This class defines the interface for caching simulation results, allowing different database implementations to be used interchangeably.

Examples

>>> class MockDatabase(DataBaseInterface):
...     def __init__(self):
...         self.data = {}
...
...     def get(self, key):
...         return self.data.get(key)
...
...     def is_key_in_cache(self, key):
...         return key in self.data
...
...     def set(self, key, value):
...         self.data[key] = value
...
...     def clear(self):
...         self.data.clear()
>>>
>>> db = MockDatabase()
>>> db.set("test_key", "test_value")
>>> db.is_key_in_cache("test_key")
True
>>> db.get("test_key")
'test_value'
abstractmethod clear()[source]

Clear all data from the database.

abstractmethod get(key)[source]

Retrieve a value from the database.

Parameters:

key (str) – The key to retrieve

Returns:

The stored value

Return type:

Any

abstractmethod is_key_in_cache(key)[source]

Check if a key exists in the database.

Parameters:

key (str) – The key to check

Returns:

True if key exists, False otherwise

Return type:

bool

abstractmethod set(key, value)[source]

Store a value in the database.

Parameters:
  • key (str) – The key to store under

  • value (Any) – The value to store

class POMDPPlanners.core.simulation.tasks.SimulationTask[source]

Bases: ABC

Abstract base class for simulation tasks.

This class defines the interface that all simulation tasks must implement. A simulation task represents a unit of work that can be executed and cached.

Examples

>>> class MySimulationTask(SimulationTask):
...     def __init__(self, config_id):
...         self.config_id = config_id
...
...     def run(self):
...         return f"Result for {self.config_id}"
...
...     def get_config_id(self):
...         return self.config_id
>>>
>>> task = MySimulationTask("test_config")
>>> task.get_config_id()
'test_config'
>>> task.run()
'Result for test_config'
abstractmethod get_config_id()[source]

Get a unique identifier for this task’s configuration.

Returns:

Unique configuration identifier for caching

Return type:

str

abstractmethod run()[source]

Execute the simulation task.

Returns:

The result of the simulation task

Return type:

Any

class POMDPPlanners.core.simulation.tasks.TaskManager[source]

Bases: ABC

Abstract base class for task managers.

Task managers coordinate the execution of simulation tasks, handling caching, parallelization, and result collection.

Examples

>>> class SimpleTaskManager(TaskManager):
...     def run_tasks(self, tasks, task_identifiers):
...         results = []
...         identifiers = []
...         for task, identifier in zip(tasks, task_identifiers):
...             result = task.run()
...             results.append(result)
...             identifiers.append(identifier)
...         return results, identifiers
>>>
>>> class MyTask(SimulationTask):
...     def run(self): return "result"
...     def get_config_id(self): return "config"
>>>
>>> manager = SimpleTaskManager()
>>> tasks = [MyTask()]
>>> identifiers = ["task1"]
>>> results, ids = manager.run_tasks(tasks, identifiers)
>>> results[0]
'result'
>>> ids[0]
'task1'
abstractmethod run_tasks(tasks, task_identifiers)[source]

Execute a list of simulation tasks.

Parameters:
  • tasks (List[SimulationTask]) – List of simulation tasks to execute

  • task_identifiers (list) – List of identifiers for each task

Returns:

Results and successful task identifiers

Return type:

Tuple[List[Any], list]

class POMDPPlanners.core.simulation.tasks.TaskManagerExternalDB(cache_db, cache_dir=None, logger_debug=False, use_queue_logger=False, console_output=True, no_logs=False)[source]

Bases: TaskManager

Task manager that uses an external database for caching.

This task manager implements caching functionality using an external database interface, allowing simulation results to be cached and reused across runs.

Parameters:
cache_db

Database interface for caching results

cache_dir

Optional directory for logging and cache files

logger_debug

Whether to enable debug logging

use_queue_logger

Whether to use queue-based logging

Examples

>>> class MockDatabase(DataBaseInterface):
...     def __init__(self):
...         self.data = {}
...     def get(self, key): return self.data.get(key)
...     def is_key_in_cache(self, key): return key in self.data
...     def set(self, key, value): self.data[key] = value
...     def clear(self): self.data.clear()
>>>
>>> class MockTaskManager(TaskManagerExternalDB):
...     def _run_tasks(self, tasks):
...         return [task.run() for task in tasks]
>>>
>>> class MyTask(SimulationTask):
...     def run(self): return "cached_result"
...     def get_config_id(self): return "test_config"
>>>
>>> db = MockDatabase()
>>> manager = MockTaskManager(db)
>>> tasks = [MyTask()]
>>> identifiers = ["task1"]
>>> results, ids = manager.run_tasks(tasks, identifiers)
>>> results[0]
'cached_result'
>>> db.is_key_in_cache("test_config")
True
property logger: Logger

Get the logger instance for this task manager.

Returns:

Configured logger instance

Return type:

logging.Logger

run_tasks(tasks, task_identifiers)[source]

Execute tasks with caching support.

This method checks the cache for existing results before executing tasks, runs only uncached tasks, and stores new results in the cache.

Parameters:
  • tasks (List[SimulationTask]) – List of simulation tasks to execute

  • task_identifiers (list) – List of identifiers for each task

Returns:

Results and successful task identifiers

Return type:

Tuple[List[Any], list]

POMDPPlanners.core.simulation.visualizers module

Abstractions for rendering aggregated experiment visualizations.

Implementations are dispatched to worker processes via the simulator’s task manager (Dask, Joblib, PBS, Sequential), so they MUST be picklable and MUST NOT capture live execution state (clients, sockets, async tasks, file handles).

class POMDPPlanners.core.simulation.visualizers.ExperimentVisualizer[source]

Bases: ABC

Strategy for rendering aggregated per-environment experiment artifacts.

An ExperimentVisualizer is invoked once per environment after the simulation phase completes. It receives the per-policy episode results and is responsible for writing visualization artifacts (plots, animations, summary files) into a caller-provided output directory.

Implementations are dispatched to worker processes via the simulator’s task manager, so they MUST be picklable and MUST NOT capture live execution state (live clients, open sockets, asyncio tasks, file handles, threading primitives).

Note

This is an abstract base class and cannot be instantiated directly.

abstractmethod render(env_name, environment, policy_results, policies, output_dir, cache_visualizations)[source]

Render aggregated artifacts for one environment.

Parameters:
  • env_name (str) – Name of the environment being visualized.

  • environment (Environment) – Environment instance whose results are being rendered.

  • policy_results (Dict[str, List[History]]) – Mapping from policy name to a list of histories produced by that policy on this environment.

  • policies (Sequence[Policy]) – Sequence of policy instances corresponding to policy_results keys.

  • output_dir (Path) – Directory under which artifacts are written. The caller guarantees the directory exists.

  • cache_visualizations (bool) – When True, implementations should also produce per-episode environment-specific caches (e.g. agent trajectory animations) under output_dir.

Return type:

Path

Returns:

Path to the directory containing the rendered artifacts (typically output_dir itself).