commit 2524c00089
2 changed files with 123 additions and 134 deletions
@@ -1,9 +1,12 @@
 import sys
 import os
 import re
-import unicodedata
-from typing import BinaryIO, Any
+import base64
+import hashlib
+from typing import BinaryIO, Any, Dict, List, Tuple
 from io import BytesIO
+import json
+from bs4 import BeautifulSoup

 from ._html_converter import HtmlConverter
 from ..converter_utils.docx.pre_process import pre_process_docx
@@ -30,7 +33,8 @@ ACCEPTED_FILE_EXTENSIONS = [".docx"]

 class DocxConverter(HtmlConverter):
     """
-    Converts DOCX files to Markdown. Style information (e.g.m headings) and tables are preserved where possible.
+    Converts DOCX files to Markdown. Style information (e.g., headings) and tables are preserved where possible.
+    Extracts images from documents and saves them to document-specific subfolders.
     """

     def __init__(self):
@@ -54,70 +58,113 @@ class DocxConverter(HtmlConverter):
                 return True

         return False

-    def _sanitize_filename(self, filename: str) -> str:
-        """
-        Sanitize a filename by removing or replacing problematic characters.
-
-        Args:
-            filename: The original filename
-
-        Returns:
-            A sanitized filename safe for filesystem use
-        """
-        # Step 1: Normalize unicode characters
-        filename = unicodedata.normalize('NFKD', filename)
-
-        # Step 2: Remove invalid characters and replace spaces with underscores
-        # Keep alphanumeric characters, underscores, hyphens, and periods
-        sanitized = re.sub(r'[^\w\-\.]', '_', filename)
-
-        # Step 3: Collapse multiple underscores
-        sanitized = re.sub(r'_+', '_', sanitized)
-
-        # Step 4: Remove leading/trailing underscores
-        sanitized = sanitized.strip('_')
-
-        # Step 5: Ensure we have a valid filename (default if empty)
-        if not sanitized:
-            sanitized = "unnamed"
-
-        return sanitized
-
     def _get_document_name(self, stream_info: StreamInfo) -> str:
         """
-        Extract document name from StreamInfo
+        Extract document name from StreamInfo and sanitize it
         """
         # First try to extract from filename attribute
         if stream_info.filename:
             basename = os.path.basename(stream_info.filename)
             name, _ = os.path.splitext(basename)
             if name:
-                return self._sanitize_filename(name)
+                return name

         # If local_path exists, try to extract from local path
         if stream_info.local_path:
             basename = os.path.basename(stream_info.local_path)
             name, _ = os.path.splitext(basename)
             if name:
-                return self._sanitize_filename(name)
+                return name

         # If URL exists, try to extract from URL
         if stream_info.url:
             basename = os.path.basename(stream_info.url)
             name, _ = os.path.splitext(basename)
             if name:
-                return self._sanitize_filename(name)
+                print(f"[DEBUG] Extracted document name from URL: {name}")
+                return name

         # Default name
         return "docx_document"

+    def _extract_and_save_images(
+        self, html_content: str, doc_folder: str, assets_folder: str = "assets"
+    ) -> str:
+        """
+        Extract base64 images from HTML content, save them to filesystem, and update HTML with new image paths
+
+        Args:
+            html_content: The HTML content containing images
+            doc_folder: The document-specific folder name
+            assets_folder: The base folder for assets
+
+        Returns:
+            Updated HTML content with image references pointing to saved files
+        """
+        # Parse HTML
+        soup = BeautifulSoup(html_content, "html.parser")
+
+        # Find all images
+        images = soup.find_all("img")
+        if not images:
+            return html_content
+
+        # Create output directory
+        output_dir = os.path.join(assets_folder, doc_folder)
+        os.makedirs(output_dir, exist_ok=True)
+
+        # Process each image
+        for img in images:
+            src = img.get("src", "") or img.get("data-src", "")
+            if not src or not src.startswith("data:image"):
+                continue
+
+            try:
+                # Parse image data
+                mime_type = src.split(";")[0].replace("data:", "")
+
+                # Get file extension
+                ext = {
+                    "image/png": ".png",
+                    "image/jpeg": ".jpg",
+                    "image/jpg": ".jpg",
+                    "image/gif": ".gif",
+                }.get(mime_type, ".png")
+
+                # Extract base64 data
+                encoded_data = src.split(",", 1)[1]
+                image_data = base64.b64decode(encoded_data)
+
+                # Generate unique filename
+                hashname = hashlib.sha256(image_data).hexdigest()[:8]
+                filename = f"image_{hashname}{ext}"
+
+                # Save file
+                filepath = os.path.join(output_dir, filename)
+                with open(filepath, "wb") as f:
+                    f.write(image_data)
+
+                # Update image src in HTML
+                new_src = os.path.join(output_dir, filename).replace("\\", "/")
+                img["src"] = new_src
+
+                # Add alt text if empty
+                if not img.get("alt"):
+                    img["alt"] = f"image_{hashname}"
+
+            except Exception as e:
+                continue
+
+        # Return updated HTML
+        return str(soup)
+
     def convert(
         self,
         file_stream: BinaryIO,
         stream_info: StreamInfo,
         **kwargs: Any,  # Options to pass to the converter
     ) -> DocumentConverterResult:
         # Check dependencies
         if _dependency_exc_info is not None:
             raise MissingDependencyException(
@@ -132,27 +179,37 @@ class DocxConverter(HtmlConverter):
                 _dependency_exc_info[2]
             )

-        # If conversion_name not explicitly provided, try to extract from stream_info
-        if "conversion_name" not in kwargs:
-            conversion_name = self._get_document_name(stream_info)
-            kwargs["conversion_name"] = conversion_name
+        # Get document name
+        doc_name = kwargs.get("conversion_name") or self._get_document_name(stream_info)
+        if hasattr(self, "sanitize_filename"):
+            doc_name = self.sanitize_filename(doc_name)

+        # Get assets folder
+        assets_folder = kwargs.get("image_output_dir", "assets")
+
+        # Convert DOCX to HTML
         style_map = kwargs.get("style_map", None)
         pre_process_stream = pre_process_docx(file_stream)
-        # Convert to HTML and pass necessary parameters to HTML converter
-        html_content = mammoth.convert_to_html(pre_process_stream, style_map=style_map).value
+        html_content = mammoth.convert_to_html(
+            pre_process_stream, style_map=style_map
+        ).value

-        # Create new StreamInfo to pass to HTML converter
-        html_stream_info = stream_info.copy_and_update(
-            mimetype="text/html",
-            extension=".html"
+        # Extract and save images, getting updated HTML with correct image references
+        processed_html = self._extract_and_save_images(
+            html_content, doc_name, assets_folder
         )

-        # Use io.BytesIO to create binary stream
-        from io import BytesIO
+        # Create a new StreamInfo for the HTML converter
+        html_stream_info = stream_info.copy_and_update(
+            mimetype="text/html", extension=".html"
+        )
+
+        # Use the standard HTML converter to convert to Markdown
+        # We don't need to pass conversion_name because images are already extracted
+        html_kwargs = {k: v for k, v in kwargs.items() if k != "conversion_name"}
+
         return self._html_converter.convert(
-            file_stream=BytesIO(html_content.encode("utf-8")),
+            file_stream=BytesIO(processed_html.encode("utf-8")),
             stream_info=html_stream_info,
-            **kwargs,
+            **html_kwargs,
         )
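
Note on the new flow: images no longer ride through the Markdown converter as data URIs. The sketch below mirrors the core of the added _extract_and_save_images method: parse the mammoth-produced HTML with BeautifulSoup, decode each data:image URI, write the bytes under <assets_folder>/<doc_name>/ using a content-hash filename, and point the img src at the saved file. The sample HTML and the tiny PNG payload are illustrative only (not from this commit); bs4 and the standard library are the only requirements.

import base64
import hashlib
import os

from bs4 import BeautifulSoup

# Hypothetical input: a 1x1 PNG embedded the way mammoth emits images.
PNG_B64 = (
    "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJ"
    "AAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg=="
)
html = f'<p><img src="data:image/png;base64,{PNG_B64}"/></p>'

soup = BeautifulSoup(html, "html.parser")
output_dir = os.path.join("assets", "docx_document")  # assets_folder/doc_folder
os.makedirs(output_dir, exist_ok=True)

for img in soup.find_all("img"):
    src = img.get("src", "")
    if not src.startswith("data:image"):
        continue
    mime_type = src.split(";")[0].replace("data:", "")
    ext = {"image/png": ".png", "image/jpeg": ".jpg"}.get(mime_type, ".png")
    image_data = base64.b64decode(src.split(",", 1)[1])
    hashname = hashlib.sha256(image_data).hexdigest()[:8]
    filepath = os.path.join(output_dir, f"image_{hashname}{ext}")
    with open(filepath, "wb") as f:
        f.write(image_data)  # image lands on disk; HTML keeps only a path
    img["src"] = filepath.replace("\\", "/")

print(soup)  # <p><img src="assets/docx_document/image_<hash>.png"/></p>
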
@@ -1,9 +1,5 @@
 import re
 import markdownify
-import os
-import base64
-import hashlib
-import sys

 from typing import Any, Optional
 from urllib.parse import quote, unquote, urlparse, urlunparse
@@ -20,15 +16,9 @@ class _CustomMarkdownify(markdownify.MarkdownConverter):
     """

     def __init__(self, **options: Any):
-        # Set default values for image-related options
-        self.image_output_dir = options.get("image_output_dir", "assets")
-        self.conversion_name = options.get("conversion_name")
-
-        # Apply basic options
         options["heading_style"] = options.get("heading_style", markdownify.ATX)
         options["keep_data_uris"] = options.get("keep_data_uris", False)
-        # Initialize parent class
+        # Explicitly cast options to the expected type if necessary
         super().__init__(**options)

     def convert_hn(
@@ -99,81 +89,23 @@
         convert_as_inline: Optional[bool] = False,
         **kwargs,
     ) -> str:
-        """
-        Process image elements, save data URI format images to filesystem
-        Supports categorized storage in subfolders by document name
-        """
+        """Same as usual converter, but removes data URIs"""
+
         alt = el.attrs.get("alt", None) or ""
-        src = el.attrs.get("src", None) or el.attrs.get("data-src", None) or ""
+        src = el.attrs.get("src", None) or ""
         title = el.attrs.get("title", None) or ""
         title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
-
-        # If in inline mode and not preserved, return alt text
         if (
             convert_as_inline
-            and el.parent.name not in self.options.get("keep_inline_images_in", [])
+            and el.parent.name not in self.options["keep_inline_images_in"]
         ):
             return alt

-        # Process data URI format images
-        if src.startswith("data:image") and not self.options.get("keep_data_uris", False):
-            try:
-                # Parse MIME type
-                mime_type = src.split(";")[0].replace("data:", "")
-
-                # Get file extension
-                ext = {
-                    "image/png": ".png",
-                    "image/jpeg": ".jpg",
-                    "image/jpg": ".jpg",
-                    "image/gif": ".gif"
-                }.get(mime_type, ".png")
-
-                # Decode base64 data
-                encoded = src.split(",")[1]
-                image_data = base64.b64decode(encoded)
-
-                # Generate unique filename
-                hashname = hashlib.sha256(image_data).hexdigest()[:8]
-                filename = f"image_{hashname}{ext}"
-
-                # Determine output directory
-                if hasattr(self, 'conversion_name') and self.conversion_name:
-                    # If conversion_name exists, create subfolder
-                    output_dir = os.path.join(self.image_output_dir, self.conversion_name)
-                else:
-                    # Otherwise use base directory
-                    output_dir = self.image_output_dir
-
-                # Ensure directory exists
-                os.makedirs(output_dir, exist_ok=True)
-
-                # Save image file
-                filepath = os.path.join(output_dir, filename)
-                with open(filepath, "wb") as f:
-                    f.write(image_data)
-
-                # Update src to relative path
-                src = os.path.join(output_dir, filename).replace("\\", "/")
-
-                # If alt text is empty, use the image filename (without extension) as alt text
-                if not alt:
-                    alt = f"image_{hashname}"
-
-            except Exception as e:
-                error_msg = f"Error saving image: {str(e)}"
-                import traceback
-                traceback.print_exc(file=sys.stderr)
-                # If extraction fails, revert to original truncating behavior
-                src = src.split(",")[0] + "..."
-                return f"![{alt}]({src}{title_part}) <!-- {error_msg} -->"
-
-        # Process other data URIs that are not images (truncate them)
-        elif src.startswith("data:") and not self.options.get("keep_data_uris", False):
+        # Remove dataURIs
+        if src.startswith("data:") and not self.options["keep_data_uris"]:
             src = src.split(",")[0] + "..."

-        # Return Markdown format image reference
-        return f"![{alt}]({src}{title_part})"
+        return "![%s](%s%s)" % (alt, src, title_part)

     def convert_soup(self, soup: Any) -> str:
         return super().convert_soup(soup)  # type: ignore
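
With the image-saving branch removed, _CustomMarkdownify.convert_img is back to the plain truncating behavior: an embedded image survives in the emitted Markdown only as a stub unless keep_data_uris is set. A minimal illustration of the restored branch, mirrored outside the class (all values made up):

alt, title_part = "logo", ""
src = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUg..."  # hypothetical data URI
keep_data_uris = False

if src.startswith("data:") and not keep_data_uris:
    # Truncate the URI rather than writing the image to disk
    src = src.split(",")[0] + "..."

print("![%s](%s%s)" % (alt, src, title_part))
# -> ![logo](data:image/png...)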