Source code for eval_framework.llm.openai

import concurrent.futures
import logging
import math
import os
import traceback
from collections.abc import Callable, Sequence
from functools import partial

import tiktoken
from openai import OpenAI
from openai.types.chat import (
    ChatCompletionAssistantMessageParam,
    ChatCompletionMessageParam,
    ChatCompletionSystemMessageParam,
    ChatCompletionUserMessageParam,
)
from tokenizers import Tokenizer
from transformers import AutoTokenizer

from eval_framework.llm.base import BaseLLM
from eval_framework.shared.types import ConcatCompression, Error, RawCompletion, RawLoglikelihood
from eval_framework.tasks.base import Sample
from template_formatting.formatter import BaseFormatter, ConcatFormatter, HFFormatter, Message, Role

logger = logging.getLogger(__name__)


def _to_chat_completion_message(message: Message) -> ChatCompletionMessageParam:
    match message.role:
        case Role.SYSTEM:
            return ChatCompletionSystemMessageParam(role="system", content=message.content)
        case Role.USER:
            return ChatCompletionUserMessageParam(role="user", content=message.content)
        case Role.ASSISTANT:
            return ChatCompletionAssistantMessageParam(role="assistant", content=message.content)
        case None:
            raise ValueError(
                "Cannot send a Message without a role through the chat completion API; "
                "the legacy roleless format is only supported for fine-tuning."
            )


class OpenAIModel(BaseLLM):
    """
    LLM wrapper for the OpenAI API, providing text/chat completions and log-probability evaluation.
    """

    LLM_NAME: str | None = None
    DEFAULT_FORMATTER: Callable[[], BaseFormatter] | None = None
    BYTES_PER_TOKEN: float = 4.0  # rule of thumb according to https://platform.openai.com/tokenizer

    def __init__(
        self,
        model_name: str | None = None,
        formatter: BaseFormatter | None = None,
        temperature: float | None = None,
        top_p: float | None = None,
        api_key: str | None = os.getenv("OPENAI_API_KEY", ""),
        organization: str | None = None,
        base_url: str | None = None,
        bytes_per_token: float | None = None,
    ) -> None:
        """
        Initialize the OpenAIModel.

        Args:
            model_name: OpenAI model name (e.g., "gpt-4o", "gpt-3.5-turbo"). If None, uses the
                LLM_NAME class attribute.
            formatter: Optional message formatter.
            temperature: Sampling temperature used when not passed to generate methods (from 0.0 to 2.0).
            top_p: Nucleus sampling probability mass (from 0.0 to 1.0). If None, the API default is used.
            api_key: OpenAI API key (defaults to the OPENAI_API_KEY environment variable).
            organization: Optional OpenAI organization ID.
            base_url: Optional API base URL for Azure or alternate endpoints.
            bytes_per_token: Optional custom bytes-per-token value for non-standard models, used to
                derive the scaling factor applied to max_tokens.
        """
        assert model_name is not None or self.LLM_NAME is not None, "A model name must be specified."
        self._model_name = model_name if model_name else self.LLM_NAME
        logger.info(f"Instantiating OpenAIModel with name: {self._model_name}")
        self._formatter = formatter or (self.DEFAULT_FORMATTER() if self.DEFAULT_FORMATTER is not None else None)

        self._temperature = temperature if temperature is not None else 0.0
        assert 0.0 <= self._temperature <= 2.0, "Temperature must be between 0.0 and 2.0"
        if top_p is not None:
            assert 0.0 <= top_p <= 1.0, "top_p must be between 0.0 and 1.0"
        self._top_p = top_p

        self._client = OpenAI(
            api_key=api_key,
            organization=organization,
            base_url=base_url,
        )

        # Initialize tokenizer for the model
        self._encoder = self._get_encoder()

        # Set bytes_per_token_scalar for non-standard models
        if bytes_per_token is not None and bytes_per_token <= 0:
            raise ValueError("bytes_per_token must be positive")
        self.bytes_per_token_scalar = (
            4.0 / bytes_per_token if bytes_per_token is not None else 4.0 / self.BYTES_PER_TOKEN
        )

    def _get_encoder(self) -> tiktoken.Encoding:
        assert self._model_name is not None
        return tiktoken.encoding_for_model(self._model_name)

    def _count_tokens(self, text: str) -> int:
        """
        Count tokens for the given text using the encoder.

        Args:
            text: Input string.

        Returns:
            Number of tokens.
        """
        return len(self._encoder.encode(text))
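    # A minimal instantiation sketch (hypothetical values; assumes OPENAI_API_KEY is set in the
    # environment). With bytes_per_token=2.0 the scalar becomes 4.0 / 2.0 = 2.0, so a requested
    # max_tokens of 100 is scaled up to ceil(100 * 2.0) = 200 when calling the API:
    #
    #     model = OpenAIModel(model_name="gpt-4o-mini-2024-07-18", bytes_per_token=2.0)
    #     assert model.bytes_per_token_scalar == 2.0
    #     n_tokens = model._count_tokens("What color is the night sky?")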
    def generate_from_messages(
        self,
        messages: list[Sequence[Message]],
        stop_sequences: list[str] | None = None,
        max_tokens: int | None = None,
        temperature: float | None = None,
        top_p: float | None = None,
    ) -> list[RawCompletion]:
        """
        Generate completions for a list of message sequences concurrently.

        Uses the text completion API when a formatter is configured, otherwise uses the
        chat completion API.

        Args:
            messages: Sequence of messages.
            stop_sequences: Optional list of stop sequences.
            max_tokens: Optional maximum number of tokens to generate.
            temperature: Sampling temperature.
            top_p: Nucleus sampling probability mass (0.0 to 1.0). Overrides the instance
                default if provided.

        Returns:
            List of RawCompletion objects containing prompts and completions.
        """
        effective_temperature = temperature if temperature is not None else self._temperature
        assert 0.0 <= effective_temperature <= 2.0, "Temperature must be between 0.0 and 2.0"
        effective_top_p = top_p if top_p is not None else self._top_p
        if effective_top_p is not None:
            assert 0.0 <= effective_top_p <= 1.0, "top_p must be between 0.0 and 1.0"

        def _process_one(single_messages: Sequence[Message]) -> RawCompletion:
            # Adjust max tokens based on bytes_per_token_scalar so that non-standard models
            # generate full responses
            scaled_max_tokens = (
                math.ceil(max_tokens * self.bytes_per_token_scalar) if max_tokens is not None else None
            )

            if self._formatter is not None:
                # Use formatter and text completion API
                prompt = self._formatter.format(single_messages, output_mode="string")
                # documentation: https://platform.openai.com/docs/api-reference/completions/create
                assert self._model_name is not None
                response = self._client.completions.create(
                    model=self._model_name,
                    prompt=prompt,
                    temperature=effective_temperature,
                    top_p=effective_top_p,
                    max_tokens=scaled_max_tokens,
                    stop=stop_sequences,
                )
                completion = response.choices[0].text
                return RawCompletion(
                    prompt=prompt,
                    prompt_sequence_positions=self._count_tokens(prompt),
                    concat_compression=ConcatCompression.calculate(
                        single_messages, count_tokens=self._count_tokens, completion=completion
                    ),
                    completion=completion,
                    completion_sequence_positions=self._count_tokens(completion),
                )
            else:
                # Use chat completion API
                chat_messages = [_to_chat_completion_message(m) for m in single_messages]
                assert self._model_name is not None
                chat_response = self._client.chat.completions.create(
                    model=self._model_name,
                    messages=chat_messages,
                    temperature=effective_temperature,
                    top_p=effective_top_p,
                    max_tokens=scaled_max_tokens,
                    stop=stop_sequences,
                )
                prompt = "\n".join([f"{m.get('role', '')}: {m.get('content', '')}" for m in chat_messages])
                prompt_tokens = getattr(chat_response.usage, "prompt_tokens", None)
                completion = chat_response.choices[0].message.content or ""
                return RawCompletion(
                    prompt=prompt,
                    prompt_sequence_positions=prompt_tokens,
                    concat_compression=ConcatCompression.calculate(
                        single_messages, count_tokens=self._count_tokens, completion=completion
                    ),
                    completion=completion,
                    completion_sequence_positions=self._count_tokens(completion),
                )

        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = list(executor.map(_process_one, messages))
        return results
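    # A usage sketch (hypothetical prompt; requires a valid OPENAI_API_KEY). Without a
    # formatter, the chat completion API branch is taken:
    #
    #     model = OpenAI_gpt_4o_mini()  # alias defined at the bottom of this module
    #     [result] = model.generate_from_messages(
    #         [[Message(role=Role.USER, content="What color is the night sky?")]],
    #         max_tokens=32,
    #     )
    #     print(result.completion)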
    def logprobs(self, samples: list[Sample]) -> list[RawLoglikelihood]:
        """
        Compute total log-probabilities for possible completions given each sample's prompt.

        Args:
            samples: List of Sample objects, each with prompt messages and possible completions.

        Returns:
            List of RawLoglikelihood objects mapping each prompt and completion to its
            log-probability.

        Note:
            Uses the OpenAI completions API with echo=True; chat logprobs are not supported.
        """
        assert self._model_name in ["babbage-002", "davinci-002"], (
            "Log-probs for prompt tokens are only supported for a limited set of models."
        )
        # apparently OpenAI stopped providing logprobs of prompt tokens, see discussion in:
        # https://github.com/EleutherAI/lm-evaluation-harness/issues/1196
        assert self._formatter is not None, "Log-probs require a formatter to create text prompts."

        results: list[RawLoglikelihood] = []
        for sample in samples:
            prompt = self._formatter.format(sample.messages, output_mode="string") if sample.messages else ""
            choices_log_probs: dict[str, float] = {}
            choices_sequence_positions: dict[str, int] = {}
            prompt_sequence_positions: int | None = self._count_tokens(prompt)
            error: Error | None = None

            for choice in sample.possible_completions or []:
                if error is not None:
                    continue
                # Tokenize prompt and completion
                prompt_tokens = self._encoder.encode(prompt)
                completion_tokens = self._encoder.encode(choice)
                full_text = prompt + choice
                try:
                    response = self._client.completions.create(
                        model=self._model_name,
                        prompt=full_text,
                        echo=True,
                        max_tokens=0,
                        logprobs=1,
                        temperature=0,
                    )
                    choice_obj = response.choices[0]
                    if not hasattr(choice_obj, "logprobs") or choice_obj.logprobs is None:
                        raise ValueError("Logprobs not returned in response.")
                    all_tokens = getattr(choice_obj.logprobs, "tokens", None)
                    all_logprobs = getattr(choice_obj.logprobs, "token_logprobs", None)
                    if all_tokens is None or all_logprobs is None:
                        raise ValueError("Logprobs response missing expected 'tokens' or 'token_logprobs' fields.")
                    if len(all_tokens) != len(prompt_tokens) + len(completion_tokens):
                        raise ValueError(
                            f"Token count mismatch: tokens in response ({len(all_tokens)}) != prompt+completion "
                            f"({len(prompt_tokens) + len(completion_tokens)})"
                        )
                    # Sum logprobs for the completion portion
                    choices_log_probs[choice] = sum(all_logprobs[len(prompt_tokens) :])
                    choices_sequence_positions[choice] = len(completion_tokens)
                except Exception as e:
                    error = Error(error_class=e.__class__.__name__, message=str(e), traceback=traceback.format_exc())
                    prompt_sequence_positions = None
                    choices_log_probs = {}
                    choices_sequence_positions = {}

            results.append(
                RawLoglikelihood(
                    prompt=prompt,
                    prompt_sequence_positions=prompt_sequence_positions,
                    loglikelihoods=choices_log_probs,
                    loglikelihoods_sequence_positions=choices_sequence_positions,
                    raw_loglikelihood_error=error,
                )
            )
        return results
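    # A usage sketch (hypothetical sample; requires a completions-API model such as davinci-002
    # and a formatter). Each possible completion is scored by summing the logprobs of its tokens:
    #
    #     model = OpenAI_davinci_002()  # alias defined below, uses a ConcatFormatter
    #     [result] = model.logprobs([sample])  # sample: a Sample with possible_completions set
    #     best = max(result.loglikelihoods, key=result.loglikelihoods.get)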
    def __del__(self) -> None:
        if hasattr(self, "_client"):
            self._client.close()
class OpenAIEmbeddingModel(BaseLLM):
    def __init__(
        self,
        model_name: str = "text-embedding-3-large",
        formatter: BaseFormatter | None = None,
        api_key: str | None = None,
        organization: str | None = None,
        base_url: str | None = None,
    ) -> None:
        """Initialize the OpenAI API client.

        Args:
            model_name: Name of the OpenAI model to use (e.g., "text-embedding-3-large")
            formatter: Optional message formatter
            api_key: OpenAI API key (defaults to OPENAI_API_KEY env variable)
            organization: Optional organization ID
            base_url: Optional API base URL for Azure or other endpoints
        """
        if formatter is not None:
            raise ValueError("Formatter is not supported for embedding model.")
        self._model_name = model_name
        logger.info(f"Using {model_name} as embedding model")
        self._client = OpenAI(
            api_key=api_key or os.getenv("OPENAI_API_KEY", ""),
            organization=organization,
            base_url=base_url,
        )
    def generate_from_messages(
        self,
        messages: list[Sequence[Message]],
        stop_sequences: list[str] | None = None,
        max_tokens: int | None = None,
        temperature: float | None = None,
        top_p: float | None = None,
    ) -> list[RawCompletion]:
        raise NotImplementedError(
            "Embedding model does not support generate_from_messages. Use generate_embeddings instead."
        )
    def generate_embeddings(
        self,
        messages: list[Sequence[Message]],
    ) -> list[list[float]]:
        embeddings = []
        for single_messages in messages:
            prompt = "".join([m.content for m in single_messages])
            response = self._client.embeddings.create(model=self._model_name, input=[prompt])
            embedding = response.data[0].embedding
            embeddings.append(embedding)
        return embeddings
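    # A usage sketch (hypothetical text; requires a valid OPENAI_API_KEY). Each message
    # sequence is concatenated into one string and embedded as a single input:
    #
    #     embedder = OpenAIEmbeddingModel()
    #     [vector] = embedder.generate_embeddings(
    #         [[Message(role=Role.USER, content="What color is the night sky?")]]
    #     )
    #     print(len(vector))  # embedding dimensionality, e.g. 3072 for text-embedding-3-large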
    def logprobs(self, samples: list[Sample]) -> list[RawLoglikelihood]:
        raise NotImplementedError("Embedding model cannot return logprobs.")
    def __del__(self) -> None:
        # Close the client at most once; swallow errors during interpreter shutdown.
        if hasattr(self, "_client"):
            try:
                self._client.close()
            except Exception:
                pass
class DeepseekModel(OpenAIModel):
    """
    General Deepseek model wrapper using the OpenAI-compatible API for the deepseek-chat and
    deepseek-reasoner models.

    Using the Deepseek API: https://api-docs.deepseek.com/quick_start/pricing
    """

    def __init__(
        self,
        model_name: str | None = None,
        formatter: BaseFormatter | None = None,
        temperature: float | None = None,
        api_key: str | None = None,
        organization: str | None = None,
        base_url: str | None = None,
        tokenizer_name: str | None = None,
    ) -> None:
        # Must be set before super().__init__(), which calls self._get_encoder()
        self._tokenizer_name = tokenizer_name if tokenizer_name is not None else "deepseek-ai/DeepSeek-V3.2-Exp"
        super().__init__(
            model_name=model_name,
            formatter=formatter,
            temperature=temperature,
            api_key=os.getenv("DEEPSEEK_API_KEY", ""),
            organization=organization,
            base_url="https://api.deepseek.com/beta",
        )

    def _get_encoder(self) -> Tokenizer:
        return AutoTokenizer.from_pretrained(self._tokenizer_name)

    def _count_tokens(self, text: str) -> int:
        return len(self._encoder.encode(text))
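# A usage sketch (hypothetical; requires DEEPSEEK_API_KEY and network access to download the
# Hugging Face tokenizer). Token counting goes through AutoTokenizer rather than tiktoken:
#
#     model = DeepseekModel(model_name="deepseek-chat")
#     n = model._count_tokens("What color is the night sky?")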
### Model Aliases ###
class OpenAI_gpt_4o_mini(OpenAIModel):
    LLM_NAME = "gpt-4o-mini-2024-07-18"
class OpenAI_gpt_4o_mini_with_ConcatFormatter(OpenAIModel):
    LLM_NAME = "gpt-4o-mini-2024-07-18"
    DEFAULT_FORMATTER = ConcatFormatter
class OpenAI_davinci_002(OpenAIModel):
    LLM_NAME = "davinci-002"
    DEFAULT_FORMATTER = ConcatFormatter
class Deepseek_reasoner(DeepseekModel):
    LLM_NAME = "deepseek-reasoner"  # DeepSeek-V3.2-Exp (Thinking Mode)
    # multi-round conversations for the reasoning model are documented here:
    # https://api-docs.deepseek.com/guides/reasoning_model#api-example
    # does not support the completion API
class Deepseek_chat(DeepseekModel):
    LLM_NAME = "deepseek-chat"  # DeepSeek-V3.2-Exp (Non-thinking Mode)
class Deepseek_chat_with_formatter(DeepseekModel):
    LLM_NAME = "deepseek-chat"  # DeepSeek-V3.2-Exp (Non-thinking Mode)
    DEFAULT_FORMATTER = partial(HFFormatter, "deepseek-ai/DeepSeek-V3.2-Exp")
    """
    <|begin▁of▁sentence|><|User|>Question: What color is the night sky?
    <|Assistant|></think>Answer:
    """
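# A hypothetical sketch of how the DEFAULT_FORMATTER renders a prompt, using the same
# format() call that generate_from_messages uses; the output should resemble the template
# shown in the string literal above:
#
#     formatter = HFFormatter("deepseek-ai/DeepSeek-V3.2-Exp")
#     prompt = formatter.format(
#         [Message(role=Role.USER, content="Question: What color is the night sky?")],
#         output_mode="string",
#     )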