# Source code for pytubefix.contrib.search

"""Module for interacting with YouTube search."""
# Native python imports
import logging
from enum import Enum
from typing import List, Optional, Dict, Callable, Tuple, Set

# Local imports
from pytubefix import YouTube, Channel, Playlist
from pytubefix.helpers import install_proxy
from pytubefix.innertube import InnerTube
from pytubefix.protobuf import encode_protobuf

logger = logging.getLogger(__name__)

class Filter:
    """Builder for the filter parameter of a YouTube search.

    Two APIs coexist on this class:

    * Fluent API: ``Filter.create().type(Filter.Type.VIDEO)...`` followed by
      :meth:`merge`, which returns the combined selection as the string form
      of a dict.
    * Legacy API: dicts obtained from the ``get_*`` static methods are stored
      with :meth:`set_filters` and encoded by :meth:`get_filters_params`.

    In both representations the sort order is stored under the top-level key
    ``1`` and every other filter under the nested dict at key ``2``.
    """

    # Each enum value is a one-entry dict ``{key: value}`` that is merged into
    # the nested dict at key ``2`` (``SortBy`` is the exception: its entry is
    # placed at the top level).
    class Type(Enum):
        VIDEO = {2: 1}
        CHANNEL = {2: 2}
        PLAYLIST = {2: 3}
        MOVIE = {2: 4}

    class Duration(Enum):
        UNDER_4_MINUTES = {3: 1}
        OVER_20_MINUTES = {3: 2}
        BETWEEN_4_20_MINUTES = {3: 3}

    class UploadDate(Enum):
        LAST_HOUR = {1: 1}
        TODAY = {1: 2}
        THIS_WEEK = {1: 3}
        THIS_MONTH = {1: 4}
        THIS_YEAR = {1: 5}

    class Features(Enum):
        LIVE = {8: 1}
        _4K = {14: 1}
        HD = {4: 1}
        SUBTITLES_CC = {5: 1}
        CREATIVE_COMMONS = {6: 1}
        _360 = {15: 1}
        VR180 = {26: 1}
        _3D = {7: 1}
        HDR = {25: 1}
        LOCATION = {23: 1}
        PURCHASED = {9: 1}

    class SortBy(Enum):
        RELEVANCE = {1: 0}
        UPLOAD_DATE = {1: 2}
        VIEW_COUNT = {1: 3}
        RATING = {1: 1}

    def __init__(self):
        """Create a filter with nothing selected in either API."""
        # State of the fluent API (enum members, or None when unset).
        self._type = None
        self._upload_date = None
        self._duration = None
        self._sort_by = None
        self._features = set()

        # TODO: Remove legacy implementation

        ################################# legacy implementation ####################################

        # State of the legacy API (plain dicts, or None when unset).
        self.filters = {
            'upload_date': None,
            'type': None,
            'duration': None,
            'features': [],
            'sort_by': None
        }

    def set_filters(self, filter_dict):
        """Apply several legacy filters at once.

        :param filter_dict:
            Mapping of category name (``'upload_date'``, ``'type'``,
            ``'duration'``, ``'features'``, ``'sort_by'``) to the dict returned
            by the matching ``get_*`` static method. ``'features'`` may be a
            single dict or a list of dicts and is accumulated across calls;
            every other category overwrites its previous value.

        ``None`` feature entries (what :meth:`get_features` returns for an
        unknown option) are dropped so they cannot break
        :meth:`get_filters_params` later.
        """
        for category, value in filter_dict.items():
            if category != 'features':
                self.filters[category] = value
                continue

            if isinstance(value, list):
                logger.debug("Filter features is a list")
                self.filters['features'].extend(
                    entry for entry in value if entry is not None
                )
            elif value is not None:
                self.filters['features'].append(value)

    def clear_filters(self):
        """Reset every legacy filter category to its empty state.

        Only ``self.filters`` is reset (``'features'`` to an empty list, the
        rest to ``None``); selections made through the fluent API are not
        touched.
        """
        for category in self.filters:
            self.filters[category] = [] if category == 'features' else None

    def get_filters_params(self):
        """Combine the legacy filters and encode them with ``encode_protobuf``.

        The sort order is merged at the top level; type, duration, features
        and upload date are merged into the nested dict at key ``2``, which is
        always present (possibly empty) and sorted by key.

        :returns: Whatever ``encode_protobuf`` returns for the string form of
            the combined dict.
        """
        combined = {}

        sort_by = self.filters['sort_by']
        if sort_by:
            combined.update(sort_by)

        nested = {}

        for category in ('type', 'duration'):
            selected = self.filters[category]
            if selected:
                nested.update(selected)

        for feature in self.filters['features']:
            # Skip empty/None entries, e.g. the result of get_features() for
            # an unknown option, instead of failing inside dict.update().
            if feature:
                nested.update(feature)

        upload_date = self.filters['upload_date']
        if upload_date:
            nested.update(upload_date)

        # Assigned after the sort order so key 2 always holds the nested dict.
        combined[2] = dict(sorted(nested.items()))

        logger.debug("Combined filters: %s", combined)

        encoded_filters = encode_protobuf(str(combined))

        logger.debug("Filter encoded in protobuf: %s", encoded_filters)

        return encoded_filters

    @staticmethod
    def get_upload_date(option: str) -> Optional[dict]:
        """Return the legacy filter dict for an upload-date option.

        Accepted options:
        Last Hour,
        Today,
        This Week,
        This Month,
        This Year

        Returns ``None`` for any other value.
        """
        filters = {
            "Last Hour": {1: 1},
            "Today": {1: 2},
            "This Week": {1: 3},
            "This Month": {1: 4},
            "This Year": {1: 5},
        }
        return filters.get(option)

    @staticmethod
    def get_type(option: str) -> Optional[dict]:
        """Return the legacy filter dict for a result-type option.

        Accepted options:
        Video,
        Channel,
        Playlist,
        Movie

        Returns ``None`` for any other value.
        """
        filters = {
            "Video": {2: 1},
            "Channel": {2: 2},
            "Playlist": {2: 3},
            "Movie": {2: 4},
        }
        return filters.get(option)

    @staticmethod
    def get_duration(option: str) -> Optional[dict]:
        """Return the legacy filter dict for a duration option.

        Accepted options:
        Under 4 minutes,
        Over 20 minutes,
        4 - 20 minutes

        Returns ``None`` for any other value.
        """
        filters = {
            "Under 4 minutes": {3: 1},
            "Over 20 minutes": {3: 2},
            "4 - 20 minutes": {3: 3},
        }
        return filters.get(option)

    @staticmethod
    def get_features(option: str) -> Optional[dict]:
        """Return the legacy filter dict for a feature option.

        Accepted options:
        Live,
        4K,
        HD,
        Subtitles/CC,
        Creative Commons,
        360,
        VR180,
        3D,
        HDR,
        Location,
        Purchased

        Returns ``None`` for any other value.
        """
        filters = {
            "Live": {8: 1},
            "4K": {14: 1},
            "HD": {4: 1},
            "Subtitles/CC": {5: 1},
            "Creative Commons": {6: 1},
            "360": {15: 1},
            "VR180": {26: 1},
            "3D": {7: 1},
            "HDR": {25: 1},
            "Location": {23: 1},
            "Purchased": {9: 1},
        }
        return filters.get(option)

    @staticmethod
    def get_sort_by(option: str) -> Optional[dict]:
        """Return the legacy filter dict for a sort-order option.

        Accepted options:
        Relevance,
        Upload date,
        View count,
        Rating

        Returns ``None`` for any other value.
        """
        filters = {
            "Relevance": {1: 0},
            "Upload date": {1: 2},
            "View count": {1: 3},
            "Rating": {1: 1},
        }
        return filters.get(option)

    ################################# legacy implementation ####################################

    @classmethod
    def create(cls):
        """Return a new, empty filter (entry point of the fluent API)."""
        return cls()

    def type(self, t: Type) -> 'Filter':
        """Select the result type and return ``self`` for chaining."""
        self._type = t
        return self

    def upload_date(self, u: UploadDate) -> 'Filter':
        """Select the upload-date window and return ``self`` for chaining."""
        self._upload_date = u
        return self

    def duration(self, d: Duration) -> 'Filter':
        """Select the duration range and return ``self`` for chaining."""
        self._duration = d
        return self

    def sort_by(self, s: SortBy) -> 'Filter':
        """Select the sort order and return ``self`` for chaining."""
        self._sort_by = s
        return self

    def feature(self, features: List[Features]) -> 'Filter':
        """Add one or more features and return ``self`` for chaining.

        :param features:
            A single ``Features`` member, or a list, tuple, set or frozenset
            of members. Features accumulate across calls; duplicates are
            ignored because they are kept in a set.
        """
        if isinstance(features, (list, tuple, set, frozenset)):
            self._features.update(features)
        else:
            self._features.add(features)
        return self

    def merge(self):
        """Combine the fluent-API selections into one dict and stringify it.

        The sort order goes to the top-level key ``1``; type, upload date,
        duration and features go to the nested dict at key ``2``, which is
        omitted when nothing is selected. The nested dict is sorted by key so
        the result does not depend on the iteration order of the feature set
        and matches the ordering used by :meth:`get_filters_params`.

        :returns: ``str()`` of the combined dict.
        :raises ValueError: If the selected sort order's value is not a
            one-entry dict keyed by ``1``.
        """
        result = {}
        group_2 = {}

        if self._sort_by is not None:
            sort_dict = self._sort_by.value
            if len(sort_dict) != 1 or 1 not in sort_dict:
                raise ValueError("SortBy must have exactly {1: <value>}")
            result[1] = sort_dict[1]

        for selected in (self._type, self._upload_date, self._duration):
            if selected is not None:
                group_2.update(selected.value)

        for member in self._features:
            group_2.update(member.value)

        if group_2:
            result[2] = dict(sorted(group_2.items()))

        logger.debug("Combined filters: %s", result)

        return str(result)

    def __repr__(self):
        """Describe the fluent-API state; feature names are sorted for a stable output."""
        feature_names = sorted(f.name for f in self._features)
        return (f"Filter(type={self._type}, upload_date={self._upload_date}, "
                f"duration={self._duration}, sort_by={self._sort_by}, "
                f"features={feature_names})")