Correctness at Feldera

Correctness at Feldera

Lalith Suresh
Lalith SureshCEO / Co-Founder
| December 6, 2024

Feldera is a strongly consistent incremental compute engine, and is therefore considered core infrastructure where correctness is non-negotiable. The engine is used for financial and risk calculations, fraud and threat infrastructure, managing security policies, and more. These are contexts where we have to be extremely precise about what we compute and the promises we make.

In this blog post, we briefly describe our efforts and development processes that ensure the Feldera engine's correctness.

The traditional manual testing pyramid alone isn’t enough for Feldera, given the complex correctness surface. The various components include:

  • An incremental compute engine (DBSP) underneath Feldera, based on a novel theoretical foundation; its implementation (the dbsp crate) uses many new algorithms and carefully designed data structures.
  • Feldera supports arbitrarily complex SQL via our SQL frontend.
  • Feldera interacts with storage via several different backends.
  • Feldera also works with connectors to external systems outside of its direct control.
  • Feldera has a control plane that helps deploy and manage pipelines.

We are a small team and therefore have to rely on automation to do a lot of the heavy lifting with respect to correctness. In this post, we’ll cover some of the methods and tools we use towards that end.

Formally verifying the DBSP theory

DBSP forms Feldera’s "core". DBSP is an algorithm that can be used to automatically incrementalize arbitrarily complex queries. The beauty of DBSP is that it is composable: that is, if Feldera can incrementalize a primitive operator within a query, then the composition of that operator into a larger dataflow is always correct.

The VLDB 2023 paper contains a set of proofs, but most importantly, all the proofs were machine checked using the Lean Theorem Prover. To the best of our knowledge, Feldera remains the only IVM engine with a formally verified core and theory.

What does it mean to machine check the theory? At a high-level, DBSP defines streams of arbitrary types, and four primitive operators on streams: a lifting operator (which converts computations on values to computations on streams of values), the z^-1 (a.k.a the delay operator, which is the only operator that maintains state) and a pair of operators that can be used to compute fixed-points for recursive computations. Based on these primitives, DBSP defines operations like differentiation and integration over streams, linear operations, and gives a new definition of incrementalization. The paper shows how all operators of the relational algebra can be implemented using DBSP operators; it also describes how other operators necessary for implementing a practical language like SQL can be expressed on top of this theory.

All these concepts were formalized in the Lean theorem prover, and all the proofs from the paper were machine-checked. This gives us a high degree of confidence in the foundation of our state-of-the-art incremental compute engine. These theorems are in fact a concise and very precise specification of how the incremental engine is supposed to work.

The fact that the theory is right is very important: it is like the foundation of a skyscraper. But this does not guarantee that the implementation is right. What can we do to make sure the implementation, i.e., the instantiation of these ideas, is correct?

Differential testing of the implementation

Every testing methodology needs a “test oracle”. A test oracle is what tells you, given an input, what the correct output should be. With manual testing, both generating test cases and writing the oracle (typically hard coded assertions) don't scale well.

A significant bulk of our testing strategy is therefore to make use of differential testing. The core idea is to compare two different executions for the same input and compare the results produced by the runs for disparities. We typically call one run the implementation and the other the reference run. The best part of this approach is that it makes the oracle simple: just compare the outputs produced by the reference and implementation runs. Now all we need to do is get a vast corpus of test cases (generated or reused from standard suites).

This strategy offers a lot of flexibility! A reference run here can be many things:

  • A full-blown database like Postgres, SQLite, or DuckDB
  • A query engine like DataFusion
  • Known outputs from any of the above (i.e., golden traces or test cases)
  • two different runs of Feldera itself (e.g. a run with and without injected crashes to test our fault-tolerance layer)
  • SQL with and without optimizations
  • incremental and non-incremental query plans
  • A simplified model of a program

…and more.

We’ll talk about a few of these cases below, applied to two different facets of Feldera: the core query engine and the control plane.

Testing the Feldera query engine

Feldera is strongly consistent unlike Flink and its derivatives. At any given time, the state of a Feldera pipeline (all views) would be exactly what you’d see in a batch system with data at rest for the same prefix of inputs processed so far.

This concrete guarantee makes testing easy. We can have a convenient method to compare our implementation against reference outputs produced by other consistent database engines. (A corollary is that it is impossible to write such tests for systems that lack well-defined semantics or consistency models -- e.g., try writing a test to validate outputs in a Flink pipeline with lateness).

Our SQL compiler adapts a large corpus of test cases from other open-source projects (Postgres, MySql, Calcite). These tests help us validate everything from semantics around the type system (narrowing, widening, casts, overflows, and interplay with ternary logic), to behavior of aggregate functions, arrays, windows, intervals and more. Each test is made of queries and the expected results as displayed by the reference query engine. Here’s one of the smaller examples:

@Test
public void precisionLossTest() {
	this.qs("""
	SELECT CAST('36854775807.0'::float4 AS int64);

	int8
	-------------
	36854775808
	(1 row)""");
}

Feldera is based on Apache Calcite; no two SQL dialects are identical, so sometimes, these tests need to be adapted slightly. A consequence of our comprehensive testing here is that it helps us improve the Calcite project too!

DataFusion comparison. Feldera also supports running ad-hoc queries using DataFusion. While there are slight dialect differences between the DataFusion and Calcite engines, it does provide yet another convenient way to do differential testing. Here, we run a workload against a Feldera pipeline, and then check whether the views we incrementally maintain match the results from the matching ad-hoc queries against base tables. We use this pattern all the time in our internal QA platform and end-to-end tests.

SQL Logic Tests. We also regularly run the SQL Logic Tests (SLT) from the SQLite project. Each test presents a SQL query and an md5 checksum of the result. Feldera passed all 5M+ tests early in our project lifecycle.

These tests torture the SQL parser, so they’re a good way to find bugs in the kinds of expressions we accept. However, they do not have good coverage across types, aggregate functions and more.

SLT tests are expensive for us to run, given Feldera compiles from SQL to Rust and we compile the Rust program using rustc to run a query. We now run a nightly job that runs a deterministic subset from the SLT suite based on the day of the year.

DuckDB, Snowflake, BigQuery and more. The above options sometimes don’t support the full range of features our engine does. For example, they don't support As-Of joins, so we’ve also been running comparisons against DuckDB. In fact, one of our early users demoed Feldera to his team by showing a similar comparison with DuckDB. Broadly, Feldera supports useful functions borrowed from many other SQL dialects; these functions are validated using input-output pairs against other implementations (e.g., Snowflake, BigQuery, etc.)

SQL Lancer. SQL Lancer is a tool that generates databases and/or queries and provides different oracles to ensure correctness. We’ve recently begun using SQL Lancer to generate test cases as well and it’s already been useful to generate test cases that stress our parser. We've so far ran only the oracle that compares queries with and without optimization but that hasn't revealed any bugs yet -- this is likely because we already do such comparisons ourselves in our SQL compiler's tests.

Manually written tests. Despite all the above approaches, there is still a lot more to test! We still invest quite some time into manual testing via the traditional testing pyramid. One of the most useful pieces of that pyramid is a growing suite of end-to-end tests that exercise a complex test matrix across the different types and aggregates we support (e.g., checking for argmax over all types). All of these are validated for correctness against Postgres or other DBs, and the expected behavior noted alongside the test case itself. Here's a small example:

class aggtst_array_count(TstView):
    def __init__(self):
        # Validated on Postgres
        self.data = [{"count": 4}]
        self.sql = """CREATE MATERIALIZED VIEW array_count 
                      AS SELECT COUNT(*) AS count
                      FROM array_tbl"""

Testing the platform

We'll now discuss how we test Feldera's platform and not just the query engine layer.

Faulty vs fault-free runs. This is a classical type of differential testing used in the reliability literature, including some of my own work (e.g., the Sieve paper from OSDI 2022).

The idea is simple: run a workload twice, one where you’ve injected crashes into the execution, and another where you haven’t. Compare intermediate and end states, and if they’re not identical, you have a bug.

We use this approach to validate the fault tolerant pipelines feature we support in our Enterprise version. In fact, you can see a demo in the announcement blog for fault tolerance.

Model-based tests for the pipeline-manager. The pipeline-manager is the control plane through which users define, compile, run and manage pipelines. The pipeline-manager implements a REST API using Postgres as its backing store. It has an API server that issues SQL queries to marshal/unmarshal API objects to and from the database.

We recognized within the first 3 months of starting Feldera that evolving this API and manually writing tests by hand simply wouldn’t suffice. We quickly turned to model-based differential testing, which we got up and running in a week.

At the end of the day, the model of the Feldera API is quite simple. It’s a per-tenant map of a few API object types with some internal state machines. Different REST endpoints query or mutate this map. This model can be expressed without all the baggage of serialization/deserialization to-and-from Postgres or any SQL queries.

Next, for model-based differential testing, we use proptest in Rust to generate a random sequence of API events. Here’s a small slice of it.

enum StorageAction {
  // ... A lot omitted
  ListPipelines(TenantId),
  GetPipeline(TenantId, String),
  GetPipelineById(TenantId, PipelineId),
  NewPipeline(
	TenantId,
	#[proptest(strategy = "limited_uuid()")] Uuid,
	#[proptest(strategy = "limited_pipeline_descr()")]
    PipelineDescr,
  ),
  NewOrUpdatePipeline(
	TenantId,
	#[proptest(strategy = "limited_uuid()")] Uuid,
	#[proptest(strategy = "limited_pipeline_name()")] String,
	#[proptest(strategy = "limited_pipeline_descr()")]
    PipelineDescr,
  ),
  // ... A lot omitted
}

Then, we feed the same event trace into the "in-memory map" model of the API and the actual implementation involving Postgres, and compare the outputs at every step. Here's roughly the shape of that code:

for (i, action) in actions.into_iter().enumerate() {
  match action {
    // For every action type....
    StorageAction::NewPipeline(tenant_id, new_id, pipeline_descr) => {
    	create_tenants_if_not_exists(&model, &handle, tenant_id).await.unwrap();
    	let model_response = model.new_pipeline(tenant_id, new_id, pipeline_descr.clone()).await;
    	let impl_response = handle.db.new_pipeline(tenant_id, new_id, pipeline_descr.clone()).await;
    	check_response_pipeline(i, model_response, impl_response);
    }
  }
  // ....
}

This strategy has saved us a lot of time and energy hardening the API server. While it does make it a tad annoying when we have to update the model, it was quite effective at producing traces that found real bugs early in the process.

Fuzzing using proptest

As you saw in the previous section, we also make extensive use of fuzzing using the proptest crate in our Rust tests. These come in handy not only for testing the control plane but also internals like our connectors and the DBSP crate itself.

The best part about proptest is reproducibility. When a test fails, it outputs a convenient line you can paste into a proptest-regressions file, which allows anyone else to reproduce that particular execution in the future. For example:

cc c0ddd7741bf1667b3d70ea81ca4c298875235277256ee70f1fe2b6a9a50303ba # shrinks to [NewPipeline(TenantId(03000000-0000-0000-0000-000000000000), 03000000-0000-0000-0000-000000000000, PipelineDescr { name: "pipeline-0", description: "", runtime_config: RuntimeConfig { workers: 0, storage: false, cpu_profiler: false, tracing: false, tracing_endpoint_jaeger: "", min_batch_size_records: 0, max_buffering_delay_usecs: 0, resources: ResourceConfig { cpu_cores_min: None, cpu_cores_max: None, memory_mb_min: None, memory_mb_max: None, storage_mb_max: None, storage_class: None }, min_storage_bytes: None }, program_code: "", program_config: ProgramConfig { profile: Some(Optimized) } }), TransitProgramStatusToCompilingSql(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1)), TransitProgramStatusToCompilingRust(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), ProgramInfo { schema: ProgramSchema { inputs: [], outputs: [] }, input_connectors: {}, output_connectors: {} }), TransitProgramStatusToSuccess(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1), ""), TransitDeploymentStatusToFailed(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), ErrorResponse { message: "This is an example error response", error_code: "SomeExampleError", details: Object {"extra-info": Number(0)} }), TransitProgramStatusToCompilingSql(TenantId(03000000-0000-0000-0000-000000000000), PipelineId(03000000-0000-0000-0000-000000000000), Version(1))]

These failures then become part of our regression suite going forward. And over time, we see the number of these failures plateau (which is great!). For example, the last time we had a proptest failure in the pipeline-manager was around the time we made a major simplification to the API, four months ago.

Conclusions

With great power comes great responsibility. We owe it to our users to have a platform they can trust to build their business on. As a team with a long history working on formal methods and system reliability, we’re bringing a lot of those ideas to the Feldera project and surgically applying them to scale our correctness efforts. We’ve found these quite effective at catching bugs during development time, long before they have a chance to manifest in production.

Other articles you may like

Incremental Update 6 at Feldera

We’re excited to announce the release of v0.26, which represents a significant step forward for Feldera. This release includes over 200 commits, adding 25,000 lines of new code and documentation. Let's dive into the highlights!

Database computations on Z-sets

How can Z-sets be used to implement database computations

Incremental Update 5 at Feldera

A quick overview of what's new in v0.25.