跳转至

数据处理

概述

数据预处理是构建基于检索增强生成系统(RAG)中的关键步骤,主要包含三个核心子功能:

  • Loader: 从不同来源加载数据
  • Splitter: 将加载的数据分割为多个小片段
  • Embedding: 将文本转换为向量表示

Loader

Loader是数据处理流程的第一步,它负责从不同的数据源加载数据。

支持任意实现了AbstractFileSystem的数据源,包括:

  • LocalFileSystem
  • S3FileSystem
  • HTTPFileSystem
  • WebHDFS
  • DatabricksFileSystem

TongAgents SDK 已经支持的文件格式有:

  • csv
  • docx
  • xls
  • xlsx
  • md
  • pdf
  • txt
  • html

快速开始

从本地文件系统加载

from pathlib import Path
from tongagents.tools.loader.file.pdf_loader import PDFLoader

loader = PDFLoader()
file_path = Path('/path/to/doc.pdf')
documents = loader.lazy_load(file_path)

从S3文件系统加载

from pathlib import Path
from s3fs import S3FileSystem
from tongagents.tools.loader.file.pdf_loader import PDFLoader

file_system = S3FileSystem(
    endpoint_url="http://endpoint-url",
    secret="s3-secret",
    key="s3-key"
)
file_path = "dir/xxx.pdf"
loader = PDFLoader()
documents = loader.lazy_load(Path(file_path), fs=file_system)

从网页加载

from fsspec.implementations.http import HTTPFileSystem
from tongagents.tools.loader.file.html_loader import HtmlLoader

url = "https://www.bigai.ai"
# 当你的链接需要携带一些请求头信息时,可以通过HTTPFileSystem的client_kwargs参数传入
# 例如,这里添加了user-agent和referer
# 如果不需要外信息,可以直接使用 HTTPFileSystem() 初始化
http_fs = HTTPFileSystem(
    client_kwargs={
        "headers": {
            "user-agent": "Mozilla/5.0",
            "referer": "https://www.bigai.ai/research/",
        },
        "cookies": None,
        "auth": None,
    }
)
loader = HtmlLoader()
documents = loader.load(file=url, fs=http_fs)

同时,也提供了更加方便的DirectoryLoader,可以遍历目录下的文件,并根据文件的后缀名自动选择合适的Loader进行加载。

from tongagents.tools.loader.directory_loader import DirectoryLoader

## 加载指定文件
pdf_files = [
    "/path/to/doc1.pdf",
    "/path/to/doc2.pdf",
]
documents = DirectoryLoader(input_files=pdf_files).load()

## 加载指定目录
dir_path = "/path/to/docs"
documents = DirectoryLoader(input_dir=dir_path).load()

自定义

BaseLoader作为文档加载的抽象类,提供了lazy_loadalazy_loadloadaload四个方法,分别用于懒加载、异步懒加载、加载、异步加载文档。

from abc import ABC, abstractmethod
from collections.abc import Iterator

from tongagents.knowledge.core import Document


class BaseLoader(ABC):
    """
    Abstract base class for data loaders.
    Provides methods for lazy loading and asynchronous loading of documents.
    """

    @abstractmethod
    def lazy_load(
        self,
        *args: any,
        **kwargs: any,
    ) -> Iterator[Document]:
        raise NotImplementedError(
            f"{self.__class__.__name__} does not provide lazy_load method currently"
        )

    async def alazy_load(
        self,
        *args: any,
        **kwargs: any,
    ) -> Iterator[Document]:
        return self.lazy_load(*args, **kwargs)

    def load(
        self,
        *args: any,
        **kwargs: any,
    ) -> list[Document]:
        return list(self.lazy_load(*args, **kwargs))

    async def aload(
        self,
        *args: any,
        **kwargs: any,
    ) -> list[Document]:
        return list(self.lazy_load(*args, **kwargs))

开发者可以通过继承BaseLoader并实现lazy_load方法来自定义Loader。 以下是一个自定义的Loader的伪代码,用于加载json文件。

from collections.abc import Iterator
import json
from pathlib import Path
from fsspec import AbstractFileSystem
from tongagents.knowledge.core import Document
from tongagents.tools.loader.base import BaseLoader

class JsonLoader(BaseLoader):
    def lazy_load(
            self,
            file: Path,
            fs: AbstractFileSystem | None = None,
            extra_meta: dict | None = None
    ) -> Iterator[Document]:
        file_path = file.as_posix()
        metadata = {"file_name": "name", "file_path": file_path}
        with fs.open(file_path, "r") as f:
            data = json.load(f)
            yield Document(content=data, metadata=metadata)


json_loader = JsonLoader()
documents = json_loader.load("path/to/file.json")

在实现了Loader之后,也可以通过DirectoryLoader来指定对应的Loader加载目录下的文件。

from tongagents.tools.loader.directory_loader import DirectoryLoader

json_loader = JsonLoader()
directory_loader = DirectoryLoader(
    input_dir="/path/to/dir",
    file_loader={
        ".json": json_loader
    }
)
documents = directory_loader.load()

Splitter

splitter是数据处理流程的第二步,它负责将加载的数据分割为多个小片段。

快速开始

切分文本

splitter = LangchainSplitterWrapper(
    splitter=RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=separators,
    )
)
documents = splitter.split_document(documents)

自定义

BaseTextSplitter作为文档切分的抽象类,提供了split_textsplit_document两个方法,分别用于切分文本和文档。

import copy
from abc import ABC, abstractmethod
from collections.abc import Iterable

from tongagents.knowledge.core import Document


class BaseTextSplitter(ABC):
    @abstractmethod
    def split_text(self, text: str) -> list[str]:
        """Split text into multiple components."""

    def split_document(self, documents: Iterable[Document]) -> list[Document]:
        """Split documents into multiple components."""
        new_documents = []
        for doc in documents:
            if isinstance(doc.content, str):
                metadata = copy.deepcopy(doc.metadata)
                for chunk in self.split_text(doc.content):
                    new_doc = Document(content=chunk, metadata=metadata)
                    new_documents.append(new_doc)
            else:
                new_documents.append(doc)
        return new_documents

开发者可以通过继承BaseTextSplitter并实现split_text方法来自定义Splitter。 以下是一个自定义的Splitter的伪代码,用于将文本按照句号切分。

from tongagents.tools.splitter.base import BaseTextSplitter

class SentenceSplitter(BaseTextSplitter):
    def split_text(self, text: str) -> list[str]:
        return text.split("。")

Embedding

嵌入过程将文本转换为向量表示,使其能够在向量空间中进行相似性比较。 TongAgents SDK 支持使用SentenceTransformer加载模型进行文本嵌入。也支持调用兼容OpenAI API格式的embedding http服务进行文本嵌入。

快速开始

## 基于SentenceTransformer
import os
from tongagents.knowledge.embedding.openai_embedding import OpenAIEmbedding
from tongagents.knowledge.embedding.sentence_transformer_embedding import SentenceTransformerEmbedding

embedding = SentenceTransformerEmbedding(
    model_name="BAAI/bge-large-zh-v1.5",
    device="cpu",
    dimensions=1024,
)
embedding.embed_texts(["text1", "text2"])

## 基于兼容OpenAI API格式的服务
os.environ["OPENAI_API_KEY"] = "skn-xxx"
os.environ["OPENAI_BASE_URL"] = "https://your-openai-api-url"
os.environ["OPENAI_API_VERSION"] = "api-version"

embedding = OpenAIEmbedding()
embedding.embed_texts(["text1", "text2"])

自定义

BaseEmbedding作为文本嵌入的抽象类,提供了embed_textembed_textsembed_documents三个方法,分别用于嵌入单个文本、多个文本和多个文档。

from abc import ABC, abstractmethod
import numpy as np
from pydantic import Field
from tongagents.knowledge.core import Document


class BaseEmbedding(ABC):
    dimensions: int | None = Field(
        default=0,
        description="The number of dimensions on the output embedding vectors.",
    )

    def embed_text(self, text: str) -> np.array:
        """Embed text.

        Args:
            text: Text to embed.

        Returns:
            Embedding.
        """
        return self.embed_texts([text])[0]

    @abstractmethod
    def embed_texts(self, texts: list[str]) -> list[np.array]:
        """Embed list of text.

        Args:
            texts: List of text to embed.

        Returns:
            List of embeddings.
        """

    def embed_documents(
        self, documents: list[Document], max_batch_size: int | None
    ) -> list[np.ndarray]:
        """Embed list of documents.

        Args:
            documents: List of documents to embed.
            max_batch_size: If None, all documents are embedded at once.

        Returns:
            List of embeddings.
        """
        if max_batch_size is None:
            max_batch_size = len(documents)

        embeddings = []
        for i in range(0, len(documents), max_batch_size):
            batch = documents[i : i + max_batch_size]
            embeddings.extend(self.embed_texts([doc.content for doc in batch]))
        return embeddings

    def __call__(self, texts: list[str]) -> list[np.array]:
        return self.embed_texts(texts)

开发者可以通过继承BaseEmbedding并实现embed_texts方法来自定义Embedding。以下是一个自定义的Embedding的伪代码,用于将文本转换为随机向量。

import numpy as np
from tongagents.knowledge.embedding.base import BaseEmbedding

class RandomEmbedding(BaseEmbedding):
    def embed_texts(self, texts: list[str]) -> list[np.array]:
        return [np.random.rand(self.dimensions) for _ in range(len(texts))]