Source code for amlgym.metrics._predictive

r"""
Predictive power metrics assess a domain model ability of predicting actions
applicability and outcomes of executing an action in a state.
The *predicted applicability* evaluates the model ability of predicting whether 
and action is applicable in an environment state.
The *predicted effects* evaluates the model ability of predicting the state reached 
after executing an action in an environment state.
Predictive power metrics are defined with respect to a set of (test) states and a simulator, 
which is required to evaluate if an action is applicable and what is the state reached 
after executing the action in a given (simulated) environment state.
"""
import logging
import warnings
from collections import defaultdict
from contextlib import nullcontext

import unified_planning.model
from alive_progress import alive_bar
from typing import Dict, Sequence
import numpy as np
from unified_planning.shortcuts import SequentialSimulator

from amlgym.modeling.env import Env, StateType, ActionType

# Disable printing of planning engine credits to avoid overloading stdout
unified_planning.shortcuts.get_environment().credits_stream = None


[docs]def applicability(simulator: Env | Sequence[Env], simulator_ref: Env | Sequence[Env], test_states: Sequence[StateType] | Sequence[Sequence[StateType]], applicable_actions: Sequence[ActionType] = None, show_progress: bool = True) -> Dict[str, Dict[str, float]]: r""" Evaluate the predicted applicability metric given the simulator of a domain model :math:`M` and an environment simulator :math:`E`. The model :math:`M` and environment simulators share the set :math:`S` of states and the set :math:`A` of actions. For an action :math:`a\in A`, and state :math:`s\in S`, we denote by :math:`app_M(a,S_{test})` and :math:`app(a,S_{test})` the set of states in :math:`S_{test}` in which :math:`a` is applicable according to :math:`M` and :math:`E`, respectively. We define the predicted applicability metric for every action :math:`a\in A` as: * True Positives: :math:`TP_{app}(a)=|app_M(a,S_{test})\cap app(a,S_{test})|` * False Positives: :math:`FP_{app}(a)=|app_M(a,S_{test})\setminus app(a,S_{test})|` * False Negatives: :math:`FN_{app}(a)=|app(a,S_{test}) \setminus app_M(a,S_{test})|` The predicted applicability precision and recall per action are obtained as: * Predicted applicability precision of :math:`a`: :math:`P_{app}(a) = \frac{TP_{app}(a)}{TP_{app}(a)+FP_{app}(a)}` * Predicted applicability recall of :math:`a`: :math:`R_{app}(a) = \frac{TP_{app}(a)}{TP_{app}(a)+FN_{app}(a)}` When :math:`TP_{app}(a) = FP_{app}(a) = 0`, we define :math:`P_{app}(a)=1` and :math:`R_{app}(a)=0`, as the domain model :math:`M` never allows :math:`a` to be applied in :math:`S_{test}`. Finally, the mean precision and recall of a domain model averages over the actions precision and recall: * **Predicted applicability precision** of :math:`M`: :math:`P = \frac{1}{|A|}\sum_{a\in A} P(a)` * **Predicted applicability recall** of :math:`M`: :math:`R = \frac{1}{|A|}\sum_{a\in A} R(a)` :param simulator: simulator of a domain model :math:`M` to be evaluated. :param simulator_env: environment simulator :math:`E` to compare with. :param test_states: set :math:`S_{test}` of test states. :param applicable_actions: optionally precomputed set of applicable actions for every test state. :param show_progress: show a progress bar. :return: the predicted applicability precision and recall averaged over all test states and actions. """ if isinstance(simulator, Sequence): assert isinstance(simulator_ref, Sequence), ('The applicability evaluation for a sequence of ' 'learned simulators requires a corresponding sequence ' 'of reference simulators: a reference simulator to compare with ' 'for each learned simulator.') assert len(simulator_ref) == len(simulator), ("The number of learned and reference simulators " "must be equal. Current number of learned simulators " f"is {len(simulator)} and reference simulators " f"is {len(simulator_ref)}.") assert len(simulator_ref) == len(test_states), ("A sequence of states to be evaluated must be provided " "for every pair (learned, reference) simulator. " f"The current sequence of states has size {len(test_states)}, " f"while the number of (learned, reference) simulator " f"pairs is {len(simulator_ref)}.") simulator_ref_list = simulator_ref simulator_learned_list = simulator test_states_list = test_states else: simulator_ref_list = [simulator_ref] simulator_learned_list = [simulator] test_states_list = [test_states] tp = defaultdict(int) fp = defaultdict(int) fn = defaultdict(int) precision = defaultdict(float) recall = defaultdict(float) bar = alive_bar(len(test_states_list), title=f'Evaluating actions applicability...', length=20) if show_progress else nullcontext() with bar as bar: for k, (simulator, simulator_ref, states) in enumerate(zip(simulator_learned_list, simulator_ref_list, test_states_list)): applicable_in_states = None if applicable_actions is not None and k < len(applicable_actions): # applicable_actions is a possibly precomputed list of applicable # actions for every k-th state in the test set applicable_in_states = applicable_actions[k] for j, s in enumerate(states): s = set(s) # TODO: use sets in yaml rather than JSON # parse the possibly precomputed list of applicable action for the k-th state if applicable_in_states is not None: applicable_ref = applicable_in_states[j] else: # if no precomputed list of applicable actions is given, compute them applicable_ref = simulator_ref.applicable_actions(s) applicable_learned = simulator.applicable_actions(s) operators = set(list(applicable_learned.keys()) + list(applicable_ref.keys())) for op in operators: tp[op] += len(applicable_ref[op] & applicable_learned[op]) fp[op] += len(applicable_learned[op] - applicable_ref[op]) fn[op] += len(applicable_ref[op] - applicable_learned[op]) if show_progress: bar() operators = set(tp.keys()) | set(fp.keys()) | set(fn.keys()) for op in operators: if (tp[op] + fp[op]) == 0: warnings.warn(f"No true and false positives for operator {op}, " f"predicted applicability precision set to 1.", stacklevel=2) precision[op] = 1. else: precision[op] = tp[op] / (tp[op] + fp[op]) if (tp[op] + fn[op]) == 0: warnings.warn(f"No true positives and false negatives for operator {op}, " f"predicted applicability recall set to 1.", stacklevel=2) recall[op] = 1. else: recall[op] = tp[op] / (tp[op] + fn[op]) return { 'mean_precision': np.mean(list(precision.values())), 'mean_recall': np.mean(list(recall.values())), }
[docs]def predicted_effects(simulator: Env | Sequence[Env], simulator_env: Env | Sequence[Env], test_states: Sequence[StateType] | Sequence[Sequence[StateType]], applicable_actions: Sequence[ActionType] = None, show_progress: bool = True) -> Dict[str, Dict[str, float]]: r""" Evaluate the predicted effects metric given the simulator of a domain model :math:`M` and an environment simulator :math:`E`. The model :math:`M` and environment simulators share the set :math:`S` of states and the set :math:`A` of actions; the evaluation considers actions applicable in a state for both :math:`M` and :math:`E`. For an action :math:`a\in A`, and state :math:`s\in S`, we denote by :math:`a_{M}(s)` and :math:`a(s)` the state resulting from applying :math:`a` in :math:`s` according to :math:`M` and :math:`E`, respectively. We define the predicted effect metrics for every state :math:`s \in S_{test}` and action :math:`a\in A` as: * True Positives: :math:`TP_{eff}(s,a)=|(a_M(s)\setminus s)\cap (a(s)\setminus s)|` * False Positives: :math:`FP_{eff}(s,a)=|(a_M(s)\setminus s)\setminus a(s)|` * False Negatives: :math:`FN_{eff}(s,a)=|(a_M(s)\cap s)\setminus a(s)|` The predicted effects mean precision and recall per action are obtained by averaging over all states in :math:`S_{test}`, i.e.: * True Positives: :math:`TP_{eff}(a)=\sum\limits_{s\in S_{test}}TP_{eff}(s,a)` * False Positives: :math:`FP_{eff}(a)=\sum\limits_{s\in S_{test}}TP_{eff}(s,a)` * False Negatives: :math:`FN_{eff}(a)=\sum\limits_{s\in S_{test}}FN_{eff}(s,a)` * Predicted effects precision of :math:`a` : :math:`P_{eff}(a) = \frac{TP_{eff}(a)}{TP_{eff}(a)+FP_{eff}(a)}` * Predicted effects recall of :math:`a` : :math:`R_{eff}(a) = \frac{TP_{eff}(a)}{TP_{eff}(a)+FN_{eff}(a)}` When :math:`TP_{eff}(a) = FP_{eff}(a) = 0`, we define :math:`P_{eff}(a)=1` and :math:`R_{eff}(a)=0`. Finally, the mean precision and recall of a domain model averages over the actions precision and recall: * **Predicted effects precision** of :math:`M`: :math:`P = \frac{1}{|A|}\sum_{a\in A} P(a)` * **Predicted effects recall** of :math:`M`: :math:`R = \frac{1}{|A|}\sum_{a\in A} R(a)` :param simulator: simulator of a domain model :math:`M` to be evaluated. :param simulator_env: environment simulator :math:`E` to compare with. :param test_states: set :math:`S_{test}` of test states. :param applicable_actions: optionally precomputed set of applicable actions for every test state. :param show_progress: show a progress bar. :return: the predicted effects mean precision and recall """ if isinstance(simulator, Sequence): assert isinstance(simulator_env, Sequence), ('The applicability evaluation for a sequence of ' 'learned simulators requires a corresponding sequence ' 'of reference simulators: a reference simulator to compare with ' 'for each learned simulator.') assert len(simulator_env) == len(simulator), ("The number of learned and reference simulators " "must be equal. Current number of learned simulators " f"is {len(simulator)} and reference simulators " f"is {len(simulator_env)}.") assert len(simulator_env) == len(test_states), ("A sequence of states to be evaluated must be provided " "for every pair (learned, reference) simulator. " f"The current sequence of states has size {len(test_states)}, " f"while the number of (learned, reference) simulator " f"pairs is {len(simulator_env)}.") simulator_ref_list = simulator_env simulator_learned_list = simulator test_states_list = test_states else: simulator_ref_list = [simulator_env] simulator_learned_list = [simulator] test_states_list = [test_states] tp = defaultdict(int) fp = defaultdict(int) fn = defaultdict(int) precision = defaultdict(float) recall = defaultdict(float) bar = alive_bar(len(test_states_list), title=f'Evaluating actions applicability...', length=20) if show_progress else nullcontext() with bar as bar: for k, (simulator, simulator_env, states) in enumerate(zip(simulator_learned_list, simulator_ref_list, test_states_list)): applicable_in_states = None if applicable_actions is not None and k < len(applicable_actions): # applicable_actions is a possibly precomputed list of applicable # actions for every k-th state in the test set applicable_in_states = applicable_actions[k] for j, s in enumerate(states): s = set(s) # TODO: use sets in yaml rather than JSON # parse the possibly precomputed list of applicable action for the k-th state if applicable_in_states is not None: applicable_ref = applicable_in_states[j] else: # if no precomputed list of applicable actions is given, compute them applicable_ref = simulator_env.applicable_actions(s) applicable_learned = simulator.applicable_actions(s) operators = set(list(applicable_learned.keys()) + list(applicable_ref.keys())) # NOTE: consider only actions that are applicable in a given state # according to both the evaluated and reference simulators for op in operators: for action_label in applicable_ref[op] & applicable_learned[op]: snext_learned = simulator.apply(s, action_label) snext_ref = simulator_env.apply(s, action_label) tp[op] += len((snext_learned - s) .intersection((snext_ref - s))) fp[op] += len((snext_learned - s) - snext_ref) fn[op] += len((snext_learned.intersection(s)) - snext_ref) if show_progress: bar() operators = set(tp.keys()) | set(fp.keys()) | set(fn.keys()) for op in operators: if (tp[op] + fp[op]) == 0: warnings.warn(f"No true and false positives for operator {op}, " f"predicted effects precision set to 1.", stacklevel=2) precision[op] = 1. else: precision[op] = tp[op] / (tp[op] + fp[op]) if (tp[op] + fn[op]) == 0: warnings.warn(f"No true positives and false negatives for operator {op}, " f"predicted effects recall set to 1.", stacklevel=2) recall[op] = 1. else: recall[op] = tp[op] / (tp[op] + fn[op]) return { 'mean_precision': np.mean(list(precision.values())), 'mean_recall': np.mean(list(recall.values())), }
[docs]def predictive_power(simulator_learned: Env | Sequence[Env], simulator_ref: Env | Sequence[Env], test_states: Sequence[StateType] | Sequence[Sequence[StateType]], applicable_actions: Sequence[ActionType] = None, show_progress: bool = True) -> Dict[str, Dict[str, Dict[str, float]]]: """ Evaluate both the predicted applicability and predicted effects metrics of a simulated domain model :math:`M` with respect to an environment simulator :math:`E` against a test set of states :math:`S_{test}`. The results can be reproduced by separately executing functions :func:`~amlgym.metrics._predictive.predicted_effects` and :func:`~amlgym.metrics._predictive.applicability`; this function performs a joint and more efficient evaluation of both metrics. :param simulator: simulator of a domain model :math:`M` to be evaluated. :param simulator_env: environment simulator :math:`E` to compare with. :param test_states: set :math:`S_{test}` of test states. :param applicable_actions: optionally precomputed set of applicable actions for every test state. :param show_progress: show a progress bar. :return: the mean precision and recall for both predicted effects and predicted applicability """ if isinstance(simulator_learned, Sequence): assert isinstance(simulator_ref, Sequence), ('The applicability evaluation for a sequence of ' 'learned simulators requires a corresponding sequence ' 'of reference simulators: a reference simulator to compare with ' 'for each learned simulator.') assert len(simulator_ref) == len(simulator_learned), ("The number of learned and reference simulators " "must be equal. Current number of learned simulators " f"is {len(simulator_learned)} and reference simulators " f"is {len(simulator_ref)}.") assert len(simulator_ref) == len(test_states), ("A sequence of states to be evaluated must be provided " "for every pair (learned, reference) simulator. " f"The current sequence of states has size {len(test_states)}, " f"while the number of (learned, reference) simulator " f"pairs is {len(simulator_ref)}.") simulator_ref_list = simulator_ref simulator_learned_list = simulator_learned test_states_list = test_states else: simulator_ref_list = [simulator_ref] simulator_learned_list = [simulator_learned] test_states_list = [test_states] predeffs_tp = defaultdict(int) predeffs_fp = defaultdict(int) predeffs_fn = defaultdict(int) predeffs_precision = defaultdict(float) predeffs_recall = defaultdict(float) app_tp = defaultdict(int) app_fp = defaultdict(int) app_fn = defaultdict(int) app_precision = defaultdict(float) app_recall = defaultdict(float) bar = alive_bar(len(test_states_list), title=f'Evaluating predictive power...', length=20) if show_progress else nullcontext() with bar as bar: for k, (simulator_learned, simulator_ref, states) in enumerate(zip(simulator_learned_list, simulator_ref_list, test_states_list)): applicable_in_states = None if applicable_actions is not None and k < len(applicable_actions): # applicable_actions is a possibly precomputed list of applicable # actions for every k-th states list in the test set applicable_in_states = applicable_actions[k] for j, s in enumerate(states): s = set(s) # TODO: use sets in yaml rather than JSON # parse the possibly precomputed list of applicable action for the j-th state # in the k-th states list if applicable_in_states is not None: applicable_ref = applicable_in_states[j] else: # if no precomputed list of applicable actions is given, compute them applicable_ref = simulator_ref.applicable_actions(s) applicable_learned = simulator_learned.applicable_actions(s) operators = set(list(applicable_learned.keys()) + list(applicable_ref.keys())) # Evaluate action applicability and predicted effects for op in operators: # Predicted effects for action_label in applicable_ref[op] & applicable_learned[op]: snext_learned = simulator_learned.apply(s, action_label) snext_ref = simulator_ref.apply(s, action_label) predeffs_tp[op] += len((snext_learned - s) & (snext_ref - s)) predeffs_fp[op] += len((snext_learned - s) - snext_ref) predeffs_fn[op] += len((snext_learned & s) - snext_ref) pass # Action applicability app_tp[op] += len(applicable_ref[op] & applicable_learned[op]) app_fp[op] += len(applicable_learned[op] - applicable_ref[op]) app_fn[op] += len(applicable_ref[op] - applicable_learned[op]) if show_progress: bar() operators = set(predeffs_tp.keys()) | set(predeffs_fp.keys()) | set(predeffs_fn.keys()) for op in operators: if (app_tp[op] + app_fp[op]) == 0: warnings.warn(f"No true and false positives for operator {op}, " f"predicted applicability precision set to 1.", stacklevel=2) app_precision[op] = 1. else: app_precision[op] = app_tp[op] / (app_tp[op] + app_fp[op]) if (app_tp[op] + app_fn[op]) == 0: warnings.warn(f"No true positives and false negatives for operator {op}, " f"predicted applicability recall set to 1.", stacklevel=2) app_recall[op] = 1. else: app_recall[op] = app_tp[op] / (app_tp[op] + app_fn[op]) if (predeffs_tp[op] + predeffs_fp[op]) == 0: warnings.warn(f"No true and false positives for operator {op}, " f"predicted effects precision set to 1.", stacklevel=2) predeffs_precision[op] = 1. else: predeffs_precision[op] = predeffs_tp[op] / (predeffs_tp[op] + predeffs_fp[op]) if (predeffs_tp[op] + predeffs_fn[op]) == 0: warnings.warn(f"No true positives and false negatives for operator {op}, " f"predicted effects recall set to 1.", stacklevel=2) predeffs_recall[op] = 1. else: predeffs_recall[op] = predeffs_tp[op] / (predeffs_tp[op] + predeffs_fn[op]) return { 'applicability': { 'mean_precision': np.mean(list(app_precision.values())), 'mean_recall': np.mean(list(app_recall.values())), }, 'predicted_effects': { 'mean_precision': np.mean(list(predeffs_precision.values())), 'mean_recall': np.mean(list(predeffs_recall.values())), } }