remove files

rong-xyz 2025-04-21 08:43:19 +00:00
parent 9909ae13b8
commit 615975f918
18 changed files with 3 additions and 1799 deletions

View file: pyproject.toml

@@ -51,10 +51,7 @@ docx = ["mammoth", "lxml"]
xlsx = ["pandas", "openpyxl"]
xls = ["pandas", "xlrd"]
pdf = ["pdfminer.six"]
outlook = ["olefile"]
audio-transcription = ["pydub", "SpeechRecognition"]
youtube-transcription = ["youtube-transcript-api"]
az-doc-intel = ["azure-ai-documentintelligence", "azure-identity"]
[tool.hatch.version]
path = "src/markitup/__about__.py"

View file: src/markitup/converters/__init__.py

@@ -4,24 +4,11 @@
from ._plain_text_converter import PlainTextConverter
from ._html_converter import HtmlConverter
from ._rss_converter import RssConverter
from ._wikipedia_converter import WikipediaConverter
from ._youtube_converter import YouTubeConverter
from ._ipynb_converter import IpynbConverter
from ._bing_serp_converter import BingSerpConverter
from ._pdf_converter import PdfConverter
from ._docx_converter import DocxConverter
from ._xlsx_converter import XlsxConverter, XlsConverter
from ._pptx_converter import PptxConverter
from ._image_converter import ImageConverter
from ._audio_converter import AudioConverter
from ._outlook_msg_converter import OutlookMsgConverter
from ._zip_converter import ZipConverter
from ._doc_intel_converter import (
DocumentIntelligenceConverter,
DocumentIntelligenceFileType,
)
from ._epub_converter import EpubConverter
from ._csv_converter import CsvConverter
__all__ = [

View file: src/markitup/converters/_bing_serp_converter.py

@@ -1,121 +0,0 @@
import io
import re
import base64
import binascii
from urllib.parse import parse_qs, urlparse
from typing import Any, BinaryIO, Optional
from bs4 import BeautifulSoup
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from ._markdownify import _CustomMarkdownify
ACCEPTED_MIME_TYPE_PREFIXES = [
"text/html",
"application/xhtml",
]
ACCEPTED_FILE_EXTENSIONS = [
".html",
".htm",
]
class BingSerpConverter(DocumentConverter):
"""
Handle Bing results pages (only the organic search results).
NOTE: It is better to use the Bing API
"""
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
"""
Make sure we're dealing with HTML content *from* Bing.
"""
url = stream_info.url or ""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if not re.search(r"^https://www\.bing\.com/search\?q=", url):
# Not a Bing SERP URL
return False
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
# Not HTML content
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
assert stream_info.url is not None
# Parse the query parameters
parsed_params = parse_qs(urlparse(stream_info.url).query)
query = parsed_params.get("q", [""])[0]
# Parse the stream
encoding = "utf-8" if stream_info.charset is None else stream_info.charset
soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
# Clean up some formatting
for tptt in soup.find_all(class_="tptt"):
if hasattr(tptt, "string") and tptt.string:
tptt.string += " "
for slug in soup.find_all(class_="algoSlug_icon"):
slug.extract()
# Parse the algorithmic results
_markdownify = _CustomMarkdownify(**kwargs)
results = list()
for result in soup.find_all(class_="b_algo"):
if not hasattr(result, "find_all"):
continue
# Rewrite redirect urls
for a in result.find_all("a", href=True):
parsed_href = urlparse(a["href"])
qs = parse_qs(parsed_href.query)
# The destination is contained in the u parameter,
# but appears to be base64 encoded, with some prefix
if "u" in qs:
u = (
qs["u"][0][2:].strip() + "=="
) # Python 3 doesn't care about extra padding
try:
# RFC 4648 "Base64URL" variant, which uses "-" and "_"
a["href"] = base64.b64decode(u, altchars="-_").decode("utf-8")
except UnicodeDecodeError:
pass
except binascii.Error:
pass
# Convert to markdown
md_result = _markdownify.convert_soup(result).strip()
lines = [line.strip() for line in re.split(r"\n+", md_result)]
results.append("\n".join([line for line in lines if len(line) > 0]))
webpage_text = (
f"## A Bing search for '{query}' found the following results:\n\n"
+ "\n\n".join(results)
)
return DocumentConverterResult(
markdown=webpage_text,
title=None if soup.title is None else soup.title.string,
)
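For reference, the redirect decoding above can be exercised in isolation. A minimal sketch, with a made-up `u` parameter value:

```python
import base64

# Bing wraps organic-result links in redirect URLs; the destination sits in the
# "u" query parameter, base64url-encoded ("-"/"_" alphabet) behind a
# two-character prefix. The hypothetical value below encodes https://example.com
u = "a1aHR0cHM6Ly9leGFtcGxlLmNvbQ"
padded = u[2:].strip() + "=="  # drop the prefix; extra padding is harmless
print(base64.b64decode(padded, altchars="-_").decode("utf-8"))
# -> https://example.com
```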

View file: src/markitup/converters/_doc_intel_converter.py

@@ -1,250 +0,0 @@
import sys
import re
import os
from typing import BinaryIO, Any, List, Optional, Union
from enum import Enum
from ._html_converter import HtmlConverter
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
_dependency_exc_info = None
try:
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import (
AnalyzeDocumentRequest,
AnalyzeResult,
DocumentAnalysisFeature,
)
from azure.core.credentials import AzureKeyCredential, TokenCredential
from azure.identity import DefaultAzureCredential
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
# Define these types for type hinting when the package is not available
class AzureKeyCredential:
pass
class TokenCredential:
pass
class DocumentIntelligenceClient:
pass
class AnalyzeDocumentRequest:
pass
class AnalyzeResult:
pass
class DocumentAnalysisFeature:
pass
class DefaultAzureCredential:
pass
# TODO: currently, there is a bug in the document intelligence SDK with importing the "ContentFormat" enum.
# This constant is a temporary fix until the bug is resolved.
CONTENT_FORMAT = "markdown"
class DocumentIntelligenceFileType(str, Enum):
"""Enum of file types supported by the Document Intelligence Converter."""
# No OCR
DOCX = "docx"
PPTX = "pptx"
XLSX = "xlsx"
HTML = "html"
# OCR
PDF = "pdf"
JPEG = "jpeg"
PNG = "png"
BMP = "bmp"
TIFF = "tiff"
def _get_mime_type_prefixes(types: List[DocumentIntelligenceFileType]) -> List[str]:
"""Get the MIME type prefixes for the given file types."""
prefixes: List[str] = []
for type_ in types:
if type_ == DocumentIntelligenceFileType.DOCX:
prefixes.append(
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
elif type_ == DocumentIntelligenceFileType.PPTX:
prefixes.append(
"application/vnd.openxmlformats-officedocument.presentationml"
)
elif type_ == DocumentIntelligenceFileType.XLSX:
prefixes.append(
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
elif type_ == DocumentIntelligenceFileType.PDF:
prefixes.append("application/pdf")
prefixes.append("application/x-pdf")
elif type_ == DocumentIntelligenceFileType.JPEG:
prefixes.append("image/jpeg")
elif type_ == DocumentIntelligenceFileType.PNG:
prefixes.append("image/png")
elif type_ == DocumentIntelligenceFileType.BMP:
prefixes.append("image/bmp")
elif type_ == DocumentIntelligenceFileType.TIFF:
prefixes.append("image/tiff")
return prefixes
def _get_file_extensions(types: List[DocumentIntelligenceFileType]) -> List[str]:
"""Get the file extensions for the given file types."""
extensions: List[str] = []
for type_ in types:
if type_ == DocumentIntelligenceFileType.DOCX:
extensions.append(".docx")
elif type_ == DocumentIntelligenceFileType.PPTX:
extensions.append(".pptx")
elif type_ == DocumentIntelligenceFileType.XLSX:
extensions.append(".xlsx")
elif type_ == DocumentIntelligenceFileType.PDF:
extensions.append(".pdf")
elif type_ == DocumentIntelligenceFileType.JPEG:
extensions.append(".jpg")
extensions.append(".jpeg")
elif type_ == DocumentIntelligenceFileType.PNG:
extensions.append(".png")
elif type_ == DocumentIntelligenceFileType.BMP:
extensions.append(".bmp")
elif type_ == DocumentIntelligenceFileType.TIFF:
extensions.append(".tiff")
return extensions
class DocumentIntelligenceConverter(DocumentConverter):
"""Specialized DocumentConverter that uses Document Intelligence to extract text from documents."""
def __init__(
self,
*,
endpoint: str,
api_version: str = "2024-07-31-preview",
credential: AzureKeyCredential | TokenCredential | None = None,
file_types: List[DocumentIntelligenceFileType] = [
DocumentIntelligenceFileType.DOCX,
DocumentIntelligenceFileType.PPTX,
DocumentIntelligenceFileType.XLSX,
DocumentIntelligenceFileType.PDF,
DocumentIntelligenceFileType.JPEG,
DocumentIntelligenceFileType.PNG,
DocumentIntelligenceFileType.BMP,
DocumentIntelligenceFileType.TIFF,
],
):
"""
Initialize the DocumentIntelligenceConverter.
Args:
endpoint (str): The endpoint for the Document Intelligence service.
api_version (str): The API version to use. Defaults to "2024-07-31-preview".
credential (AzureKeyCredential | TokenCredential | None): The credential to use for authentication.
file_types (List[DocumentIntelligenceFileType]): The file types to accept. Defaults to all supported file types.
"""
super().__init__()
self._file_types = file_types
# Raise an error if the dependencies are not available.
# This is different than other converters since this one isn't even instantiated
# unless explicitly requested.
if _dependency_exc_info is not None:
raise MissingDependencyException(
"DocumentIntelligenceConverter requires the optional dependency [az-doc-intel] (or [all]) to be installed. E.g., `pip install markitup[az-doc-intel]`"
) from _dependency_exc_info[
1
].with_traceback( # type: ignore[union-attr]
_dependency_exc_info[2]
)
if credential is None:
if os.environ.get("AZURE_API_KEY") is None:
credential = DefaultAzureCredential()
else:
credential = AzureKeyCredential(os.environ["AZURE_API_KEY"])
self.endpoint = endpoint
self.api_version = api_version
self.doc_intel_client = DocumentIntelligenceClient(
endpoint=self.endpoint,
api_version=self.api_version,
credential=credential,
)
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in _get_file_extensions(self._file_types):
return True
for prefix in _get_mime_type_prefixes(self._file_types):
if mimetype.startswith(prefix):
return True
return False
def _analysis_features(self, stream_info: StreamInfo) -> List[str]:
"""
Helper needed to determine which analysis features to use.
Certain document analysis features are not available for
office file types (.xlsx, .pptx, .html, .docx)
"""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
# Types that don't support ocr
no_ocr_types = [
DocumentIntelligenceFileType.DOCX,
DocumentIntelligenceFileType.PPTX,
DocumentIntelligenceFileType.XLSX,
DocumentIntelligenceFileType.HTML,
]
if extension in _get_file_extensions(no_ocr_types):
return []
for prefix in _get_mime_type_prefixes(no_ocr_types):
if mimetype.startswith(prefix):
return []
return [
DocumentAnalysisFeature.FORMULAS, # enable formula extraction
DocumentAnalysisFeature.OCR_HIGH_RESOLUTION, # enable high resolution OCR
DocumentAnalysisFeature.STYLE_FONT, # enable font style extraction
]
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Extract the text using Azure Document Intelligence
poller = self.doc_intel_client.begin_analyze_document(
model_id="prebuilt-layout",
body=AnalyzeDocumentRequest(bytes_source=file_stream.read()),
features=self._analysis_features(stream_info),
output_content_format=CONTENT_FORMAT, # TODO: replace with "ContentFormat.MARKDOWN" when the bug is fixed
)
result: AnalyzeResult = poller.result()
# remove comments from the markdown content generated by Doc Intelligence and append to markdown string
markdown_text = re.sub(r"<!--.*?-->", "", result.content, flags=re.DOTALL)
return DocumentConverterResult(markdown=markdown_text)
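Before this commit, a minimal usage sketch looked roughly like the following; the endpoint is a placeholder, and the `StreamInfo` import path is an assumption based on the relative imports above:

```python
from markitup.converters import (  # import path as exported before this commit
    DocumentIntelligenceConverter,
    DocumentIntelligenceFileType,
)
from markitup._stream_info import StreamInfo  # assumed path

converter = DocumentIntelligenceConverter(
    endpoint="https://example.cognitiveservices.azure.com/",  # placeholder endpoint
    file_types=[DocumentIntelligenceFileType.PDF],  # restrict to the OCR path
)
with open("report.pdf", "rb") as f:  # hypothetical input file
    result = converter.convert(
        f, StreamInfo(extension=".pdf", mimetype="application/pdf")
    )
print(result.markdown)
```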

View file: src/markitup/converters/_epub_converter.py

@@ -1,147 +0,0 @@
import os
import zipfile
import xml.dom.minidom as minidom
from typing import BinaryIO, Any, Dict, List
from ._html_converter import HtmlConverter
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
ACCEPTED_MIME_TYPE_PREFIXES = [
"application/epub",
"application/epub+zip",
"application/x-epub+zip",
]
ACCEPTED_FILE_EXTENSIONS = [".epub"]
MIME_TYPE_MAPPING = {
".html": "text/html",
".xhtml": "application/xhtml+xml",
}
class EpubConverter(HtmlConverter):
"""
Converts EPUB files to Markdown. Style information (e.g., headings) and tables are preserved where possible.
"""
def __init__(self):
super().__init__()
self._html_converter = HtmlConverter()
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
with zipfile.ZipFile(file_stream, "r") as z:
# Extract metadata (title, authors, language, publisher, date, description, identifier) from the EPUB file
# Locate content.opf
container_dom = minidom.parse(z.open("META-INF/container.xml"))
opf_path = container_dom.getElementsByTagName("rootfile")[0].getAttribute(
"full-path"
)
# Parse content.opf
opf_dom = minidom.parse(z.open(opf_path))
metadata: Dict[str, Any] = {
"title": self._get_text_from_node(opf_dom, "dc:title"),
"authors": self._get_all_texts_from_nodes(opf_dom, "dc:creator"),
"language": self._get_text_from_node(opf_dom, "dc:language"),
"publisher": self._get_text_from_node(opf_dom, "dc:publisher"),
"date": self._get_text_from_node(opf_dom, "dc:date"),
"description": self._get_text_from_node(opf_dom, "dc:description"),
"identifier": self._get_text_from_node(opf_dom, "dc:identifier"),
}
# Extract manifest items (ID → href mapping)
manifest = {
item.getAttribute("id"): item.getAttribute("href")
for item in opf_dom.getElementsByTagName("item")
}
# Extract spine order (ID refs)
spine_items = opf_dom.getElementsByTagName("itemref")
spine_order = [item.getAttribute("idref") for item in spine_items]
# Convert spine order to actual file paths
base_path = "/".join(
opf_path.split("/")[:-1]
) # Get base directory of content.opf
spine = [
f"{base_path}/{manifest[item_id]}" if base_path else manifest[item_id]
for item_id in spine_order
if item_id in manifest
]
# Extract and convert the content
markdown_content: List[str] = []
for file in spine:
if file in z.namelist():
with z.open(file) as f:
filename = os.path.basename(file)
extension = os.path.splitext(filename)[1].lower()
mimetype = MIME_TYPE_MAPPING.get(extension)
converted_content = self._html_converter.convert(
f,
StreamInfo(
mimetype=mimetype,
extension=extension,
filename=filename,
),
)
markdown_content.append(converted_content.markdown.strip())
# Format and add the metadata
metadata_markdown = []
for key, value in metadata.items():
if isinstance(value, list):
value = ", ".join(value)
if value:
metadata_markdown.append(f"**{key.capitalize()}:** {value}")
markdown_content.insert(0, "\n".join(metadata_markdown))
return DocumentConverterResult(
markdown="\n\n".join(markdown_content), title=metadata["title"]
)
def _get_text_from_node(self, dom: minidom.Document, tag_name: str) -> str | None:
"""Convenience function to extract a single occurrence of a tag (e.g., title)."""
texts = self._get_all_texts_from_nodes(dom, tag_name)
if len(texts) > 0:
return texts[0]
else:
return None
def _get_all_texts_from_nodes(
self, dom: minidom.Document, tag_name: str
) -> List[str]:
"""Helper function to extract all occurrences of a tag (e.g., multiple authors)."""
texts: List[str] = []
for node in dom.getElementsByTagName(tag_name):
if node.firstChild and hasattr(node.firstChild, "nodeValue"):
texts.append(node.firstChild.nodeValue.strip())
return texts
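The manifest/spine resolution in `convert` is the subtle part; a toy illustration with hand-built data:

```python
# Toy data standing in for the parsed content.opf:
manifest = {"cover": "cover.xhtml", "ch1": "text/ch1.xhtml", "ch2": "text/ch2.xhtml"}
spine_order = ["cover", "ch1", "ch2"]  # idref attributes, in <spine> order
base_path = "OEBPS"                    # directory that contains content.opf

# Same comprehension as above: manifest hrefs are relative to content.opf
spine = [
    f"{base_path}/{manifest[item_id]}" if base_path else manifest[item_id]
    for item_id in spine_order
    if item_id in manifest
]
print(spine)  # ['OEBPS/cover.xhtml', 'OEBPS/text/ch1.xhtml', 'OEBPS/text/ch2.xhtml']
```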

View file: src/markitup/converters/_image_converter.py

@@ -1,138 +0,0 @@
from typing import BinaryIO, Any, Union
import base64
import mimetypes
from ._exiftool import exiftool_metadata
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
ACCEPTED_MIME_TYPE_PREFIXES = [
"image/jpeg",
"image/png",
]
ACCEPTED_FILE_EXTENSIONS = [".jpg", ".jpeg", ".png"]
class ImageConverter(DocumentConverter):
"""
Converts images to markdown via extraction of metadata (if `exiftool` is installed), and description via a multimodal LLM (if an llm_client is configured).
"""
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any,
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
md_content = ""
# Add metadata
metadata = exiftool_metadata(
file_stream, exiftool_path=kwargs.get("exiftool_path")
)
if metadata:
for f in [
"ImageSize",
"Title",
"Caption",
"Description",
"Keywords",
"Artist",
"Author",
"DateTimeOriginal",
"CreateDate",
"GPSPosition",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Try describing the image with GPT
llm_client = kwargs.get("llm_client")
llm_model = kwargs.get("llm_model")
if llm_client is not None and llm_model is not None:
llm_description = self._get_llm_description(
file_stream,
stream_info,
client=llm_client,
model=llm_model,
prompt=kwargs.get("llm_prompt"),
)
if llm_description is not None:
md_content += "\n# Description:\n" + llm_description.strip() + "\n"
return DocumentConverterResult(
markdown=md_content,
)
def _get_llm_description(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
*,
client,
model,
prompt=None,
) -> Union[None, str]:
if prompt is None or prompt.strip() == "":
prompt = "Write a detailed caption for this image."
# Get the content type
content_type = stream_info.mimetype
if not content_type:
content_type, _ = mimetypes.guess_type(
"_dummy" + (stream_info.extension or "")
)
if not content_type:
content_type = "application/octet-stream"
# Convert to base64
cur_pos = file_stream.tell()
try:
base64_image = base64.b64encode(file_stream.read()).decode("utf-8")
except Exception:
return None
finally:
file_stream.seek(cur_pos)
# Prepare the data-uri
data_uri = f"data:{content_type};base64,{base64_image}"
# Prepare the OpenAI API request
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": data_uri,
},
},
],
}
]
# Call the OpenAI API
response = client.chat.completions.create(model=model, messages=messages)
return response.choices[0].message.content
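A hedged usage sketch: the converter only ever calls `client.chat.completions.create(...)`, so any OpenAI-compatible client object should work. The client, model name, and `StreamInfo` import path below are assumptions:

```python
from openai import OpenAI  # assumes the openai package is installed
from markitup.converters import ImageConverter
from markitup._stream_info import StreamInfo  # assumed path

converter = ImageConverter()
with open("photo.jpg", "rb") as f:  # hypothetical input file
    result = converter.convert(
        f,
        StreamInfo(extension=".jpg", mimetype="image/jpeg"),
        llm_client=OpenAI(),  # optional: enables the LLM-generated description
        llm_model="gpt-4o",   # hypothetical model name
    )
print(result.markdown)  # exiftool metadata (if available), then "# Description:"
```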

View file: src/markitup/converters/_ipynb_converter.py

@@ -1,98 +0,0 @@
from typing import BinaryIO, Any
import json
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._exceptions import FileConversionException
from .._stream_info import StreamInfo
CANDIDATE_MIME_TYPE_PREFIXES = [
"application/json",
]
ACCEPTED_FILE_EXTENSIONS = [".ipynb"]
class IpynbConverter(DocumentConverter):
"""Converts Jupyter Notebook (.ipynb) files to Markdown."""
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in CANDIDATE_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
# Read further to see if it's a notebook
cur_pos = file_stream.tell()
try:
encoding = stream_info.charset or "utf-8"
notebook_content = file_stream.read().decode(encoding)
return (
"nbformat" in notebook_content
and "nbformat_minor" in notebook_content
)
finally:
file_stream.seek(cur_pos)
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Parse and convert the notebook
encoding = stream_info.charset or "utf-8"
notebook_content = file_stream.read().decode(encoding=encoding)
return self._convert(json.loads(notebook_content))
def _convert(self, notebook_content: dict) -> DocumentConverterResult:
"""Helper function that converts notebook JSON content to Markdown."""
try:
md_output = []
title = None
for cell in notebook_content.get("cells", []):
cell_type = cell.get("cell_type", "")
source_lines = cell.get("source", [])
if cell_type == "markdown":
md_output.append("".join(source_lines))
# Extract the first # heading as title if not already found
if title is None:
for line in source_lines:
if line.startswith("# "):
title = line.lstrip("# ").strip()
break
elif cell_type == "code":
# Code cells are wrapped in Markdown code blocks
md_output.append(f"```python\n{''.join(source_lines)}\n```")
elif cell_type == "raw":
md_output.append(f"```\n{''.join(source_lines)}\n```")
md_text = "\n\n".join(md_output)
# Check for title in notebook metadata
title = notebook_content.get("metadata", {}).get("title", title)
return DocumentConverterResult(
markdown=md_text,
title=title,
)
except Exception as e:
raise FileConversionException(
f"Error converting .ipynb file: {str(e)}"
) from e
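The cell-type mapping is easy to verify against a hand-built notebook dict; a sketch, assuming the pre-removal import path:

```python
from markitup.converters import IpynbConverter  # exported before this commit

notebook = {
    "nbformat": 4,
    "nbformat_minor": 5,
    "metadata": {},
    "cells": [
        {"cell_type": "markdown", "source": ["# My Notebook\n", "Intro text."]},
        {"cell_type": "code", "source": ["print('hi')"]},
    ],
}
result = IpynbConverter()._convert(notebook)
print(result.title)     # 'My Notebook' -- taken from the first '# ' heading
print(result.markdown)  # markdown cell verbatim, code cell fenced as python
```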

View file: src/markitup/converters/_llm_caption.py

@@ -1,50 +0,0 @@
from typing import BinaryIO, Any, Union
import base64
import mimetypes
from .._stream_info import StreamInfo
def llm_caption(
file_stream: BinaryIO, stream_info: StreamInfo, *, client, model, prompt=None
) -> Union[None, str]:
if prompt is None or prompt.strip() == "":
prompt = "Write a detailed caption for this image."
# Get the content type
content_type = stream_info.mimetype
if not content_type:
content_type, _ = mimetypes.guess_type("_dummy" + (stream_info.extension or ""))
if not content_type:
content_type = "application/octet-stream"
# Convert to base64
cur_pos = file_stream.tell()
try:
base64_image = base64.b64encode(file_stream.read()).decode("utf-8")
except Exception:
return None
finally:
file_stream.seek(cur_pos)
# Prepare the data-uri
data_uri = f"data:{content_type};base64,{base64_image}"
# Prepare the OpenAI API request
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": data_uri,
},
},
],
}
]
# Call the OpenAI API
response = client.chat.completions.create(model=model, messages=messages)
return response.choices[0].message.content
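The content-type fallback chain used here (and in `ImageConverter._get_llm_description`, of which this is a standalone copy) is worth seeing in isolation; the stand-in values below are illustrative:

```python
import mimetypes

# Prefer the declared mimetype; fall back to guessing from the extension;
# finally fall back to a generic binary type.
declared = None       # stand-in for stream_info.mimetype
extension = ".png"    # stand-in for stream_info.extension
content_type = (
    declared
    or mimetypes.guess_type("_dummy" + extension)[0]
    or "application/octet-stream"
)
print(content_type)  # image/png
```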

View file: src/markitup/converters/_markdownify.py

@@ -1,111 +0,0 @@
import re
import markdownify
from typing import Any, Optional
from urllib.parse import quote, unquote, urlparse, urlunparse
class _CustomMarkdownify(markdownify.MarkdownConverter):
"""
A custom version of markdownify's MarkdownConverter. Changes include:
- Altering the default heading style to use '#', '##', etc.
- Removing javascript hyperlinks.
- Truncating images with large data:uri sources.
- Ensuring URIs are properly escaped, and do not conflict with Markdown syntax
"""
def __init__(self, **options: Any):
options["heading_style"] = options.get("heading_style", markdownify.ATX)
options["keep_data_uris"] = options.get("keep_data_uris", False)
# Explicitly cast options to the expected type if necessary
super().__init__(**options)
def convert_hn(
self,
n: int,
el: Any,
text: str,
convert_as_inline: Optional[bool] = False,
**kwargs,
) -> str:
"""Same as usual, but be sure to start with a new line"""
if not convert_as_inline:
if not re.search(r"^\n", text):
return "\n" + super().convert_hn(n, el, text, convert_as_inline) # type: ignore
return super().convert_hn(n, el, text, convert_as_inline) # type: ignore
def convert_a(
self,
el: Any,
text: str,
convert_as_inline: Optional[bool] = False,
**kwargs,
):
"""Same as usual converter, but removes Javascript links and escapes URIs."""
prefix, suffix, text = markdownify.chomp(text) # type: ignore
if not text:
return ""
if el.find_parent("pre") is not None:
return text
href = el.get("href")
title = el.get("title")
# Escape URIs and skip non-http or file schemes
if href:
try:
parsed_url = urlparse(href) # type: ignore
if parsed_url.scheme and parsed_url.scheme.lower() not in ["http", "https", "file"]: # type: ignore
return "%s%s%s" % (prefix, text, suffix)
href = urlunparse(parsed_url._replace(path=quote(unquote(parsed_url.path)))) # type: ignore
except ValueError: # It's not clear if this ever gets thrown
return "%s%s%s" % (prefix, text, suffix)
# For the replacement see #29: text nodes underscores are escaped
if (
self.options["autolinks"]
and text.replace(r"\_", "_") == href
and not title
and not self.options["default_title"]
):
# Shortcut syntax
return "<%s>" % href
if self.options["default_title"] and not title:
title = href
title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
return (
"%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix)
if href
else text
)
def convert_img(
self,
el: Any,
text: str,
convert_as_inline: Optional[bool] = False,
**kwargs,
) -> str:
"""Same as usual converter, but removes data URIs"""
alt = el.attrs.get("alt", None) or ""
src = el.attrs.get("src", None) or ""
title = el.attrs.get("title", None) or ""
title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
if (
convert_as_inline
and el.parent.name not in self.options["keep_inline_images_in"]
):
return alt
# Remove dataURIs
if src.startswith("data:") and not self.options["keep_data_uris"]:
src = src.split(",")[0] + "..."
return "![%s](%s%s)" % (alt, src, title_part)
def convert_soup(self, soup: Any) -> str:
return super().convert_soup(soup) # type: ignore
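A minimal sketch of the customized converter applied directly to parsed HTML, showing the ATX headings and the `javascript:` link removal:

```python
from bs4 import BeautifulSoup

html = (
    '<h1>Title</h1>'
    '<p><a href="javascript:alert(1)">bad</a> and '
    '<a href="https://example.com/">good</a></p>'
)
soup = BeautifulSoup(html, "html.parser")
print(_CustomMarkdownify().convert_soup(soup))
# The heading renders as '# Title'; the javascript: link is reduced to plain
# text, while the https: link survives as [good](https://example.com/).
```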

View file: src/markitup/converters/_outlook_msg_converter.py

@@ -1,149 +0,0 @@
import sys
from typing import Any, Union, BinaryIO
from .._stream_info import StreamInfo
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
_dependency_exc_info = None
olefile = None
try:
import olefile # type: ignore[no-redef]
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
ACCEPTED_MIME_TYPE_PREFIXES = [
"application/vnd.ms-outlook",
]
ACCEPTED_FILE_EXTENSIONS = [".msg"]
class OutlookMsgConverter(DocumentConverter):
"""Converts Outlook .msg files to markdown by extracting email metadata and content.
Uses the olefile package to parse the .msg file structure and extract:
- Email headers (From, To, Subject)
- Email body content
"""
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
# Check the extension and mimetype
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
# Brute force, check if we have an OLE file
cur_pos = file_stream.tell()
try:
if olefile and not olefile.isOleFile(file_stream):
return False
finally:
file_stream.seek(cur_pos)
# Brute force, check if it's an Outlook file
try:
if olefile is not None:
msg = olefile.OleFileIO(file_stream)
toc = "\n".join([str(stream) for stream in msg.listdir()])
return (
"__properties_version1.0" in toc
and "__recip_version1.0_#00000000" in toc
)
except Exception:
pass
finally:
file_stream.seek(cur_pos)
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Check the dependencies
if _dependency_exc_info is not None:
raise MissingDependencyException(
MISSING_DEPENDENCY_MESSAGE.format(
converter=type(self).__name__,
extension=".msg",
feature="outlook",
)
) from _dependency_exc_info[
1
].with_traceback( # type: ignore[union-attr]
_dependency_exc_info[2]
)
assert (
olefile is not None
) # If we made it this far, olefile should be available
msg = olefile.OleFileIO(file_stream)
# Extract email metadata
md_content = "# Email Message\n\n"
# Get headers
headers = {
"From": self._get_stream_data(msg, "__substg1.0_0C1F001F"),
"To": self._get_stream_data(msg, "__substg1.0_0E04001F"),
"Subject": self._get_stream_data(msg, "__substg1.0_0037001F"),
}
# Add headers to markdown
for key, value in headers.items():
if value:
md_content += f"**{key}:** {value}\n"
md_content += "\n## Content\n\n"
# Get email body
body = self._get_stream_data(msg, "__substg1.0_1000001F")
if body:
md_content += body
msg.close()
return DocumentConverterResult(
markdown=md_content.strip(),
title=headers.get("Subject"),
)
def _get_stream_data(self, msg: Any, stream_path: str) -> Union[str, None]:
"""Helper to safely extract and decode stream data from the MSG file."""
assert olefile is not None
assert isinstance(
msg, olefile.OleFileIO
) # Ensure msg is of the correct type (type hinting is not possible with the optional olefile package)
try:
if msg.exists(stream_path):
data = msg.openstream(stream_path).read()
# Try UTF-16 first (common for .msg files)
try:
return data.decode("utf-16-le").strip()
except UnicodeDecodeError:
# Fall back to UTF-8
try:
return data.decode("utf-8").strip()
except UnicodeDecodeError:
# Last resort - ignore errors
return data.decode("utf-8", errors="ignore").strip()
except Exception:
pass
return None
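The magic stream names above are MAPI property tags: `0037` is the subject, `0C1F` the sender, `0E04` the display-to recipients, and the `001F` suffix marks PT_UNICODE (UTF-16LE) data, which is why decoding tries `utf-16-le` first. A sketch for inspecting which property streams a given `.msg` file carries (file name hypothetical):

```python
import olefile

with open("mail.msg", "rb") as f:  # hypothetical .msg file
    msg = olefile.OleFileIO(f)
    for stream in msg.listdir():
        print("/".join(stream))  # e.g. __substg1.0_0037001F (subject, unicode)
    msg.close()
```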

View file: src/markitup/converters/_plain_text_converter.py

@@ -5,15 +5,6 @@ from charset_normalizer import from_bytes
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
_dependency_exc_info = None
try:
import mammoth
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
ACCEPTED_MIME_TYPE_PREFIXES = [
"text/",
"application/json",

View file: src/markitup/converters/_pptx_converter.py

@@ -9,7 +9,6 @@ from typing import BinaryIO, Any
from operator import attrgetter
from ._html_converter import HtmlConverter
from ._llm_caption import llm_caption
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
@@ -95,39 +94,8 @@ class PptxConverter(DocumentConverter):
if self._is_picture(shape):
# https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
llm_description = ""
alt_text = ""
# Potentially generate a description using an LLM
llm_client = kwargs.get("llm_client")
llm_model = kwargs.get("llm_model")
if llm_client is not None and llm_model is not None:
# Prepare a file_stream and stream_info for the image data
image_filename = shape.image.filename
image_extension = None
if image_filename:
image_extension = os.path.splitext(image_filename)[1]
image_stream_info = StreamInfo(
mimetype=shape.image.content_type,
extension=image_extension,
filename=image_filename,
)
image_stream = io.BytesIO(shape.image.blob)
# Caption the image
try:
llm_description = llm_caption(
image_stream,
image_stream_info,
client=llm_client,
model=llm_model,
prompt=kwargs.get("llm_prompt"),
)
except Exception:
# Unable to generate a description
pass
# Also grab any description embedded in the deck
try:
alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
@@ -136,7 +104,7 @@ class PptxConverter(DocumentConverter):
pass
# Prepare the alt, escaping any special characters
alt_text = "\n".join([llm_description, alt_text]) or shape.name
alt_text = "\n".join([alt_text]) or shape.name
alt_text = re.sub(r"[\r\n\[\]]", " ", alt_text)
alt_text = re.sub(r"\s+", " ", alt_text).strip()

View file: src/markitup/converters/_rss_converter.py

@@ -1,191 +0,0 @@
from xml.dom import minidom
from typing import BinaryIO, Any, Union
from bs4 import BeautifulSoup
from ._markdownify import _CustomMarkdownify
from .._stream_info import StreamInfo
from .._base_converter import DocumentConverter, DocumentConverterResult
PRECISE_MIME_TYPE_PREFIXES = [
"application/rss",
"application/rss+xml",
"application/atom",
"application/atom+xml",
]
PRECISE_FILE_EXTENSIONS = [".rss", ".atom"]
CANDIDATE_MIME_TYPE_PREFIXES = [
"text/xml",
"application/xml",
]
CANDIDATE_FILE_EXTENSIONS = [
".xml",
]
class RssConverter(DocumentConverter):
"""Convert RSS / Atom type to markdown"""
def __init__(self):
super().__init__()
self._kwargs = {}
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
# Check for precise mimetypes and file extensions
if extension in PRECISE_FILE_EXTENSIONS:
return True
for prefix in PRECISE_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
# Check for precise mimetypes and file extensions
if extension in CANDIDATE_FILE_EXTENSIONS:
return self._check_xml(file_stream)
for prefix in CANDIDATE_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return self._check_xml(file_stream)
return False
def _check_xml(self, file_stream: BinaryIO) -> bool:
cur_pos = file_stream.tell()
try:
doc = minidom.parse(file_stream)
return self._feed_type(doc) is not None
except Exception:
pass
finally:
file_stream.seek(cur_pos)
return False
def _feed_type(self, doc: Any) -> str | None:
if doc.getElementsByTagName("rss"):
return "rss"
elif doc.getElementsByTagName("feed"):
root = doc.getElementsByTagName("feed")[0]
if root.getElementsByTagName("entry"):
# An Atom feed must have a root element of <feed> and at least one <entry>
return "atom"
return None
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
self._kwargs = kwargs
doc = minidom.parse(file_stream)
feed_type = self._feed_type(doc)
if feed_type == "rss":
return self._parse_rss_type(doc)
elif feed_type == "atom":
return self._parse_atom_type(doc)
else:
raise ValueError("Unknown feed type")
def _parse_atom_type(self, doc: minidom.Document) -> DocumentConverterResult:
"""Parse the type of an Atom feed.
Returns None if the feed type is not recognized or something goes wrong.
"""
root = doc.getElementsByTagName("feed")[0]
title = self._get_data_by_tag_name(root, "title")
subtitle = self._get_data_by_tag_name(root, "subtitle")
entries = root.getElementsByTagName("entry")
md_text = f"# {title}\n"
if subtitle:
md_text += f"{subtitle}\n"
for entry in entries:
entry_title = self._get_data_by_tag_name(entry, "title")
entry_summary = self._get_data_by_tag_name(entry, "summary")
entry_updated = self._get_data_by_tag_name(entry, "updated")
entry_content = self._get_data_by_tag_name(entry, "content")
if entry_title:
md_text += f"\n## {entry_title}\n"
if entry_updated:
md_text += f"Updated on: {entry_updated}\n"
if entry_summary:
md_text += self._parse_content(entry_summary)
if entry_content:
md_text += self._parse_content(entry_content)
return DocumentConverterResult(
markdown=md_text,
title=title,
)
def _parse_rss_type(self, doc: minidom.Document) -> DocumentConverterResult:
"""Parse the type of an RSS feed.
Returns None if the feed type is not recognized or something goes wrong.
"""
root = doc.getElementsByTagName("rss")[0]
channel_list = root.getElementsByTagName("channel")
if not channel_list:
raise ValueError("No channel found in RSS feed")
channel = channel_list[0]
channel_title = self._get_data_by_tag_name(channel, "title")
channel_description = self._get_data_by_tag_name(channel, "description")
items = channel.getElementsByTagName("item")
md_text = ""
if channel_title:
md_text += f"# {channel_title}\n"
if channel_description:
md_text += f"{channel_description}\n"
for item in items:
title = self._get_data_by_tag_name(item, "title")
description = self._get_data_by_tag_name(item, "description")
pubDate = self._get_data_by_tag_name(item, "pubDate")
content = self._get_data_by_tag_name(item, "content:encoded")
if title:
md_text += f"\n## {title}\n"
if pubDate:
md_text += f"Published on: {pubDate}\n"
if description:
md_text += self._parse_content(description)
if content:
md_text += self._parse_content(content)
return DocumentConverterResult(
markdown=md_text,
title=channel_title,
)
def _parse_content(self, content: str) -> str:
"""Parse the content of an RSS feed item"""
try:
# using bs4 because many RSS feeds have HTML-styled content
soup = BeautifulSoup(content, "html.parser")
return _CustomMarkdownify(**self._kwargs).convert_soup(soup)
except Exception:
return content
def _get_data_by_tag_name(
self, element: minidom.Element, tag_name: str
) -> Union[str, None]:
"""Get data from first child element with the given tag name.
Returns None when no such element is found.
"""
nodes = element.getElementsByTagName(tag_name)
if not nodes:
return None
fc = nodes[0].firstChild
if fc:
if hasattr(fc, "data"):
return fc.data
return None
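The `_feed_type` dispatch can be checked against minimal in-memory documents; a quick sketch:

```python
import io
from xml.dom import minidom

rss_doc = minidom.parse(io.BytesIO(b"<rss><channel><title>T</title></channel></rss>"))
atom_doc = minidom.parse(
    io.BytesIO(b'<feed xmlns="http://www.w3.org/2005/Atom"><entry/></feed>')
)
converter = RssConverter()
print(converter._feed_type(rss_doc))   # 'rss'
print(converter._feed_type(atom_doc))  # 'atom' -- a <feed> with >= 1 <entry>
```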

View file: src/markitup/converters/_transcribe_audio.py

@@ -1,49 +0,0 @@
import io
import sys
from typing import BinaryIO
from .._exceptions import MissingDependencyException
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
_dependency_exc_info = None
try:
# Suppress some warnings on library import
import warnings
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=SyntaxWarning)
import speech_recognition as sr
import pydub
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
def transcribe_audio(file_stream: BinaryIO, *, audio_format: str = "wav") -> str:
# Check for installed dependencies
if _dependency_exc_info is not None:
raise MissingDependencyException(
"Speech transcription requires installing MarkItdown with the [audio-transcription] optional dependencies. E.g., `pip install markitup[audio-transcription]` or `pip install markitup[all]`"
) from _dependency_exc_info[
1
].with_traceback( # type: ignore[union-attr]
_dependency_exc_info[2]
)
if audio_format in ["wav", "aiff", "flac"]:
audio_source = file_stream
elif audio_format in ["mp3", "mp4"]:
audio_segment = pydub.AudioSegment.from_file(file_stream, format=audio_format)
audio_source = io.BytesIO()
audio_segment.export(audio_source, format="wav")
audio_source.seek(0)
else:
raise ValueError(f"Unsupported audio format: {audio_format}")
recognizer = sr.Recognizer()
with sr.AudioFile(audio_source) as source:
audio = recognizer.record(source)
transcript = recognizer.recognize_google(audio).strip()
return "[No speech detected]" if transcript == "" else transcript

View file: src/markitup/converters/_wikipedia_converter.py

@@ -1,88 +0,0 @@
import io
import re
import bs4
from typing import Any, BinaryIO, Optional
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from ._markdownify import _CustomMarkdownify
ACCEPTED_MIME_TYPE_PREFIXES = [
"text/html",
"application/xhtml",
]
ACCEPTED_FILE_EXTENSIONS = [
".html",
".htm",
]
class WikipediaConverter(DocumentConverter):
"""Handle Wikipedia pages separately, focusing only on the main document content."""
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
"""
Make sure we're dealing with HTML content *from* Wikipedia.
"""
url = stream_info.url or ""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url):
# Not a Wikipedia URL
return False
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
# Not HTML content
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Parse the stream
encoding = "utf-8" if stream_info.charset is None else stream_info.charset
soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
# Remove javascript and style blocks
for script in soup(["script", "style"]):
script.extract()
# Print only the main content
body_elm = soup.find("div", {"id": "mw-content-text"})
title_elm = soup.find("span", {"class": "mw-page-title-main"})
webpage_text = ""
main_title = None if soup.title is None else soup.title.string
if body_elm:
# What's the title
if title_elm and isinstance(title_elm, bs4.Tag):
main_title = title_elm.string
# Convert the page
webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify(
**kwargs
).convert_soup(body_elm)
else:
webpage_text = _CustomMarkdownify(**kwargs).convert_soup(soup)
return DocumentConverterResult(
markdown=webpage_text,
title=main_title,
)
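Because `accepts` keys off `stream_info.url`, the URL must be supplied alongside the saved page body. A sketch, assuming `StreamInfo` takes `url` as a keyword like its other fields:

```python
converter = WikipediaConverter()
info = StreamInfo(
    url="https://en.wikipedia.org/wiki/Markdown",  # hypothetical article
    mimetype="text/html",
)
with open("saved_page.html", "rb") as f:  # previously downloaded page body
    if converter.accepts(f, info):
        print(converter.convert(f, info).markdown[:300])
```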

View file: src/markitup/converters/_youtube_converter.py

@@ -1,224 +0,0 @@
import sys
import json
import time
import io
import re
import bs4
from typing import Any, BinaryIO, Optional, Dict, List, Union
from urllib.parse import parse_qs, urlparse, unquote
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
# Optional YouTube transcription support
try:
# Suppress some warnings on library import
import warnings
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=SyntaxWarning)
# Patch submitted upstream to fix the SyntaxWarning
from youtube_transcript_api import YouTubeTranscriptApi
IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
except ModuleNotFoundError:
IS_YOUTUBE_TRANSCRIPT_CAPABLE = False
ACCEPTED_MIME_TYPE_PREFIXES = [
"text/html",
"application/xhtml",
]
ACCEPTED_FILE_EXTENSIONS = [
".html",
".htm",
]
class YouTubeConverter(DocumentConverter):
"""Handle YouTube specially, focusing on the video title, description, and transcript."""
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
"""
Make sure we're dealing with HTML content *from* YouTube.
"""
url = stream_info.url or ""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
url = unquote(url)
url = url.replace(r"\?", "?").replace(r"\=", "=")
if not url.startswith("https://www.youtube.com/watch?"):
# Not a YouTube URL
return False
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
# Not HTML content
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Parse the stream
encoding = "utf-8" if stream_info.charset is None else stream_info.charset
soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
# Read the meta tags
metadata: Dict[str, str] = {}
if soup.title and soup.title.string:
metadata["title"] = soup.title.string
for meta in soup(["meta"]):
if not isinstance(meta, bs4.Tag):
continue
for a in meta.attrs:
if a in ["itemprop", "property", "name"]:
key = str(meta.get(a, ""))
content = str(meta.get("content", ""))
if key and content: # Only add non-empty content
metadata[key] = content
break
# Try reading the description
try:
for script in soup(["script"]):
if not isinstance(script, bs4.Tag):
continue
if not script.string: # Skip empty scripts
continue
content = script.string
if "ytInitialData" in content:
match = re.search(r"var ytInitialData = ({.*?});", content)
if match:
data = json.loads(match.group(1))
attrdesc = self._findKey(data, "attributedDescriptionBodyText")
if attrdesc and isinstance(attrdesc, dict):
metadata["description"] = str(attrdesc.get("content", ""))
break
except Exception as e:
print(f"Error extracting description: {e}")
pass
# Start preparing the page
webpage_text = "# YouTube\n"
title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore
assert isinstance(title, str)
if title:
webpage_text += f"\n## {title}\n"
stats = ""
views = self._get(metadata, ["interactionCount"]) # type: ignore
if views:
stats += f"- **Views:** {views}\n"
keywords = self._get(metadata, ["keywords"]) # type: ignore
if keywords:
stats += f"- **Keywords:** {keywords}\n"
runtime = self._get(metadata, ["duration"]) # type: ignore
if runtime:
stats += f"- **Runtime:** {runtime}\n"
if len(stats) > 0:
webpage_text += f"\n### Video Metadata\n{stats}\n"
description = self._get(metadata, ["description", "og:description"]) # type: ignore
if description:
webpage_text += f"\n### Description\n{description}\n"
if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
ytt_api = YouTubeTranscriptApi()
transcript_text = ""
parsed_url = urlparse(stream_info.url) # type: ignore
params = parse_qs(parsed_url.query) # type: ignore
if "v" in params and params["v"][0]:
video_id = str(params["v"][0])
try:
youtube_transcript_languages = kwargs.get(
"youtube_transcript_languages", ("en",)
)
# Retry the transcript fetching operation
transcript = self._retry_operation(
lambda: ytt_api.fetch(
video_id, languages=youtube_transcript_languages
),
retries=3, # Retry 3 times
delay=2, # 2 seconds delay between retries
)
if transcript:
transcript_text = " ".join(
[part.text for part in transcript]
) # type: ignore
except Exception as e:
print(f"Error fetching transcript: {e}")
if transcript_text:
webpage_text += f"\n### Transcript\n{transcript_text}\n"
title = title if title else (soup.title.string if soup.title else "")
assert isinstance(title, str)
return DocumentConverterResult(
markdown=webpage_text,
title=title,
)
def _get(
self,
metadata: Dict[str, str],
keys: List[str],
default: Union[str, None] = None,
) -> Union[str, None]:
"""Get first non-empty value from metadata matching given keys."""
for k in keys:
if k in metadata:
return metadata[k]
return default
def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type
"""Recursively search for a key in nested dictionary/list structures."""
if isinstance(json, list):
for elm in json:
ret = self._findKey(elm, key)
if ret is not None:
return ret
elif isinstance(json, dict):
for k, v in json.items():
if k == key:
return json[k]
if result := self._findKey(v, key):
return result
return None
def _retry_operation(self, operation, retries=3, delay=2):
"""Retries the operation, re-raising the last exception if all attempts fail."""
for attempt in range(retries):
try:
return operation()  # Attempt the operation
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < retries - 1:
time.sleep(delay)  # Wait before retrying
else:
# All attempts failed; re-raise the last exception
raise
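The retry helper behaves as expected against a deliberately flaky operation; a small sketch:

```python
calls = {"n": 0}

def flaky():
    # Fails twice, then succeeds -- exercises the retry-with-delay path
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(YouTubeConverter()._retry_operation(flaky, retries=3, delay=0))  # 'ok'
```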

View file: src/markitup/converters/_zip_converter.py

@@ -1,117 +0,0 @@
import sys
import zipfile
import io
import os
from typing import BinaryIO, Any, TYPE_CHECKING
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import UnsupportedFormatException, FileConversionException
# Break otherwise circular import for type hinting
if TYPE_CHECKING:
from .._markitup import MarkItUp
ACCEPTED_MIME_TYPE_PREFIXES = [
"application/zip",
]
ACCEPTED_FILE_EXTENSIONS = [".zip"]
class ZipConverter(DocumentConverter):
"""Converts ZIP files to markdown by extracting and converting all contained files.
The converter reads each file in the archive into memory, converts it using
the appropriate converter for its file type, and combines the results
into a single markdown document.
Example output format:
```markdown
Content from the zip file `example.zip`:
## File: docs/readme.txt
This is the content of readme.txt
Multiple lines are preserved
## File: images/example.jpg
ImageSize: 1920x1080
DateTimeOriginal: 2024-02-15 14:30:00
Description: A beautiful landscape photo
## File: data/report.xlsx
## Sheet1
| Column1 | Column2 | Column3 |
|---------|---------|---------|
| data1 | data2 | data3 |
| data4 | data5 | data6 |
```
Key features:
- Maintains original file structure in headings
- Processes nested files recursively
- Uses appropriate converters for each file type
- Preserves formatting of converted content
- Processes archives fully in memory, without temporary files
"""
def __init__(
self,
*,
markitup: "MarkItUp",
):
super().__init__()
self._markitup = markitup
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
file_path = stream_info.url or stream_info.local_path or stream_info.filename
md_content = f"Content from the zip file `{file_path}`:\n\n"
with zipfile.ZipFile(file_stream, "r") as zipObj:
for name in zipObj.namelist():
try:
z_file_stream = io.BytesIO(zipObj.read(name))
z_file_stream_info = StreamInfo(
extension=os.path.splitext(name)[1],
filename=os.path.basename(name),
)
result = self._markitup.convert_stream(
stream=z_file_stream,
stream_info=z_file_stream_info,
)
if result is not None:
md_content += f"## File: {name}\n\n"
md_content += result.markdown + "\n\n"
except UnsupportedFormatException:
pass
except FileConversionException:
pass
return DocumentConverterResult(markdown=md_content.strip())
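Unlike the other converters, `ZipConverter` needs a back-reference to the `MarkItUp` instance so nested entries can be dispatched through the full converter registry. A usage sketch (the top-level `MarkItUp` import is an assumption based on the type-hint import above):

```python
from markitup import MarkItUp  # assumed public entry point

mi = MarkItUp()
converter = ZipConverter(markitup=mi)
with open("bundle.zip", "rb") as f:  # hypothetical archive
    result = converter.convert(f, StreamInfo(extension=".zip", filename="bundle.zip"))
print(result.markdown)  # '## File: <name>' sections, one per convertible entry
```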

View file: uv.lock

@@ -532,10 +532,6 @@ audio-transcription = [
{ name = "pydub" },
{ name = "speechrecognition" },
]
az-doc-intel = [
{ name = "azure-ai-documentintelligence" },
{ name = "azure-identity" },
]
docx = [
{ name = "lxml" },
{ name = "mammoth" },
@@ -564,9 +560,7 @@ youtube-transcription = [
[package.metadata]
requires-dist = [
{ name = "azure-ai-documentintelligence", marker = "extra == 'all'" },
{ name = "azure-ai-documentintelligence", marker = "extra == 'az-doc-intel'" },
{ name = "azure-identity", marker = "extra == 'all'" }, { name = "azure-identity", marker = "extra == 'all'" },
{ name = "azure-identity", marker = "extra == 'az-doc-intel'" },
{ name = "beautifulsoup4" }, { name = "beautifulsoup4" },
{ name = "charset-normalizer" }, { name = "charset-normalizer" },
{ name = "lxml", marker = "extra == 'all'" }, { name = "lxml", marker = "extra == 'all'" },
@@ -596,7 +590,7 @@ requires-dist = [
{ name = "youtube-transcript-api", marker = "extra == 'all'", specifier = "~=1.0.0" },
{ name = "youtube-transcript-api", marker = "extra == 'youtube-transcription'" },
]
provides-extras = ["all", "audio-transcription", "az-doc-intel", "docx", "outlook", "pdf", "pptx", "xls", "xlsx", "youtube-transcription"]
provides-extras = ["all", "audio-transcription", "docx", "outlook", "pdf", "pptx", "xls", "xlsx", "youtube-transcription"]
[[package]]
name = "mpmath"