POMDPPlanners.planners package

Policy factory module for creating POMDP policies.

class POMDPPlanners.planners.BetaZero(environment, discount_factor, depth, name, action_sampler, k_a=1.0, alpha_a=0.5, k_o=1.0, alpha_o=0.5, exploration_constant=1.0, time_out_in_seconds=None, n_simulations=None, min_visit_count_per_action=1, network=None, belief_representation=None, state_dim=None, z_q=1.0, z_n=1.0, temperature=1.0, n_buffer=1, training_batch_size=256, training_epochs=10, learning_rate=0.001, weight_decay=0.0001, hidden_sizes=(128, 128), track_gradients=False, normalize_inputs=True, normalize_values=True, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: DoubleProgressiveWideningMCTSPolicy, TrainablePolicy

BetaZero: Neural MCTS for POMDPs.

Extends DoubleProgressiveWideningMCTSPolicy with three key innovations from the BetaZero paper:

  1. PUCT selection: Replaces UCB1 using learned policy priors.

  2. Neural value estimation: Replaces random rollouts at leaf nodes.

  3. Policy iteration via ``fit()``: Collects episodes, computes Q-weighted policy targets, and trains the network.
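As a rough illustration of item 1, the sketch below scores actions with an AlphaZero-style PUCT rule. The function names, the argument layout, and the exact form of the exploration bonus are assumptions made for illustration; the selection rule used inside BetaZero may differ in detail.

```python
import math

def puct_score(q_value, prior, parent_visits, child_visits, exploration_constant=1.0):
    """AlphaZero-style PUCT: value estimate plus a prior-weighted exploration bonus."""
    bonus = exploration_constant * prior * math.sqrt(parent_visits) / (1 + child_visits)
    return q_value + bonus

def select_action(stats, exploration_constant=1.0):
    """stats maps action -> (q_value, prior, child_visits); returns the highest-scoring action."""
    parent_visits = sum(visits for _, _, visits in stats.values())
    return max(
        stats,
        key=lambda a: puct_score(
            stats[a][0], stats[a][1], parent_visits, stats[a][2], exploration_constant
        ),
    )

# Hypothetical statistics: equal Q-values and visit counts, so the prior decides.
stats = {"listen": (0.0, 0.8, 3), "open-left": (0.0, 0.1, 3), "open-right": (0.0, 0.1, 3)}
best = select_action(stats)
```

Unlike UCB1, the bonus is scaled by the learned prior, so actions the policy head considers unlikely receive little exploration until their value estimates justify it.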

The planner has two modes:

  • Online planning via action(belief): builds an MCTS tree with PUCT and network value estimates.

  • Offline training via fit(): alternates data collection and network training.

Parameters:
network

Dual-head neural network for policy and value prediction.

belief_representation

Belief → feature-vector mapping φ(b).

z_q

Exponent for Q-value term in policy target.

z_n

Exponent for visit-count term in policy target.

temperature

Temperature τ for sharpening/smoothing policy target.
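To make the roles of z_q, z_n and temperature concrete, here is a minimal sketch of a Q-weighted policy target. It assumes the target is proportional to (softmax(Q)^z_q · (N/ΣN)^z_n)^(1/τ); that form and the helper name policy_target are assumptions for illustration, not a copy of the library's internals.

```python
import numpy as np

def policy_target(q_values, visit_counts, z_q=1.0, z_n=1.0, temperature=1.0):
    """Combine root Q-values and visit counts into a normalized policy target."""
    q = np.asarray(q_values, dtype=float)
    n = np.asarray(visit_counts, dtype=float)
    # Softmax over Q-values, shifted by the maximum for numerical stability.
    q_soft = np.exp(q - q.max())
    q_soft /= q_soft.sum()
    # Fraction of root visits spent on each action.
    n_frac = n / n.sum()
    # Weight the two terms by their exponents, then apply the temperature.
    weights = (q_soft ** z_q * n_frac ** z_n) ** (1.0 / temperature)
    return weights / weights.sum()

target = policy_target([1.0, 0.0], [10, 10])
sharper = policy_target([1.0, 0.0], [10, 10], temperature=0.5)
visits_only = policy_target([5.0, 0.0], [1, 3], z_q=0.0)
```

Setting z_q=0.0 reduces the target to plain visit-count fractions, and a temperature below 1 concentrates more mass on the leading action.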

Example

>>> import numpy as np
>>> np.random.seed(42)
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> from POMDPPlanners.utils.action_samplers import DiscreteActionSampler
>>> from POMDPPlanners.planners.mcts_planners.beta_zero.beta_zero import BetaZero
>>>
>>> env = TigerPOMDP(discount_factor=0.95)
>>> sampler = DiscreteActionSampler(env.get_actions())
>>> planner = BetaZero(
...     environment=env,
...     discount_factor=0.95,
...     depth=3,
...     name="BetaZero_Tiger",
...     action_sampler=sampler,
...     n_simulations=20,
...     state_dim=1,
... )
>>> belief = get_initial_belief(env, n_particles=10)
>>> actions, run_data = planner.action(belief)
>>> actions[0] in env.get_actions()
True
action(belief)[source]

Select an action via MCTS with PUCT and network value estimates.

If data collection is active (during fit()), also stores a pending training example from the tree root.

Return type:

Tuple[List[Any], PolicyRunData]

Parameters:

belief (Belief)

begin_collecting()[source]

Signal the start of a data-collection phase.

Return type:

None

buffer_size()[source]

Return the number of examples currently in the replay buffer.

Return type:

int

collect_episodes_batched(initial_belief_fn, n_episodes, episode_length)[source]

Collect training data using fast batched (network-only) rollouts.

Parameters:
  • initial_belief_fn (Callable[[], Belief]) – Callable returning a fresh initial belief.

  • n_episodes (int) – Number of episodes to collect.

  • episode_length (int) – Maximum steps per episode.

Return type:

None

end_collecting()[source]

Signal the end of a data-collection phase.

Return type:

None

finalize_episode(history)[source]

Process a completed episode into the replay buffer.

Parameters:

history – The History returned by the episode runner.

Return type:

None

get_metric_keys()[source]

Return the loss-metric key names produced by train_step().

Return type:

List[str]

get_network()[source]

Return the underlying trainable network, or None if not applicable.

Override in concrete policies to enable weight-histogram logging in TensorBoardCallback.

Return type:

AbstractBetaZeroNetwork

classmethod get_space_info()[source]

Get information about action and observation spaces.

Default implementation returns MIXED space types, which is appropriate for most progressive widening MCTS planners that support both discrete and continuous action spaces through the action sampler interface.

Subclasses can override this method to specify different space requirements (e.g., PFT_DPW specifies CONTINUOUS action space).

Return type:

PolicySpaceInfo

Returns:

PolicySpaceInfo with MIXED space types for both actions and observations

load_normalization_stats(filepath)[source]

Restore normalization statistics from a saved directory.

Should be called after network.load_weights() when loading a checkpoint that was saved with normalization enabled.

Parameters:

filepath – Directory previously returned by save().

Return type:

None

prepare_episode()[source]

Reset per-episode scratch state before an episode begins.

Return type:

None

save(filepath=None)[source]

Save policy config and network weights to a directory.

Parameters:

filepath – Directory path. If None, uses default.

Return type:

Path

Returns:

Directory where the policy was saved.

train_step()[source]

Train the network on the current replay buffer.

Return type:

Dict[str, List[float]]

Returns:

Per-key lists of loss values produced during training.

class POMDPPlanners.planners.ConstrainedZero(environment, discount_factor, depth, name, action_sampler, failure_fn, delta_0=0.01, eta=1e-05, delta_compounding=1.0, k_a=1.0, alpha_a=0.5, k_o=1.0, alpha_o=0.5, exploration_constant=1.0, time_out_in_seconds=None, n_simulations=None, min_visit_count_per_action=1, network=None, belief_representation=None, state_dim=None, z_q=1.0, z_n=1.0, temperature=1.0, n_buffer=1, training_batch_size=256, training_epochs=10, learning_rate=0.001, weight_decay=0.0001, hidden_sizes=(128, 128), use_dropout=True, p_dropout=0.2, track_gradients=False, normalize_inputs=True, normalize_values=True, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: BetaZero

ConstrainedZero: Neural MCTS for Chance-Constrained POMDPs.

Extends BetaZero with:

  1. 3-head network: Adds a failure probability head alongside policy and value.

  2. SPUCT selection: Safety-constrained PUCT that masks unsafe actions.

  3. Adaptive Delta (conformal inference): Calibrates the failure threshold during tree search using online conformal inference.

  4. Failure propagation: Tracks failure probability per action node using p = p_immediate + delta_compounding * (1 - p_immediate) * p_next.

  5. Constrained policy targets: Applies safety mask during target computation.
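The failure-propagation formula in item 4 can be checked with a few lines of arithmetic. The sketch below implements that formula as written; the safe_actions helper is a hypothetical stand-in for a safety mask and is not taken from the library.

```python
def propagate_failure(p_immediate, p_next, delta_compounding=1.0):
    """p = p_immediate + delta_compounding * (1 - p_immediate) * p_next."""
    return p_immediate + delta_compounding * (1.0 - p_immediate) * p_next

def safe_actions(failure_probs, delta):
    """Hypothetical mask: keep actions whose failure estimate does not exceed delta."""
    return [action for action, p in failure_probs.items() if p <= delta]

# An immediate failure chance of 0.1 followed by a downstream chance of 0.2.
p = propagate_failure(p_immediate=0.1, p_next=0.2)
allowed = safe_actions({"listen": 0.0, "open-left": p}, delta=0.01)
```

With delta_compounding=1.0 the result is the probability of failing now or later; smaller values discount downstream failures, and 0.0 keeps only the immediate term.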

Parameters:
failure_fn

User-provided function state -> bool defining failure.

delta_0

Nominal failure probability threshold.

eta

Learning rate for adaptive Delta calibration.

delta_compounding

Discount factor for failure propagation.

Example

>>> import numpy as np
>>> np.random.seed(42)
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> from POMDPPlanners.utils.action_samplers import DiscreteActionSampler
>>> from POMDPPlanners.planners.mcts_planners.constrained_zero.constrained_zero import ConstrainedZero
>>>
>>> env = TigerPOMDP(discount_factor=0.95)
>>> sampler = DiscreteActionSampler(env.get_actions())
>>> planner = ConstrainedZero(
...     environment=env,
...     discount_factor=0.95,
...     depth=3,
...     name="CZ_Tiger",
...     action_sampler=sampler,
...     n_simulations=20,
...     state_dim=1,
...     failure_fn=lambda s: False,
... )
>>> belief = get_initial_belief(env, n_particles=10)
>>> actions, run_data = planner.action(belief)
>>> actions[0] in env.get_actions()
True
get_metric_keys()[source]

Return the loss-metric key names produced by train_step().

Return type:

List[str]

network: ConstrainedZeroNetwork
class POMDPPlanners.planners.DiscreteActionSequencesPlanner(environment, discount_factor, name, depth, n_return_samples, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: Policy

Open-loop planner for discrete action spaces using exhaustive sequence search.

This planner uses an open-loop strategy to find optimal action sequences by enumerating all possible action sequences up to a specified depth and selecting the sequence with the highest expected return. It’s particularly useful for problems with small action spaces and short planning horizons.

The algorithm works by:

  1. Generating all possible action sequences of the specified depth

  2. For each sequence, estimating the expected return through Monte Carlo sampling

  3. Selecting the sequence with the maximum expected return

  4. Returning the first action in the optimal sequence

Open-Loop vs Closed-Loop Planning:

  • Open-loop: Plans a complete action sequence without considering future observations

  • Closed-loop: Re-plans at each step based on new observations (like MCTS algorithms)

This approach is computationally intensive, since it evaluates O(|A|^depth) sequences, but it finds the best open-loop sequence (up to Monte Carlo estimation error) when the action space is manageable.
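The exhaustive search described above can be sketched independently of the library. In this minimal version, best_open_loop_sequence and toy_return are hypothetical names, and the toy return model is deterministic so the result is easy to verify; a real return sample would come from simulating the sequence from states drawn from the belief.

```python
import itertools

def best_open_loop_sequence(actions, depth, sample_return, n_return_samples):
    """Enumerate all |A|^depth sequences and keep the one with the highest mean return."""
    best_sequence, best_value = None, float("-inf")
    for sequence in itertools.product(actions, repeat=depth):
        # Monte Carlo estimate of the expected return of this fixed sequence.
        samples = [sample_return(sequence) for _ in range(n_return_samples)]
        value = sum(samples) / n_return_samples
        if value > best_value:
            best_sequence, best_value = sequence, value
    return best_sequence, best_value

def toy_return(sequence):
    """Toy model (assumption): one unit of return per 'listen' action."""
    return float(sum(1 for action in sequence if action == "listen"))

actions = ["listen", "open-left", "open-right"]
sequence, value = best_open_loop_sequence(
    actions, depth=2, sample_return=toy_return, n_return_samples=10
)
first_action = sequence[0]  # only the first action of the best sequence is executed
```

With three actions and depth 2 the loop evaluates nine sequences; the count grows exponentially with depth, which is why the planner suits short horizons.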

Parameters:
  • environment (DiscreteActionsEnvironment) – The discrete actions POMDP environment

  • discount_factor (float) – Discount factor for future rewards (0 < γ ≤ 1)

  • name (str) – Identifier for the planner instance

  • depth (int) – Planning horizon (number of actions in sequence)

  • n_return_samples (int) – Number of Monte Carlo samples for return estimation

  • log_path (Optional[Path]) – Optional path for logging planner execution details

  • debug (bool) – Enable debug mode for detailed execution traces

  • use_queue_logger (bool)

Example

>>> import numpy as np
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Create environment and planner
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> planner = DiscreteActionSequencesPlanner(
...     environment=tiger,
...     discount_factor=0.95,
...     name="ExamplePlanner",
...     depth=2,
...     n_return_samples=10
... )
>>>
>>> # Basic planner interface usage
>>> planner.name
'ExamplePlanner'
>>>
>>> # Action selection from belief
>>> initial_belief = get_initial_belief(tiger, n_particles=10)
>>> actions, run_data = planner.action(initial_belief)
>>>
>>> # Planner space information
>>> space_info = DiscreteActionSequencesPlanner.get_space_info()
>>> space_info.action_space.name
'DISCRETE'
action(belief)[source]

Select action(s) based on the current belief state.

This is the core method that implements the policy’s decision-making logic. It takes a belief state and returns the selected action(s) along with execution information and performance metrics.

Parameters:

belief (Belief) – Current belief state representing uncertainty over states

Returns:

  • List of selected actions (typically single action, but supports multiple)

  • PolicyRunData with execution metrics and performance information

Return type:

Tuple[List[Any], PolicyRunData]

Note

Subclasses must implement this method with their specific planning or decision-making algorithm.

estimate_return(action_sequence, belief)[source]
Return type:

float

classmethod get_info_variable_names()[source]

Get names of policy info variables.

Discrete action sequences planner does not produce any info variables.

Return type:

List[str]

Returns:

Empty list as this planner produces no info variables

classmethod get_space_info()[source]

Get space type requirements for this policy class.

This class method specifies what types of action and observation spaces this policy implementation can handle, enabling compatibility checking with environments.

Return type:

PolicySpaceInfo

Returns:

PolicySpaceInfo specifying required action and observation space types

Note

Subclasses must implement this method to declare their space compatibility. This is used for validation when pairing policies with environments.

search(belief)[source]
Return type:

Any

Parameters:

belief (Belief)

class POMDPPlanners.planners.ICVaRSparseSampling(environment, branching_factor, depth, alpha, name='ICVaRSparseSampling')[source]

Bases: SparseSamplingDiscreteActionsPlanner

Risk-sensitive sparse sampling planner using CVaR for value backups.

This planner extends the standard sparse sampling algorithm by replacing the expected value (mean) in Q-value computation with the Conditional Value at Risk (CVaR). CVaR focuses on the worst-alpha fraction of outcomes, making the planner risk-sensitive.

The standard Q-value update uses:

Q = immediate_cost + gamma * mean(child_v_values)

The ICVaR variant replaces this with:

Q = immediate_cost + gamma * CVaR_alpha(child_v_values)

Parameters:
alpha

CVaR confidence level (0 < alpha <= 1). Lower alpha means more risk-sensitive (focuses on worse outcomes). alpha=1.0 recovers the standard expected value.
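The difference between the two backups is easiest to see numerically. The sketch below uses a simple empirical CVaR estimator that averages the worst ceil(alpha * n) samples, treating higher cost as worse; the estimator choice and the helper names are assumptions for illustration and may not match the library's implementation.

```python
import math

def cvar(costs, alpha):
    """Empirical CVaR: mean of the worst (highest-cost) ceil(alpha * n) samples."""
    if not 0.0 < alpha <= 1.0:
        raise ValueError("alpha must be in (0, 1]")
    ordered = sorted(costs, reverse=True)  # worst outcomes first
    k = max(1, math.ceil(alpha * len(ordered)))
    return sum(ordered[:k]) / k

def icvar_q_value(immediate_cost, child_v_values, gamma, alpha):
    """Q = immediate_cost + gamma * CVaR_alpha(child_v_values)."""
    return immediate_cost + gamma * cvar(child_v_values, alpha)

child_v_values = [1.0, 2.0, 3.0, 10.0]
risk_neutral = cvar(child_v_values, alpha=1.0)  # plain mean of all children
risk_averse = cvar(child_v_values, alpha=0.5)   # mean of the two worst children
q = icvar_q_value(1.0, child_v_values, gamma=0.5, alpha=0.5)
```

Lowering alpha shifts the backup toward the most expensive child values, so a single bad outcome dominates the Q-value.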

Example

>>> import numpy as np
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Create environment and risk-sensitive planner
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> planner = ICVaRSparseSampling(
...     environment=tiger,
...     branching_factor=2,
...     depth=2,
...     alpha=0.3,
...     name="ICVaRPlanner"
... )
>>>
>>> # Basic planner interface usage
>>> planner.name
'ICVaRPlanner'
>>> planner.alpha
0.3
>>>
>>> # Action selection from belief
>>> initial_belief = get_initial_belief(tiger, n_particles=10)
>>> actions, run_data = planner.action(initial_belief)
>>>
>>> # Planner space information
>>> space_info = ICVaRSparseSampling.get_space_info()
>>> space_info.action_space.name
'DISCRETE'
classmethod get_info_variable_names()[source]

Get names of policy info variables.

Return type:

List[str]

Returns:

Empty list as this planner produces no info variables.

classmethod get_space_info()[source]

Get space type requirements for this policy class.

This class method specifies what types of action and observation spaces this policy implementation can handle, enabling compatibility checking with environments.

Return type:

PolicySpaceInfo

Returns:

PolicySpaceInfo specifying required action and observation space types

Note

Subclasses must implement this method to declare their space compatibility. This is used for validation when pairing policies with environments.

class POMDPPlanners.planners.ICVaR_PFT_DPW(environment, name, depth, action_sampler, discount_factor=0.95, time_out_in_seconds=None, n_simulations=None, alpha=0.1, delta=0.1, belief_child_num=5, min_immediate_cost=0.0, max_immediate_cost=1.0, min_visit_count_per_action=1, exploration_constant=1.0, k_a=1.0, alpha_a=0.5, k_o=1.0, alpha_o=0.5, entropy_weight=0.0, visit_count_penalty=0.0)[source]

Bases: PathSimulationPolicyCostSetting

classmethod get_space_info()[source]

Get information about the policy’s space.

Return type:

PolicySpaceInfo

is_terminal_belief(belief)[source]

Check whether all particles in the belief are terminal states.

Return type:

bool

Parameters:

belief (Belief)

update_nodes(belief_node, action_node)[source]
class POMDPPlanners.planners.ICVaR_POMCPOW(environment, discount_factor, depth, exploration_constant, k_o, k_a, alpha_o, alpha_a, min_immediate_cost, max_immediate_cost, min_visit_count_per_action, delta, name, action_sampler, time_out_in_seconds=None, n_simulations=None, alpha=0.05, min_samples_per_node=10, log_path=None, debug=False, visit_count_penalty=0.0)[source]

Bases: PathSimulationPolicyCostSetting

classmethod get_space_info()[source]

Get information about action and observation spaces.

POMCPOW supports mixed-type spaces through its action sampler interface, allowing it to handle both discrete and continuous action spaces.

Return type:

PolicySpaceInfo

Returns:

PolicySpaceInfo with MIXED space types for both actions and observations

class POMDPPlanners.planners.PFT_DPW(environment, discount_factor, depth, name, action_sampler, k_a=1.0, alpha_a=0.5, k_o=1.0, alpha_o=0.5, exploration_constant=1.0, time_out_in_seconds=None, n_simulations=None, min_visit_count_per_action=1, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: DoubleProgressiveWideningMCTSPolicy

PFT-DPW (Particle Filter Trees with Double Progressive Widening) Algorithm.

PFT-DPW is a Monte Carlo Tree Search algorithm designed for continuous action spaces in POMDPs. It uses progressive widening to gradually expand both the action and observation spaces during tree search, enabling effective planning in problems with continuous or very large discrete action spaces.

Algorithm Overview: The algorithm operates through progressive expansion:

  1. Action Progressive Widening: Gradually adds new actions based on visit counts

  2. Observation Progressive Widening: Gradually adds new observation branches

  3. UCB1 Exploration: Balances exploration of new actions with exploitation

  4. Random Rollouts: Estimates values from leaf nodes using random simulations

Key Features:

  • Handles continuous action spaces through adaptive sampling

  • Uses UCB1-style exploration with progressive expansion

  • Supports custom action samplers for domain-specific action generation

  • Balances exploration of new actions with exploitation of promising ones

  • Performs random rollouts from leaf nodes for value estimation

Progressive Widening Parameters:

  • k_a, alpha_a: Control action space expansion (more actions added as visit_count^alpha_a)

  • k_o, alpha_o: Control observation space expansion

  • exploration_constant: UCB1 exploration parameter (higher = more exploration)
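A common way to express progressive widening is to allow a new child only while the number of children is at most k * N^alpha, where N is the node's visit count. The sketch below uses that textbook criterion; may_add_child is a hypothetical helper, and the exact test used by PFT_DPW may differ.

```python
def may_add_child(n_children, visit_count, k, alpha):
    """Textbook widening test (assumption): expand while |children| <= k * N^alpha."""
    return n_children <= k * visit_count ** alpha

def simulate_widening(n_visits, k, alpha):
    """Count how many children a node accumulates over n_visits visits."""
    n_children = 0
    for visit_count in range(1, n_visits + 1):
        if may_add_child(n_children, visit_count, k, alpha):
            n_children += 1
    return n_children

narrow = simulate_widening(100, k=1.0, alpha=0.5)
wide = simulate_widening(100, k=2.0, alpha=0.5)
```

The child count grows sublinearly in the visit count, so most simulations revisit existing children and only occasionally branch; larger k or alpha makes the tree wider.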

Parameters:
environment

The POMDP environment to plan for

discount_factor

Discount factor for future rewards (0 < γ ≤ 1)

depth

Maximum search depth for tree expansion

action_sampler

Strategy for sampling new actions during progressive widening

k_a, alpha_a

Action progressive widening parameters

k_o, alpha_o

Observation progressive widening parameters

exploration_constant

UCB1 exploration parameter

n_simulations

Number of simulations to run (mutually exclusive with timeout)

time_out_in_seconds

Time limit for planning (mutually exclusive with n_simulations)

Example

>>> import numpy as np
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> from POMDPPlanners.utils.action_samplers import DiscreteActionSampler
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Create environment and planner
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> action_sampler = DiscreteActionSampler(tiger.get_actions())
>>> planner = PFT_DPW(
...     environment=tiger,
...     discount_factor=0.95,
...     depth=5,
...     name="ExamplePlanner",
...     action_sampler=action_sampler,
...     k_a=2.0,
...     alpha_a=0.5,
...     n_simulations=10
... )
>>>
>>> # Basic planner interface usage
>>> planner.name
'ExamplePlanner'
>>>
>>> # Action selection from belief
>>> initial_belief = get_initial_belief(tiger, n_particles=10)
>>> actions, run_data = planner.action(initial_belief)
>>>
>>> # Planner space information
>>> space_info = PFT_DPW.get_space_info()
>>> space_info.action_space.name
'MIXED'
sample_existing_belief_node(belief_node, action_node)[source]
Return type:

Tuple[BeliefNode, float]

class POMDPPlanners.planners.POMCP(environment, discount_factor, depth, exploration_constant, name, time_out_in_seconds=None, n_simulations=None, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: PathSimulationPolicy

POMCP (Partially Observable Monte Carlo Planning) algorithm.

POMCP is a Monte Carlo Tree Search algorithm for POMDP planning that combines UCB1 action selection with particle-based belief updates to plan in problems with large state spaces. It builds a search tree through repeated simulations and provides theoretical convergence guarantees.

The algorithm uses UCB1 (Upper Confidence Bounds) to balance exploration and exploitation when selecting actions during tree search. It maintains belief states using particle filters and performs random rollouts to estimate values at leaf nodes.
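For reference, the standard UCB1 score adds an exploration bonus of c * sqrt(ln N / n) to the action's value estimate, where N is the parent visit count and n the action's visit count. The helper below is a standalone sketch with hypothetical names, not the library's selection routine.

```python
import math

def ucb1_score(q_value, parent_visits, child_visits, exploration_constant):
    """Standard UCB1: value estimate plus c * sqrt(ln(N) / n)."""
    if child_visits == 0:
        return math.inf  # untried actions are selected first
    bonus = exploration_constant * math.sqrt(math.log(parent_visits) / child_visits)
    return q_value + bonus

low_c = ucb1_score(0.0, parent_visits=100, child_visits=5, exploration_constant=1.0)
high_c = ucb1_score(0.0, parent_visits=100, child_visits=5, exploration_constant=2.0)
untried = ucb1_score(0.0, parent_visits=100, child_visits=0, exploration_constant=1.0)
```

A larger exploration_constant inflates the bonus of rarely visited actions, which is what "higher = more exploration" means in the parameter description.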

Parameters:
environment

The POMDP environment to plan for

discount_factor

Discount factor for future rewards (0 < γ ≤ 1)

depth

Maximum search depth for tree expansion

exploration_constant

UCB1 exploration parameter (higher = more exploration)

time_out_in_seconds

Time limit for planning (mutually exclusive with n_simulations)

n_simulations

Number of simulations to run (mutually exclusive with time_out_in_seconds)

Note

In the original POMCP paper, the belief structure used was an unweighted particle belief that can be found in POMDPPlanners.core.belief.UnweightedParticleBelief. However, in this implementation, we keep the belief structure abstract to allow users to choose their preferred belief representation. In the usage example below, a weighted particle belief is used via the POMDPPlanners.core.belief.get_initial_belief() function.

Example

>>> import numpy as np
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Create environment and planner
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> planner = POMCP(
...     environment=tiger,
...     discount_factor=0.95,
...     depth=5,
...     exploration_constant=1.0,
...     name="ExamplePlanner",
...     n_simulations=10
... )
>>>
>>> # Basic planner interface usage
>>> planner.name
'ExamplePlanner'
>>>
>>> # Action selection from belief
>>> initial_belief = get_initial_belief(tiger, n_particles=10)
>>> actions, run_data = planner.action(initial_belief)
>>>
>>> # Planner space information
>>> space_info = POMCP.get_space_info()
>>> space_info.action_space.name
'DISCRETE'
get_explored_action_node(belief_node)[source]
Return type:

ActionNode

Parameters:

belief_node (BeliefNode)

classmethod get_space_info()[source]

Get space type requirements for this policy class.

This class method specifies what types of action and observation spaces this policy implementation can handle, enabling compatibility checking with environments.

Return type:

PolicySpaceInfo

Returns:

PolicySpaceInfo specifying required action and observation space types

Note

Subclasses must implement this method to declare their space compatibility. This is used for validation when pairing policies with environments.

random_rollout(state, depth)[source]
Return type:

float

update_nodes(belief_node, action_node, return_sample, state)[source]
class POMDPPlanners.planners.POMCPOW(environment, discount_factor, depth, exploration_constant, k_o, k_a, alpha_o, alpha_a, name, action_sampler, time_out_in_seconds=None, n_simulations=None, min_visit_count_per_action=1, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: DoubleProgressiveWideningMCTSPolicy

POMCPOW (Partially Observable Monte Carlo Planning with Observation Widening) Algorithm.

POMCPOW is an advanced Monte Carlo Tree Search algorithm for POMDP planning that extends POMCP with double progressive widening. It combines UCB1 action selection with progressive widening for both actions and observations, making it particularly effective for problems with large or continuous action spaces.

Algorithm Overview: The algorithm operates through double progressive expansion:

  1. Action Progressive Widening: Gradually adds new actions based on visit counts and α_a

  2. Observation Progressive Widening: Gradually adds new observation branches based on k_o and α_o

  3. Weighted Particle Beliefs: Maintains weighted particle representations in observation nodes

  4. UCB1 Exploration: Balances exploration of new actions with exploitation using UCB1

  5. Random Rollouts: Estimates values from leaf nodes using random simulations

Key Features:

  • Handles continuous and discrete action spaces through ActionSampler interface

  • Uses double progressive widening to manage tree growth

  • Maintains weighted particle beliefs for efficient belief approximation

  • Balances exploration of new actions with exploitation of promising ones

  • Supports configurable progressive widening parameters

Progressive Widening Parameters:

  • k_a, α_a: Control action progressive widening (new actions added when ⌊n^α_a⌋ > ⌊(n-1)^α_a⌋)

  • k_o, α_o: Control observation progressive widening (max observations ≤ k_o * n^α_o)
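The two widening rules quoted above can be written as small predicates. This is a sketch of one reading of those rules: should_add_action applies the floor test literally, and should_add_observation keeps the branch count within k_o * n^α_o after the addition. Both helper names are hypothetical.

```python
import math

def should_add_action(n, alpha_a):
    """Widen when floor(n^alpha_a) increases between visit n-1 and visit n."""
    return math.floor(n ** alpha_a) > math.floor((n - 1) ** alpha_a)

def should_add_observation(n_observations, n, k_o, alpha_o):
    """Allow a new branch only if the count stays within k_o * n^alpha_o."""
    return n_observations + 1 <= k_o * n ** alpha_o

# Visits (out of the first 20) at which a new action would be added for alpha_a = 0.5.
widening_visits = [n for n in range(1, 21) if should_add_action(n, 0.5)]
```

Under the floor rule with α_a = 0.5, new actions appear at visit counts that are perfect squares, so the gaps between additions grow as the node is visited more often.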

Parameters:
environment

The POMDP environment to plan for

discount_factor

Discount factor for future rewards (0 < γ ≤ 1)

depth

Maximum search depth for tree expansion

exploration_constant

UCB1 exploration parameter (higher = more exploration)

k_o

Observation progressive widening coefficient

k_a

Action progressive widening coefficient

alpha_o

Observation progressive widening exponent

alpha_a

Action progressive widening exponent

action_sampler

Action sampling strategy for progressive widening

time_out_in_seconds

Time limit for planning (mutually exclusive with n_simulations)

n_simulations

Number of simulations to run (mutually exclusive with timeout)

log_path

Optional path for logging policy execution

debug

Enable debug logging if True

Example

>>> import numpy as np
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> from POMDPPlanners.utils.action_samplers import DiscreteActionSampler
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Create environment and planner
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> action_sampler = DiscreteActionSampler(tiger.get_actions())
>>> planner = POMCPOW(
...     environment=tiger,
...     discount_factor=0.95,
...     depth=5,
...     exploration_constant=1.0,
...     k_o=3.0,
...     k_a=3.0,
...     alpha_o=0.5,
...     alpha_a=0.5,
...     action_sampler=action_sampler,
...     n_simulations=10,
...     name="ExamplePlanner"
... )
>>>
>>> # Basic planner interface usage
>>> planner.name
'ExamplePlanner'
>>>
>>> # Action selection from belief
>>> initial_belief = get_initial_belief(tiger, n_particles=10)
>>> actions, run_data = planner.action(initial_belief)
>>>
>>> # Planner space information
>>> space_info = POMCPOW.get_space_info()
>>> space_info.action_space.name
'MIXED'
class POMDPPlanners.planners.POMCP_DPW(environment, discount_factor, depth, exploration_constant, k_o, k_a, alpha_o, alpha_a, name, action_sampler, time_out_in_seconds=None, n_simulations=None, min_visit_count_per_action=1, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: DoubleProgressiveWideningMCTSPolicy

POMCP_DPW (Partially Observable Monte Carlo Planning with Double Progressive Widening) Algorithm.

POMCP_DPW is an advanced Monte Carlo Tree Search algorithm for POMDP planning that extends POMCP with double progressive widening. It combines UCB1 action selection with progressive widening for both actions and observations, making it particularly effective for problems with large or continuous action spaces.

Algorithm Overview: The algorithm operates through double progressive expansion:

  1. Action Progressive Widening: Gradually adds new actions based on visit counts and α_a

  2. Observation Progressive Widening: Gradually adds new observation branches based on k_o and α_o

  3. Unweighted Particle Beliefs: Maintains unweighted particle representations in observation nodes (POMCP tradition)

  4. UCB1 Exploration: Balances exploration of new actions with exploitation using UCB1

  5. Random Rollouts: Estimates values from leaf nodes using random simulations

Key Features:

  • Handles continuous and discrete action spaces through ActionSampler interface

  • Uses double progressive widening to manage tree growth

  • Maintains unweighted particle beliefs for efficient belief approximation (following POMCP tradition)

  • Balances exploration of new actions with exploitation of promising ones

  • Supports configurable progressive widening parameters

Progressive Widening Parameters:

  • k_a, α_a: Control action progressive widening (new actions added when ⌊n^α_a⌋ > ⌊(n-1)^α_a⌋)

  • k_o, α_o: Control observation progressive widening (max observations ≤ k_o * n^α_o)
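The practical difference between unweighted and weighted particle beliefs is how a state is drawn from them. The sketch below contrasts the two using only the standard library; the function names are hypothetical and the library's belief classes are not used.

```python
import random

def sample_unweighted(particles, rng):
    """Unweighted belief: probability is encoded by how often a state appears."""
    return rng.choice(particles)

def sample_weighted(particles, weights, rng):
    """Weighted belief: each particle carries an explicit importance weight."""
    return rng.choices(particles, weights=weights, k=1)[0]

rng = random.Random(0)
# Two of three particles are 'tiger-left', so it is drawn with probability 2/3.
unweighted_draw = sample_unweighted(["tiger-left", "tiger-left", "tiger-right"], rng)
# All weight sits on 'tiger-left', so it is always drawn.
weighted_draw = sample_weighted(["tiger-left", "tiger-right"], [1.0, 0.0], rng)
```

An unweighted belief needs duplicate particles to represent a likely state, whereas a weighted belief can represent the same distribution with one particle per distinct state.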

Parameters:
environment

The POMDP environment to plan for

discount_factor

Discount factor for future rewards (0 < γ ≤ 1)

depth

Maximum search depth for tree expansion

exploration_constant

UCB1 exploration parameter (higher = more exploration)

k_o

Observation progressive widening coefficient

k_a

Action progressive widening coefficient

alpha_o

Observation progressive widening exponent

alpha_a

Action progressive widening exponent

action_sampler

Action sampling strategy for progressive widening

time_out_in_seconds

Time limit for planning (mutually exclusive with n_simulations)

n_simulations

Number of simulations to run (mutually exclusive with timeout)

log_path

Optional path for logging policy execution

debug

Enable debug logging if True

Example

>>> import numpy as np
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> from POMDPPlanners.utils.action_samplers import DiscreteActionSampler
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Create environment and planner
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> action_sampler = DiscreteActionSampler(tiger.get_actions())
>>> planner = POMCP_DPW(
...     environment=tiger,
...     discount_factor=0.95,
...     depth=5,
...     exploration_constant=1.0,
...     k_o=3.0,
...     k_a=3.0,
...     alpha_o=0.5,
...     alpha_a=0.5,
...     action_sampler=action_sampler,
...     n_simulations=10,
...     name="ExamplePlanner"
... )
>>>
>>> # Basic planner interface usage
>>> planner.name
'ExamplePlanner'
>>>
>>> # Action selection from belief
>>> initial_belief = get_initial_belief(tiger, n_particles=10)
>>> actions, run_data = planner.action(initial_belief)
>>>
>>> # Planner space information
>>> space_info = POMCP_DPW.get_space_info()
>>> space_info.action_space.name
'MIXED'
class POMDPPlanners.planners.PathSimulationPolicy(environment, discount_factor, name, n_simulations, time_out_in_seconds, action_sampler=None, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: Policy

Abstract base class for Monte Carlo Tree Search algorithms in POMDP planning.

This class provides a common framework for MCTS-based POMDP planners that build search trees through path simulations. It handles the core tree construction loop and provides hooks for algorithm-specific simulation strategies.

The class supports two termination criteria:

  1. Simulation count: Run a fixed number of MCTS simulations

  2. Time limit: Run simulations for a specified time duration

Key Components:

  • Tree construction with configurable termination criteria

  • Automatic tree metrics collection for analysis

  • Action selection from the constructed search tree

  • Abstract simulation interface for algorithm specialization

Subclass Responsibilities: Concrete implementations must provide the _simulate_path method that defines how individual MCTS simulations are performed, including:

  • Node expansion strategies

  • Action selection during tree traversal

  • Value estimation and backpropagation
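The two termination criteria amount to a simple driver loop around the per-simulation hook. The sketch below is a standalone illustration: run_simulations is a hypothetical function, simulate_path stands in for a subclass's _simulate_path, and requiring exactly one criterion is an assumption about how mutual exclusivity is enforced.

```python
import time

def run_simulations(simulate_path, n_simulations=None, time_out_in_seconds=None):
    """Run simulate_path until the simulation budget or the time budget is spent."""
    if (n_simulations is None) == (time_out_in_seconds is None):
        raise ValueError("Specify exactly one of n_simulations or time_out_in_seconds")
    completed = 0
    if n_simulations is not None:
        # Criterion 1: fixed number of simulations.
        for _ in range(n_simulations):
            simulate_path()
            completed += 1
    else:
        # Criterion 2: keep simulating until the deadline passes.
        deadline = time.monotonic() + time_out_in_seconds
        while time.monotonic() < deadline:
            simulate_path()
            completed += 1
    return completed

calls = []
completed = run_simulations(lambda: calls.append(1), n_simulations=5)
```

A count budget makes runs reproducible under a fixed seed, while a time budget adapts the number of simulations to the hardware.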

Parameters:
environment

The POMDP environment for planning

discount_factor

Discount factor for future rewards (0 < γ ≤ 1)

n_simulations

Number of MCTS simulations to run (mutually exclusive with timeout)

time_out_in_seconds

Time limit for planning in seconds (mutually exclusive with n_simulations)

name

Identifier for the policy instance

Algorithm Integration: This base class is used by several MCTS algorithms in the framework:

  • POMCP: Uses UCB1 for action selection with particle filtering

  • PFT-DPW: Implements progressive widening for continuous action spaces

  • Sparse-PFT: Combines sparse sampling with progressive widening

The common interface allows easy comparison and benchmarking of different MCTS variants while sharing the core tree construction infrastructure.

action(belief)[source]

Select action(s) based on the current belief state.

This is the core method that implements the policy’s decision-making logic. It takes a belief state and returns the selected action(s) along with execution information and performance metrics.

Parameters:

belief (Belief) – Current belief state representing uncertainty over states

Returns:

  • List of selected actions (typically single action, but supports multiple)

  • PolicyRunData with execution metrics and performance information

Return type:

Tuple[List[Any], PolicyRunData]

Note

Subclasses must implement this method with their specific planning or decision-making algorithm.

classmethod get_info_variable_names()[source]

Get names of tree metric info variables produced by path simulation policies.

Return type:

List[str]

Returns:

List of metric names from tree statistics

class POMDPPlanners.planners.PathSimulationPolicyCostSetting(environment, discount_factor, name, action_sampler=None, n_simulations=None, time_out_in_seconds=None, log_path=None, debug=False)[source]

Bases: PathSimulationPolicy

action(belief)[source]

Select action(s) based on the current belief state.

This is the core method that implements the policy’s decision-making logic. It takes a belief state and returns the selected action(s) along with execution information and performance metrics.

Parameters:

belief (Belief) – Current belief state representing uncertainty over states

Returns:

  • List of selected actions (typically single action, but supports multiple)

  • PolicyRunData with execution metrics and performance information

Return type:

Tuple[List[Any], PolicyRunData]

Note

Subclasses must implement this method with their specific planning or decision-making algorithm.

class POMDPPlanners.planners.SparsePFT(environment, discount_factor, gamma, depth, c_ucb, beta_ucb, belief_child_num, time_out_in_seconds=None, n_simulations=None, name='SparsePFT', log_path=None, debug=False, use_queue_logger=False)[source]

Bases: PathSimulationPolicy

Sparse-PFT (Sparse Particle Filter Tree) Algorithm for POMDP Planning.

Sparse-PFT combines sparse sampling with particle-filter belief trees and Monte Carlo Tree Search for POMDP planning. It keeps the search tree tractable by limiting the number of belief children per action node, and it uses a UCB-style exploration rule to guide tree construction.

Algorithm Overview: The algorithm operates by:

  1. Sparse Branching: Limits each action node to a fixed number of belief children

  2. Progressive Selection: Uses modified UCB to balance exploration and exploitation

  3. Adaptive Sampling: Samples existing children or generates new ones based on capacity

  4. Random Rollouts: Estimates values from leaf nodes using random simulations
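The sparse branching and adaptive sampling steps amount to a capacity check on each action node. The sketch below shows one way to express that check. `sample_belief_child` and `generate_child` are hypothetical names, and choosing uniformly among existing children once the node is full is an assumption, not a documented behaviour of the library.

```python
import random
from typing import Callable, List, TypeVar

B = TypeVar("B")


def sample_belief_child(
    children: List[B],
    belief_child_num: int,
    generate_child: Callable[[], B],
    rng: random.Random,
) -> B:
    """Return a belief child, creating a new one only while under capacity."""
    if len(children) < belief_child_num:
        # Below capacity: generate a new child belief and store it.
        child = generate_child()
        children.append(child)
        return child
    # At capacity: reuse an existing child, chosen uniformly (an assumption).
    return rng.choice(children)
```

With `belief_child_num=3`, the first three calls each create a child, and every later call returns one of those three, so the width of each action node stays fixed regardless of the simulation count.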

Key Features:

  • Sparse Tree Structure: Controls memory usage by limiting belief children per action

  • Enhanced UCB: Uses modified UCB formula with beta parameter for better exploration

  • Efficient Sampling: Balances between exploring existing branches and generating new ones

  • Discrete Actions: Optimized for discrete action spaces with discrete or mixed observations

  • Terminal State Handling: Properly detects when all particles reach terminal states

Mathematical Foundation: The algorithm uses a modified UCB selection criterion:

UCB(s,a) = Q(s,a) + c_ucb * beta_ucb * N(s) * (1/√N(s,a))

Where:

  • Q(s,a): Action-value estimate

  • c_ucb: Base exploration constant

  • beta_ucb: Additional exploration parameter

  • N(s): Visit count of belief node

  • N(s,a): Visit count of action node
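For a concrete feel of the selection criterion, the function below transcribes the formula exactly as it is written above. It is a sketch: `ucb_score` is a hypothetical helper, the library's actual computation may differ from this docstring formula, and returning infinity for unvisited actions is an assumption added to avoid division by zero.

```python
import math


def ucb_score(
    q_value: float,
    c_ucb: float,
    beta_ucb: float,
    belief_visits: int,
    action_visits: int,
) -> float:
    """Evaluate Q(s,a) + c_ucb * beta_ucb * N(s) * (1/sqrt(N(s,a)))."""
    if action_visits == 0:
        # Assumption: unvisited actions get infinite priority so each is tried once.
        return math.inf
    exploration = c_ucb * beta_ucb * belief_visits * (1.0 / math.sqrt(action_visits))
    return q_value + exploration
```

With `q_value=1.0`, `c_ucb=1.0`, `beta_ucb=2.0`, `belief_visits=4`, and `action_visits=4`, the exploration term is 1.0 × 2.0 × 4 × 0.5 = 4.0, so the score is 5.0.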

Parameters:
environment

The discrete-action POMDP environment for planning

discount_factor

Discount factor for future rewards (0 < γ ≤ 1)

gamma

Alternative discount parameter for value computation

depth

Maximum search depth for tree expansion

c_ucb

Base exploration constant for UCB formula

beta_ucb

Additional exploration parameter for enhanced UCB

belief_child_num

Maximum number of belief children per action node

n_simulations

Number of MCTS simulations to perform

Example

>>> import numpy as np
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> from POMDPPlanners.planners import SparsePFT
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Create environment and planner
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> planner = SparsePFT(
...     environment=tiger,
...     discount_factor=0.95,
...     gamma=0.95,
...     depth=5,
...     c_ucb=1.0,
...     beta_ucb=2.0,
...     belief_child_num=3,
...     n_simulations=10,
...     name="ExamplePlanner"
... )
>>>
>>> # Basic planner interface usage
>>> planner.name
'ExamplePlanner'
>>>
>>> # Action selection from belief
>>> initial_belief = get_initial_belief(tiger, n_particles=10)
>>> actions, run_data = planner.action(initial_belief)
>>>
>>> # Planner space information
>>> space_info = SparsePFT.get_space_info()
>>> space_info.action_space.name
'DISCRETE'

get_explored_action_node(belief_node)[source]

Return type:

ActionNode

Parameters:

belief_node (BeliefNode)

classmethod get_space_info()[source]

Get space type requirements for this policy class.

This class method specifies what types of action and observation spaces this policy implementation can handle, enabling compatibility checking with environments.

Return type:

PolicySpaceInfo

Returns:

PolicySpaceInfo specifying required action and observation space types

Note

Subclasses must implement this method to declare their space compatibility. This is used for validation when pairing policies with environments.
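The compatibility check described here can be illustrated with a small enum-based sketch. Everything below is hypothetical: `SpaceTypeSketch`, `SpaceInfoSketch`, `accepts`, and `is_compatible` are illustrative names, not the library's `PolicySpaceInfo` API, and treating `MIXED` as "handles either kind" is an assumption.

```python
from dataclasses import dataclass
from enum import Enum


class SpaceTypeSketch(Enum):
    DISCRETE = "discrete"
    CONTINUOUS = "continuous"
    MIXED = "mixed"


@dataclass(frozen=True)
class SpaceInfoSketch:
    action_space: SpaceTypeSketch
    observation_space: SpaceTypeSketch


def accepts(required: SpaceTypeSketch, offered: SpaceTypeSketch) -> bool:
    # Assumption: MIXED means "handles either kind"; otherwise types must match.
    return required is SpaceTypeSketch.MIXED or required is offered


def is_compatible(policy: SpaceInfoSketch, env: SpaceInfoSketch) -> bool:
    """Check both the action space and the observation space."""
    return accepts(policy.action_space, env.action_space) and accepts(
        policy.observation_space, env.observation_space
    )
```

Declaring requirements on the class rather than on instances lets a benchmark harness reject an incompatible policy and environment pair before any planner is constructed.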

random_rollout(state, depth)[source]

Return type:

float

update_nodes(belief_node, action_node, return_sample)[source]

class POMDPPlanners.planners.SparseSamplingDiscreteActionsPlanner(environment, branching_factor, depth, name='SparseSamplingDiscreteActionsPlanner')[source]

Bases: BaseSparseSamplingDiscreteActionsPlanner

Standard implementation of sparse sampling for POMDP planning.

This concrete implementation of sparse sampling uses standard value updates:

  • Q-values for actions are computed as immediate cost plus discounted future value

  • V-values for beliefs are computed as the minimum Q-value over actions (cost formulation)

  • Leaf nodes use only immediate cost estimates
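The three update rules can be written as a short recursion. This is a sketch under stated assumptions rather than the planner's code: `sparse_sampling_value` and the `step` generative model are hypothetical, nodes stand in for beliefs, and averaging Q over the `branching_factor` sampled children is the usual sparse-sampling estimator, assumed here because the text does not spell it out.

```python
import random
from typing import Callable, Hashable, Sequence, Tuple

# Hypothetical generative model: (node, action, rng) -> (next_node, immediate_cost)
Step = Callable[[Hashable, str, random.Random], Tuple[Hashable, float]]


def sparse_sampling_value(
    node: Hashable,
    depth: int,
    actions: Sequence[str],
    step: Step,
    branching_factor: int,
    discount: float,
    rng: random.Random,
) -> Tuple[float, str]:
    """Return (V(node), best action) under the cost formulation."""
    best_q, best_action = float("inf"), actions[0]
    for action in actions:
        q_sum = 0.0
        for _ in range(branching_factor):
            next_node, cost = step(node, action, rng)
            if depth <= 1:
                # Leaf level: immediate cost only, no lookahead.
                q_sum += cost
            else:
                future, _ = sparse_sampling_value(
                    next_node, depth - 1, actions, step,
                    branching_factor, discount, rng,
                )
                # Q = immediate cost + discounted future value.
                q_sum += cost + discount * future
        q = q_sum / branching_factor
        # V = minimum Q over actions, because costs are minimized.
        if q < best_q:
            best_q, best_action = q, action
    return best_q, best_action
```

The recursion samples `(len(actions) * branching_factor) ** depth` leaf transitions, which is why the examples in this reference keep both `branching_factor` and `depth` small.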

The algorithm provides theoretical guarantees: with probability 1-δ, the computed policy is ε-optimal, where ε decreases with increasing depth and branching factor.

Example

>>> import numpy as np
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>> from POMDPPlanners.core.belief import get_initial_belief
>>> from POMDPPlanners.planners import SparseSamplingDiscreteActionsPlanner
>>> np.random.seed(42)  # For reproducible results
>>>
>>> # Create environment and planner
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> planner = SparseSamplingDiscreteActionsPlanner(
...     environment=tiger,
...     branching_factor=2,
...     depth=2,
...     name="ExamplePlanner"
... )
>>>
>>> # Basic planner interface usage
>>> planner.name
'ExamplePlanner'
>>>
>>> # Action selection from belief
>>> initial_belief = get_initial_belief(tiger, n_particles=10)
>>> actions, run_data = planner.action(initial_belief)
>>>
>>> # Planner space information
>>> space_info = SparseSamplingDiscreteActionsPlanner.get_space_info()
>>> space_info.action_space.name
'DISCRETE'

Subpackages