”工欲善其事,必先利其器。“—孔子《论语.录灵公》
首页 > 编程 > 在 Python 中使用 OpenSearch 掌握 CRUD 操作:实用指南

在 Python 中使用 OpenSearch 掌握 CRUD 操作:实用指南

发布于2024-11-06
浏览:184

Mastering CRUD Operations with OpenSearch in Python: A Practical Guide

OpenSearch, an open-source alternative to Elasticsearch, is a powerful search and analytics engine built to handle large datasets with ease. In this blog, we’ll demonstrate how to perform basic CRUD (Create, Read, Update, Delete) operations in OpenSearch using Python.

Prerequisites:

  • Python 3.7
  • OpenSearch installed locally using Docker
  • Familiarity with RESTful APIs

Step 1: Setting Up OpenSearch Locally with Docker

To get started, we need a local OpenSearch instance. Below is a simple docker-compose.yml file that spins up OpenSearch and OpenSearch Dashboards.

version: '3'
services:
  opensearch-test-node-1:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-1
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-1
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-test-net

  opensearch-test-node-2:
    image: opensearchproject/opensearch:2.13.0
    container_name: opensearch-test-node-2
    environment:
      - cluster.name=opensearch-test-cluster
      - node.name=opensearch-test-node-2
      - discovery.seed_hosts=opensearch-test-node-1,opensearch-test-node-2
      - cluster.initial_cluster_manager_nodes=opensearch-test-node-1,opensearch-test-node-2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - "DISABLE_INSTALL_DEMO_CONFIG=true"
      - "DISABLE_SECURITY_PLUGIN=true"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-test-data2:/usr/share/opensearch/data
    networks:
      - opensearch-test-net

  opensearch-test-dashboards:
    image: opensearchproject/opensearch-dashboards:2.13.0
    container_name: opensearch-test-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      - 'OPENSEARCH_HOSTS=["http://opensearch-test-node-1:9200","http://opensearch-test-node-2:9200"]'
      - "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true"
    networks:
      - opensearch-test-net

volumes:
  opensearch-test-data1:
  opensearch-test-data2:

networks:
  opensearch-test-net:

Run the following command to bring up your OpenSearch instance:
docker-compose up
OpenSearch will be accessible at http://localhost:9200.

Step 2: Setting Up the Python Environment

python -m venv .venv
source .venv/bin/activate
pip install opensearch-py

We'll also structure our project as follows:

├── interfaces.py
├── main.py
├── searchservice.py
├── docker-compose.yml

Step 3: Defining Interfaces and Resources (interfaces.py)

In the interfaces.py file, we define our Resource and Resources classes. These will help us dynamically handle different resource types in OpenSearch (in this case, users).

from dataclasses import dataclass, field

@dataclass
class Resource:
    name: str

    def __post_init__(self) -> None:
        self.name = self.name.lower()

@dataclass
class Resources:
    users: Resource = field(default_factory=lambda: Resource("Users"))

Step 4: CRUD Operations with OpenSearch (searchservice.py)

In searchservice.py, we define an abstract class SearchService to outline the required operations. The HTTPOpenSearchService class then implements these CRUD methods, interacting with the OpenSearch client.

# coding: utf-8

import abc
import logging
import typing as t
from dataclasses import dataclass
from uuid import UUID

from interfaces import Resource, Resources
from opensearchpy import NotFoundError, OpenSearch

resources = Resources()


class SearchService(abc.ABC):
    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        raise NotImplementedError

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        raise NotImplementedError

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        raise NotImplementedError

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> None:
        raise NotImplementedError


@dataclass(frozen=True)
class HTTPOpenSearchService(SearchService):
    client: OpenSearch

    def _gen_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> str:
        return (
            f"tenant_{str(UUID(str(tenants_id)))}"
            f"_company_{str(UUID(str(companies_id)))}"
            f"_kind_{kind.name}"
        )

    def index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        data: t.Dict[str, t.Any],
    ) -> t.Dict[str, t.Any]:
        self.client.index(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=data,
            id=data.get("id"),
        )
        return data

    def delete_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        try:
            index = self._gen_index(kind, tenants_id, companies_id)
            if self.client.indices.exists(index):
                self.client.indices.delete(index)
        except NotFoundError:
            pass

    def create_index(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
    ) -> None:
        body: t.Dict[str, t.Any] = {}
        self.client.indices.create(
            index=self._gen_index(kind, tenants_id, companies_id),
            body=body,
        )

    def search(
        self,
        kinds: t.List[Resource],
        tenants_id: UUID,
        companies_id: UUID,
        query: t.Dict[str, t.Any],
    ) -> t.Dict[t.Literal["hits"], t.Dict[str, t.Any]]:
        return self.client.search(
            index=",".join(
                [self._gen_index(kind, tenants_id, companies_id) for kind in kinds]
            ),
            body={"query": query},
        )

    def delete_document(
        self,
        kind: Resource,
        tenants_id: UUID,
        companies_id: UUID,
        document_id: str,
    ) -> t.Optional[t.Dict[str, t.Any]]:
        try:
            response = self.client.delete(
                index=self._gen_index(kind, tenants_id, companies_id),
                id=document_id,
            )
            return response
        except Exception as e:
            logging.error(f"Error deleting document: {e}")
            return None

Step 5: Implementing CRUD in Main (main.py)

In main.py, we demonstrate how to:

  • Create an index in OpenSearch.
  • Index documents with sample user data.
  • Search for documents based on a query.
  • Delete a document using its ID.

main.py

# coding=utf-8

import logging
import os
import typing as t
from uuid import uuid4

import searchservice
from interfaces import Resources
from opensearchpy import OpenSearch

resources = Resources()

logging.basicConfig(level=logging.INFO)

search_service = searchservice.HTTPOpenSearchService(
    client=OpenSearch(
        hosts=[
            {
                "host": os.getenv("OPENSEARCH_HOST", "localhost"),
                "port": os.getenv("OPENSEARCH_PORT", "9200"),
            }
        ],
        http_auth=(
            os.getenv("OPENSEARCH_USERNAME", ""),
            os.getenv("OPENSEARCH_PASSWORD", ""),
        ),
        use_ssl=False,
        verify_certs=False,
    ),
)

tenants_id: str = "f0835e2d-bd68-406c-99a7-ad63a51e9ef9"
companies_id: str = "bf58c749-c90a-41e2-b66f-6d98aae17a6c"
search_str: str = "frank"
document_id_to_delete: str = str(uuid4())

fake_data: t.List[t.Dict[str, t.Any]] = [
    {"id": document_id_to_delete, "name": "Franklin", "tech": "python,node,golang"},
    {"id": str(uuid4()), "name": "Jarvis", "tech": "AI"},
    {"id": str(uuid4()), "name": "Parry", "tech": "Golang"},
    {"id": str(uuid4()), "name": "Steve", "tech": "iOS"},
    {"id": str(uuid4()), "name": "Frank", "tech": "node"},
]

search_service.delete_index(
    kind=resources.users, tenants_id=tenants_id, companies_id=companies_id
)

search_service.create_index(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
)

for item in fake_data:
    search_service.index(
        kind=resources.users,
        tenants_id=tenants_id,
        companies_id=companies_id,
        data=dict(tenants_id=tenants_id, companies_id=companies_id, **item),
    )

search_query: t.Dict[str, t.Any] = {
    "bool": {
        "must": [],
        "must_not": [],
        "should": [],
        "filter": [
            {"term": {"tenants_id.keyword": tenants_id}},
            {"term": {"companies_id.keyword": companies_id}},
        ],
    }
}
search_query["bool"]["must"].append(
    {
        "multi_match": {
            "query": search_str,
            "type": "phrase_prefix",
            "fields": ["name", "tech"],
        }
    }
)

search_results = search_service.search(
    kinds=[resources.users],
    tenants_id=tenants_id,
    companies_id=companies_id,
    query=search_query,
)

final_result = search_results.get("hits", {}).get("hits", [])
for item in final_result:
    logging.info(["Item -> ", item.get("_source", {})])

deleted_result = search_service.delete_document(
    kind=resources.users,
    tenants_id=tenants_id,
    companies_id=companies_id,
    document_id=document_id_to_delete,
)
logging.info(["Deleted result -> ", deleted_result])

Step 6: Running the project

docker compose up
python main.py

Results:

It should print found & deleted records information.

Step 7: Conclusion

In this blog, we’ve demonstrated how to set up OpenSearch locally using Docker and perform basic CRUD operations with Python. OpenSearch provides a powerful and scalable solution for managing and querying large datasets. While this guide focuses on integrating OpenSearch with dummy data, in real-world applications, OpenSearch is often used as a read-optimized store for faster data retrieval. In such cases, it is common to implement different indexing strategies to ensure data consistency by updating both the primary database and OpenSearch concurrently.

This ensures that OpenSearch remains in sync with your primary data source, optimizing both performance and accuracy in data retrieval.

References:

https://github.com/FranklinThaker/opensearch-integration-example

版本声明 本文转载于:https://dev.to/franklinthaker/mastering-crud-operations-with-opensearch-in-python-a-practical-guide-53k7?1如有侵犯,请联系[email protected]删除
最新教程 更多>
  • 为什么Microsoft Visual C ++无法正确实现两台模板的实例?
    为什么Microsoft Visual C ++无法正确实现两台模板的实例?
    The Mystery of "Broken" Two-Phase Template Instantiation in Microsoft Visual C Problem Statement:Users commonly express concerns that Micro...
    编程 发布于2025-03-12
  • UTF-8 vs. Latin-1:字符编码大揭秘!
    UTF-8 vs. Latin-1:字符编码大揭秘!
    [utf-8和latin1 在他们的应用中,出现了一个基本问题:什么辨别特征区分了这两个编码?超出其字符表现能力,UTF-8具有额外的几个优势。从历史上看,MySQL对UTF-8的支持仅限于每个字符的三个字节,这阻碍了基本多语言平面(BMP)之外的字符的表示。但是,随着MySQL 5.5的出现,...
    编程 发布于2025-03-12
  • 大批
    大批
    [2 数组是对象,因此它们在JS中也具有方法。 切片(开始):在新数组中提取部分数组,而无需突变原始数组。 令ARR = ['a','b','c','d','e']; // USECASE:提取直到索引作...
    编程 发布于2025-03-12
  • 如何在Java字符串中有效替换多个子字符串?
    如何在Java字符串中有效替换多个子字符串?
    在java 中有效地替换多个substring,需要在需要替换一个字符串中的多个substring的情况下,很容易求助于重复应用字符串的刺激力量。 However, this can be inefficient for large strings or when working with nu...
    编程 发布于2025-03-12
  • Part SQL注入系列:高级SQL注入技巧详解
    Part SQL注入系列:高级SQL注入技巧详解
    [2 Waymap pentesting工具:单击此处 trixsec github:单击此处 trixsec电报:单击此处 高级SQL注入利用 - 第7部分:尖端技术和预防 欢迎参与我们SQL注入系列的第7部分!该分期付款将攻击者采用的高级SQL注入技术 1。高...
    编程 发布于2025-03-12
  • 为什么PYTZ最初显示出意外的时区偏移?
    为什么PYTZ最初显示出意外的时区偏移?
    与pytz 最初从pytz获得特定的偏移。例如,亚洲/hong_kong最初显示一个七个小时37分钟的偏移: 差异源利用本地化将时区分配给日期,使用了适当的时区名称和偏移量。但是,直接使用DateTime构造器分配时区不允许进行正确的调整。 example pytz.timezone(...
    编程 发布于2025-03-12
  • 如何修复\“常规错误:2006 MySQL Server在插入数据时已经消失\”?
    如何修复\“常规错误:2006 MySQL Server在插入数据时已经消失\”?
    How to Resolve "General error: 2006 MySQL server has gone away" While Inserting RecordsIntroduction:Inserting data into a MySQL database can...
    编程 发布于2025-03-12
  • 我们如何保护有关恶意内容的文件上传?
    我们如何保护有关恶意内容的文件上传?
    对文件上载上传到服务器的安全性问题可以引入重大的安全风险,因为用户可能会提供潜在的恶意内容。了解这些威胁并实施有效的缓解策略对于维持应用程序的安全性至关重要。用户可以将文件名操作以绕过安全措施。避免将其用于关键目的或使用其原始名称保存文件。用户提供的MIME类型可能不可靠。使用服务器端检查确定实际...
    编程 发布于2025-03-12
  • 如何使用JavaScript中的正则表达式从字符串中删除线路断裂?
    如何使用JavaScript中的正则表达式从字符串中删除线路断裂?
    在此代码方案中删除从字符串在JavaScript中解决此问题,根据操作系统的编码,对线断裂的识别不同。 Windows使用“ \ r \ n”序列,Linux采用“ \ n”,Apple系统使用“ \ r。” 来满足各种线路断裂的变化,可以使用以下正则表达式: [&& && &&&&&&&&&&&...
    编程 发布于2025-03-12
  • 为什么使用Firefox后退按钮时JavaScript执行停止?
    为什么使用Firefox后退按钮时JavaScript执行停止?
    导航历史记录问题:JavaScript使用Firefox Back Back 此行为是由浏览器缓存JavaScript资源引起的。要解决此问题并确保在后续页面访问中执行脚本,Firefox用户应设置一个空功能。 警报'); }; alert('inline Alert')...
    编程 发布于2025-03-12
  • 如何使用PHP将斑点(图像)正确插入MySQL?
    如何使用PHP将斑点(图像)正确插入MySQL?
    essue VALUES('$this->image_id','file_get_contents($tmp_image)')";This code builds a string in PHP, but the function call ...
    编程 发布于2025-03-12
  • 我可以将加密从McRypt迁移到OpenSSL,并使用OpenSSL迁移MCRYPT加密数据?
    我可以将加密从McRypt迁移到OpenSSL,并使用OpenSSL迁移MCRYPT加密数据?
    将我的加密库从mcrypt升级到openssl 问题:是否可以将我的加密库从McRypt升级到OpenSSL?如果是这样,如何?答案:是的,可以将您的Encryption库从McRypt升级到OpenSSL。可以使用openssl。附加说明: [openssl_decrypt()函数要求iv参...
    编程 发布于2025-03-12
  • 在Java中使用for-to-loop和迭代器进行收集遍历之间是否存在性能差异?
    在Java中使用for-to-loop和迭代器进行收集遍历之间是否存在性能差异?
    For Each Loop vs. Iterator: Efficiency in Collection TraversalIntroductionWhen traversing a collection in Java, the choice arises between using a for-...
    编程 发布于2025-03-12
  • 如何检查对象是否具有Python中的特定属性?
    如何检查对象是否具有Python中的特定属性?
    方法来确定对象属性存在寻求一种方法来验证对象中特定属性的存在。考虑以下示例,其中尝试访问不确定属性会引起错误: >>> a = someClass() >>> A.property Trackback(最近的最新电话): 文件“ ”,第1行, AttributeError: SomeClass...
    编程 发布于2025-03-12
  • Java HashSet/LinkedHashSet随机元素获取方法详解
    Java HashSet/LinkedHashSet随机元素获取方法详解
    在编程中找到一个随机元素,在编程中找到一个随机元素,从集合(例如集合)中选择一个随机元素很有用。 Java提供了多种类型的集合,包括障碍物和链接HASHSET。本文将探讨如何从这些特定集合实现的过程中选择一个随机元素。的java的hashset和linkedhashset a HashSet代表...
    编程 发布于2025-03-12

免责声明: 提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发到邮箱:[email protected] 我们会第一时间内为您处理。

Copyright© 2022 湘ICP备2022001581号-3