Analyzing query performance in Feldera

Ben Pfaff, Chief Engineer / Co-Founder
August 24, 2024

In this post, I'll show you the process we go through to troubleshoot Feldera's performance. I'll show you how we inspect the internal dataflow graph for query execution, and how we sleuth around to diagnose and fix performance issues.

Let's look at one of the queries in Nexmark, a benchmark often used to compare the performance of streaming engines. In particular, I noticed that Nexmark query q12 ran very slowly: for 100 million input events, it took almost 3 minutes, whereas most other queries took 20 to 60 seconds. This query was implemented as:

CREATE VIEW Q12 AS
SELECT
    B.bidder,
    count(*) as bid_count,
    TUMBLE_START(B.p_time, INTERVAL '10' SECOND) as starttime,
    TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime
FROM (SELECT *, NOW() as p_time FROM bid) B
GROUP BY B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);

This is a simple query that uses only one table, the bid table, and counts the bids that each user makes within fixed windows of processing time. I found the slowness odd, because nothing here seemed like it should run slowly.
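To make the "fixed windows" concrete, here is a minimal sketch of how a tumbling window can be computed for a timestamp. It assumes timestamps are integer milliseconds and that windows are aligned to multiples of the window width. The `tumble` function and `WINDOW_MS` constant are names I made up for illustration; this is not Feldera's implementation of `TUMBLE`.

```rust
/// Width of the tumbling window in milliseconds (10 seconds, as in q12).
const WINDOW_MS: i64 = 10_000;

/// Returns the (start, end) of the tumbling window that contains `ts_ms`.
/// The window is half-open: start <= ts_ms < end.
/// Hypothetical helper for illustration only.
fn tumble(ts_ms: i64, width_ms: i64) -> (i64, i64) {
    // rem_euclid keeps the remainder non-negative, so negative
    // timestamps are also rounded down to the window start.
    let start = ts_ms - ts_ms.rem_euclid(width_ms);
    (start, start + width_ms)
}

fn main() {
    for ts in [0, 9_999, 10_000, 25_000] {
        let (start, end) = tumble(ts, WINDOW_MS);
        println!("ts={} -> window [{}, {})", ts, start, end);
    }
}
```

Every timestamp falls into exactly one window, which is why grouping by the window and the bidder yields one count per bidder per 10-second interval.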

Profile Graphs

I ran the query again and, this time, I used the Feldera circuit_profile API to obtain a profile graph mid-run, retrieving it via a command like:

curl 'http://localhost:8080/v0/pipelines/nexmark-q9/circuit_profile' -H 'accept: application/zip' -o profile.zip

The profile.zip that this outputs visualizes how data flows through Feldera internally, with performance data for each worker thread. For this query, the profile for thread 0 looked like the one below, scaled down from the full profile. Each operator node is highlighted according to the amount of CPU time it used:

Circuit profile

I noticed the bright red nodes in the middle and zoomed in on them, as shown below. A single join operation was consuming most of the runtime, over 78 + 16 = 94 seconds out of 114 seconds for the overall thread. That meant that any optimization would have to target that join operation in particular:

worker 1 join performance

top

The profiles for the other worker threads (there were 16 in total) showed them as idle, like thread 1. To confirm that, I ran the top program for system monitoring and hit H so it would display threads instead of processes. As expected, it showed dbsp-worker-0 and its paired background thread dbsp-bg-0 as busy and the rest of the workers as idle:

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
  96202 blp       20   0   57.4g  23.1g  22716 R  99.0   9.2   1:08.98 dbsp-bg-0
  96219 blp       20   0   57.4g  23.1g  22716 R  98.7   9.2   1:45.15 dbsp-worker-0
  96251 blp       20   0   57.4g  23.1g  22716 S  18.1   9.2   0:14.30 nexmark-13
  96241 blp       20   0   57.4g  23.1g  22716 S  16.8   9.2   0:13.88 nexmark-3
  96243 blp       20   0   57.4g  23.1g  22716 S  16.5   9.2   0:13.16 nexmark-5
  96239 blp       20   0   57.4g  23.1g  22716 S  15.8   9.2   0:13.67 nexmark-1
  96249 blp       20   0   57.4g  23.1g  22716 S  15.8   9.2   0:12.94 nexmark-11
  96245 blp       20   0   57.4g  23.1g  22716 S  15.5   9.2   0:12.83 nexmark-7
  96247 blp       20   0   57.4g  23.1g  22716 R  15.5   9.2   0:13.18 nexmark-9
  96253 blp       20   0   57.4g  23.1g  22716 S  15.5   9.2   0:12.57 nexmark-15
  96242 blp       20   0   57.4g  23.1g  22716 S  14.8   9.2   0:11.97 nexmark-4
  96248 blp       20   0   57.4g  23.1g  22716 S  14.8   9.2   0:12.12 nexmark-10
  96238 blp       20   0   57.4g  23.1g  22716 S  14.5   9.2   0:12.04 nexmark-0
  96240 blp       20   0   57.4g  23.1g  22716 S  14.5   9.2   0:12.10 nexmark-2
  96244 blp       20   0   57.4g  23.1g  22716 S  14.5   9.2   0:11.95 nexmark-6
  96246 blp       20   0   57.4g  23.1g  22716 S  14.5   9.2   0:12.95 nexmark-8
  96250 blp       20   0   57.4g  23.1g  22716 S  14.2   9.2   0:11.91 nexmark-12
  96252 blp       20   0   57.4g  23.1g  22716 S  14.2   9.2   0:11.72 nexmark-14
  96225 blp       20   0   57.4g  23.1g  22716 S  13.5   9.2   0:10.40 dbsp-worker-6
  96226 blp       20   0   57.4g  23.1g  22716 S  13.5   9.2   0:10.38 dbsp-worker-7
  96231 blp       20   0   57.4g  23.1g  22716 S  13.5   9.2   0:10.55 dbsp-worker-11
  96221 blp       20   0   57.4g  23.1g  22716 S  13.2   9.2   0:10.42 dbsp-worker-2
  96227 blp       20   0   57.4g  23.1g  22716 S  13.2   9.2   0:10.70 dbsp-worker-8
  96229 blp       20   0   57.4g  23.1g  22716 S  13.2   9.2   0:10.41 dbsp-worker-9
  96233 blp       20   0   57.4g  23.1g  22716 S  13.2   9.2   0:10.35 dbsp-worker-13
  96235 blp       20   0   57.4g  23.1g  22716 S  13.2   9.2   0:10.67 dbsp-worker-15
  96220 blp       20   0   57.4g  23.1g  22716 S  12.9   9.2   0:10.35 dbsp-worker-1
  96222 blp       20   0   57.4g  23.1g  22716 S  12.9   9.2   0:10.23 dbsp-worker-3
  96230 blp       20   0   57.4g  23.1g  22716 S  12.9   9.2   0:10.52 dbsp-worker-10
  96232 blp       20   0   57.4g  23.1g  22716 S  12.9   9.2   0:10.46 dbsp-worker-12
  96234 blp       20   0   57.4g  23.1g  22716 S  12.9   9.2   0:10.41 dbsp-worker-14
  96223 blp       20   0   57.4g  23.1g  22716 S  12.6   9.2   0:10.35 dbsp-worker-4
  96224 blp       20   0   57.4g  23.1g  22716 S  12.6   9.2   0:10.43 dbsp-worker-5

Key Distribution

With that confirmed, I puzzled over why one thread would be processing all 100 million records and the others none of them. Feldera executes inner joins via "sharding": it computes a hash of the fields used for the join and distributes the records among the worker threads based on the hash. For 16 worker threads, each thread should process about one-sixteenth of the records, if the join fields themselves have a uniform distribution.

Maybe the fields being joined were not evenly distributed? If the data being input to a join has a very skewed distribution, for example if the join fields take fewer values than there are worker threads, or if one or a few values of the join fields are associated with the bulk of the records, then this approach to a join will not perform well because one or a few threads would do all of the work.
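The effect of skew on hash-based sharding can be sketched with the Rust standard library alone. This is a simplified illustration, not Feldera's sharding code: `shard_of` and `distribute` are hypothetical helpers, and I am assuming a plain "hash modulo worker count" assignment.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks a worker for a key by hashing it.
/// Hypothetical helper, assuming "hash modulo worker count" assignment.
fn shard_of<K: Hash>(key: &K, workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % workers as u64) as usize
}

/// Counts how many records each worker receives for the given keys.
fn distribute<K: Hash>(keys: impl IntoIterator<Item = K>, workers: usize) -> Vec<usize> {
    let mut counts = vec![0; workers];
    for key in keys {
        counts[shard_of(&key, workers)] += 1;
    }
    counts
}

fn main() {
    let workers = 16;
    // Many distinct keys: the records spread across the workers.
    let spread = distribute(0..100_000u64, workers);
    // Only three distinct keys: at most three workers can receive records.
    let skewed = distribute((0..100_000u64).map(|i| i % 3), workers);
    println!("distinct keys: {:?}", spread);
    println!("three keys:    {:?}", skewed);
}
```

With only three distinct key values, at least 13 of the 16 workers receive nothing, no matter how good the hash function is.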

What fields are being joined in this case? The profile does not say directly, but it references the Rust code generated by the Feldera SQL compiler. I looked at the code that generated one input to the join:

// rel#59:LogicalProject.(input=LogicalTableScan#1,exprs=[$1, $TUMBLE(NOW(), 10000:INTERVAL SECOND)])
// DBSPMapIndexOperator 2812(575)
let stream2812: Stream<_, IndexedWSet<Tup0, Tup7<Option<i64>, Option<i64>, Option<i64>, Option<String>, Option<String>, Timestamp, Option<String>>>> = stream2779.map_index(move |t_8: &Tup7<Option<i64>, Option<i64>, Option<i64>, Option<String>, Option<String>, Timestamp, Option<String>>, | ->
(Tup0, Tup7<Option<i64>, Option<i64>, Option<i64>, Option<String>, Option<String>, Timestamp, Option<String>>, ) {
    (Tup0::new(), (*t_8).clone(), )
});

This code lists the types very explicitly, which makes the code harder to read. With types removed and the code rephrased slightly, this is essentially:

let output = input.map_index(|x| ((), x));

This means that the join fields are `()`, the empty tuple. In other words, the implementation of q12 was using a constant value as the join field. A constant value always hashes to the same value, which in turn meant that all of the data in the join was being processed in a single thread, leading to the behavior that we observed.
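Here is a small standalone sketch of why a constant key is so harmful. The `map_index` below is a simplified stand-in that works on a `Vec` rather than a stream, and `worker_for` and `busy_workers` are hypothetical helpers that assume workers are chosen by hashing the key; none of this is the DBSP API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Simplified stand-in for indexing a stream: pairs each record with a key.
fn map_index<T, K>(records: Vec<T>, f: impl Fn(&T) -> K) -> Vec<(K, T)> {
    records.into_iter().map(|r| (f(&r), r)).collect()
}

/// Hash-based worker choice (hypothetical helper).
fn worker_for<K: Hash>(key: &K, workers: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % workers
}

/// Returns the set of workers that receive at least one record.
fn busy_workers<K: Hash, T>(indexed: &[(K, T)], workers: u64) -> HashSet<u64> {
    indexed.iter().map(|(k, _)| worker_for(k, workers)).collect()
}

fn main() {
    let bids: Vec<u64> = (0..1_000).collect();
    // Same shape as the generated code: every record gets the key ().
    let indexed = map_index(bids, |_bid| ());
    let busy = busy_workers(&indexed, 16);
    println!("workers with data: {}", busy.len()); // prints 1
}
```

Because every record carries the same key, every record maps to the same worker, regardless of how many workers are available.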

Solution

The preceding analysis pointed to the SQL compiler's interpretation of the query, which was outside my area of expertise, so I passed it along to Mihai, our SQL implementation expert, by filing issue 2229. Mihai determined that the SQL for the query was wrong and provided a fix that updated the query's SQL.

Mihai's fix was based on the observation that the query's use of the SQL `now()` function, which reports the current time, was not correct in the context of the query. The query was supposed to be written in terms of the time that the record was originally received, which is better approximated using the record's own date and time than with the current time.

With Mihai's fix, the join that caused trouble is no longer in the generated code at all. The result is that `q12` performance improved from almost 3 minutes to about 20 seconds for 100 million input events, changing it from an outlier to among the fastest Nexmark queries.

Conclusion

This blog entry covered some of the tools that are useful for investigating the details of Feldera's performance and described their use for debugging one particular performance problem.
