POMDPPlanners.planners.planners_utils package

Submodules

POMDPPlanners.planners.planners_utils.cvar_exploration module

POMDPPlanners.planners.planners_utils.cvar_exploration.get_explored_action_node(belief_node, min_immediate_cost, max_immediate_cost, depth, max_depth, gamma, exploration_constant, min_visit_count_per_action, alpha, delta, visit_count_penalty=0.0)[source]
Return type:

ActionNode

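The module name refers to Conditional Value at Risk (CVaR). For intuition, CVaR_α of a cost distribution is the expected cost over the worst α-fraction of outcomes. The following is an illustrative definition only, not this module's implementation:

```python
import math

def cvar(costs, alpha):
    """Illustrative CVaR: mean of the worst (largest) alpha-fraction of costs.

    Conceptual sketch only; not the implementation used by
    get_explored_action_node.
    """
    ordered = sorted(costs, reverse=True)        # worst costs first
    k = max(1, math.ceil(alpha * len(ordered)))  # size of the tail
    return sum(ordered[:k]) / k

cvar([1.0, 2.0, 3.0, 4.0], alpha=0.5)  # mean of {4.0, 3.0} -> 3.5
```

As α → 1 this recovers the plain expectation; small α focuses the exploration criterion on tail risk.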
POMDPPlanners.planners.planners_utils.cvar_progressive_widening module

POMDPPlanners.planners.planners_utils.cvar_progressive_widening.cvar_action_progressive_widening(belief_node, alpha_a, action_sampler, exploration_constant, k_a, min_immediate_cost, max_immediate_cost, depth, max_depth, gamma, min_visit_count_per_action, alpha, delta, discrete_actions=False, visit_count_penalty=0.0)[source]
Return type:

ActionNode

POMDPPlanners.planners.planners_utils.dpw module

class POMDPPlanners.planners.planners_utils.dpw.ActionSampler[source]

Bases: ABC

Abstract base class for action sampling strategies in PFT-DPW.

Action samplers provide domain-specific strategies for generating new actions during progressive widening. This allows PFT-DPW to work with continuous or large discrete action spaces by intelligently sampling promising actions.

The ActionSampler interface enables flexible action space exploration by allowing custom sampling strategies that can incorporate domain knowledge, belief state information, or specialized sampling distributions.

The class is serializable and can be safely pickled/unpickled for distributed computing, caching, or saving/loading configurations.

Examples

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> from typing import Optional
>>> from POMDPPlanners.planners.planners_utils.dpw import ActionSampler
>>> from POMDPPlanners.core.tree import BeliefNode
>>>
>>> class ContinuousControlSampler(ActionSampler):
...     def __init__(self, action_bounds=(-1.0, 1.0), action_dim=2):
...         self.action_bounds = action_bounds
...         self.action_dim = action_dim
...
...     def sample(self, belief_node: Optional[BeliefNode] = None):
...         # Sample uniformly from action space
...         low, high = self.action_bounds
...         return np.random.uniform(low, high, size=self.action_dim)
>>>
>>> # Usage with PFT-DPW
>>> sampler = ContinuousControlSampler(action_bounds=(-2.0, 2.0), action_dim=4)
>>> action = sampler.sample()  # Returns 4D action vector
>>>
>>> # Serialization works automatically
>>> import pickle
>>> serialized = pickle.dumps(sampler)
>>> restored_sampler = pickle.loads(serialized)

Discrete action sampler with custom distribution:

import numpy as np
from typing import Optional
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler
from POMDPPlanners.core.tree import BeliefNode

class WeightedDiscreteActionSampler(ActionSampler):
    def __init__(self, actions, probabilities=None):
        self.actions = actions
        # Use uniform probabilities if none provided
        if probabilities is None:
            self.probabilities = np.ones(len(actions)) / len(actions)
        else:
            self.probabilities = np.array(probabilities)
            self.probabilities /= np.sum(self.probabilities)  # Normalize

    def sample(self, belief_node: Optional[BeliefNode] = None):
        return np.random.choice(self.actions, p=self.probabilities)

# Prefer certain actions over others
actions = ["up", "down", "left", "right", "stay"]
probs = [0.4, 0.15, 0.15, 0.15, 0.15]  # Biased towards "up"
sampler = WeightedDiscreteActionSampler(actions, probs)

Belief-informed action sampler:

import numpy as np
from typing import Optional
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler
from POMDPPlanners.core.tree import BeliefNode

class AdaptiveActionSampler(ActionSampler):
    def __init__(self, base_actions, exploration_noise=0.1):
        self.base_actions = base_actions
        self.exploration_noise = exploration_noise

    def sample(self, belief_node: Optional[BeliefNode] = None):
        if belief_node is not None and belief_node.visit_count > 10:
            # Use belief state to inform sampling
            best_action = self._get_best_action_from_belief(belief_node)
            # Add exploration noise
            noise = np.random.normal(0, self.exploration_noise, len(best_action))
            return best_action + noise
        else:
            # Random exploration for new nodes
            return np.random.choice(self.base_actions)

    def _get_best_action_from_belief(self, belief_node):
        # Simplified: return action from best child
        if belief_node.children:
            best_child = max(belief_node.children, key=lambda x: x.q_value)
            return best_child.action
        return np.random.choice(self.base_actions)

sampler = AdaptiveActionSampler([0, 1, 2, 3], exploration_noise=0.05)

Multi-modal action sampler for hybrid control:

import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler

class MultiModalActionSampler(ActionSampler):
    def __init__(self, discrete_actions, continuous_bounds, mode_prob=0.5):
        self.discrete_actions = discrete_actions
        self.continuous_bounds = continuous_bounds
        self.mode_prob = mode_prob  # Probability of discrete vs continuous

    def sample(self, belief_node=None):
        if np.random.random() < self.mode_prob:
            # Sample discrete action
            return {"type": "discrete", "action": np.random.choice(self.discrete_actions)}
        else:
            # Sample continuous action
            low, high = self.continuous_bounds
            continuous_action = np.random.uniform(low, high, size=2)
            return {"type": "continuous", "action": continuous_action}

# For environments with both discrete and continuous actions
discrete_acts = ["stop", "emergency_brake", "lane_change"]
continuous_bounds = (-5.0, 5.0)  # Steering/acceleration range
sampler = MultiModalActionSampler(discrete_acts, continuous_bounds)

Goal-directed action sampler:

import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler

class GoalDirectedActionSampler(ActionSampler):
    def __init__(self, goal_position, action_magnitude=1.0, goal_bias=0.7):
        self.goal_position = np.array(goal_position)
        self.action_magnitude = action_magnitude
        self.goal_bias = goal_bias

    def sample(self, belief_node=None):
        if np.random.random() < self.goal_bias and belief_node is not None:
            # Sample action towards goal based on current belief
            current_position = self._estimate_position(belief_node)
            direction = self.goal_position - current_position
            if np.linalg.norm(direction) > 0:
                direction = direction / np.linalg.norm(direction)
                return direction * self.action_magnitude

        # Random exploration
        angle = np.random.uniform(0, 2 * np.pi)
        return self.action_magnitude * np.array([np.cos(angle), np.sin(angle)])

    def _estimate_position(self, belief_node):
        # Simplified: use mean of particles in belief
        if hasattr(belief_node.belief, 'particles'):
            positions = [p[:2] for p in belief_node.belief.particles]  # First 2D as position
            return np.mean(positions, axis=0)
        return np.array([0.0, 0.0])

# Navigation towards specific goal
goal = [10.0, 5.0]
sampler = GoalDirectedActionSampler(goal, action_magnitude=2.0, goal_bias=0.8)

abstractmethod sample(belief_node=None)[source]

Sample a new action for progressive widening.

Parameters:

belief_node (Optional[BeliefNode]) – Optional belief node context for informed sampling

Return type:

Any

Returns:

A sampled action compatible with the environment’s action space

POMDPPlanners.planners.planners_utils.dpw.action_progressive_widening(belief_node, alpha_a, action_sampler, exploration_constant, k_a, min_visit_count_per_action=1)[source]

Select or add action using progressive widening strategy.

Progressive widening gradually expands the action space based on visit counts. A new action is added when ⌊k_a · n^α_a⌋ > ⌊k_a · (n-1)^α_a⌋, where n is the visit count and k_a, α_a are the widening coefficient and exponent. Otherwise, an existing action is selected using UCB1.

The progressive widening mechanism balances exploration and exploitation:

  1. Initially, new actions are added frequently (exploration phase)

  2. The rate of new actions decreases as the visit count grows

  3. Selection eventually relies primarily on UCB1 (exploitation phase)

Parameters:
  • belief_node (BeliefNode) – Current belief node to select action from

  • alpha_a (float) – Progressive widening exponent (0 < alpha_a ≤ 1). Lower values create fewer actions, higher values create more actions.

  • action_sampler (ActionSampler) – Action sampler for generating new actions

  • exploration_constant (float) – UCB1 exploration constant for existing actions

  • k_a (float)

  • min_visit_count_per_action (int)

Return type:

ActionNode

Returns:

Selected or newly created action node

Examples

Basic usage with continuous action sampler:

import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import (
    ActionSampler, action_progressive_widening
)
from POMDPPlanners.core.tree import BeliefNode
from POMDPPlanners.core.belief import WeightedParticleBelief

# Create action sampler
class SimpleActionSampler(ActionSampler):
    def sample(self, belief_node=None):
        return np.random.uniform(-1, 1, size=2)

# Create belief node
particles = [[0.0, 0.0], [1.0, 1.0]]
log_weights = np.log(np.array([0.5, 0.5]))
belief = WeightedParticleBelief(particles, log_weights)
belief_node = BeliefNode(belief=belief)

# Progressive widening
action_sampler = SimpleActionSampler()
action_node = action_progressive_widening(
    belief_node=belief_node,
    alpha_a=0.5,  # Moderate exploration
    action_sampler=action_sampler,
    exploration_constant=1.0
)

Comparing different alpha_a values:

# Conservative exploration (fewer new actions)
conservative_action = action_progressive_widening(
    belief_node=belief_node,
    alpha_a=0.25,  # Low alpha = fewer actions
    action_sampler=action_sampler,
    exploration_constant=1.0
)

# Aggressive exploration (more new actions)
aggressive_action = action_progressive_widening(
    belief_node=belief_node,
    alpha_a=0.75,  # High alpha = more actions
    action_sampler=action_sampler,
    exploration_constant=1.0
)

Progressive widening in a loop (simulating MCTS):

import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler, action_progressive_widening
from POMDPPlanners.core.tree import BeliefNode, ActionNode
from POMDPPlanners.core.belief import WeightedParticleBelief

# Setup
class DiscreteActionSampler(ActionSampler):
    def __init__(self, actions):
        self.actions = actions

    def sample(self, belief_node=None):
        return np.random.choice(self.actions)

particles = [[0], [1], [2]]
log_weights = np.log(np.array([1/3, 1/3, 1/3]))
belief = WeightedParticleBelief(particles, log_weights)
root_node = BeliefNode(belief=belief)

sampler = DiscreteActionSampler(['up', 'down', 'left', 'right'])

# Simulate multiple selections
for i in range(10):
    root_node.visit_count = i  # Simulate increasing visits
    action_node = action_progressive_widening(
        belief_node=root_node,
        alpha_a=0.5,
        action_sampler=sampler,
        exploration_constant=1.41  # sqrt(2)
    )
    print(f"Visit {i}: {len(root_node.children)} actions, selected {action_node.action}")

Tuning progressive widening parameters:

from math import floor

# Effect of alpha_a on action creation (with k_a = 1)
visit_counts = range(1, 21)
alpha_values = [0.25, 0.5, 0.75, 1.0]

for alpha in alpha_values:
    action_counts = []
    for n in visit_counts:
        # A new action is created whenever the widening threshold increases
        should_create = floor(n ** alpha) > floor((n - 1) ** alpha)
        action_counts.append(1 if should_create else 0)

    total_new_actions = sum(action_counts)
    print(f"Alpha {alpha}: {total_new_actions} new actions in 20 visits")

POMDPPlanners.planners.planners_utils.dpw.ucb1_exploration(belief_node, exploration_constant)[source]

Select action from existing children using UCB1 criterion.

Uses Upper Confidence Bounds (UCB1) to balance exploration and exploitation: UCB1(a) = Q(a) + c * sqrt(log(N) / N(a)) where Q(a) is the average reward, N is parent visits, N(a) is action visits, and c is the exploration constant.

The UCB1 algorithm provides theoretical guarantees for multi-armed bandit problems and is widely used in Monte Carlo Tree Search algorithms. It automatically balances exploitation of good actions (high Q-values) with exploration of uncertain actions (low visit counts).

Parameters:
  • belief_node (BeliefNode) – Belief node with existing action children

  • exploration_constant (float) – Controls the exploration vs exploitation trade-off. Higher values favor exploration, lower values favor exploitation. Common values: √2 ≈ 1.41 (standard theoretical choice for rewards in [0, 1]), 0.5-2.0 (practical range)

Return type:

ActionNode

Returns:

Action node with highest UCB1 value

Examples

Basic UCB1 action selection:

import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ucb1_exploration
from POMDPPlanners.core.tree import BeliefNode, ActionNode
from POMDPPlanners.core.belief import WeightedParticleBelief

# Create belief node with action children
particles = [[0.0], [1.0]]
log_weights = np.log(np.array([0.5, 0.5]))
belief = WeightedParticleBelief(particles, log_weights)
belief_node = BeliefNode(belief=belief)
belief_node.visit_count = 100

# Add action nodes with different Q-values and visit counts
actions_data = [
    {"action": "up", "q_value": 0.8, "visits": 30},
    {"action": "down", "q_value": 0.6, "visits": 20},
    {"action": "left", "q_value": 0.9, "visits": 40},
    {"action": "right", "q_value": 0.4, "visits": 10}
]

for data in actions_data:
    action_node = ActionNode(action=data["action"], parent=belief_node)
    action_node.q_value = data["q_value"]
    action_node.visit_count = data["visits"]

# Select action using UCB1
selected_action = ucb1_exploration(
    belief_node=belief_node,
    exploration_constant=1.41  # sqrt(2)
)
print(f"Selected action: {selected_action.action}")

Comparing exploration constants:

# Low exploration (favor exploitation)
conservative_action = ucb1_exploration(
    belief_node=belief_node,
    exploration_constant=0.1
)

# High exploration (favor exploration)
exploratory_action = ucb1_exploration(
    belief_node=belief_node,
    exploration_constant=3.0
)

# Balanced approach (theoretical optimum)
balanced_action = ucb1_exploration(
    belief_node=belief_node,
    exploration_constant=1.41  # sqrt(2)
)

Manual UCB1 calculation and verification:

import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ucb1_exploration

# Calculate UCB1 values manually
exploration_constant = 1.0
ucb1_values = []

for child in belief_node.children:
    exploration_term = exploration_constant * np.sqrt(
        np.log(belief_node.visit_count) / child.visit_count
    )
    ucb1 = child.q_value + exploration_term
    ucb1_values.append(ucb1)
    print(f"Action {child.action}: Q={child.q_value:.2f}, "
          f"exploration={exploration_term:.3f}, UCB1={ucb1:.3f}")

# Verify our function selects the highest UCB1
expected_best_idx = np.argmax(ucb1_values)
selected_action = ucb1_exploration(belief_node, exploration_constant)
actual_best_idx = belief_node.children.index(selected_action)

assert expected_best_idx == actual_best_idx, "UCB1 selection mismatch"

UCB1 in dynamic scenarios:

import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ucb1_exploration

# Simulate how UCB1 selection changes over time
belief_node.visit_count = 1

for round_num in range(1, 11):
    belief_node.visit_count = round_num * 10

    # Select action
    selected = ucb1_exploration(belief_node, exploration_constant=1.41)

    # Update the selected action (simulate learning)
    selected.visit_count += 1
    selected.q_value += (np.random.normal(0.5, 0.1) - selected.q_value) / selected.visit_count

    print(f"Round {round_num}: Selected {selected.action}, "
          f"Q={selected.q_value:.3f}, visits={selected.visit_count}")

Exploration vs exploitation analysis:

# Create scenario with clear best action vs uncertain actions
exploration_constants = [0.1, 0.5, 1.0, 1.41, 2.0, 5.0]
selection_counts = {c: {"up": 0, "down": 0, "left": 0, "right": 0} for c in exploration_constants}

for exploration_c in exploration_constants:
    # Run multiple selections, updating counts so the UCB1 values evolve
    for _ in range(100):
        selected = ucb1_exploration(belief_node, exploration_c)
        selected.visit_count += 1
        belief_node.visit_count += 1
        selection_counts[exploration_c][selected.action] += 1

    print(f"Exploration constant {exploration_c}:")
    for action, count in selection_counts[exploration_c].items():
        print(f"  {action}: {count}% of selections")

UCB1 with confidence intervals:

import math
from POMDPPlanners.planners.planners_utils.dpw import ucb1_exploration

# Calculate confidence intervals for each action
exploration_constant = 1.41

for child in belief_node.children:
    # UCB1 upper confidence bound
    confidence_radius = exploration_constant * math.sqrt(
        math.log(belief_node.visit_count) / child.visit_count
    )

    lower_bound = child.q_value - confidence_radius
    upper_bound = child.q_value + confidence_radius

    print(f"Action {child.action}: "
          f"Q={child.q_value:.3f} ± {confidence_radius:.3f} "
          f"[{lower_bound:.3f}, {upper_bound:.3f}]")

# The selected action has the highest upper bound
selected = ucb1_exploration(belief_node, exploration_constant)
print(f"Selected: {selected.action} (highest upper confidence bound)")

POMDPPlanners.planners.planners_utils.path_simulations_policy module

class POMDPPlanners.planners.planners_utils.path_simulations_policy.DoubleProgressiveWideningMCTSPolicy(environment, discount_factor, depth, name, action_sampler, k_a, alpha_a, k_o, alpha_o, exploration_constant, min_visit_count_per_action=1, time_out_in_seconds=None, n_simulations=None, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: PathSimulationPolicy

Abstract base class for MCTS planners using double progressive widening.

This base class provides common initialization, parameter validation, and attributes for MCTS planners that use double progressive widening (both action and observation progressive widening). Subclasses implement their own simulation strategies while sharing common parameters and validation logic.

Progressive Widening Overview: Double progressive widening controls tree growth by limiting how many actions and observations are added to the tree based on visit counts:

  • Action widening: new actions are added when ⌊k_a · n^α_a⌋ increases

  • Observation widening: the number of observations is capped at ⌊k_o · n^α_o⌋

Common Progressive Widening Parameters:

  • k_a, alpha_a: control action progressive widening

  • k_o, alpha_o: control observation progressive widening

  • exploration_constant: UCB1 exploration parameter
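The two widening rules can be sketched as small pure functions. This is an illustrative sketch of the thresholds described above, not this class's internals:

```python
from math import floor

def should_widen_actions(visit_count, k_a, alpha_a):
    """True when the action-widening threshold floor(k_a * n^alpha_a) increases."""
    n = visit_count
    return floor(k_a * n ** alpha_a) > floor(k_a * (n - 1) ** alpha_a)

def max_observation_children(visit_count, k_o, alpha_o):
    """Cap on the number of observation children: floor(k_o * n^alpha_o)."""
    return floor(k_o * visit_count ** alpha_o)

should_widen_actions(1, k_a=1.0, alpha_a=0.5)       # first visit always widens -> True
max_observation_children(16, k_o=2.0, alpha_o=0.5)  # floor(2 * 4) -> 8
```

Larger k and α values let the tree branch faster; α close to 0 keeps the branching factor nearly constant.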

Parameters:
depth

Maximum search depth for tree expansion

exploration_constant

UCB1 exploration parameter (c in UCB1 formula)

action_sampler

Action sampling strategy for progressive widening

k_o

Observation progressive widening coefficient (k_o > 0)

k_a

Action progressive widening coefficient (k_a > 0)

alpha_o

Observation progressive widening exponent (0 < α_o ≤ 1)

alpha_a

Action progressive widening exponent (0 < α_a ≤ 1)

Subclasses:
  • POMCP_DPW: Uses unweighted particle beliefs with double progressive widening

  • POMCPOW: Uses weighted particle beliefs with double progressive widening

  • PFT_DPW: Uses particle filter trees with double progressive widening and a custom simulation strategy

Note

This is an abstract base class and cannot be instantiated directly. Subclasses must implement the _simulate_path method.

action_sampler: ActionSampler

classmethod get_space_info()[source]

Get information about action and observation spaces.

Default implementation returns MIXED space types, which is appropriate for most progressive widening MCTS planners that support both discrete and continuous action spaces through the action sampler interface.

Subclasses can override this method to specify different space requirements (e.g., PFT_DPW specifies CONTINUOUS action space).

Return type:

PolicySpaceInfo

Returns:

PolicySpaceInfo with MIXED space types for both actions and observations

class POMDPPlanners.planners.planners_utils.path_simulations_policy.PathSimulationPolicy(environment, discount_factor, name, n_simulations, time_out_in_seconds, action_sampler=None, log_path=None, debug=False, use_queue_logger=False)[source]

Bases: Policy

Abstract base class for Monte Carlo Tree Search algorithms in POMDP planning.

This class provides a common framework for MCTS-based POMDP planners that build search trees through path simulations. It handles the core tree construction loop and provides hooks for algorithm-specific simulation strategies.

The class supports two termination criteria:

  1. Simulation count: run a fixed number of MCTS simulations

  2. Time limit: run simulations for a specified time duration
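The two criteria amount to a loop of the following shape. This is a simplified sketch, not the class's actual tree-construction loop (which also collects tree metrics):

```python
import time

def run_simulations(simulate_path, n_simulations=None, time_out_in_seconds=None):
    """Run simulate_path() until the simulation count or time budget is exhausted."""
    if (n_simulations is None) == (time_out_in_seconds is None):
        raise ValueError("Specify exactly one of n_simulations / time_out_in_seconds")
    deadline = (None if time_out_in_seconds is None
                else time.monotonic() + time_out_in_seconds)
    count = 0
    while True:
        if n_simulations is not None and count >= n_simulations:
            break
        if deadline is not None and time.monotonic() >= deadline:
            break
        simulate_path()  # one MCTS simulation, implemented by the subclass
        count += 1
    return count

run_simulations(lambda: None, n_simulations=100)  # -> 100
```

Using a monotonic clock for the deadline avoids miscounting the budget if the system clock is adjusted mid-planning.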

Key Components:

  • Tree construction with configurable termination criteria

  • Automatic tree metrics collection for analysis

  • Action selection from the constructed search tree

  • Abstract simulation interface for algorithm specialization

Subclass Responsibilities: Concrete implementations must provide the _simulate_path method that defines how individual MCTS simulations are performed, including:

  • Node expansion strategies

  • Action selection during tree traversal

  • Value estimation and backpropagation

Parameters:
environment

The POMDP environment for planning

discount_factor

Discount factor for future rewards (0 < γ ≤ 1)

n_simulations

Number of MCTS simulations to run (mutually exclusive with timeout)

time_out_in_seconds

Time limit for planning in seconds (mutually exclusive with n_simulations)

name

Identifier for the policy instance

Algorithm Integration: This base class is used by several MCTS algorithms in the framework:

  • POMCP: UCB1 action selection with particle filtering

  • PFT-DPW: progressive widening for continuous action spaces

  • Sparse-PFT: sparse sampling combined with progressive widening

The common interface allows easy comparison and benchmarking of different MCTS variants while sharing the core tree construction infrastructure.

action(belief)[source]

Select action(s) based on the current belief state.

This is the core method that implements the policy’s decision-making logic. It takes a belief state and returns the selected action(s) along with execution information and performance metrics.

Parameters:

belief (Belief) – Current belief state representing uncertainty over states

Returns:

  • List of selected actions (typically single action, but supports multiple)

  • PolicyRunData with execution metrics and performance information

Return type:

Tuple[List[Any], PolicyRunData]

Note

Subclasses must implement this method with their specific planning or decision-making algorithm.

classmethod get_info_variable_names()[source]

Get names of tree metric info variables produced by path simulation policies.

Return type:

List[str]

Returns:

List of metric names from tree statistics

class POMDPPlanners.planners.planners_utils.path_simulations_policy.PathSimulationPolicyCostSetting(environment, discount_factor, name, action_sampler=None, n_simulations=None, time_out_in_seconds=None, log_path=None, debug=False)[source]

Bases: PathSimulationPolicy

action(belief)[source]

Select action(s) based on the current belief state.

This is the core method that implements the policy’s decision-making logic. It takes a belief state and returns the selected action(s) along with execution information and performance metrics.

Parameters:

belief (Belief) – Current belief state representing uncertainty over states

Returns:

  • List of selected actions (typically single action, but supports multiple)

  • PolicyRunData with execution metrics and performance information

Return type:

Tuple[List[Any], PolicyRunData]

Note

Subclasses must implement this method with their specific planning or decision-making algorithm.

POMDPPlanners.planners.planners_utils.rollout module

POMDPPlanners.planners.planners_utils.rollout.random_rollout_action_sampler(state, depth, action_sampler, environment, discount_factor, max_depth=10)[source]

Perform random rollout to estimate value from leaf node.

Rollout policy samples random actions using the action_sampler until reaching maximum depth or terminal state. This provides value estimates for leaf nodes in the search tree during Monte Carlo Tree Search.

The rollout uses a random policy (via action_sampler) to quickly estimate the value of a state without expensive planning. This is a key component of MCTS algorithms where accurate value estimation is traded off against computational efficiency.

Parameters:
  • state (Any) – Current state to rollout from

  • depth (int) – Current depth in rollout (starts at 0)

  • action_sampler (ActionSampler) – Action sampler for selecting rollout actions

  • environment (Environment) – POMDP environment to simulate in

  • discount_factor (float) – Discount factor for future rewards (0 < γ ≤ 1)

  • max_depth (int) – Maximum rollout depth to prevent infinite loops

Return type:

float

Returns:

Total discounted return from rollout simulation

Examples

>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> from POMDPPlanners.planners.planners_utils.rollout import random_rollout_action_sampler
>>> from POMDPPlanners.planners.planners_utils.dpw import ActionSampler
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>>
>>> # Simple action sampler for Tiger POMDP
>>> class TigerActionSampler(ActionSampler):
...     def sample(self, belief_node=None):
...         return np.random.choice(["listen", "open_left", "open_right"])
>>>
>>> # Create environment and sampler
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> action_sampler = TigerActionSampler()
>>>
>>> # Perform rollout from initial state
>>> initial_state = "tiger_left"
>>> rollout_value = random_rollout_action_sampler(
...     state=initial_state,
...     depth=0,
...     action_sampler=action_sampler,
...     environment=tiger,
...     discount_factor=0.95,
...     max_depth=10
... )
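The discounted-return accumulation performed by the rollout can also be illustrated independently of any POMDPPlanners types. The step interface below is hypothetical, used only to show the accumulation:

```python
import random

def rollout_return(state, step, sample_action, gamma, max_depth=10):
    """Accumulate discounted rewards from a random rollout.

    step(state, action) -> (next_state, reward, done) is a hypothetical
    environment interface for illustration only.
    """
    total, discount = 0.0, 1.0
    for _ in range(max_depth):
        action = sample_action(state)
        state, reward, done = step(state, action)
        total += discount * reward
        discount *= gamma
        if done:
            break
    return total

# Toy chain: reward 1.0 each step, never terminal
toy_step = lambda s, a: (s, 1.0, False)
rollout_return(0, toy_step, lambda s: random.choice([0, 1]), gamma=0.5, max_depth=3)
# -> 1 + 0.5 + 0.25 = 1.75
```

Because the return is geometrically discounted, truncating at max_depth bounds the error of the value estimate by gamma^max_depth times the maximum per-step reward.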