fix test
This commit is contained in:
parent
ce9cfff3bf
commit
72e89cb368
4 changed files with 78 additions and 120 deletions
|
|
@ -104,6 +104,7 @@ class DocumentConverterResult:
|
||||||
"text": text_chunk
|
"text": text_chunk
|
||||||
})
|
})
|
||||||
if self.audio_stream:
|
if self.audio_stream:
|
||||||
|
print('hello')
|
||||||
audio_b64 = base64.b64encode(
|
audio_b64 = base64.b64encode(
|
||||||
self.audio_stream.read()).decode('utf-8')
|
self.audio_stream.read()).decode('utf-8')
|
||||||
content.append({
|
content.append({
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@ from pathlib import Path
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
from warnings import warn
|
from warnings import warn
|
||||||
import magic
|
import magic
|
||||||
|
import mimetypes
|
||||||
from ._schemas import StreamInfo, Config
|
from ._schemas import StreamInfo, Config
|
||||||
|
|
||||||
from .converters import (
|
from .converters import (
|
||||||
|
|
@ -38,13 +38,13 @@ class MarkItUp:
|
||||||
):
|
):
|
||||||
self.config = config
|
self.config = config
|
||||||
|
|
||||||
def convert(self, stream: BinaryIO) -> Dict[DocumentConverterResult, StreamInfo]:
|
def convert(self, stream: BinaryIO, file_name: str) -> Dict[DocumentConverterResult, StreamInfo]:
|
||||||
stream_info: StreamInfo = self._get_stream_info(stream)
|
stream_info: StreamInfo = self._get_stream_info(stream, file_name)
|
||||||
# Deal with unsupported file types
|
# Deal with unsupported file types
|
||||||
try:
|
try:
|
||||||
match stream_info.category:
|
match stream_info.category:
|
||||||
case "text":
|
case "text":
|
||||||
return PlainTextConverter().convert(stream, stream_info), stream_info
|
return PlainTextConverter(config=self.config).convert(stream, stream_info), stream_info
|
||||||
case "pptx":
|
case "pptx":
|
||||||
return PptxConverter(config=self.config).convert(stream, stream_info), stream_info
|
return PptxConverter(config=self.config).convert(stream, stream_info), stream_info
|
||||||
case "pdf":
|
case "pdf":
|
||||||
|
|
@ -78,10 +78,7 @@ class MarkItUp:
|
||||||
raise FileConversionException(
|
raise FileConversionException(
|
||||||
f"Failed to convert file of type {stream_info.magic_type}")
|
f"Failed to convert file of type {stream_info.magic_type}")
|
||||||
|
|
||||||
def _get_stream_info(self, byte_stream: BinaryIO) -> StreamInfo:
|
def _get_stream_info(self, byte_stream: BinaryIO, filename: str) -> StreamInfo:
|
||||||
original_position = byte_stream.tell()
|
|
||||||
|
|
||||||
# Reset stream position to beginning
|
|
||||||
byte_stream.seek(0)
|
byte_stream.seek(0)
|
||||||
|
|
||||||
# Get file content for analysis
|
# Get file content for analysis
|
||||||
|
|
@ -89,6 +86,10 @@ class MarkItUp:
|
||||||
|
|
||||||
# Use python-magic to determine file type based on content
|
# Use python-magic to determine file type based on content
|
||||||
magic_type = magic.from_buffer(file_content, mime=True)
|
magic_type = magic.from_buffer(file_content, mime=True)
|
||||||
|
if magic_type == "application/octet-stream":
|
||||||
|
guessed_type, _ = mimetypes.guess_type(filename)
|
||||||
|
if guessed_type:
|
||||||
|
magic_type = guessed_type
|
||||||
|
|
||||||
# Determine file category based on magic_type
|
# Determine file category based on magic_type
|
||||||
if magic_type.startswith("image/"):
|
if magic_type.startswith("image/"):
|
||||||
|
|
@ -96,7 +97,7 @@ class MarkItUp:
|
||||||
category = "image"
|
category = "image"
|
||||||
else:
|
else:
|
||||||
category = "other"
|
category = "other"
|
||||||
elif magic_type.startswith("audio/"):
|
elif magic_type ==("audio/mpeg"):
|
||||||
category = "audio"
|
category = "audio"
|
||||||
elif magic_type.startswith("video/"):
|
elif magic_type.startswith("video/"):
|
||||||
category = "video"
|
category = "video"
|
||||||
|
|
@ -126,5 +127,5 @@ class MarkItUp:
|
||||||
else:
|
else:
|
||||||
category = "other"
|
category = "other"
|
||||||
|
|
||||||
byte_stream.seek(original_position)
|
byte_stream.seek(0)
|
||||||
return StreamInfo(magic_type=magic_type, category=category)
|
return StreamInfo(magic_type=magic_type, category=category)
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,16 @@
|
||||||
from typing import BinaryIO, Any
|
from typing import BinaryIO, Any
|
||||||
from charset_normalizer import from_bytes
|
from charset_normalizer import from_bytes
|
||||||
from .._base_converter import DocumentConverter, DocumentConverterResult
|
from .._base_converter import DocumentConverter, DocumentConverterResult
|
||||||
from .._schemas import StreamInfo
|
from .._schemas import StreamInfo, Config
|
||||||
|
|
||||||
|
|
||||||
class PlainTextConverter(DocumentConverter):
|
class PlainTextConverter(DocumentConverter):
|
||||||
"""Anything with content type text/plain"""
|
"""Anything with content type text/plain"""
|
||||||
|
def __init__(self, config: Config):
|
||||||
def convert(
|
self.config = config
|
||||||
self,
|
|
||||||
file_stream: BinaryIO,
|
def convert(self, file_stream: BinaryIO, stream_info: StreamInfo, **kwargs: Any) -> DocumentConverterResult:
|
||||||
stream_info: StreamInfo,
|
content = file_stream.read()
|
||||||
**kwargs: Any, # Options to pass to the converter
|
text_content = str(from_bytes(content).best())
|
||||||
) -> DocumentConverterResult:
|
|
||||||
text_content = str(from_bytes(file_stream.read()).best())
|
|
||||||
return DocumentConverterResult(markdown=text_content)
|
return DocumentConverterResult(markdown=text_content)
|
||||||
|
|
|
||||||
|
|
@ -2,153 +2,124 @@ import os
|
||||||
import unittest
|
import unittest
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from markitup import MarkItUp, Config
|
from markitup import MarkItUp, Config
|
||||||
|
from markitup.converter_utils.utils import read_files_to_bytestreams
|
||||||
|
|
||||||
|
fs = read_files_to_bytestreams('packages/markitup/tests/test_files')
|
||||||
|
|
||||||
|
|
||||||
class TestMarkItUp(unittest.TestCase):
|
class TestMarkItUp(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
# Get the absolute path to the test_files directory
|
print("Setting up test environment")
|
||||||
self.test_files_dir = os.path.join(
|
print(fs)
|
||||||
os.path.dirname(os.path.abspath(__file__)), "test_files"
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_plain_text_conversion(self):
|
def test_plain_text_conversion(self):
|
||||||
"""Test converting a plain text file to markdown."""
|
"""Test converting a plain text file to markdown."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.txt")
|
markitup = MarkItUp()
|
||||||
|
# fs['test.txt'].seek(0)
|
||||||
with open(filepath, "rb") as f:
|
result, info = markitup.convert(fs['test.txt'], 'test.txt')
|
||||||
markitup = MarkItUp()
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "text")
|
self.assertEqual(info.category, "text")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_docx_conversion(self):
|
def test_docx_conversion(self):
|
||||||
"""Test converting a DOCX file to markdown."""
|
"""Test converting a DOCX file to markdown."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.docx")
|
markitup = MarkItUp()
|
||||||
|
result, info = markitup.convert(fs['test.docx'], 'test.docx')
|
||||||
with open(filepath, "rb") as f:
|
|
||||||
markitup = MarkItUp()
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "docx")
|
self.assertEqual(info.category, "docx")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_docx_with_comments_conversion(self):
|
def test_docx_with_comments_conversion(self):
|
||||||
"""Test converting a DOCX file with comments to markdown."""
|
"""Test converting a DOCX file with comments to markdown."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test_with_comment.docx")
|
markitup = MarkItUp()
|
||||||
|
result, info = markitup.convert(fs['test_with_comment.docx'], 'test_with_comment.docx')
|
||||||
with open(filepath, "rb") as f:
|
|
||||||
markitup = MarkItUp()
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "docx")
|
self.assertEqual(info.category, "docx")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_pdf_conversion(self):
|
def test_pdf_conversion(self):
|
||||||
"""Test converting a PDF file to markdown."""
|
"""Test converting a PDF file to markdown."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.pdf")
|
markitup = MarkItUp()
|
||||||
|
result, info = markitup.convert(fs['test.pdf'], 'test.pdf')
|
||||||
with open(filepath, "rb") as f:
|
|
||||||
markitup = MarkItUp()
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "pdf")
|
self.assertEqual(info.category, "pdf")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_html_conversion(self):
|
def test_html_conversion(self):
|
||||||
"""Test converting HTML files to markdown."""
|
"""Test converting HTML files to markdown."""
|
||||||
html_files = ["test_blog.html", "test_wikipedia.html", "test_serp.html"]
|
html_files = ["test_blog.html", "test_wikipedia.html", "test_serp.html"]
|
||||||
|
|
||||||
for html_file in html_files:
|
for html_file in html_files:
|
||||||
filepath = os.path.join(self.test_files_dir, html_file)
|
|
||||||
with self.subTest(file=html_file):
|
with self.subTest(file=html_file):
|
||||||
with open(filepath, "rb") as f:
|
markitup = MarkItUp()
|
||||||
markitup = MarkItUp()
|
result, info = markitup.convert(fs[html_file], html_file)
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "text")
|
self.assertEqual(info.category, "html")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_xlsx_conversion(self):
|
def test_xlsx_conversion(self):
|
||||||
"""Test converting an XLSX file to markdown."""
|
"""Test converting an XLSX file to markdown."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.xlsx")
|
markitup = MarkItUp()
|
||||||
|
result, info = markitup.convert(fs['test.xlsx'], 'test.xlsx')
|
||||||
with open(filepath, "rb") as f:
|
|
||||||
markitup = MarkItUp()
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "xlsx")
|
self.assertEqual(info.category, "xlsx")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_xls_conversion(self):
|
def test_xls_conversion(self):
|
||||||
"""Test converting an XLS file to markdown."""
|
"""Test converting an XLS file to markdown."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.xls")
|
markitup = MarkItUp()
|
||||||
|
result, info = markitup.convert(fs['test.xls'], 'test.xls')
|
||||||
with open(filepath, "rb") as f:
|
|
||||||
markitup = MarkItUp()
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "xls")
|
self.assertEqual(info.category, "xls")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_csv_conversion(self):
|
def test_csv_conversion(self):
|
||||||
"""Test converting CSV files to markdown."""
|
"""Test converting CSV files to markdown."""
|
||||||
csv_files = ["test.csv", "test_mskanji.csv"]
|
csv_files = ["test.csv", "test_mskanji.csv"]
|
||||||
|
|
||||||
for csv_file in csv_files:
|
for csv_file in csv_files:
|
||||||
filepath = os.path.join(self.test_files_dir, csv_file)
|
|
||||||
with self.subTest(file=csv_file):
|
with self.subTest(file=csv_file):
|
||||||
with open(filepath, "rb") as f:
|
markitup = MarkItUp()
|
||||||
markitup = MarkItUp()
|
result, info = markitup.convert(fs[csv_file], csv_file)
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "csv")
|
self.assertEqual(info.category, "csv")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_pptx_conversion(self):
|
def test_pptx_conversion(self):
|
||||||
"""Test converting a PPTX file to markdown."""
|
"""Test converting a PPTX file to markdown."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.pptx")
|
markitup = MarkItUp()
|
||||||
|
result, info = markitup.convert(fs['test.pptx'], 'test.pptx')
|
||||||
with open(filepath, "rb") as f:
|
|
||||||
markitup = MarkItUp()
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "pptx")
|
self.assertEqual(info.category, "pptx")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_audio_conversion(self):
|
def test_audio_conversion(self):
|
||||||
"""Test converting audio files to markdown."""
|
"""Test converting audio files to markdown."""
|
||||||
audio_files = ["test.mp3", "test.m4a"]
|
audio_files = ["test.mp3"]
|
||||||
|
|
||||||
for audio_file in audio_files:
|
for audio_file in audio_files:
|
||||||
filepath = os.path.join(self.test_files_dir, audio_file)
|
|
||||||
with self.subTest(file=audio_file):
|
with self.subTest(file=audio_file):
|
||||||
with open(filepath, "rb") as f:
|
markitup = MarkItUp(config=Config(modalities=["audio"]))
|
||||||
markitup = MarkItUp()
|
result, info = markitup.convert(fs[audio_file], audio_file)
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "audio")
|
self.assertEqual(info.category, "audio")
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_image_in_config(self):
|
def test_image_in_config(self):
|
||||||
"""Test with only image in modalities config."""
|
"""Test with only image in modalities config."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.pdf")
|
# Configure with only image modality
|
||||||
|
config = Config(modalities=["image"])
|
||||||
with open(filepath, "rb") as f:
|
markitup = MarkItUp(config=config)
|
||||||
# Configure with only image modality
|
result, info = markitup.convert(fs['test.pdf'], 'test.pdf')
|
||||||
config = Config(modalities=["image"])
|
|
||||||
markitup = MarkItUp(config=config)
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "pdf")
|
self.assertEqual(info.category, "pdf")
|
||||||
|
|
@ -156,13 +127,10 @@ class TestMarkItUp(unittest.TestCase):
|
||||||
|
|
||||||
def test_audio_in_config(self):
|
def test_audio_in_config(self):
|
||||||
"""Test with only audio in modalities config."""
|
"""Test with only audio in modalities config."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.docx")
|
# Configure with only audio modality
|
||||||
|
config = Config(modalities=["audio"])
|
||||||
with open(filepath, "rb") as f:
|
markitup = MarkItUp(config=config)
|
||||||
# Configure with only audio modality
|
result, info = markitup.convert(fs['test.docx'], 'test.docx')
|
||||||
config = Config(modalities=["audio"])
|
|
||||||
markitup = MarkItUp(config=config)
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "docx")
|
self.assertEqual(info.category, "docx")
|
||||||
|
|
@ -170,13 +138,10 @@ class TestMarkItUp(unittest.TestCase):
|
||||||
|
|
||||||
def test_no_modalities_config(self):
|
def test_no_modalities_config(self):
|
||||||
"""Test with empty modalities config."""
|
"""Test with empty modalities config."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test_with_comment.docx")
|
# Configure with no modalities
|
||||||
|
config = Config(modalities=[])
|
||||||
with open(filepath, "rb") as f:
|
markitup = MarkItUp(config=config)
|
||||||
# Configure with no modalities
|
result, info = markitup.convert(fs['test_with_comment.docx'], 'test_with_comment.docx')
|
||||||
config = Config(modalities=[])
|
|
||||||
markitup = MarkItUp(config=config)
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, "docx")
|
self.assertEqual(info.category, "docx")
|
||||||
|
|
@ -184,13 +149,10 @@ class TestMarkItUp(unittest.TestCase):
|
||||||
|
|
||||||
def test_unsupported_format(self):
|
def test_unsupported_format(self):
|
||||||
"""Test handling of an unsupported file format."""
|
"""Test handling of an unsupported file format."""
|
||||||
filepath = os.path.join(self.test_files_dir, "random.bin")
|
markitup = MarkItUp()
|
||||||
|
with self.assertRaises(Exception):
|
||||||
with open(filepath, "rb") as f:
|
# Should raise an exception for unsupported format
|
||||||
markitup = MarkItUp()
|
markitup.convert(fs['random.bin'], 'random.bin')
|
||||||
with self.assertRaises(Exception):
|
|
||||||
# Should raise an exception for unsupported format
|
|
||||||
markitup.convert(f)
|
|
||||||
|
|
||||||
def test_multiple_files_same_config(self):
|
def test_multiple_files_same_config(self):
|
||||||
"""Test converting multiple files with the same configuration."""
|
"""Test converting multiple files with the same configuration."""
|
||||||
|
|
@ -206,22 +168,17 @@ class TestMarkItUp(unittest.TestCase):
|
||||||
markitup = MarkItUp(config=config)
|
markitup = MarkItUp(config=config)
|
||||||
|
|
||||||
for filename, expected_category in test_files.items():
|
for filename, expected_category in test_files.items():
|
||||||
filepath = os.path.join(self.test_files_dir, filename)
|
|
||||||
with self.subTest(file=filename):
|
with self.subTest(file=filename):
|
||||||
with open(filepath, "rb") as f:
|
result, info = markitup.convert(fs[filename], filename)
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
self.assertEqual(info.category, expected_category)
|
self.assertEqual(info.category, expected_category)
|
||||||
self.assertTrue(result.markdown, "Content should not be empty")
|
self.assertTrue(result.to_llm(), "Content should not be empty")
|
||||||
|
|
||||||
def test_to_llm_method(self):
|
def test_to_llm_method(self):
|
||||||
"""Test the to_llm method of the conversion result."""
|
"""Test the to_llm method of the conversion result."""
|
||||||
filepath = os.path.join(self.test_files_dir, "test.docx")
|
markitup = MarkItUp()
|
||||||
|
result, info = markitup.convert(fs['test.docx'], 'test.docx')
|
||||||
with open(filepath, "rb") as f:
|
|
||||||
markitup = MarkItUp()
|
|
||||||
result, info = markitup.convert(f)
|
|
||||||
|
|
||||||
# Call the to_llm method and check the result
|
# Call the to_llm method and check the result
|
||||||
llm_format = result.to_llm()
|
llm_format = result.to_llm()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue