
Declarative Multi-Engine Data Stack with Ibis

Published 2024-08-30

TL;DR

I recently came across the Ju Data Engineering Newsletter by Julien Hurault on the multi-engine data stack. The idea is simple: we'd like to easily port our code across any backend while retaining the flexibility to grow our pipeline as new backends and features are developed. This entails at least the following high-level workflows:

  1. Offloading part of a SQL query to serverless engines like DuckDB, Polars, DataFusion, chDB, etc.
  2. Right-sizing the pipeline for various development and deployment scenarios. For example, developers can work locally and ship to production with confidence.
  3. Applying database-style optimizations to our pipelines automatically.

In this post, we dive into how we can implement the multi-engine pipeline from a programming language: instead of SQL, we propose using a DataFrame API that can be used for both interactive and batch use-cases. Specifically, we show how to break up our pipeline into smaller pieces and execute them across DuckDB, pandas, and Snowflake. We also discuss the advantages of a multi-engine data stack and highlight emerging trends in the field.

The code implemented in this post is available on GitHub^[In order to quickly try out the repo, I also provide a nix flake]. The reference work in the newsletter, with the original implementation, is here.

Overview

The multi-engine data stack pipeline works as follows: some data lands in an S3 bucket, gets preprocessed to remove any duplicates, and is then loaded into a Snowflake table, where it is transformed further with ML or Snowflake-specific functions^[Please note we do not go into implementing the types of things that might be possible in Snowflake and assume that as a requirement for the workflow]. The pipeline takes orders as Parquet files that are saved to a landing location, preprocessed, and then stored at a staging location in an S3 bucket. The staging data is then loaded into Snowflake so that downstream BI tools can connect to it. The pipeline is tied together with dbt SQL models, one for each backend, and the newsletter chooses Dagster as the orchestration tool.

Declarative Multi-Engine Data Stack with Ibis

Today, we are going to dive into how we can convert our pandas code to Ibis expressions, reproducing the complete example from Julien Hurault's multi-engine stack post^1. Instead of dbt models and SQL, we use Ibis and some Python to compile and orchestrate SQL engines from a shell. By rewriting our code as Ibis expressions, we can declaratively build our data pipelines with deferred execution. Moreover, Ibis supports over 20 backends, so we can write code once and port our ibis.exprs to multiple backends. To further simplify, we leave scheduling and task orchestration^2, provided by Dagster, up to the reader.

Core Concept of Multi-Engine Data Stack

Here are the core concepts of the multi-engine data stack as outlined in Julien's newsletter:


  1. Multi-Engine Data Stack: The concept involves combining different data engines like Snowflake, Spark, DuckDB, and BigQuery. This approach aims to reduce costs, limit vendor lock-in, and increase flexibility. Julien mentions that for certain benchmark queries, using DuckDB could achieve a significant cost reduction compared to Snowflake.
  2. Development of a Cross-Engine Query Layer: The newsletter highlights advancements in technology that allow data teams to transpile their SQL or Dataframe code from one engine to another seamlessly. This development is crucial for maintaining efficiency across different engines.
  3. Use of Apache Iceberg and Alternatives: While Apache Iceberg is seen as a potential unified storage layer, its integration is not yet mature enough to be used in a dbt project. Instead, Julien has opted to use Parquet files stored in S3, accessed by both DuckDB and Snowflake, in his Proof of Concept (PoC).
  4. Orchestration and Engines in PoC: For the project, Julien used Dagster as the orchestrator, which simplifies the job scheduling of different engines within a dbt project. The engines combined in this PoC were DuckDB and Snowflake.

Why DataFrames and Ibis?

While the pipeline above is nice for ETL and ELT, sometimes we want the power of a full programming language instead of a query language like SQL, e.g. for debugging, testing, complex UDFs, etc. For scientific exploration, interactive computing is essential, as data scientists need to quickly iterate on their code, visualize the results, and make decisions based on the data.

DataFrames are such a data structure: they are used to process ordered data and apply compute operations on it in an interactive manner. They provide the flexibility to process large data with SQL-style operations, but also offer lower-level control to make cell-level edits à la Excel sheets. Typically, the expectation is that all data is processed in memory and fits in memory. Moreover, DataFrames make it easy to go back and forth between deferred/batch and interactive modes.

DataFrames excel^[no pun intended] at enabling folks to apply user-defined functions and release the user from the limitations of SQL, i.e. you can now re-use code, test your operations, and easily extend the relational machinery for complex operations. DataFrames also make it easy to quickly go from a tabular representation of data to the arrays and tensors expected by machine learning libraries.
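As a minimal illustration (not from the original post), the snippet below applies a plain Python computation to the orders data and hands the result to downstream ML libraries as a NumPy array; the column names follow the generated orders schema used later in the post.

import pandas as pd

df = pd.read_parquet("landing/orders.parquet")
df["revenue"] = df["quantity"] * df["purchase_price"]  # a simple user-defined computation, reusable and testable
features = df[["quantity", "purchase_price", "revenue"]].to_numpy()  # tabular -> array for ML libraries
print(features.shape)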

Specialized and in-process databases, e.g. DuckDB for OLAP^3, are blurring the boundary between a remote heavyweight database like Snowflake and an ergonomic library like pandas. We believe this is an opportunity to let DataFrames process larger-than-memory data while maintaining the interactivity expectations and developer feel of a local Python shell, making larger-than-memory data feel small.
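As a hedged sketch of that blurring, DuckDB can scan the landing Parquet file lazily and only materialize a tiny summary into pandas, so the raw data never has to fit in memory at once:

import duckdb

rel = duckdb.sql("""
    SELECT count(*) AS n_rows, count(DISTINCT order_id) AS n_orders
    FROM read_parquet('landing/orders.parquet')
""")
print(rel.df())  # only the one-row summary is materialized as a DataFrame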

Technical Deep Dive

Our implementation focuses on the 4 concepts presented earlier:

  1. Multi-Engine Data Stack: We will use DuckDB, pandas, and Snowflake as our engines.
  2. Cross-Engine Query Layer: We will use Ibis to write our expressions and compile them to run on DuckDB, pandas, and Snowflake.
  3. Apache Iceberg and Alternatives: We will use Parquet files stored locally as our storage layer, with the expectation that it's trivial to extend to S3 using the s3fs package (see the sketch after this list).
  4. Orchestration and Engines in PoC: We will focus on fine-grained scheduling for engines and leave orchestration to the reader. Fine-grained scheduling is more aligned with Ray, Dask, and PySpark, as compared to orchestration frameworks such as Dagster, Airflow, etc.
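As promised in point 3, here is a hypothetical sketch of the S3 extension: pandas reads s3:// URIs through s3fs via storage_options (the bucket name below is made up).

import pandas as pd

df_local = pd.read_parquet("landing/orders.parquet")
df_s3 = pd.read_parquet(
    "s3://my-bucket/landing/orders.parquet",   # hypothetical bucket
    storage_options={"anon": False},           # credentials resolved by s3fs
)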

Implementing with pandas

pandas is the quintessential DataFrame library and perhaps provides the simplest way to implement the above workflow. First, we generate random data, borrowing from the implementation in the newsletter.

#| echo: false
import pandas as pd
from multi_engine_stack_ibis.generator import generate_random_data
generate_random_data("landing/orders.parquet")
df = pd.read_parquet("landing/orders.parquet")
deduped = df.drop_duplicates(["order_id", "dt"])

The pandas implementation is imperative in style and is designed for data that fits in memory. The pandas API is hard to compile down to SQL with all its nuances, and it largely sits in its own special place, bringing together Python visualization, plotting, machine learning, AI, and complex processing libraries.

After de-duplicating using pandas operators, we are ready to send the data to Snowflake. Snowflake has a method called write_pandas that comes in handy for our use-case; here pt presumably refers to snowflake.connector.pandas_tools and conn to an open Snowflake connection.

from snowflake.connector import pandas_tools as pt  # assumed alias for Snowflake's pandas helpers

pt.write_pandas(
    conn,  # an open snowflake.connector connection
    deduped,
    table_name="T_ORDERS",
    auto_create_table=True,
    quote_identifiers=False,
    table_type="temporary"
)

Implementing with Ibis aka Ibisify

One pandas limitation is that it has its own API that does not quite map back to relational algebra. Ibis is a library, built by some of the same people who built pandas, that provides a sane expression system which can be mapped back to multiple SQL backends. Ibis takes inspiration from the dplyr R package to build an expression system that maps cleanly to relational algebra and thus compiles to SQL. It is also declarative in style, enabling us to apply database-style optimizations on the complete logical plan, i.e. the expression. Ibis is a key component for enabling composability, as highlighted in the excellent Composable Codex.

#| echo: false
import pathlib

import ibis
import ibis.backends.pandas.executor
import ibis.expr.types.relations
from ibis import _

from multi_engine_stack_ibis.generator import generate_random_data
from multi_engine_stack_ibis.utils import (MyExecutor, checkpoint_parquet,
                                           create_table_snowflake,
                                           replace_unbound)
from multi_engine_stack_ibis.connections import make_ibis_snowflake_connection



ibis.backends.pandas.executor.PandasExecutor = MyExecutor
setattr(ibis.expr.types.relations.Table, "checkpoint_parquet", checkpoint_parquet)
setattr(
    ibis.expr.types.relations.Table,
    "create_table_snowflake",
    create_table_snowflake,
)
ibis.set_backend("pandas")
p_staging = pathlib.Path("staging/staging.parquet")
p_landing = pathlib.Path("landing/orders.parquet")

snow_backend = make_ibis_snowflake_connection(
    database="MULTI_ENGINE", schema="PUBLIC", warehouse="COMPUTE_WH"
)
expr = (
  ibis.read_parquet(p_landing)
  .mutate(
      row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt]))
  .filter(_.row_number == 0)
  .checkpoint_parquet(p_staging)
  .create_table_snowflake("T_ORDERS")
)
expr

An Ibis expression prints itself as a plan akin to a traditional logical plan in databases. A logical plan is a tree of relational algebra operators that describes the computation to be performed. This plan is then optimized by the query optimizer and converted into a physical plan that is executed by the query executor. Ibis expressions are similar to logical plans in that they describe the computation that needs to be performed, but they are not executed immediately. Instead, they are compiled into SQL and executed on the backend when needed. A logical plan is generally at a higher level of granularity than a DAG produced by a task-scheduling framework like Dask. In theory, this plan could be compiled down to Dask's DAG.
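To make the logical-plan analogy concrete, here is a minimal sketch (assuming a recent Ibis version where ibis.to_sql accepts a dialect argument) that compiles an unbound version of the de-duplication expression to two SQL dialects:

import ibis
from ibis import _

t = ibis.table({"order_id": "string", "dt": "timestamp"}, name="orders")
dedup = (
    t.mutate(row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt]))
    .filter(_.row_number == 0)
)
print(ibis.to_sql(dedup, dialect="duckdb"))     # DuckDB-flavored SQL
print(ibis.to_sql(dedup, dialect="snowflake"))  # Snowflake-flavored SQL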

While pandas is embedded and just a pip install away, it still has well-documented limitations, with plenty of performance improvements left on the table. This is where recent embedded databases like DuckDB fill the gap: they pack the full punch of a SQL engine, with all of its optimizations and years of research behind it, yet are as easy to import as pandas. In this world, at a minimum we can delegate all the relational and SQL parts of our pandas pipeline to DuckDB and hand the processed data back only for complex user-defined Python.
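A hedged sketch of that division of labor: DuckDB (through Ibis) does the relational heavy lifting, and pandas only sees the reduced result for arbitrary Python post-processing. The column names come from the generated orders data; the aggregation itself is illustrative.

import ibis

con = ibis.duckdb.connect()
orders = con.read_parquet("landing/orders.parquet")
per_sku = orders.group_by("sku").agg(
    revenue=(orders.quantity * orders.purchase_price).sum()
)
pdf = per_sku.to_pandas()                             # only the small aggregate lands in pandas
pdf["share"] = pdf["revenue"] / pdf["revenue"].sum()  # arbitrary Python from here on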

Now, we are ready to take our Ibisified code and compile the expression above to execute on arbitrary engines, to truly realize the write-once-run-anywhere paradigm: we have successfully decoupled our compute engine from the expression system describing our computation.

Multi-Engine Data Stack w/ Ibis

[Figure: the pipeline split across DuckDB, pandas, and Snowflake]

Let's break the expression above into smaller parts and run them across DuckDB, pandas, and Snowflake. Note that we do not do anything further once the data lands in Snowflake; we just show that we can select the data, and leave what is possible with Snowflake-native features up to the reader's imagination.

Notice that our expression above is bound to the pandas backend. First, let's create an UnboundTable expression so we don't have to depend on a backend when writing our expressions.


schema = {
    "user_id": "int64",
    "dt": "timestamp",
    "order_id": "string",
    "quantity": "int64",
    "purchase_price": "float64",
    "sku": "string",
    "row_number": "int64",
}

first_expr_for = (
    ibis.table(schema, name="orders")
    .mutate(
        row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt])
    )
    .filter(_.row_number == 0)
)
first_expr_for

Next, we replace the UnboundTable expression with the DuckDB backend and execute it with the to_parquet method^4. This step is covered by the checkpoint_parquet operator that we added to the pandas backend above. Here is an excellent blog post that discusses inserting data into Snowflake from any Ibis backend with the to_pyarrow functionality.

data = pd.read_parquet("landing/orders.parquet")
duck_backend = ibis.duckdb.connect()
duck_backend.con.execute("CREATE TABLE orders as SELECT * from data")

bind_to_duckdb = replace_unbound(first_expr_for, duck_backend) 
bind_to_duckdb.to_parquet(p_staging)
to_sql = ibis.to_sql(bind_to_duckdb)
print(to_sql)

Once the above step creates the de-duplicated table, we can send the data to Snowflake using the pandas backend. This functionality is covered by the create_table_snowflake operator that we added to the pandas backend above.

second_expr_for = ibis.table(schema, name="T_ORDERS")  # nothing special, just reading the data from the orders table
snow_backend.create_table("T_ORDERS", schema=second_expr_for.schema(), temp=True)
pandas_backend = ibis.pandas.connect({"T_ORDERS": pd.read_parquet(p_staging)})
snow_backend.insert("T_ORDERS", pandas_backend.to_pyarrow(second_expr_for))

Finally, we can select the data from the Snowflake table to verify that the data has been loaded successfully.

third_expr_for = ibis.table(schema, name="T_ORDERS")  # add your Snowflake ML functions here
third_expr_for
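As a hedged sketch of this verification step, we can bind the unbound expression to the Snowflake backend with the repo's replace_unbound helper, mirroring the to_pyarrow usage from footnote 4, and pull a small result back:

bound_to_snowflake = replace_unbound(third_expr_for, snow_backend)
print(bound_to_snowflake.count().to_pyarrow())   # row count after de-duplication
print(bound_to_snowflake.limit(5).to_pyarrow())  # peek at a few rows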


We successfully broke our computation up into pieces, albeit manually, and executed them across DuckDB, pandas, and Snowflake. This demonstrates the flexibility and power of a multi-engine data stack, allowing users to leverage the strengths of different engines to optimize their data processing pipelines.

Acknowledgments

I'd like to thank Neal Richardson, Dan Lovell and Daniel Mesejo for providing the initial feedback on the post. I highly appreciate the early review and encouragement by Wes McKinney.

Resources

  • The Road to Composable Data Systems
  • The Composable Codex
  • Apache Arrow
  • Multi-Engine Data Stack Newsletter v0, v1
  • Ibis, the portable dataframe library
  • dbt Docs
  • Dagster Docs
  • LanceDB
  • KuzuDB
  • DuckDB

  1. In this post, we have primarily focused on v0 of the multi-engine data stack. In the latest version, Apache Iceberg is included as a storage and data format layer, and NYC Taxi data is used instead of the random orders data used in this post and in v0.  ↩

  2. Orchestration vs. fine-grained scheduling: ↩

    • Orchestration is left to the reader; it can be done with a tool like Dagster, Prefect, or Apache Airflow.
    • Fine-grained scheduling can be done with a tool like Dask, Ray, or Spark.
  3. Some examples of in-process databases are described in this post, extending the DuckDB example above to newer purpose-built databases like LanceDB and KuzuDB.  ↩

  4. The Ibis docs use backend.to_pandas(expr) to bind and run an expression in one go. Instead, we use the replace_unbound method to show a generic way to compile the expression for a given backend without executing it immediately. This is just for illustration purposes. All the code from here on uses the backend.to_pyarrow methods. ↩
