All converters.

This commit is contained in:
Adam Fourney 2025-02-09 16:38:25 -08:00
parent 6793648d15
commit a795a16ce0
9 changed files with 493 additions and 443 deletions

View file

@@ -61,6 +61,11 @@ from .converters import (
XlsConverter,
PptxConverter,
ImageConverter,
WavConverter,
Mp3Converter,
OutlookMsgConverter,
ZipConverter,
DocumentIntelligenceConverter,
)
from .converters._markdownify import _CustomMarkdownify
@@ -71,450 +76,12 @@ from ._exceptions import (
UnsupportedFormatException,
)
# TODO: currently, there is a bug in the document intelligence SDK with importing the "ContentFormat" enum.
# This constant is a temporary fix until the bug is resolved.
CONTENT_FORMAT = "markdown"
# Override mimetype for csv to fix issue on windows
mimetypes.add_type("text/csv", ".csv")
PRIORITY_SPECIFIC_FILE_FORMAT = 0.0
PRIORITY_GENERIC_FILE_FORMAT = -10.0
# Optional Transcription support
IS_AUDIO_TRANSCRIPTION_CAPABLE = False
try:
# Using warnings' catch_warnings to catch
# pydub's warning of ffmpeg or avconv missing
with catch_warnings(record=True) as w:
import pydub
if w:
raise ModuleNotFoundError
import speech_recognition as sr
IS_AUDIO_TRANSCRIPTION_CAPABLE = True
except ModuleNotFoundError:
pass
finally:
resetwarnings()
class MediaConverter(DocumentConverter):
"""
Abstract class for multi-modal media (e.g., images and audio)
"""
def _get_metadata(self, local_path, exiftool_path=None):
if not exiftool_path:
which_exiftool = shutil.which("exiftool")
if which_exiftool:
warn(
f"""Implicit discovery of 'exiftool' is disabled. If you would like to continue to use exiftool in MarkItDown, please set the exiftool_path parameter in the MarkItDown consructor. E.g.,
md = MarkItDown(exiftool_path="{which_exiftool}")
This warning will be removed in future releases.
""",
DeprecationWarning,
)
return None
else:
try:
result = subprocess.run(
[exiftool_path, "-json", local_path], capture_output=True, text=True
).stdout
return json.loads(result)[0]
except Exception:
return None
class WavConverter(MediaConverter):
"""
Converts WAV files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` is installed).
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a WAV
extension = kwargs.get("file_extension", "")
if extension.lower() != ".wav":
return None
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path, kwargs.get("exiftool_path"))
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Transcribe
if IS_AUDIO_TRANSCRIPTION_CAPABLE:
try:
transcript = self._transcribe_audio(local_path)
md_content += "\n\n### Audio Transcript:\n" + (
"[No speech detected]" if transcript == "" else transcript
)
except Exception:
md_content += (
"\n\n### Audio Transcript:\nError. Could not transcribe this audio."
)
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
def _transcribe_audio(self, local_path) -> str:
recognizer = sr.Recognizer()
with sr.AudioFile(local_path) as source:
audio = recognizer.record(source)
return recognizer.recognize_google(audio).strip()
class Mp3Converter(WavConverter):
"""
Converts MP3 files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` AND `pydub` are installed).
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a MP3
extension = kwargs.get("file_extension", "")
if extension.lower() != ".mp3":
return None
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path, kwargs.get("exiftool_path"))
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Transcribe
if IS_AUDIO_TRANSCRIPTION_CAPABLE:
handle, temp_path = tempfile.mkstemp(suffix=".wav")
os.close(handle)
try:
sound = pydub.AudioSegment.from_mp3(local_path)
sound.export(temp_path, format="wav")
_args = dict()
_args.update(kwargs)
_args["file_extension"] = ".wav"
try:
transcript = super()._transcribe_audio(temp_path).strip()
md_content += "\n\n### Audio Transcript:\n" + (
"[No speech detected]" if transcript == "" else transcript
)
except Exception:
md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio."
finally:
os.unlink(temp_path)
# Return the result
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
class OutlookMsgConverter(DocumentConverter):
"""Converts Outlook .msg files to markdown by extracting email metadata and content.
Uses the olefile package to parse the .msg file structure and extract:
- Email headers (From, To, Subject)
- Email body content
"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Bail if not a MSG file
extension = kwargs.get("file_extension", "")
if extension.lower() != ".msg":
return None
try:
msg = olefile.OleFileIO(local_path)
# Extract email metadata
md_content = "# Email Message\n\n"
# Get headers
headers = {
"From": self._get_stream_data(msg, "__substg1.0_0C1F001F"),
"To": self._get_stream_data(msg, "__substg1.0_0E04001F"),
"Subject": self._get_stream_data(msg, "__substg1.0_0037001F"),
}
# Add headers to markdown
for key, value in headers.items():
if value:
md_content += f"**{key}:** {value}\n"
md_content += "\n## Content\n\n"
# Get email body
body = self._get_stream_data(msg, "__substg1.0_1000001F")
if body:
md_content += body
msg.close()
return DocumentConverterResult(
title=headers.get("Subject"), text_content=md_content.strip()
)
except Exception as e:
raise FileConversionException(
f"Could not convert MSG file '{local_path}': {str(e)}"
)
def _get_stream_data(
self, msg: olefile.OleFileIO, stream_path: str
) -> Union[str, None]:
"""Helper to safely extract and decode stream data from the MSG file."""
try:
if msg.exists(stream_path):
data = msg.openstream(stream_path).read()
# Try UTF-16 first (common for .msg files)
try:
return data.decode("utf-16-le").strip()
except UnicodeDecodeError:
# Fall back to UTF-8
try:
return data.decode("utf-8").strip()
except UnicodeDecodeError:
# Last resort - ignore errors
return data.decode("utf-8", errors="ignore").strip()
except Exception:
pass
return None
class ZipConverter(DocumentConverter):
"""Converts ZIP files to markdown by extracting and converting all contained files.
The converter extracts the ZIP contents to a temporary directory, processes each file
using appropriate converters based on file extensions, and then combines the results
into a single markdown document. The temporary directory is cleaned up after processing.
Example output format:
```markdown
Content from the zip file `example.zip`:
## File: docs/readme.txt
This is the content of readme.txt
Multiple lines are preserved
## File: images/example.jpg
ImageSize: 1920x1080
DateTimeOriginal: 2024-02-15 14:30:00
Description: A beautiful landscape photo
## File: data/report.xlsx
## Sheet1
| Column1 | Column2 | Column3 |
|---------|---------|---------|
| data1 | data2 | data3 |
| data4 | data5 | data6 |
```
Key features:
- Maintains original file structure in headings
- Processes nested files recursively
- Uses appropriate converters for each file type
- Preserves formatting of converted content
- Cleans up temporary files after processing
"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Bail if not a ZIP
extension = kwargs.get("file_extension", "")
if extension.lower() != ".zip":
return None
# Get parent converters list if available
parent_converters = kwargs.get("_parent_converters", [])
if not parent_converters:
return DocumentConverterResult(
title=None,
text_content=f"[ERROR] No converters available to process zip contents from: {local_path}",
)
extracted_zip_folder_name = (
f"extracted_{os.path.basename(local_path).replace('.zip', '_zip')}"
)
extraction_dir = os.path.normpath(
os.path.join(os.path.dirname(local_path), extracted_zip_folder_name)
)
md_content = f"Content from the zip file `{os.path.basename(local_path)}`:\n\n"
try:
# Extract the zip file safely
with zipfile.ZipFile(local_path, "r") as zipObj:
# Safeguard against path traversal
for member in zipObj.namelist():
member_path = os.path.normpath(os.path.join(extraction_dir, member))
if (
not os.path.commonprefix([extraction_dir, member_path])
== extraction_dir
):
raise ValueError(
f"Path traversal detected in zip file: {member}"
)
# Extract all files safely
zipObj.extractall(path=extraction_dir)
# Process each extracted file
for root, dirs, files in os.walk(extraction_dir):
for name in files:
file_path = os.path.join(root, name)
relative_path = os.path.relpath(file_path, extraction_dir)
# Get file extension
_, file_extension = os.path.splitext(name)
# Update kwargs for the file
file_kwargs = kwargs.copy()
file_kwargs["file_extension"] = file_extension
file_kwargs["_parent_converters"] = parent_converters
# Try converting the file using available converters
for converter in parent_converters:
# Skip the zip converter to avoid infinite recursion
if isinstance(converter, ZipConverter):
continue
result = converter.convert(file_path, **file_kwargs)
if result is not None:
md_content += f"\n## File: {relative_path}\n\n"
md_content += result.text_content + "\n\n"
break
# Clean up extracted files if specified
if kwargs.get("cleanup_extracted", True):
shutil.rmtree(extraction_dir)
return DocumentConverterResult(title=None, text_content=md_content.strip())
except zipfile.BadZipFile:
return DocumentConverterResult(
title=None,
text_content=f"[ERROR] Invalid or corrupted zip file: {local_path}",
)
except ValueError as ve:
return DocumentConverterResult(
title=None,
text_content=f"[ERROR] Security error in zip file {local_path}: {str(ve)}",
)
except Exception as e:
return DocumentConverterResult(
title=None,
text_content=f"[ERROR] Failed to process zip file {local_path}: {str(e)}",
)
class DocumentIntelligenceConverter(DocumentConverter):
"""Specialized DocumentConverter that uses Document Intelligence to extract text from documents."""
def __init__(
self,
endpoint: str,
api_version: str = "2024-07-31-preview",
):
self.endpoint = endpoint
self.api_version = api_version
self.doc_intel_client = DocumentIntelligenceClient(
endpoint=self.endpoint,
api_version=self.api_version,
credential=DefaultAzureCredential(),
)
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Bail if extension is not supported by Document Intelligence
extension = kwargs.get("file_extension", "")
docintel_extensions = [
".pdf",
".docx",
".xlsx",
".pptx",
".html",
".jpeg",
".jpg",
".png",
".bmp",
".tiff",
".heif",
]
if extension.lower() not in docintel_extensions:
return None
# Get the bytestring for the local path
with open(local_path, "rb") as f:
file_bytes = f.read()
# Certain document analysis features are not available for these file types (.xlsx, .pptx, .html)
if extension.lower() in [".xlsx", ".pptx", ".html"]:
analysis_features = []
else:
analysis_features = [
DocumentAnalysisFeature.FORMULAS, # enable formula extraction
DocumentAnalysisFeature.OCR_HIGH_RESOLUTION, # enable high resolution OCR
DocumentAnalysisFeature.STYLE_FONT, # enable font style extraction
]
# Extract the text using Azure Document Intelligence
poller = self.doc_intel_client.begin_analyze_document(
model_id="prebuilt-layout",
body=AnalyzeDocumentRequest(bytes_source=file_bytes),
features=analysis_features,
output_content_format=CONTENT_FORMAT, # TODO: replace with "ContentFormat.MARKDOWN" when the bug is fixed
)
result: AnalyzeResult = poller.result()
# remove comments from the markdown content generated by Doc Intelligence and append to markdown string
markdown_text = re.sub(r"<!--.*?-->", "", result.content, flags=re.DOTALL)
return DocumentConverterResult(
title=None,
text_content=markdown_text,
)
class MarkItDown:
"""(In preview) An extremely simple text-based document reader, suitable for LLM use.
@@ -800,10 +367,11 @@ class MarkItDown:
_kwargs["_parent_converters"] = self._page_converters
# If we hit an error log it and keep trying
-try:
+# try:
+if True:
res = converter.convert(local_path, **_kwargs)
-except Exception:
-error_trace = ("\n\n" + traceback.format_exc()).strip()
+# except Exception:
+# error_trace = ("\n\n" + traceback.format_exc()).strip()
if res is not None:
# Normalize the content
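Taken together with the new converter modules below, this is the lookup loop that the added audio, Outlook, ZIP, and Document Intelligence converters plug into. A minimal usage sketch of the front end (the file name and exiftool path are placeholders, and the top-level convert() call is assumed from the surrounding code rather than shown in this hunk):

```python
from markitdown import MarkItDown

md = MarkItDown()  # optionally MarkItDown(exiftool_path="/usr/local/bin/exiftool")

# Each registered converter is offered the file in turn; the first one that
# returns a non-None DocumentConverterResult supplies the markdown output.
result = md.convert("meeting_notes.zip")
print(result.text_content)
```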

View file

@@ -15,6 +15,11 @@ from ._docx_converter import DocxConverter
from ._xlsx_converter import XlsxConverter, XlsConverter
from ._pptx_converter import PptxConverter
from ._image_converter import ImageConverter
from ._wav_converter import WavConverter
from ._mp3_converter import Mp3Converter
from ._outlook_msg_converter import OutlookMsgConverter
from ._zip_converter import ZipConverter
from ._doc_intel_converter import DocumentIntelligenceConverter
__all__ = [
"DocumentConverter",
@@ -32,4 +37,9 @@ __all__ = [
"XlsConverter",
"PptxConverter",
"ImageConverter",
"WavConverter",
"Mp3Converter",
"OutlookMsgConverter",
"ZipConverter",
"DocumentIntelligenceConverter",
]
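With the expanded `__all__`, the new converters can also be imported and used directly, outside the MarkItDown front end. A small sketch (assuming the package root is named `markitdown`):

```python
from markitdown.converters import (
    WavConverter,
    Mp3Converter,
    OutlookMsgConverter,
    ZipConverter,
    DocumentIntelligenceConverter,
)

# All of them share the DocumentConverter contract: convert() returns a
# DocumentConverterResult, or None when the file extension is not handled.
print(WavConverter().convert("notes.txt", file_extension=".txt"))  # -> None
```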

View file

@@ -0,0 +1,91 @@
import re  # used by the re.sub() call below
from typing import Any, Union
# Azure imports
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import (
AnalyzeDocumentRequest,
AnalyzeResult,
DocumentAnalysisFeature,
)
from azure.identity import DefaultAzureCredential
from ._base import DocumentConverter, DocumentConverterResult
from .._exceptions import (
MarkItDownException,
ConverterPrerequisiteException,
FileConversionException,
UnsupportedFormatException,
)
# TODO: currently, there is a bug in the document intelligence SDK with importing the "ContentFormat" enum.
# This constant is a temporary fix until the bug is resolved.
CONTENT_FORMAT = "markdown"
class DocumentIntelligenceConverter(DocumentConverter):
"""Specialized DocumentConverter that uses Document Intelligence to extract text from documents."""
def __init__(
self,
endpoint: str,
api_version: str = "2024-07-31-preview",
):
self.endpoint = endpoint
self.api_version = api_version
self.doc_intel_client = DocumentIntelligenceClient(
endpoint=self.endpoint,
api_version=self.api_version,
credential=DefaultAzureCredential(),
)
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Bail if extension is not supported by Document Intelligence
extension = kwargs.get("file_extension", "")
docintel_extensions = [
".pdf",
".docx",
".xlsx",
".pptx",
".html",
".jpeg",
".jpg",
".png",
".bmp",
".tiff",
".heif",
]
if extension.lower() not in docintel_extensions:
return None
# Get the bytestring for the local path
with open(local_path, "rb") as f:
file_bytes = f.read()
# Certain document analysis features are not available for these file types (.xlsx, .pptx, .html)
if extension.lower() in [".xlsx", ".pptx", ".html"]:
analysis_features = []
else:
analysis_features = [
DocumentAnalysisFeature.FORMULAS, # enable formula extraction
DocumentAnalysisFeature.OCR_HIGH_RESOLUTION, # enable high resolution OCR
DocumentAnalysisFeature.STYLE_FONT, # enable font style extraction
]
# Extract the text using Azure Document Intelligence
poller = self.doc_intel_client.begin_analyze_document(
model_id="prebuilt-layout",
body=AnalyzeDocumentRequest(bytes_source=file_bytes),
features=analysis_features,
output_content_format=CONTENT_FORMAT, # TODO: replace with "ContentFormat.MARKDOWN" when the bug is fixed
)
result: AnalyzeResult = poller.result()
# remove comments from the markdown content generated by Doc Intelligence and append to markdown string
markdown_text = re.sub(r"<!--.*?-->", "", result.content, flags=re.DOTALL)
return DocumentConverterResult(
title=None,
text_content=markdown_text,
)
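A usage sketch for the new Document Intelligence converter. It authenticates through DefaultAzureCredential at construction time, so the environment must already be able to obtain Azure credentials; the endpoint and file name below are placeholders:

```python
from markitdown.converters import DocumentIntelligenceConverter

converter = DocumentIntelligenceConverter(
    endpoint="https://<your-resource>.cognitiveservices.azure.com/",
    # api_version defaults to "2024-07-31-preview"
)

result = converter.convert("report.pdf", file_extension=".pdf")
if result is not None:
    print(result.text_content)  # markdown from the prebuilt-layout model, comments stripped
```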

View file

@@ -6,6 +6,8 @@ from ._base import (
DocumentConverterResult,
)
from .._exceptions import FileConversionException
class IpynbConverter(DocumentConverter):
"""Converts Jupyter Notebook (.ipynb) files to Markdown."""

View file

@@ -0,0 +1,84 @@
import os  # used by os.close() and os.unlink() below
import tempfile
from typing import Any, Dict, List, Optional, Union
from ._base import DocumentConverter, DocumentConverterResult
from ._wav_converter import WavConverter
from warnings import warn, resetwarnings, catch_warnings
# Optional Transcription support
IS_AUDIO_TRANSCRIPTION_CAPABLE = False
try:
# Using warnings' catch_warnings to catch
# pydub's warning of ffmpeg or avconv missing
with catch_warnings(record=True) as w:
import pydub
if w:
raise ModuleNotFoundError
import speech_recognition as sr
IS_AUDIO_TRANSCRIPTION_CAPABLE = True
except ModuleNotFoundError:
pass
finally:
resetwarnings()
class Mp3Converter(WavConverter):
"""
Converts MP3 files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` AND `pydub` are installed).
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a MP3
extension = kwargs.get("file_extension", "")
if extension.lower() != ".mp3":
return None
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path, kwargs.get("exiftool_path"))
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Transcribe
if IS_AUDIO_TRANSCRIPTION_CAPABLE:
handle, temp_path = tempfile.mkstemp(suffix=".wav")
os.close(handle)
try:
sound = pydub.AudioSegment.from_mp3(local_path)
sound.export(temp_path, format="wav")
_args = dict()
_args.update(kwargs)
_args["file_extension"] = ".wav"
try:
transcript = super()._transcribe_audio(temp_path).strip()
md_content += "\n\n### Audio Transcript:\n" + (
"[No speech detected]" if transcript == "" else transcript
)
except Exception:
md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio."
finally:
os.unlink(temp_path)
# Return the result
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
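A usage sketch for the MP3 converter. Transcription only runs when both `pydub` (with ffmpeg or avconv present) and `speech_recognition` import cleanly, and the `recognize_google` call needs network access; otherwise the output contains metadata only. Paths below are placeholders:

```python
from markitdown.converters import Mp3Converter

converter = Mp3Converter()
result = converter.convert(
    "podcast_episode.mp3",
    file_extension=".mp3",
    exiftool_path="/usr/bin/exiftool",  # optional; omit to skip metadata extraction
)
print(result.text_content)  # tag metadata plus an "### Audio Transcript:" section when available
```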

View file

@@ -0,0 +1,76 @@
import olefile
from typing import Any, Union
from ._base import DocumentConverter, DocumentConverterResult
from .._exceptions import FileConversionException
class OutlookMsgConverter(DocumentConverter):
"""Converts Outlook .msg files to markdown by extracting email metadata and content.
Uses the olefile package to parse the .msg file structure and extract:
- Email headers (From, To, Subject)
- Email body content
"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Bail if not a MSG file
extension = kwargs.get("file_extension", "")
if extension.lower() != ".msg":
return None
try:
msg = olefile.OleFileIO(local_path)
# Extract email metadata
md_content = "# Email Message\n\n"
# Get headers
headers = {
"From": self._get_stream_data(msg, "__substg1.0_0C1F001F"),
"To": self._get_stream_data(msg, "__substg1.0_0E04001F"),
"Subject": self._get_stream_data(msg, "__substg1.0_0037001F"),
}
# Add headers to markdown
for key, value in headers.items():
if value:
md_content += f"**{key}:** {value}\n"
md_content += "\n## Content\n\n"
# Get email body
body = self._get_stream_data(msg, "__substg1.0_1000001F")
if body:
md_content += body
msg.close()
return DocumentConverterResult(
title=headers.get("Subject"), text_content=md_content.strip()
)
except Exception as e:
raise FileConversionException(
f"Could not convert MSG file '{local_path}': {str(e)}"
)
def _get_stream_data(
self, msg: olefile.OleFileIO, stream_path: str
) -> Union[str, None]:
"""Helper to safely extract and decode stream data from the MSG file."""
try:
if msg.exists(stream_path):
data = msg.openstream(stream_path).read()
# Try UTF-16 first (common for .msg files)
try:
return data.decode("utf-16-le").strip()
except UnicodeDecodeError:
# Fall back to UTF-8
try:
return data.decode("utf-8").strip()
except UnicodeDecodeError:
# Last resort - ignore errors
return data.decode("utf-8", errors="ignore").strip()
except Exception:
pass
return None
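The stream names used above follow the MAPI property convention: `__substg1.0_` plus a four-digit property id and a type suffix, where `001F` denotes a Unicode string. A standalone sketch of the same lookup with olefile; the property-tag names in the comments and the extra Cc entry are annotations for illustration, not part of this commit:

```python
import olefile

PROPERTY_STREAMS = {
    "From": "__substg1.0_0C1F001F",     # PR_SENDER_EMAIL_ADDRESS (Unicode)
    "To": "__substg1.0_0E04001F",       # PR_DISPLAY_TO (Unicode)
    "Cc": "__substg1.0_0E03001F",       # PR_DISPLAY_CC (illustrative addition)
    "Subject": "__substg1.0_0037001F",  # PR_SUBJECT (Unicode)
}

with olefile.OleFileIO("example.msg") as msg:
    for label, stream in PROPERTY_STREAMS.items():
        if msg.exists(stream):
            value = msg.openstream(stream).read().decode("utf-16-le", errors="ignore")
            print(f"{label}: {value.strip()}")
```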

View file

@@ -0,0 +1,77 @@
from typing import Union
from ._base import DocumentConverter, DocumentConverterResult
from ._media_converter import MediaConverter
from warnings import warn, resetwarnings, catch_warnings
# Optional Transcription support
IS_AUDIO_TRANSCRIPTION_CAPABLE = False
try:
# Using warnings' catch_warnings to catch
# pydub's warning of ffmpeg or avconv missing
with catch_warnings(record=True) as w:
import pydub
if w:
raise ModuleNotFoundError
import speech_recognition as sr
IS_AUDIO_TRANSCRIPTION_CAPABLE = True
except ModuleNotFoundError:
pass
finally:
resetwarnings()
class WavConverter(MediaConverter):
"""
Converts WAV files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` is installed).
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a WAV
extension = kwargs.get("file_extension", "")
if extension.lower() != ".wav":
return None
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path, kwargs.get("exiftool_path"))
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Transcribe
if IS_AUDIO_TRANSCRIPTION_CAPABLE:
try:
transcript = self._transcribe_audio(local_path)
md_content += "\n\n### Audio Transcript:\n" + (
"[No speech detected]" if transcript == "" else transcript
)
except Exception:
md_content += (
"\n\n### Audio Transcript:\nError. Could not transcribe this audio."
)
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
def _transcribe_audio(self, local_path) -> str:
recognizer = sr.Recognizer()
with sr.AudioFile(local_path) as source:
audio = recognizer.record(source)
return recognizer.recognize_google(audio).strip()
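A usage sketch for the WAV converter. Metadata extraction requires an explicit `exiftool_path`, and transcription is attempted only when the optional speech dependencies import cleanly; file names are placeholders:

```python
from markitdown.converters import WavConverter

converter = WavConverter()

# Non-.wav extensions are rejected up front, so the converter can safely be
# offered every file in a larger pipeline.
assert converter.convert("clip.mp3", file_extension=".mp3") is None

result = converter.convert("interview.wav", file_extension=".wav")
print(result.text_content)
```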

View file

@@ -1,8 +1,8 @@
-from typing import Any, Dict, List, Optional, Union
+from typing import Union
import pandas as pd
-from ._base import DocumentConverter, DocumentConverterResult
+from ._base import DocumentConverterResult
from ._html_converter import HtmlConverter

View file

@@ -0,0 +1,142 @@
import os
import zipfile
import shutil
from typing import Any, Union
from ._base import DocumentConverter, DocumentConverterResult
from .._exceptions import (
MarkItDownException,
ConverterPrerequisiteException,
FileConversionException,
UnsupportedFormatException,
)
class ZipConverter(DocumentConverter):
"""Converts ZIP files to markdown by extracting and converting all contained files.
The converter extracts the ZIP contents to a temporary directory, processes each file
using appropriate converters based on file extensions, and then combines the results
into a single markdown document. The temporary directory is cleaned up after processing.
Example output format:
```markdown
Content from the zip file `example.zip`:
## File: docs/readme.txt
This is the content of readme.txt
Multiple lines are preserved
## File: images/example.jpg
ImageSize: 1920x1080
DateTimeOriginal: 2024-02-15 14:30:00
Description: A beautiful landscape photo
## File: data/report.xlsx
## Sheet1
| Column1 | Column2 | Column3 |
|---------|---------|---------|
| data1 | data2 | data3 |
| data4 | data5 | data6 |
```
Key features:
- Maintains original file structure in headings
- Processes nested files recursively
- Uses appropriate converters for each file type
- Preserves formatting of converted content
- Cleans up temporary files after processing
"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Bail if not a ZIP
extension = kwargs.get("file_extension", "")
if extension.lower() != ".zip":
return None
# Get parent converters list if available
parent_converters = kwargs.get("_parent_converters", [])
if not parent_converters:
return DocumentConverterResult(
title=None,
text_content=f"[ERROR] No converters available to process zip contents from: {local_path}",
)
extracted_zip_folder_name = (
f"extracted_{os.path.basename(local_path).replace('.zip', '_zip')}"
)
extraction_dir = os.path.normpath(
os.path.join(os.path.dirname(local_path), extracted_zip_folder_name)
)
md_content = f"Content from the zip file `{os.path.basename(local_path)}`:\n\n"
try:
# Extract the zip file safely
with zipfile.ZipFile(local_path, "r") as zipObj:
# Safeguard against path traversal
for member in zipObj.namelist():
member_path = os.path.normpath(os.path.join(extraction_dir, member))
if (
not os.path.commonprefix([extraction_dir, member_path])
== extraction_dir
):
raise ValueError(
f"Path traversal detected in zip file: {member}"
)
# Extract all files safely
zipObj.extractall(path=extraction_dir)
# Process each extracted file
for root, dirs, files in os.walk(extraction_dir):
for name in files:
file_path = os.path.join(root, name)
relative_path = os.path.relpath(file_path, extraction_dir)
# Get file extension
_, file_extension = os.path.splitext(name)
# Update kwargs for the file
file_kwargs = kwargs.copy()
file_kwargs["file_extension"] = file_extension
file_kwargs["_parent_converters"] = parent_converters
# Try converting the file using available converters
for converter in parent_converters:
# Skip the zip converter to avoid infinite recursion
if isinstance(converter, ZipConverter):
continue
result = converter.convert(file_path, **file_kwargs)
if result is not None:
md_content += f"\n## File: {relative_path}\n\n"
md_content += result.text_content + "\n\n"
break
# Clean up extracted files if specified
if kwargs.get("cleanup_extracted", True):
shutil.rmtree(extraction_dir)
return DocumentConverterResult(title=None, text_content=md_content.strip())
except zipfile.BadZipFile:
return DocumentConverterResult(
title=None,
text_content=f"[ERROR] Invalid or corrupted zip file: {local_path}",
)
except ValueError as ve:
return DocumentConverterResult(
title=None,
text_content=f"[ERROR] Security error in zip file {local_path}: {str(ve)}",
)
except Exception as e:
return DocumentConverterResult(
title=None,
text_content=f"[ERROR] Failed to process zip file {local_path}: {str(e)}",
)
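The ZIP converter is the one new converter that cannot run in isolation: it hands each extracted member to the `_parent_converters` list passed in through kwargs, which MarkItDown normally supplies. A hedged sketch of driving it directly; the converter list and file name are illustrative:

```python
from markitdown.converters import ImageConverter, OutlookMsgConverter, ZipConverter

zip_converter = ZipConverter()
result = zip_converter.convert(
    "mailbox_export.zip",
    file_extension=".zip",
    # Without _parent_converters the result is an "[ERROR] ..." message.
    _parent_converters=[ImageConverter(), OutlookMsgConverter()],
    cleanup_extracted=True,  # remove the extraction directory afterwards (the default)
)
print(result.text_content)  # one "## File: <relative path>" section per converted member
```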