A synchronous streaming model

A synchronous streaming model

Mihai Budiu
Mihai BudiuChief Scientist / Co-Founder
| April 3, 2024

What is a streaming system?

This seems like an easy question: it's a system which receives data as one or multiple independent sequences of *events* and, similarly, produces results as one or more sequences of events. An event is just a (typed) bundle of data. The data can be a database tuple, a JSON record, a frame in a video stream, etc.

There are many ways to design a streaming system, but some are more useful than others. In this blog post we explore some of the possible high-level design decisions.

The diagram below shows a streaming system with 3 input streams and 2 output streams. Each arrow is a stream, and inside the arrows one can see the *events*, ordered in time.

A streaming system with multiple inputs and outputs

Synchronous streaming

Let me first describe the streaming model used in the DBSP theory, and implemented in Feldera. DBSP uses a very simple, idealized model for streaming computations. Although idealized, the model can be efficiently implemented in practice.

In Feldera, time is given by a *logical clock*. The logical clock divides the real time into disjoint time intervals, and each event belongs to exactly one such interval. There is a single logical clock for the entire streaming system, and it applies to *all* input and output streams.

A streaming system with a global centralized clock

In the previous diagram the vertical dotted lines show the first few logical clock ticks. Each event must fall within a time interval defined by the logical clock.

In this model we call all the input events that fall within a time interval a *change*. The way you interact with a synchronous system is the following:

  • The system collects events in all the input streams
  • When the clock ticks, indicating the end of the clock cycle, all
    events collected are fed to the streaming system as a single
    change
  • The system instantly (i.e., within the same logical clock cycle)
    computes the result and emits it using the output streams. There is
    exactly one output change for every input change.
  • The output change consists of zero or many events for each output
    stream. The output change may be empty (having zero events,
    signaling "no changes from the previous output"), but must be
    present.
  • The cycle repeats itself

We call this model "synchronous streaming", because there is a single clock, and inputs and outputs alternate precisely. Output 1 is produced immediately after receiving input 1, output 2 is produced immediately after receiving input 2, and so on.

Notice that a time slot may contain zero, one, or many events for every stream.

I want to emphasize the importance of expressing empty changes. Empty changes are equivalent to the 0 value in mathematics.

The DBSP system specification

The DBSP model is *causal*, which means that the output at logical time 1 cannot depend in any way on the input at logical time 2.

Moreover, DBSP requires the system itself to be *deterministic*. For a given input sequence, the output sequence that is produced should be unique, and must only depend on the events received in the inputs. The streaming system receives all the information through inputs; it cannot generate random bits, or look at the position of the sun (i.e., use a separate clock) to decide the output.

The synchronous model allows us to give a precise definition of what a DBSP system is supposed to do. The definition roughly says that output for time slot K must be defined as a simple function of inputs for time slots 1, 2, ..., K. This model of execution is very similar to a *function* in many programming languages: a function is invoked with input arguments and produces a result for each invocation.

Databases, transactions, and streams

If this seems too restrictive, remember that this model is actually very similar to the way modern databases work. Jim Gray, who received the 1998 Turing award, has defined many of the core concepts of transaction processing. Database
transactions are atomic computation over multiple changes. Transactions have this nice property called linearizability, which says that all transactions of a database have to behave as if they happened in some sequential order, completely independent of each other. So transactions in a database define a logical clock, very much like the DBSP logical clock: the clock ticks every time a transaction is completed. Within a transaction many changes can happen in the database: e.g., multiple tables can each be modified by many insertions and deletions. Notice that the operations can be grouped into transactions even if the individual events in the multiple streams of activity do not use a global clock, and may even be interleaved.

The Feldera computational model

Feldera is an implementation of the DBSP synchronous model where inputs are database tables, outputs are database views, and computations are described as SQL queries. An input change in Feldera is an arbitrary set of updates (insertions or deletions) to the database tables, and an output change is an arbitrary set of updates (insertions or deletions) to the views. For each table and each view, the set of changes is described by a Z-set. Currently in Feldera the changes cannot include the creation, deletion, or modification of tables or views, these must happen before the streaming system is started.

Unlike a database, Feldera requires some external entity to group events into transactions; databases and operating system use very sophisticated concurrency control mechanisms to achieve this effect. In Feldera this job can be done by the input adapters.

Other synchronous models

DBSP is not the first model for synchronous streaming computation. There are papers going back two decades, such as Stanford's Continuous Query Language used in the STREAM project. A related model of computing over time-varying relations is proposed in this excellent paper: One SQL to rule them
all
. Spark streaming and Spark structured streaming are also synchronous models.

Computing on isolated events

What is so special about the synchronous model?

To understand this, let's look at an alternative model, provided by the Flink streaming system. In Flink events are isolated, and processed independently of each other by the system. Moreover, there is no global clock, and many input events may be received before the corresponding outputs are produced.

A completely asynchronous streaming model

Isn't Flink's model equivalent to DBSP? It isn't. In fact, DBSP can precisely simulate the Flink computational model, by encapsulating each event into a separate change. However, Flink cannot simulate DBSP, for two reasons:

  • Some computations may produce no events at all (e.g., a view that contains the MAX value may not change at all for some inputs). In a system like Flink the absence of an event cannot be distinguished from an event that indicates no change.
  • Some changes can only be described using multiple events, possibly in separate streams. For example, the result of moving money between two back accounts requires events that describe changes to both the debiting and crediting account. A system that does not group events in related changes (or transactions) cannot describe *when* the correct result has been computed. Jamie Brandon has very clearly illustrated this problem.

Labeling events with timestamps

An alternative model of streaming computation was introduced by Timely Dataflow, introduced by the paper Naiad, a timely dataflow
system
, from Microsoft Research, published in the SOSP conference in 2013. In this model each input event is labeled with a logical clock timestamp. Timely events carry not just data, but also metadata. DBSP events require no metadata. Timely is unique in that events in a stream do not need to arrive in order; an event for logical time 1 can come after an event for logical time 2. The outputs of Timely are also labeled with timestamps. Every time a new input event is received, Timely produces corrections for all output timestamps that have to be updated. So if an event labeled "1" arrives after an event labeled "2", Timely may produce output events that provide corrections to both outputs "1" and "2", in light of the new available information. (Timely also supports some additional control signals, saying "we promise that all events in this stream from now on will have a logical time greater than 10".)

The timely model above, allowing corrections to past inputs and outputs, is more general than DBSP. Timely can simulate DBSP by restricting inputs to arrive in increasing order of clock labels. But Timely is perhaps too general, and systems constructed on top of Timely, such as Materialize do not expose this capability. We must point out that Differential Dataflow, an intermediate layer built on Timely and used in the implementation of Materialize, also uses Z-sets as the central data structure for representing changes.

Logical clocks do not exist

Strictly speaking, that is true. Distributed systems do not have centralized clocks, and events that come from separate data sources do not necessarily have a clear order with respect to each other.

To build a bridge between the reality of continuous time and a synchronous system like Feldera, one can use *buffers*. Each buffer is a centralized system which defines its own logical clock. A buffer can accumulate multiple events and release them when the destination is ready to receive them.

Buffers can be used to separate regions with different clocks

A buffer can be used to:

  • introduce a logical clock when the data sources do not have any
  • queue events if the destination system is not ready to accept them
  • re-group events into *batches*, to improve processing efficiency
  • move events between synchronous systems with independent logical
    clocks, like the output buffer in the previous picture.

Changes that have natural operations of addition and subtraction, as Z-sets do, provide a natural way for buffers to manipulate changes by simply adding the Z-sets of each stream.

In Feldera the input adapters provide a buffering function, and there is also an explicit buffer construct in the DBSP library for output buffering.

Streaming systems performance

In a pure synchronous system, like Feldera, since the system cannot start working on a new input until it has finished the previous one, the throughput is given by the inverse of the latency (the time elapsed between receiving a change and producing the corresponding output). If it takes the system 1 second (latency) to process 1 million events (change size), the throughput is 1 million events per second.

For most streaming systems, due to cache locality effects, and sometimes algorithmic factors, it is more efficient to process multiple events together. This is called "batching," or "micro-batching." There is a trade-off between processing events one by one (low latency to start the computation, but lower computational efficiency per event) and collecting bigger batches (increased latency to assemble the batches, but lower computation latency per event). The sweet spot of the trade-off depends on the nature of the data and of the computation performed.

Conclusion

In this article we do not discuss at all the kinds of computations that streaming systems can perform. We only look at the way streaming systems interact with their environment. We have seen that different streaming system provide APIs that differ in subtle, but important ways, which have profound implications on the classes of applications that these systems can be used for.

DBSP provides a simple, function-like API to the external world. DBSP inputs and outputs are described in terms of changes, and follow a common logical clock: for each input change, the system provides a corresponding output change. A change spans all input or output streams, and is described by a Z-set for each stream. The input-output relationship computed by a DBSP system is thus crystal clear. In future blog posts we will look at the kinds of computations that DBSP and Feldera can perform.

Other articles you may like

Incremental Update 6 at Feldera

We’re excited to announce the release of v0.26, which represents a significant step forward for Feldera. This release includes over 200 commits, adding 25,000 lines of new code and documentation. Let's dive into the highlights!

Database computations on Z-sets

How can Z-sets be used to implement database computations

Incremental Update 5 at Feldera

A quick overview of what's new in v0.25.