Source code for amlgym.algorithms.ROSAME

import re
import os

from amlgym.algorithms.PassiveAlgorithmAdapter import PassiveAlgorithmAdapter
from typing import List, ClassVar, OrderedDict
import shutil
from pathlib import Path
from pddl_plus_parser.lisp_parsers import DomainParser, TrajectoryParser
from pddl_plus_parser.lisp_parsers import ProblemParser

from amlgym.algorithms.rosame.experiment_runner.rosame_runner import Rosame_Runner


[docs]class ROSAME(PassiveAlgorithmAdapter): """ Adapter class for running an *unofficial* implementation of the ROSAME algorithm: "Neuro-Symbolic Learning of Lifted Action Models from Visual Traces", Kai Xi, Stephen Gould, Sylvie Thiebaux, Proceedings of the Thirty-Fourth International Conference on Automated Planning and Scheduling, 2024. https://ojs.aaai.org/index.php/ICAPS/article/download/31528/33688 Example: .. code-block:: python from amlgym.algorithms import get_algorithm rosame = get_algorithm('ROSAME') model = rosame.learn('path/to/domain.pddl', ['path/to/trace0', 'path/to/trace1']) print(model) """ _reference: ClassVar[OrderedDict[str, str]] = { 'Authors': "K. Xi, S. Gould, and S. Thiebaux", 'Title': "Neuro-Symbolic Learning of Lifted Action Models from Visual Traces", 'Venue': "International Conference on Automated Planning and Scheduling", 'Year': 2024, 'URL': "https://ojs.aaai.org/index.php/ICAPS/article/download/31528/33688" }
[docs] def __init__(self, **kwargs): super(ROSAME, self).__init__(**kwargs)
[docs] def learn(self, domain_path: str, trajectory_paths: List[str], use_problems: bool = True) -> str: """ Learns a PDDL action model from: (i) a (possibly empty) input model which is required to specify the predicates and operators signature; (ii) a list of trajectory file paths. :parameter domain_path: input PDDL domain file path :parameter trajectory_paths: list of trajectory file paths :parameter use_problems: boolean flag indicating whether to provide the set of objects specified in the problem from which the trajectories have been generated :return: a string representing the learned PDDL model """ # Format input trajectories os.makedirs('tmp', exist_ok=True) filled_traj_paths = [] for i, traj_path in enumerate(trajectory_paths): filled_traj = self._preprocess_trace(traj_path) filled_traj_paths.append(f"tmp/{i}_traj_filled") with open(f"tmp/{i}_traj_filled", "w") as f: f.write(filled_traj) # Instantiate ROSAME algorithm partial_domain = DomainParser(Path(domain_path), partial_parsing=True).parse_domain() rosame = Rosame_Runner(domain_path) # Parse input trajectories TODO: TO BE REMOVED (if not ...) if not use_problems: allowed_observations = [TrajectoryParser(partial_domain).parse_trajectory(traj_path) for traj_path in sorted(filled_traj_paths, key=lambda x: int(x.split('/')[-1].split('_')[0]))] else: # allowed_observations = [] for k, traj_path in enumerate(sorted(filled_traj_paths, key=lambda x: int(x.split('/')[-1].split('_')[0]))): problem_path = trajectory_paths[k].replace('trajectories', 'problems').replace('_traj', '_prob.pddl') problem = ProblemParser(Path(problem_path), partial_domain).parse_problem() rosame.add_problem(problem) # Learn the observation observation = TrajectoryParser(partial_domain, problem).parse_trajectory(traj_path) rosame.ground_new_trajectory() rosame.learn_rosame(observation) # allowed_observations.append(TrajectoryParser(partial_domain, problem).parse_trajectory(traj_path)) # Remove temporary files shutil.rmtree('tmp') return rosame.rosame_to_pddl()
def _preprocess_trace(self, traj_path: str) -> str: """ Format the trajectory to make it compliant with the algorithm, by replacing initial state and action keywords. :parameter traj_path: path to the trajectory file :return: a string representing the formatted trajectory """ with open(traj_path, 'r') as f: traj_str = f.read() # Format extra spaces traj_str = re.sub(r' +', ' ', traj_str) # Remove initial 'observation' keyword traj_str = traj_str.replace('(:trajectory', '(') # Rename `action` into `operator` traj_str = traj_str.replace('(:action ', '(operator: ') # Rename the first state into `init` traj_str = traj_str.replace('(:state ', '(:init ', 1) return traj_str