Module transcripter.core

Sub-modules

transcripter.core.redis_manager
transcripter.core.youtube_manager

Classes

class RedisManager (config: Config)

Manages interactions with Redis, including connection, indexing, and document operations.

Attributes

config : Config
Configuration object containing Redis settings.
redis_client : Optional[redis.Redis]
Redis client instance.

Initializes the RedisManager with the given configuration.

Args

config : Config
Configuration object containing Redis settings.
Expand source code
class RedisManager:
    """
    Manages interactions with Redis, including connection, indexing, and document operations.

    Attributes:
        config (Config): Configuration object containing Redis settings.
        redis_client (Optional[redis.Redis]): Redis client instance.
    """

    def __init__(self, config: Config) -> None:
        """
        Initializes the RedisManager with the given configuration.

        Args:
            config (Config): Configuration object containing Redis settings.
        """
        self.config: Config = config
        self.redis_client: Optional[redis.Redis] = None
        logger.info("RedisManager initialized")

    def ensure_connection(self) -> None:
        """
        Ensures a connection to the Redis server. If not connected, establishes a new connection
        and creates the index.
        """
        if self.redis_client is None:
            self.redis_client = redis.Redis(
                host=self.config.REDIS_HOST,
                port=self.config.REDIS_PORT,
                password=self.config.REDIS_PASSWORD,
                db=self.config.REDIS_DB,
            )
            logger.info("Redis connection established")
        self._create_index()

    def _create_index(self) -> None:
        """
        Creates an index in Redis for storing documents. If the index already exists, logs a message.
        """
        try:
            self.redis_client.execute_command(
                "FT.CREATE",
                self.config.REDIS_INDEX,
                "ON",
                "HASH",
                "PREFIX",
                "1",
                "doc:",
                "SCHEMA",
                "text",
                "TEXT",
                "WEIGHT",
                "5.0",
                "video_id",
                "TAG",
                "video_title",
                "TEXT",
                "WEIGHT",
                "2.0",
                "video_publish_date",
                "TEXT",
                "start_time",
                "NUMERIC",
                "SORTABLE",
                "timecode",
                "TEXT",
            )
            logger.info("Redis index created successfully")
        except redis.exceptions.ResponseError as e:
            if "Index already exists" in str(e):
                logger.info("Redis index already exists")
            else:
                logger.error(f"Error creating Redis index: {str(e)}")
                raise

    def add_document(self, doc_id: str, **fields: Union[str, int, float]) -> None:
        """
        Adds a document to Redis.

        Args:
            doc_id (str): The ID of the document.
            **fields (Union[str, int, float]): The fields of the document.
        """
        logger.debug(f"Adding document: {doc_id}")
        fields = {k: str(v) for k, v in fields.items()}
        self.redis_client.hset(f"doc:{doc_id}", mapping=fields)

    def search(self, query_string: str) -> Dict[str, Union[int, List[Dict[str, str]]]]:
        """
        Searches for documents in Redis based on the query string.

        Args:
            query_string (str): The query string to search for.

        Returns:
            Dict[str, Union[int, List[Dict[str, str]]]]: The search results, including total count and documents.
        """
        logger.debug(f"Searching with query: {query_string}")
        try:
            query = f"(@text:{query_string}) | (@video_title:{query_string})"
            result = self.redis_client.execute_command(
                "FT.SEARCH", self.config.REDIS_INDEX, query, "LIMIT", 0, 1000
            )
            logger.debug(f"Raw search result: {result}")
            return self._parse_search_result(result)
        except Exception as e:
            logger.error(f"Error during Redis search: {str(e)}")
            return {"total": 0, "docs": []}

    def _parse_search_result(
        self, result: List[Union[int, List[bytes]]]
    ) -> Dict[str, Union[int, List[Dict[str, str]]]]:
        """
        Parses the raw search result from Redis.

        Args:
            result (List[Union[int, List[bytes]]]): The raw search result.

        Returns:
            Dict[str, Union[int, List[Dict[str, str]]]]: The parsed search result, including total count and documents.
        """
        if not result or len(result) < 1:
            logger.warning("Empty search result from Redis")
            return {"total": 0, "docs": []}

        total_results = result[0]
        documents = [
            {
                k.decode(): v.decode()
                for k, v in zip(result[i + 1][::2], result[i + 1][1::2])
            }
            for i in range(1, len(result), 2)
        ]

        logger.debug(f"Parsed {len(documents)} documents from search result")
        return {"total": total_results, "docs": documents}

    def get_raw_sample(self) -> List[Dict[str, Dict[str, str]]]:
        """
        Retrieves a sample of raw documents from Redis.

        Returns:
            List[Dict[str, Dict[str, str]]]: A list of dictionaries containing document keys and their fields.
        """
        keys = self.redis_client.keys("doc:*")
        sample = [
            {
                key.decode(): {
                    k.decode(): v.decode()
                    for k, v in self.redis_client.hgetall(key).items()
                }
            }
            for key in keys[:5]
        ]
        return sample

    def get_all_indexed_video_ids(self) -> List[str]:
        """
        Retrieves all indexed video IDs from Redis.

        Returns:
            List[str]: A list of indexed video IDs.
        """
        keys = self.redis_client.keys("doc:*")
        video_ids = {
            re.match(r"^(.*)_(?!.*_)", key.decode().split(":")[1]).group(1)
            for key in keys
            if len(key.decode().split(":")) > 1
        }
        return list(video_ids)

    def get_partially_indexed_videos(
        self,
    ) -> Dict[str, Dict[str, Union[int, List[str]]]]:
        """
        Retrieves information about partially indexed videos from Redis.

        Returns:
            Dict[str, Dict[str, Union[int, List[str]]]]: A dictionary containing video IDs, chunk counts, and chunks.
        """
        keys = self.redis_client.keys("doc:*")
        video_chunks = {}
        for key in keys:
            key_parts = key.decode().split(":")
            if len(key_parts) > 1:
                video_id, chunk_info = key_parts[1].split("_", 1)
                if video_id not in video_chunks:
                    video_chunks[video_id] = set()
                video_chunks[video_id].add(chunk_info)

        return {
            video_id: {
                "chunk_count": len(chunks),
                "chunks": sorted(list(chunks)),
            }
            for video_id, chunks in video_chunks.items()
        }

    def document_exists(self, doc_id: str) -> bool:
        """
        Checks if a document exists in Redis.

        Args:
            doc_id (str): The ID of the document.

        Returns:
            bool: True if the document exists, False otherwise.
        """
        exists = self.redis_client.exists(f"doc:{doc_id}")
        logger.debug(f"Document {doc_id} exists: {exists}")
        return bool(exists)

    def get_all_documents(
        self,
    ) -> Dict[str, Union[int, List[Dict[str, Union[str, Dict[str, str]]]]]]:
        """
        Retrieves all documents from Redis.

        Returns:
            Dict[str, Union[int, List[Dict[str, Union[str, Dict[str, str]]]]]]: A dictionary containing total keys and a sample of documents.
        """
        keys = self.redis_client.keys("doc:*")
        documents = [
            {
                "key": key.decode(),
                "fields": {
                    k.decode(): v.decode()
                    for k, v in self.redis_client.hgetall(key).items()
                },
            }
            for key in keys
        ]
        total_keys = len(keys)
        logger.info(
            f"Total keys: {total_keys}, Retrieved {len(documents)} documents from Redis"
        )
        return {"total_keys": total_keys, "sample": documents}

    def get_document_count(self) -> int:
        """
        Retrieves the total number of documents in Redis.

        Returns:
            int: The total number of documents.
        """
        return self.redis_client.dbsize()

    def get_index_info(self) -> Dict[str, Union[str, int, float, bool, None]]:
        """
        Retrieves information about the Redis index.

        Returns:
            Dict[str, Union[str, int, float, bool, None]]: A dictionary containing index information.
        """
        try:
            info = self.redis_client.execute_command("FT.INFO", self.config.REDIS_INDEX)
            return {
                (k.decode() if isinstance(k, bytes) else str(k)): (
                    v.decode() if isinstance(v, bytes) else v
                )
                for k, v in zip(info[::2], info[1::2])
            }
        except Exception as e:
            logger.error(f"Error getting index info: {str(e)}")
            return {}

    def check_redisearch(self) -> bool:
        """
        Checks if the RediSearch module is loaded in Redis.

        Returns:
            bool: True if the RediSearch module is loaded, False otherwise.
        """
        try:
            modules = self.redis_client.execute_command("MODULE LIST")
            logger.debug(f"Redis modules: {modules}")
            return any(module[1] == b"search" for module in modules)
        except Exception as e:
            logger.error(f"Error checking RediSearch module: {str(e)}")
            return False

Methods

def add_document(self, doc_id: str, **fields: Union[str, int, float]) ‑> None

Adds a document to Redis.

Args

doc_id : str
The ID of the document.
**fields : Union[str, int, float]
The fields of the document.
def check_redisearch(self) ‑> bool

Checks if the RediSearch module is loaded in Redis.

Returns

bool
True if the RediSearch module is loaded, False otherwise.
def document_exists(self, doc_id: str) ‑> bool

Checks if a document exists in Redis.

Args

doc_id : str
The ID of the document.

Returns

bool
True if the document exists, False otherwise.
def ensure_connection(self) ‑> None

Ensures a connection to the Redis server. If not connected, establishes a new connection and creates the index.

def get_all_documents(self) ‑> Dict[str, Union[int, List[Dict[str, Union[str, Dict[str, str]]]]]]

Retrieves all documents from Redis.

Returns

Dict[str, Union[int, List[Dict[str, Union[str, Dict[str, str]]]]]]
A dictionary containing total keys and a sample of documents.
def get_all_indexed_video_ids(self) ‑> List[str]

Retrieves all indexed video IDs from Redis.

Returns

List[str]
A list of indexed video IDs.
def get_document_count(self) ‑> int

Retrieves the total number of documents in Redis.

Returns

int
The total number of documents.
def get_index_info(self) ‑> Dict[str, Union[str, int, float, bool, ForwardRef(None)]]

Retrieves information about the Redis index.

Returns

Dict[str, Union[str, int, float, bool, None]]
A dictionary containing index information.
def get_partially_indexed_videos(self) ‑> Dict[str, Dict[str, Union[int, List[str]]]]

Retrieves information about partially indexed videos from Redis.

Returns

Dict[str, Dict[str, Union[int, List[str]]]]
A dictionary containing video IDs, chunk counts, and chunks.
def get_raw_sample(self) ‑> List[Dict[str, Dict[str, str]]]

Retrieves a sample of raw documents from Redis.

Returns

List[Dict[str, Dict[str, str]]]
A list of dictionaries containing document keys and their fields.
def search(self, query_string: str) ‑> Dict[str, Union[int, List[Dict[str, str]]]]

Searches for documents in Redis based on the query string.

Args

query_string : str
The query string to search for.

Returns

Dict[str, Union[int, List[Dict[str, str]]]]
The search results, including total count and documents.
class YouTubeManager (config: Config)

A class to manage interactions with the YouTube API, including fetching video details and transcripts.

Attributes

config : Config
Configuration object containing API keys and other settings.
api_client : Api
YouTube API client initialized with the provided API key.

Initializes the YouTubeManager with the given configuration.

Args

config : Config
Configuration object containing API keys and other settings.
Expand source code
class YouTubeManager:
    """
    A class to manage interactions with the YouTube API, including fetching video details and transcripts.

    Attributes:
        config (Config): Configuration object containing API keys and other settings.
        api_client (Api): YouTube API client initialized with the provided API key.
    """

    def __init__(self, config: Config) -> None:
        """
        Initializes the YouTubeManager with the given configuration.

        Args:
            config (Config): Configuration object containing API keys and other settings.
        """
        self.config = config
        self.api_client = Api(api_key=self.config.YOUTUBE_API_KEY)
        logger.info("YouTubeManager initialized")

    def get_all_video_details_from_playlist(
        self, playlist_id: str
    ) -> Dict[str, Dict[str, Union[str, int]]]:
        """
        Fetches details of all videos in a given playlist.

        Args:
            playlist_id (str): The ID of the playlist to fetch video details from.

        Returns:
            Dict[str, Dict[str, Union[str, int]]]: A dictionary where keys are video IDs and values are dictionaries
            containing video details such as title, publish date, and video ID.
        """
        logger.info(f"Fetching video details for playlist: {playlist_id}")
        playlist_items = self.api_client.get_playlist_items(
            playlist_id=playlist_id, limit=50, count=None
        ).items
        logger.debug(f"Playlist items: {playlist_items}")
        return self._get_video_details(playlist_items)

    def get_all_video_details_from_channel(
        self, channel_id: str
    ) -> Dict[str, Dict[str, Union[str, int]]]:
        """
        Fetches details of all videos in a given channel.

        Args:
            channel_id (str): The ID of the channel to fetch video details from.

        Returns:
            Dict[str, Dict[str, Union[str, int]]]: A dictionary where keys are video IDs and values are dictionaries
            containing video details such as title, publish date, and video ID.
        """
        logger.info(f"Fetching video details for channel: {channel_id}")
        if not channel_id:
            logger.error("Channel ID is None")
            return {}
        channel_response = self.api_client.get_channel_info(channel_id=channel_id)
        if not channel_response.items:
            logger.error(f"No channel found for ID: {channel_id}")
            return {}
        channel_item = channel_response.items[0]
        if not hasattr(channel_item, "contentDetails") or not hasattr(
            channel_item.contentDetails, "relatedPlaylists"
        ):
            logger.error(f"Channel {channel_id} does not have expected content details")
            return {}
        playlist_id = channel_item.contentDetails.relatedPlaylists.uploads
        return self.get_all_video_details_from_playlist(playlist_id)

    def get_video_details(self, video_id: str) -> Dict[str, Union[str, int]]:
        """
        Fetches details of a single video.

        Args:
            video_id (str): The ID of the video to fetch details for.

        Returns:
            Dict[str, Union[str, int]]: A dictionary containing video details such as title, publish date, and video ID.
        """
        logger.info(f"Fetching video details for video: {video_id}")
        if not video_id:
            logger.error("Video ID is None")
            return {}
        video_response = self.api_client.get_video_by_id(video_id=video_id)
        video = video_response.items[0]
        return {
            "title": video.snippet.title,
            "publish_date": video.snippet.publishedAt,
            "video_id": video.id,
        }

    def _get_video_details(self, items: List) -> Dict[str, Dict[str, Union[str, int]]]:
        """
        Helper method to extract video details from a list of playlist items.

        Args:
            items (List): A list of playlist items.

        Returns:
            Dict[str, Dict[str, Union[str, int]]]: A dictionary where keys are video IDs and values are dictionaries
            containing video details such as title, publish date, and video ID.
        """
        videos = {
            item.contentDetails.videoId: {
                "title": item.snippet.title,
                "publish_date": item.snippet.publishedAt,
                "video_id": item.contentDetails.videoId,
            }
            for item in items
        }
        logger.debug(f"Fetched details for {len(videos)} videos")
        return videos

    def get_transcript_details_from_video(
        self, video_id: str
    ) -> Optional[List[Dict[str, Union[str, float]]]]:
        """
        Fetches the transcript of a given video.

        Args:
            video_id (str): The ID of the video to fetch the transcript for.

        Returns:
            Optional[List[Dict[str, Union[str, float]]]]: A list of dictionaries containing transcript details such as
            start time and text, or None if an error occurs.
        """
        logger.info(f"Fetching transcript for video: {video_id}")
        try:
            return YouTubeTranscriptApi.get_transcript(video_id)
        except Exception as e:
            logger.error(f"Error fetching transcript for video {video_id}: {str(e)}")
            return None

    @staticmethod
    def merge_transcript_chunks(
        chunks: List[Dict[str, Union[str, float]]],
    ) -> List[Dict[str, Union[str, float]]]:
        """
        Merges adjacent transcript chunks into larger chunks.

        Args:
            chunks (List[Dict[str, Union[str, float]]]): A list of dictionaries containing transcript details such as
            start time and text.

        Returns:
            List[Dict[str, Union[str, float]]]: A list of merged transcript chunks.
        """
        logger.debug(f"Merging {len(chunks)} transcript chunks")
        merged_list = [
            {
                "start": chunks[i]["start"],
                "text": chunks[i]["text"] + " " + chunks[i + 1]["text"],
            }
            for i in range(0, len(chunks) - 1, 2)
        ]
        if len(chunks) % 2 != 0:
            merged_list.append(chunks[-1])
        logger.debug(f"Merged into {len(merged_list)} chunks")
        return merged_list

Static methods

def merge_transcript_chunks(chunks: List[Dict[str, Union[str, float]]]) ‑> List[Dict[str, Union[str, float]]]

Merges adjacent transcript chunks into larger chunks.

Args

chunks : List[Dict[str, Union[str, float]]]
A list of dictionaries containing transcript details such as

start time and text.

Returns

List[Dict[str, Union[str, float]]]
A list of merged transcript chunks.

Methods

def get_all_video_details_from_channel(self, channel_id: str) ‑> Dict[str, Dict[str, Union[str, int]]]

Fetches details of all videos in a given channel.

Args

channel_id : str
The ID of the channel to fetch video details from.

Returns

Dict[str, Dict[str, Union[str, int]]]
A dictionary where keys are video IDs and values are dictionaries

containing video details such as title, publish date, and video ID.

def get_all_video_details_from_playlist(self, playlist_id: str) ‑> Dict[str, Dict[str, Union[str, int]]]

Fetches details of all videos in a given playlist.

Args

playlist_id : str
The ID of the playlist to fetch video details from.

Returns

Dict[str, Dict[str, Union[str, int]]]
A dictionary where keys are video IDs and values are dictionaries

containing video details such as title, publish date, and video ID.

def get_transcript_details_from_video(self, video_id: str) ‑> Optional[List[Dict[str, Union[str, float]]]]

Fetches the transcript of a given video.

Args

video_id : str
The ID of the video to fetch the transcript for.

Returns

Optional[List[Dict[str, Union[str, float]]]]
A list of dictionaries containing transcript details such as

start time and text, or None if an error occurs.

def get_video_details(self, video_id: str) ‑> Dict[str, Union[str, int]]

Fetches details of a single video.

Args

video_id : str
The ID of the video to fetch details for.

Returns

Dict[str, Union[str, int]]
A dictionary containing video details such as title, publish date, and video ID.