diff --git a/packages/markitdown/src/markitdown/__init__.py b/packages/markitdown/src/markitdown/__init__.py index 620e2b0..fb14feb 100644 --- a/packages/markitdown/src/markitdown/__init__.py +++ b/packages/markitdown/src/markitdown/__init__.py @@ -5,6 +5,7 @@ from .__about__ import __version__ from ._markitdown import MarkItDown from ._base_converter import DocumentConverterResult, BaseDocumentConverter +from ._stream_info import StreamInfo from ._exceptions import ( MarkItDownException, MissingDependencyException, @@ -25,4 +26,5 @@ __all__ = [ "FailedConversionAttempt", "FileConversionException", "UnsupportedFormatException", + "StreamInfo", ] diff --git a/packages/markitdown/src/markitdown/_base_converter.py b/packages/markitdown/src/markitdown/_base_converter.py index 470ff74..7cd945f 100644 --- a/packages/markitdown/src/markitdown/_base_converter.py +++ b/packages/markitdown/src/markitdown/_base_converter.py @@ -1,3 +1,4 @@ +from ._stream_info import StreamInfo from typing import Any, Union, BinaryIO, Optional @@ -13,6 +14,9 @@ class DocumentConverterResult: """ Initialize the DocumentConverterResult. + The only required parameter is the converted Markdown text. + The title, and any other metadata that may be added in the future, are optional. + Parameters: - markdown: The converted Markdown text. - title: Optional title of the document. @@ -72,27 +76,25 @@ class BaseDocumentConverter: def convert( self, - file_stream, - *, - mime_type: str = "application/octet-stream", - file_extension: Optional[str] = None, - charset: Optional[str] = None, - **kwargs: Any, + file_stream: BinaryIO, + stream_info: StreamInfo, + **kwargs: Any, # Options to pass to the converter ) -> Union[None, DocumentConverterResult]: """ Convert a document to Markdown text, or return None if the converter cannot handle the document (causing the next converter to be tried). The determination of whether a converter can handle a document is primarily based on - the provided MIME type. 
The file extension can serve as a secondary check if the - MIME type is not sufficiently specific (e.g., application/octet-stream). Finally, the - chatset is used to determine the encoding of the file content in cases of text/* + the provided `stream_info.mimetype`. The field `stream_info.extension` can serve as + a secondary check if the MIME type is not sufficiently specific + (e.g., application/octet-stream). In the case of data retrieved via HTTP, the + `stream_info.url` might also be referenced to guide conversion (e.g., special-handling + for Wikipedia). Finally, the `stream_info.charset` is used to determine the encoding + of the file content in cases of text/* Prameters: - file_stream: The file-like object to convert. Must support seek(), tell(), and read() methods. - - mime_type: The MIME type of the file. Default is "application/octet-stream". - - file_extension: The file extension of the file. Default is None. - - charset: The character set of the file. Default is None. + - stream_info: The StreamInfo object containing metadata about the file (mimetype, extension, charset, etc.) - kwargs: Additional keyword arguments for the converter. 
Returns: diff --git a/packages/markitdown/src/markitdown/_markitdown.py b/packages/markitdown/src/markitdown/_markitdown.py index 50b64b4..2738535 100644 --- a/packages/markitdown/src/markitdown/_markitdown.py +++ b/packages/markitdown/src/markitdown/_markitdown.py @@ -6,8 +6,9 @@ import sys import tempfile import warnings import traceback +import io from importlib.metadata import entry_points -from typing import Any, List, Optional, Union +from typing import Any, List, Optional, Union, BinaryIO from pathlib import Path from urllib.parse import urlparse from warnings import warn @@ -16,6 +17,8 @@ from warnings import warn import puremagic import requests +from ._stream_info import StreamInfo + from .converters import ( DocumentConverter, PlainTextConverter, @@ -175,12 +178,17 @@ class MarkItDown: warn("Plugins converters are already enabled.", RuntimeWarning) def convert( - self, source: Union[str, requests.Response, Path], **kwargs: Any + self, + source: Union[str, requests.Response, Path, BinaryIO], + *, + stream_info: Optional[StreamInfo] = None, + **kwargs: Any, ) -> DocumentConverterResult: # TODO: deal with kwargs """ Args: - - source: can be a string representing a path either as string pathlib path object or url, or a requests.response object - - extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.) + - source: can be a path (str or Path), url, or a requests.response object + - stream_info: optional stream info to use for the conversion. 
If None, infer from source + - kwargs: additional arguments to pass to the converter """ # Local path or url @@ -192,68 +200,112 @@ class MarkItDown: ): return self.convert_url(source, **kwargs) else: - return self.convert_local(source, **kwargs) + return self.convert_local(source, stream_info=stream_info, **kwargs) + # Path object + elif isinstance(source, Path): + return self.convert_local(source, stream_info=stream_info, **kwargs) # Request response elif isinstance(source, requests.Response): return self.convert_response(source, **kwargs) - elif isinstance(source, Path): - return self.convert_local(source, **kwargs) + # Binary stream + elif ( + hasattr(source, "read") + and callable(source.read) + and not isinstance(source, io.TextIOBase) + ): + return self.convert_stream(source, **kwargs) + else: + raise TypeError( + f"Invalid source type: {type(source)}. Expected str, requests.Response, BinaryIO." + ) def convert_local( - self, path: Union[str, Path], **kwargs: Any - ) -> DocumentConverterResult: # TODO: deal with kwargs + self, + path: Union[str, Path], + *, + stream_info: Optional[StreamInfo] = None, + file_extension: Optional[str] = None, # Deprecated -- use stream_info + url: Optional[str] = None, # Deprecated -- use stream_info + **kwargs: Any, + ) -> DocumentConverterResult: if isinstance(path, Path): path = str(path) - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] - # Get extension alternatives from the path and puremagic - base, ext = os.path.splitext(path) - self._append_ext(extensions, ext) + # Build a base StreamInfo object from which to start guesses + base_stream_info = StreamInfo( + local_path=path, + extension=os.path.splitext(path)[1], + filename=os.path.basename(path), + ) - for g in self._guess_ext_magic(path): - self._append_ext(extensions, g) + # Extend the base_stream_info with any additional info from the arguments + if stream_info is not 
None: + base_stream_info = base_stream_info.copy_and_update(stream_info) - # Convert - return self._convert(path, extensions, **kwargs) + if file_extension is not None: + # Deprecated -- use stream_info + base_stream_info = base_stream_info.copy_and_update( + extension=file_extension + ) + + if url is not None: + # Deprecated -- use stream_info + base_stream_info = base_stream_info.copy_and_update(url=url) + + with open(path, "rb") as fh: + # Prepare a list of configurations to try, starting with the base_stream_info + guesses: List[StreamInfo] = [base_stream_info] + for guess in StreamInfo.guess_from_stream( + file_stream=fh, filename_hint=path + ): + guesses.append(base_stream_info.copy_and_update(guess)) + return self._convert(file_stream=fh, stream_info_guesses=guesses, **kwargs) - # TODO what should stream's type be? def convert_stream( - self, stream: Any, **kwargs: Any - ) -> DocumentConverterResult: # TODO: deal with kwargs - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] + self, + stream: BinaryIO, + *, + stream_info: Optional[StreamInfo] = None, + file_extension: Optional[str] = None, # Deprecated -- use stream_info + url: Optional[str] = None, # Deprecated -- use stream_info + **kwargs: Any, + ) -> DocumentConverterResult: + guesses: List[StreamInfo] = [] - # Save the file locally to a temporary file. It will be deleted before this method exits - handle, temp_path = tempfile.mkstemp() - fh = os.fdopen(handle, "wb") - result = None - try: - # Write to the temporary file - content = stream.read() - if isinstance(content, str): - fh.write(content.encode("utf-8")) - else: - fh.write(content) - fh.close() + # Do we have anything on which to base a guess? 
+ base_guess = None + if stream_info is not None or file_extension is not None or url is not None: + base_guess = stream_info if stream_info is not None else StreamInfo() + if file_extension is not None: + # Deprecated -- use stream_info + base_guess = base_guess.copy_and_update(extension=file_extension) + if url is not None: + # Deprecated -- use stream_info + base_guess = base_guess.copy_and_update(url=url) - # Use puremagic to check for more extension options - for g in self._guess_ext_magic(temp_path): - self._append_ext(extensions, g) + # Append the base guess, if it's non-trivial + if base_guess is not None: + if base_guess.mimetype is not None or base_guess.extension is not None: + guesses.append(base_guess) + else: + # Create a base guess with no information + base_guess = StreamInfo() - # Convert - result = self._convert(temp_path, extensions, **kwargs) - # Clean up - finally: - try: - fh.close() - except Exception: - pass - os.unlink(temp_path) + # Create a placeholder filename to help with guessing + placeholder_filename = None + if base_guess.filename is not None: + placeholder_filename = base_guess.filename + elif base_guess.extension is not None: + placeholder_filename = "placeholder" + base_guess.extension - return result + # Add guesses based on stream content + for guess in StreamInfo.guess_from_stream( + file_stream=stream, filename_hint=placeholder_filename + ): + guesses.append(base_guess.copy_and_update(guess)) + + # Perform the conversion + return self._convert(file_stream=stream, stream_info_guesses=guesses, **kwargs) def convert_url( self, url: str, **kwargs: Any @@ -264,173 +316,197 @@ class MarkItDown: return self.convert_response(response, **kwargs) def convert_response( - self, response: requests.Response, **kwargs: Any - ) -> DocumentConverterResult: # TODO fix kwargs type - # Prepare a list of extensions to try (in order of priority) - ext = kwargs.get("file_extension") - extensions = [ext] if ext is not None else [] - - # Guess from 
the mimetype - content_type = response.headers.get("content-type", "").split(";")[0] - self._append_ext(extensions, mimetypes.guess_extension(content_type)) - - # Read the content disposition if there is one - content_disposition = response.headers.get("content-disposition", "") - m = re.search(r"filename=([^;]+)", content_disposition) - if m: - base, ext = os.path.splitext(m.group(1).strip("\"'")) - self._append_ext(extensions, ext) - - # Read from the extension from the path - base, ext = os.path.splitext(urlparse(response.url).path) - self._append_ext(extensions, ext) - - # Save the file locally to a temporary file. It will be deleted before this method exits - handle, temp_path = tempfile.mkstemp() - fh = os.fdopen(handle, "wb") - result = None - try: - # Download the file - for chunk in response.iter_content(chunk_size=512): - fh.write(chunk) - fh.close() - - # Use puremagic to check for more extension options - for g in self._guess_ext_magic(temp_path): - self._append_ext(extensions, g) - - # Convert - result = self._convert(temp_path, extensions, url=response.url, **kwargs) - # Clean up - finally: - try: - fh.close() - except Exception: - pass - os.unlink(temp_path) - - return result - - def _convert( - self, local_path: str, extensions: List[Union[str, None]], **kwargs + self, + response: requests.Response, + *, + stream_info: Optional[StreamInfo] = None, + file_extension: Optional[str] = None, # Deprecated -- use stream_info + url: Optional[str] = None, # Deprecated -- use stream_info + **kwargs: Any, ) -> DocumentConverterResult: - res: Union[None, DocumentConverterResult] = None + # If there is a content-type header, get the mimetype and charset (if present) + mimetype: Optional[str] = None + charset: Optional[str] = None - # Keep track of which converters throw exceptions - failed_attempts: List[FailedConversionAttempt] = [] + if "content-type" in response.headers: + parts = response.headers["content-type"].split(";") + mimetype = parts.pop(0).strip() + 
for part in parts: + if part.strip().startswith("charset="): + _charset = part.split("=")[1].strip() + if len(_charset) > 0: + charset = _charset - # Create a copy of the page_converters list, sorted by priority. - # We do this with each call to _convert because the priority of converters may change between calls. - # The sort is guaranteed to be stable, so converters with the same priority will remain in the same order. - sorted_converters = sorted(self._page_converters, key=lambda x: x.priority) + # If there is a content-disposition header, get the filename and possibly the extension + filename: Optional[str] = None + extension: Optional[str] = None + if "content-disposition" in response.headers: + m = re.search(r"filename=([^;]+)", response.headers["content-disposition"]) + if m: + filename = m.group(1).strip("\"'") + _, _extension = os.path.splitext(filename) + if len(_extension) > 0: + extension = _extension - for ext in extensions + [None]: # Try last with no extension - for converter in sorted_converters: - _kwargs = copy.deepcopy(kwargs) + # If there is still no filename, try to read it from the url + if filename is None: + parsed_url = urlparse(response.url) + _, _extension = os.path.splitext(parsed_url.path) + if len(_extension) > 0: # Looks like this might be a file! 
+ filename = os.path.basename(parsed_url.path) + extension = _extension - # Overwrite file_extension appropriately - if ext is None: - if "file_extension" in _kwargs: - del _kwargs["file_extension"] - else: - _kwargs.update({"file_extension": ext}) - - # Copy any additional global options - if "llm_client" not in _kwargs and self._llm_client is not None: - _kwargs["llm_client"] = self._llm_client - - if "llm_model" not in _kwargs and self._llm_model is not None: - _kwargs["llm_model"] = self._llm_model - - if "style_map" not in _kwargs and self._style_map is not None: - _kwargs["style_map"] = self._style_map - - if "exiftool_path" not in _kwargs and self._exiftool_path is not None: - _kwargs["exiftool_path"] = self._exiftool_path - - # Add the list of converters for nested processing - _kwargs["_parent_converters"] = self._page_converters - - # If we hit an error log it and keep trying - try: - res = converter.convert(local_path, **_kwargs) - except Exception: - failed_attempts.append( - FailedConversionAttempt( - converter=converter, exc_info=sys.exc_info() - ) - ) - - if res is not None: - # Normalize the content - res.text_content = "\n".join( - [line.rstrip() for line in re.split(r"\r?\n", res.text_content)] - ) - res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content) - - # Todo - return res - - # If we got this far without success, report any exceptions - if len(failed_attempts) > 0: - raise FileConversionException(attempts=failed_attempts) - - # Nothing can handle it! - raise UnsupportedFormatException( - f"Could not convert '{local_path}' to Markdown. No converter attempted a conversion, suggesting that the filetype is simply not supported." 
+ # Create an initial guess from all this information + base_guess = StreamInfo( + mimetype=mimetype, + charset=charset, + filename=filename, + extension=extension, + url=response.url, ) - def _append_ext(self, extensions, ext): - """Append a unique non-None, non-empty extension to a list of extensions.""" - if ext is None: - return - ext = ext.strip() - if ext == "": - return - if ext in extensions: - return - extensions.append(ext) + # Update with any additional info from the arguments + if stream_info is not None: + base_guess = base_guess.copy_and_update(stream_info) + if file_extension is not None: + # Deprecated -- use stream_info + base_guess = base_guess.copy_and_update(extension=file_extension) + if url is not None: + # Deprecated -- use stream_info + base_guess = base_guess.copy_and_update(url=url) + + # Add the guess if its non-trivial + guesses: List[StreamInfo] = [] + if base_guess.mimetype is not None or base_guess.extension is not None: + guesses.append(base_guess) + + # Read into BytesIO + buffer = io.BytesIO() + for chunk in response.iter_content(chunk_size=512): + buffer.write(chunk) + buffer.seek(0) + + # Create a placeholder filename to help with guessing + placeholder_filename = None + if base_guess.filename is not None: + placeholder_filename = base_guess.filename + elif base_guess.extension is not None: + placeholder_filename = "placeholder" + base_guess.extension + + # Add guesses based on stream content + for guess in StreamInfo.guess_from_stream( + file_stream=buffer, filename_hint=placeholder_filename + ): + guesses.append(base_guess.copy_and_update(guess)) + + # Convert + return self._convert(file_stream=buffer, stream_info_guesses=guesses, **kwargs) + + def _convert( + self, *, file_stream: BinaryIO, stream_info_guesses: List[StreamInfo], **kwargs + ) -> DocumentConverterResult: + # Lazily create a temporary file, if needed, for backward compatibility + # This is to support a deprecated feature, and will be removed in the future + 
temp_file = None + + def get_temp_file(): + nonlocal temp_file + + if temp_file is not None: + return temp_file + else: + cur_pos = file_stream.tell() + handle, temp_file = tempfile.mkstemp() + fh = os.fdopen(handle, "wb") + file_stream.seek(0) + fh.write(file_stream.read()) + file_stream.seek(cur_pos) + fh.close() + return temp_file - def _guess_ext_magic(self, path): - """Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes.""" - # Use puremagic to guess try: - guesses = puremagic.magic_file(path) + res: Union[None, DocumentConverterResult] = None - # Fix for: https://github.com/microsoft/markitdown/issues/222 - # If there are no guesses, then try again after trimming leading ASCII whitespaces. - # ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f' - # (space, tab, newline, carriage return, vertical tab, form feed). - if len(guesses) == 0: - with open(path, "rb") as file: - while True: - char = file.read(1) - if not char: # End of file - break - if not char.isspace(): - file.seek(file.tell() - 1) - break - try: - guesses = puremagic.magic_stream(file) - except puremagic.main.PureError: - pass + # Keep track of which converters throw exceptions + failed_attempts: List[FailedConversionAttempt] = [] - extensions = list() - for g in guesses: - ext = g.extension.strip() - if len(ext) > 0: - if not ext.startswith("."): - ext = "." + ext - if ext not in extensions: - extensions.append(ext) - return extensions - except FileNotFoundError: - pass - except IsADirectoryError: - pass - except PermissionError: - pass - return [] + # Create a copy of the page_converters list, sorted by priority. + # We do this with each call to _convert because the priority of converters may change between calls. + # The sort is guaranteed to be stable, so converters with the same priority will remain in the same order. 
+ sorted_converters = sorted(self._page_converters, key=lambda x: x.priority) + + for file_info in stream_info_guesses + [None]: + for converter in sorted_converters: + _kwargs = copy.deepcopy(kwargs) + + # Copy any additional global options + if "llm_client" not in _kwargs and self._llm_client is not None: + _kwargs["llm_client"] = self._llm_client + + if "llm_model" not in _kwargs and self._llm_model is not None: + _kwargs["llm_model"] = self._llm_model + + if "style_map" not in _kwargs and self._style_map is not None: + _kwargs["style_map"] = self._style_map + + if ( + "exiftool_path" not in _kwargs + and self._exiftool_path is not None + ): + _kwargs["exiftool_path"] = self._exiftool_path + + # Add the list of converters for nested processing + _kwargs["_parent_converters"] = self._page_converters + + # Add backwards compatibility + if isinstance(converter, DocumentConverter): + if file_info is not None: + # Legacy converters need a file_extension + if file_info.extension is not None: + _kwargs["file_extension"] = file_info.extension + + # And benefit from urls, when available + if file_info.url is not None: + _kwargs["url"] = file_info.url + + try: + res = converter.convert(get_temp_file(), **_kwargs) + except Exception: + failed_attempts.append( + FailedConversionAttempt( + converter=converter, exc_info=sys.exc_info() + ) + ) + else: + raise NotImplementedError("TODO") + + if res is not None: + # Normalize the content + res.text_content = "\n".join( + [ + line.rstrip() + for line in re.split(r"\r?\n", res.text_content) + ] + ) + res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content) + return res + + # If we got this far without success, report any exceptions + if len(failed_attempts) > 0: + raise FileConversionException(attempts=failed_attempts) + + # Nothing can handle it! + raise UnsupportedFormatException( + f"Could not convert stream to Markdown. No converter attempted a conversion, suggesting that the filetype is simply not supported." 
+ ) + + finally: + # Clean up the temporary file + if temp_file is not None: + try: + os.unlink(temp_file) + except Exception: + pass def register_page_converter(self, converter: DocumentConverter) -> None: """DEPRECATED: User register_converter instead.""" diff --git a/packages/markitdown/src/markitdown/_stream_info.py b/packages/markitdown/src/markitdown/_stream_info.py new file mode 100644 index 0000000..9014e73 --- /dev/null +++ b/packages/markitdown/src/markitdown/_stream_info.py @@ -0,0 +1,105 @@ +import puremagic +from dataclasses import dataclass, asdict +from typing import Optional, BinaryIO, List, TypeVar, Type + +# This is a workaround for Self not being available in Python 3.10 +T = TypeVar("T", bound="StreamInfo") + +# Mimetype substitutions table +MIMETYPE_SUBSTITUTIONS = { + "application/excel": "application/vnd.ms-excel", + "application/mspowerpoint": "application/vnd.ms-powerpoint", +} + + +@dataclass(kw_only=True, frozen=True) +class StreamInfo: + """The StreamInfo class is used to store information about a file stream. + All fields can be None, and will depend on how the stream was opened. 
+ """ + + mimetype: Optional[str] = None + extension: Optional[str] = None + charset: Optional[str] = None + filename: Optional[ + str + ] = None # From local path, url, or Content-Disposition header + local_path: Optional[str] = None # If read from disk + url: Optional[str] = None # If read from url + + def copy_and_update(self, *args, **kwargs): + """Copy the StreamInfo object and update it with the given StreamInfo + instance and/or other keyword arguments.""" + new_info = asdict(self) + + for si in args: + assert isinstance(si, StreamInfo) + new_info.update({k: v for k, v in asdict(si).items() if v is not None}) + + if len(kwargs) > 0: + new_info.update(kwargs) + + return StreamInfo(**new_info) + + @classmethod + def guess_from_stream( + cls: Type[T], file_stream: BinaryIO, *, filename_hint: Optional[str] = None + ) -> List[T]: + """ + Guess StreamInfo properties (mostly mimetype and extension) from a stream. + + Args: + - stream: The stream to guess the StreamInfo from. + - filename_hint [Optional]: A filename hint to help with the guessing (may be a placeholder, and not actually be the file name) + + Returns a list of StreamInfo objects in order of confidence. + """ + guesses: List[StreamInfo] = [] + + def _puremagic( + file_stream, filename_hint + ) -> puremagic.main.PureMagicWithConfidence: + """Wrap guesses to handle exceptions.""" + try: + return puremagic.magic_stream(file_stream, filename=filename_hint) + except puremagic.main.PureError as e: + return [] + + cur_pos = file_stream.tell() + type_guesses = _puremagic(file_stream, filename_hint=filename_hint) + if len(type_guesses) == 0: + # Fix for: https://github.com/microsoft/markitdown/issues/222 + # If there are no guesses, then try again after trimming leading ASCII whitespaces. + # ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f' + # (space, tab, newline, carriage return, vertical tab, form feed). 
+ + # Eat all the leading whitespace + file_stream.seek(cur_pos) + while True: + char = file_stream.read(1) + if not char: # End of file + break + if not char.isspace(): + file_stream.seek(file_stream.tell() - 1) + break + + # Try again + type_guesses = _puremagic(file_stream, filename_hint=filename_hint) + file_stream.seek(cur_pos) + + # Convert and return the guesses + for guess in type_guesses: + kwargs: dict[str, str] = {} + if guess.extension: + kwargs["extension"] = guess.extension + if guess.mime_type: + kwargs["mimetype"] = MIMETYPE_SUBSTITUTIONS.get( + guess.mime_type, guess.mime_type + ) + if len(kwargs) > 0: + # We don't add the filename_hint, because sometimes it's just a placeholder, + # and, in any case, doesn't add new information. + guesses.append(cls(**kwargs)) + + # Return the guesses + return guesses diff --git a/packages/markitdown/tests/test_markitdown.py b/packages/markitdown/tests/test_markitdown.py index 0a3b56e..61c9ff7 100644 --- a/packages/markitdown/tests/test_markitdown.py +++ b/packages/markitdown/tests/test_markitdown.py @@ -8,7 +8,12 @@ import requests from warnings import catch_warnings, resetwarnings -from markitdown import MarkItDown, UnsupportedFormatException, FileConversionException +from markitdown import ( + MarkItDown, + UnsupportedFormatException, + FileConversionException, + StreamInfo, +) skip_remote = ( True if os.environ.get("GITHUB_ACTIONS") else False @@ -162,6 +167,107 @@ def validate_strings(result, expected_strings, exclude_strings=None): assert string not in text_content +def test_stream_info_operations() -> None: + """Test operations performed on StreamInfo objects.""" + + stream_info_original = StreamInfo( + mimetype="mimetype.1", + extension="extension.1", + charset="charset.1", + filename="filename.1", + local_path="local_path.1", + url="url.1", + ) + + # Check updating all attributes by keyword + keywords = ["mimetype", "extension", "charset", "filename", "local_path", "url"] + for keyword in keywords: + 
updated_stream_info = stream_info_original.copy_and_update( + **{keyword: f"{keyword}.2"} + ) + + # Make sure the targeted attribute is updated + assert getattr(updated_stream_info, keyword) == f"{keyword}.2" + + # Make sure the other attributes are unchanged + for k in keywords: + if k != keyword: + assert getattr(stream_info_original, k) == getattr( + updated_stream_info, k + ) + + # Check updating all attributes by passing a new StreamInfo object + keywords = ["mimetype", "extension", "charset", "filename", "local_path", "url"] + for keyword in keywords: + updated_stream_info = stream_info_original.copy_and_update( + StreamInfo(**{keyword: f"{keyword}.2"}) + ) + + # Make sure the targeted attribute is updated + assert getattr(updated_stream_info, keyword) == f"{keyword}.2" + + # Make sure the other attributes are unchanged + for k in keywords: + if k != keyword: + assert getattr(stream_info_original, k) == getattr( + updated_stream_info, k + ) + + # Check mixing and matching + updated_stream_info = stream_info_original.copy_and_update( + StreamInfo(extension="extension.2", filename="filename.2"), + mimetype="mimetype.3", + charset="charset.3", + ) + assert updated_stream_info.extension == "extension.2" + assert updated_stream_info.filename == "filename.2" + assert updated_stream_info.mimetype == "mimetype.3" + assert updated_stream_info.charset == "charset.3" + assert updated_stream_info.local_path == "local_path.1" + assert updated_stream_info.url == "url.1" + + # Check multiple StreamInfo objects + updated_stream_info = stream_info_original.copy_and_update( + StreamInfo(extension="extension.4", filename="filename.5"), + StreamInfo(mimetype="mimetype.6", charset="charset.7"), + ) + assert updated_stream_info.extension == "extension.4" + assert updated_stream_info.filename == "filename.5" + assert updated_stream_info.mimetype == "mimetype.6" + assert updated_stream_info.charset == "charset.7" + assert updated_stream_info.local_path == "local_path.1" + assert 
updated_stream_info.url == "url.1" + + +def test_stream_info_guesses() -> None: + """Test StreamInfo guesses based on stream content.""" + + test_tuples = [ + ( + os.path.join(TEST_FILES_DIR, "test.xlsx"), + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ), + ( + os.path.join(TEST_FILES_DIR, "test.docx"), + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ), + ( + os.path.join(TEST_FILES_DIR, "test.pptx"), + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + ), + (os.path.join(TEST_FILES_DIR, "test.xls"), "application/vnd.ms-excel"), + ] + + for file_path, expected_mimetype in test_tuples: + with open(file_path, "rb") as f: + guesses = StreamInfo.guess_from_stream( + f, filename_hint=os.path.basename(file_path) + ) + assert len(guesses) > 0 + assert guesses[0].mimetype == expected_mimetype + assert guesses[0].extension == os.path.splitext(file_path)[1] + + @pytest.mark.skipif( skip_remote, reason="do not run tests that query external urls", @@ -266,6 +372,11 @@ def test_markitdown_local() -> None: result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.json")) validate_strings(result, JSON_TEST_STRINGS) + # Test input from a stream + input_data = b"