数据处理¶
概述¶
数据预处理是构建基于检索增强生成系统(RAG)中的关键步骤,主要包含三个核心子功能:
Loader: 从不同来源加载数据Splitter: 将加载的数据分割为多个小片段Embedding: 将文本转换为向量表示
Loader¶
Loader是数据处理流程的第一步,它负责从不同的数据源加载数据。
支持任意实现了AbstractFileSystem的数据源,包括:
LocalFileSystemS3FileSystemHTTPFileSystemWebHDFSDatabricksFileSystem
TongAgents SDK 已经支持的文件格式有:
csvdocxxlsxlsxmdpdftxthtml
快速开始¶
从本地文件系统加载
from pathlib import Path
from tongagents.tools.loader.file.pdf_loader import PDFLoader
loader = PDFLoader()
file_path = Path('/path/to/doc.pdf')
documents = loader.lazy_load(file_path)
从S3文件系统加载
from pathlib import Path
from s3fs import S3FileSystem
from tongagents.tools.loader.file.pdf_loader import PDFLoader
file_system = S3FileSystem(
endpoint_url="http://endpoint-url",
secret="s3-secret",
key="s3-key"
)
file_path = "dir/xxx.pdf"
loader = PDFLoader()
documents = loader.lazy_load(Path(file_path), fs=file_system)
从网页加载
from fsspec.implementations.http import HTTPFileSystem
from tongagents.tools.loader.file.html_loader import HtmlLoader
url = "https://www.bigai.ai"
# 当你的链接需要携带一些请求头信息时,可以通过HTTPFileSystem的client_kwargs参数传入
# 例如,这里添加了user-agent和referer
# 如果不需要外信息,可以直接使用 HTTPFileSystem() 初始化
http_fs = HTTPFileSystem(
client_kwargs={
"headers": {
"user-agent": "Mozilla/5.0",
"referer": "https://www.bigai.ai/research/",
},
"cookies": None,
"auth": None,
}
)
loader = HtmlLoader()
documents = loader.load(file=url, fs=http_fs)
同时,也提供了更加方便的DirectoryLoader,可以遍历目录下的文件,并根据文件的后缀名自动选择合适的Loader进行加载。
from tongagents.tools.loader.directory_loader import DirectoryLoader
## 加载指定文件
pdf_files = [
"/path/to/doc1.pdf",
"/path/to/doc2.pdf",
]
documents = DirectoryLoader(input_files=pdf_files).load()
## 加载指定目录
dir_path = "/path/to/docs"
documents = DirectoryLoader(input_dir=dir_path).load()
自定义¶
BaseLoader作为文档加载的抽象类,提供了lazy_load、alazy_load、load、aload四个方法,分别用于懒加载、异步懒加载、加载、异步加载文档。
from abc import ABC, abstractmethod
from collections.abc import Iterator
from tongagents.knowledge.core import Document
class BaseLoader(ABC):
"""
Abstract base class for data loaders.
Provides methods for lazy loading and asynchronous loading of documents.
"""
@abstractmethod
def lazy_load(
self,
*args: any,
**kwargs: any,
) -> Iterator[Document]:
raise NotImplementedError(
f"{self.__class__.__name__} does not provide lazy_load method currently"
)
async def alazy_load(
self,
*args: any,
**kwargs: any,
) -> Iterator[Document]:
return self.lazy_load(*args, **kwargs)
def load(
self,
*args: any,
**kwargs: any,
) -> list[Document]:
return list(self.lazy_load(*args, **kwargs))
async def aload(
self,
*args: any,
**kwargs: any,
) -> list[Document]:
return list(self.lazy_load(*args, **kwargs))
开发者可以通过继承BaseLoader并实现lazy_load方法来自定义Loader。
以下是一个自定义的Loader的伪代码,用于加载json文件。
from collections.abc import Iterator
import json
from pathlib import Path
from fsspec import AbstractFileSystem
from tongagents.knowledge.core import Document
from tongagents.tools.loader.base import BaseLoader
class JsonLoader(BaseLoader):
def lazy_load(
self,
file: Path,
fs: AbstractFileSystem | None = None,
extra_meta: dict | None = None
) -> Iterator[Document]:
file_path = file.as_posix()
metadata = {"file_name": "name", "file_path": file_path}
with fs.open(file_path, "r") as f:
data = json.load(f)
yield Document(content=data, metadata=metadata)
json_loader = JsonLoader()
documents = json_loader.load("path/to/file.json")
在实现了Loader之后,也可以通过DirectoryLoader来指定对应的Loader加载目录下的文件。
from tongagents.tools.loader.directory_loader import DirectoryLoader
json_loader = JsonLoader()
directory_loader = DirectoryLoader(
input_dir="/path/to/dir",
file_loader={
".json": json_loader
}
)
documents = directory_loader.load()
Splitter¶
splitter是数据处理流程的第二步,它负责将加载的数据分割为多个小片段。
快速开始¶
切分文本
splitter = LangchainSplitterWrapper(
splitter=RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=separators,
)
)
documents = splitter.split_document(documents)
自定义¶
BaseTextSplitter作为文档切分的抽象类,提供了split_text和split_document两个方法,分别用于切分文本和文档。
import copy
from abc import ABC, abstractmethod
from collections.abc import Iterable
from tongagents.knowledge.core import Document
class BaseTextSplitter(ABC):
@abstractmethod
def split_text(self, text: str) -> list[str]:
"""Split text into multiple components."""
def split_document(self, documents: Iterable[Document]) -> list[Document]:
"""Split documents into multiple components."""
new_documents = []
for doc in documents:
if isinstance(doc.content, str):
metadata = copy.deepcopy(doc.metadata)
for chunk in self.split_text(doc.content):
new_doc = Document(content=chunk, metadata=metadata)
new_documents.append(new_doc)
else:
new_documents.append(doc)
return new_documents
开发者可以通过继承BaseTextSplitter并实现split_text方法来自定义Splitter。
以下是一个自定义的Splitter的伪代码,用于将文本按照句号切分。
from tongagents.tools.splitter.base import BaseTextSplitter
class SentenceSplitter(BaseTextSplitter):
def split_text(self, text: str) -> list[str]:
return text.split("。")
Embedding¶
嵌入过程将文本转换为向量表示,使其能够在向量空间中进行相似性比较。
TongAgents SDK 支持使用SentenceTransformer加载模型进行文本嵌入。也支持调用兼容OpenAI API格式的embedding http服务进行文本嵌入。
快速开始¶
## 基于SentenceTransformer
import os
from tongagents.knowledge.embedding.openai_embedding import OpenAIEmbedding
from tongagents.knowledge.embedding.sentence_transformer_embedding import SentenceTransformerEmbedding
embedding = SentenceTransformerEmbedding(
model_name="BAAI/bge-large-zh-v1.5",
device="cpu",
dimensions=1024,
)
embedding.embed_texts(["text1", "text2"])
## 基于兼容OpenAI API格式的服务
os.environ["OPENAI_API_KEY"] = "skn-xxx"
os.environ["OPENAI_BASE_URL"] = "https://your-openai-api-url"
os.environ["OPENAI_API_VERSION"] = "api-version"
embedding = OpenAIEmbedding()
embedding.embed_texts(["text1", "text2"])
自定义¶
BaseEmbedding作为文本嵌入的抽象类,提供了embed_text、embed_texts、embed_documents三个方法,分别用于嵌入单个文本、多个文本和多个文档。
from abc import ABC, abstractmethod
import numpy as np
from pydantic import Field
from tongagents.knowledge.core import Document
class BaseEmbedding(ABC):
dimensions: int | None = Field(
default=0,
description="The number of dimensions on the output embedding vectors.",
)
def embed_text(self, text: str) -> np.array:
"""Embed text.
Args:
text: Text to embed.
Returns:
Embedding.
"""
return self.embed_texts([text])[0]
@abstractmethod
def embed_texts(self, texts: list[str]) -> list[np.array]:
"""Embed list of text.
Args:
texts: List of text to embed.
Returns:
List of embeddings.
"""
def embed_documents(
self, documents: list[Document], max_batch_size: int | None
) -> list[np.ndarray]:
"""Embed list of documents.
Args:
documents: List of documents to embed.
max_batch_size: If None, all documents are embedded at once.
Returns:
List of embeddings.
"""
if max_batch_size is None:
max_batch_size = len(documents)
embeddings = []
for i in range(0, len(documents), max_batch_size):
batch = documents[i : i + max_batch_size]
embeddings.extend(self.embed_texts([doc.content for doc in batch]))
return embeddings
def __call__(self, texts: list[str]) -> list[np.array]:
return self.embed_texts(texts)
开发者可以通过继承BaseEmbedding并实现embed_texts方法来自定义Embedding。以下是一个自定义的Embedding的伪代码,用于将文本转换为随机向量。