
Structured Hierarchical Retrieval with LlamaIndex: Revolutionizing Multi-Document RAG Architecture

wxchong · 2024-08-18 00:54:13

Introduction

Managing and processing multiple documents within frameworks like RAG (Retrieval-Augmented Generation) presents significant challenges. The key lies not only in extracting relevant content, but in selecting the appropriate document that contains the information a user's query seeks. This need for dynamic document selection, based on attributes that align with the user query, led to the development of structured hierarchical retrieval: a paradigm-shifting approach to navigating and accessing information across multiple documents.

(Image source: Structural Retrieval)

Defining LlamaIndex

LlamaIndex sits at the forefront of this innovative approach, transforming how documents are selected and content is retrieved. LlamaIndex works by representing each document as a concise metadata dictionary, enriched with structured metadata and an extracted summary. These metadata dictionaries serve as a set of filters and are stored intelligently in a vector database.

LlamaIndex supports a multi-tier approach to information retrieval. It does more than sift through documents: it leverages metadata-driven filters to streamline the selection process. These filters help align user queries with the most relevant documents through an auto-retrieval mechanism. The process involves inferring a semantic query and determining the best set of filters over the vector database, effectively combining the power of text-to-SQL and semantic search. A sketch of the resulting per-document record follows.
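
For illustration, here is a hedged sketch (plain Python) of the kind of per-document record this produces. The field names mirror those built in Step 3 of the walkthrough below; the values are invented for the example.

# A hypothetical per-document metadata record, as stored in the vector database.
# Field names mirror those constructed in Step 3 below; values are invented.
doc_metadata = {
    "index_id": "9431",  # links back to the full source document
    "state": "open",
    "year": 2023,
    "month": 12,
    "day": 11,
    "assignee": "",
    "size": "XL",
}
# The node's text is the extracted one-sentence summary of the document,
# and this dictionary rides along as its metadata filters.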

Benefits of Structured Hierarchical Retrieval

Adopting structured hierarchical retrieval, powered by LlamaIndex, brings a number of benefits:

  1. Enhanced relevance: by leveraging metadata-driven filters, the system can precisely identify and retrieve documents that match the nuanced requirements of a user query. This ensures higher relevance and accuracy in content selection.
  2. Dynamic document selection: unlike traditional approaches where document retrieval is static, LlamaIndex enables dynamic document selection. The system adapts intelligently to different user queries, flexibly choosing relevant documents based on their attributes and structured metadata.
  3. Efficient information retrieval: structured hierarchical retrieval significantly improves retrieval efficiency. By preprocessing documents into metadata dictionaries and storing them in a vector database, the system streamlines the retrieval process, minimizing computational overhead and optimizing search efficiency.
  4. Semantic query optimization: the fusion of text-to-SQL and semantic search lets the system understand user intent more deeply. LlamaIndex's auto-retrieval mechanism refines user queries into a semantic structure, enabling precise and nuanced retrieval from the document repository; the sketch after this list illustrates the idea.
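
To make the last point concrete, here is an illustration (not real API output) of what auto-retrieval infers from a natural-language query: a residual semantic query string plus a set of structured metadata filters. The triple format mirrors the retriever logs shown in Step 6.

# Illustrative only: the decomposition an auto-retriever performs.
user_query = "Tell me about some issues on 12/11"

inferred_query_str = ""  # nothing left over for semantic search in this case
inferred_filters = [     # structured, SQL-WHERE-like metadata filters
    ("month", "==", 12),
    ("day", "==", 11),
]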

Code Implementation

This Python-based demonstration illustrates the basic concept behind LlamaIndex as a structured hierarchical retrieval system. A Llamaindex class initializes the system to manage document metadata in a vector database.

  • Document addition: the add_document method adds a document to the Llamaindex by creating a metadata dictionary containing key information such as a summary and keywords.
  • Retrieval logic: the retrieve_documents method handles a user query by matching it against the metadata filters in the vector database. Basic mock matching logic is used for demonstration purposes.
  • Matching mechanism: the match_metadata method simulates the matching between a user query and document metadata. This is simplified demonstration logic; a production system would typically use more advanced NLP or semantic-analysis techniques.

This demonstration conveys the core concept of LlamaIndex: storing document metadata and retrieving relevant documents for a user query through a simplified Python implementation. A minimal sketch matching the description above follows.
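
What follows is a hedged reconstruction of that demo. Only the class and method names (Llamaindex, add_document, retrieve_documents, match_metadata) come from the description above; every method body is an assumption, written as mock logic.

class Llamaindex:
    """Toy structured hierarchical retrieval; a dict stands in for the vector DB."""

    def __init__(self):
        self.vector_db = {}  # maps doc_id -> metadata dictionary

    def add_document(self, doc_id, summary, keywords):
        # Represent the document as a concise metadata dictionary.
        self.vector_db[doc_id] = {"summary": summary, "keywords": keywords}

    def retrieve_documents(self, user_query):
        # Return the ids of all documents whose metadata matches the query.
        return [
            doc_id
            for doc_id, metadata in self.vector_db.items()
            if self.match_metadata(user_query, metadata)
        ]

    def match_metadata(self, user_query, metadata):
        # Mock matching: simple keyword overlap. A real system would infer
        # structured filters and run semantic search instead.
        query_terms = set(user_query.lower().split())
        doc_terms = {kw.lower() for kw in metadata["keywords"]}
        return bool(query_terms & doc_terms)


# Example usage of the sketch:
toy_index = Llamaindex()
toy_index.add_document(
    "9431", "PR adding a ClickHouse vector store.", ["clickhouse", "vectordb"]
)
print(toy_index.retrieve_documents("clickhouse issues"))  # -> ['9431']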

Step 1: Install Libraries

!pip install llama-index wandb llama_hub weaviate-client --quiet

Step 2: Import Libraries

import os
import logging
import sys

from llama_index.llms import OpenAI
from llama_index.callbacks import CallbackManager, WandbCallbackHandler
from llama_index import (
    VectorStoreIndex,
    StorageContext,
    ServiceContext,
)

import nest_asyncio

nest_asyncio.apply()

# Set up the OpenAI API key
os.environ["OPENAI_API_KEY"] = ""  # <-- your API key

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

# initialise WandbCallbackHandler and pass any wandb.init args
wandb_args = {"project":"llama-index-report"}
wandb_callback = WandbCallbackHandler(run_args=wandb_args)


# pass wandb_callback to the service context
callback_manager = CallbackManager([wandb_callback])
service_context = ServiceContext.from_defaults(
    llm=OpenAI(model="gpt-3.5-turbo-0613", temperature=0),
    chunk_size=1024,
    callback_manager=callback_manager,
)

Step 3: Download GitHub Issues

os.environ["GITHUB_TOKEN"] = ""

from llama_hub.github_repo_issues import (
    GitHubRepositoryIssuesReader,
    GitHubIssuesClient,
)

github_client = GitHubIssuesClient()
loader = GitHubRepositoryIssuesReader(
    github_client,
    owner="run-llama",
    repo="llama_index",
    verbose=True,
)

orig_docs = loader.load_data()

limit = 100

# Keep the first `limit` issues, tagging each with its source document id
docs = []
for doc in orig_docs[:limit]:
    doc.metadata["index_id"] = doc.id_
    docs.append(doc)

# Output
Found 100 issues in the repo page 1
Resulted in 100 documents
Found 100 issues in the repo page 2
Resulted in 200 documents
Found 100 issues in the repo page 3
Resulted in 300 documents
Found 8 issues in the repo page 4
Resulted in 308 documents
No more issues found, stopping

from llama_index import SummaryIndex, Document, ServiceContext
from llama_index.llms import OpenAI
from llama_index.async_utils import run_jobs


async def aprocess_doc(doc, include_summary: bool = True):
    """Process doc."""
    print(f"Processing {doc.id_}")
    metadata = doc.metadata

    date_tokens = metadata["created_at"].split("T")[0].split("-")
    year = int(date_tokens[0])
    month = int(date_tokens[1])
    day = int(date_tokens[2])

    assignee = (
        "" if "assignee" not in doc.metadata else doc.metadata["assignee"]
    )
    size = ""
    if len(doc.metadata["labels"]) > 0:
        size_arr = [l for l in doc.metadata["labels"] if "size:" in l]
        size = size_arr[0].split(":")[1] if len(size_arr) > 0 else ""
    new_metadata = {
        "state": metadata["state"],
        "year": year,
        "month": month,
        "day": day,
        "assignee": assignee,
        "size": size,
        "index_id": doc.id_,
    }

    # now extract out summary
    summary_index = SummaryIndex.from_documents([doc])
    query_str = "Give a one-sentence concise summary of this issue."
    query_engine = summary_index.as_query_engine(
        service_context=ServiceContext.from_defaults(
            llm=OpenAI(model="gpt-3.5-turbo")
        )
    )
    summary_txt = str(query_engine.query(query_str))

    new_doc = Document(text=summary_txt, metadata=new_metadata)
    return new_doc


async def aprocess_docs(docs):
    """Process metadata on docs."""
    tasks = [aprocess_doc(doc) for doc in docs]

    # Run the per-document summarization jobs concurrently, 5 at a time
    new_docs = await run_jobs(tasks, show_progress=True, workers=5)

    return new_docs

new_docs = await aprocess_docs(docs)

# Output
Processing 9398
Processing 9427
..........
Processing 7744
Processing 9472
Processing 8475
Processing 9244
Processing 9618
100%|██████████| 100/100 [02:07<00:00,  1.27s/it]

Step 4: Load the Data into the Weaviate Vector Store

from llama_index.vector_stores import WeaviateVectorStore
from llama_index.storage import StorageContext
from llama_index import VectorStoreIndex

import weaviate

# Connect to a Weaviate Cloud cluster
auth_config = weaviate.AuthApiKey(api_key="")  # <-- your Weaviate API key
client = weaviate.Client(
    "https://<weaviate-cluster>.weaviate.network",
    auth_client_secret=auth_config,
)

class_name = "LlamaIndex_auto"


vector_store = WeaviateVectorStore(
    weaviate_client=client, index_name=class_name
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# Since "new_docs" are concise summaries, we can directly feed them as nodes into VectorStoreIndex
index = VectorStoreIndex(new_docs, storage_context=storage_context)

docs[0].metadata

# Output
{'state': 'open',
 'created_at': '2023-12-21T20:18:03Z',
 'url': 'https://api.github.com/repos/run-llama/llama_index/issues/9655',
 'source': 'https://github.com/run-llama/llama_index/pull/9655',
 'labels': ['size:L'],
 'index_id': '9655'}

Step 5: Index the Original Documents in Weaviate

doc_class_name = "LlamaIndex_docs"  # Weaviate class for the full documents (any unused class name works)

vector_store = WeaviateVectorStore(
    weaviate_client=client, index_name=doc_class_name
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

doc_index = VectorStoreIndex.from_documents(
    docs, storage_context=storage_context
)

Step 6: Set Up the Auto-Retrieval Mechanism

In this part, setting up our auto-retriever proceeds through a few key steps:

  1. Schema definition: we define the vector database schema, including the metadata fields. This specification guides the LLM's input prompt when it infers which metadata filters to apply.
  2. Initializing VectorIndexAutoRetriever: instantiating this class creates a retriever over our condensed metadata index, taking the defined schema as input.
  3. Creating a wrapper retriever: this step post-processes each retrieved node into an IndexNode carrying an index ID that links back to the source document. That link enables recursive retrieval in the next section, which relies on IndexNode objects connecting to downstream retrievers, query engines, or other nodes. (Note: this abstraction is still being refined for optimal functionality.)

6(a) Define the Schema

from llama_index.vector_stores.types import MetadataInfo, VectorStoreInfo


vector_store_info = VectorStoreInfo(
    content_info="Github Issues",
    metadata_info=[
        MetadataInfo(
            name="state",
            description="Whether the issue is `open` or `closed`",
            type="string",
        ),
        MetadataInfo(
            name="year",
            description="The year the issue was created",
            type="integer",
        ),
        MetadataInfo(
            name="month",
            description="The month the issue was created",
            type="integer",
        ),
        MetadataInfo(
            name="day",
            description="The day the issue was created",
            type="integer",
        ),
        MetadataInfo(
            name="assignee",
            description="The assignee of the ticket",
            type="string",
        ),
        MetadataInfo(
            name="size",
            description="How big the issue is (XS, S, M, L, XL, XXL)",
            type="string",
        ),
    ],
)

6(b) Instantiate VectorIndexAutoRetriever

from llama_index.retrievers import VectorIndexAutoRetriever

retriever = VectorIndexAutoRetriever(
    index,
    vector_store_info=vector_store_info,
    similarity_top_k=2,
    empty_query_top_k=10,  # if only metadata filters are specified, this is the limit
    verbose=True,
)
nodes = retriever.retrieve("Tell me about some issues on 12/11")
print(f"Number retrieved: {len(nodes)}")
print(nodes[0].metadata)

# Output
Using query str: 
Using filters: [('month', '==', 12), ('day', '==', 11)]
Number retrieved: 6
{'state': 'open', 'year': 2023, 'month': 12, 'day': 11, 'assignee': '', 'size': 'XL', 'index_id': '9431'}

6(c) Define the Wrapper Retriever

from llama_index.retrievers import BaseRetriever
from llama_index.indices.query.schema import QueryBundle
from llama_index.schema import IndexNode, NodeWithScore


class IndexAutoRetriever(BaseRetriever):
    """Index auto-retriever."""

    def __init__(self, retriever: VectorIndexAutoRetriever):
        """Init params."""
        self.retriever = retriever

    def _retrieve(self, query_bundle: QueryBundle):
        """Convert nodes to index node."""
        retrieved_nodes = self.retriever.retrieve(query_bundle)
        new_retrieved_nodes = []
        for retrieved_node in retrieved_nodes:
            index_id = retrieved_node.metadata["index_id"]
            index_node = IndexNode.from_text_node(
                retrieved_node.node, index_id=index_id
            )
            new_retrieved_nodes.append(
                NodeWithScore(node=index_node, score=retrieved_node.score)
            )
        return new_retrieved_nodes


index_retriever = IndexAutoRetriever(retriever=retriever)

Step 7: Set Up the Recursive Retrieval Mechanism

This type of retriever connects each node returned by one retriever to another retriever, query engine, or node.

The setup links each summarized metadata node to the retriever over the RAG pipeline corresponding to its source document.

The configuration proceeds as follows:

  1. Define a retriever for each document and organize them in a dictionary.
  2. Define the recursive retriever by passing the root retriever (the summarized metadata retriever) in one parameter and merging in the document-specific retrievers in another.

from llama_index.vector_stores.types import (
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
)

retriever_dict = {}
query_engine_dict = {}
for doc in docs:
    index_id = doc.metadata["index_id"]
    # filter for the specific doc id
    filters = MetadataFilters(
        filters=[
            MetadataFilter(
                key="index_id", operator=FilterOperator.EQ, value=index_id
            ),
        ]
    )
    retriever = doc_index.as_retriever(filters=filters)
    query_engine = doc_index.as_query_engine(filters=filters)

    retriever_dict[index_id] = retriever
    query_engine_dict[index_id] = query_engine

from llama_index.retrievers import RecursiveRetriever

# note: can pass `agents` dict as `query_engine_dict` since every agent can be used as a query engine
recursive_retriever = RecursiveRetriever(
    "vector",
    retriever_dict={"vector": index_retriever, **retriever_dict},
    # query_engine_dict=query_engine_dict,
    verbose=True,
)

nodes = recursive_retriever.retrieve("Tell me about some issues on 12/11")

print(f"Number of source nodes: {len(nodes)}")
nodes[0].node.metadata

# Output
Retrieving with query id None: Tell me about some issues on 12/11
Using query str: 
Using filters: [('month', '==', 12), ('day', '==', 11)]
Retrieved node with id, entering: 9431
Retrieving with query id 9431: Tell me about some issues on 12/11
Retrieving text node: Dev awiss
# Description

Try to use clickhouse as vectorDB.
Try to chunk docs with independent parser service.
Special designed schema and tricks for better query and retriever. 

Fixes # (issue)

## Type of Change

Please delete options that are not relevant.

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update

# How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration

- [ ] Added new unit/integration tests
- [ ] Added new notebook (that tests end-to-end)
- [ ] I stared at the code and made sure it makes sense

# Suggested Checklist:

- [ ] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] I have added Google Colab support for the newly added notebooks.
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] I ran `make format; make lint` to appease the lint gods
Retrieved node with id, entering: 9435
Retrieving with query id 9435: Tell me about some issues on 12/11
Retrieving text node: [Bug]: [nltk_data] Error loading punkt: <urlopen error [WinError 10060] A
### Bug Description

I am using a vector Index which connects to a chromaDB client as my database. I have initialized the index as a chat engine. When the query the chat engine, two things happen:

1. The response time is nearly 2-3mins.
2. It throws the below warning

```
[nltk_data] Error loading punkt: <urlopen error [WinError 10060] A
[nltk_data]     connection attempt failed because the connected party
[nltk_data]     did not properly respond after a period of time, or
[nltk_data]     established connection failed because connected host
[nltk_data]     has failed to respond>
```

### Version

0.9.8.post1

### Steps to Reproduce

Clone, setup and run the below repository: (Follow readme for instructions)
https://github.com/umang299/document-gpt

### Relevant Logs/Tracbacks

_No response_
Retrieved node with id, entering: 9426
Retrieving with query id 9426: Tell me about some issues on 12/11
Retrieving text node: Slack Loader with large lack channels
### Question Validation

- [X] I have searched both the documentation and discord for an answer.

### Question

Hi team,

I am using the [Slack Loader ](https://llamahub.ai/l/slack)from Llama Hub. For smaller Slack channels it works fine. However, for larger channels with lots of messages created over months, I keep seeing this message:

`Rate limit error reached, sleeping for: 10 seconds`

Is there a recommended / idiomatic way to load larger Slack channels to avoid this issue?
Retrieved node with id, entering: 9425
Retrieving with query id 9425: Tell me about some issues on 12/11
Retrieving text node: [Feature Request]: Make llama-index compartible with models finetuned and hosted on modal.com
### Feature Description

Modal.com is a cloud computing service that allows you to finetune and host models on their workers. They provide inference points for any models finetuned on their platform.

### Reason

I have not tried implementing the feature. I just read about the capabilities on modal.com and thought it would be a good integration feature for llama-index to allow for more configuration.

### Value of Feature

An integration feature to allow users who host their models on modal to use llama-index for their RAG and prompt engineering pipelines.
Retrieved node with id, entering: 9439
Retrieving with query id 9439: Tell me about some issues on 12/11
Retrieving text node: [Bug]: Metadata filter not working with Elastic search indexing 
### Bug Description

While retrieving from ES with multiple metadatafilter condition(OR/AND) its not taking it into account. It always performs an AND operation even if its explicitly mentioned OR.
Example below code should filter and retrieve only 'mafia' or "Stephen King" bit its not doing as expected.

filters = MetadataFilters(
    filters=[
        MetadataFilter(key="theme", value="Mafia"),
        MetadataFilter(key="author", value="Stephen King"),
    ],
    condition=FilterCondition.OR,
)

retriever = index.as_retriever(filters=filters)

### Version

0.9.13

### Steps to Reproduce

nodes = [
TextNode(
text="The Shawshank Redemption",
metadata={
"author": "Stephen King",
"theme": "Friendship",
},
),
TextNode(
text="The Godfather",
metadata={
"director": "Francis Ford Coppola",
"theme": "Mafia",
},
),
TextNode(
text="Inception",
metadata={
"director": "Christopher Nolan",
},
),
]

filters = MetadataFilters(
    filters=[
        MetadataFilter(key="theme", value="Mafia"),
        MetadataFilter(key="author", value="Stephen King"),
    ],
    condition=FilterCondition.OR,
)

retriever = index.as_retriever(filters=filters)

### Relevant Logs/Tracbacks

_No response_
Retrieved node with id, entering: 9427
Retrieving with query id 9427: Tell me about some issues on 12/11
Retrieving text node: [Feature Request]: Postgres BM25 support
### Feature Description

Feature: add a variation of PGVectorStore which uses ParadeDB's BM25 extension.

BM25 is now possible in Postgres with a Rust extension [pg_bm25): https://github.com/paradedb/paradedb/tree/dev/pg_bm25

Unsure if it might be better to use [pg_search](https://github.com/paradedb/paradedb/tree/dev/pg_search) and get HNSW at the same time..

I'm interested in contributing on this myself, but am just starting to look into it. Interested to hear others' thoughts.

### Reason

Although the code comments for the PGVectorStore class currently suggest BM25 search is present in Postgres - it is not.

### Value of Feature

BM25 retrieval hit rate and MRR is measurable better than Postgres full text search with tsvector and tsquery. Indexing is also supposed to be faster with pg_bm25.
Number of source nodes: 6
{'state': 'open',
 'created_at': '2023-12-11T10:17:52Z',
 'url': 'https://api.github.com/repos/run-llama/llama_index/issues/9431',
 'source': 'https://github.com/run-llama/llama_index/pull/9431',
 'labels': ['size:XL'],
 'index_id': '9431'}

Step 8: Plug in a RetrieverQueryEngine

from llama_index.query_engine import RetrieverQueryEngine


llm = OpenAI(model="gpt-3.5-turbo")

query_engine = RetrieverQueryEngine.from_args(recursive_retriever, llm=llm)

response = query_engine.query("Tell me about some issues on 12/11")

print(str(response))

# Output
There were several issues created on 12/11. One of them is a bug where the metadata filter is not working correctly with Elastic search indexing. Another bug involves an error loading the 'punkt' module in the NLTK library. There are also a couple of feature requests, one for adding Postgres BM25 support and another for making llama-index compatible with models finetuned and hosted on modal.com. Additionally, there is a question about using the Slack Loader with large Slack channels.

Conclusion

In conclusion, weaving LlamaIndex into the fabric of multi-document RAG architectures heralds a new era in information retrieval. Its ability to dynamically select documents based on structured metadata, combined with the finesse of semantic query optimization, reshapes how we navigate and exploit knowledge across vast document repositories, making the retrieval process more efficient, relevant, and accurate.
