POMDPPlanners.planners.planners_utils package
Submodules
POMDPPlanners.planners.planners_utils.cvar_exploration module
POMDPPlanners.planners.planners_utils.cvar_progressive_widening module
- POMDPPlanners.planners.planners_utils.cvar_progressive_widening.cvar_action_progressive_widening(belief_node, alpha_a, action_sampler, exploration_constant, k_a, min_immediate_cost, max_immediate_cost, depth, max_depth, gamma, min_visit_count_per_action, alpha, delta, discrete_actions=False, visit_count_penalty=0.0)[source]
- Return type:
- Parameters:
belief_node (BeliefNode)
alpha_a (float)
action_sampler (ActionSampler)
exploration_constant (float)
k_a (float)
min_immediate_cost (float)
max_immediate_cost (float)
depth (int)
max_depth (int)
gamma (float)
min_visit_count_per_action (int)
alpha (float)
delta (float)
discrete_actions (bool)
visit_count_penalty (float)
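The cvar_progressive_widening module documents only the signature above, so the following is a minimal call sketch based solely on that signature; every value (the widening settings, cost bounds, gamma, alpha, delta) is an illustrative assumption rather than a recommended setting, and the sampler and belief-node construction are borrowed from the dpw examples later on this page:

import numpy as np
from POMDPPlanners.planners.planners_utils.cvar_progressive_widening import (
    cvar_action_progressive_widening,
)
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler
from POMDPPlanners.core.tree import BeliefNode
from POMDPPlanners.core.belief import WeightedParticleBelief

class SimpleActionSampler(ActionSampler):
    def sample(self, belief_node=None):
        return np.random.uniform(-1, 1, size=2)

# Belief node construction follows the dpw examples below
particles = [[0.0, 0.0], [1.0, 1.0]]
log_weights = np.log(np.array([0.5, 0.5]))
belief_node = BeliefNode(belief=WeightedParticleBelief(particles, log_weights))

# All keyword values below are illustrative assumptions, not recommendations
action_node = cvar_action_progressive_widening(
    belief_node=belief_node,
    alpha_a=0.5,                    # action widening exponent
    action_sampler=SimpleActionSampler(),
    exploration_constant=1.41,
    k_a=2.0,                        # action widening coefficient
    min_immediate_cost=0.0,
    max_immediate_cost=1.0,
    depth=0,
    max_depth=10,
    gamma=0.95,                     # discount factor (assumed meaning)
    min_visit_count_per_action=1,
    alpha=0.9,                      # CVaR risk level (assumed meaning)
    delta=0.1,                      # confidence parameter (assumed meaning)
)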
POMDPPlanners.planners.planners_utils.dpw module
- class POMDPPlanners.planners.planners_utils.dpw.ActionSampler[source]
Bases: ABC
Abstract base class for action sampling strategies in PFT-DPW.
Action samplers provide domain-specific strategies for generating new actions during progressive widening. This allows PFT-DPW to work with continuous or large discrete action spaces by intelligently sampling promising actions.
The ActionSampler interface enables flexible action space exploration by allowing custom sampling strategies that can incorporate domain knowledge, belief state information, or specialized sampling distributions.
The class is serializable and can be safely pickled/unpickled for distributed computing, caching, or saving/loading configurations.
Examples
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> from typing import Optional
>>> from POMDPPlanners.core.tree import BeliefNode
>>> from POMDPPlanners.planners.planners_utils.dpw import ActionSampler
>>>
>>> class ContinuousControlSampler(ActionSampler):
...     def __init__(self, action_bounds=(-1.0, 1.0), action_dim=2):
...         self.action_bounds = action_bounds
...         self.action_dim = action_dim
...
...     def sample(self, belief_node: Optional[BeliefNode] = None):
...         # Sample uniformly from action space
...         low, high = self.action_bounds
...         return np.random.uniform(low, high, size=self.action_dim)
>>>
>>> # Usage with PFT-DPW
>>> sampler = ContinuousControlSampler(action_bounds=(-2.0, 2.0), action_dim=4)
>>> action = sampler.sample()  # Returns 4D action vector
>>>
>>> # Serialization works automatically
>>> import pickle
>>> serialized = pickle.dumps(sampler)
>>> restored_sampler = pickle.loads(serialized)
Discrete action sampler with custom distribution:
import numpy as np
from typing import Optional
from POMDPPlanners.core.tree import BeliefNode
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler

class WeightedDiscreteActionSampler(ActionSampler):
    def __init__(self, actions, probabilities=None):
        self.actions = actions
        # Use uniform probabilities if none provided
        if probabilities is None:
            self.probabilities = np.ones(len(actions)) / len(actions)
        else:
            self.probabilities = np.array(probabilities)
            self.probabilities /= np.sum(self.probabilities)  # Normalize

    def sample(self, belief_node: Optional[BeliefNode] = None):
        return np.random.choice(self.actions, p=self.probabilities)

# Weight actions explicitly (uniform weights shown here)
actions = ["up", "down", "left", "right", "stay"]
probs = [0.2, 0.2, 0.2, 0.2, 0.2]  # Uniform
sampler = WeightedDiscreteActionSampler(actions, probs)
Belief-informed action sampler:
import numpy as np
from typing import Optional
from POMDPPlanners.core.tree import BeliefNode
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler

class AdaptiveActionSampler(ActionSampler):
    def __init__(self, base_actions, exploration_noise=0.1):
        self.base_actions = base_actions
        self.exploration_noise = exploration_noise

    def sample(self, belief_node: Optional[BeliefNode] = None):
        if belief_node is not None and belief_node.visit_count > 10:
            # Use belief state to inform sampling
            best_action = self._get_best_action_from_belief(belief_node)
            # Add exploration noise
            noise = np.random.normal(0, self.exploration_noise, len(best_action))
            return best_action + noise
        else:
            # Random exploration for new nodes
            return np.random.choice(self.base_actions)

    def _get_best_action_from_belief(self, belief_node):
        # Simplified: return action from best child
        if belief_node.children:
            best_child = max(belief_node.children, key=lambda x: x.q_value)
            return best_child.action
        return np.random.choice(self.base_actions)

sampler = AdaptiveActionSampler([0, 1, 2, 3], exploration_noise=0.05)
Multi-modal action sampler for hybrid control:
import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler

class MultiModalActionSampler(ActionSampler):
    def __init__(self, discrete_actions, continuous_bounds, mode_prob=0.5):
        self.discrete_actions = discrete_actions
        self.continuous_bounds = continuous_bounds
        self.mode_prob = mode_prob  # Probability of discrete vs continuous

    def sample(self, belief_node=None):
        if np.random.random() < self.mode_prob:
            # Sample discrete action
            return {"type": "discrete", "action": np.random.choice(self.discrete_actions)}
        else:
            # Sample continuous action
            low, high = self.continuous_bounds
            continuous_action = np.random.uniform(low, high, size=2)
            return {"type": "continuous", "action": continuous_action}

# For environments with both discrete and continuous actions
discrete_acts = ["stop", "emergency_brake", "lane_change"]
continuous_bounds = (-5.0, 5.0)  # Steering/acceleration range
sampler = MultiModalActionSampler(discrete_acts, continuous_bounds)
Goal-directed action sampler:
import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler

class GoalDirectedActionSampler(ActionSampler):
    def __init__(self, goal_position, action_magnitude=1.0, goal_bias=0.7):
        self.goal_position = np.array(goal_position)
        self.action_magnitude = action_magnitude
        self.goal_bias = goal_bias

    def sample(self, belief_node=None):
        if np.random.random() < self.goal_bias and belief_node is not None:
            # Sample action towards goal based on current belief
            current_position = self._estimate_position(belief_node)
            direction = self.goal_position - current_position
            if np.linalg.norm(direction) > 0:
                direction = direction / np.linalg.norm(direction)
                return direction * self.action_magnitude
        # Random exploration
        angle = np.random.uniform(0, 2 * np.pi)
        return self.action_magnitude * np.array([np.cos(angle), np.sin(angle)])

    def _estimate_position(self, belief_node):
        # Simplified: use mean of particles in belief
        if hasattr(belief_node.belief, 'particles'):
            positions = [p[:2] for p in belief_node.belief.particles]  # First 2D as position
            return np.mean(positions, axis=0)
        return np.array([0.0, 0.0])

# Navigation towards specific goal
goal = [10.0, 5.0]
sampler = GoalDirectedActionSampler(goal, action_magnitude=2.0, goal_bias=0.8)
- abstractmethod sample(belief_node=None)[source]
Sample a new action for progressive widening.
- Parameters:
belief_node (Optional[BeliefNode]) – Optional belief node context for informed sampling
- Return type:
- Returns:
A sampled action compatible with the environment’s action space
- POMDPPlanners.planners.planners_utils.dpw.action_progressive_widening(belief_node, alpha_a, action_sampler, exploration_constant, k_a, min_visit_count_per_action=1)[source]
Select or add action using progressive widening strategy.
Progressive widening gradually expands the action space based on visit counts. New actions are added when ⌊n^α_a⌋ > ⌊(n-1)^α_a⌋, where n is the visit count. Otherwise, existing actions are selected using UCB1.
The progressive widening mechanism balances exploration and exploitation by:
1. Initially adding new actions frequently (exploration phase)
2. Gradually reducing the rate of new actions as visit count increases
3. Eventually relying primarily on UCB1 selection (exploitation phase)
- Parameters:
belief_node (BeliefNode) – Current belief node to select action from
alpha_a (float) – Progressive widening exponent (0 < alpha_a ≤ 1). Lower values create fewer actions, higher values create more actions.
action_sampler (ActionSampler) – Action sampler for generating new actions
exploration_constant (float) – UCB1 exploration constant for existing actions
k_a (float)
min_visit_count_per_action (int)
- Return type:
- Returns:
Selected or newly created action node
Examples
Basic usage with continuous action sampler:
import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import (
    ActionSampler, action_progressive_widening
)
from POMDPPlanners.core.tree import BeliefNode
from POMDPPlanners.core.belief import WeightedParticleBelief

# Create action sampler
class SimpleActionSampler(ActionSampler):
    def sample(self, belief_node=None):
        return np.random.uniform(-1, 1, size=2)

# Create belief node
particles = [[0.0, 0.0], [1.0, 1.0]]
log_weights = np.log(np.array([0.5, 0.5]))
belief = WeightedParticleBelief(particles, log_weights)
belief_node = BeliefNode(belief=belief)

# Progressive widening
action_sampler = SimpleActionSampler()
action_node = action_progressive_widening(
    belief_node=belief_node,
    alpha_a=0.5,  # Moderate exploration
    action_sampler=action_sampler,
    exploration_constant=1.0,
    k_a=1.0,  # Required by the documented signature; 1.0 is an illustrative value
)
Comparing different alpha_a values:
# Conservative exploration (fewer new actions)
conservative_action = action_progressive_widening(
    belief_node=belief_node,
    alpha_a=0.25,  # Low alpha = fewer actions
    action_sampler=action_sampler,
    exploration_constant=1.0,
    k_a=1.0,  # Illustrative value
)

# Aggressive exploration (more new actions)
aggressive_action = action_progressive_widening(
    belief_node=belief_node,
    alpha_a=0.75,  # High alpha = more actions
    action_sampler=action_sampler,
    exploration_constant=1.0,
    k_a=1.0,  # Illustrative value
)
Progressive widening in a loop (simulating MCTS):
import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ActionSampler, action_progressive_widening
from POMDPPlanners.core.tree import BeliefNode, ActionNode
from POMDPPlanners.core.belief import WeightedParticleBelief

# Setup
class DiscreteActionSampler(ActionSampler):
    def __init__(self, actions):
        self.actions = actions

    def sample(self, belief_node=None):
        return np.random.choice(self.actions)

particles = [[0], [1], [2]]
log_weights = np.log(np.array([1/3, 1/3, 1/3]))
belief = WeightedParticleBelief(particles, log_weights)
root_node = BeliefNode(belief=belief)
sampler = DiscreteActionSampler(['up', 'down', 'left', 'right'])

# Simulate multiple selections
for i in range(10):
    root_node.visit_count = i  # Simulate increasing visits
    action_node = action_progressive_widening(
        belief_node=root_node,
        alpha_a=0.5,
        action_sampler=sampler,
        exploration_constant=1.41,  # sqrt(2)
        k_a=1.0,  # Required by the documented signature; illustrative value
    )
    print(f"Visit {i}: {len(root_node.children)} actions, selected {action_node.action}")
Tuning progressive widening parameters:
from math import floor

# Effect of alpha_a on action creation
visit_counts = range(1, 21)
alpha_values = [0.25, 0.5, 0.75, 1.0]

for alpha in alpha_values:
    action_counts = []
    for n in visit_counts:
        # Calculate when new actions would be created
        should_create = floor(n ** alpha) > floor((n - 1) ** alpha) if n > 0 else True
        action_counts.append(1 if should_create else 0)
    total_new_actions = sum(action_counts)
    print(f"Alpha {alpha}: {total_new_actions} new actions in 20 visits")
- POMDPPlanners.planners.planners_utils.dpw.ucb1_exploration(belief_node, exploration_constant)[source]
Select action from existing children using UCB1 criterion.
Uses Upper Confidence Bounds (UCB1) to balance exploration and exploitation: UCB1(a) = Q(a) + c * sqrt(log(N) / N(a)) where Q(a) is the average reward, N is parent visits, N(a) is action visits, and c is the exploration constant.
The UCB1 algorithm provides theoretical guarantees for multi-armed bandit problems and is widely used in Monte Carlo Tree Search algorithms. It automatically balances exploitation of good actions (high Q-values) with exploration of uncertain actions (low visit counts).
- Parameters:
belief_node (BeliefNode) – Belief node with existing action children
exploration_constant (float) – Controls exploration vs exploitation trade-off. Higher values favor exploration, lower values favor exploitation. Common values: √2 ≈ 1.41 (theoretical optimum), 0.5-2.0 (practical range)
- Return type:
- Returns:
Action node with highest UCB1 value
Examples
Basic UCB1 action selection:
import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ucb1_exploration
from POMDPPlanners.core.tree import BeliefNode, ActionNode
from POMDPPlanners.core.belief import WeightedParticleBelief

# Create belief node with action children
particles = [[0.0], [1.0]]
log_weights = np.log(np.array([0.5, 0.5]))
belief = WeightedParticleBelief(particles, log_weights)
belief_node = BeliefNode(belief=belief)
belief_node.visit_count = 100

# Add action nodes with different Q-values and visit counts
actions_data = [
    {"action": "up", "q_value": 0.8, "visits": 30},
    {"action": "down", "q_value": 0.6, "visits": 20},
    {"action": "left", "q_value": 0.9, "visits": 40},
    {"action": "right", "q_value": 0.4, "visits": 10}
]
for data in actions_data:
    action_node = ActionNode(action=data["action"], parent=belief_node)
    action_node.q_value = data["q_value"]
    action_node.visit_count = data["visits"]

# Select action using UCB1
selected_action = ucb1_exploration(
    belief_node=belief_node,
    exploration_constant=1.41  # sqrt(2)
)
print(f"Selected action: {selected_action.action}")
Comparing exploration constants:
# Low exploration (favor exploitation)
conservative_action = ucb1_exploration(
    belief_node=belief_node,
    exploration_constant=0.1
)

# High exploration (favor exploration)
exploratory_action = ucb1_exploration(
    belief_node=belief_node,
    exploration_constant=3.0
)

# Balanced approach (theoretical optimum)
balanced_action = ucb1_exploration(
    belief_node=belief_node,
    exploration_constant=1.41  # sqrt(2)
)
Manual UCB1 calculation and verification:
import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ucb1_exploration

# Calculate UCB1 values manually
exploration_constant = 1.0
ucb1_values = []
for child in belief_node.children:
    exploration_term = exploration_constant * np.sqrt(
        np.log(belief_node.visit_count) / child.visit_count
    )
    ucb1 = child.q_value + exploration_term
    ucb1_values.append(ucb1)
    print(f"Action {child.action}: Q={child.q_value:.2f}, "
          f"exploration={exploration_term:.3f}, UCB1={ucb1:.3f}")

# Verify our function selects the highest UCB1
expected_best_idx = np.argmax(ucb1_values)
selected_action = ucb1_exploration(belief_node, exploration_constant)
actual_best_idx = belief_node.children.index(selected_action)
assert expected_best_idx == actual_best_idx, "UCB1 selection mismatch"
UCB1 in dynamic scenarios:
import numpy as np
from POMDPPlanners.planners.planners_utils.dpw import ucb1_exploration

# Simulate how UCB1 selection changes over time
belief_node.visit_count = 1
for round_num in range(1, 11):
    belief_node.visit_count = round_num * 10

    # Select action
    selected = ucb1_exploration(belief_node, exploration_constant=1.41)

    # Update the selected action (simulate learning)
    selected.visit_count += 1
    selected.q_value += (np.random.normal(0.5, 0.1) - selected.q_value) / selected.visit_count

    print(f"Round {round_num}: Selected {selected.action}, "
          f"Q={selected.q_value:.3f}, visits={selected.visit_count}")
Exploration vs exploitation analysis:
# Create scenario with clear best action vs uncertain actions
exploration_constants = [0.1, 0.5, 1.0, 1.41, 2.0, 5.0]
selection_counts = {c: {"up": 0, "down": 0, "left": 0, "right": 0}
                    for c in exploration_constants}

for exploration_c in exploration_constants:
    # Run multiple selections
    for _ in range(100):
        selected = ucb1_exploration(belief_node, exploration_c)
        selection_counts[exploration_c][selected.action] += 1

    print(f"Exploration constant {exploration_c}:")
    for action, count in selection_counts[exploration_c].items():
        print(f"  {action}: {count}% selections")
UCB1 with confidence intervals:
import math
from POMDPPlanners.planners.planners_utils.dpw import ucb1_exploration

# Calculate confidence intervals for each action
exploration_constant = 1.41
confidence_level = 0.95

for child in belief_node.children:
    # UCB1 upper confidence bound
    confidence_radius = exploration_constant * math.sqrt(
        math.log(belief_node.visit_count) / child.visit_count
    )
    lower_bound = child.q_value - confidence_radius
    upper_bound = child.q_value + confidence_radius
    print(f"Action {child.action}: "
          f"Q={child.q_value:.3f} ± {confidence_radius:.3f} "
          f"[{lower_bound:.3f}, {upper_bound:.3f}]")

# The selected action has the highest upper bound
selected = ucb1_exploration(belief_node, exploration_constant)
print(f"Selected: {selected.action} (highest upper confidence bound)")
POMDPPlanners.planners.planners_utils.path_simulations_policy module
- class POMDPPlanners.planners.planners_utils.path_simulations_policy.DoubleProgressiveWideningMCTSPolicy(environment, discount_factor, depth, name, action_sampler, k_a, alpha_a, k_o, alpha_o, exploration_constant, min_visit_count_per_action=1, time_out_in_seconds=None, n_simulations=None, log_path=None, debug=False, use_queue_logger=False)[source]
Bases: PathSimulationPolicy
Abstract base class for MCTS planners using double progressive widening.
This base class provides common initialization, parameter validation, and attributes for MCTS planners that use double progressive widening (both action and observation progressive widening). Subclasses implement their own simulation strategies while sharing common parameters and validation logic.
Progressive Widening Overview: Double progressive widening controls tree growth by limiting how many actions and observations are added to the tree based on visit counts:
- Action widening: New actions added when ⌊k_a * n^α_a⌋ increases
- Observation widening: Max observations limited by ⌊k_o * n^α_o⌋
Common Progressive Widening Parameters:
- k_a, alpha_a: Control action progressive widening
- k_o, alpha_o: Control observation progressive widening
- exploration_constant: UCB1 exploration parameter
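As a quick illustration of these limits, the sketch below tabulates the widening thresholds ⌊k_a · n^α_a⌋ and ⌊k_o · n^α_o⌋ for a range of visit counts; the coefficient and exponent values are arbitrary, illustrative choices, not recommended settings:

from math import floor

k_a, alpha_a = 2.0, 0.5   # action widening (illustrative values)
k_o, alpha_o = 3.0, 0.25  # observation widening (illustrative values)

for n in [1, 2, 5, 10, 50, 100]:
    max_actions = floor(k_a * n ** alpha_a)       # ⌊k_a · n^α_a⌋
    max_observations = floor(k_o * n ** alpha_o)  # ⌊k_o · n^α_o⌋
    print(f"n={n:4d}: up to {max_actions} actions, {max_observations} observations")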
- Parameters:
environment (Environment)
discount_factor (float)
depth (int)
name (str)
action_sampler (ActionSampler)
k_a (float)
alpha_a (float)
k_o (float)
alpha_o (float)
exploration_constant (float)
min_visit_count_per_action (int)
time_out_in_seconds (int | None)
n_simulations (int | None)
log_path (Path | None)
debug (bool)
use_queue_logger (bool)
- depth
Maximum search depth for tree expansion
- exploration_constant
UCB1 exploration parameter (c in UCB1 formula)
- action_sampler
Action sampling strategy for progressive widening
- k_o
Observation progressive widening coefficient (k_o > 0)
- k_a
Action progressive widening coefficient (k_a > 0)
- alpha_o
Observation progressive widening exponent (0 < α_o ≤ 1)
- alpha_a
Action progressive widening exponent (0 < α_a ≤ 1)
- Subclasses:
POMCP_DPW: Uses unweighted particle beliefs with double progressive widening
POMCPOW: Uses weighted particle beliefs with double progressive widening
PFT_DPW: Uses particle filter trees (planning directly over particle-belief nodes) with a custom simulation strategy
Note
This is an abstract base class and cannot be instantiated directly. Subclasses must implement the _simulate_path method.
- action_sampler: ActionSampler
- classmethod get_space_info()[source]
Get information about action and observation spaces.
Default implementation returns MIXED space types, which is appropriate for most progressive widening MCTS planners that support both discrete and continuous action spaces through the action sampler interface.
Subclasses can override this method to specify different space requirements (e.g., PFT_DPW specifies CONTINUOUS action space).
- Return type:
- Returns:
PolicySpaceInfo with MIXED space types for both actions and observations
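A minimal sketch of querying the space information; the import path follows this module's documentation, while the contents of the returned PolicySpaceInfo object beyond the MIXED space types described above are left unspecified here:

from POMDPPlanners.planners.planners_utils.path_simulations_policy import (
    DoubleProgressiveWideningMCTSPolicy,
)

# Classmethods can be called without instantiating the (abstract) class
space_info = DoubleProgressiveWideningMCTSPolicy.get_space_info()
print(space_info)  # Expected: MIXED action and observation space types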
- class POMDPPlanners.planners.planners_utils.path_simulations_policy.PathSimulationPolicy(environment, discount_factor, name, n_simulations, time_out_in_seconds, action_sampler=None, log_path=None, debug=False, use_queue_logger=False)[source]
Bases: Policy
Abstract base class for Monte Carlo Tree Search algorithms in POMDP planning.
This class provides a common framework for MCTS-based POMDP planners that build search trees through path simulations. It handles the core tree construction loop and provides hooks for algorithm-specific simulation strategies.
The class supports two termination criteria:
1. Simulation count: Run a fixed number of MCTS simulations
2. Time limit: Run simulations for a specified time duration
Key Components:
- Tree construction with configurable termination criteria
- Automatic tree metrics collection for analysis
- Action selection from the constructed search tree
- Abstract simulation interface for algorithm specialization
Subclass Responsibilities: Concrete implementations must provide the _simulate_path method that defines how individual MCTS simulations are performed, including:
- Node expansion strategies
- Action selection during tree traversal
- Value estimation and backpropagation
- Parameters:
environment (Environment)
discount_factor (float)
name (str)
n_simulations (int | None)
time_out_in_seconds (int | None)
action_sampler (ActionSampler | None)
log_path (Path | None)
debug (bool)
use_queue_logger (bool)
- environment
The POMDP environment for planning
- discount_factor
Discount factor for future rewards (0 < γ ≤ 1)
- n_simulations
Number of MCTS simulations to run (mutually exclusive with timeout)
- time_out_in_seconds
Time limit for planning in seconds (mutually exclusive with n_simulations)
- name
Identifier for the policy instance
Algorithm Integration: This base class is used by several MCTS algorithms in the framework:
- POMCP: Uses UCB1 for action selection with particle filtering
- PFT-DPW: Implements progressive widening for continuous action spaces
- Sparse-PFT: Combines sparse sampling with progressive widening
The common interface allows easy comparison and benchmarking of different MCTS variants while sharing the core tree construction infrastructure.
- action(belief)[source]
Select action(s) based on the current belief state.
This is the core method that implements the policy’s decision-making logic. It takes a belief state and returns the selected action(s) along with execution information and performance metrics.
- Parameters:
belief (Belief) – Current belief state representing uncertainty over states
- Returns:
List of selected actions (typically single action, but supports multiple)
PolicyRunData with execution metrics and performance information
- Return type:
Note
Subclasses must implement this method with their specific planning or decision-making algorithm.
- class POMDPPlanners.planners.planners_utils.path_simulations_policy.PathSimulationPolicyCostSetting(environment, discount_factor, name, action_sampler=None, n_simulations=None, time_out_in_seconds=None, log_path=None, debug=False)[source]
Bases: PathSimulationPolicy
- Parameters:
environment (Environment)
discount_factor (float)
name (str)
action_sampler (ActionSampler | None)
n_simulations (int | None)
time_out_in_seconds (int | None)
log_path (Path | None)
debug (bool)
- action(belief)[source]
Select action(s) based on the current belief state.
This is the core method that implements the policy’s decision-making logic. It takes a belief state and returns the selected action(s) along with execution information and performance metrics.
- Parameters:
belief (Belief) – Current belief state representing uncertainty over states
- Returns:
List of selected actions (typically single action, but supports multiple)
PolicyRunData with execution metrics and performance information
- Return type:
Note
Subclasses must implement this method with their specific planning or decision-making algorithm.
POMDPPlanners.planners.planners_utils.rollout module
- POMDPPlanners.planners.planners_utils.rollout.random_rollout_action_sampler(state, depth, action_sampler, environment, discount_factor, max_depth=10)[source]
Perform random rollout to estimate value from leaf node.
Rollout policy samples random actions using the action_sampler until reaching maximum depth or terminal state. This provides value estimates for leaf nodes in the search tree during Monte Carlo Tree Search.
The rollout uses a random policy (via action_sampler) to quickly estimate the value of a state without expensive planning. This is a key component of MCTS algorithms where accurate value estimation is traded off against computational efficiency.
- Parameters:
state (Any) – Current state to rollout from
depth (int) – Current depth in rollout (starts at 0)
action_sampler (ActionSampler) – Action sampler for selecting rollout actions
environment (Environment) – POMDP environment to simulate in
discount_factor (float) – Discount factor for future rewards (0 < γ ≤ 1)
max_depth (int) – Maximum rollout depth to prevent infinite loops
- Return type:
- Returns:
Total discounted return from rollout simulation
Examples
>>> import numpy as np
>>> np.random.seed(42)  # For reproducible results
>>> from POMDPPlanners.planners.planners_utils.rollout import random_rollout_action_sampler
>>> from POMDPPlanners.planners.planners_utils.dpw import ActionSampler
>>> from POMDPPlanners.environments.tiger_pomdp import TigerPOMDP
>>>
>>> # Simple action sampler for Tiger POMDP
>>> class TigerActionSampler(ActionSampler):
...     def sample(self, belief_node=None):
...         return np.random.choice(["listen", "open_left", "open_right"])
>>>
>>> # Create environment and sampler
>>> tiger = TigerPOMDP(discount_factor=0.95)
>>> action_sampler = TigerActionSampler()
>>>
>>> # Perform rollout from initial state
>>> initial_state = "tiger_left"
>>> rollout_value = random_rollout_action_sampler(
...     state=initial_state,
...     depth=0,
...     action_sampler=action_sampler,
...     environment=tiger,
...     discount_factor=0.95,
...     max_depth=10
... )
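For intuition about the value this function returns, here is a self-contained sketch of the discounted-return accumulation a random rollout performs; it uses a toy step function in place of the framework's Environment API (whose exact interface may differ), so every name in it is illustrative:

import numpy as np

def toy_rollout(state, sample_action, step, discount_factor=0.95, max_depth=10):
    """Accumulate the discounted return of a random rollout (illustration only)."""
    total_return = 0.0
    discount = 1.0
    for _ in range(max_depth):
        action = sample_action(state)
        state, reward, terminal = step(state, action)  # toy transition model
        total_return += discount * reward
        discount *= discount_factor
        if terminal:
            break
    return total_return

# Toy 1-D random walk: reward is the negative distance to the origin,
# and the episode terminates once the walker is more than 5 units away.
rng = np.random.default_rng(0)
sample_action = lambda s: rng.choice([-1.0, 1.0])
step = lambda s, a: (s + a, -abs(s + a), abs(s + a) > 5)

print(toy_rollout(3.0, sample_action, step))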