Source code for eval_framework.metrics.completion.csv_format
import json
from pydantic import BaseModel
from eval_framework.metrics.base import BaseMetric, MetricResult
from eval_framework.shared.types import Completion
# Maps the separator code used in a task's ground truth to the literal
# delimiter character it stands for.
SEPARATOR_MAP = {
    "comma": ",",
    "semicolon": ";",
    "space": " ",
    "tab": "\t",
}
[docs]
class CSVFormatEvaluation(BaseModel):
    """Outcome of checking a completion against the expected CSV format.

    Every flag defaults to ``False``; ``evaluate_csv_format`` switches on the
    ones that hold for a given completion.
    """

    # True when the ground truth specifies no separator (empty/falsy
    # "separator" entry), i.e. any known separator is acceptable.
    implicit: bool = False
    # True when a CSV-like block of lines was found in the completion.
    has_csv: bool = False
    # True when the detected separator matches the expected one (or, in the
    # implicit case, is one of the known separators).
    is_separator_respected: bool = False
    # True when every detected CSV line has the expected number of columns.
    is_column_count_respected: bool = False
[docs]
class CSVFormat(BaseMetric[Completion]):
    """Metric reporting whether a completion contains correctly formatted CSV.

    One ``MetricResult`` named ``"<NAME>/<key>"`` is produced per entry of
    ``KEYS``.
    """

    NAME = "CSV Format"
    KEYS = ["has_csv", "is_separator_respected", "is_column_count_respected"]

    def calculate(self, response: Completion) -> list[MetricResult]:
        """Score ``response`` on every key in ``KEYS``.

        Args:
            response: The completion to grade.

        Returns:
            One result per key, in ``KEYS`` order. The value is ``None`` when
            the response carries an error, ``0.0`` when the completion is the
            empty string, and otherwise the corresponding flag of the CSV
            format evaluation converted to ``float``.
        """
        scores: list[float | None]
        if response.error is not None:
            # A failed response cannot be graded: report "no value" for all keys.
            scores = [None] * len(self.KEYS)
        elif response.completion == "":
            # An empty completion trivially fails every check.
            scores = [0.0] * len(self.KEYS)
        else:
            evaluation = evaluate_csv_format(response)
            scores = [float(getattr(evaluation, name)) for name in self.KEYS]

        return [
            MetricResult(
                metric_name=f"{self.NAME}/{name}",
                value=score,
                higher_is_better=True,
                error=response.error,
            )
            for name, score in zip(self.KEYS, scores)
        ]
[docs]
def extract_csv_from_text(text: str, min_rows: int = 2, min_columns: int = 2) -> tuple[list[str] | None, str | None]:
    """Locate the longest CSV-like run of consecutive lines in ``text``.

    A run starts at any line that splits into at least ``min_columns`` fields
    on one of the delimiters in ``SEPARATOR_MAP`` and extends over the
    following lines for as long as they split into exactly the same number of
    fields on that delimiter.

    Args:
        text: Free-form text that may embed a CSV block.
        min_rows: Minimum number of lines a run needs to be accepted.
        min_columns: Minimum number of fields the first line of a run must have.

    Returns:
        ``(lines, delimiter)`` for the longest accepted run, or
        ``(None, None)`` when no run qualifies. Among runs of equal length the
        one starting earliest in the text wins; for runs starting on the same
        line, the delimiter declared first in ``SEPARATOR_MAP`` wins.
    """
    lines = text.split("\n")
    # Iterate delimiters in SEPARATOR_MAP's declaration order (de-duplicated).
    # A set of strings is iterated in a hash-dependent order that changes
    # between interpreter runs, which made the tie-break between equally long
    # candidates - and therefore the reported delimiter - non-deterministic.
    delimiters = list(dict.fromkeys(SEPARATOR_MAP.values()))
    best_delimiter: str | None = None
    csv_lines: list[str] = []

    # Try every line as the start of a run, with every delimiter.
    for i, line in enumerate(lines):
        for delimiter in delimiters:
            column_count = len(line.split(delimiter))
            if column_count < min_columns:
                continue

            # Extend the run while the following lines keep the same field count.
            current_csv_lines = [line]
            for j in range(i + 1, len(lines)):
                next_line = lines[j]
                if len(next_line.split(delimiter)) != column_count:
                    break
                current_csv_lines.append(next_line)

            # Strict ">" keeps the first candidate found among equal lengths.
            if len(current_csv_lines) >= min_rows and len(current_csv_lines) > len(csv_lines):
                best_delimiter = delimiter
                csv_lines = current_csv_lines

    if not csv_lines:
        return None, None
    return csv_lines, best_delimiter
[docs]
def evaluate_csv_format(response: Completion) -> CSVFormatEvaluation:
    """Grade the CSV found in a completion against its ground-truth spec.

    The ground truth is parsed as JSON and must provide a ``"separator"``
    entry (a key of ``SEPARATOR_MAP``, or a falsy value meaning "any known
    separator") and a ``"columns"`` entry whose length is the expected
    column count.

    Args:
        response: Completion whose ``completion`` text is inspected and whose
            ``ground_truth`` holds the JSON specification.

    Returns:
        The evaluation flags. When no CSV block is found only ``implicit`` is
        populated and the remaining flags keep their ``False`` defaults.
    """
    spec = json.loads(str(response.ground_truth))
    separator_code = spec["separator"]
    # No separator requested: any of the known separators is acceptable.
    is_implicit = not separator_code

    rows, found_separator = extract_csv_from_text(response.completion)
    if not rows:
        return CSVFormatEvaluation(has_csv=False, implicit=is_implicit)

    evaluation = CSVFormatEvaluation(has_csv=True, implicit=is_implicit)
    if is_implicit:
        evaluation.is_separator_respected = found_separator in SEPARATOR_MAP.values()
    else:
        evaluation.is_separator_respected = found_separator == SEPARATOR_MAP.get(separator_code)

    wanted_column_count = len(spec["columns"])
    evaluation.is_column_count_respected = all(
        len(row.split(found_separator)) == wanted_column_count for row in rows
    )
    return evaluation