Source code for POMDPPlanners.core.belief.gaussian_belief

"""Gaussian belief state representation for POMDP environments.

This module provides a multivariate Gaussian belief state that delegates
updates to a :class:`~POMDPPlanners.core.belief.gaussian_belief_updaters.GaussianBeliefUpdater`
instance, allowing compatibility with EKF, UKF, or any custom Gaussian update rule.

Classes:
    GaussianBelief: Multivariate Gaussian belief with pluggable updater.
"""

from typing import Any, Optional

import numpy as np

from POMDPPlanners.core.belief.base_belief import Belief
from POMDPPlanners.core.belief.gaussian_belief_updaters import GaussianBeliefUpdater
from POMDPPlanners.core.environment import Environment
from POMDPPlanners.utils.config_to_id import config_to_id
from POMDPPlanners.utils.multivariate_normal import (
    CovarianceParameterizedMultivariateNormal,
)


[docs] class GaussianBelief(Belief): """Multivariate Gaussian belief state representation. Represents the belief as a multivariate normal distribution N(mean, covariance). The update mechanism delegates to a :class:`~POMDPPlanners.core.belief.gaussian_belief_updaters.GaussianBeliefUpdater` instance, allowing the same class to work with EKF, UKF, or any custom Gaussian update rule without requiring environment modifications. This belief type is compatible with PFT_DPW, Sparse-PFT, and SparseSampling planners. It is NOT compatible with POMCP/POMCP_DPW planners because it does not support incremental particle accumulation via ``inplace_update()``. Attributes: mean: Mean vector of the Gaussian distribution. covariance: Covariance matrix of the Gaussian distribution. updater: GaussianBeliefUpdater that computes the Bayesian belief update. n_terminal_check_samples: Number of Monte Carlo samples for terminal checks. Example: >>> import numpy as np >>> np.random.seed(42) >>> >>> # Create a linear Kalman filter updater >>> from POMDPPlanners.core.belief.gaussian_belief_updaters import ( ... LinearKalmanFilterUpdater, ... ) >>> updater = LinearKalmanFilterUpdater( ... A=np.eye(2), B=np.zeros((2, 1)), H=np.eye(2), ... Q=0.1 * np.eye(2), R=0.5 * np.eye(2), ... ) >>> >>> # Create 2D Gaussian belief >>> mean = np.array([0.0, 0.0]) >>> cov = np.eye(2) >>> belief = GaussianBelief(mean=mean, covariance=cov, updater=updater) >>> >>> # Sample a state >>> state = belief.sample() >>> len(state) == 2 True >>> >>> # Update belief with observation >>> new_belief = belief.update( ... action=np.zeros(1), observation=np.array([1.0, 1.0]), pomdp=None ... ) >>> new_belief.mean.shape (2,) """ def __init__( self, mean: np.ndarray, covariance: np.ndarray, updater: GaussianBeliefUpdater, n_terminal_check_samples: int = 50, ): """Initialize Gaussian belief. Args: mean: Mean vector of shape (d,). covariance: Positive definite covariance matrix of shape (d, d). updater: A :class:`GaussianBeliefUpdater` instance whose ``update(mean, covariance, action, observation)`` method returns ``(new_mean, new_covariance)``. n_terminal_check_samples: Number of Monte Carlo samples drawn for terminal state checks. Defaults to 50. Raises: ValueError: If mean/covariance dimensions are inconsistent or covariance is not a valid positive definite matrix. """ mean = np.asarray(mean, dtype=float) covariance = np.asarray(covariance, dtype=float) self._validate_inputs(mean, covariance) self.mean = mean self.covariance = covariance self.updater = updater self.n_terminal_check_samples = n_terminal_check_samples self._mvn = CovarianceParameterizedMultivariateNormal(covariance) @staticmethod def _validate_inputs(mean: np.ndarray, covariance: np.ndarray) -> None: if mean.ndim != 1: raise ValueError(f"mean must be a 1D array, got {mean.ndim}D") if covariance.ndim != 2: raise ValueError(f"covariance must be a 2D array, got {covariance.ndim}D") if covariance.shape[0] != covariance.shape[1]: raise ValueError(f"covariance must be square, got shape {covariance.shape}") if mean.shape[0] != covariance.shape[0]: raise ValueError( f"mean length {mean.shape[0]} does not match " f"covariance dimension {covariance.shape[0]}" )
[docs] def sample(self) -> np.ndarray: """Sample a state from the Gaussian belief. Returns: A state vector of shape (d,) sampled from N(mean, covariance). """ return self._mvn.sample(self.mean, n_samples=1)[0]
[docs] def update( self, action: Any, observation: Any, pomdp: Optional[Environment] = None, state: Optional[Any] = None, ) -> "GaussianBelief": """Update belief using the provided updater. Args: action: Action that was executed. observation: Observation that was received. pomdp: Unused. Kept for interface compatibility with :class:`~POMDPPlanners.core.belief.base_belief.Belief`. state: Ignored for Gaussian beliefs. Returns: New GaussianBelief with updated mean and covariance. """ new_mean, new_cov = self.updater.update(self.mean, self.covariance, action, observation) return GaussianBelief( mean=new_mean, covariance=new_cov, updater=self.updater, n_terminal_check_samples=self.n_terminal_check_samples, )
@property def config_id(self) -> str: """Generate a deterministic identifier based on belief configuration.""" config_dict = { "mean": self.mean.tolist(), "covariance": self.covariance.tolist(), "n_terminal_check_samples": self.n_terminal_check_samples, "updater": self.updater.config_id, } return config_to_id(config_dict) @property def dim(self) -> int: """Return the dimensionality of the Gaussian belief.""" return len(self.mean)
[docs] def entropy(self) -> float: """Compute the differential entropy of the Gaussian distribution. Uses the closed-form expression: H = 0.5 * (d * ln(2 * pi * e) + ln(det(Sigma))) Returns: Differential entropy in nats. """ d = self.dim log_det = np.linalg.slogdet(self.covariance)[1] return 0.5 * (d * np.log(2.0 * np.pi * np.e) + log_det)