Spaces:
Sleeping
Sleeping
| import requests | |
| import pandas as pd | |
| from bs4 import BeautifulSoup | |
HF_INFERENCE_PAGE = "https://huggingface.co/inference/models"


def fetch_inference_page():
    """Download the Hugging Face inference-providers page and return its raw HTML."""
    # 60s timeout: the page is large and rendered server-side, so be generous.
    response = requests.get(HF_INFERENCE_PAGE, timeout=60)
    response.raise_for_status()
    return response.text
def _cell_text(cells, index, default=""):
    """Stripped text of cells[index]; *default* when the row has too few cells."""
    return cells[index].get_text(strip=True) if index < len(cells) else default


def _parse_float(text):
    """Float parsed from table text such as '$0.30' or '1,234.5'; None for '-'/empty/unparsable."""
    if not text or text == '-':
        return None
    try:
        return float(text.replace('$', '').replace(',', '').strip())
    except ValueError:
        return None


def _parse_int(text):
    """Int parsed from table text such as '128,000'; None for '-'/empty/unparsable."""
    if not text or text == '-':
        return None
    try:
        return int(text.replace(',', '').strip())
    except ValueError:
        return None


def parse_provider_costs(html_content):
    """
    Parse provider cost information from the HTML table.

    Extracts per row: model, provider, input_cost, output_cost, context_length,
    latency, speed, tool_calling, structured_output.

    Parameters
    ----------
    html_content : str
        Raw HTML of the HF inference providers page.

    Returns
    -------
    list[dict]
        One dict per (provider, model) table row; numeric fields are None
        where the page shows '-' or an unparsable value.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    provider_model_data = []
    # Grab every table row; header/short rows are filtered by cell count below.
    rows = soup.find_all('tr')
    print(f"Found {len(rows)} table rows")
    for idx, row in enumerate(rows):
        cells = row.find_all('td')
        if len(cells) < 8:  # need at least 8 columns for all data
            continue
        # Debug: dump the first few rows so table-layout changes are easy to spot.
        if idx < 3:
            print(f"Row {idx}: {len(cells)} cells")
            for i, cell in enumerate(cells[:9]):
                print(f" Cell {i}: {cell.get_text(strip=True)[:50]}")
        try:
            model_cell = _cell_text(cells, 0)
            provider_cell = _cell_text(cells, 1)
            # Model cell reads like "org/model-name Copy to clipboard".
            model = model_cell.split("Copy to clipboard")[0].strip() if model_cell else None
            # Provider cell may carry trailing tags such as "cheapest"/"fastest".
            provider = provider_cell.split()[0] if provider_cell else None
            if not provider or provider == '-' or not model:
                continue
            tool_calling_cell = _cell_text(cells, 7, "-")
            structured_cell = _cell_text(cells, 8, "-")
            provider_model_data.append({
                'provider': provider,
                'model': model,
                'input_cost_per_1M': _parse_float(_cell_text(cells, 2, "-")),
                'output_cost_per_1M': _parse_float(_cell_text(cells, 3, "-")),
                'context_length': _parse_int(_cell_text(cells, 4, "-")),
                'latency_ttft': _parse_float(_cell_text(cells, 5, "-")),
                'speed_tokens_per_sec': _parse_float(_cell_text(cells, 6, "-")),
                # Yes/No flags; anything else (e.g. '-') maps to None.
                'tool_calling': tool_calling_cell if tool_calling_cell in ('Yes', 'No') else None,
                'structured_output': structured_cell if structured_cell in ('Yes', 'No') else None,
            })
        except (ValueError, IndexError) as e:
            if idx < 3:
                print(f" Error parsing row {idx}: {e}")
            continue
    print(f"Extracted {len(provider_model_data)} provider-model entries")
    return provider_model_data
def load_provider_costs(csv_path="hf_provider_model_costs.csv"):
    """Read the provider-model cost table from *csv_path* into a DataFrame."""
    return pd.read_csv(csv_path)
def find_best_model(prompt,
                    csv_path="hf_provider_model_costs.csv",
                    strategy="cheapest_total",
                    require_both_costs=True,
                    input_output_ratio=1.0,
                    min_context_length=None,
                    max_latency=None,
                    min_speed=None,
                    require_tool_calling=False,
                    require_structured_output=False):
    """
    Find the most appropriate model for a given prompt based on cost strategy.

    Parameters
    ----------
    prompt : str
        The user prompt (currently used for length estimation only).
    csv_path : str
        Path to the CSV file with provider-model costs.
    strategy : str
        Selection strategy:
        - "cheapest_total": cheapest combined input+output cost (default)
        - "cheapest_input": cheapest input cost only
        - "cheapest_output": cheapest output cost only
        - "best_value": weighted cost using input_output_ratio
    require_both_costs : bool
        If True, only consider models with both input and output costs available.
    input_output_ratio : float
        Expected input/output token ratio for "best_value" strategy (default 1.0).
    min_context_length : int, optional
        Minimum required context length.
    max_latency : float, optional
        Maximum acceptable latency (TTFT in seconds).
    min_speed : float, optional
        Minimum required speed (tokens/second).
    require_tool_calling : bool
        If True, only return models that support tool calling.
    require_structured_output : bool
        If True, only return models that support structured output.

    Returns
    -------
    dict or None
        Keys: provider, model, input_cost_per_1M, output_cost_per_1M,
        estimated_cost, strategy, context_length, latency_ttft,
        speed_tokens_per_sec, tool_calling, structured_output.
        None when no candidate survives the filters.
    """
    # Read the cost table directly (same as load_provider_costs).
    df = pd.read_csv(csv_path)
    # Filter by cost availability.
    if require_both_costs:
        df_filtered = df.dropna(subset=['input_cost_per_1M', 'output_cost_per_1M']).copy()
    else:
        df_filtered = df[df['input_cost_per_1M'].notna() | df['output_cost_per_1M'].notna()].copy()
    # Capability filters. NaN comparisons are False, so rows missing the
    # relevant metric are dropped by these conditions.
    if min_context_length is not None:
        df_filtered = df_filtered[df_filtered['context_length'] >= min_context_length]
    if max_latency is not None:
        df_filtered = df_filtered[(df_filtered['latency_ttft'].notna()) & (df_filtered['latency_ttft'] <= max_latency)]
    if min_speed is not None:
        df_filtered = df_filtered[(df_filtered['speed_tokens_per_sec'].notna()) & (df_filtered['speed_tokens_per_sec'] >= min_speed)]
    if require_tool_calling:
        df_filtered = df_filtered[df_filtered['tool_calling'] == 'Yes']
    if require_structured_output:
        df_filtered = df_filtered[df_filtered['structured_output'] == 'Yes']
    if df_filtered.empty:
        return None
    # Missing costs count as 0 for ranking; zero-cost rows are excluded per
    # strategy below so they never win for free.
    df_filtered.loc[:, 'input_cost_per_1M'] = df_filtered['input_cost_per_1M'].fillna(0)
    df_filtered.loc[:, 'output_cost_per_1M'] = df_filtered['output_cost_per_1M'].fillna(0)
    # Rough token estimate (~1.3 tokens per word), expressed in millions to
    # match the per-1M-token cost columns.
    prompt_tokens = len(prompt.split()) * 1.3 / 1_000_000
    if strategy == "cheapest_input":
        df_filtered = df_filtered[df_filtered['input_cost_per_1M'] > 0]
        if df_filtered.empty:
            return None
        best = df_filtered.nsmallest(1, 'input_cost_per_1M').iloc[0]
        estimated_cost = best['input_cost_per_1M'] * prompt_tokens
    elif strategy == "cheapest_output":
        df_filtered = df_filtered[df_filtered['output_cost_per_1M'] > 0]
        if df_filtered.empty:
            return None
        best = df_filtered.nsmallest(1, 'output_cost_per_1M').iloc[0]
        # Assume output is similar length to input.
        estimated_cost = best['output_cost_per_1M'] * prompt_tokens
    elif strategy == "best_value":
        # Weighted blend of input/output cost according to the expected ratio.
        df_filtered.loc[:, 'weighted_cost'] = (
            df_filtered['input_cost_per_1M'] * input_output_ratio +
            df_filtered['output_cost_per_1M']
        ) / (input_output_ratio + 1)
        df_filtered = df_filtered[df_filtered['weighted_cost'] > 0]
        if df_filtered.empty:
            return None
        best = df_filtered.nsmallest(1, 'weighted_cost').iloc[0]
        estimated_cost = best['weighted_cost'] * prompt_tokens * 2  # input + output
    else:  # "cheapest_total" (default)
        df_filtered.loc[:, 'total_cost'] = df_filtered['input_cost_per_1M'] + df_filtered['output_cost_per_1M']
        df_filtered = df_filtered[df_filtered['total_cost'] > 0]
        if df_filtered.empty:
            return None
        best = df_filtered.nsmallest(1, 'total_cost').iloc[0]
        estimated_cost = best['total_cost'] * prompt_tokens
    return {
        'provider': best['provider'],
        'model': best['model'],
        'input_cost_per_1M': best['input_cost_per_1M'],
        'output_cost_per_1M': best['output_cost_per_1M'],
        'estimated_cost': estimated_cost,
        'strategy': strategy,
        'context_length': best.get('context_length'),
        'latency_ttft': best.get('latency_ttft'),
        'speed_tokens_per_sec': best.get('speed_tokens_per_sec'),
        'tool_calling': best.get('tool_calling'),
        'structured_output': best.get('structured_output')
    }
def test_find_best_model():
    """
    Demonstrate find_best_model across several prompts and every strategy.

    Prints the chosen provider/model plus cost and capability metadata for
    each strategy; requires hf_provider_model_costs.csv to already exist.
    """
    import os
    csv_path = "hf_provider_model_costs.csv"
    if not os.path.exists(csv_path):
        # Nothing to rank without the CSV — explain how to generate it and stop.
        print(f"\nERROR: {csv_path} not found!")
        print("Please run the script without --test flag first to fetch and generate the CSV file.")
        print("Example: python eval_provider.py")
        return
    banner = "=" * 70
    print("\n" + banner)
    print("TESTING: find_best_model function")
    print(banner)
    test_prompts = [
        "What is the capital of France?",
        "Write a detailed essay about the impact of artificial intelligence on modern society, covering economic, social, and ethical implications.",
        "Translate this text to Spanish: Hello, how are you today?"
    ]
    for test_num, prompt in enumerate(test_prompts, 1):
        print(f"\n{banner}")
        print(f"Test {test_num}: Prompt length = {len(prompt)} chars, ~{len(prompt.split())} words")
        print(f"Prompt: {prompt[:60]}...")
        print(f"{banner}")
        for strategy in ("cheapest_total", "cheapest_input", "cheapest_output", "best_value"):
            result = find_best_model(prompt, strategy=strategy, require_both_costs=True)
            print(f"\n Strategy: {strategy.upper()}")
            if not result:
                print(" No suitable model found")
                continue
            print(f" Provider: {result['provider']}")
            print(f" Model: {result['model']}")
            print(f" Input cost: ${result['input_cost_per_1M']:.4f}/1M tokens")
            print(f" Output cost: ${result['output_cost_per_1M']:.4f}/1M tokens")
            context_display = result['context_length'] if result['context_length'] else 'N/A'
            print(f" Context length: {context_display}")
            if result['latency_ttft']:
                print(f" Latency (TTFT): {result['latency_ttft']:.2f}s")
            else:
                print(" Latency: N/A")
            if result['speed_tokens_per_sec']:
                print(f" Speed: {result['speed_tokens_per_sec']:.0f} tok/s")
            else:
                print(" Speed: N/A")
            print(f" Tool calling: {result['tool_calling'] or 'N/A'}")
            print(f" Structured output: {result['structured_output'] or 'N/A'}")
            print(f" Estimated cost: ${result['estimated_cost']:.8f}")
    # Second scenario: accept rows where only one of the two costs is known.
    print(f"\n{banner}")
    print("Test: Allow models with partial cost information")
    print(f"{banner}")
    result = find_best_model(test_prompts[0], strategy="cheapest_total", require_both_costs=False)
    if result:
        print(f" Provider: {result['provider']}")
        print(f" Model: {result['model']}")
        print(f" Input cost: ${result['input_cost_per_1M']:.4f}/1M tokens")
        print(f" Output cost: ${result['output_cost_per_1M']:.4f}/1M tokens")
    # Third scenario: constrain by capabilities and a long context window.
    print(f"\n{banner}")
    print("Test: Filter by capabilities (tool calling + structured output)")
    print(f"{banner}")
    result = find_best_model(
        test_prompts[0],
        strategy="cheapest_total",
        require_tool_calling=True,
        require_structured_output=True,
        min_context_length=100000
    )
    if result:
        print(f" Provider: {result['provider']}")
        print(f" Model: {result['model']}")
        print(f" Context length: {result['context_length']}")
        print(f" Tool calling: {result['tool_calling']}")
        print(f" Structured output: {result['structured_output']}")
        print(f" Input cost: ${result['input_cost_per_1M']:.4f}/1M tokens")
        print(f" Output cost: ${result['output_cost_per_1M']:.4f}/1M tokens")
    else:
        print(" No models found matching criteria")
def main():
    """Fetch the providers page, persist the parsed costs to CSV, then run the demos."""
    import sys
    # "--test" skips the network fetch and exercises only the demo tests
    # against a previously generated CSV.
    if len(sys.argv) > 1 and sys.argv[1] == "--test":
        test_find_best_model()
        return
    print("Fetching inference providers page...")
    html_content = fetch_inference_page()
    print("Parsing provider costs...")
    provider_data = parse_provider_costs(html_content)
    if not provider_data:
        print("No provider cost data found.")
        return
    df = pd.DataFrame(provider_data).sort_values(["provider", "model"])
    df.to_csv("hf_provider_model_costs.csv", index=False)
    print(f"\nFound {len(df)} provider-model combinations")
    print(f"Unique providers: {df['provider'].nunique()}")
    print(f"Unique models: {df['model'].nunique()}")
    print("\nProvider-Model Cost Summary (first 20 rows):")
    print(df.head(20).to_string(index=False))
    # Finish with the comprehensive demo pass over the freshly written CSV.
    test_find_best_model()


if __name__ == "__main__":
    main()