diff --git a/packages/markitdown/src/markitdown/converters/_xlsx_converter.py b/packages/markitdown/src/markitdown/converters/_xlsx_converter.py index 28f73a0..e6632f6 100644 --- a/packages/markitdown/src/markitdown/converters/_xlsx_converter.py +++ b/packages/markitdown/src/markitdown/converters/_xlsx_converter.py @@ -33,15 +33,68 @@ ACCEPTED_XLS_MIME_TYPE_PREFIXES = [ ACCEPTED_XLS_FILE_EXTENSIONS = [".xls"] -class XlsxConverter(DocumentConverter): - """ - Converts XLSX files to Markdown, with each sheet presented as a separate Markdown table. - """ +class ExcelConverterBase(DocumentConverter): + """Base class for Excel-like converters""" def __init__(self): super().__init__() self._html_converter = HtmlConverter() + def _clean_colname(self, colname: Any) -> Any: + # Remove Pandas header placeholders + if isinstance(colname, str) and colname.startswith("Unnamed:"): + return None + return colname + + def _convert_excel( + self, + file_stream: BinaryIO, + stream_info: StreamInfo, + engine: str, + na_rep: Any = "", + remove_header_placeholders: bool = True, + drop_empty_cols: bool = False, + drop_empty_rows: bool = False, + **kwargs: Any, + ) -> DocumentConverterResult: + sheets = pd.read_excel(file_stream, sheet_name=None, engine=engine) + md_content = "" + for name, sheet in sheets.items(): + md_content += f"## {name}\n" + + if remove_header_placeholders: + sheet = sheet.rename(columns=lambda col: self._clean_colname(col)) + + if drop_empty_cols: + # Also consider headers to be part of the column + sheet = sheet.loc[:, sheet.notna().any() | sheet.columns.notna()] + + if drop_empty_rows: + sheet = sheet.dropna(axis=0, how="all") + + # Coerce any cell that evaluates to `pd.isna(c) == True` to `na_rep` + # More reliable than using `.to_html(na_rep=...)`: https://github.com/pandas-dev/pandas/issues/11953 + # Because the latter does not replace NaT's + with pd.option_context("future.no_silent_downcasting", True): + sheet = sheet.fillna(na_rep, axis=1).infer_objects(copy=False) + sheet.columns = sheet.columns.fillna(na_rep) + + html_content = sheet.to_html(index=False, na_rep=na_rep) + md_content += ( + self._html_converter.convert_string( + html_content, **kwargs + ).markdown.strip() + + "\n\n" + ) + + return DocumentConverterResult(markdown=md_content.strip()) + + +class XlsxConverter(ExcelConverterBase): + """ + Converts XLSX files to Markdown, with each sheet presented as a separate Markdown table. + """ + def accepts( self, file_stream: BinaryIO, @@ -80,30 +133,19 @@ class XlsxConverter(DocumentConverter): _xlsx_dependency_exc_info[2] ) - sheets = pd.read_excel(file_stream, sheet_name=None, engine="openpyxl") - md_content = "" - for s in sheets: - md_content += f"## {s}\n" - html_content = sheets[s].to_html(index=False) - md_content += ( - self._html_converter.convert_string( - html_content, **kwargs - ).markdown.strip() - + "\n\n" - ) - - return DocumentConverterResult(markdown=md_content.strip()) + return self._convert_excel( + file_stream=file_stream, + stream_info=stream_info, + engine="openpyxl", + **kwargs, + ) -class XlsConverter(DocumentConverter): +class XlsConverter(ExcelConverterBase): """ Converts XLS files to Markdown, with each sheet presented as a separate Markdown table. 
""" - def __init__(self): - super().__init__() - self._html_converter = HtmlConverter() - def accepts( self, file_stream: BinaryIO, @@ -142,16 +184,9 @@ class XlsConverter(DocumentConverter): _xls_dependency_exc_info[2] ) - sheets = pd.read_excel(file_stream, sheet_name=None, engine="xlrd") - md_content = "" - for s in sheets: - md_content += f"## {s}\n" - html_content = sheets[s].to_html(index=False) - md_content += ( - self._html_converter.convert_string( - html_content, **kwargs - ).markdown.strip() - + "\n\n" - ) - - return DocumentConverterResult(markdown=md_content.strip()) + return self._convert_excel( + file_stream=file_stream, + stream_info=stream_info, + engine="xlrd", + **kwargs, + ) diff --git a/packages/markitdown/tests/_test_vectors.py b/packages/markitdown/tests/_test_vectors.py index 4a7b54a..e2187a5 100644 --- a/packages/markitdown/tests/_test_vectors.py +++ b/packages/markitdown/tests/_test_vectors.py @@ -41,7 +41,7 @@ GENERAL_TEST_VECTORS = [ "6ff4173b-42a5-4784-9b19-f49caff4d93d", "affc7dad-52dc-4b98-9b5d-51e65d8a8ad0", ], - must_not_include=[], + must_not_include=["Unnamed:", "NaN"], ), FileTestVector( filename="test.xls", @@ -53,7 +53,7 @@ GENERAL_TEST_VECTORS = [ "6ff4173b-42a5-4784-9b19-f49caff4d93d", "affc7dad-52dc-4b98-9b5d-51e65d8a8ad0", ], - must_not_include=[], + must_not_include=["Unnamed:", "NaN"], ), FileTestVector( filename="test.pptx", diff --git a/src/markitdown/_markitdown.py b/src/markitdown/_markitdown.py deleted file mode 100644 index 9ca5d67..0000000 --- a/src/markitdown/_markitdown.py +++ /dev/null @@ -1,1549 +0,0 @@ -# type: ignore -import base64 -import binascii -import copy -import html -import json -import mimetypes -import os -import re -import shutil -import subprocess -import sys -import tempfile -import traceback -import zipfile -from xml.dom import minidom -from typing import Any, Dict, List, Optional, Union -from pathlib import Path -from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse -from warnings import warn, resetwarnings, catch_warnings - -import mammoth -import markdownify -import pandas as pd -import pdfminer -import pdfminer.high_level -import pptx - -# File-format detection -import puremagic -import requests -from bs4 import BeautifulSoup -from charset_normalizer import from_path - -# Optional Transcription support -try: - # Using warnings' catch_warnings to catch - # pydub's warning of ffmpeg or avconv missing - with catch_warnings(record=True) as w: - import pydub - - if w: - raise ModuleNotFoundError - import speech_recognition as sr - - IS_AUDIO_TRANSCRIPTION_CAPABLE = True -except ModuleNotFoundError: - pass -finally: - resetwarnings() - -# Optional YouTube transcription support -try: - from youtube_transcript_api import YouTubeTranscriptApi - - IS_YOUTUBE_TRANSCRIPT_CAPABLE = True -except ModuleNotFoundError: - pass - - -class _CustomMarkdownify(markdownify.MarkdownConverter): - """ - A custom version of markdownify's MarkdownConverter. Changes include: - - - Altering the default heading style to use '#', '##', etc. - - Removing javascript hyperlinks. - - Truncating images with large data:uri sources. 
- - Ensuring URIs are properly escaped, and do not conflict with Markdown syntax - """ - - def __init__(self, **options: Any): - options["heading_style"] = options.get("heading_style", markdownify.ATX) - # Explicitly cast options to the expected type if necessary - super().__init__(**options) - - def convert_hn(self, n: int, el: Any, text: str, convert_as_inline: bool) -> str: - """Same as usual, but be sure to start with a new line""" - if not convert_as_inline: - if not re.search(r"^\n", text): - return "\n" + super().convert_hn(n, el, text, convert_as_inline) # type: ignore - - return super().convert_hn(n, el, text, convert_as_inline) # type: ignore - - def convert_a(self, el: Any, text: str, convert_as_inline: bool): - """Same as usual converter, but removes Javascript links and escapes URIs.""" - prefix, suffix, text = markdownify.chomp(text) # type: ignore - if not text: - return "" - href = el.get("href") - title = el.get("title") - - # Escape URIs and skip non-http or file schemes - if href: - try: - parsed_url = urlparse(href) # type: ignore - if parsed_url.scheme and parsed_url.scheme.lower() not in [ - "http", - "https", - "file", - ]: # type: ignore - return "%s%s%s" % (prefix, text, suffix) - href = urlunparse( - parsed_url._replace(path=quote(unquote(parsed_url.path))) - ) # type: ignore - except ValueError: # It's not clear if this ever gets thrown - return "%s%s%s" % (prefix, text, suffix) - - # For the replacement see #29: text nodes underscores are escaped - if ( - self.options["autolinks"] - and text.replace(r"\_", "_") == href - and not title - and not self.options["default_title"] - ): - # Shortcut syntax - return "<%s>" % href - if self.options["default_title"] and not title: - title = href - title_part = ' "%s"' % title.replace('"', r"\"") if title else "" - return ( - "%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix) - if href - else text - ) - - def convert_img(self, el: Any, text: str, convert_as_inline: bool) -> str: - """Same as usual converter, but removes data URIs""" - - alt = el.attrs.get("alt", None) or "" - src = el.attrs.get("src", None) or "" - title = el.attrs.get("title", None) or "" - title_part = ' "%s"' % title.replace('"', r"\"") if title else "" - if ( - convert_as_inline - and el.parent.name not in self.options["keep_inline_images_in"] - ): - return alt - - # Remove dataURIs - if src.startswith("data:"): - src = src.split(",")[0] + "..." 
- - return "![%s](%s%s)" % (alt, src, title_part) - - def convert_soup(self, soup: Any) -> str: - return super().convert_soup(soup) # type: ignore - - -class DocumentConverterResult: - """The result of converting a document to text.""" - - def __init__(self, title: Union[str, None] = None, text_content: str = ""): - self.title: Union[str, None] = title - self.text_content: str = text_content - - -class DocumentConverter: - """Abstract superclass of all DocumentConverters.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - raise NotImplementedError() - - -class PlainTextConverter(DocumentConverter): - """Anything with content type text/plain""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Guess the content type from any file extension that might be around - content_type, _ = mimetypes.guess_type( - "__placeholder" + kwargs.get("file_extension", "") - ) - - # Only accept text files - if content_type is None: - return None - elif "text/" not in content_type.lower(): - return None - - text_content = str(from_path(local_path).best()) - return DocumentConverterResult( - title=None, - text_content=text_content, - ) - - -class HtmlConverter(DocumentConverter): - """Anything with content type text/html""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not html - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".html", ".htm"]: - return None - - result = None - with open(local_path, "rt", encoding="utf-8") as fh: - result = self._convert(fh.read()) - - return result - - def _convert(self, html_content: str) -> Union[None, DocumentConverterResult]: - """Helper function that converts and HTML string.""" - - # Parse the string - soup = BeautifulSoup(html_content, "html.parser") - - # Remove javascript and style blocks - for script in soup(["script", "style"]): - script.extract() - - # Print only the main content - body_elm = soup.find("body") - webpage_text = "" - if body_elm: - webpage_text = _CustomMarkdownify().convert_soup(body_elm) - else: - webpage_text = _CustomMarkdownify().convert_soup(soup) - - assert isinstance(webpage_text, str) - - return DocumentConverterResult( - title=None if soup.title is None else soup.title.string, - text_content=webpage_text, - ) - - -class RSSConverter(DocumentConverter): - """Convert RSS / Atom type to markdown""" - - def convert( - self, local_path: str, **kwargs - ) -> Union[None, DocumentConverterResult]: - # Bail if not RSS type - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".xml", ".rss", ".atom"]: - return None - try: - doc = minidom.parse(local_path) - except BaseException as _: - return None - result = None - if doc.getElementsByTagName("rss"): - # A RSS feed must have a root element of - result = self._parse_rss_type(doc) - elif doc.getElementsByTagName("feed"): - root = doc.getElementsByTagName("feed")[0] - if root.getElementsByTagName("entry"): - # An Atom feed must have a root element of and at least one - result = self._parse_atom_type(doc) - else: - return None - else: - # not rss or atom - return None - - return result - - def _parse_atom_type( - self, doc: minidom.Document - ) -> Union[None, DocumentConverterResult]: - """Parse the type of an Atom feed. - - Returns None if the feed type is not recognized or something goes wrong. 
- """ - try: - root = doc.getElementsByTagName("feed")[0] - title = self._get_data_by_tag_name(root, "title") - subtitle = self._get_data_by_tag_name(root, "subtitle") - entries = root.getElementsByTagName("entry") - md_text = f"# {title}\n" - if subtitle: - md_text += f"{subtitle}\n" - for entry in entries: - entry_title = self._get_data_by_tag_name(entry, "title") - entry_summary = self._get_data_by_tag_name(entry, "summary") - entry_updated = self._get_data_by_tag_name(entry, "updated") - entry_content = self._get_data_by_tag_name(entry, "content") - - if entry_title: - md_text += f"\n## {entry_title}\n" - if entry_updated: - md_text += f"Updated on: {entry_updated}\n" - if entry_summary: - md_text += self._parse_content(entry_summary) - if entry_content: - md_text += self._parse_content(entry_content) - - return DocumentConverterResult( - title=title, - text_content=md_text, - ) - except BaseException as _: - return None - - def _parse_rss_type( - self, doc: minidom.Document - ) -> Union[None, DocumentConverterResult]: - """Parse the type of an RSS feed. - - Returns None if the feed type is not recognized or something goes wrong. - """ - try: - root = doc.getElementsByTagName("rss")[0] - channel = root.getElementsByTagName("channel") - if not channel: - return None - channel = channel[0] - channel_title = self._get_data_by_tag_name(channel, "title") - channel_description = self._get_data_by_tag_name(channel, "description") - items = channel.getElementsByTagName("item") - if channel_title: - md_text = f"# {channel_title}\n" - if channel_description: - md_text += f"{channel_description}\n" - if not items: - items = [] - for item in items: - title = self._get_data_by_tag_name(item, "title") - description = self._get_data_by_tag_name(item, "description") - pubDate = self._get_data_by_tag_name(item, "pubDate") - content = self._get_data_by_tag_name(item, "content:encoded") - - if title: - md_text += f"\n## {title}\n" - if pubDate: - md_text += f"Published on: {pubDate}\n" - if description: - md_text += self._parse_content(description) - if content: - md_text += self._parse_content(content) - - return DocumentConverterResult( - title=channel_title, - text_content=md_text, - ) - except BaseException as _: - print(traceback.format_exc()) - return None - - def _parse_content(self, content: str) -> str: - """Parse the content of an RSS feed item""" - try: - # using bs4 because many RSS feeds have HTML-styled content - soup = BeautifulSoup(content, "html.parser") - return _CustomMarkdownify().convert_soup(soup) - except BaseException as _: - return content - - def _get_data_by_tag_name( - self, element: minidom.Element, tag_name: str - ) -> Union[str, None]: - """Get data from first child element with the given tag name. - Returns None when no such element is found. 
- """ - nodes = element.getElementsByTagName(tag_name) - if not nodes: - return None - fc = nodes[0].firstChild - if fc: - return fc.data - return None - - -class WikipediaConverter(DocumentConverter): - """Handle Wikipedia pages separately, focusing only on the main document content.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not Wikipedia - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".html", ".htm"]: - return None - url = kwargs.get("url", "") - if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url): - return None - - # Parse the file - soup = None - with open(local_path, "rt", encoding="utf-8") as fh: - soup = BeautifulSoup(fh.read(), "html.parser") - - # Remove javascript and style blocks - for script in soup(["script", "style"]): - script.extract() - - # Print only the main content - body_elm = soup.find("div", {"id": "mw-content-text"}) - title_elm = soup.find("span", {"class": "mw-page-title-main"}) - - webpage_text = "" - main_title = None if soup.title is None else soup.title.string - - if body_elm: - # What's the title - if title_elm and len(title_elm) > 0: - main_title = title_elm.string # type: ignore - assert isinstance(main_title, str) - - # Convert the page - webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup( - body_elm - ) - else: - webpage_text = _CustomMarkdownify().convert_soup(soup) - - return DocumentConverterResult( - title=main_title, - text_content=webpage_text, - ) - - -class YouTubeConverter(DocumentConverter): - """Handle YouTube specially, focusing on the video title, description, and transcript.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not YouTube - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".html", ".htm"]: - return None - url = kwargs.get("url", "") - if not url.startswith("https://www.youtube.com/watch?"): - return None - - # Parse the file - soup = None - with open(local_path, "rt", encoding="utf-8") as fh: - soup = BeautifulSoup(fh.read(), "html.parser") - - # Read the meta tags - assert soup.title is not None and soup.title.string is not None - metadata: Dict[str, str] = {"title": soup.title.string} - for meta in soup(["meta"]): - for a in meta.attrs: - if a in ["itemprop", "property", "name"]: - metadata[meta[a]] = meta.get("content", "") - break - - # We can also try to read the full description. 
This is more prone to breaking, since it reaches into the page implementation - try: - for script in soup(["script"]): - content = script.text - if "ytInitialData" in content: - lines = re.split(r"\r?\n", content) - obj_start = lines[0].find("{") - obj_end = lines[0].rfind("}") - if obj_start >= 0 and obj_end >= 0: - data = json.loads(lines[0][obj_start : obj_end + 1]) - attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore - if attrdesc: - metadata["description"] = str(attrdesc["content"]) - break - except Exception: - pass - - # Start preparing the page - webpage_text = "# YouTube\n" - - title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore - assert isinstance(title, str) - - if title: - webpage_text += f"\n## {title}\n" - - stats = "" - views = self._get(metadata, ["interactionCount"]) # type: ignore - if views: - stats += f"- **Views:** {views}\n" - - keywords = self._get(metadata, ["keywords"]) # type: ignore - if keywords: - stats += f"- **Keywords:** {keywords}\n" - - runtime = self._get(metadata, ["duration"]) # type: ignore - if runtime: - stats += f"- **Runtime:** {runtime}\n" - - if len(stats) > 0: - webpage_text += f"\n### Video Metadata\n{stats}\n" - - description = self._get(metadata, ["description", "og:description"]) # type: ignore - if description: - webpage_text += f"\n### Description\n{description}\n" - - if IS_YOUTUBE_TRANSCRIPT_CAPABLE: - transcript_text = "" - parsed_url = urlparse(url) # type: ignore - params = parse_qs(parsed_url.query) # type: ignore - if "v" in params: - assert isinstance(params["v"][0], str) - video_id = str(params["v"][0]) - try: - youtube_transcript_languages = kwargs.get( - "youtube_transcript_languages", ("en",) - ) - # Must be a single transcript. - transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore - transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore - # Alternative formatting: - # formatter = TextFormatter() - # formatter.format_transcript(transcript) - except Exception: - pass - if transcript_text: - webpage_text += f"\n### Transcript\n{transcript_text}\n" - - title = title if title else soup.title.string - assert isinstance(title, str) - - return DocumentConverterResult( - title=title, - text_content=webpage_text, - ) - - def _get( - self, - metadata: Dict[str, str], - keys: List[str], - default: Union[str, None] = None, - ) -> Union[str, None]: - for k in keys: - if k in metadata: - return metadata[k] - return default - - def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type - if isinstance(json, list): - for elm in json: - ret = self._findKey(elm, key) - if ret is not None: - return ret - elif isinstance(json, dict): - for k in json: - if k == key: - return json[k] - else: - ret = self._findKey(json[k], key) - if ret is not None: - return ret - return None - - -class IpynbConverter(DocumentConverter): - """Converts Jupyter Notebook (.ipynb) files to Markdown.""" - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not ipynb - extension = kwargs.get("file_extension", "") - if extension.lower() != ".ipynb": - return None - - # Parse and convert the notebook - result = None - with open(local_path, "rt", encoding="utf-8") as fh: - notebook_content = json.load(fh) - result = self._convert(notebook_content) - - return result - - def _convert(self, notebook_content: dict) -> Union[None, DocumentConverterResult]: - 
"""Helper function that converts notebook JSON content to Markdown.""" - try: - md_output = [] - title = None - - for cell in notebook_content.get("cells", []): - cell_type = cell.get("cell_type", "") - source_lines = cell.get("source", []) - - if cell_type == "markdown": - md_output.append("".join(source_lines)) - - # Extract the first # heading as title if not already found - if title is None: - for line in source_lines: - if line.startswith("# "): - title = line.lstrip("# ").strip() - break - - elif cell_type == "code": - # Code cells are wrapped in Markdown code blocks - md_output.append(f"```python\n{''.join(source_lines)}\n```") - elif cell_type == "raw": - md_output.append(f"```\n{''.join(source_lines)}\n```") - - md_text = "\n\n".join(md_output) - - # Check for title in notebook metadata - title = notebook_content.get("metadata", {}).get("title", title) - - return DocumentConverterResult( - title=title, - text_content=md_text, - ) - - except Exception as e: - raise FileConversionException( - f"Error converting .ipynb file: {str(e)}" - ) from e - - -class BingSerpConverter(DocumentConverter): - """ - Handle Bing results pages (only the organic search results). - NOTE: It is better to use the Bing API - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a Bing SERP - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".html", ".htm"]: - return None - url = kwargs.get("url", "") - if not re.search(r"^https://www\.bing\.com/search\?q=", url): - return None - - # Parse the query parameters - parsed_params = parse_qs(urlparse(url).query) - query = parsed_params.get("q", [""])[0] - - # Parse the file - soup = None - with open(local_path, "rt", encoding="utf-8") as fh: - soup = BeautifulSoup(fh.read(), "html.parser") - - # Clean up some formatting - for tptt in soup.find_all(class_="tptt"): - if hasattr(tptt, "string") and tptt.string: - tptt.string += " " - for slug in soup.find_all(class_="algoSlug_icon"): - slug.extract() - - # Parse the algorithmic results - _markdownify = _CustomMarkdownify() - results = list() - for result in soup.find_all(class_="b_algo"): - # Rewrite redirect urls - for a in result.find_all("a", href=True): - parsed_href = urlparse(a["href"]) - qs = parse_qs(parsed_href.query) - - # The destination is contained in the u parameter, - # but appears to be base64 encoded, with some prefix - if "u" in qs: - u = ( - qs["u"][0][2:].strip() + "==" - ) # Python 3 doesn't care about extra padding - - try: - # RFC 4648 / Base64URL" variant, which uses "-" and "_" - a["href"] = base64.b64decode(u, altchars="-_").decode("utf-8") - except UnicodeDecodeError: - pass - except binascii.Error: - pass - - # Convert to markdown - md_result = _markdownify.convert_soup(result).strip() - lines = [line.strip() for line in re.split(r"\n+", md_result)] - results.append("\n".join([line for line in lines if len(line) > 0])) - - webpage_text = ( - f"## A Bing search for '{query}' found the following results:\n\n" - + "\n\n".join(results) - ) - - return DocumentConverterResult( - title=None if soup.title is None else soup.title.string, - text_content=webpage_text, - ) - - -class PdfConverter(DocumentConverter): - """ - Converts PDFs to Markdown. Most style information is ignored, so the results are essentially plain-text. 
- """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a PDF - extension = kwargs.get("file_extension", "") - if extension.lower() != ".pdf": - return None - - return DocumentConverterResult( - title=None, - text_content=pdfminer.high_level.extract_text(local_path), - ) - - -class DocxConverter(HtmlConverter): - """ - Converts DOCX files to Markdown. Style information (e.g.m headings) and tables are preserved where possible. - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a DOCX - extension = kwargs.get("file_extension", "") - if extension.lower() != ".docx": - return None - - result = None - with open(local_path, "rb") as docx_file: - style_map = kwargs.get("style_map", None) - - result = mammoth.convert_to_html(docx_file, style_map=style_map) - html_content = result.value - result = self._convert(html_content) - - return result - - -class XlsxConverter(HtmlConverter): - """ - Converts XLSX files to Markdown, with each sheet presented as a separate Markdown table. - """ - - def _clean_colname(self, colname: Any) -> Any: - # Remove Pandas header placeholders - if isinstance(colname, str) and colname.startswith("Unnamed:"): - return None - return colname - - def convert( - self, - local_path, - na_rep: Any = "", - drop_empty_cols: bool = False, - drop_empty_rows: bool = False, - **kwargs, - ) -> Union[None, DocumentConverterResult]: - # Bail if not a XLSX - extension = kwargs.get("file_extension", "") - if extension.lower() != ".xlsx": - return None - - sheets = pd.read_excel(local_path, sheet_name=None) - md_content = "" - for name, sheet in sheets.items(): - md_content += f"## {name}\n" - sheet = sheet.rename(columns=lambda col: self._clean_colname(col)) - - if drop_empty_cols: - # also consider headers to be part of the column - sheet = sheet.loc[:, sheet.notna().any() | sheet.columns.notna()] - - if drop_empty_rows: - sheet = sheet.dropna(axis=0, how="all") - - # convert remaining NaN's to empty string - # because .to_html(na_rep="") does not apply to headers - sheet.columns = sheet.columns.fillna(na_rep) - - html_content = sheet.to_html(index=False, na_rep=na_rep) - md_content += self._convert(html_content).text_content.strip() + "\n\n" - - return DocumentConverterResult( - title=None, - text_content=md_content.strip(), - ) - - -class PptxConverter(HtmlConverter): - """ - Converts PPTX files to Markdown. Supports heading, tables and images with alt text. 
- """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a PPTX - extension = kwargs.get("file_extension", "") - if extension.lower() != ".pptx": - return None - - md_content = "" - - presentation = pptx.Presentation(local_path) - slide_num = 0 - for slide in presentation.slides: - slide_num += 1 - - md_content += f"\n\n\n" - - title = slide.shapes.title - for shape in slide.shapes: - # Pictures - if self._is_picture(shape): - # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069 - alt_text = "" - try: - alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "") - except Exception: - pass - - # A placeholder name - filename = re.sub(r"\W", "", shape.name) + ".jpg" - md_content += ( - "\n![" - + (alt_text if alt_text else shape.name) - + "](" - + filename - + ")\n" - ) - - # Tables - if self._is_table(shape): - html_table = "" - first_row = True - for row in shape.table.rows: - html_table += "" - for cell in row.cells: - if first_row: - html_table += "" - else: - html_table += "" - html_table += "" - first_row = False - html_table += "
" + html.escape(cell.text) + "" + html.escape(cell.text) + "
" - md_content += ( - "\n" + self._convert(html_table).text_content.strip() + "\n" - ) - - # Charts - if shape.has_chart: - md_content += self._convert_chart_to_markdown(shape.chart) - - # Text areas - elif shape.has_text_frame: - if shape == title: - md_content += "# " + shape.text.lstrip() + "\n" - else: - md_content += shape.text + "\n" - - md_content = md_content.strip() - - if slide.has_notes_slide: - md_content += "\n\n### Notes:\n" - notes_frame = slide.notes_slide.notes_text_frame - if notes_frame is not None: - md_content += notes_frame.text - md_content = md_content.strip() - - return DocumentConverterResult( - title=None, - text_content=md_content.strip(), - ) - - def _is_picture(self, shape): - if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE: - return True - if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER: - if hasattr(shape, "image"): - return True - return False - - def _is_table(self, shape): - if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE: - return True - return False - - def _convert_chart_to_markdown(self, chart): - md = "\n\n### Chart" - if chart.has_title: - md += f": {chart.chart_title.text_frame.text}" - md += "\n\n" - data = [] - category_names = [c.label for c in chart.plots[0].categories] - series_names = [s.name for s in chart.series] - data.append(["Category"] + series_names) - - for idx, category in enumerate(category_names): - row = [category] - for series in chart.series: - row.append(series.values[idx]) - data.append(row) - - markdown_table = [] - for row in data: - markdown_table.append("| " + " | ".join(map(str, row)) + " |") - header = markdown_table[0] - separator = "|" + "|".join(["---"] * len(data[0])) + "|" - return md + "\n".join([header, separator] + markdown_table[1:]) - - -class MediaConverter(DocumentConverter): - """ - Abstract class for multi-modal media (e.g., images and audio) - """ - - def _get_metadata(self, local_path): - exiftool = shutil.which("exiftool") - if not exiftool: - return None - else: - try: - result = subprocess.run( - [exiftool, "-json", local_path], - capture_output=True, - text=True, - ).stdout - return json.loads(result)[0] - except Exception: - return None - - -class WavConverter(MediaConverter): - """ - Converts WAV files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` is installed). - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a WAV - extension = kwargs.get("file_extension", "") - if extension.lower() != ".wav": - return None - - md_content = "" - - # Add metadata - metadata = self._get_metadata(local_path) - if metadata: - for f in [ - "Title", - "Artist", - "Author", - "Band", - "Album", - "Genre", - "Track", - "DateTimeOriginal", - "CreateDate", - "Duration", - ]: - if f in metadata: - md_content += f"{f}: {metadata[f]}\n" - - # Transcribe - if IS_AUDIO_TRANSCRIPTION_CAPABLE: - try: - transcript = self._transcribe_audio(local_path) - md_content += "\n\n### Audio Transcript:\n" + ( - "[No speech detected]" if transcript == "" else transcript - ) - except Exception: - md_content += ( - "\n\n### Audio Transcript:\nError. Could not transcribe this audio." 
- ) - - return DocumentConverterResult( - title=None, - text_content=md_content.strip(), - ) - - def _transcribe_audio(self, local_path) -> str: - recognizer = sr.Recognizer() - with sr.AudioFile(local_path) as source: - audio = recognizer.record(source) - return recognizer.recognize_google(audio).strip() - - -class Mp3Converter(WavConverter): - """ - Converts MP3 files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` AND `pydub` are installed). - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not a MP3 - extension = kwargs.get("file_extension", "") - if extension.lower() != ".mp3": - return None - - md_content = "" - - # Add metadata - metadata = self._get_metadata(local_path) - if metadata: - for f in [ - "Title", - "Artist", - "Author", - "Band", - "Album", - "Genre", - "Track", - "DateTimeOriginal", - "CreateDate", - "Duration", - ]: - if f in metadata: - md_content += f"{f}: {metadata[f]}\n" - - # Transcribe - if IS_AUDIO_TRANSCRIPTION_CAPABLE: - handle, temp_path = tempfile.mkstemp(suffix=".wav") - os.close(handle) - try: - sound = pydub.AudioSegment.from_mp3(local_path) - sound.export(temp_path, format="wav") - - _args = dict() - _args.update(kwargs) - _args["file_extension"] = ".wav" - - try: - transcript = super()._transcribe_audio(temp_path).strip() - md_content += "\n\n### Audio Transcript:\n" + ( - "[No speech detected]" if transcript == "" else transcript - ) - except Exception: - md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio." - - finally: - os.unlink(temp_path) - - # Return the result - return DocumentConverterResult( - title=None, - text_content=md_content.strip(), - ) - - -class ImageConverter(MediaConverter): - """ - Converts images to markdown via extraction of metadata (if `exiftool` is installed), OCR (if `easyocr` is installed), and description via a multimodal LLM (if an llm_client is configured). - """ - - def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: - # Bail if not an image - extension = kwargs.get("file_extension", "") - if extension.lower() not in [".jpg", ".jpeg", ".png"]: - return None - - md_content = "" - - # Add metadata - metadata = self._get_metadata(local_path) - if metadata: - for f in [ - "ImageSize", - "Title", - "Caption", - "Description", - "Keywords", - "Artist", - "Author", - "DateTimeOriginal", - "CreateDate", - "GPSPosition", - ]: - if f in metadata: - md_content += f"{f}: {metadata[f]}\n" - - # Try describing the image with GPTV - llm_client = kwargs.get("llm_client") - llm_model = kwargs.get("llm_model") - if llm_client is not None and llm_model is not None: - md_content += ( - "\n# Description:\n" - + self._get_llm_description( - local_path, - extension, - llm_client, - llm_model, - prompt=kwargs.get("llm_prompt"), - ).strip() - + "\n" - ) - - return DocumentConverterResult( - title=None, - text_content=md_content, - ) - - def _get_llm_description(self, local_path, extension, client, model, prompt=None): - if prompt is None or prompt.strip() == "": - prompt = "Write a detailed caption for this image." 
- - data_uri = "" - with open(local_path, "rb") as image_file: - content_type, encoding = mimetypes.guess_type("_dummy" + extension) - if content_type is None: - content_type = "image/jpeg" - image_base64 = base64.b64encode(image_file.read()).decode("utf-8") - data_uri = f"data:{content_type};base64,{image_base64}" - - messages = [ - { - "role": "user", - "content": [ - {"type": "text", "text": prompt}, - { - "type": "image_url", - "image_url": { - "url": data_uri, - }, - }, - ], - } - ] - - response = client.chat.completions.create(model=model, messages=messages) - return response.choices[0].message.content - - -class ZipConverter(DocumentConverter): - """Converts ZIP files to markdown by extracting and converting all contained files. - - The converter extracts the ZIP contents to a temporary directory, processes each file - using appropriate converters based on file extensions, and then combines the results - into a single markdown document. The temporary directory is cleaned up after processing. - - Example output format: - ```markdown - Content from the zip file `example.zip`: - - ## File: docs/readme.txt - - This is the content of readme.txt - Multiple lines are preserved - - ## File: images/example.jpg - - ImageSize: 1920x1080 - DateTimeOriginal: 2024-02-15 14:30:00 - Description: A beautiful landscape photo - - ## File: data/report.xlsx - - ## Sheet1 - | Column1 | Column2 | Column3 | - |---------|---------|---------| - | data1 | data2 | data3 | - | data4 | data5 | data6 | - ``` - - Key features: - - Maintains original file structure in headings - - Processes nested files recursively - - Uses appropriate converters for each file type - - Preserves formatting of converted content - - Cleans up temporary files after processing - """ - - def convert( - self, local_path: str, **kwargs: Any - ) -> Union[None, DocumentConverterResult]: - # Bail if not a ZIP - extension = kwargs.get("file_extension", "") - if extension.lower() != ".zip": - return None - - # Get parent converters list if available - parent_converters = kwargs.get("_parent_converters", []) - if not parent_converters: - return DocumentConverterResult( - title=None, - text_content=f"[ERROR] No converters available to process zip contents from: {local_path}", - ) - - extracted_zip_folder_name = ( - f"extracted_{os.path.basename(local_path).replace('.zip', '_zip')}" - ) - extraction_dir = os.path.normpath( - os.path.join(os.path.dirname(local_path), extracted_zip_folder_name) - ) - md_content = f"Content from the zip file `{os.path.basename(local_path)}`:\n\n" - - try: - # Extract the zip file safely - with zipfile.ZipFile(local_path, "r") as zipObj: - # Safeguard against path traversal - for member in zipObj.namelist(): - member_path = os.path.normpath(os.path.join(extraction_dir, member)) - if ( - not os.path.commonprefix([extraction_dir, member_path]) - == extraction_dir - ): - raise ValueError( - f"Path traversal detected in zip file: {member}" - ) - - # Extract all files safely - zipObj.extractall(path=extraction_dir) - - # Process each extracted file - for root, dirs, files in os.walk(extraction_dir): - for name in files: - file_path = os.path.join(root, name) - relative_path = os.path.relpath(file_path, extraction_dir) - - # Get file extension - _, file_extension = os.path.splitext(name) - - # Update kwargs for the file - file_kwargs = kwargs.copy() - file_kwargs["file_extension"] = file_extension - file_kwargs["_parent_converters"] = parent_converters - - # Try converting the file using available converters - for 
converter in parent_converters: - # Skip the zip converter to avoid infinite recursion - if isinstance(converter, ZipConverter): - continue - - result = converter.convert(file_path, **file_kwargs) - if result is not None: - md_content += f"\n## File: {relative_path}\n\n" - md_content += result.text_content + "\n\n" - break - - # Clean up extracted files if specified - if kwargs.get("cleanup_extracted", True): - shutil.rmtree(extraction_dir) - - return DocumentConverterResult(title=None, text_content=md_content.strip()) - - except zipfile.BadZipFile: - return DocumentConverterResult( - title=None, - text_content=f"[ERROR] Invalid or corrupted zip file: {local_path}", - ) - except ValueError as ve: - return DocumentConverterResult( - title=None, - text_content=f"[ERROR] Security error in zip file {local_path}: {str(ve)}", - ) - except Exception as e: - return DocumentConverterResult( - title=None, - text_content=f"[ERROR] Failed to process zip file {local_path}: {str(e)}", - ) - - -class FileConversionException(BaseException): - pass - - -class UnsupportedFormatException(BaseException): - pass - - -class MarkItDown: - """(In preview) An extremely simple text-based document reader, suitable for LLM use. - This reader will convert common file-types or webpages to Markdown.""" - - def __init__( - self, - requests_session: Optional[requests.Session] = None, - llm_client: Optional[Any] = None, - llm_model: Optional[str] = None, - style_map: Optional[str] = None, - # Deprecated - mlm_client: Optional[Any] = None, - mlm_model: Optional[str] = None, - ): - if requests_session is None: - self._requests_session = requests.Session() - else: - self._requests_session = requests_session - - # Handle deprecation notices - ############################# - if mlm_client is not None: - if llm_client is None: - warn( - "'mlm_client' is deprecated, and was renamed 'llm_client'.", - DeprecationWarning, - ) - llm_client = mlm_client - mlm_client = None - else: - raise ValueError( - "'mlm_client' is deprecated, and was renamed 'llm_client'. Do not use both at the same time. Just use 'llm_client' instead." - ) - - if mlm_model is not None: - if llm_model is None: - warn( - "'mlm_model' is deprecated, and was renamed 'llm_model'.", - DeprecationWarning, - ) - llm_model = mlm_model - mlm_model = None - else: - raise ValueError( - "'mlm_model' is deprecated, and was renamed 'llm_model'. Do not use both at the same time. Just use 'llm_model' instead." 
- ) - ############################# - - self._llm_client = llm_client - self._llm_model = llm_model - self._style_map = style_map - - self._page_converters: List[DocumentConverter] = [] - - # Register converters for successful browsing operations - # Later registrations are tried first / take higher priority than earlier registrations - # To this end, the most specific converters should appear below the most generic converters - self.register_page_converter(PlainTextConverter()) - self.register_page_converter(HtmlConverter()) - self.register_page_converter(RSSConverter()) - self.register_page_converter(WikipediaConverter()) - self.register_page_converter(YouTubeConverter()) - self.register_page_converter(BingSerpConverter()) - self.register_page_converter(DocxConverter()) - self.register_page_converter(XlsxConverter()) - self.register_page_converter(PptxConverter()) - self.register_page_converter(WavConverter()) - self.register_page_converter(Mp3Converter()) - self.register_page_converter(ImageConverter()) - self.register_page_converter(IpynbConverter()) - self.register_page_converter(PdfConverter()) - self.register_page_converter(ZipConverter()) - - def convert( - self, source: Union[str, requests.Response, Path], **kwargs: Any - ) -> DocumentConverterResult: # TODO: deal with kwargs - """ - Args: - - source: can be a string representing a path either as string pathlib path object or url, or a requests.response object - - extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.) - """ - - # Local path or url - if isinstance(source, str): - if ( - source.startswith("http://") - or source.startswith("https://") - or source.startswith("file://") - ): - return self.convert_url(source, **kwargs) - else: - return self.convert_local(source, **kwargs) - # Request response - elif isinstance(source, requests.Response): - return self.convert_response(source, **kwargs) - elif isinstance(source, Path): - return self.convert_local(source, **kwargs) - - def convert_local( - self, path: Union[str, Path], **kwargs: Any - ) -> DocumentConverterResult: # TODO: deal with kwargs - if isinstance(path, Path): - path = str(path) - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] - - # Get extension alternatives from the path and puremagic - base, ext = os.path.splitext(path) - self._append_ext(extensions, ext) - - for g in self._guess_ext_magic(path): - self._append_ext(extensions, g) - - # Convert - return self._convert(path, extensions, **kwargs) - - # TODO what should stream's type be? - def convert_stream( - self, stream: Any, **kwargs: Any - ) -> DocumentConverterResult: # TODO: deal with kwargs - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] - - # Save the file locally to a temporary file. 
It will be deleted before this method exits - handle, temp_path = tempfile.mkstemp() - fh = os.fdopen(handle, "wb") - result = None - try: - # Write to the temporary file - content = stream.read() - if isinstance(content, str): - fh.write(content.encode("utf-8")) - else: - fh.write(content) - fh.close() - - # Use puremagic to check for more extension options - for g in self._guess_ext_magic(temp_path): - self._append_ext(extensions, g) - - # Convert - result = self._convert(temp_path, extensions, **kwargs) - # Clean up - finally: - try: - fh.close() - except Exception: - pass - os.unlink(temp_path) - - return result - - def convert_url( - self, url: str, **kwargs: Any - ) -> DocumentConverterResult: # TODO: fix kwargs type - # Send a HTTP request to the URL - response = self._requests_session.get(url, stream=True) - response.raise_for_status() - return self.convert_response(response, **kwargs) - - def convert_response( - self, response: requests.Response, **kwargs: Any - ) -> DocumentConverterResult: # TODO fix kwargs type - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] - - # Guess from the mimetype - content_type = response.headers.get("content-type", "").split(";")[0] - self._append_ext(extensions, mimetypes.guess_extension(content_type)) - - # Read the content disposition if there is one - content_disposition = response.headers.get("content-disposition", "") - m = re.search(r"filename=([^;]+)", content_disposition) - if m: - base, ext = os.path.splitext(m.group(1).strip("\"'")) - self._append_ext(extensions, ext) - - # Read from the extension from the path - base, ext = os.path.splitext(urlparse(response.url).path) - self._append_ext(extensions, ext) - - # Save the file locally to a temporary file. 
It will be deleted before this method exits - handle, temp_path = tempfile.mkstemp() - fh = os.fdopen(handle, "wb") - result = None - try: - # Download the file - for chunk in response.iter_content(chunk_size=512): - fh.write(chunk) - fh.close() - - # Use puremagic to check for more extension options - for g in self._guess_ext_magic(temp_path): - self._append_ext(extensions, g) - - # Convert - result = self._convert(temp_path, extensions, url=response.url, **kwargs) - # Clean up - finally: - try: - fh.close() - except Exception: - pass - os.unlink(temp_path) - - return result - - def _convert( - self, local_path: str, extensions: List[Union[str, None]], **kwargs - ) -> DocumentConverterResult: - error_trace = "" - for ext in extensions + [None]: # Try last with no extension - for converter in self._page_converters: - _kwargs = copy.deepcopy(kwargs) - - # Overwrite file_extension appropriately - if ext is None: - if "file_extension" in _kwargs: - del _kwargs["file_extension"] - else: - _kwargs.update({"file_extension": ext}) - - # Copy any additional global options - if "llm_client" not in _kwargs and self._llm_client is not None: - _kwargs["llm_client"] = self._llm_client - - if "llm_model" not in _kwargs and self._llm_model is not None: - _kwargs["llm_model"] = self._llm_model - - # Add the list of converters for nested processing - _kwargs["_parent_converters"] = self._page_converters - - if "style_map" not in _kwargs and self._style_map is not None: - _kwargs["style_map"] = self._style_map - - # If we hit an error log it and keep trying - try: - res = converter.convert(local_path, **_kwargs) - except Exception: - error_trace = ("\n\n" + traceback.format_exc()).strip() - - if res is not None: - # Normalize the content - res.text_content = "\n".join( - [line.rstrip() for line in re.split(r"\r?\n", res.text_content)] - ) - res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content) - - # Todo - return res - - # If we got this far without success, report any exceptions - if len(error_trace) > 0: - raise FileConversionException( - f"Could not convert '{local_path}' to Markdown. File type was recognized as {extensions}. While converting the file, the following error was encountered:\n\n{error_trace}" - ) - - # Nothing can handle it! - raise UnsupportedFormatException( - f"Could not convert '{local_path}' to Markdown. The formats {extensions} are not supported." - ) - - def _append_ext(self, extensions, ext): - """Append a unique non-None, non-empty extension to a list of extensions.""" - if ext is None: - return - ext = ext.strip() - if ext == "": - return - # if ext not in extensions: - extensions.append(ext) - - def _guess_ext_magic(self, path): - """Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes.""" - # Use puremagic to guess - try: - guesses = puremagic.magic_file(path) - extensions = list() - for g in guesses: - ext = g.extension.strip() - if len(ext) > 0: - if not ext.startswith("."): - ext = "." 
+ ext - if ext not in extensions: - extensions.append(ext) - return extensions - except FileNotFoundError: - pass - except IsADirectoryError: - pass - except PermissionError: - pass - return [] - - def register_page_converter(self, converter: DocumentConverter) -> None: - """Register a page text converter.""" - self._page_converters.insert(0, converter) diff --git a/tests/test_markitdown.py b/tests/test_markitdown.py deleted file mode 100644 index 1eefba1..0000000 --- a/tests/test_markitdown.py +++ /dev/null @@ -1,312 +0,0 @@ -#!/usr/bin/env python3 -m pytest -import io -import os -import shutil - -import pytest -import requests - -from warnings import catch_warnings, resetwarnings - -from markitdown import MarkItDown - -skip_remote = ( - True if os.environ.get("GITHUB_ACTIONS") else False -) # Don't run these tests in CI - - -# Don't run the llm tests without a key and the client library -skip_llm = False if os.environ.get("OPENAI_API_KEY") else True -try: - import openai -except ModuleNotFoundError: - skip_llm = True - -# Skip exiftool tests if not installed -skip_exiftool = shutil.which("exiftool") is None - -TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "test_files") - -JPG_TEST_EXIFTOOL = { - "Author": "AutoGen Authors", - "Title": "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation", - "Description": "AutoGen enables diverse LLM-based applications", - "ImageSize": "1615x1967", - "DateTimeOriginal": "2024:03:14 22:10:00", -} - -PDF_TEST_URL = "https://arxiv.org/pdf/2308.08155v2.pdf" -PDF_TEST_STRINGS = [ - "While there is contemporaneous exploration of multi-agent approaches" -] - -YOUTUBE_TEST_URL = "https://www.youtube.com/watch?v=V2qZ_lgxTzg" -YOUTUBE_TEST_STRINGS = [ - "## AutoGen FULL Tutorial with Python (Step-By-Step)", - "This is an intermediate tutorial for installing and using AutoGen locally", - "PT15M4S", - "the model we're going to be using today is GPT 3.5 turbo", # From the transcript -] - -XLSX_TEST_STRINGS = [ - "## 09060124-b5e7-4717-9d07-3c046eb", - "6ff4173b-42a5-4784-9b19-f49caff4d93d", - "affc7dad-52dc-4b98-9b5d-51e65d8a8ad0", -] - -XLSX_TEST_EXCLUDES = ["Unnamed:", "NaN"] - - -DOCX_TEST_STRINGS = [ - "314b0a30-5b04-470b-b9f7-eed2c2bec74a", - "49e168b7-d2ae-407f-a055-2167576f39a1", - "## d666f1f7-46cb-42bd-9a39-9a39cf2a509f", - "# Abstract", - "# Introduction", - "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation", -] - -DOCX_COMMENT_TEST_STRINGS = [ - "314b0a30-5b04-470b-b9f7-eed2c2bec74a", - "49e168b7-d2ae-407f-a055-2167576f39a1", - "## d666f1f7-46cb-42bd-9a39-9a39cf2a509f", - "# Abstract", - "# Introduction", - "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation", - "This is a test comment. 12df-321a", - "Yet another comment in the doc. 55yiyi-asd09", -] - -PPTX_TEST_STRINGS = [ - "2cdda5c8-e50e-4db4-b5f0-9722a649f455", - "04191ea8-5c73-4215-a1d3-1cfb43aaaf12", - "44bf7d06-5e7a-4a40-a2e1-a2e42ef28c8a", - "1b92870d-e3b5-4e65-8153-919f4ff45592", - "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation", - "a3f6004b-6f4f-4ea8-bee3-3741f4dc385f", # chart title - "2003", # chart value -] - -BLOG_TEST_URL = "https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math" -BLOG_TEST_STRINGS = [ - "Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? 
What about inference parameters?", - "an example where high cost can easily prevent a generic complex", -] - - -RSS_TEST_STRINGS = [ - "The Official Microsoft Blog", - "In the case of AI, it is absolutely true that the industry is moving incredibly fast", -] - - -WIKIPEDIA_TEST_URL = "https://en.wikipedia.org/wiki/Microsoft" -WIKIPEDIA_TEST_STRINGS = [ - "Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]", - 'Microsoft was founded by [Bill Gates](/wiki/Bill_Gates "Bill Gates")', -] -WIKIPEDIA_TEST_EXCLUDES = [ - "You are encouraged to create an account and log in", - "154 languages", - "move to sidebar", -] - -SERP_TEST_URL = "https://www.bing.com/search?q=microsoft+wikipedia" -SERP_TEST_STRINGS = [ - "](https://en.wikipedia.org/wiki/Microsoft", - "Microsoft Corporation is **an American multinational corporation and technology company headquartered** in Redmond", - "1995–2007: Foray into the Web, Windows 95, Windows XP, and Xbox", -] -SERP_TEST_EXCLUDES = [ - "https://www.bing.com/ck/a?!&&p=", - "data:image/svg+xml,%3Csvg%20width%3D", -] - -CSV_CP932_TEST_STRINGS = [ - "名前,年齢,住所", - "佐藤太郎,30,東京", - "三木英子,25,大阪", - "髙橋淳,35,名古屋", -] - -LLM_TEST_STRINGS = [ - "5bda1dd6", -] - - -# --- Helper Functions --- -def validate_strings(result, expected_strings, exclude_strings=None): - """Validate presence or absence of specific strings.""" - text_content = result.text_content.replace("\\", "") - for string in expected_strings: - assert string in text_content - if exclude_strings: - for string in exclude_strings: - assert string not in text_content - - -@pytest.mark.skipif( - skip_remote, - reason="do not run tests that query external urls", -) -def test_markitdown_remote() -> None: - markitdown = MarkItDown() - - # By URL - result = markitdown.convert(PDF_TEST_URL) - for test_string in PDF_TEST_STRINGS: - assert test_string in result.text_content - - # By stream - response = requests.get(PDF_TEST_URL) - result = markitdown.convert_stream( - io.BytesIO(response.content), file_extension=".pdf", url=PDF_TEST_URL - ) - for test_string in PDF_TEST_STRINGS: - assert test_string in result.text_content - - # Youtube - # TODO: This test randomly fails for some reason. Haven't been able to repro it yet. 
Disabling until I can debug the issue - # result = markitdown.convert(YOUTUBE_TEST_URL) - # for test_string in YOUTUBE_TEST_STRINGS: - # assert test_string in result.text_content - - -def test_markitdown_local() -> None: - markitdown = MarkItDown() - - # Test XLSX processing - result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.xlsx")) - validate_strings(result, XLSX_TEST_STRINGS, XLSX_TEST_EXCLUDES) - - # Test DOCX processing - result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.docx")) - validate_strings(result, DOCX_TEST_STRINGS) - - # Test DOCX processing, with comments - result = markitdown.convert( - os.path.join(TEST_FILES_DIR, "test_with_comment.docx"), - style_map="comment-reference => ", - ) - validate_strings(result, DOCX_COMMENT_TEST_STRINGS) - - # Test DOCX processing, with comments and setting style_map on init - markitdown_with_style_map = MarkItDown(style_map="comment-reference => ") - result = markitdown_with_style_map.convert( - os.path.join(TEST_FILES_DIR, "test_with_comment.docx") - ) - validate_strings(result, DOCX_COMMENT_TEST_STRINGS) - - # Test PPTX processing - result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.pptx")) - validate_strings(result, PPTX_TEST_STRINGS) - - # Test HTML processing - result = markitdown.convert( - os.path.join(TEST_FILES_DIR, "test_blog.html"), url=BLOG_TEST_URL - ) - validate_strings(result, BLOG_TEST_STRINGS) - - # Test ZIP file processing - result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_files.zip")) - validate_strings(result, XLSX_TEST_STRINGS) - - # Test Wikipedia processing - result = markitdown.convert( - os.path.join(TEST_FILES_DIR, "test_wikipedia.html"), url=WIKIPEDIA_TEST_URL - ) - text_content = result.text_content.replace("\\", "") - validate_strings(result, WIKIPEDIA_TEST_STRINGS, WIKIPEDIA_TEST_EXCLUDES) - - # Test Bing processing - result = markitdown.convert( - os.path.join(TEST_FILES_DIR, "test_serp.html"), url=SERP_TEST_URL - ) - text_content = result.text_content.replace("\\", "") - validate_strings(result, SERP_TEST_STRINGS, SERP_TEST_EXCLUDES) - - # Test RSS processing - result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_rss.xml")) - text_content = result.text_content.replace("\\", "") - for test_string in RSS_TEST_STRINGS: - assert test_string in text_content - - ## Test non-UTF-8 encoding - result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_mskanji.csv")) - validate_strings(result, CSV_CP932_TEST_STRINGS) - - -@pytest.mark.skipif( - skip_exiftool, - reason="do not run if exiftool is not installed", -) -def test_markitdown_exiftool() -> None: - markitdown = MarkItDown() - - # Test JPG metadata processing - result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.jpg")) - for key in JPG_TEST_EXIFTOOL: - target = f"{key}: {JPG_TEST_EXIFTOOL[key]}" - assert target in result.text_content - - -def test_markitdown_deprecation() -> None: - try: - with catch_warnings(record=True) as w: - test_client = object() - markitdown = MarkItDown(mlm_client=test_client) - assert len(w) == 1 - assert w[0].category is DeprecationWarning - assert markitdown._llm_client == test_client - finally: - resetwarnings() - - try: - with catch_warnings(record=True) as w: - markitdown = MarkItDown(mlm_model="gpt-4o") - assert len(w) == 1 - assert w[0].category is DeprecationWarning - assert markitdown._llm_model == "gpt-4o" - finally: - resetwarnings() - - try: - test_client = object() - markitdown = MarkItDown(mlm_client=test_client, llm_client=test_client) - assert 
False - except ValueError: - pass - - try: - markitdown = MarkItDown(mlm_model="gpt-4o", llm_model="gpt-4o") - assert False - except ValueError: - pass - - -@pytest.mark.skipif( - skip_llm, - reason="do not run llm tests without a key", -) -def test_markitdown_llm() -> None: - client = openai.OpenAI() - markitdown = MarkItDown(llm_client=client, llm_model="gpt-4o") - - result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_llm.jpg")) - - for test_string in LLM_TEST_STRINGS: - assert test_string in result.text_content - - # This is not super precise. It would also accept "red square", "blue circle", - # "the square is not blue", etc. But it's sufficient for this test. - for test_string in ["red", "circle", "blue", "square"]: - assert test_string in result.text_content.lower() - - -if __name__ == "__main__": - """Runs this file's tests from the command line.""" - test_markitdown_remote() - test_markitdown_local() - test_markitdown_exiftool() - test_markitdown_deprecation() - test_markitdown_llm()
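Not part of the patch: a minimal usage sketch of the new Excel-cleaning options introduced by `ExcelConverterBase._convert_excel` (`na_rep`, `drop_empty_cols`, `drop_empty_rows`). It assumes `MarkItDown.convert` forwards extra keyword arguments to `XlsxConverter.convert`, which in turn passes them through to `_convert_excel`; the workbook filename is hypothetical.

```python
# Hypothetical usage sketch: assumes MarkItDown.convert() forwards these
# keyword arguments down to XlsxConverter.convert() / _convert_excel().
from markitdown import MarkItDown

md = MarkItDown()
result = md.convert(
    "report.xlsx",          # hypothetical workbook path
    na_rep="",              # render NaN/NaT cells as empty strings instead of "NaN"
    drop_empty_cols=True,   # drop columns that have no header and no values
    drop_empty_rows=True,   # drop rows where every cell is empty
)
print(result.markdown)      # one "## <sheet name>" heading plus a Markdown table per sheet
```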