
Meet the Feldera storage engine

5 August, 2024

Ben Pfaff

Co-Founder

Introduction

By popular demand, here is a blog post about Feldera's storage design at last!

Incremental computation is fundamentally a time-space tradeoff. Feldera maintains state that lets it update views by looking only at new changes, completely avoiding recomputation over historical data. That means Feldera may need to keep a large amount of state around for later use, because of operations like joins, certain aggregates, and the DISTINCT operator. In many cases, that state can grow beyond the amount of memory that users can affordably provision in their cloud deployments. Therefore, Feldera needs to efficiently support keeping the data it uses for incremental computation on disks and SSDs. In the context of Feldera, we call this feature simply "storage".
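To make that concrete, here is a minimal sketch (our own illustration in Rust, not Feldera code) of why even a simple operator like DISTINCT has to remember something: to decide whether an insertion or deletion changes the output, it must track how many copies of each key it has seen so far, and that per-key state is exactly the kind of state that can outgrow memory.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// A toy incremental DISTINCT operator: it keeps a multiplicity per key so
/// that each input change can be turned into an output change without
/// rescanning history. This per-key state is what can outgrow memory and
/// need to be spilled to storage.
struct Distinct<K> {
    counts: HashMap<K, i64>, // key -> how many copies are currently present
}

impl<K: Hash + Eq + Clone> Distinct<K> {
    fn new() -> Self {
        Self { counts: HashMap::new() }
    }

    /// Apply an input change (+1 insert, -1 delete) and return the change,
    /// if any, to the DISTINCT output.
    fn update(&mut self, key: K, delta: i64) -> Option<(K, i64)> {
        let count = self.counts.entry(key.clone()).or_insert(0);
        let old = *count;
        *count += delta;
        match (old > 0, *count > 0) {
            (false, true) => Some((key, 1)),  // key appears in the output
            (true, false) => Some((key, -1)), // key disappears from the output
            _ => None,                        // output unchanged
        }
    }
}

fn main() {
    let mut distinct = Distinct::new();
    assert_eq!(distinct.update("a", 1), Some(("a", 1)));   // first copy: output gains "a"
    assert_eq!(distinct.update("a", 1), None);             // second copy: no output change
    assert_eq!(distinct.update("a", -1), None);            // one copy left: still present
    assert_eq!(distinct.update("a", -1), Some(("a", -1))); // last copy gone
}
```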

Requirements

We needed some features beyond just the ability to store data:

  • Efficient lookup for keys and ranges of keys. Many of our internal operators work on ranges of keys and expect to be able to access sequential keys quickly.

  • Fast checkpointing. For fault tolerance, Feldera needs to be able to quickly persist all of the tables in a pipeline. We also needed a simple form of versioning for tables, either built-in or layered over storage.

  • Integration with Rust, the language that Feldera is implemented in, including the ability to store data aligned for in-place access.

  • Confidence in design and implementation maturity.

  • Performance.

With these requirements in mind, we considered existing storage solutions, primarily key-value embedded databases:

  • RocksDB. We experimented extensively with RocksDB, integrating it into our code base, but we rejected it for reasons already explained in a prior blog post.

  • FoundationDB. We were eager for FoundationDB to work, since as a distributed database it would have given Feldera fault tolerance against hardware failure "for free", but we rejected it for some of the same reasons as RocksDB, including data alignment difficulties.

  • Other solutions, including some Rust crates that we decided were not mature or performant enough.

Ultimately, we did not find an existing storage solution that was a good fit for Feldera.

Rolling our own

Since we could not find a suitable pre-existing storage system, the remaining option was to build our own. Building a database, even a key-value embedded database, is a substantial endeavor, so we did not make this choice lightly. However, Feldera's internal architecture made some aspects of the job easier:

  • Log-structured tables. Batches of changes, including insertions, deletions, and updates, are incrementally added to tables, and later merged in background threads.

  • Shared-nothing architecture. Each Feldera worker thread processes an independent stream of data, so the storage layer can be per-thread as well.

When choosing a basic data structure, our requirement to read ranges of keys led us to a tree-based storage approach. In particular, we adopted a design based on log-structured merge-trees (LSM trees), the data structure that underpins RocksDB and many other databases.

An LSM tree is particularly appropriate for Feldera because it so closely resembles the data structure that Feldera already uses for in-memory storage, called a "spine". A spine, like an LSM tree, is a collection of sorted runs, also called "batches", and adding data to either data structure adds another run. Thus, we were able to convert our in-memory spines into on-storage LSM-like trees by implementing a batch type that is stored on disk rather than in memory.
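As a simplified sketch (hypothetical types, not Feldera's actual spine implementation), the shape of the data structure looks roughly like this: an append-only collection of immutable sorted runs, where ingesting a batch adds a run and a point lookup has to consult every run.

```rust
/// One immutable sorted run of (key, weight) pairs. In Feldera terms this
/// is a "batch"; with storage enabled it may live in a file instead of in
/// memory, but the interface stays the same.
struct Run {
    entries: Vec<(u64, i64)>, // sorted by key
}

/// A simplified spine: a collection of sorted runs, like an LSM tree.
struct Spine {
    runs: Vec<Run>,
}

impl Spine {
    fn new() -> Self {
        Self { runs: Vec::new() }
    }

    /// Ingesting a batch of changes just appends another sorted run;
    /// existing runs are never modified in place.
    fn push_batch(&mut self, mut batch: Vec<(u64, i64)>) {
        batch.sort_by_key(|&(key, _)| key);
        self.runs.push(Run { entries: batch });
    }

    /// A point lookup consults every run and sums the weights, which is
    /// why keeping the number of runs small (by merging) matters.
    fn weight_of(&self, key: u64) -> i64 {
        self.runs
            .iter()
            .map(|run| {
                run.entries
                    .binary_search_by_key(&key, |&(k, _)| k)
                    .map(|i| run.entries[i].1)
                    .unwrap_or(0)
            })
            .sum()
    }
}

fn main() {
    let mut spine = Spine::new();
    spine.push_batch(vec![(1, 1), (2, 1)]);  // insert keys 1 and 2
    spine.push_batch(vec![(2, -1), (3, 1)]); // delete key 2, insert key 3
    assert_eq!(spine.weight_of(1), 1);
    assert_eq!(spine.weight_of(2), 0); // weights cancel across runs
    assert_eq!(spine.weight_of(3), 1);
}
```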

To store batches on disk, we implemented our own B-tree-like file format. The Feldera spine writes each batch of substantial size to its own file. Because batches inside Feldera are always created in sorted order, we can write these files sequentially, without any seeks and with minimal in-memory buffering. And because batches are never modified in place, neither the file format nor the code that implements it needs to support adding, removing, or modifying data.
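As an illustration of why that write path stays simple, here is a rough sketch of streaming an already-sorted batch to a file in one sequential pass. It is not the actual Feldera file format, which is B-tree-like so that readers can locate keys and key ranges without scanning; the point is only that sorted, immutable batches need nothing more than appends.

```rust
use std::fs::File;
use std::io::{BufWriter, Result, Write};

/// Write an already-sorted batch of (key, weight) pairs to `path` in a
/// single sequential pass. Because batches arrive sorted and are never
/// modified in place, the writer needs no seeks and no support for
/// inserting, removing, or updating records after the fact.
fn write_batch(path: &str, batch: &[(u64, i64)]) -> Result<()> {
    debug_assert!(
        batch.windows(2).all(|w| w[0].0 <= w[1].0),
        "batches are always created in sorted order"
    );
    let mut out = BufWriter::new(File::create(path)?);
    for &(key, weight) in batch {
        // Fixed-width little-endian records; a real format would also lay
        // out data aligned for in-place access and add index blocks for
        // key lookups.
        out.write_all(&key.to_le_bytes())?;
        out.write_all(&weight.to_le_bytes())?;
    }
    out.flush()
}

fn main() -> Result<()> {
    // Hypothetical path, just for illustration.
    write_batch("/tmp/batch-0.bin", &[(1, 1), (5, -1), (9, 2)])
}
```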

Searching a spine, or an LSM tree, requires searching every run, so the number of runs is key to performance. Both data structures therefore provide ways to reduce the number of runs by merging existing ones. A merge requires only a sequential read of each input run, which is efficient both in memory and on storage.

Merging can reduce data size as well as improve search performance: in the Z-sets that Feldera uses, identical records merge by adding their weights instead of producing duplicates, and they "cancel out" entirely if their weights sum to zero. In addition, if the associated query only works with recent data, a merge can discard older data that can no longer influence the results.
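Here is a minimal sketch of such a merge (again over in-memory vectors rather than on-storage batches): two sorted runs are combined in a single sequential pass, equal keys add their weights, and entries whose weights sum to zero disappear.

```rust
/// Merge two sorted runs of (key, weight) pairs into one sorted run.
/// Equal keys combine by adding their weights, and entries whose weights
/// sum to zero "cancel out" and are dropped, so a merge can shrink data.
fn merge(a: &[(u64, i64)], b: &[(u64, i64)]) -> Vec<(u64, i64)> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::with_capacity(a.len() + b.len());
    while i < a.len() || j < b.len() {
        let (key, weight) = if j == b.len() || (i < a.len() && a[i].0 < b[j].0) {
            i += 1;
            a[i - 1]
        } else if i == a.len() || b[j].0 < a[i].0 {
            j += 1;
            b[j - 1]
        } else {
            // Same key in both runs: add the weights.
            i += 1;
            j += 1;
            (a[i - 1].0, a[i - 1].1 + b[j - 1].1)
        };
        if weight != 0 {
            out.push((key, weight));
        }
    }
    out
}

fn main() {
    let run1: [(u64, i64); 3] = [(1, 1), (2, 1), (3, 2)];
    let run2: [(u64, i64); 3] = [(2, -1), (3, 1), (4, 1)];
    // Key 2 cancels out entirely; key 3's weights add up.
    assert_eq!(merge(&run1, &run2), vec![(1, 1), (3, 3), (4, 1)]);
}
```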

Development experience

We started prototyping our storage system in December, and it was past the end of January before the first version was ready for testing. After that, we had to revise the approach a number of times, along a few different axes.

One axis was flexibility in which batches were kept in memory versus on storage. In Feldera, operators pass batches to one another as streams, with the batch type determined at compile time. We initially implemented the new on-storage "file" batches alongside the existing memory-based "vector" batches, which fixed a batch's location at compile time. This proved too inflexible, so we soon implemented "fallback" batches that internally encapsulate either a "file" or a "vector" batch, chosen at runtime. That let us easily adjust policies for where to store data.
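The idea looks roughly like this (hypothetical types, not Feldera's actual trait hierarchy): one enum wraps either representation, so where a batch lives becomes a runtime policy decision rather than a compile-time one.

```rust
use std::path::PathBuf;

/// A hypothetical stand-in for an in-memory "vector" batch.
struct VecBatch {
    entries: Vec<(u64, i64)>,
}

/// A hypothetical stand-in for an on-storage "file" batch.
struct FileBatch {
    path: PathBuf,
}

/// A "fallback" batch wraps either representation, so the choice between
/// memory and storage is made at runtime instead of being fixed at
/// compile time by the stream's batch type.
enum FallbackBatch {
    Vec(VecBatch),
    File(FileBatch),
}

impl FallbackBatch {
    /// Example policy: keep small batches in memory and spill large ones.
    fn from_entries(entries: Vec<(u64, i64)>, spill_threshold: usize) -> Self {
        if entries.len() < spill_threshold {
            FallbackBatch::Vec(VecBatch { entries })
        } else {
            // A real implementation would write the entries to a file here
            // (much like the writer sketch earlier) and keep only the path
            // and an index in memory.
            FallbackBatch::File(FileBatch {
                path: PathBuf::from("/tmp/spilled-batch.bin"),
            })
        }
    }
}

fn main() {
    let small = FallbackBatch::from_entries(vec![(1, 1)], 1_000);
    let large = FallbackBatch::from_entries(vec![(0, 1); 10_000], 1_000);
    assert!(matches!(small, FallbackBatch::Vec(_)));
    assert!(matches!(large, FallbackBatch::File(_)));
}
```

Whatever the actual policy looks like, the wrapper is what lets it be adjusted at runtime without changing the operators that consume batches.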

Another axis was performance. Our storage layer initially performed poorly, with some queries running 100× slower when storage was enabled. Some of this was due to excessive movement of data between disk and memory caused by the inflexible batch location, which we fixed as described above. A lot of it was due to a poor caching layer, which we refined and improved over time. We also had little intuition about which parts of the storage API would be the most performance-sensitive, so we started optimizing by profiling the queries we cared about, and we found some unexpected costs along the way, such as high locking overhead in the Rust crate we use for tracking metrics! With these improvements, some queries now run almost as fast with storage as without.

Correctness, on the other hand, has not so far proven particularly difficult to achieve. The Feldera storage format, and the reader and writer code for it, is much simpler than a general-purpose key-value database, which made it straightforward to write good tests.

Results

As a result of our work on storage, Feldera can now run the queries that use the most memory with 25% to 75% less memory. Queries that make substantial use of storage do run up to 2× to 3× slower, but we're improving storage every week, so users can expect the typical performance penalty to decrease over time.

We often use queries from the Nexmark benchmark as part of our optimization work. These queries are good examples because they are engine-neutral and have implementations for several streaming engines. The following table shows how, for 100 million events, Feldera's performance and memory use compare with and without storage. The results fall into three groups:

  • Group 1, the bulk of our queries. These queries use substantial amounts of memory (10 GiB or more) without storage. For most of them, enabling storage reduces memory use by over 50% (the outlier is q17, which sees only a 19% reduction). Enabling storage typically has a performance cost, ranging from 2% up to 56%.

  • Group 2, containing q0, q1, and q2. These queries use hardly any memory (under 1 GiB) whether storage is enabled or not, because they do not store any state. As a result, they also run in about the same amount of time regardless of whether storage is enabled.

    Some of these queries use more memory with storage enabled, up to a factor greater than 2×, but the total memory use is still 1 GiB or less, so it's an insignificant difference.

  • Group 3, containing q3, q8, and q10. These queries use small amounts of memory, which increase modestly when storage is enabled. The current Feldera policies for spilling data from memory to disk do not help these queries use less memory, but this may not be an issue since they use little anyhow.

| group | query | runtime (storage disabled) | runtime (storage enabled) | runtime diff | peak memory (storage disabled) | peak memory (storage enabled) | memory diff |
|-------|-------|----------------------------|---------------------------|--------------|--------------------------------|-------------------------------|-------------|
| 1     | q4    | 62.3 s                     | 97.0 s                    | 1.56×        | 37.0 GiB                       | 18.2 GiB                      | 0.49×       |
|       | q7    | 92.7 s                     | 133.5 s                   | 1.44×        | 26.7 GiB                       | 12.8 GiB                      | 0.48×       |
|       | q9    | 122.7 s                    | 124.7 s                   | 1.02×        | 114.2 GiB                      | 47.1 GiB                      | 0.41×       |
|       | q15   | 208.6 s                    | 325.6 s                   | 1.56×        | 10.0 GiB                       | 4.0 GiB                       | 0.41×       |
|       | q17   | 35.7 s                     | 49.0 s                    | 1.37×        | 15.8 GiB                       | 12.9 GiB                      | 0.81×       |
|       | q18   | 59.2 s                     | 62.3 s                    | 1.05×        | 50.6 GiB                       | 23.1 GiB                      | 0.46×       |
|       | q19   | 61.2 s                     | 57.2 s                    | 0.93×        | 51.6 GiB                       | 25.3 GiB                      | 0.49×       |
|       | q20   | 55.1 s                     | 57.1 s                    | 1.04×        | 40.3 GiB                       | 18.0 GiB                      | 0.45×       |
| 2     | q0    | 39.8 s                     | 37.7 s                    | 0.95×        | 0.3 GiB                        | 0.3 GiB                       | 1.01×       |
|       | q1    | 38.7 s                     | 37.7 s                    | 0.97×        | 0.2 GiB                        | 0.3 GiB                       | 1.06×       |
|       | q2    | 39.7 s                     | 40.7 s                    | 1.03×        | 0.2 GiB                        | 0.2 GiB                       | 0.96×       |
| 3     | q3    | 26.5 s                     | 29.6 s                    | 1.12×        | 3.7 GiB                        | 4.3 GiB                       | 1.16×       |
|       | q8    | 26.5 s                     | 23.4 s                    | 0.88×        | 2.5 GiB                        | 2.7 GiB                       | 1.08×       |
|       | q10   | 31.6 s                     | 39.7 s                    | 1.26×        | 0.4 GiB                        | 1.0 GiB                       | 2.65×       |

It's also worth looking at how performance and memory scale when we go farther, to 200 million events:

  • For group 1, with storage disabled, we would expect to see memory usage double and, with storage enabled, remain flat. This is approximately the case for q15, q17, q18, and q19.

    Queries q4 and q7 did better than that, using about the same absolute amount of peak memory (even a little less) with 200 million events as with 100 million. This suggests that Feldera's SQL engine was able to make use of "lateness" with these queries to discard intermediate data when it was too old to influence future results. To test that, we reran these queries with the lateness optimization disabled. The runtime increased significantly in all cases. With storage disabled, peak memory approximately tripled, whereas with storage enabled it hardly changed. This evidence supports our hypothesis, since lateness allows less data to be processed, speeding up processing and reducing intermediate data volume.

Query q9 behaved unexpectedly: its memory use increased with storage enabled, but not with storage disabled. We haven't explored the reason for this yet.

  • Group 2 maintained its performance and minimal memory use, as one would expect.

  • In group 3, memory use and runtime roughly scaled with data volume, which suggests that the Feldera storage policies don't apply differently for 200 million events than for 100 million in these cases.

| group | query | runtime (storage disabled) | runtime (storage enabled) | runtime diff | peak memory (storage disabled) | peak memory (storage enabled) | memory diff |
|-------|-------|----------------------------|---------------------------|--------------|--------------------------------|-------------------------------|-------------|
| 1     | q4    | 122.3 s                    | 160.2 s                   | 1.31×        | 31.0 GiB                       | 21.5 GiB                      | 0.69×       |
|       | q7    | 180.2 s                    | 392.1 s                   | 2.18×        | 24.9 GiB                       | 12.2 GiB                      | 0.49×       |
|       | q9    | 191.1 s                    | 270.9 s                   | 1.42×        | 122.9 GiB                      | 91.6 GiB                      | 0.75×       |
|       | q15   | 754.8 s                    | 1221.7 s                  | 1.62×        | 20.2 GiB                       | 6.3 GiB                       | 0.31×       |
|       | q17   | 82.6 s                     | 96.9 s                    | 1.17×        | 26.7 GiB                       | 13.8 GiB                      | 0.52×       |
|       | q18   | 132.8 s                    | 168.2 s                   | 1.27×        | 111.8 GiB                      | 28.1 GiB                      | 0.25×       |
|       | q19   | 143.9 s                    | 166.3 s                   | 1.16×        | 111.4 GiB                      | 30.2 GiB                      | 0.27×       |
|       | q20   | 128.6 s                    | 118.4 s                   | 0.92×        | 83.9 GiB                       | 28.2 GiB                      | 0.34×       |
| 2     | q0    | 74.3 s                     | 75.3 s                    | 1.01×        | 0.3 GiB                        | 0.3 GiB                       | 0.92×       |
|       | q1    | 78.4 s                     | 79.4 s                    | 1.01×        | 0.2 GiB                        | 0.3 GiB                       | 1.07×       |
|       | q2    | 96.7 s                     | 87.5 s                    | 0.90×        | 0.2 GiB                        | 0.2 GiB                       | 1.02×       |
| 3     | q3    | 54.0 s                     | 54.0 s                    | 1.00×        | 6.1 GiB                        | 7.5 GiB                       | 1.22×       |
|       | q8    | 50.9 s                     | 43.8 s                    | 0.86×        | 3.8 GiB                        | 3.7 GiB                       | 0.97×       |
|       | q10   | 85.4 s                     | 88.5 s                    | 1.04×        | 2.2 GiB                        | 2.3 GiB                       | 1.05×       |

Conclusion

With storage enabled, the Feldera stream processing engine can run queries that store large amounts of data using far less memory than otherwise needed. Storage comes with a performance penalty, but that penalty will continue to decline as optimization work continues.