""" Anime Database Application using OOP and Modern Data Engineering Practices Built with anycoder """ from dataclasses import dataclass, field from typing import List, Dict, Optional, Set from enum import Enum, auto from datetime import datetime import uuid import json from abc import ABC, abstractmethod import logging from functools import cached_property import re from pathlib import Path # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class Genre(Enum): """Enum representing anime genres""" ACTION = auto() ADVENTURE = auto() COMEDY = auto() DRAMA = auto() FANTASY = auto() HORROR = auto() ROMANCE = auto() SCI_FI = auto() SLICE_OF_LIFE = auto() SPORTS = auto() SUPERNATURAL = auto() THRILLER = auto() def __str__(self): return self.name.replace('_', ' ').title() class AnimeStatus(Enum): """Enum representing anime status""" ONGOING = auto() COMPLETED = auto() UPCOMING = auto() HIATUS = auto() CANCELLED = auto() def __str__(self): return self.name.title() @dataclass class Anime: """Data class representing an anime entry""" id: str = field(default_factory=lambda: str(uuid.uuid4())) title: str alternative_titles: List[str] = field(default_factory=list) episodes: int = 0 status: AnimeStatus = AnimeStatus.ONGOING genres: Set[Genre] = field(default_factory=set) release_date: Optional[datetime] = None end_date: Optional[datetime] = None synopsis: str = "" rating: float = 0.0 studio: str = "" source: str = "" duration: str = "" season: Optional[str] = None year: Optional[int] = None popularity_rank: Optional[int] = None favorites: int = 0 created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) def __post_init__(self): """Validate and process data after initialization""" if not self.title: raise ValueError("Title cannot be empty") if self.episodes < 0: raise ValueError("Episodes cannot be negative") if self.rating < 0 or self.rating > 10: raise ValueError("Rating must be between 0 and 10") if self.release_date and self.end_date: if self.release_date > self.end_date: raise ValueError("Release date cannot be after end date") if self.year and (self.year < 1900 or self.year > datetime.now().year + 5): raise ValueError("Invalid year") # Extract year from release_date if not provided if not self.year and self.release_date: self.year = self.release_date.year def to_dict(self) -> Dict: """Convert anime object to dictionary""" return { "id": self.id, "title": self.title, "alternative_titles": self.alternative_titles, "episodes": self.episodes, "status": self.status.name, "genres": [genre.name for genre in self.genres], "release_date": self.release_date.isoformat() if self.release_date else None, "end_date": self.end_date.isoformat() if self.end_date else None, "synopsis": self.synopsis, "rating": self.rating, "studio": self.studio, "source": self.source, "duration": self.duration, "season": self.season, "year": self.year, "popularity_rank": self.popularity_rank, "favorites": self.favorites, "created_at": self.created_at.isoformat(), "updated_at": self.updated_at.isoformat() } @classmethod def from_dict(cls, data: Dict) -> 'Anime': """Create anime object from dictionary""" try: return cls( id=data.get("id", str(uuid.uuid4())), title=data["title"], alternative_titles=data.get("alternative_titles", []), episodes=data.get("episodes", 0), status=AnimeStatus[data.get("status", "ONGOING")], genres={Genre[genre] for genre in data.get("genres", [])}, release_date=datetime.fromisoformat(data["release_date"]) if data.get("release_date") else None, end_date=datetime.fromisoformat(data["end_date"]) if data.get("end_date") else None, synopsis=data.get("synopsis", ""), rating=data.get("rating", 0.0), studio=data.get("studio", ""), source=data.get("source", ""), duration=data.get("duration", ""), season=data.get("season"), year=data.get("year"), popularity_rank=data.get("popularity_rank"), favorites=data.get("favorites", 0), created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else datetime.now(), updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else datetime.now() ) except Exception as e: logger.error(f"Error creating anime from dict: {e}") raise class AnimeRepositoryInterface(ABC): """Abstract base class for anime repository""" @abstractmethod def add_anime(self, anime: Anime) -> None: pass @abstractmethod def get_anime(self, anime_id: str) -> Optional[Anime]: pass @abstractmethod def update_anime(self, anime: Anime) -> None: pass @abstractmethod def delete_anime(self, anime_id: str) -> None: pass @abstractmethod def get_all_anime(self) -> List[Anime]: pass @abstractmethod def search_anime(self, query: str) -> List[Anime]: pass @abstractmethod def filter_by_genre(self, genre: Genre) -> List[Anime]: pass @abstractmethod def filter_by_status(self, status: AnimeStatus) -> List[Anime]: pass class InMemoryAnimeRepository(AnimeRepositoryInterface): """In-memory implementation of anime repository""" def __init__(self): self._anime_db: Dict[str, Anime] = {} def add_anime(self, anime: Anime) -> None: """Add anime to repository""" if anime.id in self._anime_db: raise ValueError(f"Anime with ID {anime.id} already exists") self._anime_db[anime.id] = anime logger.info(f"Added anime: {anime.title}") def get_anime(self, anime_id: str) -> Optional[Anime]: """Get anime by ID""" return self._anime_db.get(anime_id) def update_anime(self, anime: Anime) -> None: """Update existing anime""" if anime.id not in self._anime_db: raise ValueError(f"Anime with ID {anime.id} not found") anime.updated_at = datetime.now() self._anime_db[anime.id] = anime logger.info(f"Updated anime: {anime.title}") def delete_anime(self, anime_id: str) -> None: """Delete anime by ID""" if anime_id not in self._anime_db: raise ValueError(f"Anime with ID {anime_id} not found") anime_title = self._anime_db[anime_id].title del self._anime_db[anime_id] logger.info(f"Deleted anime: {anime_title}") def get_all_anime(self) -> List[Anime]: """Get all anime in repository""" return list(self._anime_db.values()) def search_anime(self, query: str) -> List[Anime]: """Search anime by title or alternative titles""" query = query.lower() return [ anime for anime in self._anime_db.values() if (query in anime.title.lower() or any(query in alt.lower() for alt in anime.alternative_titles)) ] def filter_by_genre(self, genre: Genre) -> List[Anime]: """Filter anime by genre""" return [anime for anime in self._anime_db.values() if genre in anime.genres] def filter_by_status(self, status: AnimeStatus) -> List[Anime]: """Filter anime by status""" return [anime for anime in self._anime_db.values() if anime.status == status] class JSONAnimeRepository(AnimeRepositoryInterface): """JSON file-based implementation of anime repository""" def __init__(self, file_path: str = "anime_database.json"): self.file_path = Path(file_path) self._anime_db: Dict[str, Anime] = {} self._load_database() def _load_database(self) -> None: """Load database from JSON file""" try: if self.file_path.exists(): with open(self.file_path, 'r', encoding='utf-8') as f: data = json.load(f) self._anime_db = { anime_id: Anime.from_dict(anime_data) for anime_id, anime_data in data.items() } logger.info(f"Loaded {len(self._anime_db)} anime entries from {self.file_path}") else: logger.info(f"Creating new database at {self.file_path}") except Exception as e: logger.error(f"Error loading database: {e}") raise def _save_database(self) -> None: """Save database to JSON file""" try: data = { anime_id: anime.to_dict() for anime_id, anime in self._anime_db.items() } with open(self.file_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) logger.info(f"Saved {len(self._anime_db)} anime entries to {self.file_path}") except Exception as e: logger.error(f"Error saving database: {e}") raise def add_anime(self, anime: Anime) -> None: """Add anime to repository""" if anime.id in self._anime_db: raise ValueError(f"Anime with ID {anime.id} already exists") self._anime_db[anime.id] = anime self._save_database() logger.info(f"Added anime: {anime.title}") def get_anime(self, anime_id: str) -> Optional[Anime]: """Get anime by ID""" return self._anime_db.get(anime_id) def update_anime(self, anime: Anime) -> None: """Update existing anime""" if anime.id not in self._anime_db: raise ValueError(f"Anime with ID {anime.id} not found") anime.updated_at = datetime.now() self._anime_db[anime.id] = anime self._save_database() logger.info(f"Updated anime: {anime.title}") def delete_anime(self, anime_id: str) -> None: """Delete anime by ID""" if anime_id not in self._anime_db: raise ValueError(f"Anime with ID {anime_id} not found") anime_title = self._anime_db[anime_id].title del self._anime_db[anime_id] self._save_database() logger.info(f"Deleted anime: {anime_title}") def get_all_anime(self) -> List[Anime]: """Get all anime in repository""" return list(self._anime_db.values()) def search_anime(self, query: str) -> List[Anime]: """Search anime by title or alternative titles""" query = query.lower() return [ anime for anime in self._anime_db.values() if (query in anime.title.lower() or any(query in alt.lower() for alt in anime.alternative_titles)) ] def filter_by_genre(self, genre: Genre) -> List[Anime]: """Filter anime by genre""" return [anime for anime in self._anime_db.values() if genre in anime.genres] def filter_by_status(self, status: AnimeStatus) -> List[Anime]: """Filter anime by status""" return [anime for anime in self._anime_db.values() if anime.status == status] class AnimeService: """Service class for anime operations""" def __init__(self, repository: AnimeRepositoryInterface): self.repository = repository def add_anime(self, anime_data: Dict) -> Anime: """Add new anime to database""" try: anime = Anime.from_dict(anime_data) self.repository.add_anime(anime) return anime except Exception as e: logger.error(f"Error adding anime: {e}") raise def get_anime(self, anime_id: str) -> Optional[Anime]: """Get anime by ID""" return self.repository.get_anime(anime_id) def update_anime(self, anime_id: str, anime_data: Dict) -> Optional[Anime]: """Update existing anime""" existing_anime = self.repository.get_anime(anime_id) if not existing_anime: return None try: # Update only the fields that are provided for key, value in anime_data.items(): if hasattr(existing_anime, key): if key == "genres": setattr(existing_anime, key, {Genre[genre] for genre in value}) elif key == "status": setattr(existing_anime, key, AnimeStatus[value]) else: setattr(existing_anime, key, value) existing_anime.updated_at = datetime.now() self.repository.update_anime(existing_anime) return existing_anime except Exception as e: logger.error(f"Error updating anime: {e}") raise def delete_anime(self, anime_id: str) -> bool: """Delete anime by ID""" try: self.repository.delete_anime(anime_id) return True except Exception as e: logger.error(f"Error deleting anime: {e}") return False def get_all_anime(self) -> List[Anime]: """Get all anime""" return self.repository.get_all_anime() def search_anime(self, query: str) -> List[Anime]: """Search anime by query""" return self.repository.search_anime(query) def filter_by_genre(self, genre: str) -> List[Anime]: """Filter anime by genre""" try: genre_enum = Genre[genre.upper()] return self.repository.filter_by_genre(genre_enum) except KeyError: logger.error(f"Invalid genre: {genre}") return [] def filter_by_status(self, status: str) -> List[Anime]: """Filter anime by status""" try: status_enum = AnimeStatus[status.upper()] return self.repository.filter_by_status(status_enum) except KeyError: logger.error(f"Invalid status: {status}") return [] def get_top_rated(self, limit: int = 10) -> List[Anime]: """Get top rated anime""" return sorted( self.repository.get_all_anime(), key=lambda x: x.rating, reverse=True )[:limit] def get_most_popular(self, limit: int = 10) -> List[Anime]: """Get most popular anime""" return sorted( self.repository.get_all_anime(), key=lambda x: x.popularity_rank if x.popularity_rank else float('inf'), reverse=False )[:limit] class AnimeDatabase: """Main anime database application class""" def __init__(self, repository_type: str = "json"): """Initialize anime database with specified repository type""" self.repository_type = repository_type.lower() if self.repository_type == "json": self.repository = JSONAnimeRepository() else: self.repository = InMemoryAnimeRepository() self.service = AnimeService(self.repository) def run(self): """Run the anime database application""" print("Anime Database Application") print("Built with anycoder") print(f"Using {self.repository_type} repository") print("Type 'help' for available commands") while True: try: command = input("\n> ").strip().lower() if command == "exit": break elif command == "help": self._show_help() elif command == "add": self._add_anime() elif command == "list": self._list_anime() elif command == "search": self._search_anime() elif command == "filter": self._filter_anime() elif command == "top": self._show_top_rated() elif command == "popular": self._show_most_popular() elif command == "delete": self._delete_anime() elif command == "update": self._update_anime() else: print("Unknown command. Type 'help' for available commands.") except KeyboardInterrupt: print("\nExiting...") break except Exception as e: print(f"Error: {e}") def _show_help(self): """Display help information""" print("\nAvailable commands:") print(" help - Show this help message") print(" add - Add new anime to database") print(" list - List all anime in database") print(" search - Search anime by title") print(" filter - Filter anime by genre or status") print(" top - Show top rated anime") print(" popular - Show most popular anime") print(" update - Update existing anime") print(" delete - Delete anime from database") print(" exit - Exit the application") def _add_anime(self): """Add new anime to database""" print("\nAdd New Anime") print("Enter anime details (press Enter to skip optional fields)") try: title = input("Title (required): ").strip() if not title: print("Title is required") return anime_data = { "title": title, "alternative_titles": input("Alternative titles (comma separated): ").strip().split(","), "episodes": int(input("Episodes: ").strip() or 0), "status": input("Status (ONGOING/COMPLETED/UPCOMING/HIATUS/CANCELLED): ").strip().upper() or "ONGOING", "genres": input("Genres (comma separated, e.g., ACTION,COMEDY): ").strip().upper().split(","), "release_date": input("Release date (YYYY-MM-DD): ").strip(), "end_date": input("End date (YYYY-MM-DD): ").strip(), "synopsis": input("Synopsis: ").strip(), "rating": float(input("Rating (0-10): ").strip() or 0), "studio": input("Studio: ").strip(), "source": input("Source: ").strip(), "duration": input("Duration (e.g., 24 min): ").strip(), "season": input("Season (e.g., Winter 2023): ").strip(), "year": int(input("Year: ").strip() or datetime.now().year), "popularity_rank": int(input("Popularity rank: ").strip() or 0) or None, "favorites": int(input("Favorites count: ").strip() or 0) } # Process dates if anime_data["release_date"]: anime_data["release_date"] = datetime.strptime(anime_data["release_date"], "%Y-%m-%d").isoformat() if anime_data["end_date"]: anime_data["end_date"] = datetime.strptime(anime_data["end_date"], "%Y-%m-%d").isoformat() # Remove empty alternative titles anime_data["alternative_titles"] = [t.strip() for t in anime_data["alternative_titles"] if t.strip()] # Remove empty genres anime_data["genres"] = [g.strip() for g in anime_data["genres"] if g.strip()] anime = self.service.add_anime(anime_data) print(f"Successfully added anime: {anime.title}") except Exception as e: print(f"Error adding anime: {e}") def _list_anime(self): """List all anime in database""" anime_list = self.service.get_all_anime() if not anime_list: print("No anime in database") return print(f"\nListing {len(anime_list)} anime:") for i, anime in enumerate(anime_list, 1): print(f"\n{i}. {anime.title}") print(f" ID: {anime.id}") print(f" Status: {anime.status}") print(f" Episodes: {anime.episodes}") print(f" Rating: {anime.rating}/10") print(f" Genres: {', '.join(str(g) for g in anime.genres)}") if anime.release_date: print(f" Released: {anime.release_date.strftime('%Y-%m-%d')}") print(f" Studio: {anime.studio}") print(f" Synopsis: {anime.synopsis[:100]}...") def _search_anime(self): """Search anime by title""" query = input("\nEnter search query: ").strip() if not query: print("Please enter a search query") return results = self.service.search_anime(query) if not results: print("No results found") return print(f"\nFound {len(results)} results:") for i, anime in enumerate(results, 1): print(f"\n{i}. {anime.title}") print(f" ID: {anime.id}") print(f" Status: {anime.status}") print(f" Rating: {anime.rating}/10") def _filter_anime(self): """Filter anime by genre or status""" print("\nFilter Options:") print("1. By Genre") print("2. By Status") choice = input("Choose filter type (1/2): ").strip() if choice == "1": print("\nAvailable Genres:") for genre in Genre: print(f" - {genre}") genre_input = input("Enter genre: ").strip().upper() try: genre = Genre[genre_input] results = self.service.filter_by_genre(genre.name) except KeyError: print("Invalid genre") return elif choice == "2": print("\nAvailable Statuses:") for status in AnimeStatus: print(f" - {status}") status_input = input("Enter status: ").strip().upper() try: results = self.service.filter_by_status(status_input) except KeyError: print("Invalid status") return else: print("Invalid choice") return if not results: print("No results found") return print(f"\nFound {len(results)} results:") for i, anime in enumerate(results, 1): print(f"\n{i}. {anime.title}") print(f" ID: {anime.id}") print(f" Rating: {anime.rating}/10") def _show_top_rated(self): """Show top rated anime""" limit = input("\nEnter number of top rated anime to show (default 10): ").strip() try: limit = int(limit) if limit else 10 results = self.service.get_top_rated(limit) if not results: print("No anime in database") return print(f"\nTop {len(results)} Rated Anime:") for i, anime in enumerate(results, 1): print(f"\n{i}. {anime.title}") print(f" Rating: {anime.rating}/10") print(f" Episodes: {anime.episodes}") print(f" Genres: {', '.join(str(g) for g in anime.genres)}") except ValueError: print("Please enter a valid number") def _show_most_popular(self): """Show most popular anime""" limit = input("\nEnter number of most popular anime to show (default 10): ").strip() try: limit = int(limit) if limit else 10 results = self.service.get_most_popular(limit) if not results: print("No anime in database") return print(f"\nTop {len(results)} Most Popular Anime:") for i, anime in enumerate(results, 1): print(f"\n{i}. {anime.title}") print(f" Popularity Rank: {anime.popularity_rank}") print(f" Favorites: {anime.favorites}") print(f" Rating: {anime.rating}/10") except ValueError: print("Please enter a valid number") def _delete_anime(self): """Delete anime from database""" anime_id = input("\nEnter anime ID to delete: ").strip() if not anime_id: print("Please enter an anime ID") return anime = self.service.get_anime(anime_id) if not anime: print("Anime not found") return confirm = input(f"Are you sure you want to delete '{anime.title}'? (y/n): ").strip().lower() if confirm == 'y': if self.service.delete_anime(anime_id): print(f"Successfully deleted anime: {anime.title}") else: print("Failed to delete anime") else: print("Deletion cancelled") def _update_anime(self): """Update existing anime""" anime_id = input("\nEnter anime ID to update: ").strip() if not anime_id: print("Please enter an anime ID") return anime = self.service.get_anime(anime_id) if not anime: print("Anime not found") return print(f"\nCurrent anime details for '{anime.title}':") print(f"1. Title: {anime.title}") print(f"2. Alternative Titles: {', '.join(anime.alternative_titles)}") print(f"3. Episodes: {anime.episodes}") print(f"4. Status: {anime.status}") print(f"5. Genres: {', '.join(str(g) for g in anime.genres)}") print(f"6. Release Date: {anime.release_date}") print(f"7. End Date: {anime.end_date}") print(f"8. Synopsis: {anime.synopsis}") print(f"9. Rating: {anime.rating}") print(f"10. Studio: {anime.studio}") print(f"11. Source: {anime.source}") print(f"12. Duration: {anime.duration}") print(f"13. Season: {anime.season}") print(f"14. Year: {anime.year}") print(f"15. Popularity Rank: {anime.popularity_rank}") print(f"16. Favorites: {anime.favorites}") update_data = {} while True: field_num = input("\nEnter field number to update (0 to finish): ").strip() if field_num == "0": break try: field_num = int(field_num) if field_num < 1 or field_num > 16: print("Invalid field number") continue field_names = [ "title", "alternative_titles", "episodes", "status", "genres", "release_date", "end_date", "synopsis", "rating", "studio", "source", "duration", "season", "year", "popularity_rank", "favorites" ] field_name = field_names[field_num - 1] if field_name == "alternative_titles": value = input(f"Enter new {field_name} (comma separated): ").strip().split(",") elif field_name in ["release_date", "end_date"]: value = input(f"Enter new {field_name} (YYYY-MM-DD): ").strip() if value: value = datetime.strptime(value, "%Y-%m-%d").isoformat() elif field_name in ["episodes", "year", "popularity_rank", "favorites"]: value = int(input(f"Enter new {field_name}: ").strip()) elif field_name == "rating": value = float(input(f"Enter new {field_name}: ").strip()) elif field_name == "genres": value = input(f"Enter new {field_name} (comma separated): ").strip().upper().split(",") else: value = input(f"Enter new {field_name}: ").strip() if value or value == 0: # Allow 0 for numeric fields update_data[field_name] = value except ValueError as e: print(f"Invalid input: {e}") except Exception as e: print(f"Error: {e}") if update_data: updated_anime = self.service.update_anime(anime_id, update_data) if updated_anime: print(f"Successfully updated anime: {updated_anime.title}") else: print("Failed to update anime") else: print("No changes made") if __name__ == "__main__": # Initialize and run the anime database application print("Initializing Anime Database...") db = AnimeDatabase(repository_type="json") db.run()