fix: add error handling, refactor _findKey to use json.items()
This commit is contained in:
parent
dbdf2c0c10
commit
8f76393ad8
1 changed files with 20 additions and 11 deletions
|
|
@ -1,4 +1,5 @@
|
|||
import re
|
||||
import json
|
||||
|
||||
from typing import Any, Union, Dict, List
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
|
@ -13,7 +14,7 @@ try:
|
|||
|
||||
IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
IS_YOUTUBE_TRANSCRIPT_CAPABLE = False
|
||||
|
||||
|
||||
class YouTubeConverter(DocumentConverter):
|
||||
|
|
@ -35,10 +36,16 @@ class YouTubeConverter(DocumentConverter):
|
|||
if not url.startswith("https://www.youtube.com/watch?"):
|
||||
return None
|
||||
|
||||
# Parse the file
|
||||
soup = None
|
||||
with open(local_path, "rt", encoding="utf-8") as fh:
|
||||
soup = BeautifulSoup(fh.read(), "html.parser")
|
||||
# Parse the file with error handling
|
||||
try:
|
||||
with open(local_path, "rt", encoding="utf-8") as fh:
|
||||
soup = BeautifulSoup(fh.read(), "html.parser")
|
||||
except Exception as e:
|
||||
print(f"Error reading YouTube page: {e}")
|
||||
return None
|
||||
|
||||
if not soup.title or not soup.title.string:
|
||||
return None
|
||||
|
||||
# Read the meta tags
|
||||
assert soup.title is not None and soup.title.string is not None
|
||||
|
|
@ -107,7 +114,9 @@ class YouTubeConverter(DocumentConverter):
|
|||
"youtube_transcript_languages", ("en",)
|
||||
)
|
||||
# Must be a single transcript.
|
||||
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore
|
||||
transcript = YouTubeTranscriptApi.get_transcript(
|
||||
video_id, languages=youtube_transcript_languages
|
||||
) # type: ignore
|
||||
transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore
|
||||
# Alternative formatting:
|
||||
# formatter = TextFormatter()
|
||||
|
|
@ -131,23 +140,23 @@ class YouTubeConverter(DocumentConverter):
|
|||
keys: List[str],
|
||||
default: Union[str, None] = None,
|
||||
) -> Union[str, None]:
|
||||
"""Get first non-empty value from metadata matching given keys."""
|
||||
for k in keys:
|
||||
if k in metadata:
|
||||
return metadata[k]
|
||||
return default
|
||||
|
||||
def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type
|
||||
"""Recursively search for a key in nested dictionary/list structures."""
|
||||
if isinstance(json, list):
|
||||
for elm in json:
|
||||
ret = self._findKey(elm, key)
|
||||
if ret is not None:
|
||||
return ret
|
||||
elif isinstance(json, dict):
|
||||
for k in json:
|
||||
for k, v in json.items():
|
||||
if k == key:
|
||||
return json[k]
|
||||
else:
|
||||
ret = self._findKey(json[k], key)
|
||||
if ret is not None:
|
||||
return ret
|
||||
if result := self._findKey(v, key):
|
||||
return result
|
||||
return None
|
||||
|
|
|
|||
Loading…
Reference in a new issue