Stream exiftool.
This commit is contained in:
parent
7879028c98
commit
4a034da269
2 changed files with 127 additions and 53 deletions
44
packages/markitdown/src/markitdown/converters/_exiftool.py
Normal file
44
packages/markitdown/src/markitdown/converters/_exiftool.py
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import locale
|
||||||
|
import sys
|
||||||
|
import shutil
|
||||||
|
import os
|
||||||
|
from warnings import warn
|
||||||
|
from typing import BinaryIO, Literal, Optional
|
||||||
|
|
||||||
|
|
||||||
|
def exiftool_metadata(
    file_stream: BinaryIO, *, exiftool_path: Optional[str] = None
) -> dict[str, Literal]:
    """Extract metadata from a binary stream by piping it through exiftool.

    The stream is read from its current position to the end and passed to
    ``exiftool -json -`` on stdin. The stream position is restored before
    returning, whether or not the call succeeds.

    Args:
        file_stream: Binary stream holding the file content to inspect. It
            must support ``tell()``, ``read()`` and ``seek()``.
        exiftool_path: Path to the exiftool executable. When falsy, exiftool
            is not run; if an ``exiftool`` binary is nevertheless found on
            PATH, a ``DeprecationWarning`` explains how to opt in.

    Returns:
        The first record of exiftool's JSON output, or an empty dict when
        exiftool is not configured or produced no usable output.

    Raises:
        OSError: If ``exiftool_path`` cannot be executed (raised by
            ``subprocess.run``).
        json.JSONDecodeError: If exiftool printed non-empty output that is
            not valid JSON.
        UnicodeDecodeError: If exiftool's output is not valid UTF-8.
    """
    # Check if we have a valid pointer to exiftool
    if not exiftool_path:
        which_exiftool = shutil.which("exiftool")
        if which_exiftool:
            warn(
                f"""Implicit discovery of 'exiftool' is disabled. If you would like to continue to use exiftool in MarkItDown, please set the exiftool_path parameter in the MarkItDown constructor. E.g.,

    md = MarkItDown(exiftool_path="{which_exiftool}")

This warning will be removed in future releases.
""",
                DeprecationWarning,
            )
        # Nothing to do
        return {}

    # Run exiftool, always putting the stream back where we found it so
    # later consumers of the same stream are unaffected.
    cur_pos = file_stream.tell()
    try:
        completed = subprocess.run(
            [exiftool_path, "-json", "-"],
            input=file_stream.read(),
            capture_output=True,
            text=False,
        )
    finally:
        file_stream.seek(cur_pos)

    output = completed.stdout
    if not output.strip():
        # exiftool failed or did not recognise the input. Metadata is a
        # best-effort extra, so report it and carry on with no metadata
        # instead of failing on json.loads("").
        warn(
            f"exiftool produced no output (exit code {completed.returncode}); "
            "metadata will be omitted.",
            RuntimeWarning,
        )
        return {}

    # exiftool emits its JSON as UTF-8; decoding with the locale's preferred
    # encoding corrupts non-ASCII values on non-UTF-8 systems.
    records = json.loads(output.decode("utf-8"))

    # Guard against an empty JSON array rather than raising IndexError.
    return records[0] if records else {}
|
||||||
|
|
@ -1,11 +1,20 @@
|
||||||
from typing import Union
|
from typing import BinaryIO, Any
|
||||||
from .._base_converter import DocumentConverter, DocumentConverterResult
|
|
||||||
from ._media_converter import MediaConverter
|
|
||||||
import base64
|
import base64
|
||||||
import mimetypes
|
import mimetypes
|
||||||
|
from ._exiftool import exiftool_metadata
|
||||||
|
from .._base_converter import DocumentConverter, DocumentConverterResult
|
||||||
|
from .._stream_info import StreamInfo
|
||||||
|
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
|
||||||
|
|
||||||
|
ACCEPTED_MIME_TYPE_PREFIXES = [
|
||||||
|
"image/jpeg",
|
||||||
|
"image/png",
|
||||||
|
]
|
||||||
|
|
||||||
|
ACCEPTED_FILE_EXTENSIONS = [".jpg", ".jpeg", ".png"]
|
||||||
|
|
||||||
|
|
||||||
class ImageConverter(MediaConverter):
|
class ImageConverter(DocumentConverter):
|
||||||
"""
|
"""
|
||||||
Converts images to markdown via extraction of metadata (if `exiftool` is installed), and description via a multimodal LLM (if an llm_client is configured).
|
Converts images to markdown via extraction of metadata (if `exiftool` is installed), and description via a multimodal LLM (if an llm_client is configured).
|
||||||
"""
|
"""
|
||||||
|
|
@ -15,16 +24,36 @@ class ImageConverter(MediaConverter):
|
||||||
):
|
):
|
||||||
super().__init__(priority=priority)
|
super().__init__(priority=priority)
|
||||||
|
|
||||||
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
|
def accepts(
|
||||||
# Bail if not an image
|
self,
|
||||||
extension = kwargs.get("file_extension", "")
|
file_stream: BinaryIO,
|
||||||
if extension.lower() not in [".jpg", ".jpeg", ".png"]:
|
stream_info: StreamInfo,
|
||||||
return None
|
**kwargs: Any,
|
||||||
|
) -> bool:
|
||||||
|
mimetype = (stream_info.mimetype or "").lower()
|
||||||
|
extension = (stream_info.extension or "").lower()
|
||||||
|
|
||||||
|
if extension in ACCEPTED_FILE_EXTENSIONS:
|
||||||
|
return True
|
||||||
|
|
||||||
|
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
|
||||||
|
if mimetype.startswith(prefix):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def convert(
|
||||||
|
self,
|
||||||
|
file_stream: BinaryIO,
|
||||||
|
stream_info: StreamInfo,
|
||||||
|
**kwargs: Any, # Options to pass to the converter
|
||||||
|
) -> DocumentConverterResult:
|
||||||
md_content = ""
|
md_content = ""
|
||||||
|
|
||||||
# Add metadata
|
# Add metadata
|
||||||
metadata = self._get_metadata(local_path, kwargs.get("exiftool_path"))
|
metadata = exiftool_metadata(
|
||||||
|
file_stream, exiftool_path=kwargs.get("exiftool_path")
|
||||||
|
)
|
||||||
|
|
||||||
if metadata:
|
if metadata:
|
||||||
for f in [
|
for f in [
|
||||||
|
|
@ -42,52 +71,53 @@ class ImageConverter(MediaConverter):
|
||||||
if f in metadata:
|
if f in metadata:
|
||||||
md_content += f"{f}: {metadata[f]}\n"
|
md_content += f"{f}: {metadata[f]}\n"
|
||||||
|
|
||||||
# Try describing the image with GPTV
|
# # Try describing the image with GPTV
|
||||||
llm_client = kwargs.get("llm_client")
|
# llm_client = kwargs.get("llm_client")
|
||||||
llm_model = kwargs.get("llm_model")
|
# llm_model = kwargs.get("llm_model")
|
||||||
if llm_client is not None and llm_model is not None:
|
# if llm_client is not None and llm_model is not None:
|
||||||
md_content += (
|
# md_content += (
|
||||||
"\n# Description:\n"
|
# "\n# Description:\n"
|
||||||
+ self._get_llm_description(
|
# + self._get_llm_description(
|
||||||
local_path,
|
# local_path,
|
||||||
extension,
|
# extension,
|
||||||
llm_client,
|
# llm_client,
|
||||||
llm_model,
|
# llm_model,
|
||||||
prompt=kwargs.get("llm_prompt"),
|
# prompt=kwargs.get("llm_prompt"),
|
||||||
).strip()
|
# ).strip()
|
||||||
+ "\n"
|
# + "\n"
|
||||||
)
|
# )
|
||||||
|
|
||||||
return DocumentConverterResult(
|
return DocumentConverterResult(
|
||||||
markdown=md_content,
|
markdown=md_content,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_llm_description(self, local_path, extension, client, model, prompt=None):
|
|
||||||
if prompt is None or prompt.strip() == "":
|
|
||||||
prompt = "Write a detailed caption for this image."
|
|
||||||
|
|
||||||
data_uri = ""
|
# def _get_llm_description(self, local_path, extension, client, model, prompt=None):
|
||||||
with open(local_path, "rb") as image_file:
|
# if prompt is None or prompt.strip() == "":
|
||||||
content_type, encoding = mimetypes.guess_type("_dummy" + extension)
|
# prompt = "Write a detailed caption for this image."
|
||||||
if content_type is None:
|
#
|
||||||
content_type = "image/jpeg"
|
# data_uri = ""
|
||||||
image_base64 = base64.b64encode(image_file.read()).decode("utf-8")
|
# with open(local_path, "rb") as image_file:
|
||||||
data_uri = f"data:{content_type};base64,{image_base64}"
|
# content_type, encoding = mimetypes.guess_type("_dummy" + extension)
|
||||||
|
# if content_type is None:
|
||||||
messages = [
|
# content_type = "image/jpeg"
|
||||||
{
|
# image_base64 = base64.b64encode(image_file.read()).decode("utf-8")
|
||||||
"role": "user",
|
# data_uri = f"data:{content_type};base64,{image_base64}"
|
||||||
"content": [
|
#
|
||||||
{"type": "text", "text": prompt},
|
# messages = [
|
||||||
{
|
# {
|
||||||
"type": "image_url",
|
# "role": "user",
|
||||||
"image_url": {
|
# "content": [
|
||||||
"url": data_uri,
|
# {"type": "text", "text": prompt},
|
||||||
},
|
# {
|
||||||
},
|
# "type": "image_url",
|
||||||
],
|
# "image_url": {
|
||||||
}
|
# "url": data_uri,
|
||||||
]
|
# },
|
||||||
|
# },
|
||||||
response = client.chat.completions.create(model=model, messages=messages)
|
# ],
|
||||||
return response.choices[0].message.content
|
# }
|
||||||
|
# ]
|
||||||
|
#
|
||||||
|
# response = client.chat.completions.create(model=model, messages=messages)
|
||||||
|
# return response.choices[0].message.content
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue