
Declarative Multi-Engine Data Stack with Ibis

Published on 2024-08-30

TL;DR

I recently came across the Ju Data Engineering Newsletter by Julien Hurault on the multi-engine data stack. The idea is simple: we'd like to easily port our code across any backend while retaining the flexibility to grow our pipeline as new backends and features are developed. This entails at least the following high-level workflows:

  1. Offloading part of a SQL query to serverless engines such as DuckDB, Polars, DataFusion, chdb, etc.
  2. Right-sizing the pipeline for various development and deployment scenarios. For example, developers can work locally and ship to production with confidence.
  3. Applying database-style optimizations to your pipelines automatically.

In this post, we dive into how we can implement the multi-engine pipeline from a programming language: instead of SQL, we propose using a DataFrame API that can be used for both interactive and batch use-cases. Specifically, we show how to break up our pipeline into smaller pieces and execute them across DuckDB, pandas, and Snowflake. We also discuss the advantages of a multi-engine data stack and highlight emerging trends in the field.

The code implemented in this post is available on GitHub^[In order to quickly try out the repo, I also provide a nix flake]. The reference work in the newsletter, with the original implementation, is here.

Overview

The multi-engine data stack pipeline works as follows: some data lands in an S3 bucket, gets preprocessed to remove any duplicates, and is then loaded into a Snowflake table, where it is transformed further with ML or Snowflake-specific functions^[Please note we do not go into implementing the types of things that might be possible in Snowflake and assume that as a requirement for the workflow]. The pipeline takes orders as parquet files that arrive in the landing location, are preprocessed, and are then stored in the staging location of an S3 bucket. The staging data is then loaded into Snowflake so downstream BI tools can connect to it. The pipeline is tied together with dbt SQL models, one for each backend, and the newsletter chooses Dagster as the orchestration tool.

Declarative Multi-Engine Data Stack with Ibis

Today, we are going to dive into how we can convert our pandas code to Ibis expressions, reproducing the complete example from Julien Hurault's multi-engine stack example1. Instead of using dbt models and SQL, we use Ibis and some Python to compile and orchestrate SQL engines from a shell. By rewriting our code as Ibis expressions, we can declaratively build our data pipelines with deferred execution. Moreover, Ibis supports over 20 backends, so we can write code once and port our ibis.exprs to multiple backends. To simplify further, we leave scheduling and task orchestration2, provided by Dagster, up to the reader.

Core Concept of Multi-Engine Data Stack

Here are the core concepts of the multi-engine data stack as outlined in Julien's newsletter:


  1. Multi-Engine Data Stack: The concept involves combining different data engines like Snowflake, Spark, DuckDB, and BigQuery. This approach aims to reduce costs, limit vendor lock-in, and increase flexibility. Julien mentions that for certain benchmark queries, using DuckDB could achieve a significant cost reduction compared to Snowflake.
  2. Development of a Cross-Engine Query Layer: The newsletter highlights advancements in technology that allow data teams to transpile their SQL or DataFrame code from one engine to another seamlessly (see the sketch after this list). This development is crucial for maintaining efficiency across different engines.
  3. Use of Apache Iceberg and Alternatives: While Apache Iceberg is seen as a potential unified storage layer, its integration is not yet mature enough to be used in a dbt project. Instead, Julien has opted to use Parquet files stored in S3, accessed by both DuckDB and Snowflake, in his Proof of Concept (PoC).
  4. Orchestration and Engines in PoC: For the project, Julien used Dagster as the orchestrator, which simplifies the job scheduling of different engines within a dbt project. The engines combined in this PoC were DuckDB and Snowflake.
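To make point 2 concrete, here is a minimal sketch of what a cross-engine query layer can look like with Ibis: the same deduplication expression compiled to SQL for two different dialects. The schema here is a trimmed-down placeholder for illustration.

import ibis
from ibis import _

t = ibis.table({"order_id": "string", "dt": "timestamp"}, name="orders")
dedup = (
    t.mutate(row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt]))
    .filter(_.row_number == 0)
)
print(ibis.to_sql(dedup, dialect="duckdb"))     # DuckDB-flavored SQL
print(ibis.to_sql(dedup, dialect="snowflake"))  # Snowflake-flavored SQL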

Why DataFrames and Ibis?

While the pipeline above is nice for ETL and ELT, sometimes we want the power of a full programming language instead of a query language like SQL, e.g. for debugging, testing, complex UDFs, etc. For scientific exploration, interactive computing is essential, as data scientists need to quickly iterate on their code, visualize the results, and make decisions based on the data.

DataFrames are such a data structure: DataFrames are used to process ordered data and apply compute operations on it in an interactive manner. They provide the flexibility to process large data with SQL-style operations, but also provide lower-level control for cell-level edits à la Excel sheets. Typically, the expectation is that all data fits in memory and is processed in memory. Moreover, DataFrames make it easy to go back and forth between deferred/batch and interactive modes, as the sketch below illustrates.
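A minimal sketch of flipping between deferred and interactive modes, assuming an Ibis and DuckDB installation and the landing path used later in this post:

import ibis

con = ibis.duckdb.connect()
t = con.read_parquet("landing/orders.parquet")

expr = t.group_by("sku").agg(n=t.count())  # deferred: nothing executes yet
print(expr)                                # prints the expression tree

ibis.options.interactive = True            # switch to interactive mode
print(expr)                                # now executes and shows a preview
ibis.options.interactive = False           # back to deferred/batch mode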

DataFrames excel^[no pun intended] at enabling folks to apply user-defined functions and free the user from the limitations of SQL, i.e. you can now re-use code, test your operations, and easily extend the relational machinery for complex operations. DataFrames also make it easy to quickly go from a tabular representation of data to the arrays and tensors expected by machine learning libraries.

Specialized, in-process databases, e.g. DuckDB for OLAP3, are blurring the boundary between a remote heavyweight database like Snowflake and an ergonomic library like pandas. We believe this is an opportunity to let DataFrames process larger-than-memory data while maintaining the interactivity expectations and developer feel of a local Python shell, making larger-than-memory data feel small.
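For instance, here is a minimal sketch of that blurring using DuckDB directly (the path and column names follow the Orders example used in this post): DuckDB scans the Parquet file out-of-core, and only the small aggregated result is materialized as a pandas DataFrame.

import duckdb

revenue_by_sku = duckdb.sql(
    "SELECT sku, SUM(purchase_price) AS revenue "
    "FROM 'landing/orders.parquet' "
    "GROUP BY sku"
).df()  # only the aggregate lands in memory as a pandas DataFrame
print(revenue_by_sku.head())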

Technical Deep Dive

Our implementation focuses on the 4 concepts presented earlier:

  1. Multi-Engine Data Stack: We will use DuckDB, pandas, and Snowflake as our engines.
  2. Cross-Engine Query Layer: We will use Ibis to write our expressions and compile them to run on DuckDB, pandas, and Snowflake.
  3. Apache Iceberg and Alternatives: We will use Parquet files stored locally as our storage layer, with the expectation that it's trivial to extend to S3 using the s3fs package (see the sketch after this list).
  4. Orchestration and Engines in PoC: We will focus on fine-grained scheduling for engines and leave orchestration to the reader. Fine-grained scheduling is more aligned with Ray, Dask, and PySpark, as compared to orchestration frameworks such as Dagster and Airflow.
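As a minimal sketch of the S3 extension mentioned in point 3 (the bucket name is hypothetical, and it assumes s3fs is installed and AWS credentials are configured), pandas routes s3:// paths through fsspec/s3fs:

import pandas as pd

orders = pd.read_parquet("s3://my-bucket/landing/orders.parquet")
orders.drop_duplicates(["order_id", "dt"]).to_parquet(
    "s3://my-bucket/staging/orders.parquet"
)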

Implementing with pandas

pandas is the quintessential DataFrame library and perhaps provides the simplest way to implement the above workflow. First, we generate random data borrowing from the implementation in the newsletter.

#| echo: false
import pandas as pd
from multi_engine_stack_ibis.generator import generate_random_data
generate_random_data("landing/orders.parquet")
df = pd.read_parquet("landing/orders.parquet")
deduped = df.drop_duplicates(["order_id", "dt"])

The pandas implementation is imperative in style and designed for data that fits in memory. The pandas API is hard to compile down to SQL with all its nuances, and it largely sits in its own special place, bringing together Python visualization, plotting, machine learning, AI, and complex processing libraries.

After de-duplicating with pandas operators, we are ready to send the data to Snowflake. Snowflake's Python connector provides a write_pandas method that comes in handy for our use-case.

import snowflake.connector.pandas_tools as pt  # `conn` below is an open Snowflake connection

pt.write_pandas(
    conn,
    deduped,
    table_name="T_ORDERS",
    auto_create_table=True,
    quote_identifiers=False,
    table_type="temporary"
)
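As a quick sanity check (a minimal sketch that reuses the same open connection conn), we can count the rows that write_pandas just loaded into the temporary table:

cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM T_ORDERS")
print(cur.fetchone()[0])  # number of de-duplicated orders loaded
cur.close()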

Implementing with Ibis aka Ibisify

One pandas limitation is that it has its own API that does not quite map back to relational algebra. Ibis is such a library, built by the very people who built pandas, to provide a sane expression system that can be mapped back to multiple SQL backends. Ibis takes inspiration from the dplyr R package to build a new expression system that can easily map back to relational algebra and thus compile to SQL. It is also declarative in style, enabling us to apply database-style optimizations on the complete logical plan, or the expression. Ibis is a key component for enabling composability, as highlighted in the excellent composable codex.

#| echo: false
import pathlib

import ibis
import ibis.backends.pandas.executor
import ibis.expr.types.relations
from ibis import _

from multi_engine_stack_ibis.generator import generate_random_data
from multi_engine_stack_ibis.utils import (MyExecutor, checkpoint_parquet,
                                           create_table_snowflake,
                                           replace_unbound)
from multi_engine_stack_ibis.connections import make_ibis_snowflake_connection



ibis.backends.pandas.executor.PandasExecutor = MyExecutor
setattr(ibis.expr.types.relations.Table, "checkpoint_parquet", checkpoint_parquet)
setattr(
    ibis.expr.types.relations.Table,
    "create_table_snowflake",
    create_table_snowflake,
)
ibis.set_backend("pandas")
p_staging = pathlib.Path("staging/staging.parquet")
p_landing = pathlib.Path("landing/orders.parquet")

snow_backend = make_ibis_snowflake_connection(database="MULTI_ENGINE", schema="PUBLIC", warehouse="COMPUTE_WH")
expr = (
  ibis.read_parquet(p_landing)
  .mutate(
      row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt]))
  .filter(_.row_number == 0)
  .checkpoint_parquet(p_staging)
  .create_table_snowflake("T_ORDERS")
)
expr

An Ibis expression prints itself as a plan that is akin to a traditional Logical Plan in databases. A Logical Plan is a tree of relational algebra operators that describes the computation that needs to be performed. This plan is then optimized by the query optimizer and converted into a physical plan that is executed by the query executor. Ibis expressions are similar to Logical Plans in that they describe the computation that needs to be performed, but they are not executed immediately. Instead, they are compiled into SQL and executed on the backend when needed. A Logical Plan is generally at a coarser granularity than the DAG produced by a task-scheduling framework like Dask. In theory, this plan could be compiled down to Dask's DAG.

While pandas is embedded and just a pip install away, it still has many well-documented limitations, with plenty of performance improvements left on the table. This is where recent embedded databases like DuckDB fill the gap: they pack the full punch of a SQL engine, with all of its optimizations and years of research behind it, while being as easy to import as pandas. In this world, at a minimum we can delegate all the relational and SQL parts of our pandas pipeline to DuckDB and only hand the processed data to complex user-defined Python.
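A minimal sketch of that division of labor, using the same Orders data: DuckDB (through Ibis) handles the relational de-duplication, and only the result is handed to pandas for arbitrary Python post-processing.

import ibis
from ibis import _

con = ibis.duckdb.connect()
orders = con.read_parquet("landing/orders.parquet")
deduped = (
    orders.mutate(
        row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt])
    )
    .filter(_.row_number == 0)
    .to_pandas()  # hand off to pandas only after the relational heavy lifting
)
deduped["sku"] = deduped["sku"].str.upper()  # arbitrary Python/pandas step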

Now, we are ready to take our Ibisified code and compile the expression above to execute on arbitrary engines, truly realizing the write-once-run-anywhere paradigm: we have successfully decoupled our compute engine from the expression system describing our computation.

Multi-Engine Data Stack w/ Ibis

[Figure: the pipeline split across DuckDB, pandas, and Snowflake]

Let's break our expression above into smaller parts and have them run across DuckDB, pandas, and Snowflake. Note that we do not do anything once the data lands in Snowflake and just show that we can select the data; we leave what is possible with Snowflake-native features up to the reader's imagination.

Notice that our expression above is bound to the pandas backend. First, let's create an UnboundTable expression so that we do not have to depend on a backend when writing our expressions.


schema = {
    "user_id": "int64",
    "dt": "timestamp",
    "order_id": "string",
    "quantity": "int64",
    "purchase_price": "float64",
    "sku": "string",
    "row_number": "int64",
}

first_expr_for = (
    ibis.table(schema, name="orders")
    .mutate(
        row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt])
    )
    .filter(_.row_number == 0)
)
first_expr_for

Next, we replace the UnboundTable expression with the DuckDB backend and execute it with the to_parquet method4. This step is covered by the checkpoint_parquet operator that we added to the pandas backend above. Here is an excellent blog that discusses inserting data into Snowflake from any Ibis backend with the to_pyarrow functionality.

data = pd.read_parquet("landing/orders.parquet")
duck_backend = ibis.duckdb.connect()
duck_backend.con.execute("CREATE TABLE orders as SELECT * from data")

bind_to_duckdb = replace_unbound(first_expr_for, duck_backend) 
bind_to_duckdb.to_parquet(p_staging)
to_sql = ibis.to_sql(bind_to_duckdb)
print(to_sql)

Once the above step creates the de-duplicated table, we can send the data to Snowflake using the pandas backend. This functionality is covered by the create_table_snowflake operator that we added to the pandas backend above.

second_expr_for = ibis.table(schema, name="T_ORDERS")  # nothing special, just reading the data from the orders table
snow_backend.create_table("T_ORDERS", schema=second_expr_for.schema(), temp=True)
pandas_backend = ibis.pandas.connect({"T_ORDERS": pd.read_parquet(p_staging)})
snow_backend.insert("T_ORDERS", pandas_backend.to_pyarrow(second_expr_for))

Finally, we can select the data from the Snowflake table to verify that the data has been loaded successfully.

third_expr_for = ibis.table(schema, name="T_ORDERS")  # add your Snowflake ML functions here
third_expr_for
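To actually run this check against Snowflake, rather than only building the unbound expression, a minimal sketch is to bind the table through the snow_backend connection and pull a small preview:

t_orders = snow_backend.table("T_ORDERS")
print(t_orders.count().to_pandas())   # total rows loaded into Snowflake
print(t_orders.limit(5).to_pandas())  # small preview of the data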


We successfully broke up our computation into pieces, albeit manually, and executed them across DuckDB, pandas, and Snowflake. This demonstrates the flexibility and power of a multi-engine data stack, allowing users to leverage the strengths of different engines to optimize their data processing pipelines.

Acknowledgments

I'd like to thank Neal Richardson, Dan Lovell and Daniel Mesejo for providing the initial feedback on the post. I highly appreciate the early review and encouragement by Wes McKinney.

Resources

  • The Road to Composable Data Systems
  • The Composable Codex
  • Apache Arrow
  • Multi-Engine Data Stack Newsletter v0 v1
  • Ibis, the portable dataframe library
  • dbt Docs
  • Dagster Docs
  • LanceDB
  • KuzuDB
  • DuckDB

  1. In this post, we have primarily focused on v0 of the multi-engine data stack. In the latest version, Apache Iceberg is included as a storage and data format layer, and NYC Taxi data is used instead of the random Orders data treated in this post and in v0.  ↩

  2. Orchestration vs. fine-grained scheduling: ↩

    • Orchestration is left to the reader; it can be done with a tool like Dagster, Prefect, or Apache Airflow.
    • Fine-grained scheduling can be done with a tool like Dask, Ray, or Spark.
  3. Some examples of in-process databases are described in this post, extending the DuckDB example above to newer purpose-built databases like LanceDB and KuzuDB.  ↩

  4. The Ibis docs use backend.to_pandas(expr) commands to bind and run the expression in the same go. Instead, we use the replace_unbound method to show a generic way to just bind the expression to said backend without executing it. This is just for illustration purposes. All the code from here on uses the backend.to_pyarrow methods. ↩

This article is reproduced from: https://dev.to/letsql/declarative-multi-engine-data-stack-with-ibis-3015
