def to_llm(self) -> List[Dict[str, Any]]:
    """
    Convert markdown with base64 images to a format compatible with OpenAI's API.

    Walks the markdown exactly once, emitting text and image elements in the
    order they appear so interleaved content keeps its original sequence.

    Returns:
        List[Dict[str, Any]]: Ordered list of content elements (text and images).
    """
    # Markdown image syntax carrying an inline base64 data URI.
    image_re = re.compile(r"!\[(.*?)\]\(data:(.*?);base64,(.*?)\)")

    elements: List[Dict[str, Any]] = []
    cursor = 0

    for hit in image_re.finditer(self.markdown):
        # Text preceding this image, if any survives whitespace trimming.
        leading = self.markdown[cursor:hit.start()].strip()
        if leading:
            elements.append({"type": "text", "text": leading})

        alt_text, mime_type, payload = hit.groups()
        elements.append(
            {
                "type": "image",
                "image_url": {"url": f"data:{mime_type};base64,{payload}"},
                "alt_text": alt_text,
            }
        )
        cursor = hit.end()

    # Whatever remains after the final image.
    trailing = self.markdown[cursor:].strip()
    if trailing:
        elements.append({"type": "text", "text": trailing})

    return elements
This provides some - assurance that, if accepts() returns True, the convert() method will also be able to handle the document. - - IMPORTANT: In rare cases, (e.g., OutlookMsgConverter) we need to read more from the stream to make a final - determination. Read operations inevitably advances the position in file_stream. In these case, the position - MUST be reset it MUST be reset before returning. This is because the convert() method may be called immediately - after accepts(), and will expect the file_stream to be at the original position. - - E.g., - cur_pos = file_stream.tell() # Save the current position - data = file_stream.read(100) # ... peek at the first 100 bytes, etc. - file_stream.seek(cur_pos) # Reset the position to the original position - - Prameters: - - file_stream: The file-like object to convert. Must support seek(), tell(), and read() methods. - - stream_info: The StreamInfo object containing metadata about the file (mimetype, extension, charset, set) - - kwargs: Additional keyword arguments for the converter. - - Returns: - - bool: True if the converter can handle the document, False otherwise. - """ - raise NotImplementedError( - f"The subclass, {type(self).__name__}, must implement the accepts() method to determine if they can handle the document." 
- ) - def convert( self, file_stream: BinaryIO, diff --git a/packages/markitup/src/markitup/_markitup.py b/packages/markitup/src/markitup/_markitup.py index a17f3a6..c2fb0a2 100644 --- a/packages/markitup/src/markitup/_markitup.py +++ b/packages/markitup/src/markitup/_markitup.py @@ -1,26 +1,10 @@ -import copy -import mimetypes -import os -import re -import sys -import shutil -import tempfile -import warnings -import traceback -import io -from dataclasses import dataclass -from importlib.metadata import entry_points from typing import Any, List, Dict, Optional, Union, BinaryIO from pathlib import Path from urllib.parse import urlparse from warnings import warn -import requests -import magika -import charset_normalizer -import codecs +import magic from ._stream_info import StreamInfo -from ._uri_utils import parse_data_uri, file_uri_to_path from .converters import ( PlainTextConverter, @@ -43,718 +27,74 @@ from ._exceptions import ( ) -# Lower priority values are tried first. -PRIORITY_SPECIFIC_FILE_FORMAT = ( - 0.0 # e.g., .docx, .pdf, .xlsx, Or specific pages, e.g., wikipedia -) -PRIORITY_GENERIC_FILE_FORMAT = ( - 10.0 # Near catch-all converters for mimetypes like text/*, etc. -) - - -_plugins: Union[None, List[Any]] = None # If None, plugins have not been loaded yet. - - -def _load_plugins() -> Union[None, List[Any]]: - """Lazy load plugins, exiting early if already loaded.""" - global _plugins - - # Skip if we've already loaded plugins - if _plugins is not None: - return _plugins - - # Load plugins - _plugins = [] - for entry_point in entry_points(group="markitup.plugin"): - try: - _plugins.append(entry_point.load()) - except Exception: - tb = traceback.format_exc() - warn(f"Plugin '{entry_point.name}' failed to load ... 
skipping:\n{tb}") - - return _plugins - - -@dataclass(kw_only=True, frozen=True) -class ConverterRegistration: - """A registration of a converter with its priority and other metadata.""" - - converter: DocumentConverter - priority: float - - class MarkItUp: """(In preview) An extremely simple text-based document reader, suitable for LLM use. This reader will convert common file-types or webpages to Markdown.""" def __init__( self, - *, - enable_builtins: Union[None, bool] = None, - enable_plugins: Union[None, bool] = None, - **kwargs, + config: Optional[Dict[str, Any]] = None, ): - self._builtins_enabled = False - self._plugins_enabled = False - - requests_session = kwargs.get("requests_session") - if requests_session is None: - self._requests_session = requests.Session() - else: - self._requests_session = requests_session - - self._magika = magika.Magika() - - # TODO - remove these (see enable_builtins) - self._llm_client: Any = None - self._llm_model: Union[str | None] = None - self._exiftool_path: Union[str | None] = None - self._style_map: Union[str | None] = None - - # Register the converters - self._converters: List[ConverterRegistration] = [] - - if ( - enable_builtins is None or enable_builtins - ): # Default to True when not specified - self.enable_builtins(**kwargs) - - if enable_plugins: - self.enable_plugins(**kwargs) - - def enable_builtins(self, **kwargs) -> None: - """ - Enable and register built-in converters. - Built-in converters are enabled by default. - This method should only be called once, if built-ins were initially disabled. - """ - if not self._builtins_enabled: - # TODO: Move these into converter constructors - self._llm_client = kwargs.get("llm_client") - self._llm_model = kwargs.get("llm_model") - self._exiftool_path = kwargs.get("exiftool_path") - self._style_map = kwargs.get("style_map") - - if self._exiftool_path is None: - self._exiftool_path = os.getenv("EXIFTOOL_PATH") - - # Still none? 
Check well-known paths - if self._exiftool_path is None: - candidate = shutil.which("exiftool") - if candidate: - candidate = os.path.abspath(candidate) - if any( - d == os.path.dirname(candidate) - for d in [ - "/usr/bin", - "/usr/local/bin", - "/opt", - "/opt/bin", - "/opt/local/bin", - "/opt/homebrew/bin", - "C:\\Windows\\System32", - "C:\\Program Files", - "C:\\Program Files (x86)", - ] - ): - self._exiftool_path = candidate - - # Register converters for successful browsing operations - # Later registrations are tried first / take higher priority than earlier registrations - # To this end, the most specific converters should appear below the most generic converters - self.register_converter( - PlainTextConverter(), priority=PRIORITY_GENERIC_FILE_FORMAT - ) - self.register_converter( - ZipConverter(markitup=self), priority=PRIORITY_GENERIC_FILE_FORMAT - ) - self.register_converter( - HtmlConverter(), priority=PRIORITY_GENERIC_FILE_FORMAT - ) - self.register_converter(RssConverter()) - self.register_converter(WikipediaConverter()) - self.register_converter(YouTubeConverter()) - self.register_converter(BingSerpConverter()) - self.register_converter(DocxConverter()) - self.register_converter(XlsxConverter()) - self.register_converter(XlsConverter()) - self.register_converter(PptxConverter()) - self.register_converter(AudioConverter()) - self.register_converter(ImageConverter()) - self.register_converter(IpynbConverter()) - self.register_converter(PdfConverter()) - self.register_converter(OutlookMsgConverter()) - self.register_converter(EpubConverter()) - self.register_converter(CsvConverter()) - - # Register Document Intelligence converter at the top of the stack if endpoint is provided - docintel_endpoint = kwargs.get("docintel_endpoint") - if docintel_endpoint is not None: - docintel_args: Dict[str, Any] = {} - docintel_args["endpoint"] = docintel_endpoint - - docintel_credential = kwargs.get("docintel_credential") - if docintel_credential is not None: - 
docintel_args["credential"] = docintel_credential - - docintel_types = kwargs.get("docintel_file_types") - if docintel_types is not None: - docintel_args["file_types"] = docintel_types - - self.register_converter( - DocumentIntelligenceConverter(**docintel_args), - ) - - self._builtins_enabled = True - else: - warn("Built-in converters are already enabled.", RuntimeWarning) - - def enable_plugins(self, **kwargs) -> None: - """ - Enable and register converters provided by plugins. - Plugins are disabled by default. - This method should only be called once, if plugins were initially disabled. - """ - if not self._plugins_enabled: - # Load plugins - plugins = _load_plugins() - assert plugins is not None - for plugin in plugins: - try: - plugin.register_converters(self, **kwargs) - except Exception: - tb = traceback.format_exc() - warn(f"Plugin '{plugin}' failed to register converters:\n{tb}") - self._plugins_enabled = True - else: - warn("Plugins converters are already enabled.", RuntimeWarning) - - def convert( - self, - source: Union[str, requests.Response, Path, BinaryIO], - *, - stream_info: Optional[StreamInfo] = None, - **kwargs: Any, - ) -> DocumentConverterResult: # TODO: deal with kwargs - """ - Args: - - source: can be a path (str or Path), url, or a requests.response object - - stream_info: optional stream info to use for the conversion. 
If None, infer from source - - kwargs: additional arguments to pass to the converter - """ - - # Local path or url - if isinstance(source, str): - if ( - source.startswith("http:") - or source.startswith("https:") - or source.startswith("file:") - or source.startswith("data:") - ): - # Rename the url argument to mock_url - # (Deprecated -- use stream_info) - _kwargs = {k: v for k, v in kwargs.items()} - if "url" in _kwargs: - _kwargs["mock_url"] = _kwargs["url"] - del _kwargs["url"] - - return self.convert_uri(source, stream_info=stream_info, **_kwargs) - else: - return self.convert_local(source, stream_info=stream_info, **kwargs) - # Path object - elif isinstance(source, Path): - return self.convert_local(source, stream_info=stream_info, **kwargs) - # Request response - elif isinstance(source, requests.Response): - return self.convert_response(source, stream_info=stream_info, **kwargs) - # Binary stream - elif ( - hasattr(source, "read") - and callable(source.read) - and not isinstance(source, io.TextIOBase) - ): - return self.convert_stream(source, stream_info=stream_info, **kwargs) - else: - raise TypeError( - f"Invalid source type: {type(source)}. Expected str, requests.Response, BinaryIO." 
- ) - - def convert_local( - self, - path: Union[str, Path], - *, - stream_info: Optional[StreamInfo] = None, - file_extension: Optional[str] = None, # Deprecated -- use stream_info - url: Optional[str] = None, # Deprecated -- use stream_info - **kwargs: Any, - ) -> DocumentConverterResult: - if isinstance(path, Path): - path = str(path) - - # Build a base StreamInfo object from which to start guesses - base_guess = StreamInfo( - local_path=path, - extension=os.path.splitext(path)[1], - filename=os.path.basename(path), - ) - - # Extend the base_guess with any additional info from the arguments - if stream_info is not None: - base_guess = base_guess.copy_and_update(stream_info) - - if file_extension is not None: - # Deprecated -- use stream_info - base_guess = base_guess.copy_and_update(extension=file_extension) - - if url is not None: - # Deprecated -- use stream_info - base_guess = base_guess.copy_and_update(url=url) - - with open(path, "rb") as fh: - guesses = self._get_stream_info_guesses( - file_stream=fh, base_guess=base_guess - ) - return self._convert(file_stream=fh, stream_info_guesses=guesses, **kwargs) - - def convert_stream( - self, - stream: BinaryIO, - *, - stream_info: Optional[StreamInfo] = None, - file_extension: Optional[str] = None, # Deprecated -- use stream_info - url: Optional[str] = None, # Deprecated -- use stream_info - **kwargs: Any, - ) -> DocumentConverterResult: - guesses: List[StreamInfo] = [] - - # Do we have anything on which to base a guess? 
- base_guess = None - if stream_info is not None or file_extension is not None or url is not None: - # Start with a non-Null base guess - if stream_info is None: - base_guess = StreamInfo() - else: - base_guess = stream_info - - if file_extension is not None: - # Deprecated -- use stream_info - assert base_guess is not None # for mypy - base_guess = base_guess.copy_and_update(extension=file_extension) - - if url is not None: - # Deprecated -- use stream_info - assert base_guess is not None # for mypy - base_guess = base_guess.copy_and_update(url=url) - - # Check if we have a seekable stream. If not, load the entire stream into memory. - if not stream.seekable(): - buffer = io.BytesIO() - while True: - chunk = stream.read(4096) - if not chunk: - break - buffer.write(chunk) - buffer.seek(0) - stream = buffer - - # Add guesses based on stream content - guesses = self._get_stream_info_guesses( - file_stream=stream, base_guess=base_guess or StreamInfo() - ) - return self._convert(file_stream=stream, stream_info_guesses=guesses, **kwargs) - - def convert_url( - self, - url: str, - *, - stream_info: Optional[StreamInfo] = None, - file_extension: Optional[str] = None, - mock_url: Optional[str] = None, - **kwargs: Any, - ) -> DocumentConverterResult: - """Alias for convert_uri()""" - # convert_url will likely be deprecated in the future in favor of convert_uri - return self.convert_uri( - url, - stream_info=stream_info, - file_extension=file_extension, - mock_url=mock_url, - **kwargs, - ) - - def convert_uri( - self, - uri: str, - *, - stream_info: Optional[StreamInfo] = None, - file_extension: Optional[str] = None, # Deprecated -- use stream_info - mock_url: Optional[ - str - ] = None, # Mock the request as if it came from a different URL - **kwargs: Any, - ) -> DocumentConverterResult: - uri = uri.strip() - - # File URIs - if uri.startswith("file:"): - netloc, path = file_uri_to_path(uri) - if netloc and netloc != "localhost": - raise ValueError( - f"Unsupported file URI: 
{uri}. Netloc must be empty or localhost." - ) - return self.convert_local( - path, - stream_info=stream_info, - file_extension=file_extension, - url=mock_url, - **kwargs, - ) - # Data URIs - elif uri.startswith("data:"): - mimetype, attributes, data = parse_data_uri(uri) - - base_guess = StreamInfo( - mimetype=mimetype, - charset=attributes.get("charset"), - ) - if stream_info is not None: - base_guess = base_guess.copy_and_update(stream_info) - - return self.convert_stream( - io.BytesIO(data), - stream_info=base_guess, - file_extension=file_extension, - url=mock_url, - **kwargs, - ) - # HTTP/HTTPS URIs - elif uri.startswith("http:") or uri.startswith("https:"): - response = self._requests_session.get(uri, stream=True) - response.raise_for_status() - return self.convert_response( - response, - stream_info=stream_info, - file_extension=file_extension, - url=mock_url, - **kwargs, - ) - else: - raise ValueError( - f"Unsupported URI scheme: {uri.split(':')[0]}. Supported schemes are: file:, data:, http:, https:" - ) - - def convert_response( - self, - response: requests.Response, - *, - stream_info: Optional[StreamInfo] = None, - file_extension: Optional[str] = None, # Deprecated -- use stream_info - url: Optional[str] = None, # Deprecated -- use stream_info - **kwargs: Any, - ) -> DocumentConverterResult: - # If there is a content-type header, get the mimetype and charset (if present) - mimetype: Optional[str] = None - charset: Optional[str] = None - - if "content-type" in response.headers: - parts = response.headers["content-type"].split(";") - mimetype = parts.pop(0).strip() - for part in parts: - if part.strip().startswith("charset="): - _charset = part.split("=")[1].strip() - if len(_charset) > 0: - charset = _charset - - # If there is a content-disposition header, get the filename and possibly the extension - filename: Optional[str] = None - extension: Optional[str] = None - if "content-disposition" in response.headers: - m = re.search(r"filename=([^;]+)", 
response.headers["content-disposition"]) - if m: - filename = m.group(1).strip("\"'") - _, _extension = os.path.splitext(filename) - if len(_extension) > 0: - extension = _extension - - # If there is still no filename, try to read it from the url - if filename is None: - parsed_url = urlparse(response.url) - _, _extension = os.path.splitext(parsed_url.path) - if len(_extension) > 0: # Looks like this might be a file! - filename = os.path.basename(parsed_url.path) - extension = _extension - - # Create an initial guess from all this information - base_guess = StreamInfo( - mimetype=mimetype, - charset=charset, - filename=filename, - extension=extension, - url=response.url, - ) - - # Update with any additional info from the arguments - if stream_info is not None: - base_guess = base_guess.copy_and_update(stream_info) - if file_extension is not None: - # Deprecated -- use stream_info - base_guess = base_guess.copy_and_update(extension=file_extension) - if url is not None: - # Deprecated -- use stream_info - base_guess = base_guess.copy_and_update(url=url) - - # Read into BytesIO - buffer = io.BytesIO() - for chunk in response.iter_content(chunk_size=512): - buffer.write(chunk) - buffer.seek(0) - - # Convert - guesses = self._get_stream_info_guesses( - file_stream=buffer, base_guess=base_guess - ) - return self._convert(file_stream=buffer, stream_info_guesses=guesses, **kwargs) - - def _convert( - self, *, file_stream: BinaryIO, stream_info_guesses: List[StreamInfo], **kwargs - ) -> DocumentConverterResult: - res: Union[None, DocumentConverterResult] = None - - # Keep track of which converters throw exceptions - failed_attempts: List[FailedConversionAttempt] = [] - - # Create a copy of the page_converters list, sorted by priority. - # We do this with each call to _convert because the priority of converters may change between calls. - # The sort is guaranteed to be stable, so converters with the same priority will remain in the same order. 
- sorted_registrations = sorted(self._converters, key=lambda x: x.priority) - - # Remember the initial stream position so that we can return to it - cur_pos = file_stream.tell() - - for stream_info in stream_info_guesses + [StreamInfo()]: - for converter_registration in sorted_registrations: - converter = converter_registration.converter - # Sanity check -- make sure the cur_pos is still the same - assert ( - cur_pos == file_stream.tell() - ), f"File stream position should NOT change between guess iterations" - - _kwargs = {k: v for k, v in kwargs.items()} - - # Copy any additional global options - if "llm_client" not in _kwargs and self._llm_client is not None: - _kwargs["llm_client"] = self._llm_client - - if "llm_model" not in _kwargs and self._llm_model is not None: - _kwargs["llm_model"] = self._llm_model - - if "style_map" not in _kwargs and self._style_map is not None: - _kwargs["style_map"] = self._style_map - - if "exiftool_path" not in _kwargs and self._exiftool_path is not None: - _kwargs["exiftool_path"] = self._exiftool_path - - # Add the list of converters for nested processing - _kwargs["_parent_converters"] = self._converters - - # Add legaxy kwargs - if stream_info is not None: - if stream_info.extension is not None: - _kwargs["file_extension"] = stream_info.extension - - if stream_info.url is not None: - _kwargs["url"] = stream_info.url - - # Check if the converter will accept the file, and if so, try to convert it - _accepts = False - try: - _accepts = converter.accepts(file_stream, stream_info, **_kwargs) - except NotImplementedError: - pass - - # accept() should not have changed the file stream position - assert ( - cur_pos == file_stream.tell() - ), f"{type(converter).__name__}.accept() should NOT change the file_stream position" - - # Attempt the conversion - if _accepts: - try: - res = converter.convert(file_stream, stream_info, **_kwargs) - except Exception: - failed_attempts.append( - FailedConversionAttempt( - converter=converter, 
exc_info=sys.exc_info() - ) - ) - finally: - file_stream.seek(cur_pos) - - if res is not None: - # Normalize the content - res.text_content = "\n".join( - [line.rstrip() for line in re.split(r"\r?\n", res.text_content)] - ) - res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content) - return res - - # If we got this far without success, report any exceptions - if len(failed_attempts) > 0: - raise FileConversionException(attempts=failed_attempts) - - # Nothing can handle it! - raise UnsupportedFormatException( - f"Could not convert stream to Markdown. No converter attempted a conversion, suggesting that the filetype is simply not supported." - ) - - def register_page_converter(self, converter: DocumentConverter) -> None: - """DEPRECATED: User register_converter instead.""" - warn( - "register_page_converter is deprecated. Use register_converter instead.", - DeprecationWarning, - ) - self.register_converter(converter) - - def register_converter( - self, - converter: DocumentConverter, - *, - priority: float = PRIORITY_SPECIFIC_FILE_FORMAT, - ) -> None: - """ - Register a DocumentConverter with a given priority. - - Priorities work as follows: By default, most converters get priority - DocumentConverter.PRIORITY_SPECIFIC_FILE_FORMAT (== 0). The exception - is the PlainTextConverter, HtmlConverter, and ZipConverter, which get - priority PRIORITY_SPECIFIC_FILE_FORMAT (== 10), with lower values - being tried first (i.e., higher priority). - - Just prior to conversion, the converters are sorted by priority, using - a stable sort. This means that converters with the same priority will - remain in the same order, with the most recently registered converters - appearing first. - - We have tight control over the order of built-in converters, but - plugins can register converters in any order. The registration's priority - field reasserts some control over the order of converters. 
- - Plugins can register converters with any priority, to appear before or - after the built-ins. For example, a plugin with priority 9 will run - before the PlainTextConverter, but after the built-in converters. - """ - self._converters.insert( - 0, ConverterRegistration(converter=converter, priority=priority) - ) - - def _get_stream_info_guesses( - self, file_stream: BinaryIO, base_guess: StreamInfo - ) -> List[StreamInfo]: - """ - Given a base guess, attempt to guess or expand on the stream info using the stream content (via magika). - """ - guesses: List[StreamInfo] = [] - - # Enhance the base guess with information based on the extension or mimetype - enhanced_guess = base_guess.copy_and_update() - - # If there's an extension and no mimetype, try to guess the mimetype - if base_guess.mimetype is None and base_guess.extension is not None: - _m, _ = mimetypes.guess_type( - "placeholder" + base_guess.extension, strict=False - ) - if _m is not None: - enhanced_guess = enhanced_guess.copy_and_update(mimetype=_m) - - # If there's a mimetype and no extension, try to guess the extension - if base_guess.mimetype is not None and base_guess.extension is None: - _e = mimetypes.guess_all_extensions(base_guess.mimetype, strict=False) - if len(_e) > 0: - enhanced_guess = enhanced_guess.copy_and_update(extension=_e[0]) - - # Call magika to guess from the stream - cur_pos = file_stream.tell() + self.config = config + + def convert(self, stream: BinaryIO) -> Dict[DocumentConverterResult, StreamInfo]: + stream_info: StreamInfo = self._get_stream_info(stream) + # Deal with unsupported file types + match stream_info.category: + case "ppt": + raise UnsupportedFormatException(".ppt files are not supported, try .pptx instead") + case "other": + raise UnsupportedFormatException(f"{stream_info.magic_type} files are not supported") + try: - result = self._magika.identify_stream(file_stream) - if result.status == "ok" and result.prediction.output.label != "unknown": - # If it's text, 
also guess the charset - charset = None - if result.prediction.output.is_text: - # Read the first 4k to guess the charset - file_stream.seek(cur_pos) - stream_page = file_stream.read(4096) - charset_result = charset_normalizer.from_bytes(stream_page).best() + match stream_info.category: + case "text": + return PlainTextConverter().convert(stream, stream_info), stream_info + case "pptx": + return PptxConverter().convert(stream, stream_info), stream_info + case "pdf": + return PdfConverter().convert(stream, stream_info), stream_info + except FailedConversionAttempt: + raise FileConversionException(f"Failed to convert file of type {stream_info.magic_type}") + return stream_info - if charset_result is not None: - charset = self._normalize_charset(charset_result.encoding) + def _get_stream_info(self, byte_stream: BinaryIO) -> StreamInfo: + original_position = byte_stream.tell() - # Normalize the first extension listed - guessed_extension = None - if len(result.prediction.output.extensions) > 0: - guessed_extension = "." 
+ result.prediction.output.extensions[0] + # Reset stream position to beginning + byte_stream.seek(0) - # Determine if the guess is compatible with the base guess - compatible = True - if ( - base_guess.mimetype is not None - and base_guess.mimetype != result.prediction.output.mime_type - ): - compatible = False + # Get file content for analysis + file_content = byte_stream.read() - if ( - base_guess.extension is not None - and base_guess.extension.lstrip(".") - not in result.prediction.output.extensions - ): - compatible = False + # Use python-magic to determine file type based on content + magic_type = magic.from_buffer(file_content, mime=True) - if ( - base_guess.charset is not None - and self._normalize_charset(base_guess.charset) != charset - ): - compatible = False + # Determine file category based on magic_type + if magic_type.startswith("image/"): + category = "image" + elif magic_type.startswith("audio/"): + category = "audio" + elif magic_type.startswith("video/"): + category = "video" + elif magic_type.startswith("application/vnd.ms-excel"): + category = 'xls' + elif magic_type.startswith("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"): + category = "xlsx" + elif magic_type.startswith("application/vnd.ms-powerpoint"): + category = 'ppt' + elif magic_type == "application/vnd.openxmlformats-officedocument.presentationml.presentation": + category = "pptx" + elif magic_type.startswith("application/msword"): + category = 'doc' + elif magic_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": + category = "docx" + elif magic_type == "application/pdf": + category = "pdf" + elif magic_type.startswith("text/"): + category = "text" + else: + category = "other" - if compatible: - # Add the compatible base guess - guesses.append( - StreamInfo( - mimetype=base_guess.mimetype - or result.prediction.output.mime_type, - extension=base_guess.extension or guessed_extension, - charset=base_guess.charset or charset, - 
filename=base_guess.filename, - local_path=base_guess.local_path, - url=base_guess.url, - ) - ) - else: - # The magika guess was incompatible with the base guess, so add both guesses - guesses.append(enhanced_guess) - guesses.append( - StreamInfo( - mimetype=result.prediction.output.mime_type, - extension=guessed_extension, - charset=charset, - filename=base_guess.filename, - local_path=base_guess.local_path, - url=base_guess.url, - ) - ) - else: - # There were no other guesses, so just add the base guess - guesses.append(enhanced_guess) - finally: - file_stream.seek(cur_pos) - - return guesses - - def _normalize_charset(self, charset: str | None) -> str | None: - """ - Normalize a charset string to a canonical form. - """ - if charset is None: - return None - try: - return codecs.lookup(charset).name - except LookupError: - return charset + byte_stream.seek(original_position) + return StreamInfo(magic_type=magic_type, category=category) \ No newline at end of file diff --git a/packages/markitup/src/markitup/converters/_html_converter.py b/packages/markitup/src/markitup/converters/_html_converter.py index d4bb3aa..b85a68d 100644 --- a/packages/markitup/src/markitup/converters/_html_converter.py +++ b/packages/markitup/src/markitup/converters/_html_converter.py @@ -19,25 +19,6 @@ ACCEPTED_FILE_CATEGORY = [ class HtmlConverter(DocumentConverter): """Anything with content type text/html""" - - def accepts( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> bool: - magic_type = (stream_info.magic_type or "").lower() - category = (stream_info.category or "").lower() - - if category in ACCEPTED_FILE_CATEGORY: - return True - - for prefix in ACCEPTED_MAGIC_TYPE_PREFIXES: - if magic_type.startswith(prefix): - return True - - return False - def convert( self, file_stream: BinaryIO, diff --git a/packages/markitup/src/markitup/converters/_pdf_converter.py 
class PdfConverter(DocumentConverter):
    """
    Converts PDFs to Markdown. Most style information is ignored, so the results are essentially plain-text.
    """

    def convert(
        self,
        file_stream: BinaryIO,
        stream_info: StreamInfo,
        **kwargs: Any,  # Options to pass to the converter
    ) -> DocumentConverterResult:
        # Delegate extraction to pdfminer; layout and styling are discarded,
        # so the resulting "markdown" is effectively plain text.
        extracted = pdfminer.high_level.extract_text(file_stream)
        return DocumentConverterResult(markdown=extracted)
- ) -> bool: - mimetype = (stream_info.mimetype or "").lower() - extension = (stream_info.extension or "").lower() - - # If we have a charset, we can safely assume it's text - # With Magika in the earlier stages, this handles most cases - if stream_info.charset is not None: - return True - - # Otherwise, check the mimetype and extension - if extension in ACCEPTED_FILE_EXTENSIONS: - return True - - for prefix in ACCEPTED_MIME_TYPE_PREFIXES: - if mimetype.startswith(prefix): - return True - - return False - def convert( self, file_stream: BinaryIO, stream_info: StreamInfo, **kwargs: Any, # Options to pass to the converter ) -> DocumentConverterResult: - if stream_info.charset: - text_content = file_stream.read().decode(stream_info.charset) - else: - text_content = str(from_bytes(file_stream.read()).best()) - + text_content = str(from_bytes(file_stream.read()).best()) return DocumentConverterResult(markdown=text_content) diff --git a/packages/markitup/src/markitup/converters/_pptx_converter.py b/packages/markitup/src/markitup/converters/_pptx_converter.py index d6a0b66..f1c112b 100644 --- a/packages/markitup/src/markitup/converters/_pptx_converter.py +++ b/packages/markitup/src/markitup/converters/_pptx_converter.py @@ -30,24 +30,6 @@ class PptxConverter(DocumentConverter): super().__init__() self._html_converter = HtmlConverter() - def accepts( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> bool: - magic_type = (stream_info.magic_type or "").lower() - category = (stream_info.category or "").lower() - - if category in ACCEPTED_FILE_CATEGORY: - return True - - for prefix in ACCEPTED_MAGIC_TYPE_PREFIXES: - if magic_type.startswith(prefix): - return True - - return False - def convert( self, file_stream: BinaryIO, diff --git a/packages/markitup/tests/test_files/test.pdf b/packages/markitup/tests/test_files/test.pdf index e82861e..d9220a8 100644 Binary files 
a/packages/markitup/tests/test_files/test.pdf and b/packages/markitup/tests/test_files/test.pdf differ diff --git a/packages/markitup/tests/test_files/test.ppt b/packages/markitup/tests/test_files/test.ppt new file mode 100644 index 0000000..34f0fea Binary files /dev/null and b/packages/markitup/tests/test_files/test.ppt differ diff --git a/packages/markitup/tests/test_files/test.txt b/packages/markitup/tests/test_files/test.txt new file mode 100644 index 0000000..86b03d8 --- /dev/null +++ b/packages/markitup/tests/test_files/test.txt @@ -0,0 +1,4 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit. +Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. + +This sample TXT file is provided by Sample-Files.com. Visit us for more sample files and resources. \ No newline at end of file