Source code for eval_framework.metrics.completion.json_format

import json
from collections.abc import Mapping
from typing import Any

import jsonschema  # type: ignore
from pydantic import BaseModel

from eval_framework.metrics.base import BaseMetric, MetricResult
from eval_framework.shared.types import Completion


class JsonFormatEvaluation(BaseModel):
    """Intermediate grading record for one completion checked for JSON output.

    The boolean flags start at ``False`` and the optional fields start at
    ``None``; a field left at ``None`` means the corresponding check was not
    performed or produced no verdict.
    """

    # True when the completion parsed as JSON without first extracting an
    # embedded object/array from surrounding text.
    is_just_json: bool = False
    # True when some JSON value could be parsed from the completion.
    is_valid_json: bool = False
    # Verdict of validating the parsed value against the JSON schema.
    fulfills_schema: bool | None = None
    # Verdict of comparing the parsed value with the expected output.
    exact_match: bool | None = None
    # Exception class name recorded when JSON parsing failed.
    json_parsing_error: str | None = None
    # Exception class name recorded when schema validation raised.
    schema_validation_error: str | None = None
class JsonFormat(BaseMetric[Completion]):
    """Metric grading whether a completion is JSON and matches its ground truth.

    For every response four results are emitted, named ``"JSON Format/<key>"``
    with ``<key>`` in ``is_just_json``, ``is_valid_json``, ``fulfills_schema``
    and ``exact_match``.
    """

    NAME = "JSON Format"

    def calculate(self, response: Completion) -> list[MetricResult]:
        """Grade one completion.

        ``response.ground_truth`` is parsed as a JSON document that must
        contain a ``"json_schema"`` key and may contain ``"expected_output"``.

        Args:
            response: The completion to grade.

        Returns:
            One ``MetricResult`` per key. Values are ``None`` when the response
            carries an error or when a check was not applicable, ``0.0`` for an
            empty completion, otherwise ``1.0``/``0.0`` for the boolean verdict.

        Raises:
            ValueError: If the ground truth is not valid JSON
                (``json.JSONDecodeError``).
            KeyError: If the ground truth has no ``"json_schema"`` key.
        """
        keys = [
            "is_just_json",
            "is_valid_json",
            "fulfills_schema",
            "exact_match",
        ]

        if response.error is not None:
            return [
                MetricResult(metric_name=f"{self.NAME}/{k}", value=None, higher_is_better=True, error=response.error)
                for k in keys
            ]

        if response.completion == "":
            return [
                MetricResult(metric_name=f"{self.NAME}/{k}", value=0.0, higher_is_better=True, error=response.error)
                for k in keys
            ]

        json_dict, grading = self._extract_and_parse_json(response.completion)
        # ``json_dict`` is None both when parsing failed and when the completion
        # is the valid JSON literal ``null``; the flag tells the two apart.
        parsed = grading.is_valid_json

        ground_truth_dict = json.loads(str(response.ground_truth))
        schema = ground_truth_dict["json_schema"]
        expected_object = ground_truth_dict.get("expected_output", None)

        if schema:
            if parsed:
                grading = self._validate_json_against_schema(json_dict, schema, grading)
            else:
                grading.fulfills_schema = False

        if expected_object is not None and parsed:
            grading.exact_match = json_dict == expected_object

        # The trace is identical for all four results, so build it once.
        trace = (grading.json_parsing_error or "") + (grading.schema_validation_error or "")

        results = []
        for key in keys:
            verdict = getattr(grading, key)
            results.append(
                MetricResult(
                    metric_name=f"{self.NAME}/{key}",
                    value=None if verdict is None else float(verdict),
                    higher_is_better=True,
                    error=response.error,
                    code_execution_trace=trace,
                )
            )
        return results

    @staticmethod
    def _validate_json_against_schema(
        json_obj: object, schema: Mapping[str, Any], evaluation_result: JsonFormatEvaluation
    ) -> JsonFormatEvaluation:
        """Validate ``json_obj`` against ``schema`` and record the verdict.

        The input ``evaluation_result`` is not mutated; a deep copy is updated
        and returned.

        Args:
            json_obj: The parsed JSON value to validate.
            schema: The JSON schema to validate against.
            evaluation_result: Grading record to copy and extend.

        Returns:
            A copy with ``fulfills_schema`` set to ``True`` on success or
            ``False`` on a validation error. If the schema itself is invalid,
            ``fulfills_schema`` is left unchanged. In both error cases the
            exception class name is stored in ``schema_validation_error``.
        """
        evaluation_result = evaluation_result.model_copy(deep=True)
        try:
            jsonschema.validate(json_obj, schema)
            evaluation_result.fulfills_schema = True
        except jsonschema.exceptions.ValidationError as e:
            evaluation_result.fulfills_schema = False
            evaluation_result.schema_validation_error = type(e).__name__
        except jsonschema.exceptions.SchemaError as e:
            # A broken schema says nothing about the completion: no verdict.
            evaluation_result.schema_validation_error = type(e).__name__
        return evaluation_result

    @staticmethod
    def _extract_and_parse_json(completion: str) -> tuple[object, JsonFormatEvaluation]:
        """Parse ``completion`` as JSON, falling back to an embedded JSON value.

        Two strategies are tried in order:

        1. Parse the whole completion (surrounding backticks stripped), first
           verbatim and, if that fails, with ``//`` comments removed. Success
           sets both ``is_just_json`` and ``is_valid_json``.
        2. Extract the longest JSON object/array embedded in the text via
           ``get_json_object``. Success sets only ``is_valid_json``.

        Args:
            completion: Raw model output.

        Returns:
            ``(parsed_value, evaluation)``. ``parsed_value`` is ``None`` when
            nothing could be parsed, in which case
            ``evaluation.json_parsing_error`` holds the exception class name.
        """
        evaluation_result = JsonFormatEvaluation()
        stripped = completion.strip("`")

        try:
            try:
                # Verbatim first: comment stripping is line based and may cut
                # at a "//" inside a string value (e.g. a URL), which would
                # make a perfectly valid document unparsable.
                json_dict = json.loads(stripped)
            except ValueError:
                json_dict = json.loads(remove_comments(stripped))
        except Exception:
            # Deliberate best effort: any failure moves on to extraction.
            json_dict = None
        else:
            evaluation_result.is_just_json = True
            evaluation_result.is_valid_json = True
            return json_dict, evaluation_result

        try:
            # get_json_object only returns text that json.loads accepts, so no
            # comment stripping is needed (it could only damage string values).
            json_dict = json.loads(get_json_object(completion))
            evaluation_result.is_valid_json = True
        except Exception as e:
            evaluation_result.json_parsing_error = type(e).__name__

        return json_dict, evaluation_result
[docs] def get_json_object(text: str) -> str: """ Extract the first valid JSON object or array from text. This function handles nested brackets properly by using a bracket counting approach to find complete JSON structures, rather than using regex which can incorrectly match outer brackets containing non-JSON content. """ def find_json_at_position(text: str, start_pos: int, open_char: str, close_char: str) -> str | None: """Find a complete JSON object/array starting at the given position.""" if start_pos >= len(text) or text[start_pos] != open_char: return None bracket_count = 0 in_string = False escaped = False for i in range(start_pos, len(text)): char = text[i] if escaped: escaped = False continue if char == "\\" and in_string: escaped = True continue if char == '"' and not escaped: in_string = not in_string continue if not in_string: if char == open_char: bracket_count += 1 elif char == close_char: bracket_count -= 1 if bracket_count == 0: # Found complete JSON structure candidate = text[start_pos : i + 1] # Test if it's valid JSON try: json.loads(candidate) return candidate except json.JSONDecodeError: return None return None # Look for JSON objects {} and arrays [] json_candidates = [] # Search for objects starting with { for i in range(len(text)): if text[i] == "{": candidate = find_json_at_position(text, i, "{", "}") if candidate: json_candidates.append(candidate) # Search for arrays starting with [ for i in range(len(text)): if text[i] == "[": candidate = find_json_at_position(text, i, "[", "]") if candidate: json_candidates.append(candidate) if not json_candidates: raise RuntimeError(f"No valid JSON object found in {text}.") # Return the longest valid JSON (most likely to be the main content) return max(json_candidates, key=len)
def remove_comments(text: str, comment_indicator: str = "//") -> str:
    """Strip trailing line comments and drop lines that end up blank.

    Each line is cut at the first ``comment_indicator`` that lies outside a
    double-quoted string, so an indicator inside a JSON string value (for
    example the ``//`` of ``"http://example.com"``) is left intact. Backslash
    escapes inside strings are honoured. A line containing an unterminated
    ``"`` before the indicator is treated as still being inside a string and
    is therefore kept unchanged from that quote on.

    Args:
        text: Text, typically JSON with ``//`` comments.
        comment_indicator: Marker that starts a comment running to end of line.

    Returns:
        The remaining non-blank lines joined with ``"\\n"``. Whitespace that
        preceded a removed comment is preserved.

    Raises:
        ValueError: If ``comment_indicator`` is empty and ``text`` has at
            least one line.
    """

    def strip_comment(line: str) -> str:
        if not comment_indicator:
            # Same error str.split raises for an empty separator.
            raise ValueError("empty separator")

        in_string = False
        escaped = False
        for index, char in enumerate(line):
            if in_string:
                if escaped:
                    escaped = False
                elif char == "\\":
                    escaped = True
                elif char == '"':
                    in_string = False
                continue
            if line.startswith(comment_indicator, index):
                return line[:index]
            if char == '"':
                in_string = True
        return line

    stripped_lines = (strip_comment(line) for line in text.splitlines())
    return "\n".join(line for line in stripped_lines if line.strip())