enhance convert docx

This commit is contained in:
朱昊天 2025-04-30 12:53:48 +12:00
parent 041be54471
commit a0023e691c
3 changed files with 133 additions and 15 deletions

View file

@ -1,5 +1,5 @@
import sys import sys
import os
from typing import BinaryIO, Any from typing import BinaryIO, Any
from ._html_converter import HtmlConverter from ._html_converter import HtmlConverter
@ -52,13 +52,47 @@ class DocxConverter(HtmlConverter):
return False return False
def _get_document_name(self, stream_info: StreamInfo) -> str:
"""
Extract document name from StreamInfo
"""
# First try to extract from filename attribute
if stream_info.filename:
basename = os.path.basename(stream_info.filename)
name, _ = os.path.splitext(basename)
if name:
print(f"[DEBUG] Extracted document name from filename: {name}")
return name
# If local_path exists, try to extract from local path
if stream_info.local_path:
basename = os.path.basename(stream_info.local_path)
name, _ = os.path.splitext(basename)
if name:
print(f"[DEBUG] Extracted document name from local_path: {name}")
return name
# If URL exists, try to extract from URL
if stream_info.url:
basename = os.path.basename(stream_info.url)
name, _ = os.path.splitext(basename)
if name:
print(f"[DEBUG] Extracted document name from URL: {name}")
return name
# Default name
return "docx_document"
def convert( def convert(
self, self,
file_stream: BinaryIO, file_stream: BinaryIO,
stream_info: StreamInfo, stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter **kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult: ) -> DocumentConverterResult:
# Check: the dependencies print(f"[DEBUG] DocxConverter.convert called with kwargs: {kwargs}")
print(f"[DEBUG] StreamInfo: filename={stream_info.filename}, local_path={stream_info.local_path}, url={stream_info.url}")
# Check dependencies
if _dependency_exc_info is not None: if _dependency_exc_info is not None:
raise MissingDependencyException( raise MissingDependencyException(
MISSING_DEPENDENCY_MESSAGE.format( MISSING_DEPENDENCY_MESSAGE.format(
@ -72,9 +106,29 @@ class DocxConverter(HtmlConverter):
_dependency_exc_info[2] _dependency_exc_info[2]
) )
# If conversion_name not explicitly provided, try to extract from stream_info
if "conversion_name" not in kwargs:
conversion_name = self._get_document_name(stream_info)
kwargs["conversion_name"] = conversion_name
print(f"[DEBUG] Setting conversion_name to: {conversion_name}")
style_map = kwargs.get("style_map", None) style_map = kwargs.get("style_map", None)
pre_process_stream = pre_process_docx(file_stream) pre_process_stream = pre_process_docx(file_stream)
return self._html_converter.convert_string(
mammoth.convert_to_html(pre_process_stream, style_map=style_map).value, # Convert to HTML and pass necessary parameters to HTML converter
**kwargs, html_content = mammoth.convert_to_html(pre_process_stream, style_map=style_map).value
# Create new StreamInfo to pass to HTML converter
html_stream_info = stream_info.copy_and_update(
mimetype="text/html",
extension=".html"
) )
print(f"[DEBUG] Calling HTML converter with parameters: conversion_name={kwargs.get('conversion_name')}")
# Use io.BytesIO to create binary stream
from io import BytesIO
return self._html_converter.convert(
file_stream=BytesIO(html_content.encode("utf-8")),
stream_info=html_stream_info,
**kwargs,
)

View file

@ -61,7 +61,7 @@ class HtmlConverter(DocumentConverter):
webpage_text = _CustomMarkdownify(**kwargs).convert_soup(soup) webpage_text = _CustomMarkdownify(**kwargs).convert_soup(soup)
assert isinstance(webpage_text, str) assert isinstance(webpage_text, str)
converter = _CustomMarkdownify(image_output_dir="assets")
# remove leading and trailing \n # remove leading and trailing \n
webpage_text = webpage_text.strip() webpage_text = webpage_text.strip()

View file

@ -1,5 +1,9 @@
import re import re
import markdownify import markdownify
import os
import base64
import hashlib
import sys
from typing import Any, Optional from typing import Any, Optional
from urllib.parse import quote, unquote, urlparse, urlunparse from urllib.parse import quote, unquote, urlparse, urlunparse
@ -16,9 +20,15 @@ class _CustomMarkdownify(markdownify.MarkdownConverter):
""" """
def __init__(self, **options: Any):
    """Markdown converter with optional image-extraction settings.

    Extra options recognized here (and removed before delegating to
    ``markdownify.MarkdownConverter`` so they do not pollute the parent
    converter's option dictionary):

    - ``image_output_dir``: directory where extracted images are written
      (defaults to ``"assets"``).
    - ``conversion_name``: optional per-document sub-folder name used when
      saving images (defaults to ``None``).
    """
    # Use pop() rather than get() so our custom keys are not forwarded
    # into the parent class's options dict.
    self.image_output_dir = options.pop("image_output_dir", "assets")
    self.conversion_name = options.pop("conversion_name", None)
    # Fill in the defaults the rest of the pipeline expects.
    options["heading_style"] = options.get("heading_style", markdownify.ATX)
    options["keep_data_uris"] = options.get("keep_data_uris", False)
    # Initialize the underlying markdownify converter.
    super().__init__(**options)
def convert_hn( def convert_hn(
@ -89,23 +99,77 @@ class _CustomMarkdownify(markdownify.MarkdownConverter):
convert_as_inline: Optional[bool] = False, convert_as_inline: Optional[bool] = False,
**kwargs, **kwargs,
) -> str: ) -> str:
"""Same as usual converter, but removes data URIs""" """
Process image elements, save data URI format images to filesystem
Supports categorized storage in subfolders by document name
"""
alt = el.attrs.get("alt", None) or "" alt = el.attrs.get("alt", None) or ""
src = el.attrs.get("src", None) or "" src = el.attrs.get("src", None) or ""
title = el.attrs.get("title", None) or "" title = el.attrs.get("title", None) or ""
title_part = ' "%s"' % title.replace('"', r"\"") if title else "" title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
# If in inline mode and not preserved, return alt text
if ( if (
convert_as_inline convert_as_inline
and el.parent.name not in self.options["keep_inline_images_in"] and el.parent.name not in self.options.get("keep_inline_images_in", [])
): ):
return alt return alt
# Remove dataURIs # Process data URI format images
if src.startswith("data:") and not self.options["keep_data_uris"]: if src.startswith("data:image") and not self.options.get("keep_data_uris", False):
src = src.split(",")[0] + "..." try:
# Parse MIME type
return "![%s](%s%s)" % (alt, src, title_part) mime_type = src.split(";")[0].replace("data:", "")
# Get file extension
ext = {
"image/png": ".png",
"image/jpeg": ".jpg",
"image/jpg": ".jpg",
"image/gif": ".gif"
}.get(mime_type, ".png")
# Decode base64 data
encoded = src.split(",")[1]
image_data = base64.b64decode(encoded)
# Generate unique filename
hashname = hashlib.sha256(image_data).hexdigest()[:8]
filename = f"image_{hashname}{ext}"
# Determine output directory
if hasattr(self, 'conversion_name') and self.conversion_name:
# If conversion_name exists, create subfolder
output_dir = os.path.join(self.image_output_dir, self.conversion_name)
print(f"[DEBUG] Using subfolder for image: {output_dir}")
else:
# Otherwise use base directory
output_dir = self.image_output_dir
print(f"[DEBUG] Using base directory for image: {output_dir}")
# Ensure directory exists
os.makedirs(output_dir, exist_ok=True)
print(f"[DEBUG] Ensuring directory exists: {output_dir}")
# Save image file
filepath = os.path.join(output_dir, filename)
with open(filepath, "wb") as f:
f.write(image_data)
print(f"[DEBUG] Image saved to: {filepath}")
# Update src to relative path
src = os.path.join(output_dir, filename).replace("\\", "/")
print(f"[DEBUG] Updated image path to: {src}")
except Exception as e:
error_msg = f"Error saving image: {str(e)}"
print(f"[ERROR] {error_msg}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return f"![{alt}](image_error.png) <!-- {error_msg} -->"
# Return Markdown format image reference
return f"![{alt}]({src}{title_part})"
def convert_soup(self, soup: Any) -> str: def convert_soup(self, soup: Any) -> str:
return super().convert_soup(soup) # type: ignore return super().convert_soup(soup) # type: ignore