I recently came across the Ju Data Engineering Newsletter by Julien Hurault on the multi-engine data stack. The idea is simple: we'd like to easily port our code across any backend while retaining the flexibility to grow our pipeline as new backends and features are developed. This entails at least the following high-level workflows:
In this post, we dive into how to implement the multi-engine pipeline from a programming language: instead of SQL, we propose using a DataFrame API that serves both interactive and batch use cases. Specifically, we show how to break up our pipeline into smaller pieces and execute them across DuckDB, pandas, and Snowflake. We also discuss the advantages of a multi-engine data stack and highlight emerging trends in the field.
The code implemented in this post is available on GitHub^[To quickly try out the repo, I also provide a Nix flake.]. The reference work in the newsletter, with the original implementation, is here.
The multi-engine data stack pipeline works as follows: data lands in an S3 bucket, is preprocessed to remove duplicates, and is then loaded into a Snowflake table, where it is transformed further with ML or Snowflake-specific functions^[Note that we do not implement the kinds of things that might be possible in Snowflake; we simply assume them as a requirement of the workflow.]. The pipeline takes orders as Parquet files saved to a landing location, preprocesses them, and stores them at a staging location in an S3 bucket. The staging data is then loaded into Snowflake so that downstream BI tools can connect to it. The pipeline is tied together by dbt SQL models, one per backend, and the newsletter chooses Dagster as the orchestration tool.
Today, we dive into how to convert our pandas code to Ibis expressions, reproducing the complete example from Julien Hurault's multi-engine stack[^1]. Instead of dbt models and SQL, we use Ibis and some Python to compile and orchestrate SQL engines from a shell. By rewriting our code as Ibis expressions, we can build our data pipelines declaratively, with deferred execution. Moreover, Ibis supports over 20 backends, so we can write code once and port our Ibis expressions to multiple backends. To simplify further, we leave the scheduling and task orchestration[^2] provided by Dagster up to the reader.
Here are the core concepts of the multi-engine data stack as outlined in Julien's newsletter:
While the pipeline above works well for ETL and ELT, sometimes we want the power of a full programming language rather than a query language like SQL, e.g. for debugging, testing, and complex UDFs. For scientific exploration, interactive computing is essential: data scientists need to iterate quickly on their code, visualize the results, and make decisions based on the data.
DataFrames are such a data structure: they are used to process ordered data and apply compute operations to it interactively. They provide the flexibility to process large data with SQL-style operations, but also provide lower-level control for cell-level edits à la Excel sheets. Typically, the expectation is that all data is processed in memory and fits in memory. Moreover, DataFrames make it easy to go back and forth between deferred/batch and interactive modes.
DataFrames excel^[no pun intended] at enabling folks to apply user-defined functions, and they release users from the limitations of SQL: you can now reuse code, test your operations, and easily extend relational machinery for complex operations. DataFrames also make it easy to go quickly from a tabular representation of data to the arrays and tensors expected by machine learning libraries.
Specialized, in-process databases, e.g. DuckDB for OLAP[^3], are blurring the boundary between a remote heavyweight database like Snowflake and an ergonomic library like pandas. We believe this is an opportunity to let DataFrames process larger-than-memory data while keeping the interactivity and developer feel of a local Python shell, making larger-than-memory data feel small.
Our implementation focuses on the 4 concepts presented earlier:
pandas is the quintessential DataFrame library and perhaps provides the simplest way to implement the above workflow. First, we generate random data borrowing from the implementation in the newsletter.
#| echo: false
import pandas as pd
from multi_engine_stack_ibis.generator import generate_random_data

generate_random_data("landing/orders.parquet")
df = pd.read_parquet("landing/orders.parquet")
deduped = df.drop_duplicates(["order_id", "dt"])
The pandas implementation is imperative in style and is designed for data that fits in memory. The pandas API, with all its nuances, is hard to compile down to SQL, and it largely sits in its own special place, bringing together Python visualization, plotting, machine learning, AI, and complex processing libraries.
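After de-duplicating with pandas operators, we are ready to send the data to Snowflake. The Snowflake Python connector provides a write_pandas function that comes in handy for our use case.

# Assumption: `pt` is the connector's pandas_tools module and `conn` is an
# already open snowflake.connector connection (neither is shown in the post).
from snowflake.connector import pandas_tools as pt

pt.write_pandas(
    conn,
    deduped,
    table_name="T_ORDERS",
    auto_create_table=True,
    quote_identifiers=False,
    table_type="temporary",
)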
One limitation of pandas is that its API does not quite map back to relational algebra. Ibis is a library built by people who built pandas to provide a sane expression system that can be mapped to multiple SQL backends. Ibis takes inspiration from the dplyr R package to build a new expression system that maps easily to relational algebra and thus compiles to SQL. It is also declarative in style, which lets us apply database-style optimizations to the complete logical plan or expression. Ibis is a key component for enabling composability, as highlighted in the excellent Composable Codex.
#| echo: false
import pathlib

import ibis
import ibis.backends.pandas.executor
import ibis.expr.types.relations
from ibis import _

from multi_engine_stack_ibis.generator import generate_random_data
from multi_engine_stack_ibis.utils import (
    MyExecutor,
    checkpoint_parquet,
    create_table_snowflake,
    replace_unbound,
)
from multi_engine_stack_ibis.connections import make_ibis_snowflake_connection

# Swap in the custom executor and attach the two custom operators to Table.
ibis.backends.pandas.executor.PandasExecutor = MyExecutor
setattr(ibis.expr.types.relations.Table, "checkpoint_parquet", checkpoint_parquet)
setattr(
    ibis.expr.types.relations.Table,
    "create_table_snowflake",
    create_table_snowflake,
)

ibis.set_backend("pandas")
p_staging = pathlib.Path("staging/staging.parquet")
p_landing = pathlib.Path("landing/orders.parquet")
snow_backend = make_ibis_snowflake_connection(
    database="MULTI_ENGINE", schema="PUBLIC", warehouse="COMPUTE_WH"
)
expr = (
    ibis.read_parquet(p_landing)
    .mutate(
        row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt])
    )
    .filter(_.row_number == 0)
    .checkpoint_parquet(p_staging)
    .create_table_snowflake("T_ORDERS")
)
expr
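For readers coming from pandas, the window filter in the expression above can be approximated with groupby and cumcount. The following is a minimal sketch on made-up rows; the toy data and the names orders, by_pair, and first_per_order are illustrative assumptions and do not come from the repo. It also shows that the window filter keeps one row per order_id, whereas the earlier drop_duplicates call keeps one row per (order_id, dt) pair.

```python
import pandas as pd

# Made-up rows for illustration; order "a" appears three times.
orders = pd.DataFrame(
    {
        "order_id": ["a", "a", "a", "b"],
        "dt": pd.to_datetime(
            ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-01"]
        ),
        "quantity": [1, 5, 2, 3],
    }
)

# pandas-style de-duplication on the (order_id, dt) pair.
by_pair = orders.drop_duplicates(["order_id", "dt"])

# Window-style de-duplication: number the rows of each order by dt
# (0-based, like ibis.row_number()) and keep only the first one.
row_number = (
    orders.sort_values("dt", kind="stable").groupby("order_id").cumcount()
)
first_per_order = orders.assign(row_number=row_number).query("row_number == 0")

print(by_pair["quantity"].tolist())
print(first_per_order["quantity"].tolist())
```

Here by_pair keeps three rows and first_per_order keeps two. When two rows of the same order share a dt, the stable sort keeps the one that came first in the input; a SQL engine is free to pick either, so ties are worth breaking explicitly if the choice matters.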
An Ibis expression prints itself as a plan that is akin to a traditional logical plan in databases. A logical plan is a tree of relational algebra operators that describes the computation to be performed. In a database, this plan is optimized by the query optimizer and converted into a physical plan that is run by the query executor. Ibis expressions are similar to logical plans in that they describe the computation to be performed but are not executed immediately. Instead, they are compiled into SQL and executed on the backend when needed. A logical plan is generally at a higher level of granularity than a DAG produced by a task scheduling framework like Dask. In theory, this plan could be compiled down to Dask's DAG.
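To make the idea of a plan as a tree of operators concrete, here is a toy sketch in plain Python. The Node class and its then and explain methods are hypothetical names for illustration and are unrelated to Ibis internals; the point is only that building the tree performs no computation, and that the tree can be inspected before anything runs.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Node:
    """One relational operator in a toy logical plan (hypothetical)."""

    op: str
    detail: str
    child: Optional["Node"] = None

    def then(self, op: str, detail: str) -> "Node":
        # Stack a new operator on top of the current plan; nothing executes.
        return Node(op, detail, child=self)

    def explain(self, depth: int = 0) -> List[str]:
        # Render the tree top-down, indenting each input one level deeper.
        lines = ["  " * depth + f"{self.op}[{self.detail}]"]
        if self.child is not None:
            lines.extend(self.child.explain(depth + 1))
        return lines


# The de-duplication step written as a plan: scan, window, filter.
plan = (
    Node("Scan", "orders")
    .then("Window", "row_number over (group by order_id, order by dt)")
    .then("Filter", "row_number == 0")
)

print("\n".join(plan.explain()))
```

The printed tree has the filter at the root and the scan at the leaf, which is the shape an optimizer or a SQL compiler would walk.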
While pandas is embedded and just a pip install away, it still has well-documented limitations, with plenty of performance improvements left on the table. This is where recent embedded databases like DuckDB fill the gap: they pack the full punch of a SQL engine, with all of its optimizations and years of research behind it, and are as easy to import as pandas. In this world, at a minimum, we can delegate all relational and SQL parts of our pandas pipeline to DuckDB and only hand the processed data over to complex user-defined Python.
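As a rough illustration of this division of labor, the sketch below uses Python's built-in sqlite3 module as a stand-in for an in-process engine. It is not DuckDB, and the rows and the order_total function are made up. The SQL side does the relational de-duplication; plain Python then applies a user-defined function to the already reduced result.

```python
import sqlite3

# Made-up orders; "o1" was delivered twice with different timestamps.
rows = [
    ("o1", "2024-01-01", 2, 10.0),
    ("o1", "2024-01-03", 2, 10.0),
    ("o2", "2024-01-02", 1, 4.5),
]

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE orders "
    "(order_id TEXT, dt TEXT, quantity INTEGER, purchase_price REAL)"
)
con.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", rows)

# Relational part, delegated to the engine. SQL's ROW_NUMBER() starts at 1,
# whereas ibis.row_number() starts at 0.
dedup_sql = """
SELECT order_id, dt, quantity, purchase_price
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY dt) AS rn
    FROM orders
)
WHERE rn = 1
ORDER BY order_id
"""
deduped = con.execute(dedup_sql).fetchall()


def order_total(quantity: int, purchase_price: float) -> float:
    # Stand-in for arbitrary user-defined Python applied after the SQL step.
    return round(quantity * purchase_price, 2)


totals = {order_id: order_total(q, p) for order_id, _dt, q, p in deduped}
print(totals)
```

The window function requires SQLite 3.25 or newer, which recent Python builds ship with.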
Now we are ready to take our Ibisified code and compile the expression above to execute on arbitrary engines, truly realizing the write-once-run-anywhere paradigm: we have successfully decoupled our compute engine from the expression system describing our computation.
Let's break the expression above into smaller parts and run them across DuckDB, pandas, and Snowflake. Note that we do nothing further once the data lands in Snowflake; we only show that we can select the data. What is possible with Snowflake-native features is left to the reader's imagination.
Notice that our expression above is bound to the pandas backend. First, let's create an UnboundTable expression so that we do not depend on a backend when writing our expressions.
schema = {
    "user_id": "int64",
    "dt": "timestamp",
    "order_id": "string",
    "quantity": "int64",
    "purchase_price": "float64",
    "sku": "string",
    "row_number": "int64",
}
first_expr_for = (
    ibis.table(schema, name="orders")
    .mutate(
        row_number=ibis.row_number().over(group_by=[_.order_id], order_by=[_.dt])
    )
    .filter(_.row_number == 0)
)
first_expr_for
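The idea behind an unbound expression can be sketched without Ibis: describe the computation against a table name only, then hand the same description to different engines. In the toy below, UnboundFilter, run_on_python, and run_on_sqlite are hypothetical names, and sqlite3 plus a list of dicts stand in for real backends; this is not how replace_unbound is implemented.

```python
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class UnboundFilter:
    """A computation described against a table name, with no data attached."""

    table: str
    column: str
    minimum: int

    def to_sql(self) -> str:
        # Compile the description to SQL; the threshold stays a parameter.
        return f'SELECT * FROM "{self.table}" WHERE "{self.column}" >= ?'


def run_on_python(expr, tables):
    # "Backend" 1: evaluate the description over in-memory dicts.
    return [r for r in tables[expr.table] if r[expr.column] >= expr.minimum]


def run_on_sqlite(expr, con):
    # "Backend" 2: bind the same description to a SQL engine.
    con.row_factory = sqlite3.Row
    cursor = con.execute(expr.to_sql(), (expr.minimum,))
    return [dict(row) for row in cursor.fetchall()]


orders = [
    {"order_id": "o1", "quantity": 1},
    {"order_id": "o2", "quantity": 5},
    {"order_id": "o3", "quantity": 3},
]

# Written once, with no backend in sight.
expr = UnboundFilter(table="orders", column="quantity", minimum=3)

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (order_id TEXT, quantity INTEGER)")
con.executemany("INSERT INTO orders VALUES (:order_id, :quantity)", orders)

python_result = run_on_python(expr, {"orders": orders})
sqlite_result = run_on_sqlite(expr, con)
print(python_result)
print(sqlite_result)
```

Both calls receive the same expr object; only the engine it is bound to changes, which is the property the rest of this section relies on.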
Next, we bind the UnboundTable expression to the DuckDB backend and execute it with the to_parquet method[^4]. This step is covered by the checkpoint_parquet operator that we added to the pandas backend above. Here is an excellent blog that discusses inserting data into Snowflake from any Ibis backend with to_pyarrow functionality.
data = pd.read_parquet("landing/orders.parquet")
duck_backend = ibis.duckdb.connect()
# The SQL below relies on DuckDB resolving `data` to the local DataFrame by name.
duck_backend.con.execute("CREATE TABLE orders as SELECT * from data")
bind_to_duckdb = replace_unbound(first_expr_for, duck_backend)
bind_to_duckdb.to_parquet(p_staging)
to_sql = ibis.to_sql(bind_to_duckdb)
print(to_sql)
Once the above step has created the de-duplicated table, we can send the data to Snowflake using the pandas backend. This functionality is covered by the create_table_snowflake operator that we added to the pandas backend above.
# Nothing special here: just read the data from the orders table.
second_expr_for = ibis.table(schema, name="T_ORDERS")
snow_backend.create_table("T_ORDERS", schema=second_expr_for.schema(), temp=True)
pandas_backend = ibis.pandas.connect({"T_ORDERS": pd.read_parquet(p_staging)})
snow_backend.insert("T_ORDERS", pandas_backend.to_pyarrow(second_expr_for))
Finally, we can select the data from the Snowflake table to verify that the data has been loaded successfully.
third_expr_for = ibis.table(schema, name="T_ORDERS")  # add your Snowflake ML functions here
third_expr_for
We successfully broke up our computation in pieces, albeit manually, and executed them across DuckDB, pandas, and Snowflake. This demonstrates the flexibility and power of a multi-engine data stack, allowing users to leverage the strengths of different engines to optimize their data processing pipelines.
I'd like to thank Neal Richardson, Dan Lovell and Daniel Mesejo for providing the initial feedback on the post. I highly appreciate the early review and encouragement by Wes McKinney.
[^1]: In this post, we have primarily focused on v0 of the multi-engine data stack. In the latest version, Apache Iceberg is included as a storage and data format layer, and NYC Taxi data is used instead of the random orders data used in this post and in v0.
[^2]: Orchestration vs. fine-grained scheduling:
[^3]: Some examples of in-process databases are described in this post, which extends the DuckDB example above to newer purpose-built databases like LanceDB and KuzuDB.
[^4]: The Ibis docs use backend.to_pandas(expr) commands to bind and run the expression in one go. Instead, we use the replace_unbound method to show a generic way to bind and compile the expression for a given backend without executing it. This is for illustration purposes only. From here on, all the code uses the backend.to_pyarrow methods.