github ArroyoSystems/arroyo v0.14.0

20 days ago

These release notes are also available on the Arroyo Blog

This release includes several major new features, including lookup joins and more powerful updating support; convenient new syntax for event time, watermarks, and source metadata; support for struct types in DDL; and many other improvements and fixes.

Features

Lookup joins

In streaming applications, joins are typically performed between two streaming sources—for example, between a Kafka stream of orders and one of transactions. But sometimes we have a smallish, static-ish dataset that we want to use to enrich a stream. For example, a SaaS company may be ingesting a stream of events from each of their customers, which they need to enrich with information from a customers table.

Lookup joins are a new feature in Arroyo 0.14 that supports this pattern. They allow you to express a query in another system that will be performed based on incoming keys from a streaming query.

In this first release, Redis (GET) is supported as the query system; we will be adding support for other databases like Postgres and MySQL as well.

What does this look like? Let's take an example. First, we need to define the table for the lookup source, which we call a TEMPORARY TABLE, as it's not materialized:

CREATE TEMPORARY TABLE customers (
    -- For Redis lookup tables, it's required that there be a single
    -- METADATA FROM 'key' marked as PRIMARY KEY, as Redis only supports
    -- efficient lookups by key
    customer_id TEXT METADATA FROM 'key' PRIMARY KEY,
    name TEXT,
    plan TEXT
) with (
    connector = 'redis',
    address = 'redis://localhost:6379',
    format = 'json',
    'lookup.cache.max_bytes' = 1000000,
    'lookup.cache.ttl' = interval '5 seconds'
);

The lookup.cache.max_bytes and lookup.cache.ttl options are optional; they control the behavior of the built-in cache, which avoids the need to query the same keys over and over again.

Once we've defined the table we can use it in queries with either JOIN or LEFT JOIN:

CREATE TABLE events (
    event_id TEXT,
    timestamp TIMESTAMP,
    customer_id TEXT,
    event_type TEXT
) WITH (
    connector = 'kafka',
    topic = 'events',
    type = 'source',
    format = 'json',
    bootstrap_servers = 'broker:9092'
);

SELECT  e.event_id,  e.timestamp,  c.name, c.plan
FROM  events e
LEFT JOIN customers c
ON concat('customer.', e.customer_id) = c.customer_id
WHERE c.plan = 'Premium';
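
For comparison, here is a sketch of the same enrichment written as an inner JOIN, reusing the customers and events tables defined above. It assumes standard SQL join semantics carry over to lookup joins: an inner JOIN drops events whose key is not found in Redis, whereas a LEFT JOIN emits them with NULL in the c columns (which a filter such as c.plan = 'Premium' would then discard anyway).

```sql
-- Inner lookup join: only events with a matching customer record are emitted.
SELECT e.event_id, e.timestamp, c.name, c.plan
FROM events e
JOIN customers c
ON concat('customer.', e.customer_id) = c.customer_id;
```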

See the full Lookup Join docs, and the Redis connector docs.

Nested updates

Arroyo 0.14 ships with much more powerful support for updating SQL, a type of streaming SQL based around incrementally computing SQL queries (for more on the different types of streaming SQL, see this blog post).

Previously, we supported a single updating aggregate for a query, for example

-- count of each user
SELECT user_id, count(*)
FROM events
GROUP BY user_id;

We now support much more sophisticated queries that combine (or "nest") multiple aggregates, like

-- how many users have count above 5?
SELECT count(*) FROM (
    SELECT user_id
    FROM events
    GROUP BY user_id
    HAVING count(*) > 5
)

Another consequence of this is that it's now possible to write sophisticated aggregating queries over incoming updating data, for example from an RDBMS changefeed via Debezium. For example, this query consumes from the changefeed of an orders table (e.g., from Postgres) and computes how many orders are currently in the "pending" state, updating incrementally with every addition and change in the underlying table:

CREATE TABLE orders (
    id INT PRIMARY KEY,
    product_name TEXT,
    price FLOAT,
    order_date TIMESTAMP,
    status TEXT
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    topic = 'orders',
    format = 'debezium_json',
    type = 'source'
);

select count(*)
from orders
where status = 'PENDING';
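
As a further sketch over the same orders table, a grouped aggregate keeps a live count per status: when an order moves from one status to another, the old status's count should go down and the new one's go up. The second query nests that pattern. Both are illustrative only; the column aliases and the threshold of 10 are arbitrary choices, not part of the release.

```sql
-- live count of orders in each status
SELECT status, count(*) AS num_orders
FROM orders
GROUP BY status;

-- how many products currently have more than 10 pending orders?
SELECT count(*) FROM (
    SELECT product_name
    FROM orders
    WHERE status = 'PENDING'
    GROUP BY product_name
    HAVING count(*) > 10
);
```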

Currently, nesting is only supported for aggregates, but will be extended to joins in the next release.

Struct types

Arroyo has always supported struct types (also known as composite types), but it's been a bit hard to actually use them. In particular, while you could define tables with struct types using JSON schema, Avro, or Protobuf, it wasn't possible to define them inline in DDL (CREATE TABLE).

In 0.14, we're addressing that by introducing struct type syntax, inspired by BigQuery's. It looks like this:

CREATE TABLE events (
    event_id INT,
    name TEXT,
    properties STRUCT <
      user_id TEXT,
      timings INT[],
      name STRUCT <
        first TEXT,
        last TEXT
      >
    >
) with (
    connector = 'sse',
    format = 'json',
    endpoint = 'http://example.com'
)

This will deserialize JSON with the same nested structure, for example:

{
  "event_id": 1,
  "name": "user_signup",
  "properties": {
    "user_id": "abc123",
    "timings": [100, 200, 300],
    "name": {
      "first": "Alice",
      "last": "Smith"
    }
  }
}
  • Add support for inline struct definitions in DDL by @mwylde in #833
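
Once the table is defined, nested values can be pulled out in a query. The following is a sketch that assumes dot notation for struct field access; the exact accessor syntax should be confirmed in the docs, and the output column aliases are made up for illustration.

```sql
-- Flatten a few nested fields of the events table defined above.
SELECT
    event_id,
    properties.user_id AS user_id,
    properties.name.first AS first_name,
    properties.name.last AS last_name
FROM events;
```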

Syntax!

We've finally gone and done it. We've taken Postgres syntax as far as it can go. In 0.14, we're finally embracing custom syntax for streaming-specific concepts, with a custom SQL parser. This enabled the struct syntax in the previous section, and gives us much more flexibility to provide good UX for our SQL dialect.

In addition to STRUCT<>, we're also introducing three other changes in 0.14, for watermarks, metadata, and WITH arguments. We encourage users to switch to these new forms, as the old forms will be dropped in the next release and will produce a warning in the meantime.

Event time and watermark

Event time and watermarks are core to Arroyo's dataflow semantics. These allow users to specify (1) a field of the data that represents the actual, real-world time an event occurred, and (2) how we should generate watermarks based on that time.

Previously users expressed this via generated fields and WITH arguments:

CREATE TABLE logs (
  timestamp TIMESTAMP NOT NULL,
  id INT,
  watermark TIMESTAMP GENERATED ALWAYS AS (timestamp - INTERVAL '5 seconds') STORED
) WITH (
  event_time_field = 'timestamp',
  watermark_field = 'watermark',
  ...
)

which was verbose and required adding potentially unwanted fields to the schema. In 0.14, this can instead be written using a new WATERMARK FOR syntax, following the structure:

WATERMARK FOR fieldname [AS watermark_expr]

For example:

CREATE TABLE logs (
  timestamp TIMESTAMP NOT NULL,
  id INT,
  WATERMARK FOR timestamp AS timestamp - INTERVAL '5 seconds'
) WITH (
  ...
)
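
Since the AS clause is optional in the grammar above, the shortest form only names the event-time field. A minimal sketch, reusing the sse connector options from the struct example for concreteness; it assumes that omitting AS leaves watermark generation to Arroyo's default behavior, which should be confirmed in the watermark docs before relying on it:

```sql
CREATE TABLE logs (
  timestamp TIMESTAMP NOT NULL,
  id INT,
  -- event time comes from `timestamp`; no explicit watermark expression
  WATERMARK FOR timestamp
) WITH (
  connector = 'sse',
  format = 'json',
  endpoint = 'http://example.com'
);
```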

See the watermark docs for more details.

Metadata

In 0.13, we introduced support for source metadata, the ability to inject non-data fields into a source table, like a Kafka offset or MQTT topic. Defining metadata columns required the somewhat unfortunate approach of defining a virtual field with a magic metadata UDF, like

create table users (
    id TEXT,
    name TEXT,
    offset BIGINT GENERATED ALWAYS AS (metadata('offset_id')) STORED,
    partition INT GENERATED ALWAYS AS (metadata('partition')) STORED
) with (
    connector = 'kafka',
    ...
);

In 0.14, this has been replaced with a more obvious METADATA FROM syntax:

offset BIGINT METADATA FROM 'offset_id',
partition INT METADATA FROM 'partition'
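
Put together, the users table from the previous example would look roughly like this in the new form. The connector options are borrowed from the Kafka source earlier in these notes, and the topic name is a placeholder.

```sql
CREATE TABLE users (
    id TEXT,
    name TEXT,
    -- metadata columns, replacing the GENERATED ALWAYS AS (metadata(...)) form
    offset BIGINT METADATA FROM 'offset_id',
    partition INT METADATA FROM 'partition'
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'broker:9092',
    topic = 'users',  -- placeholder topic name
    type = 'source',
    format = 'json'
);
```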

Typed WITH options

Arroyo connector tables are configured via WITH options, key-value pairs that control the behavior of the connector. Previously, values were required to be SQL strings. We now support richer types, like booleans, numbers, field references, and intervals:

CREATE TABLE files (
    ...
) WITH (
    connector = 'filesystem',
    rollover_seconds = 60,
    time_partition_pattern = '%Y/%m/%d/%H',
    'json.include_schema' = true,
    'event_time_field' = datetime,
    'flush_interval' = interval '5' seconds
);

  • Introduce METADATA and WATERMARK syntax by @mwylde in #837
  • Support typed SQL opts in with clause by @mwylde in #825

Sink shuffles

The FileSystem sink allows users to partition their data by a key. For example, in a data ingestion pipeline this might be an event type, or a customer id; when ingesting into a data lake, partitioning by key can improve query performance by reducing the number of files that need to be read.

However, this advantage is diminished somewhat when running a highly-parallelized Arroyo pipeline, as each parallel subtask will write its own set of files for each partition key. For example, if we have parallelism of 32 and 100 distinct event types, we'd be writing 3200 files for every flush interval.

Arroyo 0.14 introduces a new option for the FileSystem sink: shuffle_by_partition.enabled. If this is set to true, the planner will introduce a shuffle edge before the sink, causing all events for a particular partition key to end up on the same sink node. In our example, this means that each event type file will only be written by a single operator, so we'll end up with only 100 files for every flush interval.

See the FileSystem sink docs for more details.

  • Add shuffle_by_partition option to filesystem sink by @mwylde in #827
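
As a rough sketch, enabling the option on a sink table might look like the following. Only shuffle_by_partition.enabled comes from this release. The table name, columns, path, format, and the partition_fields option used to select the key are assumptions for illustration and should be checked against the FileSystem sink docs. With a parallelism of 32 and 100 event types, this is the difference between 32 × 100 = 3200 files and 100 files per flush interval.

```sql
CREATE TABLE event_files (
    event_type TEXT,
    customer_id TEXT,
    payload TEXT
) WITH (
    connector = 'filesystem',
    type = 'sink',
    path = 's3://my-bucket/events',    -- hypothetical destination
    format = 'parquet',                -- assumed output format
    partition_fields = 'event_type',   -- assumed option name for the partition key
    'shuffle_by_partition.enabled' = true
);
```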

Improvements

  • Add metadata field to Kafka source by @mwylde in #822
  • Address log message spew when committing multiple sinks by @mwylde in #817
  • Clean up and update dependencies for 0.14 dev cycle by @mwylde in #823
  • Fix new clippy warnings in 1.84 and update toolchain for CI by @mwylde in #826
  • Support inference for Debezium sinks by @mwylde in #830
  • Improve JSON decoding speed by up to 40% by @mwylde in #835
  • Address dependency issue exposed by Rust 1.85 by @mwylde in #836
  • Update dependencies to address dependabot warnings by @mwylde in #841
  • Add kafka metadata fields to confluent connector by @mwylde in #848
  • Add support for mqtt v3 by @mwylde in #849
  • Add nodeSelector and tolerations support for worker pods in configmap by @RatulDawar in #850
  • Support NATS JWT auth by @nlap in #851

Fixes

  • Fix regression with delta writes of invariant columns by @mwylde in #816
  • Correctly handle AWS token refreshes for Delta by @mwylde in #819
  • Fix early exit in sliding window when some data is old by @mwylde in #829
  • Restrict chrono dependency to fix arrow build by @mwylde in #839
  • Address deadlock in AWS token cache by @mwylde in #840
  • Fix NATS consumer conflict by @nlap in #852
  • Use job-distinct ids for MQTT connection by @mwylde in #853
  • Downgrade missing writer error to a warning by @mwylde in #854
  • Support global session windows by @mwylde in #855

Full Changelog: v0.13.0...v0.14.0
