More converters.
This commit is contained in:
parent
71fa94e3c9
commit
7a6a08b3a1
5 changed files with 367 additions and 327 deletions
|
|
@ -50,6 +50,9 @@ from .converters import (
|
|||
DocumentConverterResult,
|
||||
PlainTextConverter,
|
||||
HtmlConverter,
|
||||
RssConverter,
|
||||
WikipediaConverter,
|
||||
YouTubeConverter,
|
||||
)
|
||||
from .converters._markdownify import _CustomMarkdownify
|
||||
|
||||
|
|
@ -88,332 +91,6 @@ except ModuleNotFoundError:
|
|||
finally:
|
||||
resetwarnings()
|
||||
|
||||
# Optional YouTube transcription support
|
||||
try:
|
||||
from youtube_transcript_api import YouTubeTranscriptApi
|
||||
|
||||
IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
|
||||
|
||||
class RSSConverter(DocumentConverter):
|
||||
"""Convert RSS / Atom type to markdown"""
|
||||
|
||||
def convert(
|
||||
self, local_path: str, **kwargs
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
# Bail if not RSS type
|
||||
extension = kwargs.get("file_extension", "")
|
||||
if extension.lower() not in [".xml", ".rss", ".atom"]:
|
||||
return None
|
||||
try:
|
||||
doc = minidom.parse(local_path)
|
||||
except BaseException as _:
|
||||
return None
|
||||
result = None
|
||||
if doc.getElementsByTagName("rss"):
|
||||
# A RSS feed must have a root element of <rss>
|
||||
result = self._parse_rss_type(doc)
|
||||
elif doc.getElementsByTagName("feed"):
|
||||
root = doc.getElementsByTagName("feed")[0]
|
||||
if root.getElementsByTagName("entry"):
|
||||
# An Atom feed must have a root element of <feed> and at least one <entry>
|
||||
result = self._parse_atom_type(doc)
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
# not rss or atom
|
||||
return None
|
||||
|
||||
return result
|
||||
|
||||
def _parse_atom_type(
|
||||
self, doc: minidom.Document
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
"""Parse the type of an Atom feed.
|
||||
|
||||
Returns None if the feed type is not recognized or something goes wrong.
|
||||
"""
|
||||
try:
|
||||
root = doc.getElementsByTagName("feed")[0]
|
||||
title = self._get_data_by_tag_name(root, "title")
|
||||
subtitle = self._get_data_by_tag_name(root, "subtitle")
|
||||
entries = root.getElementsByTagName("entry")
|
||||
md_text = f"# {title}\n"
|
||||
if subtitle:
|
||||
md_text += f"{subtitle}\n"
|
||||
for entry in entries:
|
||||
entry_title = self._get_data_by_tag_name(entry, "title")
|
||||
entry_summary = self._get_data_by_tag_name(entry, "summary")
|
||||
entry_updated = self._get_data_by_tag_name(entry, "updated")
|
||||
entry_content = self._get_data_by_tag_name(entry, "content")
|
||||
|
||||
if entry_title:
|
||||
md_text += f"\n## {entry_title}\n"
|
||||
if entry_updated:
|
||||
md_text += f"Updated on: {entry_updated}\n"
|
||||
if entry_summary:
|
||||
md_text += self._parse_content(entry_summary)
|
||||
if entry_content:
|
||||
md_text += self._parse_content(entry_content)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=title,
|
||||
text_content=md_text,
|
||||
)
|
||||
except BaseException as _:
|
||||
return None
|
||||
|
||||
def _parse_rss_type(
|
||||
self, doc: minidom.Document
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
"""Parse the type of an RSS feed.
|
||||
|
||||
Returns None if the feed type is not recognized or something goes wrong.
|
||||
"""
|
||||
try:
|
||||
root = doc.getElementsByTagName("rss")[0]
|
||||
channel = root.getElementsByTagName("channel")
|
||||
if not channel:
|
||||
return None
|
||||
channel = channel[0]
|
||||
channel_title = self._get_data_by_tag_name(channel, "title")
|
||||
channel_description = self._get_data_by_tag_name(channel, "description")
|
||||
items = channel.getElementsByTagName("item")
|
||||
if channel_title:
|
||||
md_text = f"# {channel_title}\n"
|
||||
if channel_description:
|
||||
md_text += f"{channel_description}\n"
|
||||
if not items:
|
||||
items = []
|
||||
for item in items:
|
||||
title = self._get_data_by_tag_name(item, "title")
|
||||
description = self._get_data_by_tag_name(item, "description")
|
||||
pubDate = self._get_data_by_tag_name(item, "pubDate")
|
||||
content = self._get_data_by_tag_name(item, "content:encoded")
|
||||
|
||||
if title:
|
||||
md_text += f"\n## {title}\n"
|
||||
if pubDate:
|
||||
md_text += f"Published on: {pubDate}\n"
|
||||
if description:
|
||||
md_text += self._parse_content(description)
|
||||
if content:
|
||||
md_text += self._parse_content(content)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=channel_title,
|
||||
text_content=md_text,
|
||||
)
|
||||
except BaseException as _:
|
||||
print(traceback.format_exc())
|
||||
return None
|
||||
|
||||
def _parse_content(self, content: str) -> str:
|
||||
"""Parse the content of an RSS feed item"""
|
||||
try:
|
||||
# using bs4 because many RSS feeds have HTML-styled content
|
||||
soup = BeautifulSoup(content, "html.parser")
|
||||
return _CustomMarkdownify().convert_soup(soup)
|
||||
except BaseException as _:
|
||||
return content
|
||||
|
||||
def _get_data_by_tag_name(
|
||||
self, element: minidom.Element, tag_name: str
|
||||
) -> Union[str, None]:
|
||||
"""Get data from first child element with the given tag name.
|
||||
Returns None when no such element is found.
|
||||
"""
|
||||
nodes = element.getElementsByTagName(tag_name)
|
||||
if not nodes:
|
||||
return None
|
||||
fc = nodes[0].firstChild
|
||||
if fc:
|
||||
return fc.data
|
||||
return None
|
||||
|
||||
|
||||
class WikipediaConverter(DocumentConverter):
|
||||
"""Handle Wikipedia pages separately, focusing only on the main document content."""
|
||||
|
||||
def convert(
|
||||
self, local_path: str, **kwargs: Any
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
# Bail if not Wikipedia
|
||||
extension = kwargs.get("file_extension", "")
|
||||
if extension.lower() not in [".html", ".htm"]:
|
||||
return None
|
||||
url = kwargs.get("url", "")
|
||||
if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url):
|
||||
return None
|
||||
|
||||
# Parse the file
|
||||
soup = None
|
||||
with open(local_path, "rt", encoding="utf-8") as fh:
|
||||
soup = BeautifulSoup(fh.read(), "html.parser")
|
||||
|
||||
# Remove javascript and style blocks
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
# Print only the main content
|
||||
body_elm = soup.find("div", {"id": "mw-content-text"})
|
||||
title_elm = soup.find("span", {"class": "mw-page-title-main"})
|
||||
|
||||
webpage_text = ""
|
||||
main_title = None if soup.title is None else soup.title.string
|
||||
|
||||
if body_elm:
|
||||
# What's the title
|
||||
if title_elm and len(title_elm) > 0:
|
||||
main_title = title_elm.string # type: ignore
|
||||
assert isinstance(main_title, str)
|
||||
|
||||
# Convert the page
|
||||
webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(
|
||||
body_elm
|
||||
)
|
||||
else:
|
||||
webpage_text = _CustomMarkdownify().convert_soup(soup)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=main_title,
|
||||
text_content=webpage_text,
|
||||
)
|
||||
|
||||
|
||||
class YouTubeConverter(DocumentConverter):
|
||||
"""Handle YouTube specially, focusing on the video title, description, and transcript."""
|
||||
|
||||
def convert(
|
||||
self, local_path: str, **kwargs: Any
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
# Bail if not YouTube
|
||||
extension = kwargs.get("file_extension", "")
|
||||
if extension.lower() not in [".html", ".htm"]:
|
||||
return None
|
||||
url = kwargs.get("url", "")
|
||||
if not url.startswith("https://www.youtube.com/watch?"):
|
||||
return None
|
||||
|
||||
# Parse the file
|
||||
soup = None
|
||||
with open(local_path, "rt", encoding="utf-8") as fh:
|
||||
soup = BeautifulSoup(fh.read(), "html.parser")
|
||||
|
||||
# Read the meta tags
|
||||
assert soup.title is not None and soup.title.string is not None
|
||||
metadata: Dict[str, str] = {"title": soup.title.string}
|
||||
for meta in soup(["meta"]):
|
||||
for a in meta.attrs:
|
||||
if a in ["itemprop", "property", "name"]:
|
||||
metadata[meta[a]] = meta.get("content", "")
|
||||
break
|
||||
|
||||
# We can also try to read the full description. This is more prone to breaking, since it reaches into the page implementation
|
||||
try:
|
||||
for script in soup(["script"]):
|
||||
content = script.text
|
||||
if "ytInitialData" in content:
|
||||
lines = re.split(r"\r?\n", content)
|
||||
obj_start = lines[0].find("{")
|
||||
obj_end = lines[0].rfind("}")
|
||||
if obj_start >= 0 and obj_end >= 0:
|
||||
data = json.loads(lines[0][obj_start : obj_end + 1])
|
||||
attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore
|
||||
if attrdesc:
|
||||
metadata["description"] = str(attrdesc["content"])
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Start preparing the page
|
||||
webpage_text = "# YouTube\n"
|
||||
|
||||
title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore
|
||||
assert isinstance(title, str)
|
||||
|
||||
if title:
|
||||
webpage_text += f"\n## {title}\n"
|
||||
|
||||
stats = ""
|
||||
views = self._get(metadata, ["interactionCount"]) # type: ignore
|
||||
if views:
|
||||
stats += f"- **Views:** {views}\n"
|
||||
|
||||
keywords = self._get(metadata, ["keywords"]) # type: ignore
|
||||
if keywords:
|
||||
stats += f"- **Keywords:** {keywords}\n"
|
||||
|
||||
runtime = self._get(metadata, ["duration"]) # type: ignore
|
||||
if runtime:
|
||||
stats += f"- **Runtime:** {runtime}\n"
|
||||
|
||||
if len(stats) > 0:
|
||||
webpage_text += f"\n### Video Metadata\n{stats}\n"
|
||||
|
||||
description = self._get(metadata, ["description", "og:description"]) # type: ignore
|
||||
if description:
|
||||
webpage_text += f"\n### Description\n{description}\n"
|
||||
|
||||
if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
|
||||
transcript_text = ""
|
||||
parsed_url = urlparse(url) # type: ignore
|
||||
params = parse_qs(parsed_url.query) # type: ignore
|
||||
if "v" in params:
|
||||
assert isinstance(params["v"][0], str)
|
||||
video_id = str(params["v"][0])
|
||||
try:
|
||||
youtube_transcript_languages = kwargs.get(
|
||||
"youtube_transcript_languages", ("en",)
|
||||
)
|
||||
# Must be a single transcript.
|
||||
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore
|
||||
transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore
|
||||
# Alternative formatting:
|
||||
# formatter = TextFormatter()
|
||||
# formatter.format_transcript(transcript)
|
||||
except Exception:
|
||||
pass
|
||||
if transcript_text:
|
||||
webpage_text += f"\n### Transcript\n{transcript_text}\n"
|
||||
|
||||
title = title if title else soup.title.string
|
||||
assert isinstance(title, str)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=title,
|
||||
text_content=webpage_text,
|
||||
)
|
||||
|
||||
def _get(
|
||||
self,
|
||||
metadata: Dict[str, str],
|
||||
keys: List[str],
|
||||
default: Union[str, None] = None,
|
||||
) -> Union[str, None]:
|
||||
for k in keys:
|
||||
if k in metadata:
|
||||
return metadata[k]
|
||||
return default
|
||||
|
||||
def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type
|
||||
if isinstance(json, list):
|
||||
for elm in json:
|
||||
ret = self._findKey(elm, key)
|
||||
if ret is not None:
|
||||
return ret
|
||||
elif isinstance(json, dict):
|
||||
for k in json:
|
||||
if k == key:
|
||||
return json[k]
|
||||
else:
|
||||
ret = self._findKey(json[k], key)
|
||||
if ret is not None:
|
||||
return ret
|
||||
return None
|
||||
|
||||
|
||||
class IpynbConverter(DocumentConverter):
|
||||
"""Converts Jupyter Notebook (.ipynb) files to Markdown."""
|
||||
|
|
@ -1369,8 +1046,9 @@ class MarkItDown:
|
|||
# To this end, the most specific converters should appear below the most generic converters
|
||||
self.register_page_converter(PlainTextConverter())
|
||||
self.register_page_converter(HtmlConverter())
|
||||
self.register_page_converter(RSSConverter())
|
||||
self.register_page_converter(RssConverter())
|
||||
self.register_page_converter(WikipediaConverter())
|
||||
|
||||
self.register_page_converter(YouTubeConverter())
|
||||
self.register_page_converter(BingSerpConverter())
|
||||
self.register_page_converter(DocxConverter())
|
||||
|
|
|
|||
|
|
@ -5,10 +5,16 @@
|
|||
from ._base import DocumentConverter, DocumentConverterResult
|
||||
from ._plain_text_converter import PlainTextConverter
|
||||
from ._html_converter import HtmlConverter
|
||||
from ._rss_converter import RssConverter
|
||||
from ._wikipedia_converter import WikipediaConverter
|
||||
from ._youtube_converter import YouTubeConverter
|
||||
|
||||
__all__ = [
|
||||
"DocumentConverter",
|
||||
"DocumentConverterResult",
|
||||
"PlainTextConverter",
|
||||
"HtmlConverter",
|
||||
"RssConverter",
|
||||
"WikipediaConverter",
|
||||
"YouTubeConverter",
|
||||
]
|
||||
|
|
|
|||
144
src/markitdown/converters/_rss_converter.py
Normal file
144
src/markitdown/converters/_rss_converter.py
Normal file
|
|
@ -0,0 +1,144 @@
|
|||
# type: ignore
|
||||
from xml.dom import minidom
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from ._markdownify import _CustomMarkdownify
|
||||
from ._base import DocumentConverter, DocumentConverterResult
|
||||
|
||||
|
||||
class RssConverter(DocumentConverter):
|
||||
"""Convert RSS / Atom type to markdown"""
|
||||
|
||||
def convert(
|
||||
self, local_path: str, **kwargs
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
# Bail if not RSS type
|
||||
extension = kwargs.get("file_extension", "")
|
||||
if extension.lower() not in [".xml", ".rss", ".atom"]:
|
||||
return None
|
||||
try:
|
||||
doc = minidom.parse(local_path)
|
||||
except BaseException as _:
|
||||
return None
|
||||
result = None
|
||||
if doc.getElementsByTagName("rss"):
|
||||
# A RSS feed must have a root element of <rss>
|
||||
result = self._parse_rss_type(doc)
|
||||
elif doc.getElementsByTagName("feed"):
|
||||
root = doc.getElementsByTagName("feed")[0]
|
||||
if root.getElementsByTagName("entry"):
|
||||
# An Atom feed must have a root element of <feed> and at least one <entry>
|
||||
result = self._parse_atom_type(doc)
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
# not rss or atom
|
||||
return None
|
||||
|
||||
return result
|
||||
|
||||
def _parse_atom_type(
|
||||
self, doc: minidom.Document
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
"""Parse the type of an Atom feed.
|
||||
|
||||
Returns None if the feed type is not recognized or something goes wrong.
|
||||
"""
|
||||
try:
|
||||
root = doc.getElementsByTagName("feed")[0]
|
||||
title = self._get_data_by_tag_name(root, "title")
|
||||
subtitle = self._get_data_by_tag_name(root, "subtitle")
|
||||
entries = root.getElementsByTagName("entry")
|
||||
md_text = f"# {title}\n"
|
||||
if subtitle:
|
||||
md_text += f"{subtitle}\n"
|
||||
for entry in entries:
|
||||
entry_title = self._get_data_by_tag_name(entry, "title")
|
||||
entry_summary = self._get_data_by_tag_name(entry, "summary")
|
||||
entry_updated = self._get_data_by_tag_name(entry, "updated")
|
||||
entry_content = self._get_data_by_tag_name(entry, "content")
|
||||
|
||||
if entry_title:
|
||||
md_text += f"\n## {entry_title}\n"
|
||||
if entry_updated:
|
||||
md_text += f"Updated on: {entry_updated}\n"
|
||||
if entry_summary:
|
||||
md_text += self._parse_content(entry_summary)
|
||||
if entry_content:
|
||||
md_text += self._parse_content(entry_content)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=title,
|
||||
text_content=md_text,
|
||||
)
|
||||
except BaseException as _:
|
||||
return None
|
||||
|
||||
def _parse_rss_type(
|
||||
self, doc: minidom.Document
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
"""Parse the type of an RSS feed.
|
||||
|
||||
Returns None if the feed type is not recognized or something goes wrong.
|
||||
"""
|
||||
try:
|
||||
root = doc.getElementsByTagName("rss")[0]
|
||||
channel = root.getElementsByTagName("channel")
|
||||
if not channel:
|
||||
return None
|
||||
channel = channel[0]
|
||||
channel_title = self._get_data_by_tag_name(channel, "title")
|
||||
channel_description = self._get_data_by_tag_name(channel, "description")
|
||||
items = channel.getElementsByTagName("item")
|
||||
if channel_title:
|
||||
md_text = f"# {channel_title}\n"
|
||||
if channel_description:
|
||||
md_text += f"{channel_description}\n"
|
||||
if not items:
|
||||
items = []
|
||||
for item in items:
|
||||
title = self._get_data_by_tag_name(item, "title")
|
||||
description = self._get_data_by_tag_name(item, "description")
|
||||
pubDate = self._get_data_by_tag_name(item, "pubDate")
|
||||
content = self._get_data_by_tag_name(item, "content:encoded")
|
||||
|
||||
if title:
|
||||
md_text += f"\n## {title}\n"
|
||||
if pubDate:
|
||||
md_text += f"Published on: {pubDate}\n"
|
||||
if description:
|
||||
md_text += self._parse_content(description)
|
||||
if content:
|
||||
md_text += self._parse_content(content)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=channel_title,
|
||||
text_content=md_text,
|
||||
)
|
||||
except BaseException as _:
|
||||
print(traceback.format_exc())
|
||||
return None
|
||||
|
||||
def _parse_content(self, content: str) -> str:
|
||||
"""Parse the content of an RSS feed item"""
|
||||
try:
|
||||
# using bs4 because many RSS feeds have HTML-styled content
|
||||
soup = BeautifulSoup(content, "html.parser")
|
||||
return _CustomMarkdownify().convert_soup(soup)
|
||||
except BaseException as _:
|
||||
return content
|
||||
|
||||
def _get_data_by_tag_name(
|
||||
self, element: minidom.Element, tag_name: str
|
||||
) -> Union[str, None]:
|
||||
"""Get data from first child element with the given tag name.
|
||||
Returns None when no such element is found.
|
||||
"""
|
||||
nodes = element.getElementsByTagName(tag_name)
|
||||
if not nodes:
|
||||
return None
|
||||
fc = nodes[0].firstChild
|
||||
if fc:
|
||||
return fc.data
|
||||
return None
|
||||
57
src/markitdown/converters/_wikipedia_converter.py
Normal file
57
src/markitdown/converters/_wikipedia_converter.py
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
import re
|
||||
|
||||
from typing import Any, Union
|
||||
from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from ._base import DocumentConverter, DocumentConverterResult
|
||||
from ._markdownify import _CustomMarkdownify
|
||||
|
||||
|
||||
class WikipediaConverter(DocumentConverter):
|
||||
"""Handle Wikipedia pages separately, focusing only on the main document content."""
|
||||
|
||||
def convert(
|
||||
self, local_path: str, **kwargs: Any
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
# Bail if not Wikipedia
|
||||
extension = kwargs.get("file_extension", "")
|
||||
if extension.lower() not in [".html", ".htm"]:
|
||||
return None
|
||||
url = kwargs.get("url", "")
|
||||
if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url):
|
||||
return None
|
||||
|
||||
# Parse the file
|
||||
soup = None
|
||||
with open(local_path, "rt", encoding="utf-8") as fh:
|
||||
soup = BeautifulSoup(fh.read(), "html.parser")
|
||||
|
||||
# Remove javascript and style blocks
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
# Print only the main content
|
||||
body_elm = soup.find("div", {"id": "mw-content-text"})
|
||||
title_elm = soup.find("span", {"class": "mw-page-title-main"})
|
||||
|
||||
webpage_text = ""
|
||||
main_title = None if soup.title is None else soup.title.string
|
||||
|
||||
if body_elm:
|
||||
# What's the title
|
||||
if title_elm and len(title_elm) > 0:
|
||||
main_title = title_elm.string # type: ignore
|
||||
assert isinstance(main_title, str)
|
||||
|
||||
# Convert the page
|
||||
webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(
|
||||
body_elm
|
||||
)
|
||||
else:
|
||||
webpage_text = _CustomMarkdownify().convert_soup(soup)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=main_title,
|
||||
text_content=webpage_text,
|
||||
)
|
||||
155
src/markitdown/converters/_youtube_converter.py
Normal file
155
src/markitdown/converters/_youtube_converter.py
Normal file
|
|
@ -0,0 +1,155 @@
|
|||
import re
|
||||
|
||||
from typing import Any, Union, Dict, List
|
||||
from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from ._base import DocumentConverter, DocumentConverterResult
|
||||
from ._markdownify import _CustomMarkdownify
|
||||
|
||||
from .._exceptions import (
|
||||
MarkItDownException,
|
||||
ConverterPrerequisiteException,
|
||||
FileConversionException,
|
||||
UnsupportedFormatException,
|
||||
)
|
||||
|
||||
# Optional YouTube transcription support
|
||||
try:
|
||||
from youtube_transcript_api import YouTubeTranscriptApi
|
||||
|
||||
IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
|
||||
|
||||
class YouTubeConverter(DocumentConverter):
|
||||
"""Handle YouTube specially, focusing on the video title, description, and transcript."""
|
||||
|
||||
def convert(
|
||||
self, local_path: str, **kwargs: Any
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
# Bail if not YouTube
|
||||
extension = kwargs.get("file_extension", "")
|
||||
if extension.lower() not in [".html", ".htm"]:
|
||||
return None
|
||||
url = kwargs.get("url", "")
|
||||
if not url.startswith("https://www.youtube.com/watch?"):
|
||||
return None
|
||||
|
||||
# Parse the file
|
||||
soup = None
|
||||
with open(local_path, "rt", encoding="utf-8") as fh:
|
||||
soup = BeautifulSoup(fh.read(), "html.parser")
|
||||
|
||||
# Read the meta tags
|
||||
assert soup.title is not None and soup.title.string is not None
|
||||
metadata: Dict[str, str] = {"title": soup.title.string}
|
||||
for meta in soup(["meta"]):
|
||||
for a in meta.attrs:
|
||||
if a in ["itemprop", "property", "name"]:
|
||||
metadata[meta[a]] = meta.get("content", "")
|
||||
break
|
||||
|
||||
# We can also try to read the full description. This is more prone to breaking, since it reaches into the page implementation
|
||||
try:
|
||||
for script in soup(["script"]):
|
||||
content = script.text
|
||||
if "ytInitialData" in content:
|
||||
lines = re.split(r"\r?\n", content)
|
||||
obj_start = lines[0].find("{")
|
||||
obj_end = lines[0].rfind("}")
|
||||
if obj_start >= 0 and obj_end >= 0:
|
||||
data = json.loads(lines[0][obj_start : obj_end + 1])
|
||||
attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore
|
||||
if attrdesc:
|
||||
metadata["description"] = str(attrdesc["content"])
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Start preparing the page
|
||||
webpage_text = "# YouTube\n"
|
||||
|
||||
title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore
|
||||
assert isinstance(title, str)
|
||||
|
||||
if title:
|
||||
webpage_text += f"\n## {title}\n"
|
||||
|
||||
stats = ""
|
||||
views = self._get(metadata, ["interactionCount"]) # type: ignore
|
||||
if views:
|
||||
stats += f"- **Views:** {views}\n"
|
||||
|
||||
keywords = self._get(metadata, ["keywords"]) # type: ignore
|
||||
if keywords:
|
||||
stats += f"- **Keywords:** {keywords}\n"
|
||||
|
||||
runtime = self._get(metadata, ["duration"]) # type: ignore
|
||||
if runtime:
|
||||
stats += f"- **Runtime:** {runtime}\n"
|
||||
|
||||
if len(stats) > 0:
|
||||
webpage_text += f"\n### Video Metadata\n{stats}\n"
|
||||
|
||||
description = self._get(metadata, ["description", "og:description"]) # type: ignore
|
||||
if description:
|
||||
webpage_text += f"\n### Description\n{description}\n"
|
||||
|
||||
if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
|
||||
transcript_text = ""
|
||||
parsed_url = urlparse(url) # type: ignore
|
||||
params = parse_qs(parsed_url.query) # type: ignore
|
||||
if "v" in params:
|
||||
assert isinstance(params["v"][0], str)
|
||||
video_id = str(params["v"][0])
|
||||
try:
|
||||
youtube_transcript_languages = kwargs.get(
|
||||
"youtube_transcript_languages", ("en",)
|
||||
)
|
||||
# Must be a single transcript.
|
||||
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore
|
||||
transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore
|
||||
# Alternative formatting:
|
||||
# formatter = TextFormatter()
|
||||
# formatter.format_transcript(transcript)
|
||||
except Exception:
|
||||
pass
|
||||
if transcript_text:
|
||||
webpage_text += f"\n### Transcript\n{transcript_text}\n"
|
||||
|
||||
title = title if title else soup.title.string
|
||||
assert isinstance(title, str)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=title,
|
||||
text_content=webpage_text,
|
||||
)
|
||||
|
||||
def _get(
|
||||
self,
|
||||
metadata: Dict[str, str],
|
||||
keys: List[str],
|
||||
default: Union[str, None] = None,
|
||||
) -> Union[str, None]:
|
||||
for k in keys:
|
||||
if k in metadata:
|
||||
return metadata[k]
|
||||
return default
|
||||
|
||||
def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type
|
||||
if isinstance(json, list):
|
||||
for elm in json:
|
||||
ret = self._findKey(elm, key)
|
||||
if ret is not None:
|
||||
return ret
|
||||
elif isinstance(json, dict):
|
||||
for k in json:
|
||||
if k == key:
|
||||
return json[k]
|
||||
else:
|
||||
ret = self._findKey(json[k], key)
|
||||
if ret is not None:
|
||||
return ret
|
||||
return None
|
||||
Loading…
Reference in a new issue