Source code for amlgym.util.gen_states_predictability

# Add current project to sys path
import json
import math
import os
import sys
from collections import defaultdict

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, parent_dir)

# from amlgym.util.gen_problems import *
from .gen_problems import *  # DO NOT REMOVE
from datetime import datetime
import yaml
import re
import contextlib
import logging
import random
import shutil
import numpy as np
import unified_planning
from alive_progress import alive_bar
from unified_planning.io import PDDLReader
from unified_planning.model import Problem, UPState
from unified_planning.plans import ActionInstance
from unified_planning.shortcuts import OneshotPlanner, SequentialSimulator

from amlgym.modeling.trajectory import Trajectory

from tarski.io import PDDLReader as tarskiPDDLReader
from tarski.grounding import LPGroundingStrategy


[docs]def replan(problem: Problem, current_state: UPState, action_instance: ActionInstance) -> any: """ Check action execution does not make the problem unsolvable by simulating the action and computing a new plan :param problem: solvable problem :param action: action to be executed :return: new plan (if any) """ # Update the problem initial state problem = problem.clone() for fluent in problem.initial_values: value = current_state.get_value(fluent) problem.set_initial_value(fluent, value) # Simulate action execution with SequentialSimulator(problem=problem) as simulator: current_state = simulator.apply(current_state, action_instance) # Update the problem state problem = problem.clone() for fluent in problem.initial_values: value = current_state.get_value(fluent) problem.set_initial_value(fluent, value) # Check a plan still exists logging.debug(f"Checking random action {action_instance} preserves solvability.") with contextlib.redirect_stdout(open(os.devnull, 'w')): with OneshotPlanner( problem_kind=problem.kind, **PLANNER_CFG, ) as planner: result = planner.solve(problem, timeout=MAX_REPLANNING_TIME) plan = result.plan return plan
[docs]def generate_traj( problem: Problem, randomness: float = 0.2) -> Trajectory: with SequentialSimulator(problem=problem) as simulator: current_state = simulator.get_initial_state() states = [current_state] # init trajectory states actions = [] # init trajectory actions plan = None # Ground actions with tarski since unified-planning (1.2.0) grounder is inefficient reader = tarskiPDDLReader(raise_on_error=True) reader.parse_domain(domain_file) reader.parse_instance(problem_path) grounder = LPGroundingStrategy(reader.problem) ground_actions = grounder.ground_actions() while len(states) < TRAJ_LEN_MAX: if plan is None: # Update the problem initial state logging.debug(f"Updating problem state") problem = problem.clone() for fluent in problem.initial_values: value = current_state.get_value(fluent) problem.set_initial_value(fluent, value) logging.debug("Computing a new plan...") with contextlib.redirect_stdout(open(os.devnull, 'w')): with OneshotPlanner( problem_kind=problem.kind, **PLANNER_CFG ) as planner: result = planner.solve(problem, timeout=MAX_PLANNING_TIME) plan = result.plan # Problem unsolvable if plan is None: if result.status.name == 'TIMEOUT': logging.debug(f"Planning timout reached ({MAX_PLANNING_TIME}s).") break elif result.status.name == 'UNSOLVABLE_INCOMPLETELY': logging.debug(f"Planning unsolvable.") break else: raise Exception(f"Planning exited with status: {result.status.name}") for action_instance in plan.actions: # Possibly execute a random action and replan if random.random() < randomness: logging.debug(f"Sampling a random action...") # applicable_actions = list(simulator.get_applicable_actions(current_state)) applicable_actions = [(problem.action(k.lower()), [problem.object(o.lower()) for o in objs]) for k, params in ground_actions.items() for objs in params if simulator._is_applicable(current_state, problem.action(k.lower()), [problem.object(o.lower()) for o in objs])] applicable_actions = sorted(applicable_actions, key=lambda x: f"{x[0]} - {x[1]}") # reproducibility action, params = random.choices(applicable_actions)[0] action_instance = ActionInstance(action, params) logging.debug(f"Random action sampled.") # Check random action does not make the problem unfeasible trial = 1 plan = replan(problem, current_state, action_instance) # while not check_feasibility(problem, current_state, action_instance): while plan is None: trial += 1 logging.debug(f"Random action {action_instance} makes the problem unsolvable." f" Newly sampling a random action.") # applicable_actions = list(simulator.get_applicable_actions(current_state)) applicable_actions = [(problem.action(k.lower()), [problem.object(o.lower()) for o in objs]) for k, params in ground_actions.items() for objs in params if simulator._is_applicable(current_state, problem.action(k.lower()), [problem.object(o.lower()) for o in objs])] applicable_actions = sorted(applicable_actions, key=lambda x: f"{x[0]} - {x[1]}") # reproducibility action, params = random.choices(applicable_actions)[0] action_instance = ActionInstance(action, params) plan = replan(problem, current_state, action_instance) if trial >= MAX_RANDOM_TRIALS and plan is None: break if trial >= MAX_RANDOM_TRIALS and plan is None: logging.debug(f"Maximum number of random action trials reached." f" Avoiding random action execution.") break logging.debug(f"Simulating random action {action_instance}.") current_state = simulator.apply(current_state, action_instance) states.append(current_state) actions.append(action_instance) break logging.debug(f"Simulating action {action_instance}.") current_state = simulator.apply(current_state, action_instance) actions.append(action_instance) if current_state is None: raise Exception(f"Error in applying: {action_instance}") states.append(current_state) if plan is not None and (len(plan.actions) == 0 or action_instance == plan.actions[-1]): logging.debug("A goal state has been reached.") break return Trajectory(states, actions)
if __name__ == '__main__': TRAJ_LEN_MIN = 5 TRAJ_LEN_MAX = 30 TRAJ_PER_DOMAIN = 100 OPTIMAL_TRACES = 1 # corresponds to 30% of optimal traces since every domain has 3 problem settings MAX_PLANNING_TIME = 600 MAX_REPLANNING_TIME = 60 # time to check problem feasibility MAX_RANDOM_TRIALS = 3 # maximum number of random action samplings at each step DOWNWARD_SEARCH_CFG = 'let(hff,ff(),let(hcea,cea(),lazy_greedy([hff,hcea],preferred=[hff,hcea])))' HEUR_PLANNER_CFG = { 'name': 'fast-downward', 'params': dict( fast_downward_search_config=DOWNWARD_SEARCH_CFG, fast_downward_search_time_limit=f"{MAX_PLANNING_TIME}s" )} OPT_PLANNER_CFG = { 'name': 'fast-downward-opt', } PLANNER_CFG = OPT_PLANNER_CFG logging.basicConfig( filename='out.log', level=logging.DEBUG ) GEN_DIR = "pddl-generators" BENCHMARK_DIR = "benchmarks" DOMAINS_DIR = "domains" PROB_DIR = "problems/predictive_power" STATES_DIR = "states/predictive_power" DOM_CFG = f"{BENCHMARK_DIR}/problems_predictive_power.yaml" # Trace CPU time run_start = datetime.now() # Instantiate a PDDL problem reader reader = PDDLReader() # Disable printing of planning engine credits to avoid overloading stdout unified_planning.shortcuts.get_environment().credits_stream = None # Prevents from retrieving all literals UPState.MAX_ANCESTORS = None # Read domain configs with open(f"../{DOM_CFG}") as f: cfg = yaml.safe_load(f) seed = cfg['SEED'] domains = cfg['domains'] to_be_avoided = [] domains = {k: v for k, v in domains.items() if k not in to_be_avoided} for domain in domains: # Set random seed for reproducibility np.random.seed(seed) random.seed(seed) # Trace CPU time domain_run_start = datetime.now() # Clean domain problems directory if os.path.exists(f"../{BENCHMARK_DIR}/{PROB_DIR}/{domain}"): shutil.rmtree(f"../{BENCHMARK_DIR}/{PROB_DIR}/{domain}") os.makedirs(os.path.join(f"../{BENCHMARK_DIR}/{PROB_DIR}/{domain}")) # Clean domain states directory if os.path.exists(f"../{BENCHMARK_DIR}/{STATES_DIR}/{domain}"): shutil.rmtree(f"../{BENCHMARK_DIR}/{STATES_DIR}/{domain}") os.makedirs(os.path.join(f"../{BENCHMARK_DIR}/{STATES_DIR}/{domain}")) tot_runs = math.ceil(TRAJ_PER_DOMAIN / len(domains[domain])) states_set = defaultdict(dict) with alive_bar(len(domains[domain] * tot_runs), title=f'Processing domain {domain}', length=20, bar='halloween') as bar: for run in range(tot_runs): if run >= TRAJ_PER_DOMAIN: break # For every domain problem kwargs for i, kwargs in enumerate(domains[domain]): if i >= OPTIMAL_TRACES: PLANNER_CFG = HEUR_PLANNER_CFG else: PLANNER_CFG = OPT_PLANNER_CFG trajectory = Trajectory([], []) while len(trajectory.states) < TRAJ_LEN_MIN: # Generate a problem logging.debug(f"Generating a new problem") kwargs['seed'] = np.random.randint(1, 1000) generate_prob = getattr(sys.modules[__name__], f'problem_{domain}') problem_str = generate_prob(**kwargs) # Fix hyphens to avoid issues with unified-planning parsing problem_str = re.sub(r'(?<=\w)-(?=\w)', '_', problem_str) # Write the problem string to a file problems_dir = f"../{BENCHMARK_DIR}/{PROB_DIR}/{domain}" problem_file = f"{len(os.listdir(problems_dir))}_{domain}_prob.pddl" problem_path = f'{problems_dir}/{problem_file}' with open(problem_path, 'w') as f: f.write(problem_str.lower()) # Parse the problem in unified-planning domain_file = f'../{BENCHMARK_DIR}/{DOMAINS_DIR}/{domain}.pddl' problem = reader.parse_problem(domain_file, problem_path) # Generate a trace by solving the problem try: trajectory = generate_traj(problem) except: logging.debug(f"Generated problem is not feasible. Retrying...") os.remove(problem_path) continue if len(trajectory.states) < TRAJ_LEN_MIN: logging.debug(f"Failed to generate a sufficiently long trace. Retrying...") os.remove(problem_path) else: logging.debug(f"Trajectory generated successfully. Getting states and applicable actions...") states_literals = [{str(l) if v.is_true() else f"not++{str(l)}" for l, v in state._values.items()} for state in trajectory.states] states_literals_formatted = [] for state_literals in states_literals: state_literals_formatted = [] for l in state_literals: l_name = l.split('(')[0] l_objs = [] if '(' in l: l_objs = [o.strip() for o in l.strip().split('(')[1][:-1].split(',') if len(l.strip().split('(')) > 1 and o.strip() != ''] if len(l_objs) > 0: l_formatted = f"({l_name} {' '.join(l_objs)})" else: l_formatted = f"({l_name})" if "not++" in l_formatted: l_formatted = l_formatted.replace("not++", "not (") + ')' state_literals_formatted.append(l_formatted) states_literals_formatted.append(state_literals_formatted) # Ground actions with tarski since unified-planning (1.2.0) grounder is inefficient tarski_reader = tarskiPDDLReader(raise_on_error=True) tarski_reader.parse_domain(domain_file) tarski_reader.parse_instance(problem_path) grounder = LPGroundingStrategy(tarski_reader.problem) ground_actions = grounder.ground_actions() sim = SequentialSimulator(problem) applicable_actions = [] for state in trajectory.states: applicable_in_state = {op_name: list({objs for objs in all_objs if sim._is_applicable(state, problem.action(op_name), [problem.object(o.lower()) for o in objs])}) for op_name, all_objs in ground_actions.items() } applicable_actions.append(applicable_in_state) # from unified_planning.engines.compilers import Grounder # gc = Grounder() # res = gc.compile(problem) # ground_actions = res.problem.actions # # applicable_actions = [] # for state in states: # applicable_in_state = [a for a in ground_actions # if {str(p) for p in a.preconditions}.issubset(state)] # applicable_actions.append(applicable_in_state) for k, state in enumerate(states_literals_formatted): states_set[problem_file][len(states_set[problem_file])] = { 'fluents': list(state), 'applicable_actions': applicable_actions[k], } # trace_file = f'../{BENCHMARK_DIR}/{STATES_DIR}/{domain}/{len(os.listdir(f"../{BENCHMARK_DIR}/{STATES_DIR}/{domain}"))}_{domain}_traj' # trajectory.write(trace_file) bar() # update progress bar # Store test set of states and applicable actions with open(f'../{BENCHMARK_DIR}/{STATES_DIR}/{domain}/test_states.json', 'w') as f: json.dump(states_set, f, indent=4) logging.info(f'{domain} CPU time (s): {(datetime.now() - domain_run_start).seconds}') logging.info(f'Total CPU time (s): {(datetime.now() - run_start).seconds}')