This commit is contained in:
Hieu Lam 2025-03-10 10:36:05 +07:00 committed by GitHub
commit c3d241ec12
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 44 additions and 56 deletions

View file

@ -27,8 +27,7 @@ dependencies = [
"beautifulsoup4", "beautifulsoup4",
"requests", "requests",
"markdownify", "markdownify",
"puremagic", "magika>=0.6.0rc1",
"pathvalidate",
"charset-normalizer", "charset-normalizer",
] ]

View file

@ -14,9 +14,6 @@ from typing import Any, List, Optional, Union, BinaryIO
from pathlib import Path from pathlib import Path
from urllib.parse import urlparse from urllib.parse import urlparse
from warnings import warn from warnings import warn
# File-format detection
import puremagic
import requests import requests
from ._stream_info import StreamInfo, _guess_stream_info_from_stream from ._stream_info import StreamInfo, _guess_stream_info_from_stream

View file

@ -1,14 +1,10 @@
import puremagic
import mimetypes import mimetypes
import os import os
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from typing import Optional, BinaryIO, List, TypeVar, Type from typing import Optional, BinaryIO, List, TypeVar, Type
from magika import Magika
# Mimetype substitutions table magika = Magika()
MIMETYPE_SUBSTITUTIONS = {
"application/excel": "application/vnd.ms-excel",
"application/mspowerpoint": "application/vnd.ms-powerpoint",
}
@dataclass(kw_only=True, frozen=True) @dataclass(kw_only=True, frozen=True)
@ -59,6 +55,25 @@ def _guess_stream_info_from_stream(
""" """
guesses: List[StreamInfo] = [] guesses: List[StreamInfo] = []
# Call magika to guess from the stream
cur_pos = file_stream.tell()
try:
result = magika.identify_bytes(file_stream.read())
if result.status == "ok" and result.prediction.output.label != "unknown":
extension = None
if len(result.prediction.output.extensions) > 0:
extension = result.prediction.output.extensions[0]
if extension and not extension.startswith("."):
extension = "." + extension
guesses.append(
StreamInfo(
mimetype=result.prediction.output.mime_type,
extension=extension,
)
)
finally:
file_stream.seek(cur_pos)
# Add a guess purely based on the filename hint # Add a guess purely based on the filename hint
if filename_hint: if filename_hint:
try: try:
@ -74,49 +89,4 @@ def _guess_stream_info_from_stream(
) )
) )
def _puremagic(
    file_stream, filename_hint
) -> List[puremagic.main.PureMagicWithConfidence]:
    """Run puremagic on the stream, returning an empty list on PureError.

    Args:
        file_stream: Stream handed to ``puremagic.magic_stream``.
        filename_hint: Passed to puremagic as its ``filename`` argument.

    Returns:
        Whatever ``puremagic.magic_stream`` returns, or ``[]`` when it
        raises ``puremagic.main.PureError``.
    """
    try:
        matches = puremagic.magic_stream(file_stream, filename=filename_hint)
    except puremagic.main.PureError:
        # A failed detection is reported as "no guesses", not as an error.
        matches = []
    return matches
cur_pos = file_stream.tell()
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
if len(type_guesses) == 0:
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
# Eat all the leading whitespace
file_stream.seek(cur_pos)
while True:
char = file_stream.read(1)
if not char: # End of file
break
if not char.isspace():
file_stream.seek(file_stream.tell() - 1)
break
# Try again
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
file_stream.seek(cur_pos)
# Convert and return the guesses
for guess in type_guesses:
kwargs: dict[str, str] = {}
if guess.extension:
kwargs["extension"] = guess.extension
if guess.mime_type:
kwargs["mimetype"] = MIMETYPE_SUBSTITUTIONS.get(
guess.mime_type, guess.mime_type
)
if len(kwargs) > 0:
# We don't add the filename_hint, because sometimes it's just a placeholder,
# and, in any case, doesn't add new information.
guesses.append(StreamInfo(**kwargs))
return guesses return guesses

View file

@ -3,6 +3,9 @@ import markdownify
from typing import Any, Optional from typing import Any, Optional
from urllib.parse import quote, unquote, urlparse, urlunparse from urllib.parse import quote, unquote, urlparse, urlunparse
from magika import Magika
magika = Magika()
class _CustomMarkdownify(markdownify.MarkdownConverter): class _CustomMarkdownify(markdownify.MarkdownConverter):
@ -17,6 +20,25 @@ class _CustomMarkdownify(markdownify.MarkdownConverter):
def __init__(self, **options: Any): def __init__(self, **options: Any):
options["heading_style"] = options.get("heading_style", markdownify.ATX) options["heading_style"] = options.get("heading_style", markdownify.ATX)
# Add a custom code language callback to guess the language of code snippets
def code_language_callback(el):
    """Guess the language label for the code snippet held by ``el``.

    Args:
        el: Element whose ``get_text()`` yields the snippet text.

    Returns:
        The label magika predicts for the snippet when identification
        succeeds and the predicted group is "text" or "code"; otherwise
        an empty string.
    """
    snippet = el.get_text()
    if not snippet:
        # Nothing to classify.
        return ""

    detection = magika.identify_bytes(snippet.encode())
    if detection.status != "ok":
        return ""

    output = detection.prediction.output
    # Only textual / source-code predictions are used as a language label.
    if output.group not in ("text", "code"):
        return ""
    return output.label
options["code_language_callback"] = options.get(
"code_language_callback", code_language_callback
)
# Explicitly cast options to the expected type if necessary # Explicitly cast options to the expected type if necessary
super().__init__(**options) super().__init__(**options)