」工欲善其事,必先利其器。「—孔子《論語.錄靈公》
首頁 > 程式設計 > 在 Python 中使用 OpenSearch 掌握 CRUD 操作:實用指南

在 Python 中使用 OpenSearch 掌握 CRUD 操作:實用指南

發佈於2024-11-06
瀏覽:302

Mastering CRUD Operations with OpenSearch in Python: A Practical Guide

OpenSearch, an open-source alternative to Elasticsearch, is a powerful search and analytics engine built to handle large datasets with ease. In this blog, we’ll demonstrate how to perform basic CRUD (Create, Read, Update, Delete) operations in OpenSearch using Python.

Prerequisites:

  • Python 3.7
  • OpenSearch installed locally using Docker
  • Familiarity with RESTful APIs

Step 1: Setting Up OpenSearch Locally with Docker

To get started, we need a local OpenSearch instance. Below is a simple docker-compose.yml file that spins up OpenSearch and OpenSearch Dashboards.

version: '3'
services:
  opensearch-test-node-1:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-1
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-1
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-test-net

  opensearch-test-node-2:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-2
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-2
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data2:/usr/share/opensearch/data
    networks:
      - opensearch-test-net

  opensearch-test-dashboards:
    image: opensearchproject/opensearch-dashboards:2.13.0
    container_name: opensearch-test-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]'
      - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
    networks:
      - opensearch-test-net

volumes:
  opensearch-test-data1:
  opensearch-test-data2:

networks:
  opensearch-test-net:

Run the following command to bring up your OpenSearch instance:
docker-compose up
OpenSearch will be accessible at http://localhost:9200.

Step 2: Setting Up the Python Environment

python -m venv .venv
source .venv/bin/activate
pip install opensearch-py

We'll also structure our project as follows:

├── interfaces.py
├── main.py
├── searchservice.py
├── docker-compose.yml

Step 3: Defining Interfaces and Resources (interfaces.py)

In the interfaces.py file, we define our Resource and Resources classes. These will help us dynamically handle different resource types in OpenSearch (in this case, users).

from dataclasses import dataclass, field

@dataclass
class Resource:
    name: str

    def __post_init__(self) -> None:
        self.name = self.name.lower()

@dataclass
class Resources:
    users: Resource = field(default_factory=lambda: Resource("Users"))

Step 4: CRUD Operations with OpenSearch (searchservice.py)

In searchservice.py, we define an abstract class SearchService to outline the required operations. The HTTPOpenSearchService class then implements these CRUD methods, interacting with the OpenSearch client.

# coding: utf-8

import abc
import logging
import typing as t
from dataclasses import dataclass
from uuid import UUID

from interfaces import Resource, Resources
from opensearchpy import NotFoundError, OpenSearch

resources = Resources()


class SearchService(abc.ABC):
    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        raise NotImplementedError

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        raise NotImplementedError

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        raise NotImplementedError

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError


@dataclass(frozen=True)
class HTTPOpenSearchService(SearchService):
    client: OpenSearch

    def _gen_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> str:
        return (
            f"tenant_{str(UUID(str(tenants_id)))}"
            f"_company_{str(UUID(str(companies_id)))}"
            f"_kind_{kind.name}"
        )

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        self.client.index(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=data,
            id=data.get("id"),
        )
        return data

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        try:
            index = self._gen_index(kind, tenants_id, companies_id)
            if self.client.indices.exists(index):
                self.client.indices.delete(index)
        except NotFoundError:
            pass

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        body: t.Dict[str, t.Any] = {}
        self.client.indices.create(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=body,
        )

    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        return self.client.search(
            index=",".join(
                [self._gen_index(kind, tenants_id, companies_id) for kind in kinds]
            ),
            body={"query": query},
        )

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        try:
            response = self.client.delete(
                index=self._gen_index(kind, tenants_id, companies_id),
                id=document_id,
            )
            return response
        except Exception as e:
            logging.error(f"Error deleting document: {e}")
            return None

Step 5: Implementing CRUD in Main (main.py)

In main.py, we demonstrate how to:

  • Create an index in OpenSearch.
  • Index documents with sample user data.
  • Search for documents based on a query.
  • Delete a document using its ID.

main.py

# coding=utf-8

import logging
import os
import typing as t
from uuid import uuid4

import searchservice
from interfaces import Resources
from opensearchpy import OpenSearch

resources = Resources()

logging.basicConfig(level=logging.INFO)

search_service = searchservice.HTTPOpenSearchService(
    client=OpenSearch(
        hosts=[
            {
                "host": os.getenv("OPENSEARCH_HOST", "localhost"),
                "port": os.getenv("OPENSEARCH_PORT", "9200"),
            }
        ],
        http_auth=(
            os.getenv("OPENSEARCH_USERNAME", ""),
            os.getenv("OPENSEARCH_PASSWORD", ""),
        ),
        use_ssl=False,
        verify_certs=False,
    ),
)

tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9"
companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c"
search_str: str = "frank"
document_id_to_delete: str = str(uuid4())

fake_data: t.List[t.Dict[str, t.Any]] = [
    {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"},
    {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"},
    {"id": str(uuid4()), "name": "Parry", "tech": "Golang"},
    {"id": str(uuid4()), "name": "Steve", "tech": "iOS"},
    {"id": str(uuid4()), "name": "Frank", "tech": "node"},
]

search_service.delete_index(
    kind=resources.users, tenants_id=tenants_id, companies_id=companies_id
)

search_service.create_index(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
)

for item in fake_data:
    search_service.index(
        kind=resources.users,
        tenants_id=tenants_id,
        companies_id=companies_id,
        data=dict(tenants_id=tenants_id, companies_id=companies_id, **item),
    )

search_query: t.Dict[str, t.Any] = {
    "bool": {
        "must": [],
        "must_not": [],
        "should": [],
        "filter": [
            {"term": {"tenants_id.keyword": tenants_id}},
            {"term": {"companies_id.keyword": companies_id}},
        ],
    }
}
search_query["bool"]["must"].append(
    {
        "multi_match": {
            "query": search_str,
            "type": "phrase_prefix",
            "fields": ["name", "tech"],
        }
    }
)

search_results = search_service.search(
    kinds=[resources.users],
    tenants_id=tenants_id,
    companies_id=companies_id,
    query=search_query,
)

final_result = search_results.get("hits", {}).get("hits", [])
for item in final_result:
    logging.info(["Item -> ", item.get("_source", {})])

deleted_result = search_service.delete_document(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
    document_id=document_id_to_delete,
)
logging.info(["Deleted result -> ", deleted_result])

Step 6: Running the project

docker compose up
python main.py

Results:

It should print found & deleted records information.

Step 7: Conclusion

In this blog, we’ve demonstrated how to set up OpenSearch locally using Docker and perform basic CRUD operations with Python. OpenSearch provides a powerful and scalable solution for managing and querying large datasets. While this guide focuses on integrating OpenSearch with dummy data, in real-world applications, OpenSearch is often used as a read-optimized store for faster data retrieval. In such cases, it is common to implement different indexing strategies to ensure data consistency by updating both the primary database and OpenSearch concurrently.

This ensures that OpenSearch remains in sync with your primary data source, optimizing both performance and accuracy in data retrieval.

References:

https://github.com/FranklinThaker/opensearch-integration-example

版本聲明 本文轉載於:https://dev.to/franklinthaker/mastering-crud-operations-with-opensearch-in-python-a-practical-guide-53k7?1如有侵犯,請聯絡[email protected]刪除
最新教學 更多>
  • 增強您的 Web 動畫:像專業人士一樣最佳化 requestAnimationFrame
    增強您的 Web 動畫:像專業人士一樣最佳化 requestAnimationFrame
    流畅且高性能的动画在现代 Web 应用程序中至关重要。然而,管理不当可能会使浏览器的主线程过载,导致性能不佳和动画卡顿。 requestAnimationFrame (rAF) 是一种浏览器 API,旨在将动画与显示器的刷新率同步,从而确保与 setTimeout 等替代方案相比更流畅的运动。但有效...
    程式設計 發佈於2024-11-06
  • 為什麼MySQL伺服器在60秒內就消失了?
    為什麼MySQL伺服器在60秒內就消失了?
    MySQL 伺服器已消失- 恰好在60 秒內在此場景中,之前成功運行的MySQL 查詢現在遇到了60 秒後逾時,顯示錯誤「MySQL 伺服器已消失」。即使調整了 wait_timeout 變量,問題仍然存在。 分析:超時正好發生在 60 秒,這表明是設置而不是資源限制是原因。直接從 MySQL 客戶...
    程式設計 發佈於2024-11-06
  • 為什麼帶有“display: block”和“width: auto”的按鈕無法拉伸以填充其容器?
    為什麼帶有“display: block”和“width: auto”的按鈕無法拉伸以填充其容器?
    了解具有“display: block”和“width: auto”的按鈕的行為當您設定“display: block”時一個按鈕,它會調整其佈局以佔據可用的整個寬度。但是,如果將其與“width: auto”結合使用,則按鈕會出現意外行為,並且無法拉伸以填充其容器。此行為源自於按鈕作為替換元素的基...
    程式設計 發佈於2024-11-06
  • 為 Bluesky Social 創作機器人
    為 Bluesky Social 創作機器人
    How the bot will work We will develop a bot for the social network Bluesky, we will use Golang for this, this bot will monitor some hashtags ...
    程式設計 發佈於2024-11-06
  • 為什麼 PHP 的浮點運算會產生意外的結果?
    為什麼 PHP 的浮點運算會產生意外的結果?
    PHP 中的浮點數計算精度:為什麼它很棘手以及如何克服它在PHP 中處理浮點數時,這一點至關重要了解其固有的準確性限制。如程式片段所示:echo("success");} else {echo("error");} 您可能會驚訝地發現,儘管值之間的差異小於0....
    程式設計 發佈於2024-11-06
  • Python中可以透過變數ID逆向取得物件嗎?
    Python中可以透過變數ID逆向取得物件嗎?
    從 Python 中的變數 ID 擷取物件參考Python 中的 id() 函數傳回物件的唯一識別。人們很容易想知道是否可以反轉此過程並從其 ID 取得物件。 具體來說,我們想要檢查取消引用變數的ID 是否會擷取原始物件:dereference(id(a)) == a瞭解引用的概念及其在Python...
    程式設計 發佈於2024-11-06
  • Go 的 Defer 關鍵字如何在函數執行順序中發揮作用?
    Go 的 Defer 關鍵字如何在函數執行順序中發揮作用?
    了解 Go 的 Defer 關鍵字的功能使用 Go 時,了解 defer 關鍵字的行為至關重要。此關鍵字允許開發人員推遲函數的執行,直到周圍的函數返回。但是,需要注意的是,函數的值和參數在執行 defer 語句時進行評估。 範例:評估 Defer Order為了說明這一點,請考慮以下內容代碼:pac...
    程式設計 發佈於2024-11-06
  • WordPress Gutenberg 全域狀態管理初學者指南
    WordPress Gutenberg 全域狀態管理初學者指南
    构建复杂的 WordPress 块编辑器 (Gutenberg) 应用程序时,有效管理状态变得至关重要。这就是 @wordpress/data 发挥作用的地方。它允许您跨 WordPress 应用程序中的不同块和组件管理和共享全局状态。 如果您不熟悉管理全局状态或使用@wordpress/data,...
    程式設計 發佈於2024-11-06
  • 亞馬遜解析簡單且完全由您自己完成
    亞馬遜解析簡單且完全由您自己完成
    I came across a script on the Internet that allows you to parse product cards from Amazon. And I just needed a solution to a problem like that. I wrac...
    程式設計 發佈於2024-11-06
  • React JSX 如何在幕後轉換為 JavaScript
    React JSX 如何在幕後轉換為 JavaScript
    當您編寫 React 時,您會經常看到 JSX – 在 JavaScript 程式碼中看起來像 HTML 的語法。但你有沒有想過這段程式碼在瀏覽器中是如何運作的呢? 神奇之處在於:JSX 不是有效的 JavaScript!瀏覽器無法直接理解它。在幕後,像 Babel 這樣的工具介入將 JSX 轉換...
    程式設計 發佈於2024-11-06
  • 如何透過 CSS 變換實現傾斜:兩側傾斜
    如何透過 CSS 變換實現傾斜:兩側傾斜
    使用CSS 變換實現傾斜:傾斜兩側提供的圖像展示了一種有趣的傾斜效果,該效果使元素的兩個角都形成角度。若要使用 CSS 轉換重新建立此效果,請按照下列步驟操作:應用透視傾斜:若要新增透視,請使用下列 CSS屬性:transform: perspective(distance) rotateY(ang...
    程式設計 發佈於2024-11-06
  • Express.js 基礎:初學者指南 - Node.js 教學系列 - 第 10 部分
    Express.js 基礎:初學者指南 - Node.js 教學系列 - 第 10 部分
    介紹: 嘿!如果您是 Node.js 新手,您可能聽說過 Express.js——一個用於建立 Web 伺服器和 API 的輕量級、快速且靈活的框架。在本指南中,我將引導您了解 Express 的基礎知識,並向您展示入門是多麼容易。 準備好?讓我們開始吧! 1....
    程式設計 發佈於2024-11-06
  • Python:未來的語言
    Python:未來的語言
    在不断发展的技术领域,某些编程语言已经占据主导地位,并塑造了我们构建软件和与软件交互的方式。其中,Python 脱颖而出,它不仅获得了巨大的普及,而且还将自己定位为未来技术的关键工具。其简单性、多功能性和强大的库使 Python 成为从 Web 开发到数据科学、人工智能、自动化等各种应用程序的首选语...
    程式設計 發佈於2024-11-06
  • 如何在 PHP 中將 PDF 檔案儲存為 MySQL BLOB(帶有程式碼範例)?
    如何在 PHP 中將 PDF 檔案儲存為 MySQL BLOB(帶有程式碼範例)?
    使用PHP 將PDF 檔案儲存為MySQL BLOB使用PHP 在MySQL 中將PDF 檔案儲存為BLOB(二進位大物件)時,建議考慮在資料庫中儲存二進位資料的潛在缺點。但是,如果您選擇這樣做,可以採用以下方法:首先,定義一個包含整數 ID 欄位和名為 DATA 的 BLOB 欄位的資料表。 用於...
    程式設計 發佈於2024-11-06
  • 使用 React Router v6 在 React 中實作麵包屑
    使用 React Router v6 在 React 中實作麵包屑
    面包屑在网页开发中非常重要,因为它们为用户提供了一种方法来跟踪他们在我们网页中的当前位置,并帮助我们的网页导航。 在本指南中,我们将使用 React-router v6 和 Bootstrap 在 React 中实现面包屑。 React-router v6 是 React 和 React Nati...
    程式設計 發佈於2024-11-06

免責聲明: 提供的所有資源部分來自互聯網,如果有侵犯您的版權或其他權益,請說明詳細緣由並提供版權或權益證明然後發到郵箱:[email protected] 我們會在第一時間內為您處理。

Copyright© 2022 湘ICP备2022001581号-3