Most converters are now working.

This commit is contained in:
Adam Fourney 2025-03-05 00:24:54 -08:00
parent 4a034da269
commit c426cb81b3
15 changed files with 422 additions and 286 deletions

View file

@ -41,7 +41,7 @@ class DocumentConverterResult:
self.markdown = markdown
def __str__(self) -> str:
"""Return the Markdown content."""
"""Return the converted Markdown text."""
return self.markdown

View file

@ -130,7 +130,7 @@ class MarkItDown:
# Later registrations are tried first / take higher priority than earlier registrations
# To this end, the most specific converters should appear below the most generic converters
self.register_converter(PlainTextConverter())
self.register_converter(ZipConverter())
self.register_converter(ZipConverter(markitdown=self))
self.register_converter(HtmlConverter())
self.register_converter(RssConverter())
self.register_converter(WikipediaConverter())
@ -464,16 +464,16 @@ class MarkItDown:
# Attempt the conversion
if _accepts:
# try:
res = converter.convert(file_stream, stream_info, **_kwargs)
# except Exception:
# failed_attempts.append(
# FailedConversionAttempt(
# converter=converter, exc_info=sys.exc_info()
# )
# )
# finally:
file_stream.seek(cur_pos)
try:
res = converter.convert(file_stream, stream_info, **_kwargs)
except Exception:
failed_attempts.append(
FailedConversionAttempt(
converter=converter, exc_info=sys.exc_info()
)
)
finally:
file_stream.seek(cur_pos)
if res is not None:
# Normalize the content

View file

@ -1,4 +1,6 @@
import puremagic
import mimetypes
import os
from dataclasses import dataclass, asdict
from typing import Optional, BinaryIO, List, TypeVar, Type
@ -56,6 +58,18 @@ class StreamInfo:
"""
guesses: List[StreamInfo] = []
# Add a guess purely based on the filename hint
if filename_hint:
try:
mimetype, _ = mimetypes.guess_file_type(filename_hint)
except AttributeError:
mimetype, _ = mimetypes.guess_type(filename_hint)
if mimetype:
guesses.append(
cls(mimetype=mimetype, extension=os.path.splitext(filename_hint)[1])
)
def _puremagic(
file_stream, filename_hint
) -> puremagic.main.PureMagicWithConfidence:

View file

@ -41,7 +41,7 @@ class BingSerpConverter(DocumentConverter):
Make sure we're dealing with HTML content *from* Bing.
"""
url = (stream_info.url or "").lower()
url = stream_info.url or ""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()

View file

@ -1,4 +1,4 @@
from typing import BinaryIO, Any
from typing import BinaryIO, Any, Union
import base64
import mimetypes
from ._exiftool import exiftool_metadata
@ -71,53 +71,73 @@ class ImageConverter(DocumentConverter):
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# # Try describing the image with GPTV
# llm_client = kwargs.get("llm_client")
# llm_model = kwargs.get("llm_model")
# if llm_client is not None and llm_model is not None:
# md_content += (
# "\n# Description:\n"
# + self._get_llm_description(
# local_path,
# extension,
# llm_client,
# llm_model,
# prompt=kwargs.get("llm_prompt"),
# ).strip()
# + "\n"
# )
# Try describing the image with GPT
llm_client = kwargs.get("llm_client")
llm_model = kwargs.get("llm_model")
if llm_client is not None and llm_model is not None:
md_content += (
"\n# Description:\n"
+ self._get_llm_description(
file_stream,
stream_info,
client=llm_client,
model=llm_model,
prompt=kwargs.get("llm_prompt"),
).strip()
+ "\n"
)
return DocumentConverterResult(
markdown=md_content,
)
def _get_llm_description(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
*,
client,
model,
prompt=None,
) -> Union[None, str]:
if prompt is None or prompt.strip() == "":
prompt = "Write a detailed caption for this image."
# def _get_llm_description(self, local_path, extension, client, model, prompt=None):
# if prompt is None or prompt.strip() == "":
# prompt = "Write a detailed caption for this image."
#
# data_uri = ""
# with open(local_path, "rb") as image_file:
# content_type, encoding = mimetypes.guess_type("_dummy" + extension)
# if content_type is None:
# content_type = "image/jpeg"
# image_base64 = base64.b64encode(image_file.read()).decode("utf-8")
# data_uri = f"data:{content_type};base64,{image_base64}"
#
# messages = [
# {
# "role": "user",
# "content": [
# {"type": "text", "text": prompt},
# {
# "type": "image_url",
# "image_url": {
# "url": data_uri,
# },
# },
# ],
# }
# ]
#
# response = client.chat.completions.create(model=model, messages=messages)
# return response.choices[0].message.content
# Get the content type
content_type = stream_info.mimetype
if not content_type:
content_type, _ = mimetypes.guess_type("_dummy" + stream_info.extension)
if not content_type:
content_type = "application/octet-stream"
# Convert to base64
cur_pos = file_stream.tell()
try:
base64_image = base64.b64encode(file_stream.read()).decode("utf-8")
except Exception as e:
return None
finally:
file_stream.seek(cur_pos)
# Prepare the data-uri
data_uri = f"data:{content_type};base64,{base64_image}"
# Prepare the OpenAI API request
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": data_uri,
},
},
],
}
]
# Call the OpenAI API
response = client.chat.completions.create(model=model, messages=messages)
return response.choices[0].message.content

View file

@ -0,0 +1,50 @@
from typing import BinaryIO, Any, Union
import base64
import mimetypes
from .._stream_info import StreamInfo
def llm_caption(
    file_stream: BinaryIO, stream_info: StreamInfo, *, client, model, prompt=None
) -> Union[None, str]:
    """Generate an LLM caption for the image held in ``file_stream``.

    The image bytes are base64-encoded into a ``data:`` URI and sent, together
    with the prompt, through ``client.chat.completions.create``.

    Args:
        file_stream: Binary stream containing the image. Its position is
            restored after reading.
        stream_info: Supplies ``mimetype`` and, as a fallback, ``extension``
            for choosing the content type of the data URI.
        client: Object exposing ``chat.completions.create(model=, messages=)``.
        model: Model identifier forwarded to the client.
        prompt: Optional instruction; a default caption prompt is used when it
            is ``None`` or blank.

    Returns:
        The message content of the first choice, or ``None`` when the stream
        could not be read or encoded.
    """
    if prompt is None or prompt.strip() == "":
        prompt = "Write a detailed caption for this image."

    # Get the content type
    content_type = stream_info.mimetype
    if not content_type:
        # The extension may be None (e.g. an embedded image with no filename);
        # guard so the concatenation cannot raise a TypeError.
        content_type, _ = mimetypes.guess_type(
            "_dummy" + (stream_info.extension or "")
        )
    if not content_type:
        content_type = "application/octet-stream"

    # Convert to base64
    cur_pos = file_stream.tell()
    try:
        base64_image = base64.b64encode(file_stream.read()).decode("utf-8")
    except Exception:
        # Best effort: an unreadable stream simply yields no caption
        return None
    finally:
        # Leave the stream where we found it for any later reader
        file_stream.seek(cur_pos)

    # Prepare the data-uri
    data_uri = f"data:{content_type};base64,{base64_image}"

    # Prepare the OpenAI API request
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": data_uri,
                    },
                },
            ],
        }
    ]

    # Call the OpenAI API
    response = client.chat.completions.create(model=model, messages=messages)
    return response.choices[0].message.content

View file

@ -1,8 +1,13 @@
import sys
from typing import Union
from typing import BinaryIO, Any
from ._html_converter import HtmlConverter
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
_dependency_exc_info = None
@ -14,6 +19,14 @@ except ImportError:
_dependency_exc_info = sys.exc_info()
ACCEPTED_MIME_TYPE_PREFIXES = [
"application/pdf",
"application/x-pdf",
]
ACCEPTED_FILE_EXTENSIONS = [".pdf"]
class PdfConverter(DocumentConverter):
"""
Converts PDFs to Markdown. Most style information is ignored, so the results are essentially plain-text.
@ -24,12 +37,30 @@ class PdfConverter(DocumentConverter):
):
super().__init__(priority=priority)
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a PDF
extension = kwargs.get("file_extension", "")
if extension.lower() != ".pdf":
return None
def accepts(
    self,
    file_stream: BinaryIO,
    stream_info: StreamInfo,
    **kwargs: Any,  # Options to pass to the converter
) -> bool:
    """Report whether the stream looks like a PDF.

    A stream is accepted when its lower-cased extension is listed in
    ACCEPTED_FILE_EXTENSIONS, or when its lower-cased mimetype begins with
    one of ACCEPTED_MIME_TYPE_PREFIXES. The stream itself is not read.
    """
    mime = (stream_info.mimetype or "").lower()
    ext = (stream_info.extension or "").lower()

    # The extension match wins outright; otherwise fall back to the mimetype
    if ext in ACCEPTED_FILE_EXTENSIONS:
        return True
    return any(mime.startswith(candidate) for candidate in ACCEPTED_MIME_TYPE_PREFIXES)
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Check the dependencies
if _dependency_exc_info is not None:
raise MissingDependencyException(
@ -43,5 +74,5 @@ class PdfConverter(DocumentConverter):
) # Restore the original traceback
return DocumentConverterResult(
markdown=pdfminer.high_level.extract_text(local_path)
markdown=pdfminer.high_level.extract_text(file_stream),
)

View file

@ -1,13 +1,26 @@
import mimetypes
from charset_normalizer import from_path
from typing import Any, Union
import sys
from typing import BinaryIO, Any
from charset_normalizer import from_bytes
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
_dependency_exc_info = None
try:
import mammoth
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
ACCEPTED_MIME_TYPE_PREFIXES = [
"text/",
"application/json",
]
# Mimetypes to ignore (commonly confused extensions)
IGNORE_MIMETYPES = [
IGNORE_MIME_TYPE_PREFIXES = [
"text/vnd.in3d.spot", # .spo which is confused with xls, doc, etc.
"text/vnd.graphviz", # .dot which is confused with xls, doc, etc.
]
@ -21,26 +34,34 @@ class PlainTextConverter(DocumentConverter):
):
super().__init__(priority=priority)
def accepts(
    self,
    file_stream: BinaryIO,
    stream_info: StreamInfo,
    **kwargs: Any,  # Options to pass to the converter
) -> bool:
    """Report whether the stream should be treated as plain text.

    The decision is based solely on the lower-cased mimetype: anything whose
    mimetype starts with an entry of IGNORE_MIME_TYPE_PREFIXES is rejected
    first, then anything starting with an entry of
    ACCEPTED_MIME_TYPE_PREFIXES is accepted. Everything else, including a
    missing mimetype, is rejected. The stream itself is not read.
    """
    mimetype = (stream_info.mimetype or "").lower()

    # Ignore list takes precedence over the accept list.
    # str.startswith needs a tuple, not a list, to test several prefixes.
    if mimetype.startswith(tuple(IGNORE_MIME_TYPE_PREFIXES)):
        return False

    return mimetype.startswith(tuple(ACCEPTED_MIME_TYPE_PREFIXES))
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Guess the content type from any file extension that might be around
content_type, _ = mimetypes.guess_type(
"__placeholder" + kwargs.get("file_extension", "")
)
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
if stream_info.charset:
text_content = file_stream.read().decode(stream_info.charset)
else:
text_content = str(from_bytes(file_stream.read()).best())
# Ignore common false positives
if content_type in IGNORE_MIMETYPES:
content_type = None
# Only accept text files
if content_type is None:
return None
elif all(
not content_type.lower().startswith(type_prefix)
for type_prefix in ["text/", "application/json"]
):
return None
text_content = str(from_path(local_path).best())
return DocumentConverterResult(markdown=text_content)

View file

@ -1,11 +1,14 @@
import sys
import base64
import os
import io
import re
import html
from typing import BinaryIO, Any
from ._html_converter import HtmlConverter
from ._llm_caption import llm_caption
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
@ -38,35 +41,6 @@ class PptxConverter(DocumentConverter):
super().__init__(priority=priority)
self._html_converter = HtmlConverter()
def _get_llm_description(
self, llm_client, llm_model, image_blob, content_type, prompt=None
):
if prompt is None or prompt.strip() == "":
prompt = "Write a detailed alt text for this image with less than 50 words."
image_base64 = base64.b64encode(image_blob).decode("utf-8")
data_uri = f"data:{content_type};base64,{image_base64}"
messages = [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": data_uri,
},
},
{"type": "text", "text": prompt},
],
}
]
response = llm_client.chat.completions.create(
model=llm_model, messages=messages
)
return response.choices[0].message.content
def accepts(
self,
file_stream: BinaryIO,
@ -120,41 +94,54 @@ class PptxConverter(DocumentConverter):
if self._is_picture(shape):
# https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
llm_description = None
alt_text = None
llm_description = ""
alt_text = ""
# Potentially generate a description using an LLM
llm_client = kwargs.get("llm_client")
llm_model = kwargs.get("llm_model")
if llm_client is not None and llm_model is not None:
# Prepare a file_stream and stream_info for the image data
image_filename = shape.image.filename
image_extension = None
if image_filename:
image_extension = os.path.splitext(image_filename)[1]
image_stream_info = StreamInfo(
mimetype=shape.image.content_type,
extension=image_extension,
filename=image_filename,
)
image_stream = io.BytesIO(shape.image.blob)
# Caption the image
try:
llm_description = self._get_llm_description(
llm_client,
llm_model,
shape.image.blob,
shape.image.content_type,
llm_description = llm_caption(
image_stream,
image_stream_info,
client=llm_client,
model=llm_model,
prompt=kwargs.get("llm_prompt"),
)
except Exception:
# Unable to describe with LLM
# Unable to generate a description
pass
if not llm_description:
try:
alt_text = shape._element._nvXxPr.cNvPr.attrib.get(
"descr", ""
)
except Exception:
# Unable to get alt text
pass
# Also grab any description embedded in the deck
try:
alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
except Exception:
# Unable to get alt text
pass
# Prepare the alt, escaping any special characters
alt_text = "\n".join([llm_description, alt_text]) or shape.name
alt_text = re.sub(r"[\r\n\[\]]", " ", alt_text)
alt_text = re.sub(r"\s+", " ", alt_text).strip()
# A placeholder name
filename = re.sub(r"\W", "", shape.name) + ".jpg"
md_content += (
"\n!["
+ (llm_description or alt_text or shape.name)
+ "]("
+ filename
+ ")\n"
)
md_content += "\n![" + alt_text + "](" + filename + ")\n"
# Tables
if self._is_table(shape):

View file

@ -36,7 +36,7 @@ class WikipediaConverter(DocumentConverter):
Make sure we're dealing with HTML content *from* Wikipedia.
"""
url = (stream_info.url or "").lower()
url = stream_info.url or ""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()

View file

@ -1,14 +1,15 @@
import re
import sys
import json
import urllib.parse
import time
from typing import Any, Union, Dict, List
from urllib.parse import parse_qs, urlparse
import io
import re
from typing import Any, BinaryIO, Optional, Dict, List, Union
from urllib.parse import parse_qs, urlparse, unquote
from bs4 import BeautifulSoup
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from ._markdownify import _CustomMarkdownify
# Optional YouTube transcription support
try:
@ -19,6 +20,17 @@ except ModuleNotFoundError:
IS_YOUTUBE_TRANSCRIPT_CAPABLE = False
ACCEPTED_MIME_TYPE_PREFIXES = [
"text/html",
"application/xhtml",
]
ACCEPTED_FILE_EXTENSIONS = [
".html",
".htm",
]
class YouTubeConverter(DocumentConverter):
"""Handle YouTube specially, focusing on the video title, description, and transcript."""
@ -27,45 +39,45 @@ class YouTubeConverter(DocumentConverter):
):
super().__init__(priority=priority)
def retry_operation(self, operation, retries=3, delay=2):
    """Call ``operation`` until it succeeds or ``retries`` attempts are used.

    Each failure is printed; between attempts (but not after the final one)
    the method sleeps for ``delay`` seconds. The first successful return
    value is passed back to the caller. When every attempt fails, a generic
    ``Exception`` naming the attempt count is raised.
    """
    failures = 0
    while failures < retries:
        try:
            return operation()
        except Exception as e:
            failures += 1
            print(f"Attempt {failures} failed: {e}")
            # Only pause when another attempt is still coming
            if failures < retries:
                time.sleep(delay)

    # Every attempt failed
    raise Exception(f"Operation failed after {retries} attempts.")
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
"""
Make sure we're dealing with HTML content *from* YouTube.
"""
url = stream_info.url or ""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Bail if not YouTube
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".html", ".htm"]:
return None
url = kwargs.get("url", "")
url = urllib.parse.unquote(url)
url = unquote(url)
url = url.replace(r"\?", "?").replace(r"\=", "=")
if not url.startswith("https://www.youtube.com/watch?"):
return None
# Not a YouTube URL
return False
# Parse the file with error handling
try:
with open(local_path, "rt", encoding="utf-8") as fh:
soup = BeautifulSoup(fh.read(), "html.parser")
except Exception as e:
print(f"Error reading YouTube page: {e}")
return None
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
if not soup.title or not soup.title.string:
return None
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
# Not HTML content
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Parse the stream
encoding = "utf-8" if stream_info.charset is None else stream_info.charset
soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
# Read the meta tags
metadata: Dict[str, str] = {"title": soup.title.string}
@ -126,7 +138,7 @@ class YouTubeConverter(DocumentConverter):
if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
transcript_text = ""
parsed_url = urlparse(url) # type: ignore
parsed_url = urlparse(stream_info.url) # type: ignore
params = parse_qs(parsed_url.query) # type: ignore
if "v" in params and params["v"][0]:
video_id = str(params["v"][0])
@ -135,7 +147,7 @@ class YouTubeConverter(DocumentConverter):
"youtube_transcript_languages", ("en",)
)
# Retry the transcript fetching operation
transcript = self.retry_operation(
transcript = self._retry_operation(
lambda: YouTubeTranscriptApi.get_transcript(
video_id, languages=youtube_transcript_languages
),
@ -188,3 +200,17 @@ class YouTubeConverter(DocumentConverter):
if result := self._findKey(v, key):
return result
return None
def _retry_operation(self, operation, retries=3, delay=2):
"""Retries the operation if it fails."""
attempt = 0
while attempt < retries:
try:
return operation() # Attempt the operation
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < retries - 1:
time.sleep(delay) # Wait before retrying
attempt += 1
# If all attempts fail, raise the last exception
raise Exception(f"Operation failed after {retries} attempts.")

View file

@ -1,9 +1,19 @@
import os
import sys
import zipfile
import shutil
from typing import Any, Union
import io
import os
from typing import BinaryIO, Any
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import UnsupportedFormatException, FileConversionException
ACCEPTED_MIME_TYPE_PREFIXES = [
"application/zip",
]
ACCEPTED_FILE_EXTENSIONS = [".zip"]
class ZipConverter(DocumentConverter):
@ -46,95 +56,59 @@ class ZipConverter(DocumentConverter):
"""
def __init__(
self, priority: float = DocumentConverter.PRIORITY_SPECIFIC_FILE_FORMAT
self,
priority: float = DocumentConverter.PRIORITY_SPECIFIC_FILE_FORMAT,
*,
markitdown: Any,
):
super().__init__(priority=priority)
self._markitdown = markitdown
def accepts(
    self,
    file_stream: BinaryIO,
    stream_info: StreamInfo,
    **kwargs: Any,  # Options to pass to the converter
) -> bool:
    """Report whether the stream looks like a zip archive.

    True when the lower-cased extension appears in ACCEPTED_FILE_EXTENSIONS
    or the lower-cased mimetype starts with any entry of
    ACCEPTED_MIME_TYPE_PREFIXES; False otherwise. The stream is not read.
    """
    mimetype = (stream_info.mimetype or "").lower()
    extension = (stream_info.extension or "").lower()

    known_prefixes = tuple(ACCEPTED_MIME_TYPE_PREFIXES)
    return extension in ACCEPTED_FILE_EXTENSIONS or mimetype.startswith(
        known_prefixes
    )
def convert(
    self,
    file_stream: BinaryIO,
    stream_info: StreamInfo,
    **kwargs: Any,  # Options to pass to the converter
) -> DocumentConverterResult:
    """Convert every member of a zip archive and concatenate the results.

    Each archive member is read into memory and handed to
    ``self._markitdown.convert_stream`` together with a StreamInfo built from
    the member's extension and base name. Members that convert successfully
    are appended under a ``## File: <name>`` heading; members that raise
    UnsupportedFormatException or FileConversionException are skipped.

    Args:
        file_stream: Binary stream positioned at the start of the archive.
        stream_info: Describes the archive; used only to label the output.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        A DocumentConverterResult whose markdown holds the combined,
        stripped output.
    """
    # StreamInfo is constructed with `filename=` (see below), so the
    # attribute is `filename`; `file_name` would raise AttributeError
    # whenever neither a URL nor a local path is available.
    file_path = stream_info.url or stream_info.local_path or stream_info.filename
    md_content = f"Content from the zip file `{file_path}`:\n\n"

    with zipfile.ZipFile(file_stream, "r") as zipObj:
        for name in zipObj.namelist():
            try:
                z_file_stream = io.BytesIO(zipObj.read(name))
                z_file_stream_info = StreamInfo(
                    extension=os.path.splitext(name)[1],
                    filename=os.path.basename(name),
                )
                result = self._markitdown.convert_stream(
                    stream=z_file_stream,
                    stream_info=z_file_stream_info,
                )
                if result is not None:
                    md_content += f"## File: {name}\n\n"
                    md_content += result.markdown + "\n\n"
            except (UnsupportedFormatException, FileConversionException):
                # Deliberate best effort: a member that cannot be converted
                # is left out rather than failing the whole archive.
                continue

    return DocumentConverterResult(markdown=md_content.strip())

Binary file not shown.

View file

@ -2,6 +2,7 @@
import io
import os
import shutil
import openai
import pytest
import requests
@ -289,7 +290,6 @@ def test_markitdown_remote() -> None:
assert test_string in result.text_content
# Youtube
# TODO: This test randomly fails for some reason. Haven't been able to repro it yet. Disabling until I can debug the issue
result = markitdown.convert(YOUTUBE_TEST_URL)
for test_string in YOUTUBE_TEST_STRINGS:
assert test_string in result.text_content
@ -298,6 +298,10 @@ def test_markitdown_remote() -> None:
def test_markitdown_local() -> None:
markitdown = MarkItDown()
# Test PDF processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.pdf"))
validate_strings(result, PDF_TEST_STRINGS)
# Test XLSX processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.xlsx"))
validate_strings(result, XLSX_TEST_STRINGS)
@ -336,10 +340,6 @@ def test_markitdown_local() -> None:
)
validate_strings(result, BLOG_TEST_STRINGS)
# Test ZIP file processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_files.zip"))
validate_strings(result, XLSX_TEST_STRINGS)
# Test Wikipedia processing
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_wikipedia.html"), url=WIKIPEDIA_TEST_URL
@ -360,18 +360,24 @@ def test_markitdown_local() -> None:
for test_string in RSS_TEST_STRINGS:
assert test_string in text_content
## Test non-UTF-8 encoding
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_mskanji.csv"))
validate_strings(result, CSV_CP932_TEST_STRINGS)
# Test MSG (Outlook email) processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_outlook_msg.msg"))
validate_strings(result, MSG_TEST_STRINGS)
# Test non-UTF-8 encoding
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_mskanji.csv"))
validate_strings(result, CSV_CP932_TEST_STRINGS)
# Test JSON processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.json"))
validate_strings(result, JSON_TEST_STRINGS)
# Test ZIP file processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_files.zip"))
validate_strings(result, DOCX_TEST_STRINGS)
validate_strings(result, XLSX_TEST_STRINGS)
validate_strings(result, BLOG_TEST_STRINGS)
# Test input from a stream
input_data = b"<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
@ -441,7 +447,6 @@ def test_markitdown_llm() -> None:
markitdown = MarkItDown(llm_client=client, llm_model="gpt-4o")
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_llm.jpg"))
for test_string in LLM_TEST_STRINGS:
assert test_string in result.text_content
@ -450,6 +455,14 @@ def test_markitdown_llm() -> None:
for test_string in ["red", "circle", "blue", "square"]:
assert test_string in result.text_content.lower()
# Images embedded in PPTX files
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.pptx"))
# LLM Captions are included
for test_string in LLM_TEST_STRINGS:
assert test_string in result.text_content
# Standard alt text is included
validate_strings(result, PPTX_TEST_STRINGS)
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
@ -457,7 +470,7 @@ if __name__ == "__main__":
test_stream_info_guesses()
test_markitdown_remote()
test_markitdown_local()
# test_exceptions()
# test_markitdown_exiftool()
# test_markitdown_llm()
test_exceptions()
test_markitdown_exiftool()
test_markitdown_llm()
print("All tests passed!")