# Source code for amlgym.algorithms.RandomAgent

import logging
import os
import random
import tempfile
from dataclasses import dataclass
from typing import List, Tuple, Any

import numpy as np
from tarski.grounding import LPGroundingStrategy
from unified_planning.interop import convert_problem_to_tarski
from unified_planning.exceptions import UPInvalidActionError
from unified_planning.io import PDDLReader, PDDLWriter
from unified_planning.model import Fluent
from unified_planning.plans import ActionInstance
from unified_planning.shortcuts import SequentialSimulator, BoolType

from amlgym.algorithms.ActiveAlgorithmAdapter import ActiveAlgorithmAdapter
from amlgym.algorithms.SAM import SAM
from amlgym.modeling.trajectory import Trajectory


@dataclass
class RandomAgent(ActiveAlgorithmAdapter):
    """
    A simple baseline for online learning in a fully observable and
    deterministic environment by randomly executing actions. The baseline
    first generates a trajectory and then applies the SAM algorithm to learn
    a model offline from the generated trace.

    Example:
        .. code-block:: python

            from unified_planning.io import PDDLReader
            from unified_planning.shortcuts import SequentialSimulator
            from amlgym.algorithms import get_algorithm
            from amlgym.benchmarks import get_domain_path, get_problems_path
            from amlgym.util.util import empty_domain

            domain = 'blocksworld'
            domain_ref_path = get_domain_path(domain)
            input_domain_path = empty_domain(domain_ref_path)
            problem_path = get_problems_path(domain, kind='learning')[0]
            problem = PDDLReader().parse_problem(domain_ref_path, problem_path)
            env = SequentialSimulator(problem=problem)
            baseline = get_algorithm('RandomAgent',
                                     input_domain_path=input_domain_path)
            model, trajectory = baseline.learn(env, max_steps=100)

            print("##################### Learned model #####################")
            print(model)
            print("################# Generated trajectory ##################")
            print(trajectory)
    """

    def learn(self,
              simulator: SequentialSimulator,
              max_steps: int = 100,
              seed: int = 123) -> Tuple[str, Trajectory]:
        """
        Learns a PDDL action model from:
            (i) a simulator of the environment to learn from
            (ii) a (possibly empty) input model which is required to specify
            the predicates and operators signature (set via the
            input_domain_path attribute at instantiation time);

        :parameter simulator: environment simulator
        :parameter max_steps: maximum number of interaction steps with the
            simulator
        :parameter seed: random seed for reproducibility

        :return: the learned PDDL model as returned by SAM, and the
            Trajectory of the full executed trace; in that trace a step
            whose action could not be applied has ``None`` as its
            resulting state
        """
        # Set seed for reproducibility (global generators, as downstream
        # code may rely on them as well)
        random.seed(seed)
        np.random.seed(seed)

        problem = simulator._problem

        # Ground actions. The problem is serialized into a private temporary
        # directory so that concurrent runs do not clash on a shared file
        # name and nothing is left behind if grounding fails.
        with tempfile.TemporaryDirectory() as workdir:
            problem_path = os.path.join(workdir, 'tmp.pddl')
            PDDLWriter(problem).write_problem(problem_path)
            ground_actions = self._ground_actions(self.input_domain_path,
                                                  problem_path)

        # Get initial state
        state = simulator.get_initial_state()

        trace_actions = []
        trace_states = [state]
        for _ in range(max_steps):
            op_name, op_args = random.choice(ground_actions)
            operator = problem.action(op_name)
            args = tuple(problem.object(o) for o in op_args)
            action = ActionInstance(operator, args)

            try:
                next_state = simulator.apply(state, action)
            except UPInvalidActionError:
                next_state = None

            # A failed action leaves the current state unchanged
            if next_state is not None:
                state = next_state

            # Failed steps are recorded too (with a None state), so that the
            # returned trace reflects every attempted action
            trace_states.append(next_state)
            trace_actions.append(action)

        # Keep only the successful transitions for offline learning
        success_states = [s for s in trace_states if s is not None]
        success_actions = [
            a for s, a in zip(trace_states[1:], trace_actions)
            if s is not None
        ]
        trajectory = Trajectory(success_states, success_actions)

        # The filtered trajectory is only needed while SAM reads it, hence it
        # lives in a temporary directory that is removed afterwards
        with tempfile.TemporaryDirectory() as workdir:
            trajectory_path = os.path.join(workdir, 'tmp_trajectory')
            trajectory.write(trajectory_path)
            model = SAM().learn(self.input_domain_path, [trajectory_path])

        return model, Trajectory(trace_states, trace_actions)

    def _ground_actions(self, domain_path: str, problem_path: str) -> List[Any]:
        """
        Grounds the operators of the given domain over the objects of the
        given problem using tarski, ignoring the operators' original
        preconditions and effects.

        :parameter domain_path: path to the PDDL domain file
        :parameter problem_path: path to the PDDL problem file

        :return: a list of (operator name, arguments) pairs, one for every
            ground action returned by the tarski grounder
        """
        # Initialize actions grounder with tarski
        _tmp_problem = PDDLReader().parse_problem(domain_path, problem_path)

        # Add a dummy fluent to show `preconditions:` and `effects:` sections
        # in the PDDL file
        dummy_fluent = Fluent('dummy', BoolType())
        if dummy_fluent not in _tmp_problem.fluents:
            _tmp_problem.add_fluent(dummy_fluent)
        _tmp_problem.set_initial_value(dummy_fluent, True)

        # Rebuild actions with no preconditions/effects
        for action in _tmp_problem.actions:
            action.clear_preconditions()
            action.clear_effects()

            # ensure `preconditions:` and `effects:` sections in the PDDL file
            action.add_precondition(dummy_fluent)
            action.add_effect(dummy_fluent, True)

        # Remove problem goal to avoid tarski reachability issues
        _tmp_problem.clear_goals()

        tarski_problem = convert_problem_to_tarski(_tmp_problem)
        grounder = LPGroundingStrategy(tarski_problem)

        logging.debug("Grounding actions with tarski...")
        ground_actions = grounder.ground_actions()

        # Flatten the {operator name: argument combinations} mapping
        return [
            (op_name, args)
            for op_name, param_combos in ground_actions.items()
            for args in param_combos
        ]