From 7a6a08b3a178e27388e4e8de8641acd5b5fc61d2 Mon Sep 17 00:00:00 2001
From: Adam Fourney
Date: Sun, 9 Feb 2025 11:38:47 -0800
Subject: [PATCH] More converters.

---
 src/markitdown/_markitdown.py                 | 332 +-----------------
 src/markitdown/converters/__init__.py         |   6 +
 src/markitdown/converters/_rss_converter.py   | 145 ++++++++
 .../converters/_wikipedia_converter.py        |  57 +++
 .../converters/_youtube_converter.py          | 156 ++++++++
 5 files changed, 369 insertions(+), 327 deletions(-)
 create mode 100644 src/markitdown/converters/_rss_converter.py
 create mode 100644 src/markitdown/converters/_wikipedia_converter.py
 create mode 100644 src/markitdown/converters/_youtube_converter.py

diff --git a/src/markitdown/_markitdown.py b/src/markitdown/_markitdown.py
index 77363e3..6800a14 100644
--- a/src/markitdown/_markitdown.py
+++ b/src/markitdown/_markitdown.py
@@ -50,6 +50,9 @@ from .converters import (
     DocumentConverterResult,
     PlainTextConverter,
     HtmlConverter,
+    RssConverter,
+    WikipediaConverter,
+    YouTubeConverter,
 )
 
 from .converters._markdownify import _CustomMarkdownify
@@ -88,332 +91,6 @@ except ModuleNotFoundError:
 finally:
     resetwarnings()
 
-# Optional YouTube transcription support
-try:
-    from youtube_transcript_api import YouTubeTranscriptApi
-
-    IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
-except ModuleNotFoundError:
-    pass
-
-
-class RSSConverter(DocumentConverter):
-    """Convert RSS / Atom type to markdown"""
-
-    def convert(
-        self, local_path: str, **kwargs
-    ) -> Union[None, DocumentConverterResult]:
-        # Bail if not RSS type
-        extension = kwargs.get("file_extension", "")
-        if extension.lower() not in [".xml", ".rss", ".atom"]:
-            return None
-        try:
-            doc = minidom.parse(local_path)
-        except BaseException as _:
-            return None
-        result = None
-        if doc.getElementsByTagName("rss"):
-            # A RSS feed must have a root element of <rss>
-            result = self._parse_rss_type(doc)
-        elif doc.getElementsByTagName("feed"):
-            root = doc.getElementsByTagName("feed")[0]
-            if root.getElementsByTagName("entry"):
-                # An Atom feed must have a root element of <feed> and at least one <entry>
-                result = self._parse_atom_type(doc)
-            else:
-                return None
-        else:
-            # not rss or atom
-            return None
-
-        return result
-
-    def _parse_atom_type(
-        self, doc: minidom.Document
-    ) -> Union[None, DocumentConverterResult]:
-        """Parse the type of an Atom feed.
-
-        Returns None if the feed type is not recognized or something goes wrong.
-        """
-        try:
-            root = doc.getElementsByTagName("feed")[0]
-            title = self._get_data_by_tag_name(root, "title")
-            subtitle = self._get_data_by_tag_name(root, "subtitle")
-            entries = root.getElementsByTagName("entry")
-            md_text = f"# {title}\n"
-            if subtitle:
-                md_text += f"{subtitle}\n"
-            for entry in entries:
-                entry_title = self._get_data_by_tag_name(entry, "title")
-                entry_summary = self._get_data_by_tag_name(entry, "summary")
-                entry_updated = self._get_data_by_tag_name(entry, "updated")
-                entry_content = self._get_data_by_tag_name(entry, "content")
-
-                if entry_title:
-                    md_text += f"\n## {entry_title}\n"
-                if entry_updated:
-                    md_text += f"Updated on: {entry_updated}\n"
-                if entry_summary:
-                    md_text += self._parse_content(entry_summary)
-                if entry_content:
-                    md_text += self._parse_content(entry_content)
-
-            return DocumentConverterResult(
-                title=title,
-                text_content=md_text,
-            )
-        except BaseException as _:
-            return None
-
-    def _parse_rss_type(
-        self, doc: minidom.Document
-    ) -> Union[None, DocumentConverterResult]:
-        """Parse the type of an RSS feed.
-
-        Returns None if the feed type is not recognized or something goes wrong.
- """ - try: - root = doc.getElementsByTagName("rss")[0] - channel = root.getElementsByTagName("channel") - if not channel: - return None - channel = channel[0] - channel_title = self._get_data_by_tag_name(channel, "title") - channel_description = self._get_data_by_tag_name(channel, "description") - items = channel.getElementsByTagName("item") - if channel_title: - md_text = f"# {channel_title}\n" - if channel_description: - md_text += f"{channel_description}\n" - if not items: - items = [] - for item in items: - title = self._get_data_by_tag_name(item, "title") - description = self._get_data_by_tag_name(item, "description") - pubDate = self._get_data_by_tag_name(item, "pubDate") - content = self._get_data_by_tag_name(item, "content:encoded") - - if title: - md_text += f"\n## {title}\n" - if pubDate: - md_text += f"Published on: {pubDate}\n" - if description: - md_text += self._parse_content(description) - if content: - md_text += self._parse_content(content) - - return DocumentConverterResult( - title=channel_title, - text_content=md_text, - ) - except BaseException as _: - print(traceback.format_exc()) - return None - - def _parse_content(self, content: str) -> str: - """Parse the content of an RSS feed item""" - try: - # using bs4 because many RSS feeds have HTML-styled content - soup = BeautifulSoup(content, "html.parser") - return _CustomMarkdownify().convert_soup(soup) - except BaseException as _: - return content - - def _get_data_by_tag_name( - self, element: minidom.Element, tag_name: str - ) -> Union[str, None]: - """Get data from first child element with the given tag name. - Returns None when no such element is found. - """ - nodes = element.getElementsByTagName(tag_name) - if not nodes: - return None - fc = nodes[0].firstChild - if fc: - return fc.data - return None - - -class WikipediaConverter(DocumentConverter): - """Handle Wikipedia pages separately, focusing only on the main document content.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not Wikipedia - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".html", ".htm"]: - return None - url = kwargs.get("url", "") - if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url): - return None - - # Parse the file - soup = None - with open(local_path, "rt", encoding="utf-8") as fh: - soup = BeautifulSoup(fh.read(), "html.parser") - - # Remove javascript and style blocks - for script in soup(["script", "style"]): - script.extract() - - # Print only the main content - body_elm = soup.find("div", {"id": "mw-content-text"}) - title_elm = soup.find("span", {"class": "mw-page-title-main"}) - - webpage_text = "" - main_title = None if soup.title is None else soup.title.string - - if body_elm: - # What's the title - if title_elm and len(title_elm) > 0: - main_title = title_elm.string # type: ignore - assert isinstance(main_title, str) - - # Convert the page - webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup( - body_elm - ) - else: - webpage_text = _CustomMarkdownify().convert_soup(soup) - - return DocumentConverterResult( - title=main_title, - text_content=webpage_text, - ) - - -class YouTubeConverter(DocumentConverter): - """Handle YouTube specially, focusing on the video title, description, and transcript.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not YouTube - extension = kwargs.get("file_extension", "") - if extension.lower() not 
in [".html", ".htm"]: - return None - url = kwargs.get("url", "") - if not url.startswith("https://www.youtube.com/watch?"): - return None - - # Parse the file - soup = None - with open(local_path, "rt", encoding="utf-8") as fh: - soup = BeautifulSoup(fh.read(), "html.parser") - - # Read the meta tags - assert soup.title is not None and soup.title.string is not None - metadata: Dict[str, str] = {"title": soup.title.string} - for meta in soup(["meta"]): - for a in meta.attrs: - if a in ["itemprop", "property", "name"]: - metadata[meta[a]] = meta.get("content", "") - break - - # We can also try to read the full description. This is more prone to breaking, since it reaches into the page implementation - try: - for script in soup(["script"]): - content = script.text - if "ytInitialData" in content: - lines = re.split(r"\r?\n", content) - obj_start = lines[0].find("{") - obj_end = lines[0].rfind("}") - if obj_start >= 0 and obj_end >= 0: - data = json.loads(lines[0][obj_start : obj_end + 1]) - attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore - if attrdesc: - metadata["description"] = str(attrdesc["content"]) - break - except Exception: - pass - - # Start preparing the page - webpage_text = "# YouTube\n" - - title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore - assert isinstance(title, str) - - if title: - webpage_text += f"\n## {title}\n" - - stats = "" - views = self._get(metadata, ["interactionCount"]) # type: ignore - if views: - stats += f"- **Views:** {views}\n" - - keywords = self._get(metadata, ["keywords"]) # type: ignore - if keywords: - stats += f"- **Keywords:** {keywords}\n" - - runtime = self._get(metadata, ["duration"]) # type: ignore - if runtime: - stats += f"- **Runtime:** {runtime}\n" - - if len(stats) > 0: - webpage_text += f"\n### Video Metadata\n{stats}\n" - - description = self._get(metadata, ["description", "og:description"]) # type: ignore - if description: - webpage_text += f"\n### Description\n{description}\n" - - if IS_YOUTUBE_TRANSCRIPT_CAPABLE: - transcript_text = "" - parsed_url = urlparse(url) # type: ignore - params = parse_qs(parsed_url.query) # type: ignore - if "v" in params: - assert isinstance(params["v"][0], str) - video_id = str(params["v"][0]) - try: - youtube_transcript_languages = kwargs.get( - "youtube_transcript_languages", ("en",) - ) - # Must be a single transcript. 
-                    transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages)  # type: ignore
-                    transcript_text = " ".join([part["text"] for part in transcript])  # type: ignore
-                    # Alternative formatting:
-                    # formatter = TextFormatter()
-                    # formatter.format_transcript(transcript)
-                except Exception:
-                    pass
-            if transcript_text:
-                webpage_text += f"\n### Transcript\n{transcript_text}\n"
-
-        title = title if title else soup.title.string
-        assert isinstance(title, str)
-
-        return DocumentConverterResult(
-            title=title,
-            text_content=webpage_text,
-        )
-
-    def _get(
-        self,
-        metadata: Dict[str, str],
-        keys: List[str],
-        default: Union[str, None] = None,
-    ) -> Union[str, None]:
-        for k in keys:
-            if k in metadata:
-                return metadata[k]
-        return default
-
-    def _findKey(self, json: Any, key: str) -> Union[str, None]:  # TODO: Fix json type
-        if isinstance(json, list):
-            for elm in json:
-                ret = self._findKey(elm, key)
-                if ret is not None:
-                    return ret
-        elif isinstance(json, dict):
-            for k in json:
-                if k == key:
-                    return json[k]
-                else:
-                    ret = self._findKey(json[k], key)
-                    if ret is not None:
-                        return ret
-        return None
-
 
 class IpynbConverter(DocumentConverter):
     """Converts Jupyter Notebook (.ipynb) files to Markdown."""
@@ -1369,8 +1046,9 @@ class MarkItDown:
         # To this end, the most specific converters should appear below the most generic converters
         self.register_page_converter(PlainTextConverter())
         self.register_page_converter(HtmlConverter())
-        self.register_page_converter(RSSConverter())
+        self.register_page_converter(RssConverter())
         self.register_page_converter(WikipediaConverter())
+        self.register_page_converter(YouTubeConverter())
         self.register_page_converter(BingSerpConverter())
         self.register_page_converter(DocxConverter())
 
diff --git a/src/markitdown/converters/__init__.py b/src/markitdown/converters/__init__.py
index 21a02d5..e169fa0 100644
--- a/src/markitdown/converters/__init__.py
+++ b/src/markitdown/converters/__init__.py
@@ -5,10 +5,16 @@ from ._base import DocumentConverter, DocumentConverterResult
 
 from ._plain_text_converter import PlainTextConverter
 from ._html_converter import HtmlConverter
+from ._rss_converter import RssConverter
+from ._wikipedia_converter import WikipediaConverter
+from ._youtube_converter import YouTubeConverter
 
 __all__ = [
     "DocumentConverter",
     "DocumentConverterResult",
     "PlainTextConverter",
     "HtmlConverter",
+    "RssConverter",
+    "WikipediaConverter",
+    "YouTubeConverter",
 ]
diff --git a/src/markitdown/converters/_rss_converter.py b/src/markitdown/converters/_rss_converter.py
new file mode 100644
index 0000000..bf0d7c8
--- /dev/null
+++ b/src/markitdown/converters/_rss_converter.py
@@ -0,0 +1,145 @@
+# type: ignore
+import traceback
+from xml.dom import minidom
+from typing import Any, Dict, List, Optional, Union
+from bs4 import BeautifulSoup
+
+from ._markdownify import _CustomMarkdownify
+from ._base import DocumentConverter, DocumentConverterResult
+
+
+class RssConverter(DocumentConverter):
+    """Convert RSS / Atom type to markdown"""
+
+    def convert(
+        self, local_path: str, **kwargs
+    ) -> Union[None, DocumentConverterResult]:
+        # Bail if not RSS type
+        extension = kwargs.get("file_extension", "")
+        if extension.lower() not in [".xml", ".rss", ".atom"]:
+            return None
+        try:
+            doc = minidom.parse(local_path)
+        except BaseException as _:
+            return None
+        result = None
+        if doc.getElementsByTagName("rss"):
+            # An RSS feed must have a root element of <rss>
+            result = self._parse_rss_type(doc)
+        elif doc.getElementsByTagName("feed"):
+            root = doc.getElementsByTagName("feed")[0]
+            if root.getElementsByTagName("entry"):
+                # An Atom feed must have a root element of <feed> and at least one <entry>
+                result = self._parse_atom_type(doc)
+            else:
+                return None
+        else:
+            # not rss or atom
+            return None
+
+        return result
+
+    def _parse_atom_type(
+        self, doc: minidom.Document
+    ) -> Union[None, DocumentConverterResult]:
+        """Parse the type of an Atom feed.
+
+        Returns None if the feed type is not recognized or something goes wrong.
+        """
+        try:
+            root = doc.getElementsByTagName("feed")[0]
+            title = self._get_data_by_tag_name(root, "title")
+            subtitle = self._get_data_by_tag_name(root, "subtitle")
+            entries = root.getElementsByTagName("entry")
+            md_text = f"# {title}\n"
+            if subtitle:
+                md_text += f"{subtitle}\n"
+            for entry in entries:
+                entry_title = self._get_data_by_tag_name(entry, "title")
+                entry_summary = self._get_data_by_tag_name(entry, "summary")
+                entry_updated = self._get_data_by_tag_name(entry, "updated")
+                entry_content = self._get_data_by_tag_name(entry, "content")
+
+                if entry_title:
+                    md_text += f"\n## {entry_title}\n"
+                if entry_updated:
+                    md_text += f"Updated on: {entry_updated}\n"
+                if entry_summary:
+                    md_text += self._parse_content(entry_summary)
+                if entry_content:
+                    md_text += self._parse_content(entry_content)
+
+            return DocumentConverterResult(
+                title=title,
+                text_content=md_text,
+            )
+        except BaseException as _:
+            return None
+
+    def _parse_rss_type(
+        self, doc: minidom.Document
+    ) -> Union[None, DocumentConverterResult]:
+        """Parse the type of an RSS feed.
+
+        Returns None if the feed type is not recognized or something goes wrong.
+        """
+        try:
+            root = doc.getElementsByTagName("rss")[0]
+            channel = root.getElementsByTagName("channel")
+            if not channel:
+                return None
+            channel = channel[0]
+            channel_title = self._get_data_by_tag_name(channel, "title")
+            channel_description = self._get_data_by_tag_name(channel, "description")
+            items = channel.getElementsByTagName("item")
+            if channel_title:
+                md_text = f"# {channel_title}\n"
+            if channel_description:
+                md_text += f"{channel_description}\n"
+            if not items:
+                items = []
+            for item in items:
+                title = self._get_data_by_tag_name(item, "title")
+                description = self._get_data_by_tag_name(item, "description")
+                pubDate = self._get_data_by_tag_name(item, "pubDate")
+                content = self._get_data_by_tag_name(item, "content:encoded")
+
+                if title:
+                    md_text += f"\n## {title}\n"
+                if pubDate:
+                    md_text += f"Published on: {pubDate}\n"
+                if description:
+                    md_text += self._parse_content(description)
+                if content:
+                    md_text += self._parse_content(content)
+
+            return DocumentConverterResult(
+                title=channel_title,
+                text_content=md_text,
+            )
+        except BaseException as _:
+            print(traceback.format_exc())
+            return None
+
+    def _parse_content(self, content: str) -> str:
+        """Parse the content of an RSS feed item"""
+        try:
+            # using bs4 because many RSS feeds have HTML-styled content
+            soup = BeautifulSoup(content, "html.parser")
+            return _CustomMarkdownify().convert_soup(soup)
+        except BaseException as _:
+            return content
+
+    def _get_data_by_tag_name(
+        self, element: minidom.Element, tag_name: str
+    ) -> Union[str, None]:
+        """Get data from first child element with the given tag name.
+        Returns None when no such element is found.
+ """ + nodes = element.getElementsByTagName(tag_name) + if not nodes: + return None + fc = nodes[0].firstChild + if fc: + return fc.data + return None diff --git a/src/markitdown/converters/_wikipedia_converter.py b/src/markitdown/converters/_wikipedia_converter.py new file mode 100644 index 0000000..729171c --- /dev/null +++ b/src/markitdown/converters/_wikipedia_converter.py @@ -0,0 +1,57 @@ +import re + +from typing import Any, Union +from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse +from bs4 import BeautifulSoup + +from ._base import DocumentConverter, DocumentConverterResult +from ._markdownify import _CustomMarkdownify + + +class WikipediaConverter(DocumentConverter): + """Handle Wikipedia pages separately, focusing only on the main document content.""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not Wikipedia + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url): + return None + + # Parse the file + soup = None + with open(local_path, "rt", encoding="utf-8") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Remove javascript and style blocks + for script in soup(["script", "style"]): + script.extract() + + # Print only the main content + body_elm = soup.find("div", {"id": "mw-content-text"}) + title_elm = soup.find("span", {"class": "mw-page-title-main"}) + + webpage_text = "" + main_title = None if soup.title is None else soup.title.string + + if body_elm: + # What's the title + if title_elm and len(title_elm) > 0: + main_title = title_elm.string # type: ignore + assert isinstance(main_title, str) + + # Convert the page + webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup( + body_elm + ) + else: + webpage_text = _CustomMarkdownify().convert_soup(soup) + + return DocumentConverterResult( + title=main_title, + text_content=webpage_text, + ) diff --git a/src/markitdown/converters/_youtube_converter.py b/src/markitdown/converters/_youtube_converter.py new file mode 100644 index 0000000..88d4017 --- /dev/null +++ b/src/markitdown/converters/_youtube_converter.py @@ -0,0 +1,155 @@ +import re + +from typing import Any, Union, Dict, List +from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse +from bs4 import BeautifulSoup + +from ._base import DocumentConverter, DocumentConverterResult +from ._markdownify import _CustomMarkdownify + +from .._exceptions import ( + MarkItDownException, + ConverterPrerequisiteException, + FileConversionException, + UnsupportedFormatException, +) + +# Optional YouTube transcription support +try: + from youtube_transcript_api import YouTubeTranscriptApi + + IS_YOUTUBE_TRANSCRIPT_CAPABLE = True +except ModuleNotFoundError: + pass + + +class YouTubeConverter(DocumentConverter): + """Handle YouTube specially, focusing on the video title, description, and transcript.""" + + def convert( + self, local_path: str, **kwargs: Any + ) -> Union[None, DocumentConverterResult]: + # Bail if not YouTube + extension = kwargs.get("file_extension", "") + if extension.lower() not in [".html", ".htm"]: + return None + url = kwargs.get("url", "") + if not url.startswith("https://www.youtube.com/watch?"): + return None + + # Parse the file + soup = None + with open(local_path, "rt", encoding="utf-8") as fh: + soup = BeautifulSoup(fh.read(), "html.parser") + + # Read the meta tags + 
assert soup.title is not None and soup.title.string is not None + metadata: Dict[str, str] = {"title": soup.title.string} + for meta in soup(["meta"]): + for a in meta.attrs: + if a in ["itemprop", "property", "name"]: + metadata[meta[a]] = meta.get("content", "") + break + + # We can also try to read the full description. This is more prone to breaking, since it reaches into the page implementation + try: + for script in soup(["script"]): + content = script.text + if "ytInitialData" in content: + lines = re.split(r"\r?\n", content) + obj_start = lines[0].find("{") + obj_end = lines[0].rfind("}") + if obj_start >= 0 and obj_end >= 0: + data = json.loads(lines[0][obj_start : obj_end + 1]) + attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore + if attrdesc: + metadata["description"] = str(attrdesc["content"]) + break + except Exception: + pass + + # Start preparing the page + webpage_text = "# YouTube\n" + + title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore + assert isinstance(title, str) + + if title: + webpage_text += f"\n## {title}\n" + + stats = "" + views = self._get(metadata, ["interactionCount"]) # type: ignore + if views: + stats += f"- **Views:** {views}\n" + + keywords = self._get(metadata, ["keywords"]) # type: ignore + if keywords: + stats += f"- **Keywords:** {keywords}\n" + + runtime = self._get(metadata, ["duration"]) # type: ignore + if runtime: + stats += f"- **Runtime:** {runtime}\n" + + if len(stats) > 0: + webpage_text += f"\n### Video Metadata\n{stats}\n" + + description = self._get(metadata, ["description", "og:description"]) # type: ignore + if description: + webpage_text += f"\n### Description\n{description}\n" + + if IS_YOUTUBE_TRANSCRIPT_CAPABLE: + transcript_text = "" + parsed_url = urlparse(url) # type: ignore + params = parse_qs(parsed_url.query) # type: ignore + if "v" in params: + assert isinstance(params["v"][0], str) + video_id = str(params["v"][0]) + try: + youtube_transcript_languages = kwargs.get( + "youtube_transcript_languages", ("en",) + ) + # Must be a single transcript. + transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore + transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore + # Alternative formatting: + # formatter = TextFormatter() + # formatter.format_transcript(transcript) + except Exception: + pass + if transcript_text: + webpage_text += f"\n### Transcript\n{transcript_text}\n" + + title = title if title else soup.title.string + assert isinstance(title, str) + + return DocumentConverterResult( + title=title, + text_content=webpage_text, + ) + + def _get( + self, + metadata: Dict[str, str], + keys: List[str], + default: Union[str, None] = None, + ) -> Union[str, None]: + for k in keys: + if k in metadata: + return metadata[k] + return default + + def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type + if isinstance(json, list): + for elm in json: + ret = self._findKey(elm, key) + if ret is not None: + return ret + elif isinstance(json, dict): + for k in json: + if k == key: + return json[k] + else: + ret = self._findKey(json[k], key) + if ret is not None: + return ret + return None
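
Usage notes
-----------

A minimal sketch of driving the relocated converters through the public API.
This assumes MarkItDown() continues to register RssConverter,
WikipediaConverter, and YouTubeConverter by default, as the registration hunk
above shows; the feed filename is hypothetical.

    from markitdown import MarkItDown

    md = MarkItDown()  # registers the converters moved by this patch by default

    # RssConverter keys off the ".xml", ".rss", or ".atom" file extension
    result = md.convert("feed.rss")  # hypothetical local feed file
    print(result.title)
    print(result.text_content)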
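
A converter can also be exercised directly. Per the signature in
_rss_converter.py, convert() takes a local path plus kwargs and returns None
whenever the input is not a recognizable RSS or Atom document, so callers
should test the result. The .atom file below is hypothetical.

    from markitdown.converters import RssConverter

    converter = RssConverter()
    res = converter.convert("feed.atom", file_extension=".atom")
    if res is not None:
        # "# <feed title>" followed by one "## ..." section per entry/item
        print(res.text_content)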
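
WikipediaConverter fires only when both gates in its convert() method pass: an
.html/.htm extension and a *.wikipedia.org URL. Otherwise it returns None and
a more generic converter (e.g. HtmlConverter) handles the page. A sketch with
a hypothetical saved snapshot:

    from markitdown.converters import WikipediaConverter

    res = WikipediaConverter().convert(
        "saved_page.html",  # hypothetical local snapshot of the page
        file_extension=".html",
        url="https://en.wikipedia.org/wiki/Markdown",
    )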
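
YouTubeConverter reads youtube_transcript_languages from kwargs (defaulting to
("en",)) and only attempts transcript fetching when the optional
youtube-transcript-api package is installed. Assuming MarkItDown.convert()
forwards keyword arguments through to converters, language preferences can be
passed like so (the watch URL is illustrative):

    from markitdown import MarkItDown

    md = MarkItDown()
    result = md.convert(
        "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        youtube_transcript_languages=("en", "de"),
    )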