照片由 real-napster 在 Pixabay上拍摄
在我最近的一个项目中,我必须构建一个语义搜索系统,该系统可以高性能扩展并为报告搜索提供实时响应。我们在 AWS RDS 上使用 PostgreSQL 和 pgvector,并搭配 AWS Lambda 来实现这一目标。面临的挑战是允许用户使用自然语言查询而不是依赖严格的关键字进行搜索,同时确保响应时间在 1-2 秒甚至更短,并且只能利用 CPU 资源。
在这篇文章中,我将逐步介绍构建此搜索系统的步骤,从检索到重新排名,以及使用 OpenVINO 和智能批处理进行标记化进行的优化。
现代最先进的搜索系统通常由两个主要步骤组成:检索和重新排名。
1) 检索:第一步涉及根据用户查询检索相关文档的子集。这可以使用预先训练的嵌入模型来完成,例如 OpenAI 的小型和大型嵌入、Cohere 的嵌入模型或 Mixbread 的 mxbai 嵌入。检索的重点是通过测量文档与查询的相似性来缩小文档池的范围。
这是一个使用 Huggingface 的句子转换器库进行检索的简化示例,这是我最喜欢的库之一:
from sentence_transformers import SentenceTransformer import numpy as np # Load a pre-trained sentence transformer model model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") # Sample query and documents (vectorize the query and the documents) query = "How do I fix a broken landing gear?" documents = ["Report 1 on landing gear failure", "Report 2 on engine problems"] # Get embeddings for query and documents query_embedding = model.encode(query) document_embeddings = model.encode(documents) # Calculate cosine similarity between query and documents similarities = np.dot(document_embeddings, query_embedding) # Retrieve top-k most relevant documents top_k = np.argsort(similarities)[-5:] print("Top 5 documents:", [documents[i] for i in top_k])
2) 重新排名:检索到最相关的文档后,我们使用交叉编码器模型进一步提高这些文档的排名。此步骤会更准确地重新评估与查询相关的每个文档,重点关注更深入的上下文理解。
重新排名是有益的,因为它通过更精确地评分每个文档的相关性来增加额外的细化层。
下面是使用 cross-encoder/ms-marco-TinyBERT-L-2-v2(轻量级交叉编码器)进行重新排名的代码示例:
from sentence_transformers import CrossEncoder # Load the cross-encoder model cross_encoder = CrossEncoder("cross-encoder/ms-marco-TinyBERT-L-2-v2") # Use the cross-encoder to rerank top-k retrieved documents query_document_pairs = [(query, doc) for doc in documents] scores = cross_encoder.predict(query_document_pairs) # Rank documents based on the new scores top_k_reranked = np.argsort(scores)[-5:] print("Top 5 reranked documents:", [documents[i] for i in top_k_reranked])
在开发过程中,我发现在使用句子转换器的默认设置处理 1,000 个报告时,标记化和预测阶段花费了相当长的时间。这造成了性能瓶颈,特别是因为我们的目标是实时响应。
下面我使用 SnakeViz 分析了我的代码以可视化性能:
正如您所看到的,标记化和预测步骤异常缓慢,导致提供搜索结果的严重延迟。总的来说,平均需要 4-5 秒。这是因为标记化和预测步骤之间存在阻塞操作。如果我们还添加其他操作,如数据库调用、过滤等,我们很容易就总共需要 8-9 秒。
我面临的问题是:我们可以让它更快吗?答案是肯定的,通过利用OpenVINO,一个针对 CPU 推理的优化后端。 OpenVINO 有助于加速英特尔硬件上的深度学习模型推理,我们在 AWS Lambda 上使用该硬件。
OpenVINO 优化的代码示例
以下是我如何将 OpenVINO 集成到搜索系统中以加快推理速度:
import argparse import numpy as np import pandas as pd from typing import Any from openvino.runtime import Core from transformers import AutoTokenizer def load_openvino_model(model_path: str) -> Core: core = Core() model = core.read_model(model_path ".xml") compiled_model = core.compile_model(model, "CPU") return compiled_model def rerank( compiled_model: Core, query: str, results: list[str], tokenizer: AutoTokenizer, batch_size: int, ) -> np.ndarray[np.float32, Any]: max_length = 512 all_logits = [] # Split results into batches for i in range(0, len(results), batch_size): batch_results = results[i : i batch_size] inputs = tokenizer( [(query, item) for item in batch_results], padding=True, truncation="longest_first", max_length=max_length, return_tensors="np", ) # Extract input tensors (convert to NumPy arrays) input_ids = inputs["input_ids"].astype(np.int32) attention_mask = inputs["attention_mask"].astype(np.int32) token_type_ids = inputs.get("token_type_ids", np.zeros_like(input_ids)).astype( np.int32 ) infer_request = compiled_model.create_infer_request() output = infer_request.infer( { "input_ids": input_ids, "attention_mask": attention_mask, "token_type_ids": token_type_ids, } ) logits = output["logits"] all_logits.append(logits) all_logits = np.concatenate(all_logits, axis=0) return all_logits def fetch_search_data(search_text: str) -> pd.DataFrame: # Usually you would fetch the data from a database df = pd.read_csv("cnbc_headlines.csv") df = df[~df["Headlines"].isnull()] texts = df["Headlines"].tolist() # Load the model and rerank openvino_model = load_openvino_model("cross-encoder-openvino-model/model") tokenizer = AutoTokenizer.from_pretrained("cross-encoder/ms-marco-TinyBERT-L-2-v2") rerank_scores = rerank(openvino_model, search_text, texts, tokenizer, batch_size=16) # Add the rerank scores to the DataFrame and sort by the new scores df["rerank_score"] = rerank_scores df = df.sort_values(by="rerank_score", ascending=False) return df if __name__ == "__main__": parser = argparse.ArgumentParser( description="Fetch search results with reranking using OpenVINO" ) parser.add_argument( "--search_text", type=str, required=True, help="The search text to use for reranking", ) args = parser.parse_args() df = fetch_search_data(args.search_text) print(df)
通过这种方法,我们可以获得 2-3 倍的加速,将原来的 4-5 秒减少到 1-2 秒。完整的工作代码位于 Github 上。
提高性能的另一个关键因素是优化令牌化流程并调整批量大小和令牌长度。通过增加批量大小(batch_size = 16)和减少令牌长度(max_length = 512),我们可以并行化令牌化并减少重复操作的开销。在我们的实验中,我们发现 16 到 64 之间的 batch_size 效果很好,任何更大的值都会降低性能。同样,我们将 max_length 设置为 128,如果报告的平均长度相对较短,则该值是可行的。通过这些更改,我们实现了 8 倍的整体加速,将重新排名时间缩短至 1 秒以下,即使在 CPU 上也是如此。
在实践中,这意味着尝试不同的批量大小和令牌长度,以找到数据速度和准确性之间的适当平衡。通过这样做,我们看到响应时间显着缩短,使得搜索系统即使有 1,000 份报告也可扩展。
通过使用 OpenVINO 并优化标记化和批处理,我们能够构建一个高性能语义搜索系统,满足仅 CPU 设置的实时要求。事实上,我们的整体速度提升了 8 倍。使用句子转换器的检索和使用交叉编码器模型的重新排名相结合,创造了强大的、用户友好的搜索体验。
如果您正在构建响应时间和计算资源受到限制的类似系统,我强烈建议您探索 OpenVINO 和智能批处理以释放更好的性能。
希望您喜欢这篇文章。如果您觉得这篇文章有用,请给我一个赞,以便其他人也可以找到它,并与您的朋友分享。在 Linkedin 上关注我,了解我的最新工作。感谢您的阅读!
免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。
Copyright© 2022 湘ICP备2022001581号-3