diff --git a/packages/markitup/pyproject.toml b/packages/markitup/pyproject.toml
index 13f4f3a..17c2488 100644
--- a/packages/markitup/pyproject.toml
+++ b/packages/markitup/pyproject.toml
@@ -51,10 +51,7 @@ docx = ["mammoth", "lxml"]
 xlsx = ["pandas", "openpyxl"]
 xls = ["pandas", "xlrd"]
 pdf = ["pdfminer.six"]
-outlook = ["olefile"]
-audio-transcription = ["pydub", "SpeechRecognition"]
-youtube-transcription = ["youtube-transcript-api"]
-az-doc-intel = ["azure-ai-documentintelligence", "azure-identity"]
+
 
 [tool.hatch.version]
 path = "src/markitup/__about__.py"
diff --git a/packages/markitup/src/markitup/converters/__init__.py b/packages/markitup/src/markitup/converters/__init__.py
index e4437a5..775e4c0 100644
--- a/packages/markitup/src/markitup/converters/__init__.py
+++ b/packages/markitup/src/markitup/converters/__init__.py
@@ -4,24 +4,11 @@
 from ._plain_text_converter import PlainTextConverter
 from ._html_converter import HtmlConverter
-from ._rss_converter import RssConverter
-from ._wikipedia_converter import WikipediaConverter
-from ._youtube_converter import YouTubeConverter
-from ._ipynb_converter import IpynbConverter
-from ._bing_serp_converter import BingSerpConverter
 from ._pdf_converter import PdfConverter
 from ._docx_converter import DocxConverter
 from ._xlsx_converter import XlsxConverter, XlsConverter
 from ._pptx_converter import PptxConverter
-from ._image_converter import ImageConverter
 from ._audio_converter import AudioConverter
-from ._outlook_msg_converter import OutlookMsgConverter
-from ._zip_converter import ZipConverter
-from ._doc_intel_converter import (
-    DocumentIntelligenceConverter,
-    DocumentIntelligenceFileType,
-)
-from ._epub_converter import EpubConverter
 from ._csv_converter import CsvConverter
 
 __all__ = [
diff --git a/packages/markitup/src/markitup/converters/_bing_serp_converter.py b/packages/markitup/src/markitup/converters/_bing_serp_converter.py
deleted file mode 100644
index f65b85f..0000000
--- a/packages/markitup/src/markitup/converters/_bing_serp_converter.py
+++ /dev/null
@@ -1,121 +0,0 @@
-import io
-import re
-import base64
-import binascii
-from urllib.parse import parse_qs, urlparse
-from typing import Any, BinaryIO, Optional
-from bs4 import BeautifulSoup
-
-from .._base_converter import DocumentConverter, DocumentConverterResult
-from .._stream_info import StreamInfo
-from ._markdownify import _CustomMarkdownify
-
-ACCEPTED_MIME_TYPE_PREFIXES = [
-    "text/html",
-    "application/xhtml",
-]
-
-ACCEPTED_FILE_EXTENSIONS = [
-    ".html",
-    ".htm",
-]
-
-
-class BingSerpConverter(DocumentConverter):
-    """
-    Handle Bing results pages (only the organic search results).
-    NOTE: It is better to use the Bing API
-    """
-
-    def accepts(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> bool:
-        """
-        Make sure we're dealing with HTML content *from* Bing.
- """ - - url = stream_info.url or "" - mimetype = (stream_info.mimetype or "").lower() - extension = (stream_info.extension or "").lower() - - if not re.search(r"^https://www\.bing\.com/search\?q=", url): - # Not a Bing SERP URL - return False - - if extension in ACCEPTED_FILE_EXTENSIONS: - return True - - for prefix in ACCEPTED_MIME_TYPE_PREFIXES: - if mimetype.startswith(prefix): - return True - - # Not HTML content - return False - - def convert( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> DocumentConverterResult: - assert stream_info.url is not None - - # Parse the query parameters - parsed_params = parse_qs(urlparse(stream_info.url).query) - query = parsed_params.get("q", [""])[0] - - # Parse the stream - encoding = "utf-8" if stream_info.charset is None else stream_info.charset - soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding) - - # Clean up some formatting - for tptt in soup.find_all(class_="tptt"): - if hasattr(tptt, "string") and tptt.string: - tptt.string += " " - for slug in soup.find_all(class_="algoSlug_icon"): - slug.extract() - - # Parse the algorithmic results - _markdownify = _CustomMarkdownify(**kwargs) - results = list() - for result in soup.find_all(class_="b_algo"): - if not hasattr(result, "find_all"): - continue - - # Rewrite redirect urls - for a in result.find_all("a", href=True): - parsed_href = urlparse(a["href"]) - qs = parse_qs(parsed_href.query) - - # The destination is contained in the u parameter, - # but appears to be base64 encoded, with some prefix - if "u" in qs: - u = ( - qs["u"][0][2:].strip() + "==" - ) # Python 3 doesn't care about extra padding - - try: - # RFC 4648 / Base64URL" variant, which uses "-" and "_" - a["href"] = base64.b64decode(u, altchars="-_").decode("utf-8") - except UnicodeDecodeError: - pass - except binascii.Error: - pass - - # Convert to markdown - md_result = _markdownify.convert_soup(result).strip() - lines = [line.strip() for line in re.split(r"\n+", md_result)] - results.append("\n".join([line for line in lines if len(line) > 0])) - - webpage_text = ( - f"## A Bing search for '{query}' found the following results:\n\n" - + "\n\n".join(results) - ) - - return DocumentConverterResult( - markdown=webpage_text, - title=None if soup.title is None else soup.title.string, - ) diff --git a/packages/markitup/src/markitup/converters/_doc_intel_converter.py b/packages/markitup/src/markitup/converters/_doc_intel_converter.py deleted file mode 100644 index c71d7cc..0000000 --- a/packages/markitup/src/markitup/converters/_doc_intel_converter.py +++ /dev/null @@ -1,250 +0,0 @@ -import sys -import re -import os -from typing import BinaryIO, Any, List, Optional, Union -from enum import Enum - -from ._html_converter import HtmlConverter -from .._base_converter import DocumentConverter, DocumentConverterResult -from .._stream_info import StreamInfo -from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE - -# Try loading optional (but in this case, required) dependencies -# Save reporting of any exceptions for later -_dependency_exc_info = None -try: - from azure.ai.documentintelligence import DocumentIntelligenceClient - from azure.ai.documentintelligence.models import ( - AnalyzeDocumentRequest, - AnalyzeResult, - DocumentAnalysisFeature, - ) - from azure.core.credentials import AzureKeyCredential, TokenCredential - from azure.identity import DefaultAzureCredential -except ImportError: - # Preserve the error and stack 
-    _dependency_exc_info = sys.exc_info()
-
-    # Define these types for type hinting when the package is not available
-    class AzureKeyCredential:
-        pass
-
-    class TokenCredential:
-        pass
-
-    class DocumentIntelligenceClient:
-        pass
-
-    class AnalyzeDocumentRequest:
-        pass
-
-    class AnalyzeResult:
-        pass
-
-    class DocumentAnalysisFeature:
-        pass
-
-    class DefaultAzureCredential:
-        pass
-
-
-# TODO: currently, there is a bug in the document intelligence SDK with importing the "ContentFormat" enum.
-# This constant is a temporary fix until the bug is resolved.
-CONTENT_FORMAT = "markdown"
-
-
-class DocumentIntelligenceFileType(str, Enum):
-    """Enum of file types supported by the Document Intelligence Converter."""
-
-    # No OCR
-    DOCX = "docx"
-    PPTX = "pptx"
-    XLSX = "xlsx"
-    HTML = "html"
-    # OCR
-    PDF = "pdf"
-    JPEG = "jpeg"
-    PNG = "png"
-    BMP = "bmp"
-    TIFF = "tiff"
-
-
-def _get_mime_type_prefixes(types: List[DocumentIntelligenceFileType]) -> List[str]:
-    """Get the MIME type prefixes for the given file types."""
-    prefixes: List[str] = []
-    for type_ in types:
-        if type_ == DocumentIntelligenceFileType.DOCX:
-            prefixes.append(
-                "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
-            )
-        elif type_ == DocumentIntelligenceFileType.PPTX:
-            prefixes.append(
-                "application/vnd.openxmlformats-officedocument.presentationml"
-            )
-        elif type_ == DocumentIntelligenceFileType.XLSX:
-            prefixes.append(
-                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
-            )
-        elif type_ == DocumentIntelligenceFileType.PDF:
-            prefixes.append("application/pdf")
-            prefixes.append("application/x-pdf")
-        elif type_ == DocumentIntelligenceFileType.JPEG:
-            prefixes.append("image/jpeg")
-        elif type_ == DocumentIntelligenceFileType.PNG:
-            prefixes.append("image/png")
-        elif type_ == DocumentIntelligenceFileType.BMP:
-            prefixes.append("image/bmp")
-        elif type_ == DocumentIntelligenceFileType.TIFF:
-            prefixes.append("image/tiff")
-    return prefixes
-
-
-def _get_file_extensions(types: List[DocumentIntelligenceFileType]) -> List[str]:
-    """Get the file extensions for the given file types."""
-    extensions: List[str] = []
-    for type_ in types:
-        if type_ == DocumentIntelligenceFileType.DOCX:
-            extensions.append(".docx")
-        elif type_ == DocumentIntelligenceFileType.PPTX:
-            extensions.append(".pptx")
-        elif type_ == DocumentIntelligenceFileType.XLSX:
-            extensions.append(".xlsx")
-        elif type_ == DocumentIntelligenceFileType.PDF:
-            extensions.append(".pdf")
-        elif type_ == DocumentIntelligenceFileType.JPEG:
-            extensions.append(".jpg")
-            extensions.append(".jpeg")
-        elif type_ == DocumentIntelligenceFileType.PNG:
-            extensions.append(".png")
-        elif type_ == DocumentIntelligenceFileType.BMP:
-            extensions.append(".bmp")
-        elif type_ == DocumentIntelligenceFileType.TIFF:
-            extensions.append(".tiff")
-    return extensions
-
-
-class DocumentIntelligenceConverter(DocumentConverter):
-    """Specialized DocumentConverter that uses Document Intelligence to extract text from documents."""
-
-    def __init__(
-        self,
-        *,
-        endpoint: str,
-        api_version: str = "2024-07-31-preview",
-        credential: AzureKeyCredential | TokenCredential | None = None,
-        file_types: List[DocumentIntelligenceFileType] = [
-            DocumentIntelligenceFileType.DOCX,
-            DocumentIntelligenceFileType.PPTX,
-            DocumentIntelligenceFileType.XLSX,
-            DocumentIntelligenceFileType.PDF,
-            DocumentIntelligenceFileType.JPEG,
-            DocumentIntelligenceFileType.PNG,
-            DocumentIntelligenceFileType.BMP,
-            DocumentIntelligenceFileType.TIFF,
-        ],
-    ):
-        """
-        Initialize the DocumentIntelligenceConverter.
-
-        Args:
-            endpoint (str): The endpoint for the Document Intelligence service.
-            api_version (str): The API version to use. Defaults to "2024-07-31-preview".
-            credential (AzureKeyCredential | TokenCredential | None): The credential to use for authentication.
-            file_types (List[DocumentIntelligenceFileType]): The file types to accept. Defaults to all supported file types.
-        """
-
-        super().__init__()
-        self._file_types = file_types
-
-        # Raise an error if the dependencies are not available.
-        # This is different than other converters since this one isn't even instantiated
-        # unless explicitly requested.
-        if _dependency_exc_info is not None:
-            raise MissingDependencyException(
-                "DocumentIntelligenceConverter requires the optional dependency [az-doc-intel] (or [all]) to be installed. E.g., `pip install markitup[az-doc-intel]`"
-            ) from _dependency_exc_info[
-                1
-            ].with_traceback(  # type: ignore[union-attr]
-                _dependency_exc_info[2]
-            )
-
-        if credential is None:
-            if os.environ.get("AZURE_API_KEY") is None:
-                credential = DefaultAzureCredential()
-            else:
-                credential = AzureKeyCredential(os.environ["AZURE_API_KEY"])
-
-        self.endpoint = endpoint
-        self.api_version = api_version
-        self.doc_intel_client = DocumentIntelligenceClient(
-            endpoint=self.endpoint,
-            api_version=self.api_version,
-            credential=credential,
-        )
-
-    def accepts(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> bool:
-        mimetype = (stream_info.mimetype or "").lower()
-        extension = (stream_info.extension or "").lower()
-
-        if extension in _get_file_extensions(self._file_types):
-            return True
-
-        for prefix in _get_mime_type_prefixes(self._file_types):
-            if mimetype.startswith(prefix):
-                return True
-
-        return False
-
-    def _analysis_features(self, stream_info: StreamInfo) -> List[str]:
-        """
-        Helper needed to determine which analysis features to use.
-        Certain document analysis features are not available for
-        office filetypes (.xlsx, .pptx, .html, .docx)
-        """
-        mimetype = (stream_info.mimetype or "").lower()
-        extension = (stream_info.extension or "").lower()
-
-        # Types that don't support OCR
-        no_ocr_types = [
-            DocumentIntelligenceFileType.DOCX,
-            DocumentIntelligenceFileType.PPTX,
-            DocumentIntelligenceFileType.XLSX,
-            DocumentIntelligenceFileType.HTML,
-        ]
-
-        if extension in _get_file_extensions(no_ocr_types):
-            return []
-
-        for prefix in _get_mime_type_prefixes(no_ocr_types):
-            if mimetype.startswith(prefix):
-                return []
-
-        return [
-            DocumentAnalysisFeature.FORMULAS,  # enable formula extraction
-            DocumentAnalysisFeature.OCR_HIGH_RESOLUTION,  # enable high resolution OCR
-            DocumentAnalysisFeature.STYLE_FONT,  # enable font style extraction
-        ]
-
-    def convert(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> DocumentConverterResult:
-        # Extract the text using Azure Document Intelligence
-        poller = self.doc_intel_client.begin_analyze_document(
-            model_id="prebuilt-layout",
-            body=AnalyzeDocumentRequest(bytes_source=file_stream.read()),
-            features=self._analysis_features(stream_info),
-            output_content_format=CONTENT_FORMAT,  # TODO: replace with "ContentFormat.MARKDOWN" when the bug is fixed
-        )
-        result: AnalyzeResult = poller.result()
-
-        # remove HTML comments from the markdown content generated by Doc Intelligence and append to markdown string
-        markdown_text = re.sub(r"<!--.*?-->", "", result.content, flags=re.DOTALL)
-        return DocumentConverterResult(markdown=markdown_text)
diff --git a/packages/markitup/src/markitup/converters/_epub_converter.py b/packages/markitup/src/markitup/converters/_epub_converter.py
deleted file mode 100644
index 17d6d29..0000000
--- a/packages/markitup/src/markitup/converters/_epub_converter.py
+++ /dev/null
@@ -1,147 +0,0 @@
-import os
-import zipfile
-import xml.dom.minidom as minidom
-
-from typing import BinaryIO, Any, Dict, List
-
-from ._html_converter import HtmlConverter
-from .._base_converter import DocumentConverter, DocumentConverterResult
-from .._stream_info import StreamInfo
-
-ACCEPTED_MIME_TYPE_PREFIXES = [
-    "application/epub",
-    "application/epub+zip",
-    "application/x-epub+zip",
-]
-
-ACCEPTED_FILE_EXTENSIONS = [".epub"]
-
-MIME_TYPE_MAPPING = {
-    ".html": "text/html",
-    ".xhtml": "application/xhtml+xml",
-}
-
-
-class EpubConverter(HtmlConverter):
-    """
-    Converts EPUB files to Markdown. Style information (e.g., headings) and tables are preserved where possible.
- """ - - def __init__(self): - super().__init__() - self._html_converter = HtmlConverter() - - def accepts( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> bool: - mimetype = (stream_info.mimetype or "").lower() - extension = (stream_info.extension or "").lower() - - if extension in ACCEPTED_FILE_EXTENSIONS: - return True - - for prefix in ACCEPTED_MIME_TYPE_PREFIXES: - if mimetype.startswith(prefix): - return True - - return False - - def convert( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> DocumentConverterResult: - with zipfile.ZipFile(file_stream, "r") as z: - # Extracts metadata (title, authors, language, publisher, date, description, cover) from an EPUB file.""" - - # Locate content.opf - container_dom = minidom.parse(z.open("META-INF/container.xml")) - opf_path = container_dom.getElementsByTagName("rootfile")[0].getAttribute( - "full-path" - ) - - # Parse content.opf - opf_dom = minidom.parse(z.open(opf_path)) - metadata: Dict[str, Any] = { - "title": self._get_text_from_node(opf_dom, "dc:title"), - "authors": self._get_all_texts_from_nodes(opf_dom, "dc:creator"), - "language": self._get_text_from_node(opf_dom, "dc:language"), - "publisher": self._get_text_from_node(opf_dom, "dc:publisher"), - "date": self._get_text_from_node(opf_dom, "dc:date"), - "description": self._get_text_from_node(opf_dom, "dc:description"), - "identifier": self._get_text_from_node(opf_dom, "dc:identifier"), - } - - # Extract manifest items (ID → href mapping) - manifest = { - item.getAttribute("id"): item.getAttribute("href") - for item in opf_dom.getElementsByTagName("item") - } - - # Extract spine order (ID refs) - spine_items = opf_dom.getElementsByTagName("itemref") - spine_order = [item.getAttribute("idref") for item in spine_items] - - # Convert spine order to actual file paths - base_path = "/".join( - opf_path.split("/")[:-1] - ) # Get base directory of content.opf - spine = [ - f"{base_path}/{manifest[item_id]}" if base_path else manifest[item_id] - for item_id in spine_order - if item_id in manifest - ] - - # Extract and convert the content - markdown_content: List[str] = [] - for file in spine: - if file in z.namelist(): - with z.open(file) as f: - filename = os.path.basename(file) - extension = os.path.splitext(filename)[1].lower() - mimetype = MIME_TYPE_MAPPING.get(extension) - converted_content = self._html_converter.convert( - f, - StreamInfo( - mimetype=mimetype, - extension=extension, - filename=filename, - ), - ) - markdown_content.append(converted_content.markdown.strip()) - - # Format and add the metadata - metadata_markdown = [] - for key, value in metadata.items(): - if isinstance(value, list): - value = ", ".join(value) - if value: - metadata_markdown.append(f"**{key.capitalize()}:** {value}") - - markdown_content.insert(0, "\n".join(metadata_markdown)) - - return DocumentConverterResult( - markdown="\n\n".join(markdown_content), title=metadata["title"] - ) - - def _get_text_from_node(self, dom: minidom.Document, tag_name: str) -> str | None: - """Convenience function to extract a single occurrence of a tag (e.g., title).""" - texts = self._get_all_texts_from_nodes(dom, tag_name) - if len(texts) > 0: - return texts[0] - else: - return None - - def _get_all_texts_from_nodes( - self, dom: minidom.Document, tag_name: str - ) -> List[str]: - """Helper function to extract all occurrences of a tag (e.g., multiple authors).""" - texts: List[str] = 
-        for node in dom.getElementsByTagName(tag_name):
-            if node.firstChild and hasattr(node.firstChild, "nodeValue"):
-                texts.append(node.firstChild.nodeValue.strip())
-        return texts
diff --git a/packages/markitup/src/markitup/converters/_image_converter.py b/packages/markitup/src/markitup/converters/_image_converter.py
deleted file mode 100644
index dd8fbac..0000000
--- a/packages/markitup/src/markitup/converters/_image_converter.py
+++ /dev/null
@@ -1,138 +0,0 @@
-from typing import BinaryIO, Any, Union
-import base64
-import mimetypes
-from ._exiftool import exiftool_metadata
-from .._base_converter import DocumentConverter, DocumentConverterResult
-from .._stream_info import StreamInfo
-
-ACCEPTED_MIME_TYPE_PREFIXES = [
-    "image/jpeg",
-    "image/png",
-]
-
-ACCEPTED_FILE_EXTENSIONS = [".jpg", ".jpeg", ".png"]
-
-
-class ImageConverter(DocumentConverter):
-    """
-    Converts images to markdown via extraction of metadata (if `exiftool` is installed), and description via a multimodal LLM (if an llm_client is configured).
-    """
-
-    def accepts(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,
-    ) -> bool:
-        mimetype = (stream_info.mimetype or "").lower()
-        extension = (stream_info.extension or "").lower()
-
-        if extension in ACCEPTED_FILE_EXTENSIONS:
-            return True
-
-        for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
-            if mimetype.startswith(prefix):
-                return True
-
-        return False
-
-    def convert(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> DocumentConverterResult:
-        md_content = ""
-
-        # Add metadata
-        metadata = exiftool_metadata(
-            file_stream, exiftool_path=kwargs.get("exiftool_path")
-        )
-
-        if metadata:
-            for f in [
-                "ImageSize",
-                "Title",
-                "Caption",
-                "Description",
-                "Keywords",
-                "Artist",
-                "Author",
-                "DateTimeOriginal",
-                "CreateDate",
-                "GPSPosition",
-            ]:
-                if f in metadata:
-                    md_content += f"{f}: {metadata[f]}\n"
-
-        # Try describing the image with GPT
-        llm_client = kwargs.get("llm_client")
-        llm_model = kwargs.get("llm_model")
-        if llm_client is not None and llm_model is not None:
-            llm_description = self._get_llm_description(
-                file_stream,
-                stream_info,
-                client=llm_client,
-                model=llm_model,
-                prompt=kwargs.get("llm_prompt"),
-            )
-
-            if llm_description is not None:
-                md_content += "\n# Description:\n" + llm_description.strip() + "\n"
-
-        return DocumentConverterResult(
-            markdown=md_content,
-        )
-
-    def _get_llm_description(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        *,
-        client,
-        model,
-        prompt=None,
-    ) -> Union[None, str]:
-        if prompt is None or prompt.strip() == "":
-            prompt = "Write a detailed caption for this image."
-
-        # Get the content type
-        content_type = stream_info.mimetype
-        if not content_type:
-            content_type, _ = mimetypes.guess_type(
-                "_dummy" + (stream_info.extension or "")
-            )
-        if not content_type:
-            content_type = "application/octet-stream"
-
-        # Convert to base64
-        cur_pos = file_stream.tell()
-        try:
-            base64_image = base64.b64encode(file_stream.read()).decode("utf-8")
-        except Exception as e:
-            return None
-        finally:
-            file_stream.seek(cur_pos)
-
-        # Prepare the data-uri
-        data_uri = f"data:{content_type};base64,{base64_image}"
-
-        # Prepare the OpenAI API request
-        messages = [
-            {
-                "role": "user",
-                "content": [
-                    {"type": "text", "text": prompt},
-                    {
-                        "type": "image_url",
-                        "image_url": {
-                            "url": data_uri,
-                        },
-                    },
-                ],
-            }
-        ]
-
-        # Call the OpenAI API
-        response = client.chat.completions.create(model=model, messages=messages)
-        return response.choices[0].message.content
diff --git a/packages/markitup/src/markitup/converters/_ipynb_converter.py b/packages/markitup/src/markitup/converters/_ipynb_converter.py
deleted file mode 100644
index f8ba193..0000000
--- a/packages/markitup/src/markitup/converters/_ipynb_converter.py
+++ /dev/null
@@ -1,98 +0,0 @@
-from typing import BinaryIO, Any
-import json
-
-from .._base_converter import DocumentConverter, DocumentConverterResult
-from .._exceptions import FileConversionException
-from .._stream_info import StreamInfo
-
-CANDIDATE_MIME_TYPE_PREFIXES = [
-    "application/json",
-]
-
-ACCEPTED_FILE_EXTENSIONS = [".ipynb"]
-
-
-class IpynbConverter(DocumentConverter):
-    """Converts Jupyter Notebook (.ipynb) files to Markdown."""
-
-    def accepts(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> bool:
-        mimetype = (stream_info.mimetype or "").lower()
-        extension = (stream_info.extension or "").lower()
-
-        if extension in ACCEPTED_FILE_EXTENSIONS:
-            return True
-
-        for prefix in CANDIDATE_MIME_TYPE_PREFIXES:
-            if mimetype.startswith(prefix):
-                # Read further to see if it's a notebook
-                cur_pos = file_stream.tell()
-                try:
-                    encoding = stream_info.charset or "utf-8"
-                    notebook_content = file_stream.read().decode(encoding)
-                    return (
-                        "nbformat" in notebook_content
-                        and "nbformat_minor" in notebook_content
-                    )
-                finally:
-                    file_stream.seek(cur_pos)
-
-        return False
-
-    def convert(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> DocumentConverterResult:
-        # Parse and convert the notebook
-        result = None
-
-        encoding = stream_info.charset or "utf-8"
-        notebook_content = file_stream.read().decode(encoding=encoding)
-        return self._convert(json.loads(notebook_content))
-
-    def _convert(self, notebook_content: dict) -> DocumentConverterResult:
-        """Helper function that converts notebook JSON content to Markdown."""
-        try:
-            md_output = []
-            title = None
-
-            for cell in notebook_content.get("cells", []):
-                cell_type = cell.get("cell_type", "")
-                source_lines = cell.get("source", [])
-
-                if cell_type == "markdown":
-                    md_output.append("".join(source_lines))
-
-                    # Extract the first # heading as title if not already found
-                    if title is None:
-                        for line in source_lines:
-                            if line.startswith("# "):
-                                title = line.lstrip("# ").strip()
-                                break
-
-                elif cell_type == "code":
-                    # Code cells are wrapped in Markdown code blocks
-                    md_output.append(f"```python\n{''.join(source_lines)}\n```")
-                elif cell_type == "raw":
-                    md_output.append(f"```\n{''.join(source_lines)}\n```")
-
-            md_text = "\n\n".join(md_output)
-
-            # Check for title in notebook metadata
-            title = notebook_content.get("metadata", {}).get("title", title)
-
-            return DocumentConverterResult(
-                markdown=md_text,
-                title=title,
-            )
-
-        except Exception as e:
-            raise FileConversionException(
-                f"Error converting .ipynb file: {str(e)}"
-            ) from e
diff --git a/packages/markitup/src/markitup/converters/_llm_caption.py b/packages/markitup/src/markitup/converters/_llm_caption.py
deleted file mode 100644
index b851dc8..0000000
--- a/packages/markitup/src/markitup/converters/_llm_caption.py
+++ /dev/null
@@ -1,50 +0,0 @@
-from typing import BinaryIO, Any, Union
-import base64
-import mimetypes
-from .._stream_info import StreamInfo
-
-
-def llm_caption(
-    file_stream: BinaryIO, stream_info: StreamInfo, *, client, model, prompt=None
-) -> Union[None, str]:
-    if prompt is None or prompt.strip() == "":
-        prompt = "Write a detailed caption for this image."
-
-    # Get the content type
-    content_type = stream_info.mimetype
-    if not content_type:
-        content_type, _ = mimetypes.guess_type("_dummy" + (stream_info.extension or ""))
-    if not content_type:
-        content_type = "application/octet-stream"
-
-    # Convert to base64
-    cur_pos = file_stream.tell()
-    try:
-        base64_image = base64.b64encode(file_stream.read()).decode("utf-8")
-    except Exception as e:
-        return None
-    finally:
-        file_stream.seek(cur_pos)
-
-    # Prepare the data-uri
-    data_uri = f"data:{content_type};base64,{base64_image}"
-
-    # Prepare the OpenAI API request
-    messages = [
-        {
-            "role": "user",
-            "content": [
-                {"type": "text", "text": prompt},
-                {
-                    "type": "image_url",
-                    "image_url": {
-                        "url": data_uri,
-                    },
-                },
-            ],
-        }
-    ]
-
-    # Call the OpenAI API
-    response = client.chat.completions.create(model=model, messages=messages)
-    return response.choices[0].message.content
diff --git a/packages/markitup/src/markitup/converters/_markdownify.py b/packages/markitup/src/markitup/converters/_markdownify.py
deleted file mode 100644
index d98bdfb..0000000
--- a/packages/markitup/src/markitup/converters/_markdownify.py
+++ /dev/null
@@ -1,111 +0,0 @@
-import re
-import markdownify
-
-from typing import Any, Optional
-from urllib.parse import quote, unquote, urlparse, urlunparse
-
-
-class _CustomMarkdownify(markdownify.MarkdownConverter):
-    """
-    A custom version of markdownify's MarkdownConverter. Changes include:
-
-    - Altering the default heading style to use '#', '##', etc.
-    - Removing javascript hyperlinks.
-    - Truncating images with large data:uri sources.
-    - Ensuring URIs are properly escaped, and do not conflict with Markdown syntax
-    """
-
-    def __init__(self, **options: Any):
-        options["heading_style"] = options.get("heading_style", markdownify.ATX)
-        options["keep_data_uris"] = options.get("keep_data_uris", False)
-        # Explicitly cast options to the expected type if necessary
-        super().__init__(**options)
-
-    def convert_hn(
-        self,
-        n: int,
-        el: Any,
-        text: str,
-        convert_as_inline: Optional[bool] = False,
-        **kwargs,
-    ) -> str:
-        """Same as usual, but be sure to start with a new line"""
-        if not convert_as_inline:
-            if not re.search(r"^\n", text):
-                return "\n" + super().convert_hn(n, el, text, convert_as_inline)  # type: ignore
-
-        return super().convert_hn(n, el, text, convert_as_inline)  # type: ignore
-
-    def convert_a(
-        self,
-        el: Any,
-        text: str,
-        convert_as_inline: Optional[bool] = False,
-        **kwargs,
-    ):
-        """Same as usual converter, but removes Javascript links and escapes URIs."""
-        prefix, suffix, text = markdownify.chomp(text)  # type: ignore
-        if not text:
-            return ""
-
-        if el.find_parent("pre") is not None:
-            return text
-
-        href = el.get("href")
-        title = el.get("title")
-
-        # Escape URIs and skip non-http or file schemes
-        if href:
-            try:
-                parsed_url = urlparse(href)  # type: ignore
-                if parsed_url.scheme and parsed_url.scheme.lower() not in ["http", "https", "file"]:  # type: ignore
-                    return "%s%s%s" % (prefix, text, suffix)
-                href = urlunparse(parsed_url._replace(path=quote(unquote(parsed_url.path))))  # type: ignore
-            except ValueError:  # It's not clear if this ever gets thrown
-                return "%s%s%s" % (prefix, text, suffix)
-
-        # For the replacement see #29: text nodes underscores are escaped
-        if (
-            self.options["autolinks"]
-            and text.replace(r"\_", "_") == href
-            and not title
-            and not self.options["default_title"]
-        ):
-            # Shortcut syntax
-            return "<%s>" % href
-        if self.options["default_title"] and not title:
-            title = href
-        title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
-        return (
-            "%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix)
-            if href
-            else text
-        )
-
-    def convert_img(
-        self,
-        el: Any,
-        text: str,
-        convert_as_inline: Optional[bool] = False,
-        **kwargs,
-    ) -> str:
-        """Same as usual converter, but removes data URIs"""
-
-        alt = el.attrs.get("alt", None) or ""
-        src = el.attrs.get("src", None) or ""
-        title = el.attrs.get("title", None) or ""
-        title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
-        if (
-            convert_as_inline
-            and el.parent.name not in self.options["keep_inline_images_in"]
-        ):
-            return alt
-
-        # Remove dataURIs
-        if src.startswith("data:") and not self.options["keep_data_uris"]:
-            src = src.split(",")[0] + "..."
- - return "![%s](%s%s)" % (alt, src, title_part) - - def convert_soup(self, soup: Any) -> str: - return super().convert_soup(soup) # type: ignore diff --git a/packages/markitup/src/markitup/converters/_outlook_msg_converter.py b/packages/markitup/src/markitup/converters/_outlook_msg_converter.py deleted file mode 100644 index d216bea..0000000 --- a/packages/markitup/src/markitup/converters/_outlook_msg_converter.py +++ /dev/null @@ -1,149 +0,0 @@ -import sys -from typing import Any, Union, BinaryIO -from .._stream_info import StreamInfo -from .._base_converter import DocumentConverter, DocumentConverterResult -from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE - -# Try loading optional (but in this case, required) dependencies -# Save reporting of any exceptions for later -_dependency_exc_info = None -olefile = None -try: - import olefile # type: ignore[no-redef] -except ImportError: - # Preserve the error and stack trace for later - _dependency_exc_info = sys.exc_info() - -ACCEPTED_MIME_TYPE_PREFIXES = [ - "application/vnd.ms-outlook", -] - -ACCEPTED_FILE_EXTENSIONS = [".msg"] - - -class OutlookMsgConverter(DocumentConverter): - """Converts Outlook .msg files to markdown by extracting email metadata and content. - - Uses the olefile package to parse the .msg file structure and extract: - - Email headers (From, To, Subject) - - Email body content - """ - - def accepts( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> bool: - mimetype = (stream_info.mimetype or "").lower() - extension = (stream_info.extension or "").lower() - - # Check the extension and mimetype - if extension in ACCEPTED_FILE_EXTENSIONS: - return True - - for prefix in ACCEPTED_MIME_TYPE_PREFIXES: - if mimetype.startswith(prefix): - return True - - # Brute force, check if we have an OLE file - cur_pos = file_stream.tell() - try: - if olefile and not olefile.isOleFile(file_stream): - return False - finally: - file_stream.seek(cur_pos) - - # Brue force, check if it's an Outlook file - try: - if olefile is not None: - msg = olefile.OleFileIO(file_stream) - toc = "\n".join([str(stream) for stream in msg.listdir()]) - return ( - "__properties_version1.0" in toc - and "__recip_version1.0_#00000000" in toc - ) - except Exception as e: - pass - finally: - file_stream.seek(cur_pos) - - return False - - def convert( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> DocumentConverterResult: - # Check: the dependencies - if _dependency_exc_info is not None: - raise MissingDependencyException( - MISSING_DEPENDENCY_MESSAGE.format( - converter=type(self).__name__, - extension=".msg", - feature="outlook", - ) - ) from _dependency_exc_info[ - 1 - ].with_traceback( # type: ignore[union-attr] - _dependency_exc_info[2] - ) - - assert ( - olefile is not None - ) # If we made it this far, olefile should be available - msg = olefile.OleFileIO(file_stream) - - # Extract email metadata - md_content = "# Email Message\n\n" - - # Get headers - headers = { - "From": self._get_stream_data(msg, "__substg1.0_0C1F001F"), - "To": self._get_stream_data(msg, "__substg1.0_0E04001F"), - "Subject": self._get_stream_data(msg, "__substg1.0_0037001F"), - } - - # Add headers to markdown - for key, value in headers.items(): - if value: - md_content += f"**{key}:** {value}\n" - - md_content += "\n## Content\n\n" - - # Get email body - body = self._get_stream_data(msg, "__substg1.0_1000001F") - if 
-            md_content += body
-
-        msg.close()
-
-        return DocumentConverterResult(
-            markdown=md_content.strip(),
-            title=headers.get("Subject"),
-        )
-
-    def _get_stream_data(self, msg: Any, stream_path: str) -> Union[str, None]:
-        """Helper to safely extract and decode stream data from the MSG file."""
-        assert olefile is not None
-        assert isinstance(
-            msg, olefile.OleFileIO
-        )  # Ensure msg is of the correct type (type hinting is not possible with the optional olefile package)
-
-        try:
-            if msg.exists(stream_path):
-                data = msg.openstream(stream_path).read()
-                # Try UTF-16 first (common for .msg files)
-                try:
-                    return data.decode("utf-16-le").strip()
-                except UnicodeDecodeError:
-                    # Fall back to UTF-8
-                    try:
-                        return data.decode("utf-8").strip()
-                    except UnicodeDecodeError:
-                        # Last resort - ignore errors
-                        return data.decode("utf-8", errors="ignore").strip()
-        except Exception:
-            pass
-        return None
diff --git a/packages/markitup/src/markitup/converters/_plain_text_converter.py b/packages/markitup/src/markitup/converters/_plain_text_converter.py
index 2e10405..ff6d75e 100644
--- a/packages/markitup/src/markitup/converters/_plain_text_converter.py
+++ b/packages/markitup/src/markitup/converters/_plain_text_converter.py
@@ -5,15 +5,6 @@ from charset_normalizer import from_bytes
 from .._base_converter import DocumentConverter, DocumentConverterResult
 from .._stream_info import StreamInfo
 
-# Try loading optional (but in this case, required) dependencies
-# Save reporting of any exceptions for later
-_dependency_exc_info = None
-try:
-    import mammoth
-except ImportError:
-    # Preserve the error and stack trace for later
-    _dependency_exc_info = sys.exc_info()
-
 ACCEPTED_MIME_TYPE_PREFIXES = [
     "text/",
     "application/json",
diff --git a/packages/markitup/src/markitup/converters/_pptx_converter.py b/packages/markitup/src/markitup/converters/_pptx_converter.py
index 087da32..23bb7f9 100644
--- a/packages/markitup/src/markitup/converters/_pptx_converter.py
+++ b/packages/markitup/src/markitup/converters/_pptx_converter.py
@@ -9,7 +9,6 @@ from typing import BinaryIO, Any
 from operator import attrgetter
 
 from ._html_converter import HtmlConverter
-from ._llm_caption import llm_caption
 from .._base_converter import DocumentConverter, DocumentConverterResult
 from .._stream_info import StreamInfo
 from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
@@ -95,39 +94,8 @@ class PptxConverter(DocumentConverter):
 
                 if self._is_picture(shape):
                     # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
-                    llm_description = ""
                     alt_text = ""
 
-                    # Potentially generate a description using an LLM
-                    llm_client = kwargs.get("llm_client")
-                    llm_model = kwargs.get("llm_model")
-                    if llm_client is not None and llm_model is not None:
-                        # Prepare a file_stream and stream_info for the image data
-                        image_filename = shape.image.filename
-                        image_extension = None
-                        if image_filename:
-                            image_extension = os.path.splitext(image_filename)[1]
-                        image_stream_info = StreamInfo(
-                            mimetype=shape.image.content_type,
-                            extension=image_extension,
-                            filename=image_filename,
-                        )
-
-                        image_stream = io.BytesIO(shape.image.blob)
-
-                        # Caption the image
-                        try:
-                            llm_description = llm_caption(
-                                image_stream,
-                                image_stream_info,
-                                client=llm_client,
-                                model=llm_model,
-                                prompt=kwargs.get("llm_prompt"),
-                            )
-                        except Exception:
-                            # Unable to generate a description
-                            pass
-
                     # Also grab any description embedded in the deck
                     try:
                         alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
@@ -136,7 +104,7 @@ class PptxConverter(DocumentConverter):
                         pass
 
                 # Prepare the alt, escaping any special characters
-                alt_text = "\n".join([llm_description, alt_text]) or shape.name
+                alt_text = "\n".join([alt_text]) or shape.name
                 alt_text = re.sub(r"[\r\n\[\]]", " ", alt_text)
                 alt_text = re.sub(r"\s+", " ", alt_text).strip()
 
diff --git a/packages/markitup/src/markitup/converters/_rss_converter.py b/packages/markitup/src/markitup/converters/_rss_converter.py
deleted file mode 100644
index 6a0e4c1..0000000
--- a/packages/markitup/src/markitup/converters/_rss_converter.py
+++ /dev/null
@@ -1,191 +0,0 @@
-from xml.dom import minidom
-from typing import BinaryIO, Any, Union
-from bs4 import BeautifulSoup
-
-from ._markdownify import _CustomMarkdownify
-from .._stream_info import StreamInfo
-from .._base_converter import DocumentConverter, DocumentConverterResult
-
-PRECISE_MIME_TYPE_PREFIXES = [
-    "application/rss",
-    "application/rss+xml",
-    "application/atom",
-    "application/atom+xml",
-]
-
-PRECISE_FILE_EXTENSIONS = [".rss", ".atom"]
-
-CANDIDATE_MIME_TYPE_PREFIXES = [
-    "text/xml",
-    "application/xml",
-]
-
-CANDIDATE_FILE_EXTENSIONS = [
-    ".xml",
-]
-
-
-class RssConverter(DocumentConverter):
-    """Convert RSS / Atom feeds to Markdown."""
-
-    def __init__(self):
-        super().__init__()
-        self._kwargs = {}
-
-    def accepts(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> bool:
-        mimetype = (stream_info.mimetype or "").lower()
-        extension = (stream_info.extension or "").lower()
-
-        # Check for precise mimetypes and file extensions
-        if extension in PRECISE_FILE_EXTENSIONS:
-            return True
-
-        for prefix in PRECISE_MIME_TYPE_PREFIXES:
-            if mimetype.startswith(prefix):
-                return True
-
-        # Check for candidate mimetypes and file extensions
-        if extension in CANDIDATE_FILE_EXTENSIONS:
-            return self._check_xml(file_stream)
-
-        for prefix in CANDIDATE_MIME_TYPE_PREFIXES:
-            if mimetype.startswith(prefix):
-                return self._check_xml(file_stream)
-
-        return False
-
-    def _check_xml(self, file_stream: BinaryIO) -> bool:
-        cur_pos = file_stream.tell()
-        try:
-            doc = minidom.parse(file_stream)
-            return self._feed_type(doc) is not None
-        except BaseException as _:
-            pass
-        finally:
-            file_stream.seek(cur_pos)
-        return False
-
-    def _feed_type(self, doc: Any) -> str | None:
-        if doc.getElementsByTagName("rss"):
-            return "rss"
-        elif doc.getElementsByTagName("feed"):
-            root = doc.getElementsByTagName("feed")[0]
-            if root.getElementsByTagName("entry"):
-                # An Atom feed must have a root element of <feed> and at least one <entry>
-                return "atom"
-        return None
-
-    def convert(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> DocumentConverterResult:
-        self._kwargs = kwargs
-        doc = minidom.parse(file_stream)
-        feed_type = self._feed_type(doc)
-
-        if feed_type == "rss":
-            return self._parse_rss_type(doc)
-        elif feed_type == "atom":
-            return self._parse_atom_type(doc)
-        else:
-            raise ValueError("Unknown feed type")
-
-    def _parse_atom_type(self, doc: minidom.Document) -> DocumentConverterResult:
-        """Convert an Atom feed to Markdown.
-
-        Expects a root <feed> element with one or more <entry> elements.
-        """
-        root = doc.getElementsByTagName("feed")[0]
-        title = self._get_data_by_tag_name(root, "title")
-        subtitle = self._get_data_by_tag_name(root, "subtitle")
-        entries = root.getElementsByTagName("entry")
-        md_text = f"# {title}\n"
-        if subtitle:
-            md_text += f"{subtitle}\n"
-        for entry in entries:
-            entry_title = self._get_data_by_tag_name(entry, "title")
-            entry_summary = self._get_data_by_tag_name(entry, "summary")
-            entry_updated = self._get_data_by_tag_name(entry, "updated")
-            entry_content = self._get_data_by_tag_name(entry, "content")
-
-            if entry_title:
-                md_text += f"\n## {entry_title}\n"
-            if entry_updated:
-                md_text += f"Updated on: {entry_updated}\n"
-            if entry_summary:
-                md_text += self._parse_content(entry_summary)
-            if entry_content:
-                md_text += self._parse_content(entry_content)
-
-        return DocumentConverterResult(
-            markdown=md_text,
-            title=title,
-        )
-
-    def _parse_rss_type(self, doc: minidom.Document) -> DocumentConverterResult:
-        """Convert an RSS feed to Markdown.
-
-        Raises ValueError if no channel is found in the feed.
-        """
-        root = doc.getElementsByTagName("rss")[0]
-        channel_list = root.getElementsByTagName("channel")
-        if not channel_list:
-            raise ValueError("No channel found in RSS feed")
-        channel = channel_list[0]
-        channel_title = self._get_data_by_tag_name(channel, "title")
-        channel_description = self._get_data_by_tag_name(channel, "description")
-        items = channel.getElementsByTagName("item")
-        md_text = f"# {channel_title}\n" if channel_title else ""
-        if channel_description:
-            md_text += f"{channel_description}\n"
-        for item in items:
-            title = self._get_data_by_tag_name(item, "title")
-            description = self._get_data_by_tag_name(item, "description")
-            pubDate = self._get_data_by_tag_name(item, "pubDate")
-            content = self._get_data_by_tag_name(item, "content:encoded")
-
-            if title:
-                md_text += f"\n## {title}\n"
-            if pubDate:
-                md_text += f"Published on: {pubDate}\n"
-            if description:
-                md_text += self._parse_content(description)
-            if content:
-                md_text += self._parse_content(content)
-
-        return DocumentConverterResult(
-            markdown=md_text,
-            title=channel_title,
-        )
-
-    def _parse_content(self, content: str) -> str:
-        """Parse the content of an RSS feed item"""
-        try:
-            # using bs4 because many RSS feeds have HTML-styled content
-            soup = BeautifulSoup(content, "html.parser")
-            return _CustomMarkdownify(**self._kwargs).convert_soup(soup)
-        except BaseException as _:
-            return content
-
-    def _get_data_by_tag_name(
-        self, element: minidom.Element, tag_name: str
-    ) -> Union[str, None]:
-        """Get data from first child element with the given tag name.
-        Returns None when no such element is found.
- """ - nodes = element.getElementsByTagName(tag_name) - if not nodes: - return None - fc = nodes[0].firstChild - if fc: - if hasattr(fc, "data"): - return fc.data - return None diff --git a/packages/markitup/src/markitup/converters/_transcribe_audio.py b/packages/markitup/src/markitup/converters/_transcribe_audio.py deleted file mode 100644 index 5e09d23..0000000 --- a/packages/markitup/src/markitup/converters/_transcribe_audio.py +++ /dev/null @@ -1,49 +0,0 @@ -import io -import sys -from typing import BinaryIO -from .._exceptions import MissingDependencyException - -# Try loading optional (but in this case, required) dependencies -# Save reporting of any exceptions for later -_dependency_exc_info = None -try: - # Suppress some warnings on library import - import warnings - - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=DeprecationWarning) - warnings.filterwarnings("ignore", category=SyntaxWarning) - import speech_recognition as sr - import pydub -except ImportError: - # Preserve the error and stack trace for later - _dependency_exc_info = sys.exc_info() - - -def transcribe_audio(file_stream: BinaryIO, *, audio_format: str = "wav") -> str: - # Check for installed dependencies - if _dependency_exc_info is not None: - raise MissingDependencyException( - "Speech transcription requires installing MarkItdown with the [audio-transcription] optional dependencies. E.g., `pip install markitup[audio-transcription]` or `pip install markitup[all]`" - ) from _dependency_exc_info[ - 1 - ].with_traceback( # type: ignore[union-attr] - _dependency_exc_info[2] - ) - - if audio_format in ["wav", "aiff", "flac"]: - audio_source = file_stream - elif audio_format in ["mp3", "mp4"]: - audio_segment = pydub.AudioSegment.from_file(file_stream, format=audio_format) - - audio_source = io.BytesIO() - audio_segment.export(audio_source, format="wav") - audio_source.seek(0) - else: - raise ValueError(f"Unsupported audio format: {audio_format}") - - recognizer = sr.Recognizer() - with sr.AudioFile(audio_source) as source: - audio = recognizer.record(source) - transcript = recognizer.recognize_google(audio).strip() - return "[No speech detected]" if transcript == "" else transcript diff --git a/packages/markitup/src/markitup/converters/_wikipedia_converter.py b/packages/markitup/src/markitup/converters/_wikipedia_converter.py deleted file mode 100644 index c0f7e0e..0000000 --- a/packages/markitup/src/markitup/converters/_wikipedia_converter.py +++ /dev/null @@ -1,88 +0,0 @@ -import io -import re -import bs4 -from typing import Any, BinaryIO, Optional - -from .._base_converter import DocumentConverter, DocumentConverterResult -from .._stream_info import StreamInfo -from ._markdownify import _CustomMarkdownify - -ACCEPTED_MIME_TYPE_PREFIXES = [ - "text/html", - "application/xhtml", -] - -ACCEPTED_FILE_EXTENSIONS = [ - ".html", - ".htm", -] - - -class WikipediaConverter(DocumentConverter): - """Handle Wikipedia pages separately, focusing only on the main document content.""" - - def accepts( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> bool: - """ - Make sure we're dealing with HTML content *from* Wikipedia. 
- """ - - url = stream_info.url or "" - mimetype = (stream_info.mimetype or "").lower() - extension = (stream_info.extension or "").lower() - - if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url): - # Not a Wikipedia URL - return False - - if extension in ACCEPTED_FILE_EXTENSIONS: - return True - - for prefix in ACCEPTED_MIME_TYPE_PREFIXES: - if mimetype.startswith(prefix): - return True - - # Not HTML content - return False - - def convert( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> DocumentConverterResult: - # Parse the stream - encoding = "utf-8" if stream_info.charset is None else stream_info.charset - soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding) - - # Remove javascript and style blocks - for script in soup(["script", "style"]): - script.extract() - - # Print only the main content - body_elm = soup.find("div", {"id": "mw-content-text"}) - title_elm = soup.find("span", {"class": "mw-page-title-main"}) - - webpage_text = "" - main_title = None if soup.title is None else soup.title.string - - if body_elm: - # What's the title - if title_elm and isinstance(title_elm, bs4.Tag): - main_title = title_elm.string - - # Convert the page - webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify( - **kwargs - ).convert_soup(body_elm) - else: - webpage_text = _CustomMarkdownify(**kwargs).convert_soup(soup) - - return DocumentConverterResult( - markdown=webpage_text, - title=main_title, - ) diff --git a/packages/markitup/src/markitup/converters/_youtube_converter.py b/packages/markitup/src/markitup/converters/_youtube_converter.py deleted file mode 100644 index b5a014c..0000000 --- a/packages/markitup/src/markitup/converters/_youtube_converter.py +++ /dev/null @@ -1,224 +0,0 @@ -import sys -import json -import time -import io -import re -import bs4 -from typing import Any, BinaryIO, Optional, Dict, List, Union -from urllib.parse import parse_qs, urlparse, unquote - -from .._base_converter import DocumentConverter, DocumentConverterResult -from .._stream_info import StreamInfo - -# Optional YouTube transcription support -try: - # Suppress some warnings on library import - import warnings - - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=SyntaxWarning) - # Patch submitted upstream to fix the SyntaxWarning - from youtube_transcript_api import YouTubeTranscriptApi - - IS_YOUTUBE_TRANSCRIPT_CAPABLE = True -except ModuleNotFoundError: - IS_YOUTUBE_TRANSCRIPT_CAPABLE = False - - -ACCEPTED_MIME_TYPE_PREFIXES = [ - "text/html", - "application/xhtml", -] - -ACCEPTED_FILE_EXTENSIONS = [ - ".html", - ".htm", -] - - -class YouTubeConverter(DocumentConverter): - """Handle YouTube specially, focusing on the video title, description, and transcript.""" - - def accepts( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> bool: - """ - Make sure we're dealing with HTML content *from* YouTube. 
- """ - url = stream_info.url or "" - mimetype = (stream_info.mimetype or "").lower() - extension = (stream_info.extension or "").lower() - - url = unquote(url) - url = url.replace(r"\?", "?").replace(r"\=", "=") - - if not url.startswith("https://www.youtube.com/watch?"): - # Not a YouTube URL - return False - - if extension in ACCEPTED_FILE_EXTENSIONS: - return True - - for prefix in ACCEPTED_MIME_TYPE_PREFIXES: - if mimetype.startswith(prefix): - return True - - # Not HTML content - return False - - def convert( - self, - file_stream: BinaryIO, - stream_info: StreamInfo, - **kwargs: Any, # Options to pass to the converter - ) -> DocumentConverterResult: - # Parse the stream - encoding = "utf-8" if stream_info.charset is None else stream_info.charset - soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding) - - # Read the meta tags - metadata: Dict[str, str] = {} - - if soup.title and soup.title.string: - metadata["title"] = soup.title.string - - for meta in soup(["meta"]): - if not isinstance(meta, bs4.Tag): - continue - - for a in meta.attrs: - if a in ["itemprop", "property", "name"]: - key = str(meta.get(a, "")) - content = str(meta.get("content", "")) - if key and content: # Only add non-empty content - metadata[key] = content - break - - # Try reading the description - try: - for script in soup(["script"]): - if not isinstance(script, bs4.Tag): - continue - if not script.string: # Skip empty scripts - continue - content = script.string - if "ytInitialData" in content: - match = re.search(r"var ytInitialData = ({.*?});", content) - if match: - data = json.loads(match.group(1)) - attrdesc = self._findKey(data, "attributedDescriptionBodyText") - if attrdesc and isinstance(attrdesc, dict): - metadata["description"] = str(attrdesc.get("content", "")) - break - except Exception as e: - print(f"Error extracting description: {e}") - pass - - # Start preparing the page - webpage_text = "# YouTube\n" - - title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore - assert isinstance(title, str) - - if title: - webpage_text += f"\n## {title}\n" - - stats = "" - views = self._get(metadata, ["interactionCount"]) # type: ignore - if views: - stats += f"- **Views:** {views}\n" - - keywords = self._get(metadata, ["keywords"]) # type: ignore - if keywords: - stats += f"- **Keywords:** {keywords}\n" - - runtime = self._get(metadata, ["duration"]) # type: ignore - if runtime: - stats += f"- **Runtime:** {runtime}\n" - - if len(stats) > 0: - webpage_text += f"\n### Video Metadata\n{stats}\n" - - description = self._get(metadata, ["description", "og:description"]) # type: ignore - if description: - webpage_text += f"\n### Description\n{description}\n" - - if IS_YOUTUBE_TRANSCRIPT_CAPABLE: - ytt_api = YouTubeTranscriptApi() - transcript_text = "" - parsed_url = urlparse(stream_info.url) # type: ignore - params = parse_qs(parsed_url.query) # type: ignore - if "v" in params and params["v"][0]: - video_id = str(params["v"][0]) - try: - youtube_transcript_languages = kwargs.get( - "youtube_transcript_languages", ("en",) - ) - # Retry the transcript fetching operation - transcript = self._retry_operation( - lambda: ytt_api.fetch( - video_id, languages=youtube_transcript_languages - ), - retries=3, # Retry 3 times - delay=2, # 2 seconds delay between retries - ) - if transcript: - transcript_text = " ".join( - [part.text for part in transcript] - ) # type: ignore - except Exception as e: - print(f"Error fetching transcript: {e}") - if transcript_text: - webpage_text += 
f"\n### Transcript\n{transcript_text}\n" - - title = title if title else (soup.title.string if soup.title else "") - assert isinstance(title, str) - - return DocumentConverterResult( - markdown=webpage_text, - title=title, - ) - - def _get( - self, - metadata: Dict[str, str], - keys: List[str], - default: Union[str, None] = None, - ) -> Union[str, None]: - """Get first non-empty value from metadata matching given keys.""" - for k in keys: - if k in metadata: - return metadata[k] - return default - - def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type - """Recursively search for a key in nested dictionary/list structures.""" - if isinstance(json, list): - for elm in json: - ret = self._findKey(elm, key) - if ret is not None: - return ret - elif isinstance(json, dict): - for k, v in json.items(): - if k == key: - return json[k] - if result := self._findKey(v, key): - return result - return None - - def _retry_operation(self, operation, retries=3, delay=2): - """Retries the operation if it fails.""" - attempt = 0 - while attempt < retries: - try: - return operation() # Attempt the operation - except Exception as e: - print(f"Attempt {attempt + 1} failed: {e}") - if attempt < retries - 1: - time.sleep(delay) # Wait before retrying - attempt += 1 - # If all attempts fail, raise the last exception - raise Exception(f"Operation failed after {retries} attempts.") diff --git a/packages/markitup/src/markitup/converters/_zip_converter.py b/packages/markitup/src/markitup/converters/_zip_converter.py deleted file mode 100644 index 897ff72..0000000 --- a/packages/markitup/src/markitup/converters/_zip_converter.py +++ /dev/null @@ -1,117 +0,0 @@ -import sys -import zipfile -import io -import os - -from typing import BinaryIO, Any, TYPE_CHECKING - -from .._base_converter import DocumentConverter, DocumentConverterResult -from .._stream_info import StreamInfo -from .._exceptions import UnsupportedFormatException, FileConversionException - -# Break otherwise circular import for type hinting -if TYPE_CHECKING: - from .._markitup import MarkItUp - -ACCEPTED_MIME_TYPE_PREFIXES = [ - "application/zip", -] - -ACCEPTED_FILE_EXTENSIONS = [".zip"] - - -class ZipConverter(DocumentConverter): - """Converts ZIP files to markdown by extracting and converting all contained files. - - The converter extracts the ZIP contents to a temporary directory, processes each file - using appropriate converters based on file extensions, and then combines the results - into a single markdown document. The temporary directory is cleaned up after processing. 
-
-    Example output format:
-    ```markdown
-    Content from the zip file `example.zip`:
-
-    ## File: docs/readme.txt
-
-    This is the content of readme.txt
-    Multiple lines are preserved
-
-    ## File: images/example.jpg
-
-    ImageSize: 1920x1080
-    DateTimeOriginal: 2024-02-15 14:30:00
-    Description: A beautiful landscape photo
-
-    ## File: data/report.xlsx
-
-    ## Sheet1
-    | Column1 | Column2 | Column3 |
-    |---------|---------|---------|
-    | data1   | data2   | data3   |
-    | data4   | data5   | data6   |
-    ```
-
-    Key features:
-    - Maintains original file structure in headings
-    - Processes nested files recursively
-    - Uses appropriate converters for each file type
-    - Preserves formatting of converted content
-    - Skips entries that cannot be converted
-    """
-
-    def __init__(
-        self,
-        *,
-        markitup: "MarkItUp",
-    ):
-        super().__init__()
-        self._markitup = markitup
-
-    def accepts(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> bool:
-        mimetype = (stream_info.mimetype or "").lower()
-        extension = (stream_info.extension or "").lower()
-
-        if extension in ACCEPTED_FILE_EXTENSIONS:
-            return True
-
-        for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
-            if mimetype.startswith(prefix):
-                return True
-
-        return False
-
-    def convert(
-        self,
-        file_stream: BinaryIO,
-        stream_info: StreamInfo,
-        **kwargs: Any,  # Options to pass to the converter
-    ) -> DocumentConverterResult:
-        file_path = stream_info.url or stream_info.local_path or stream_info.filename
-        md_content = f"Content from the zip file `{file_path}`:\n\n"
-
-        with zipfile.ZipFile(file_stream, "r") as zipObj:
-            for name in zipObj.namelist():
-                try:
-                    z_file_stream = io.BytesIO(zipObj.read(name))
-                    z_file_stream_info = StreamInfo(
-                        extension=os.path.splitext(name)[1],
-                        filename=os.path.basename(name),
-                    )
-                    result = self._markitup.convert_stream(
-                        stream=z_file_stream,
-                        stream_info=z_file_stream_info,
-                    )
-                    if result is not None:
-                        md_content += f"## File: {name}\n\n"
-                        md_content += result.markdown + "\n\n"
-                except UnsupportedFormatException:
-                    pass
-                except FileConversionException:
-                    pass
-
-        return DocumentConverterResult(markdown=md_content.strip())
diff --git a/packages/markitup/uv.lock b/packages/markitup/uv.lock
index 2b95df2..def8c93 100644
--- a/packages/markitup/uv.lock
+++ b/packages/markitup/uv.lock
@@ -532,10 +532,6 @@ audio-transcription = [
     { name = "pydub" },
     { name = "speechrecognition" },
 ]
-az-doc-intel = [
-    { name = "azure-ai-documentintelligence" },
-    { name = "azure-identity" },
-]
 docx = [
     { name = "lxml" },
     { name = "mammoth" },
@@ -564,9 +560,7 @@
 [package.metadata]
 requires-dist = [
     { name = "azure-ai-documentintelligence", marker = "extra == 'all'" },
-    { name = "azure-ai-documentintelligence", marker = "extra == 'az-doc-intel'" },
     { name = "azure-identity", marker = "extra == 'all'" },
-    { name = "azure-identity", marker = "extra == 'az-doc-intel'" },
     { name = "beautifulsoup4" },
     { name = "charset-normalizer" },
     { name = "lxml", marker = "extra == 'all'" },
@@ -596,7 +590,7 @@
     { name = "youtube-transcript-api", marker = "extra == 'all'", specifier = "~=1.0.0" },
     { name = "youtube-transcript-api", marker = "extra == 'youtube-transcription'" },
 ]
-provides-extras = ["all", "audio-transcription", "az-doc-intel", "docx", "outlook", "pdf", "pptx", "xls", "xlsx", "youtube-transcription"]
+provides-extras = ["all", "audio-transcription", "docx", "outlook", "pdf", "pptx", "xls", "xlsx", "youtube-transcription"]
 
 [[package]]
name = "mpmath"