Added support for vaious audio files.

This commit is contained in:
Adam Fourney 2025-03-05 10:15:42 -08:00
parent c426cb81b3
commit a9ceb13feb
11 changed files with 363 additions and 197 deletions

View file

@ -4,9 +4,6 @@ from warnings import warn
from typing import Any, Union, BinaryIO, Optional, List
from ._stream_info import StreamInfo
# Avoid printing the same warning multiple times
_WARNED: List[str] = []
class DocumentConverterResult:
"""The result of converting a document to Markdown."""

View file

@ -17,7 +17,7 @@ from warnings import warn
import puremagic
import requests
from ._stream_info import StreamInfo
from ._stream_info import StreamInfo, _guess_stream_info_from_stream
from .converters import (
PlainTextConverter,
@ -254,7 +254,7 @@ class MarkItDown:
with open(path, "rb") as fh:
# Prepare a list of configurations to try, starting with the base_stream_info
guesses: List[StreamInfo] = [base_stream_info]
for guess in StreamInfo.guess_from_stream(
for guess in _guess_stream_info_from_stream(
file_stream=fh, filename_hint=path
):
guesses.append(base_stream_info.copy_and_update(guess))
@ -298,7 +298,7 @@ class MarkItDown:
placeholder_filename = "placeholder" + base_guess.extension
# Add guesses based on stream content
for guess in StreamInfo.guess_from_stream(
for guess in _guess_stream_info_from_stream(
file_stream=stream, filename_hint=placeholder_filename
):
guesses.append(base_guess.copy_and_update(guess))
@ -393,7 +393,7 @@ class MarkItDown:
placeholder_filename = "placeholder" + base_guess.extension
# Add guesses based on stream content
for guess in StreamInfo.guess_from_stream(
for guess in _guess_stream_info_from_stream(
file_stream=buffer, filename_hint=placeholder_filename
):
guesses.append(base_guess.copy_and_update(guess))

View file

@ -43,76 +43,82 @@ class StreamInfo:
return StreamInfo(**new_info)
@classmethod
def guess_from_stream(
cls: Type[T], file_stream: BinaryIO, *, filename_hint: Optional[str] = None
) -> List[T]:
"""
Guess StreamInfo properties (mostly mimetype and extension) from a stream.
Args:
- stream: The stream to guess the StreamInfo from.
- filename_hint [Optional]: A filename hint to help with the guessing (may be a placeholder, and not actually be the file name)
# Behavior subject to change.
# Do not rely on this outside of this module.
def _guess_stream_info_from_stream(
file_stream: BinaryIO,
*,
filename_hint: Optional[str] = None,
) -> List[StreamInfo]:
"""
Guess StreamInfo properties (mostly mimetype and extension) from a stream.
Returns a list of StreamInfo objects in order of confidence.
"""
guesses: List[StreamInfo] = []
Args:
- stream: The stream to guess the StreamInfo from.
- filename_hint [Optional]: A filename hint to help with the guessing (may be a placeholder, and not actually be the file name)
# Add a guess purely based on the filename hint
if filename_hint:
try:
mimetype, _ = mimetypes.guess_file_type(filename_hint)
except AttributeError:
mimetype, _ = mimetypes.guess_type(filename_hint)
Returns a list of StreamInfo objects in order of confidence.
"""
guesses: List[StreamInfo] = []
if mimetype:
guesses.append(
cls(mimetype=mimetype, extension=os.path.splitext(filename_hint)[1])
# Add a guess purely based on the filename hint
if filename_hint:
try:
mimetype, _ = mimetypes.guess_file_type(filename_hint)
except AttributeError:
mimetype, _ = mimetypes.guess_type(filename_hint)
if mimetype:
guesses.append(
StreamInfo(
mimetype=mimetype, extension=os.path.splitext(filename_hint)[1]
)
)
def _puremagic(
file_stream, filename_hint
) -> puremagic.main.PureMagicWithConfidence:
"""Wrap guesses to handle exceptions."""
try:
return puremagic.magic_stream(file_stream, filename=filename_hint)
except puremagic.main.PureError as e:
return []
def _puremagic(
file_stream, filename_hint
) -> puremagic.main.PureMagicWithConfidence:
"""Wrap guesses to handle exceptions."""
try:
return puremagic.magic_stream(file_stream, filename=filename_hint)
except puremagic.main.PureError as e:
return []
cur_pos = file_stream.tell()
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
if len(type_guesses) == 0:
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
cur_pos = file_stream.tell()
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
if len(type_guesses) == 0:
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
# Eat all the leading whitespace
file_stream.seek(cur_pos)
while True:
char = file_stream.read(1)
if not char: # End of file
break
if not char.isspace():
file_stream.seek(file_stream.tell() - 1)
break
# Try again
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
# Eat all the leading whitespace
file_stream.seek(cur_pos)
while True:
char = file_stream.read(1)
if not char: # End of file
break
if not char.isspace():
file_stream.seek(file_stream.tell() - 1)
break
# Convert and return the guesses
for guess in type_guesses:
kwargs: dict[str, str] = {}
if guess.extension:
kwargs["extension"] = guess.extension
if guess.mime_type:
kwargs["mimetype"] = MIMETYPE_SUBSTITUTIONS.get(
guess.mime_type, guess.mime_type
)
if len(kwargs) > 0:
# We don't add the filename_hint, because sometimes it's just a placeholder,
# and, in any case, doesn't add new information.
guesses.append(cls(**kwargs))
# Try again
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
file_stream.seek(cur_pos)
return guesses
# Convert and return the guesses
for guess in type_guesses:
kwargs: dict[str, str] = {}
if guess.extension:
kwargs["extension"] = guess.extension
if guess.mime_type:
kwargs["mimetype"] = MIMETYPE_SUBSTITUTIONS.get(
guess.mime_type, guess.mime_type
)
if len(kwargs) > 0:
# We don't add the filename_hint, because sometimes it's just a placeholder,
# and, in any case, doesn't add new information.
guesses.append(StreamInfo(**kwargs))
return guesses

View file

@ -1,9 +1,12 @@
from typing import Any, Union
import re
import sys
import re
from typing import BinaryIO, Any, List
from ._html_converter import HtmlConverter
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._exceptions import MissingDependencyException
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
@ -26,6 +29,40 @@ except ImportError:
CONTENT_FORMAT = "markdown"
OFFICE_MIME_TYPE_PREFIXES = [
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.openxmlformats-officedocument.presentationml",
"application/xhtml",
"text/html",
]
OTHER_MIME_TYPE_PREFIXES = [
"application/pdf",
"application/x-pdf",
"text/html",
"image/",
]
OFFICE_FILE_EXTENSIONS = [
".docx",
".xlsx",
".pptx",
".html",
".htm",
]
OTHER_FILE_EXTENSIONS = [
".pdf",
".jpeg",
".jpg",
".png",
".bmp",
".tiff",
".heif",
]
class DocumentIntelligenceConverter(DocumentConverter):
"""Specialized DocumentConverter that uses Document Intelligence to extract text from documents."""
@ -57,46 +94,57 @@ class DocumentIntelligenceConverter(DocumentConverter):
)
self._priority = priority
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Bail if extension is not supported by Document Intelligence
extension = kwargs.get("file_extension", "")
docintel_extensions = [
".pdf",
".docx",
".xlsx",
".pptx",
".html",
".jpeg",
".jpg",
".png",
".bmp",
".tiff",
".heif",
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in OFFICE_FILE_EXTENSIONS + OTHER_FILE_EXTENSIONS:
return True
for prefix in OFFICE_MIME_TYPE_PREFIXES + OTHER_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
return False
def _analysis_features(self, stream_info: StreamInfo) -> List[str]:
"""
Helper needed to determine which analysis features to use.
Certain document analysis features are not availiable for
office filetypes (.xlsx, .pptx, .html, .docx)
"""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in OFFICE_FILE_EXTENSIONS:
return []
for prefix in OFFICE_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return []
return [
DocumentAnalysisFeature.FORMULAS, # enable formula extraction
DocumentAnalysisFeature.OCR_HIGH_RESOLUTION, # enable high resolution OCR
DocumentAnalysisFeature.STYLE_FONT, # enable font style extraction
]
if extension.lower() not in docintel_extensions:
return None
# Get the bytestring for the local path
with open(local_path, "rb") as f:
file_bytes = f.read()
# Certain document analysis features are not availiable for office filetypes (.xlsx, .pptx, .html, .docx)
if extension.lower() in [".xlsx", ".pptx", ".html", ".docx"]:
analysis_features = []
else:
analysis_features = [
DocumentAnalysisFeature.FORMULAS, # enable formula extraction
DocumentAnalysisFeature.OCR_HIGH_RESOLUTION, # enable high resolution OCR
DocumentAnalysisFeature.STYLE_FONT, # enable font style extraction
]
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Extract the text using Azure Document Intelligence
poller = self.doc_intel_client.begin_analyze_document(
model_id="prebuilt-layout",
body=AnalyzeDocumentRequest(bytes_source=file_bytes),
features=analysis_features,
body=AnalyzeDocumentRequest(bytes_source=file_stream.read()),
features=self._analysis_features(stream_info),
output_content_format=CONTENT_FORMAT, # TODO: replace with "ContentFormat.MARKDOWN" when the bug is fixed
)
result: AnalyzeResult = poller.result()

View file

@ -1,7 +1,6 @@
import tempfile
from typing import Union
from .._base_converter import DocumentConverter, DocumentConverterResult
from ._wav_converter import WavConverter
from warnings import resetwarnings, catch_warnings
# Optional Transcription support
@ -23,64 +22,65 @@ finally:
resetwarnings()
class Mp3Converter(WavConverter):
class Mp3Converter(DocumentConverter):
"""
Converts MP3 files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` AND `pydub` are installed).
"""
def __init__(
self, priority: float = DocumentConverter.PRIORITY_SPECIFIC_FILE_FORMAT
):
super().__init__(priority=priority)
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a MP3
extension = kwargs.get("file_extension", "")
if extension.lower() != ".mp3":
return None
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path, kwargs.get("exiftool_path"))
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Transcribe
if IS_AUDIO_TRANSCRIPTION_CAPABLE:
handle, temp_path = tempfile.mkstemp(suffix=".wav")
os.close(handle)
try:
sound = pydub.AudioSegment.from_mp3(local_path)
sound.export(temp_path, format="wav")
_args = dict()
_args.update(kwargs)
_args["file_extension"] = ".wav"
try:
transcript = super()._transcribe_audio(temp_path).strip()
md_content += "\n\n### Audio Transcript:\n" + (
"[No speech detected]" if transcript == "" else transcript
)
except Exception:
md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio."
finally:
os.unlink(temp_path)
# Return the result
return DocumentConverterResult(markdown=md_content.strip())
# def __init__(
# self, priority: float = DocumentConverter.PRIORITY_SPECIFIC_FILE_FORMAT
# ):
# super().__init__(priority=priority)
#
# def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# # Bail if not a MP3
# extension = kwargs.get("file_extension", "")
# if extension.lower() != ".mp3":
# return None
#
# md_content = ""
#
# # Add metadata
# metadata = self._get_metadata(local_path, kwargs.get("exiftool_path"))
# if metadata:
# for f in [
# "Title",
# "Artist",
# "Author",
# "Band",
# "Album",
# "Genre",
# "Track",
# "DateTimeOriginal",
# "CreateDate",
# "Duration",
# ]:
# if f in metadata:
# md_content += f"{f}: {metadata[f]}\n"
#
# # Transcribe
# if IS_AUDIO_TRANSCRIPTION_CAPABLE:
# handle, temp_path = tempfile.mkstemp(suffix=".wav")
# os.close(handle)
# try:
# sound = pydub.AudioSegment.from_mp3(local_path)
# sound.export(temp_path, format="wav")
#
# _args = dict()
# _args.update(kwargs)
# _args["file_extension"] = ".wav"
#
# try:
# transcript = super()._transcribe_audio(temp_path).strip()
# md_content += "\n\n### Audio Transcript:\n" + (
# "[No speech detected]" if transcript == "" else transcript
# )
# except Exception:
# md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio."
#
# finally:
# os.unlink(temp_path)
#
# # Return the result
# return DocumentConverterResult(markdown=md_content.strip())

View file

@ -0,0 +1,38 @@
import io
from typing import BinaryIO
from .._exceptions import MissingDependencyException
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
_dependency_exc_info = None
try:
import speech_recognition as sr
import pydub
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
def transcribe_audio(file_stream: BinaryIO, *, audio_format: str = "wav") -> str:
# Check for installed dependencies
if _dependency_exc_info is not None:
raise MissingDependencyException(
"Speech transcription requires installing MarkItdown with the [audio-transcription] optional dependencies. E.g., `pip install markitdown[audio-transcription]` or `pip install markitdown[all]`"
) from _dependency_exc_info[1].with_traceback(_dependency_exc_info[2])
if audio_format in ["wav", "aiff", "flac"]:
audio_source = file_stream
elif audio_format in ["mp3", "mp4"]:
audio_segment = pydub.AudioSegment.from_file(file_stream, format=audio_format)
audio_source = io.BytesIO()
audio_segment.export(audio_source, format="wav")
audio_source.seek(0)
else:
raise ValueError(f"Unsupported audio format: {audio_format}")
recognizer = sr.Recognizer()
with sr.AudioFile(audio_source) as source:
audio = recognizer.record(source)
transcript = recognizer.recognize_google(audio).strip()
return "[No speech detected]" if transcript == "" else transcript

View file

@ -1,18 +1,27 @@
from typing import Union
import io
from typing import Any, BinaryIO, Optional
from ._exiftool import exiftool_metadata
from ._transcribe_audio import transcribe_audio
from .._base_converter import DocumentConverter, DocumentConverterResult
from ._media_converter import MediaConverter
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException
# Optional Transcription support
IS_AUDIO_TRANSCRIPTION_CAPABLE = False
try:
import speech_recognition as sr
ACCEPTED_MIME_TYPE_PREFIXES = [
"audio/x-wav",
"audio/mpeg",
"video/mp4",
]
IS_AUDIO_TRANSCRIPTION_CAPABLE = True
except ModuleNotFoundError:
pass
ACCEPTED_FILE_EXTENSIONS = [
".wav",
".mp3",
".m4a",
".mp4",
]
class WavConverter(MediaConverter):
class WavConverter(DocumentConverter):
"""
Converts WAV files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` is installed).
"""
@ -22,16 +31,40 @@ class WavConverter(MediaConverter):
):
super().__init__(priority=priority)
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a WAV
extension = kwargs.get("file_extension", "")
if extension.lower() != ".wav":
return None
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
"""
Make sure we're dealing with HTML content *from* Wikipedia.
"""
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path, kwargs.get("exiftool_path"))
metadata = exiftool_metadata(
file_stream, exiftool_path=kwargs.get("exiftool_path")
)
if metadata:
for f in [
"Title",
@ -43,27 +76,36 @@ class WavConverter(MediaConverter):
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
# "Duration", -- Wrong values when read from memory
"NumChannels",
"SampleRate",
"AvgBytesPerSec",
"BitsPerSample",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Figure out the audio format for transcription
if stream_info.extension == ".wav" or stream_info.mimetype == "audio/x-wav":
audio_format = "wav"
elif stream_info.extension == ".mp3" or stream_info.mimetype == "audio/mpeg":
audio_format = "mp3"
elif (
stream_info.extension in [".mp4", ".m4a"]
or stream_info.mimetype == "video/mp4"
):
audio_format = "mp4"
else:
audio_format = None
# Transcribe
if IS_AUDIO_TRANSCRIPTION_CAPABLE:
if audio_format:
try:
transcript = self._transcribe_audio(local_path)
md_content += "\n\n### Audio Transcript:\n" + (
"[No speech detected]" if transcript == "" else transcript
)
except Exception:
md_content += (
"\n\n### Audio Transcript:\nError. Could not transcribe this audio."
)
transcript = transcribe_audio(file_stream, audio_format=audio_format)
if transcript:
md_content += "\n\n### Audio Transcript:\n" + transcript
except MissingDependencyException:
pass
# Return the result
return DocumentConverterResult(markdown=md_content.strip())
def _transcribe_audio(self, local_path) -> str:
recognizer = sr.Recognizer()
with sr.AudioFile(local_path) as source:
audio = recognizer.record(source)
return recognizer.recognize_google(audio).strip()

Binary file not shown.

Binary file not shown.

Binary file not shown.

View file

@ -15,6 +15,7 @@ from markitdown import (
FileConversionException,
StreamInfo,
)
from markitdown._stream_info import _guess_stream_info_from_stream
skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
@ -41,6 +42,13 @@ JPG_TEST_EXIFTOOL = {
"DateTimeOriginal": "2024:03:14 22:10:00",
}
MP3_TEST_EXIFTOOL = {
"Title": "f67a499e-a7d0-4ca3-a49b-358bd934ae3e",
"Artist": "Artist Name Test String",
"Album": "Album Name Test String",
"SampleRate": "48000",
}
PDF_TEST_URL = "https://arxiv.org/pdf/2308.08155v2.pdf"
PDF_TEST_STRINGS = [
"While there is contemporaneous exploration of multi-agent approaches"
@ -261,7 +269,7 @@ def test_stream_info_guesses() -> None:
for file_path, expected_mimetype in test_tuples:
with open(file_path, "rb") as f:
guesses = StreamInfo.guess_from_stream(
guesses = _guess_stream_info_from_stream(
f, filename_hint=os.path.basename(file_path)
)
assert len(guesses) > 0
@ -389,6 +397,26 @@ def test_markitdown_local() -> None:
assert "# Test" in result.text_content
@pytest.mark.skipif(
skip_remote,
reason="do not run remotely run speech transcription tests",
)
def test_speech_transcription() -> None:
markitdown = MarkItDown()
# Test WAV files, MP3 and M4A files
for file_name in ["test.wav", "test.mp3", "test.m4a"]:
result = markitdown.convert(os.path.join(TEST_FILES_DIR, file_name))
result_lower = result.text_content.lower()
assert (
("1" in result_lower or "one" in result_lower)
and ("2" in result_lower or "two" in result_lower)
and ("3" in result_lower or "three" in result_lower)
and ("4" in result_lower or "four" in result_lower)
and ("5" in result_lower or "five" in result_lower)
)
def test_exceptions() -> None:
# Check that an exception is raised when trying to convert an unsupported format
markitdown = MarkItDown()
@ -437,6 +465,12 @@ def test_markitdown_exiftool() -> None:
target = f"{key}: {JPG_TEST_EXIFTOOL[key]}"
assert target in result.text_content
# Test some other media types
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.mp3"))
for key in MP3_TEST_EXIFTOOL:
target = f"{key}: {MP3_TEST_EXIFTOOL[key]}"
assert target in result.text_content
@pytest.mark.skipif(
skip_llm,
@ -470,6 +504,7 @@ if __name__ == "__main__":
test_stream_info_guesses()
test_markitdown_remote()
test_markitdown_local()
test_speech_transcription()
test_exceptions()
test_markitdown_exiftool()
test_markitdown_llm()