support for data-src

This commit is contained in:
朱昊天 2025-04-30 17:03:10 +12:00
parent db9277bc79
commit 17081dae64
2 changed files with 13 additions and 159 deletions

View file

@ -1,9 +1,6 @@
import sys import sys
import os
import re
import unicodedata
from typing import BinaryIO, Any from typing import BinaryIO, Any
from io import BytesIO
from ._html_converter import HtmlConverter from ._html_converter import HtmlConverter
from ..converter_utils.docx.pre_process import pre_process_docx from ..converter_utils.docx.pre_process import pre_process_docx
@ -55,70 +52,13 @@ class DocxConverter(HtmlConverter):
return False return False
def _sanitize_filename(self, filename: str) -> str:
"""
Sanitize a filename by removing or replacing problematic characters.
Args:
filename: The original filename
Returns:
A sanitized filename safe for filesystem use
"""
# Step 1: Normalize unicode characters
filename = unicodedata.normalize('NFKD', filename)
# Step 2: Remove invalid characters and replace spaces with underscores
# Keep alphanumeric characters, underscores, hyphens, and periods
sanitized = re.sub(r'[^\w\-\.]', '_', filename)
# Step 3: Collapse multiple underscores
sanitized = re.sub(r'_+', '_', sanitized)
# Step 4: Remove leading/trailing underscores
sanitized = sanitized.strip('_')
# Step 5: Ensure we have a valid filename (default if empty)
if not sanitized:
sanitized = "unnamed"
return sanitized
def _get_document_name(self, stream_info: StreamInfo) -> str:
    """Derive a sanitized document name from *stream_info*.

    Checks the ``filename``, ``local_path`` and ``url`` attributes in
    that order; the first one whose basename has a non-empty stem is
    sanitized and returned.  When none of them yields a usable name,
    the default "docx_document" is returned.
    """
    for candidate in (stream_info.filename, stream_info.local_path, stream_info.url):
        if not candidate:
            continue
        stem, _ext = os.path.splitext(os.path.basename(candidate))
        if stem:
            return self._sanitize_filename(stem)
    # Nothing usable on the stream info — fall back to a fixed name.
    return "docx_document"
def convert( def convert(
self, self,
file_stream: BinaryIO, file_stream: BinaryIO,
stream_info: StreamInfo, stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter **kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult: ) -> DocumentConverterResult:
# Check dependencies # Check: the dependencies
if _dependency_exc_info is not None: if _dependency_exc_info is not None:
raise MissingDependencyException( raise MissingDependencyException(
MISSING_DEPENDENCY_MESSAGE.format( MISSING_DEPENDENCY_MESSAGE.format(
@ -132,27 +72,9 @@ class DocxConverter(HtmlConverter):
_dependency_exc_info[2] _dependency_exc_info[2]
) )
# If conversion_name not explicitly provided, try to extract from stream_info
if "conversion_name" not in kwargs:
conversion_name = self._get_document_name(stream_info)
kwargs["conversion_name"] = conversion_name
style_map = kwargs.get("style_map", None) style_map = kwargs.get("style_map", None)
pre_process_stream = pre_process_docx(file_stream) pre_process_stream = pre_process_docx(file_stream)
return self._html_converter.convert_string(
# Convert to HTML and pass necessary parameters to HTML converter mammoth.convert_to_html(pre_process_stream, style_map=style_map).value,
html_content = mammoth.convert_to_html(pre_process_stream, style_map=style_map).value
# Create new StreamInfo to pass to HTML converter
html_stream_info = stream_info.copy_and_update(
mimetype="text/html",
extension=".html"
)
# Use io.BytesIO to create binary stream
from io import BytesIO
return self._html_converter.convert(
file_stream=BytesIO(html_content.encode("utf-8")),
stream_info=html_stream_info,
**kwargs, **kwargs,
) )

View file

@ -1,9 +1,5 @@
import re import re
import markdownify import markdownify
import os
import base64
import hashlib
import sys
from typing import Any, Optional from typing import Any, Optional
from urllib.parse import quote, unquote, urlparse, urlunparse from urllib.parse import quote, unquote, urlparse, urlunparse
@ -20,15 +16,9 @@ class _CustomMarkdownify(markdownify.MarkdownConverter):
""" """
def __init__(self, **options: Any): def __init__(self, **options: Any):
# Set default values for image-related options
self.image_output_dir = options.get("image_output_dir", "assets")
self.conversion_name = options.get("conversion_name")
# Apply basic options
options["heading_style"] = options.get("heading_style", markdownify.ATX) options["heading_style"] = options.get("heading_style", markdownify.ATX)
options["keep_data_uris"] = options.get("keep_data_uris", False) options["keep_data_uris"] = options.get("keep_data_uris", False)
# Explicitly cast options to the expected type if necessary
# Initialize parent class
super().__init__(**options) super().__init__(**options)
def convert_hn( def convert_hn(
@ -99,81 +89,23 @@ class _CustomMarkdownify(markdownify.MarkdownConverter):
convert_as_inline: Optional[bool] = False, convert_as_inline: Optional[bool] = False,
**kwargs, **kwargs,
) -> str: ) -> str:
""" """Same as usual converter, but removes data URIs"""
Process image elements, save data URI format images to filesystem
Supports categorized storage in subfolders by document name
"""
alt = el.attrs.get("alt", None) or "" alt = el.attrs.get("alt", None) or ""
src = el.attrs.get("src", None) or el.attrs.get("data-src", None) or "" src = el.attrs.get("src", None) or el.attrs.get("data-src", None) or ""
title = el.attrs.get("title", None) or "" title = el.attrs.get("title", None) or ""
title_part = ' "%s"' % title.replace('"', r"\"") if title else "" title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
# If in inline mode and not preserved, return alt text
if ( if (
convert_as_inline convert_as_inline
and el.parent.name not in self.options.get("keep_inline_images_in", []) and el.parent.name not in self.options["keep_inline_images_in"]
): ):
return alt return alt
# Process data URI format images # Remove dataURIs
if src.startswith("data:image") and not self.options.get("keep_data_uris", False): if src.startswith("data:") and not self.options["keep_data_uris"]:
try:
# Parse MIME type
mime_type = src.split(";")[0].replace("data:", "")
# Get file extension
ext = {
"image/png": ".png",
"image/jpeg": ".jpg",
"image/jpg": ".jpg",
"image/gif": ".gif"
}.get(mime_type, ".png")
# Decode base64 data
encoded = src.split(",")[1]
image_data = base64.b64decode(encoded)
# Generate unique filename
hashname = hashlib.sha256(image_data).hexdigest()[:8]
filename = f"image_{hashname}{ext}"
# Determine output directory
if hasattr(self, 'conversion_name') and self.conversion_name:
# If conversion_name exists, create subfolder
output_dir = os.path.join(self.image_output_dir, self.conversion_name)
else:
# Otherwise use base directory
output_dir = self.image_output_dir
# Ensure directory exists
os.makedirs(output_dir, exist_ok=True)
# Save image file
filepath = os.path.join(output_dir, filename)
with open(filepath, "wb") as f:
f.write(image_data)
# Update src to relative path
src = os.path.join(output_dir, filename).replace("\\", "/")
# If alt text is empty, use the image filename (without extension) as alt text
if not alt:
alt = f"image_{hashname}"
except Exception as e:
error_msg = f"Error saving image: {str(e)}"
import traceback
traceback.print_exc(file=sys.stderr)
# If extraction fails, revert to original truncating behavior
src = src.split(",")[0] + "..."
return f"![{alt}](image_error.png) <!-- {error_msg} -->"
# Process other data URIs that are not images (truncate them)
elif src.startswith("data:") and not self.options.get("keep_data_uris", False):
src = src.split(",")[0] + "..." src = src.split(",")[0] + "..."
# Return Markdown format image reference return "![%s](%s%s)" % (alt, src, title_part)
return f"![{alt}]({src}{title_part})"
def convert_soup(self, soup: Any) -> str: def convert_soup(self, soup: Any) -> str:
return super().convert_soup(soup) # type: ignore return super().convert_soup(soup) # type: ignore