Source code for amlgym.modeling.UPEnv

import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from functools import cached_property
from typing import List, TypeVar, Set, Dict, Any

from unified_planning.exceptions import UPInvalidActionError
from unified_planning.io import PDDLReader, PDDLWriter
from unified_planning.model import Problem, Fluent, UPState
from unified_planning.plans import ActionInstance
from unified_planning.shortcuts import SequentialSimulator, BoolType, FALSE, TRUE

from tarski.io import PDDLReader as tarskiPDDLReader
from tarski.grounding import LPGroundingStrategy

from amlgym.modeling.env import Env

ObservationType = TypeVar('ObservationType')


@dataclass
class UPEnv(Env):
    """
    A Gymnasium environment which simulates a PDDL problem through the
    unified-planning sequential simulator.

    States exchanged with callers are collections of literal strings such as
    ``(on a b)`` or ``(not (on a b))``; actions are either unified-planning
    :class:`ActionInstance` objects or labels such as ``(stack a b)``.
    """

    # The PDDL problem in unified-planning
    problem: Problem

    # The environment simulation engine
    _simulator: SequentialSimulator

    def __init__(self, domain_path: str, problem_path: str) -> None:
        """
        Parse the PDDL domain/problem and prepare the simulator and the
        action grounder.

        Set environment state and seed through :meth:`reset` for reproducibility

        :param domain_path: path to the PDDL domain file
        :param problem_path: path to the PDDL problem file
        """
        # Parse the PDDL environment in unified-planning
        self.problem = PDDLReader().parse_problem(domain_path, problem_path)

        # Instantiate the environment simulation engine
        self._simulator = SequentialSimulator(self.problem)

        # Create a fictitious state with all negated literals
        all_neg_fluents = {f: FALSE() for f in self.problem.initial_values}
        # NOTE: this is a class-level assignment, hence it applies to every
        # UPState created in the process, not only to this environment.
        UPState.MAX_ANCESTORS = None
        self.all_neg_state = UPState(all_neg_fluents, self.problem)

        # Initialize actions grounder with tarski
        self._grounder = self._build_grounder(domain_path, problem_path)

    @staticmethod
    def _build_grounder(domain_path: str, problem_path: str) -> LPGroundingStrategy:
        """
        Build the tarski grounder from a relaxed copy of the problem in which
        every action has a single dummy precondition and effect and no goal
        is set.

        :param domain_path: path to the PDDL domain file
        :param problem_path: path to the PDDL problem file
        :return: tarski grounding strategy for the relaxed problem
        """
        # Local import: keeps this block self-contained without touching the
        # module-level import section.
        import tempfile

        # Parse a second, independent copy since the actions are modified below
        _tmp_problem = PDDLReader().parse_problem(domain_path, problem_path)

        # Add a dummy fluent to show `preconditions:` and `effects:` sections
        # in the PDDL file
        dummy_fluent = Fluent('dummy', BoolType())
        if dummy_fluent not in _tmp_problem.fluents:
            _tmp_problem.add_fluent(dummy_fluent)
        # The dummy fluent is the only precondition of every action, so it
        # must hold in the initial state.
        _tmp_problem.set_initial_value(dummy_fluent, True)

        # Rebuild actions with no preconditions/effects
        for action in _tmp_problem.actions:
            action.clear_preconditions()
            action.clear_effects()
            # ensure `preconditions:` and `effects:` sections in the PDDL file
            action.add_precondition(dummy_fluent)
            action.add_effect(dummy_fluent, True)

        # Remove problem goal to avoid tarski reachability issues
        _tmp_problem.clear_goals()

        reader = tarskiPDDLReader(raise_on_error=True)

        # Write the relaxed PDDL files into a private temporary directory:
        # no name clashes between environments created concurrently, no
        # dependency on a writable working directory, and the files are
        # removed even when writing or parsing raises.
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_domain_path = os.path.join(tmp_dir, 'tmp_domain.pddl')
            tmp_problem_path = os.path.join(tmp_dir, 'tmp_problem.pddl')
            PDDLWriter(_tmp_problem).write_domain(tmp_domain_path)
            PDDLWriter(_tmp_problem).write_problem(tmp_problem_path)
            reader.parse_domain(tmp_domain_path)
            reader.parse_instance(tmp_problem_path)

        return LPGroundingStrategy(reader.problem)

    @cached_property
    def ground_actions(self) -> Dict[str, Any]:
        """
        Return all ground actions for the current environment, computed once
        with tarski and then cached on the instance.

        :return: dictionary mapping each operator name to its ground
            argument combinations
        """
        logging.debug("Grounding actions with tarski...")
        return self._grounder.ground_actions()

    def _str_to_action(self, action_label: str) -> ActionInstance:
        """
        Get UP problem ground action from the action label

        :param action_label: action label string, e.g. ``(stack a b)``
        :return: unified planning action instance
        """
        # Drop the enclosing parentheses, then split operator and objects
        tokens = action_label.strip()[1:-1].split()
        up_op = self.problem.action(tokens[0])
        # NOTE(review): object names are looked up as given, whereas
        # `applicable_actions` lower-cases them before the lookup -- confirm
        # labels always carry lower-case object names.
        up_objs = [self.problem.object(o) for o in tokens[1:]]
        return ActionInstance(up_op, up_objs)

    def _to_up_state(self, state):
        """
        Convert a collection of literal strings into a unified-planning state.

        Only positive literals are read; every other fluent keeps the false
        value inherited from ``self.all_neg_state``. Any object that is not a
        set, frozenset, list or tuple is returned unchanged.

        :param state: collection of literal strings or an already built state
        :return: unified-planning state
        """
        if not isinstance(state, (set, frozenset, list, tuple)):
            return state

        true_fluents = dict()
        for literal in state:
            literal = literal.strip()
            if literal.startswith('(not '):
                continue
            # "(name obj1 obj2)" -> ["name", "obj1", "obj2"]
            tokens = literal[1:-1].split()
            up_fluent = self.problem.fluent(tokens[0])
            up_args = [self.problem.object(o) for o in tokens[1:]]
            true_fluents[up_fluent(*up_args)] = TRUE()

        return self.all_neg_state.make_child(true_fluents)

    def apply(self, state, action):
        """
        Return the state :math:`s'` reached after executing action :math:`a`
        in state :math:`s`.

        :param state: current state :math:`s`
        :param action: action :math:`a` to be executed, either an action
            instance or an action label string
        :return: future state :math:`s'` as a set of literal strings, or None
            if the simulator rejects the action
        """
        if isinstance(action, str):
            action = self._str_to_action(action)

        state = self._to_up_state(state)

        try:
            next_state = self._simulator.apply(state, action)
        except UPInvalidActionError:
            return None

        if next_state is None:
            return None

        # NOTE(review): this reads the private `_values` mapping of the UP
        # state -- confirm it holds the full assignment rather than only the
        # fluents updated by the action.
        literals = set()
        for fluent_exp, value in next_state._values.items():
            atom_tokens = [fluent_exp.fluent().name, *map(str, fluent_exp.args)]
            atom = f"({' '.join(atom_tokens)})"
            literals.add(atom if value.is_true() else f"(not {atom})")

        return literals

    def applicable_actions(self, state) -> Dict[str, Set[str]]:
        """
        Return the ground actions applicable in the given state.

        :param state: current state, either a collection of literal strings
            or a unified-planning state
        :return: dictionary mapping each operator name with at least one
            applicable grounding to the set of its applicable action labels
        """
        state = self._to_up_state(state)

        applicable_actions = defaultdict(set)
        for op_name, param_combos in self.ground_actions.items():
            # Resolved once per operator, and only when it has at least one
            # grounding, so operators without groundings are never looked up.
            up_action = None
            for args in param_combos:
                if up_action is None:
                    up_action = self.problem.action(op_name)
                up_args = [self.problem.object(o.lower()) for o in args]
                if self._simulator._is_applicable(state, up_action, up_args):
                    action_label = f"({' '.join([op_name, *args])})"
                    applicable_actions[op_name].add(action_label)

        return applicable_actions