Experimenting with new signatures.

This commit is contained in:
Adam Fourney 2025-03-03 23:01:16 -08:00
parent e43632b048
commit 7bc6d827ee
5 changed files with 519 additions and 221 deletions

View file

@ -5,6 +5,7 @@
from .__about__ import __version__
from ._markitdown import MarkItDown
from ._base_converter import DocumentConverterResult, BaseDocumentConverter
from ._stream_info import StreamInfo
from ._exceptions import (
MarkItDownException,
MissingDependencyException,
@ -25,4 +26,5 @@ __all__ = [
"FailedConversionAttempt",
"FileConversionException",
"UnsupportedFormatException",
"StreamInfo",
]

View file

@ -1,3 +1,4 @@
from ._stream_info import StreamInfo
from typing import Any, Union, BinaryIO, Optional
@ -13,6 +14,9 @@ class DocumentConverterResult:
"""
Initialize the DocumentConverterResult.
The only required parameter is the converted Markdown text.
The title, and any other metadata that may be added in the future, are optional.
Parameters:
- markdown: The converted Markdown text.
- title: Optional title of the document.
@ -72,27 +76,25 @@ class BaseDocumentConverter:
def convert(
self,
file_stream,
*,
mime_type: str = "application/octet-stream",
file_extension: Optional[str] = None,
charset: Optional[str] = None,
**kwargs: Any,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> Union[None, DocumentConverterResult]:
"""
Convert a document to Markdown text, or return None if the converter
cannot handle the document (causing the next converter to be tried).
The determination of whether a converter can handle a document is primarily based on
the provided MIME type. The file extension can serve as a secondary check if the
MIME type is not sufficiently specific (e.g., application/octet-stream). Finally, the
chatset is used to determine the encoding of the file content in cases of text/*
the provided `stream_info.mimetype`. The field `stream_info.extension` can serve as
a secondary check if the MIME type is not sufficiently specific
(e.g., application/octet-stream). In the case of data retrieved via HTTP, the
`stream_info.url` might also be referenced to guide conversion (e.g., special-handling
for Wikipedia). Finally, the `stream_info.charset` is used to determine the encoding
of the file content in cases of text/*
Parameters:
- file_stream: The file-like object to convert. Must support seek(), tell(), and read() methods.
- mime_type: The MIME type of the file. Default is "application/octet-stream".
- file_extension: The file extension of the file. Default is None.
- charset: The character set of the file. Default is None.
- stream_info: The StreamInfo object containing metadata about the file (mimetype, extension, charset, etc.)
- kwargs: Additional keyword arguments for the converter.
Returns:

View file

@ -6,8 +6,9 @@ import sys
import tempfile
import warnings
import traceback
import io
from importlib.metadata import entry_points
from typing import Any, List, Optional, Union
from typing import Any, List, Optional, Union, BinaryIO
from pathlib import Path
from urllib.parse import urlparse
from warnings import warn
@ -16,6 +17,8 @@ from warnings import warn
import puremagic
import requests
from ._stream_info import StreamInfo
from .converters import (
DocumentConverter,
PlainTextConverter,
@ -175,12 +178,17 @@ class MarkItDown:
warn("Plugins converters are already enabled.", RuntimeWarning)
def convert(
self, source: Union[str, requests.Response, Path], **kwargs: Any
self,
source: Union[str, requests.Response, Path, BinaryIO],
*,
stream_info: Optional[StreamInfo] = None,
**kwargs: Any,
) -> DocumentConverterResult: # TODO: deal with kwargs
"""
Args:
- source: can be a string representing a path either as string pathlib path object or url, or a requests.response object
- extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.)
- source: can be a path (str or Path), url, or a requests.response object
- stream_info: optional stream info to use for the conversion. If None, infer from source
- kwargs: additional arguments to pass to the converter
"""
# Local path or url
@ -192,68 +200,112 @@ class MarkItDown:
):
return self.convert_url(source, **kwargs)
else:
return self.convert_local(source, **kwargs)
return self.convert_local(source, stream_info=stream_info, **kwargs)
# Path object
elif isinstance(source, Path):
return self.convert_local(source, stream_info=stream_info, **kwargs)
# Request response
elif isinstance(source, requests.Response):
return self.convert_response(source, **kwargs)
elif isinstance(source, Path):
return self.convert_local(source, **kwargs)
# Binary stream
elif (
hasattr(source, "read")
and callable(source.read)
and not isinstance(source, io.TextIOBase)
):
return self.convert_stream(source, **kwargs)
else:
raise TypeError(
f"Invalid source type: {type(source)}. Expected str, requests.Response, BinaryIO."
)
def convert_local(
self, path: Union[str, Path], **kwargs: Any
) -> DocumentConverterResult: # TODO: deal with kwargs
self,
path: Union[str, Path],
*,
stream_info: Optional[StreamInfo] = None,
file_extension: Optional[str] = None, # Deprecated -- use stream_info
url: Optional[str] = None, # Deprecated -- use stream_info
**kwargs: Any,
) -> DocumentConverterResult:
if isinstance(path, Path):
path = str(path)
# Prepare a list of extensions to try (in order of priority)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# Get extension alternatives from the path and puremagic
base, ext = os.path.splitext(path)
self._append_ext(extensions, ext)
# Build a base StreamInfo object from which to start guesses
base_stream_info = StreamInfo(
local_path=path,
extension=os.path.splitext(path)[1],
filename=os.path.basename(path),
)
for g in self._guess_ext_magic(path):
self._append_ext(extensions, g)
# Extend the base_stream_info with any additional info from the arguments
if stream_info is not None:
base_stream_info = base_stream_info.copy_and_update(stream_info)
# Convert
return self._convert(path, extensions, **kwargs)
if file_extension is not None:
# Deprecated -- use stream_info
base_stream_info = base_stream_info.copy_and_update(
extension=file_extension
)
if url is not None:
# Deprecated -- use stream_info
base_stream_info = base_stream_info.copy_and_update(url=url)
with open(path, "rb") as fh:
# Prepare a list of configurations to try, starting with the base_stream_info
guesses: List[StreamInfo] = [base_stream_info]
for guess in StreamInfo.guess_from_stream(
file_stream=fh, filename_hint=path
):
guesses.append(base_stream_info.copy_and_update(guess))
return self._convert(file_stream=fh, stream_info_guesses=guesses, **kwargs)
# TODO what should stream's type be?
def convert_stream(
self, stream: Any, **kwargs: Any
) -> DocumentConverterResult: # TODO: deal with kwargs
# Prepare a list of extensions to try (in order of priority)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
self,
stream: BinaryIO,
*,
stream_info: Optional[StreamInfo] = None,
file_extension: Optional[str] = None, # Deprecated -- use stream_info
url: Optional[str] = None, # Deprecated -- use stream_info
**kwargs: Any,
) -> DocumentConverterResult:
guesses: List[StreamInfo] = []
# Save the file locally to a temporary file. It will be deleted before this method exits
handle, temp_path = tempfile.mkstemp()
fh = os.fdopen(handle, "wb")
result = None
try:
# Write to the temporary file
content = stream.read()
if isinstance(content, str):
fh.write(content.encode("utf-8"))
else:
fh.write(content)
fh.close()
# Do we have anything on which to base a guess?
base_guess = None
if stream_info is not None or file_extension is not None or url is not None:
base_guess = stream_info if stream_info is not None else StreamInfo()
if file_extension is not None:
# Deprecated -- use stream_info
base_guess = base_guess.copy_and_update(extension=file_extension)
if url is not None:
# Deprecated -- use stream_info
base_guess = base_guess.copy_and_update(url=url)
# Use puremagic to check for more extension options
for g in self._guess_ext_magic(temp_path):
self._append_ext(extensions, g)
# Append the base guess, if it's non-trivial
if base_guess is not None:
if base_guess.mimetype is not None or base_guess.extension is not None:
guesses.append(base_guess)
else:
# Create a base guess with no information
base_guess = StreamInfo()
# Convert
result = self._convert(temp_path, extensions, **kwargs)
# Clean up
finally:
try:
fh.close()
except Exception:
pass
os.unlink(temp_path)
# Create a placeholder filename to help with guessing
placeholder_filename = None
if base_guess.filename is not None:
placeholder_filename = base_guess.filename
elif base_guess.extension is not None:
placeholder_filename = "placeholder" + base_guess.extension
return result
# Add guesses based on stream content
for guess in StreamInfo.guess_from_stream(
file_stream=stream, filename_hint=placeholder_filename
):
guesses.append(base_guess.copy_and_update(guess))
# Perform the conversion
return self._convert(file_stream=stream, stream_info_guesses=guesses, **kwargs)
def convert_url(
self, url: str, **kwargs: Any
@ -264,173 +316,197 @@ class MarkItDown:
return self.convert_response(response, **kwargs)
def convert_response(
self, response: requests.Response, **kwargs: Any
) -> DocumentConverterResult: # TODO fix kwargs type
# Prepare a list of extensions to try (in order of priority)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# Guess from the mimetype
content_type = response.headers.get("content-type", "").split(";")[0]
self._append_ext(extensions, mimetypes.guess_extension(content_type))
# Read the content disposition if there is one
content_disposition = response.headers.get("content-disposition", "")
m = re.search(r"filename=([^;]+)", content_disposition)
if m:
base, ext = os.path.splitext(m.group(1).strip("\"'"))
self._append_ext(extensions, ext)
# Read from the extension from the path
base, ext = os.path.splitext(urlparse(response.url).path)
self._append_ext(extensions, ext)
# Save the file locally to a temporary file. It will be deleted before this method exits
handle, temp_path = tempfile.mkstemp()
fh = os.fdopen(handle, "wb")
result = None
try:
# Download the file
for chunk in response.iter_content(chunk_size=512):
fh.write(chunk)
fh.close()
# Use puremagic to check for more extension options
for g in self._guess_ext_magic(temp_path):
self._append_ext(extensions, g)
# Convert
result = self._convert(temp_path, extensions, url=response.url, **kwargs)
# Clean up
finally:
try:
fh.close()
except Exception:
pass
os.unlink(temp_path)
return result
def _convert(
self, local_path: str, extensions: List[Union[str, None]], **kwargs
self,
response: requests.Response,
*,
stream_info: Optional[StreamInfo] = None,
file_extension: Optional[str] = None, # Deprecated -- use stream_info
url: Optional[str] = None, # Deprecated -- use stream_info
**kwargs: Any,
) -> DocumentConverterResult:
res: Union[None, DocumentConverterResult] = None
# If there is a content-type header, get the mimetype and charset (if present)
mimetype: Optional[str] = None
charset: Optional[str] = None
# Keep track of which converters throw exceptions
failed_attempts: List[FailedConversionAttempt] = []
if "content-type" in response.headers:
parts = response.headers["content-type"].split(";")
mimetype = parts.pop(0).strip()
for part in parts:
if part.strip().startswith("charset="):
_charset = part.split("=")[1].strip()
if len(_charset) > 0:
charset = _charset
# Create a copy of the page_converters list, sorted by priority.
# We do this with each call to _convert because the priority of converters may change between calls.
# The sort is guaranteed to be stable, so converters with the same priority will remain in the same order.
sorted_converters = sorted(self._page_converters, key=lambda x: x.priority)
# If there is a content-disposition header, get the filename and possibly the extension
filename: Optional[str] = None
extension: Optional[str] = None
if "content-disposition" in response.headers:
m = re.search(r"filename=([^;]+)", response.headers["content-disposition"])
if m:
filename = m.group(1).strip("\"'")
_, _extension = os.path.splitext(filename)
if len(_extension) > 0:
extension = _extension
for ext in extensions + [None]: # Try last with no extension
for converter in sorted_converters:
_kwargs = copy.deepcopy(kwargs)
# If there is still no filename, try to read it from the url
if filename is None:
parsed_url = urlparse(response.url)
_, _extension = os.path.splitext(parsed_url.path)
if len(_extension) > 0: # Looks like this might be a file!
filename = os.path.basename(parsed_url.path)
extension = _extension
# Overwrite file_extension appropriately
if ext is None:
if "file_extension" in _kwargs:
del _kwargs["file_extension"]
else:
_kwargs.update({"file_extension": ext})
# Copy any additional global options
if "llm_client" not in _kwargs and self._llm_client is not None:
_kwargs["llm_client"] = self._llm_client
if "llm_model" not in _kwargs and self._llm_model is not None:
_kwargs["llm_model"] = self._llm_model
if "style_map" not in _kwargs and self._style_map is not None:
_kwargs["style_map"] = self._style_map
if "exiftool_path" not in _kwargs and self._exiftool_path is not None:
_kwargs["exiftool_path"] = self._exiftool_path
# Add the list of converters for nested processing
_kwargs["_parent_converters"] = self._page_converters
# If we hit an error log it and keep trying
try:
res = converter.convert(local_path, **_kwargs)
except Exception:
failed_attempts.append(
FailedConversionAttempt(
converter=converter, exc_info=sys.exc_info()
)
)
if res is not None:
# Normalize the content
res.text_content = "\n".join(
[line.rstrip() for line in re.split(r"\r?\n", res.text_content)]
)
res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content)
# Todo
return res
# If we got this far without success, report any exceptions
if len(failed_attempts) > 0:
raise FileConversionException(attempts=failed_attempts)
# Nothing can handle it!
raise UnsupportedFormatException(
f"Could not convert '{local_path}' to Markdown. No converter attempted a conversion, suggesting that the filetype is simply not supported."
# Create an initial guess from all this information
base_guess = StreamInfo(
mimetype=mimetype,
charset=charset,
filename=filename,
extension=extension,
url=response.url,
)
def _append_ext(self, extensions, ext):
"""Append a unique non-None, non-empty extension to a list of extensions."""
if ext is None:
return
ext = ext.strip()
if ext == "":
return
if ext in extensions:
return
extensions.append(ext)
# Update with any additional info from the arguments
if stream_info is not None:
base_guess = base_guess.copy_and_update(stream_info)
if file_extension is not None:
# Deprecated -- use stream_info
base_guess = base_guess.copy_and_update(extension=file_extension)
if url is not None:
# Deprecated -- use stream_info
base_guess = base_guess.copy_and_update(url=url)
# Add the guess if its non-trivial
guesses: List[StreamInfo] = []
if base_guess.mimetype is not None or base_guess.extension is not None:
guesses.append(base_guess)
# Read into BytesIO
buffer = io.BytesIO()
for chunk in response.iter_content(chunk_size=512):
buffer.write(chunk)
buffer.seek(0)
# Create a placeholder filename to help with guessing
placeholder_filename = None
if base_guess.filename is not None:
placeholder_filename = base_guess.filename
elif base_guess.extension is not None:
placeholder_filename = "placeholder" + base_guess.extension
# Add guesses based on stream content
for guess in StreamInfo.guess_from_stream(
file_stream=buffer, filename_hint=placeholder_filename
):
guesses.append(base_guess.copy_and_update(guess))
# Convert
return self._convert(file_stream=buffer, stream_info_guesses=guesses, **kwargs)
def _convert(
self, *, file_stream: BinaryIO, stream_info_guesses: List[StreamInfo], **kwargs
) -> DocumentConverterResult:
# Lazily create a temporary file, if needed, for backward compatibility
# This is to support a deprecated feature, and will be removed in the future
temp_file = None
def get_temp_file():
nonlocal temp_file
if temp_file is not None:
return temp_file
else:
cur_pos = file_stream.tell()
handle, temp_file = tempfile.mkstemp()
fh = os.fdopen(handle, "wb")
file_stream.seek(0)
fh.write(file_stream.read())
file_stream.seek(cur_pos)
fh.close()
return temp_file
def _guess_ext_magic(self, path):
"""Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes."""
# Use puremagic to guess
try:
guesses = puremagic.magic_file(path)
res: Union[None, DocumentConverterResult] = None
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
if len(guesses) == 0:
with open(path, "rb") as file:
while True:
char = file.read(1)
if not char: # End of file
break
if not char.isspace():
file.seek(file.tell() - 1)
break
try:
guesses = puremagic.magic_stream(file)
except puremagic.main.PureError:
pass
# Keep track of which converters throw exceptions
failed_attempts: List[FailedConversionAttempt] = []
extensions = list()
for g in guesses:
ext = g.extension.strip()
if len(ext) > 0:
if not ext.startswith("."):
ext = "." + ext
if ext not in extensions:
extensions.append(ext)
return extensions
except FileNotFoundError:
pass
except IsADirectoryError:
pass
except PermissionError:
pass
return []
# Create a copy of the page_converters list, sorted by priority.
# We do this with each call to _convert because the priority of converters may change between calls.
# The sort is guaranteed to be stable, so converters with the same priority will remain in the same order.
sorted_converters = sorted(self._page_converters, key=lambda x: x.priority)
for file_info in stream_info_guesses + [None]:
for converter in sorted_converters:
_kwargs = copy.deepcopy(kwargs)
# Copy any additional global options
if "llm_client" not in _kwargs and self._llm_client is not None:
_kwargs["llm_client"] = self._llm_client
if "llm_model" not in _kwargs and self._llm_model is not None:
_kwargs["llm_model"] = self._llm_model
if "style_map" not in _kwargs and self._style_map is not None:
_kwargs["style_map"] = self._style_map
if (
"exiftool_path" not in _kwargs
and self._exiftool_path is not None
):
_kwargs["exiftool_path"] = self._exiftool_path
# Add the list of converters for nested processing
_kwargs["_parent_converters"] = self._page_converters
# Add backwards compatibility
if isinstance(converter, DocumentConverter):
if file_info is not None:
# Legacy converters need a file_extension
if file_info.extension is not None:
_kwargs["file_extension"] = file_info.extension
# And benefit from urls, when available
if file_info.url is not None:
_kwargs["url"] = file_info.url
try:
res = converter.convert(get_temp_file(), **_kwargs)
except Exception:
failed_attempts.append(
FailedConversionAttempt(
converter=converter, exc_info=sys.exc_info()
)
)
else:
raise NotImplementedError("TODO")
if res is not None:
# Normalize the content
res.text_content = "\n".join(
[
line.rstrip()
for line in re.split(r"\r?\n", res.text_content)
]
)
res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content)
return res
# If we got this far without success, report any exceptions
if len(failed_attempts) > 0:
raise FileConversionException(attempts=failed_attempts)
# Nothing can handle it!
raise UnsupportedFormatException(
f"Could not convert stream to Markdown. No converter attempted a conversion, suggesting that the filetype is simply not supported."
)
finally:
# Clean up the temporary file
if temp_file is not None:
try:
os.unlink(temp_file)
except Exception:
pass
def register_page_converter(self, converter: DocumentConverter) -> None:
"""DEPRECATED: User register_converter instead."""

View file

@ -0,0 +1,105 @@
import puremagic
from dataclasses import dataclass, asdict
from typing import Optional, BinaryIO, List, TypeVar, Type
# This is a workaround for Self not being available in Python 3.10
T = TypeVar("T", bound="StreamInfo")
# Mimetype substitutions table
MIMETYPE_SUBSTITUTIONS = {
"application/excel": "application/vnd.ms-excel",
"application/mspowerpoint": "application/vnd.ms-powerpoint",
}
@dataclass(kw_only=True, frozen=True)
class StreamInfo:
"""The StreamInfo class is used to store information about a file stream.
All fields can be None, and will depend on how the stream was opened.
"""
mimetype: Optional[str] = None
extension: Optional[str] = None
charset: Optional[str] = None
filename: Optional[
str
] = None # From local path, url, or Content-Disposition header
local_path: Optional[str] = None # If read from disk
url: Optional[str] = None # If read from url
def copy_and_update(self, *args, **kwargs):
"""Copy the StreamInfo object and update it with the given StreamInfo
instance and/or other keyword arguments."""
new_info = asdict(self)
for si in args:
assert isinstance(si, StreamInfo)
new_info.update({k: v for k, v in asdict(si).items() if v is not None})
if len(kwargs) > 0:
new_info.update(kwargs)
return StreamInfo(**new_info)
@classmethod
def guess_from_stream(
cls: Type[T], file_stream: BinaryIO, *, filename_hint: Optional[str] = None
) -> List[T]:
"""
Guess StreamInfo properties (mostly mimetype and extension) from a stream.
Args:
- stream: The stream to guess the StreamInfo from.
- filename_hint [Optional]: A filename hint to help with the guessing (may be a placeholder, and not actually be the file name)
Returns a list of StreamInfo objects in order of confidence.
"""
guesses: List[StreamInfo] = []
def _puremagic(
file_stream, filename_hint
) -> puremagic.main.PureMagicWithConfidence:
"""Wrap guesses to handle exceptions."""
try:
return puremagic.magic_stream(file_stream, filename=filename_hint)
except puremagic.main.PureError as e:
return []
cur_pos = file_stream.tell()
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
if len(type_guesses) == 0:
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
# Eat all the leading whitespace
file_stream.seek(cur_pos)
while True:
char = file_stream.read(1)
if not char: # End of file
break
if not char.isspace():
file_stream.seek(file_stream.tell() - 1)
break
# Try again
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
file_stream.seek(cur_pos)
# Convert and return the guesses
for guess in type_guesses:
kwargs: dict[str, str] = {}
if guess.extension:
kwargs["extension"] = guess.extension
if guess.mime_type:
kwargs["mimetype"] = MIMETYPE_SUBSTITUTIONS.get(
guess.mime_type, guess.mime_type
)
if len(kwargs) > 0:
# We don't add the filename_hint, because sometimes it's just a placeholder,
# and, in any case, doesn't add new information.
guesses.append(cls(**kwargs))
# Return the guesses
return guesses

View file

@ -8,7 +8,12 @@ import requests
from warnings import catch_warnings, resetwarnings
from markitdown import MarkItDown, UnsupportedFormatException, FileConversionException
from markitdown import (
MarkItDown,
UnsupportedFormatException,
FileConversionException,
StreamInfo,
)
skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
@ -162,6 +167,107 @@ def validate_strings(result, expected_strings, exclude_strings=None):
assert string not in text_content
def test_stream_info_operations() -> None:
    """Exercise StreamInfo.copy_and_update with keyword, positional, and mixed updates."""
    field_names = ["mimetype", "extension", "charset", "filename", "local_path", "url"]

    stream_info_original = StreamInfo(
        mimetype="mimetype.1",
        extension="extension.1",
        charset="charset.1",
        filename="filename.1",
        local_path="local_path.1",
        url="url.1",
    )

    def check_single_update(updated, changed_field):
        # The targeted attribute must carry the new value ...
        assert getattr(updated, changed_field) == f"{changed_field}.2"
        # ... and every other attribute must match the original.
        for other in field_names:
            if other != changed_field:
                assert getattr(stream_info_original, other) == getattr(updated, other)

    # Update each attribute in turn, passed as a keyword argument
    for name in field_names:
        check_single_update(
            stream_info_original.copy_and_update(**{name: f"{name}.2"}), name
        )

    # Update each attribute in turn, passed inside a new StreamInfo object
    for name in field_names:
        check_single_update(
            stream_info_original.copy_and_update(StreamInfo(**{name: f"{name}.2"})),
            name,
        )

    # Mix a StreamInfo object with keyword arguments
    mixed = stream_info_original.copy_and_update(
        StreamInfo(extension="extension.2", filename="filename.2"),
        mimetype="mimetype.3",
        charset="charset.3",
    )
    assert mixed.extension == "extension.2"
    assert mixed.filename == "filename.2"
    assert mixed.mimetype == "mimetype.3"
    assert mixed.charset == "charset.3"
    assert mixed.local_path == "local_path.1"
    assert mixed.url == "url.1"

    # Pass several StreamInfo objects at once
    combined = stream_info_original.copy_and_update(
        StreamInfo(extension="extension.4", filename="filename.5"),
        StreamInfo(mimetype="mimetype.6", charset="charset.7"),
    )
    assert combined.extension == "extension.4"
    assert combined.filename == "filename.5"
    assert combined.mimetype == "mimetype.6"
    assert combined.charset == "charset.7"
    assert combined.local_path == "local_path.1"
    assert combined.url == "url.1"
def test_stream_info_guesses() -> None:
    """Check that the first content-based guess has the expected mimetype and extension."""
    # Fixture file name -> mimetype expected as the top guess
    expected_by_file = {
        "test.xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "test.docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "test.pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "test.xls": "application/vnd.ms-excel",
    }

    for file_name, expected_mimetype in expected_by_file.items():
        file_path = os.path.join(TEST_FILES_DIR, file_name)
        hint = os.path.basename(file_path)
        with open(file_path, "rb") as f:
            guesses = StreamInfo.guess_from_stream(f, filename_hint=hint)

        assert len(guesses) > 0
        top_guess = guesses[0]
        assert top_guess.mimetype == expected_mimetype
        assert top_guess.extension == os.path.splitext(file_path)[1]
@pytest.mark.skipif(
skip_remote,
reason="do not run tests that query external urls",
@ -266,6 +372,11 @@ def test_markitdown_local() -> None:
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.json"))
validate_strings(result, JSON_TEST_STRINGS)
# Test input from a stream
input_data = b"<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
# Test input with leading blank characters
input_data = b" \n\n\n<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
@ -342,9 +453,11 @@ def test_markitdown_llm() -> None:
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
test_stream_info_operations()
test_stream_info_guesses()
test_markitdown_remote()
test_markitdown_local()
test_exceptions()
test_markitdown_exiftool()
# test_exceptions()
# test_markitdown_exiftool()
# test_markitdown_llm()
print("All tests passed!")