Source code for amlgym.algorithms.InformationGainAgent

import logging
import os
import random
import tempfile
from dataclasses import dataclass
from typing import Tuple

import numpy as np
from unified_planning.exceptions import UPInvalidActionError
from unified_planning.io import PDDLWriter
from unified_planning.plans import ActionInstance
from unified_planning.shortcuts import SequentialSimulator

from amlgym.algorithms.ActiveAlgorithmAdapter import ActiveAlgorithmAdapter
from amlgym.modeling.trajectory import Trajectory

from information_gain_aml.algorithms import InformationGainLearner
from information_gain_aml.core import UPAdapter
logger = logging.getLogger(__name__)


@dataclass
class InformationGainAgent(ActiveAlgorithmAdapter):
    """
    Online action model learning via information gain.

    Uses CNF/SAT-based information-theoretic approach to select actions that
    maximize expected information gain about the action model.

    Args:
        use_object_subset (bool): Enable object subset selection for reduced
            grounding
        spare_objects_per_type (int): Extra objects per type beyond minimum
            requirement (for subset selection)
        model_mode (str): "safe" (all possible preconditions, confirmed
            effects only) or "complete" (certain preconditions only, all
            possible effects)
        learn_negative_preconditions (bool): Whether to learn negative
            preconditions
        selection_strategy (str): Action selection strategy. One of:

            - "greedy" — always select highest information gain (default)
            - "epsilon_greedy" — explore with probability epsilon
            - "boltzmann" — softmax probabilistic selection
            - "lookahead" — depth-limited lookahead with discounted future gain
            - "mcts" — full UCT-based Monte Carlo Tree Search
        lookahead_depth (int): Lookahead depth for 'lookahead' strategy
            (default: 2)
        lookahead_top_k (int): Number of top actions to evaluate in lookahead
            (default: 5)
        lookahead_discount (float): Discount factor for future gain in
            lookahead (default: 0.9)
        epsilon (float): Exploration probability for 'epsilon_greedy' strategy
            (default: 0.1)
        temperature (float): Temperature for 'boltzmann' softmax selection
            (default: 1.0)
        mcts_iterations (int): Number of MCTS iterations per action selection
            (default: 50)
        mcts_rollout_depth (int): Simulation depth during MCTS rollout phase
            (default: 5)

    Example:
        .. code-block:: python

            from unified_planning.io import PDDLReader
            from unified_planning.shortcuts import SequentialSimulator
            from amlgym.algorithms import get_algorithm
            from amlgym.benchmarks import get_domain_path, get_problems_path
            from amlgym.util.util import empty_domain

            domain = 'blocksworld'
            domain_ref_path = get_domain_path(domain)
            input_domain_path = empty_domain(domain_ref_path)
            problem_path = get_problems_path(domain, kind='learning')[0]
            problem = PDDLReader().parse_problem(domain_ref_path, problem_path)
            env = SequentialSimulator(problem=problem)

            info_gain = get_algorithm('InformationGainAgent',
                                      input_domain_path=input_domain_path)
            model, trajectory = info_gain.learn(env, max_steps=100)

            # With lookahead strategy
            info_gain = get_algorithm(
                'InformationGainAgent',
                input_domain_path=input_domain_path,
                selection_strategy='lookahead',
                lookahead_depth=3,
            )
            model, trajectory = info_gain.learn(env, max_steps=100)

            print("##################### Learned model #####################")
            print(model)
            print("################# Generated trajectory ##################")
            print(trajectory)
    """

    # Grounding / model-extraction options.
    use_object_subset: bool = True
    spare_objects_per_type: int = 2
    # Only consumed when the final PDDL string is generated (not by the
    # learner constructor).
    model_mode: str = "safe"
    learn_negative_preconditions: bool = True

    # Action-selection options, forwarded verbatim to InformationGainLearner.
    selection_strategy: str = "greedy"
    epsilon: float = 0.1
    temperature: float = 1.0
    lookahead_depth: int = 2
    lookahead_top_k: int = 5
    lookahead_discount: float = 0.9
    mcts_iterations: int = 50
    mcts_rollout_depth: int = 5

    def learn(self,
              simulator: SequentialSimulator,
              max_steps: int = 500,
              seed: int = 123) -> Tuple[str, Trajectory]:
        """
        Learn a PDDL action model by interacting with the environment.

        At each step the learner picks an action for the current state, the
        action is applied through the simulator, and the outcome (success with
        the resulting state, or failure) is fed back to the learner. The loop
        ends after ``max_steps`` steps or as soon as the learner selects the
        sentinel action ``"no_action"``.

        Side effects: seeds the global ``random`` and ``numpy.random``
        generators with ``seed``, and writes the simulator's problem to a
        temporary ``.pddl`` file that is removed before returning (also when
        an exception is raised).

        :parameter simulator: environment simulator
        :parameter max_steps: maximum number of interaction steps with the
            simulator
        :parameter seed: random seed for reproducibility

        :return: (learned PDDL model string, trajectory). The trajectory holds
            one state entry per attempted action in addition to the initial
            state; the entry recorded for a failed action is ``None``.
        """
        random.seed(seed)
        np.random.seed(seed)

        # NOTE: relies on the simulator's private `_problem` attribute.
        problem = simulator._problem

        # Reserve a path for the problem file required by the learner's
        # constructor; delete=False keeps the file after the handle is closed
        # so it can be written and read by path.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.pddl', delete=False
        ) as tmp_problem:
            tmp_problem_path = tmp_problem.name

        try:
            # Written inside the try block so the temporary file is removed
            # even when serializing the problem fails.
            PDDLWriter(problem).write_problem(tmp_problem_path)

            # Initialize our learner
            learner = InformationGainLearner(
                domain_file=self.input_domain_path,
                problem_file=tmp_problem_path,
                max_iterations=max_steps,
                use_object_subset=self.use_object_subset,
                spare_objects_per_type=self.spare_objects_per_type,
                learn_negative_preconditions=self.learn_negative_preconditions,
                seed=seed,
                selection_strategy=self.selection_strategy,
                epsilon=self.epsilon,
                temperature=self.temperature,
                lookahead_depth=self.lookahead_depth,
                lookahead_top_k=self.lookahead_top_k,
                lookahead_discount=self.lookahead_discount,
                mcts_iterations=self.mcts_iterations,
                mcts_rollout_depth=self.mcts_rollout_depth,
            )

            # Get initial state
            up_state = simulator.get_initial_state()
            trace_states = [up_state]
            trace_actions = []

            for _ in range(max_steps):
                # Convert UP state to fluent set for our algorithm
                state_set = UPAdapter.up_state_to_fluent_set(up_state, problem)

                # Select action
                action_name, objects = learner.select_action(state_set)
                if action_name == "no_action":
                    logger.info("Algorithm converged (no information gain)")
                    break

                # Create ActionInstance for simulator
                up_action = problem.action(action_name)
                up_objects = tuple(problem.object(o) for o in objects)
                action_instance = ActionInstance(up_action, up_objects)

                # Execute via simulator; an invalid action is handled exactly
                # like an application that yields no successor state.
                try:
                    next_up_state = simulator.apply(up_state, action_instance)
                except UPInvalidActionError:
                    next_up_state = None
                success = next_up_state is not None

                # Record trajectory (failed steps store None as next state)
                trace_actions.append(action_instance)
                trace_states.append(next_up_state)

                # Observe result for learning
                if success:
                    next_state_set = UPAdapter.up_state_to_fluent_set(
                        next_up_state, problem
                    )
                    learner.observe(
                        state_set, action_name, objects, True, next_state_set
                    )
                    up_state = next_up_state
                else:
                    # The current state is kept: the next step is selected
                    # from the same `up_state`.
                    learner.observe(
                        state_set, action_name, objects, False, None
                    )

            # Generate PDDL model
            model_str = learner.to_pddl_string(mode=self.model_mode)
            trajectory = Trajectory(trace_states, trace_actions)
        finally:
            os.remove(tmp_problem_path)

        return model_str, trajectory