”工欲善其事,必先利其器。“—孔子《论语.录灵公》
首页 > 编程 > 在 Python 中使用 OpenSearch 掌握 CRUD 操作:实用指南

在 Python 中使用 OpenSearch 掌握 CRUD 操作:实用指南

发布于2024-11-06
浏览:329

Mastering CRUD Operations with OpenSearch in Python: A Practical Guide

OpenSearch, an open-source alternative to Elasticsearch, is a powerful search and analytics engine built to handle large datasets with ease. In this blog, we’ll demonstrate how to perform basic CRUD (Create, Read, Update, Delete) operations in OpenSearch using Python.

Prerequisites:

  • Python 3.7
  • OpenSearch installed locally using Docker
  • Familiarity with RESTful APIs

Step 1: Setting Up OpenSearch Locally with Docker

To get started, we need a local OpenSearch instance. Below is a simple docker-compose.yml file that spins up OpenSearch and OpenSearch Dashboards.

version: '3'
services:
  opensearch-test-node-1:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-1
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-1
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-test-net

  opensearch-test-node-2:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-2
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-2
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data2:/usr/share/opensearch/data
    networks:
      - opensearch-test-net

  opensearch-test-dashboards:
    image: opensearchproject/opensearch-dashboards:2.13.0
    container_name: opensearch-test-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]'
      - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
    networks:
      - opensearch-test-net

volumes:
  opensearch-test-data1:
  opensearch-test-data2:

networks:
  opensearch-test-net:

Run the following command to bring up your OpenSearch instance:
docker-compose up
OpenSearch will be accessible at http://localhost:9200.

Step 2: Setting Up the Python Environment

python -m venv .venv
source .venv/bin/activate
pip install opensearch-py

We'll also structure our project as follows:

├── interfaces.py
├── main.py
├── searchservice.py
├── docker-compose.yml

Step 3: Defining Interfaces and Resources (interfaces.py)

In the interfaces.py file, we define our Resource and Resources classes. These will help us dynamically handle different resource types in OpenSearch (in this case, users).

from dataclasses import dataclass, field

@dataclass
class Resource:
    name: str

    def __post_init__(self) -> None:
        self.name = self.name.lower()

@dataclass
class Resources:
    users: Resource = field(default_factory=lambda: Resource("Users"))

Step 4: CRUD Operations with OpenSearch (searchservice.py)

In searchservice.py, we define an abstract class SearchService to outline the required operations. The HTTPOpenSearchService class then implements these CRUD methods, interacting with the OpenSearch client.

# coding: utf-8

import abc
import logging
import typing as t
from dataclasses import dataclass
from uuid import UUID

from interfaces import Resource, Resources
from opensearchpy import NotFoundError, OpenSearch

resources = Resources()


class SearchService(abc.ABC):
    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        raise NotImplementedError

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        raise NotImplementedError

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        raise NotImplementedError

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError


@dataclass(frozen=True)
class HTTPOpenSearchService(SearchService):
    client: OpenSearch

    def _gen_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> str:
        return (
            f"tenant_{str(UUID(str(tenants_id)))}"
            f"_company_{str(UUID(str(companies_id)))}"
            f"_kind_{kind.name}"
        )

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        self.client.index(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=data,
            id=data.get("id"),
        )
        return data

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        try:
            index = self._gen_index(kind, tenants_id, companies_id)
            if self.client.indices.exists(index):
                self.client.indices.delete(index)
        except NotFoundError:
            pass

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        body: t.Dict[str, t.Any] = {}
        self.client.indices.create(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=body,
        )

    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        return self.client.search(
            index=",".join(
                [self._gen_index(kind, tenants_id, companies_id) for kind in kinds]
            ),
            body={"query": query},
        )

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        try:
            response = self.client.delete(
                index=self._gen_index(kind, tenants_id, companies_id),
                id=document_id,
            )
            return response
        except Exception as e:
            logging.error(f"Error deleting document: {e}")
            return None

Step 5: Implementing CRUD in Main (main.py)

In main.py, we demonstrate how to:

  • Create an index in OpenSearch.
  • Index documents with sample user data.
  • Search for documents based on a query.
  • Delete a document using its ID.

main.py

# coding=utf-8

import logging
import os
import typing as t
from uuid import uuid4

import searchservice
from interfaces import Resources
from opensearchpy import OpenSearch

resources = Resources()

logging.basicConfig(level=logging.INFO)

search_service = searchservice.HTTPOpenSearchService(
    client=OpenSearch(
        hosts=[
            {
                "host": os.getenv("OPENSEARCH_HOST", "localhost"),
                "port": os.getenv("OPENSEARCH_PORT", "9200"),
            }
        ],
        http_auth=(
            os.getenv("OPENSEARCH_USERNAME", ""),
            os.getenv("OPENSEARCH_PASSWORD", ""),
        ),
        use_ssl=False,
        verify_certs=False,
    ),
)

tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9"
companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c"
search_str: str = "frank"
document_id_to_delete: str = str(uuid4())

fake_data: t.List[t.Dict[str, t.Any]] = [
    {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"},
    {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"},
    {"id": str(uuid4()), "name": "Parry", "tech": "Golang"},
    {"id": str(uuid4()), "name": "Steve", "tech": "iOS"},
    {"id": str(uuid4()), "name": "Frank", "tech": "node"},
]

search_service.delete_index(
    kind=resources.users, tenants_id=tenants_id, companies_id=companies_id
)

search_service.create_index(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
)

for item in fake_data:
    search_service.index(
        kind=resources.users,
        tenants_id=tenants_id,
        companies_id=companies_id,
        data=dict(tenants_id=tenants_id, companies_id=companies_id, **item),
    )

search_query: t.Dict[str, t.Any] = {
    "bool": {
        "must": [],
        "must_not": [],
        "should": [],
        "filter": [
            {"term": {"tenants_id.keyword": tenants_id}},
            {"term": {"companies_id.keyword": companies_id}},
        ],
    }
}
search_query["bool"]["must"].append(
    {
        "multi_match": {
            "query": search_str,
            "type": "phrase_prefix",
            "fields": ["name", "tech"],
        }
    }
)

search_results = search_service.search(
    kinds=[resources.users],
    tenants_id=tenants_id,
    companies_id=companies_id,
    query=search_query,
)

final_result = search_results.get("hits", {}).get("hits", [])
for item in final_result:
    logging.info(["Item -> ", item.get("_source", {})])

deleted_result = search_service.delete_document(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
    document_id=document_id_to_delete,
)
logging.info(["Deleted result -> ", deleted_result])

Step 6: Running the project

docker compose up
python main.py

Results:

It should print found & deleted records information.

Step 7: Conclusion

In this blog, we’ve demonstrated how to set up OpenSearch locally using Docker and perform basic CRUD operations with Python. OpenSearch provides a powerful and scalable solution for managing and querying large datasets. While this guide focuses on integrating OpenSearch with dummy data, in real-world applications, OpenSearch is often used as a read-optimized store for faster data retrieval. In such cases, it is common to implement different indexing strategies to ensure data consistency by updating both the primary database and OpenSearch concurrently.

This ensures that OpenSearch remains in sync with your primary data source, optimizing both performance and accuracy in data retrieval.

References:

https://github.com/FranklinThaker/opensearch-integration-example

版本声明 本文转载于:https://dev.to/franklinthaker/mastering-crud-operations-with-opensearch-in-python-a-practical-guide-53k7?1如有侵犯,请联系[email protected]删除
最新教程 更多>
  • PHP 中的用户浏览器检测可靠吗?
    PHP 中的用户浏览器检测可靠吗?
    使用 PHP 进行可靠的用户浏览器检测确定用户的浏览器对于定制 Web 体验至关重要。 PHP 提供了两种可能的方法: $_SERVER['HTTP_USER_AGENT'] 和 get_browser() 函数。$_SERVER['HTTP_USER_AGENT']...
    编程 发布于2024-11-06
  • 增强您的 Web 动画:像专业人士一样优化 requestAnimationFrame
    增强您的 Web 动画:像专业人士一样优化 requestAnimationFrame
    流畅且高性能的动画在现代 Web 应用程序中至关重要。然而,管理不当可能会使浏览器的主线程过载,导致性能不佳和动画卡顿。 requestAnimationFrame (rAF) 是一种浏览器 API,旨在将动画与显示器的刷新率同步,从而确保与 setTimeout 等替代方案相比更流畅的运动。但有效...
    编程 发布于2024-11-06
  • 为什么MySQL服务器在60秒内就消失了?
    为什么MySQL服务器在60秒内就消失了?
    MySQL 服务器已消失 - 恰好在 60 秒内在此场景中,之前成功运行的 MySQL 查询现在遇到了60 秒后超时,显示错误“MySQL 服务器已消失”。即使调整了 wait_timeout 变量,问题仍然存在。分析:超时正好发生在 60 秒,这表明是设置而不是资源限制是原因。直接从 MySQL ...
    编程 发布于2024-11-06
  • 为什么带有“display: block”和“width: auto”的按钮无法拉伸以填充其容器?
    为什么带有“display: block”和“width: auto”的按钮无法拉伸以填充其容器?
    了解具有“display: block”和“width: auto”的按钮的行为当您设置“display: block”时一个按钮,它会调整其布局以占据可用的整个宽度。但是,如果将其与“width: auto”结合使用,则按钮会出现意外行为,并且无法拉伸以填充其容器。此行为源于按钮作为替换元素的基本...
    编程 发布于2024-11-06
  • 为 Bluesky Social 创建机器人
    为 Bluesky Social 创建机器人
    How the bot will work We will develop a bot for the social network Bluesky, we will use Golang for this, this bot will monitor some hashtags ...
    编程 发布于2024-11-06
  • 为什么 PHP 的浮点运算会产生意外的结果?
    为什么 PHP 的浮点运算会产生意外的结果?
    PHP 中的浮点数计算精度:为什么它很棘手以及如何克服它在 PHP 中处理浮点数时,这一点至关重要了解其固有的准确性限制。如代码片段所示:echo("success");} else {echo("error");} 您可能会惊讶地发现,尽管值之间的差异小于 ...
    编程 发布于2024-11-06
  • Python中可以通过变量ID逆向获取对象吗?
    Python中可以通过变量ID逆向获取对象吗?
    从 Python 中的变量 ID 检索对象引用Python 中的 id() 函数返回对象的唯一标识。人们很容易想知道是否可以反转此过程并从其 ID 获取对象。具体来说,我们想要检查取消引用变量的 ID 是否会检索原始对象:dereference(id(a)) == a理解解引用的概念及其在 Pyth...
    编程 发布于2024-11-06
  • Go 的 Defer 关键字如何在函数执行顺序中发挥作用?
    Go 的 Defer 关键字如何在函数执行顺序中发挥作用?
    了解 Go 的 Defer 关键字的功能使用 Go 时,了解 defer 关键字的行为至关重要。该关键字允许开发人员推迟函数的执行,直到周围的函数返回。但是,需要注意的是,函数的值和参数在执行 defer 语句时进行评估。示例:评估 Defer Order为了说明这一点,请考虑以下内容代码:pack...
    编程 发布于2024-11-06
  • WordPress Gutenberg 全局状态管理初学者指南
    WordPress Gutenberg 全局状态管理初学者指南
    构建复杂的 WordPress 块编辑器 (Gutenberg) 应用程序时,有效管理状态变得至关重要。这就是 @wordpress/data 发挥作用的地方。它允许您跨 WordPress 应用程序中的不同块和组件管理和共享全局状态。 如果您不熟悉管理全局状态或使用@wordpress/data,...
    编程 发布于2024-11-06
  • 亚马逊解析简单且完全由您自己完成
    亚马逊解析简单且完全由您自己完成
    I came across a script on the Internet that allows you to parse product cards from Amazon. And I just needed a solution to a problem like that. I wrac...
    编程 发布于2024-11-06
  • React JSX 如何在幕后转换为 JavaScript
    React JSX 如何在幕后转换为 JavaScript
    当您编写 React 时,您会经常看到 JSX – 一种在 JavaScript 代码中看起来像 HTML 的语法。但你有没有想过这段代码在浏览器中是如何运行的? 神奇之处在于:JSX 不是有效的 JavaScript!浏览器无法直接理解它。在幕后,像 Babel 这样的工具介入将 JSX 转换(或...
    编程 发布于2024-11-06
  • 如何通过 CSS 变换实现倾斜:两侧倾斜
    如何通过 CSS 变换实现倾斜:两侧倾斜
    使用 CSS 变换实现倾斜:倾斜两侧提供的图像展示了一种有趣的倾斜效果,该效果使元素的两个角都形成角度。要使用 CSS 转换重新创建此效果,请按照下列步骤操作:应用透视倾斜:要添加透视,请使用以下 CSS 属性:transform: perspective(distance) rotateY(ang...
    编程 发布于2024-11-06
  • Express.js 基础知识:初学者指南 - Node.js 教程系列 - 第 10 部分
    Express.js 基础知识:初学者指南 - Node.js 教程系列 - 第 10 部分
    介绍: 嘿!如果您是 Node.js 新手,您可能听说过 Express.js——一个用于构建 Web 服务器和 API 的轻量级、快速且灵活的框架。在本指南中,我将引导您了解 Express 的基础知识,并向您展示入门是多么容易。 准备好?让我们开始吧! 1.安装...
    编程 发布于2024-11-06
  • Python:未来的语言
    Python:未来的语言
    在不断发展的技术领域,某些编程语言已经占据主导地位,并塑造了我们构建软件和与软件交互的方式。其中,Python 脱颖而出,它不仅获得了巨大的普及,而且还将自己定位为未来技术的关键工具。其简单性、多功能性和强大的库使 Python 成为从 Web 开发到数据科学、人工智能、自动化等各种应用程序的首选语...
    编程 发布于2024-11-06
  • 如何在 PHP 中将 PDF 文件存储为 MySQL BLOB(带有代码示例)?
    如何在 PHP 中将 PDF 文件存储为 MySQL BLOB(带有代码示例)?
    使用 PHP 将 PDF 文件存储为 MySQL BLOB使用 PHP 在 MySQL 中将 PDF 文件存储为 BLOB(二进制大对象)时,建议考虑在数据库中存储二进制数据的潜在缺点。但是,如果您选择这样做,可以采用以下方法:首先,定义一个包含整数 ID 字段和名为 DATA 的 BLOB 列的表...
    编程 发布于2024-11-06

免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。

Copyright© 2022 湘ICP备2022001581号-3