Your Streaming Analytics Use Case Needs More State Than You Think

Leonid Ryzhyk
Leonid RyzhykCTO / Co-Founder
| July 11, 2024

There are two types of streaming users: those who believe their workload fits in memory and those who already know it won't. In this blog, we

  • Explain why streaming systems need to maintain significant internal state—a fact often overlooked by users. Since the size of the internal state can exceed the size of the input data, state management is a crucial component of a streaming analytics platform, and in most cases it will require support from a secondary storage system.
  • Offer high-level guidelines for evaluating the storage layer when choosing the right streaming platform.
  • Outline how we solve the state management problem at Feldera—a subject we'll explore in greater depth in upcoming blog posts.

The key to the topic lies in understanding streaming analytics for what it truly is: a real-time incremental view maintenance problem.

Streaming analytics = incremental view maintenance

Streaming analytics operates on streaming data. Shocking, I know. Let's delve deeper and ask ourselves what this data truly represents. The answer is that streaming data represents **changes** to some collections. Streaming queries are defined over the entire collection, not just the latest changes. For example, a query that counts the number of credit card transactions per user relies on the entire transaction history, not just individual transactions.

Thus, streaming analytics maintains a set of queries over changing data, continuously transforming input changes into output updates. The problem of maintaining standing queries over changing data is well known in the database world as incremental view maintenance (IVM). Essentially, streaming analytics is IVM with the added complexity that inputs can arrive continuously at high speed and must be processed with low latency.

Incremental view maintenance requires state

All IVM algorithms have one thing in common: they work by memorizing results of previous computations and using them to update the output of the query for new inputs without full recomputation (i.e., incrementally). Thus, **IVM is fundamentally a space-time tradeoff**. Take for example an equi-join of two tables:

CREATE VIEW transaction_with_user AS
SELECT
    transactions.*,
    users.id as user_id,
    users.age
FROM
    transactions JOIN users
ON users.id = transactions.user_id;

Whenever a new transaction arrives, the IVM engine must find the matching row in the users table, which requires storing the latest snapshot of this table indexed by the id field.

Incremental join operator: whenever a new transaction arrives, the IVM engine must find the matching row in the users table, which requires storing the latest snapshot of this table indexed by the id field.

Let's use the view defined above to compute transaction counts per user:

CREATE VIEW transactions_per_user AS
SELECT
 COUNT(*)
FROM transaction_with_user
GROUP BY user_id;

To evaluate this query incrementally without re-computing transaction counts from scratch every time, we must maintain the latest counts per user. Thus not only the input tables, but also outputs of intermediate sub-queries may need to be materialized and stored.

There are classes of queries that do not require any state. For example, the following query can be implemented by applying the filter condition to each new input record:

CREATE VIEW over_21 AS
SELECT *
FROM users
WHERE age >= 21;

However, `JOINs, DISTINCTs, and aggregates in general require state. As the user builds more complex pipelines with many nested queries, the amount of state grows, potentially surpassing the size of input tables and the size of available memory by a large factor.

A goldfish or an elephant: how good is your streaming engine at keeping state?

In light of this, the ability to maintain state at scale is a key factor in
picking a streaming analytics platform for your application. Here are some
guidelines to help you make an informed decision:

Focus on storage-intensive benchmarks

Benchmarking is a common way to measure and report software performance. However, benchmarks are only useful when they are representative of real-world workloads, and can be misleading otherwise. We often see streaming engines evaluated using queries similar to this one:

-- Count the number of users per country.
SELECT COUNT(*) FROM users GROUP BY country;

This query requires maintaining a grand total of 195 counters (one for each country in the world). Thus, achieving high throughput on it merely proves that computers are good at counting.

While it is possible that your streaming use case fits this pattern, most workloads that we encounter in practice contain many joins and aggregations, evaluated over big data, and require maintaining state that doesn't fit in memory.

Therefore, when evaluating a streaming product, it helps to understand how it behaves on storage-intensive benchmarks and how its performance evolves as the size of its internal state reaches and exceeds available memory.

Focus on steady-state performance

Streaming platforms often operate continuously for weeks or months, making their long-term steady state performance more relevant than the performance on the first N records. Evaluating streaming analytics engines with long-running benchmarks, in particular tracking how their storage footprint changes over time, provides a clearer understanding of their behavior under continuous load.

Build a PoC based on your workload

One key takeaway from the above discussion is that the state footprint of a streaming pipeline depends on the queries it executes and the input payload. Benchmarking alone cannot reliably predict system performance for specific real-world workloads. Therefore at Feldera, we encourage users to develop a proof-of-concept (PoC) prototype using data and queries that simulate their actual workloads. We take it a step further by collaborating with new users to build this PoC together. You're invited to try this free service—click here to schedule an intro call with our team.

How Feldera manages state

At Feldera, our goal is to build a streaming analytics platform that is as powerful and easy to use as traditional batch analytics tools. This means running complex SQL queries over high-volume data streams. You can't get there without solving the state management problem. In this section we outline our solution, including:

  • A high-performance storage engine
  • A query optimizer
  • A garbage collector that automatically detects and deallocates unused state

The Feldera storage engine

The Feldera storage engine enables us to ingest, store, and process data many times larger than memory. Like traditional databases, it stores indexed relational data, but it is specifically designed to support low-latency, high-throughput ingestion and lookup for streaming analytics use cases. It is built from the ground up to support data access patterns typical of IVM workloads. We are currently preparing a detailed design overview of the Feldera storage engine, but here are some key points:

  • The engine is based on the log-structured merge-tree (LSM) data structure for fast ingest and lookup
  • Built in Rust using the ultra-high-performance rkyv (de)serialization framework
  • Co-designed with DBSP, our high-performance incremental query evaluation runtime
  • Supports shared-nothing parallelism
  • Uses memory as a fast cache for on-disk indexes

The query optimizer

The need to maintain state is fundamental to incremental query evaluation. No amount of optimization can convert a stateful pipeline into a stateless one in general. However, a smart query optimizer can greatly reduce the amount of state that must be stored. This is a massive topic that we don't cover in any depth here. Let me leave you with one simple example instead. By moving the filter condition before the join in the following query Feldera only needs to store users who are 21 years or older rather than all users:

CREATE VIEW transaction_with_user AS
SELECT
    transactions.*,
    users.id as user_id,
    users.age
FROM
    transaction JOIN users
ON users.id = transactions.user_id
WHERE
    users.age >= 21;

Garbage collection for time series analytics

Time series analysis is among the main applications of streaming analytics, and also the most challenging one when it comes to maintaining state. Time series (financial transactions, software traces, IoT telemetry data, etc.) are essentially append-only tables that grow over time. Implemented naively, they require unbounded storage. Fortunately, in most practical use cases this can be avoided thanks to specific properties of time series data and queries, namely:

  • Time series data points usually arrive approximately in-order, meaning inputs to time series queries don't get delayed or reordered by more than a fixed amount of time.
  • Time series queries usually operate on recent data rather than the entire history of the input stream.

These two properties allow for automatic deallocation of state that can no longer affect query outputs. Feldera is the first streaming analytics engine to implement this key optimization with precise semantics and strong correctness guarantees:

  • User Annotations: The user specifies the upper bound on how much events in each input time series can get delayed.
  • Static Dataflow Analysis: The Feldera SQL compiler performs static
    dataflow analysis to determine temporal bounds on the state that must be
    stored for each operator.
  • Runtime Compaction: Background data compaction threads deallocate state older than these bounds.

This mechanism is sound: assuming user annotations are accurate, it guarantees that only state that doesn't affect the output of the system is deallocated.

Takeaways

Overlooking the storage requirements of your continuous analytics pipeline is one of the biggest mistakes you can make in choosing a streaming query engine. Here are a few things you need to remember to avoid this mistake:

  • JOINs, DISTINCTs, and aggregates generally need state proportional to the size of their inputs. Complex queries with multiple joins and aggregates may require storage that manyfold exceeds the available memory.
  • Time series queries typically require state proportional to the size of the time window they operate on.
  • The amount of state that must be maintained by the pipeline is a function of the queries it executes and input payload. Therefore, when evaluating a streaming query engine, use data and queries that simulate your actual workload, and run them for a long time to monitor their memory and storage usage.
Dot Pattern
alt text

Questions? Join our Slack community to discuss this blog post.

Other articles you may like

Incremental Update 6 at Felderaincremental-update

Incremental Update 6 at Feldera

We’re excited to announce the release of v0.26, which represents a significant step forward for Feldera. This release includes over 200 commits, adding 25,000 lines of new code and documentation. Let's dive into the highlights!

Database computations on Z-sets

How can Z-sets be used to implement database computations

Incremental Update 5 at Felderareleaseincremental-update

Incremental Update 5 at Feldera

A quick overview of what's new in v0.25.