Real-time fraud detection using Feldera and Hopsworks

Abhinav Gyawali, Engineer
Leonid Ryzhyk, CTO / Co-Founder
David Bzhalav, Head of Data Science @ Hopsworks
Jim Dowling, CEO @ Hopsworks

July 9, 2024

How about combining a state-of-the-art real-time analytics engine with a state-of-the-art real-time feature store? In this post, we team up with our friends at Hopsworks.ai to do exactly that. We'll show you how to do real-time feature engineering, with Feldera computing ML features and Hopsworks handling the storage layer.

We selected credit card fraud detection for this demo because of its stringent feature engineering requirements. Accurate fraud detection demands near-perfect feature freshness and low-latency feature serving. Feldera ensures the former, while Hopsworks guarantees the latter.

Let's start with a high-level overview of the feature engineering pipeline and the role of a feature store in it.

Anatomy of a feature engineering pipeline

At a high level, any feature engineering pipeline consists of three parts:

  1. Storage. Stores raw input data and computed features.
  2. Compute. Computes feature vectors from raw data.
  3. ML. Performs model training and inference using feature vectors as inputs.
Feature engineering pipeline

What makes feature engineering challenging is that this pipeline must operate in two modes. During model training, it computes, stores, and serves features from historical data in batch mode. During inference, the pipeline computes, stores, and serves features from real-time data in streaming mode. The two modes require different compute and storage engines. This is problematic because the capabilities of streaming engines differ from those of batch engines. As a result, developers must effectively build two separate feature pipelines.

Feldera eliminates the need for two compute engines by unifying batch and stream analytics. With Feldera, the user defines features once using SQL queries, and the platform evaluates these queries over batch or streaming inputs.

A feature store solves a similar problem for the storage component of the pipeline. A feature store is a database designed specifically for storing, organizing, and serving features, offering a unified API for batch and real-time data ingress and egress.

By combining Feldera with a modern feature store such as Hopsworks, users can reduce the complexity of building and operating feature pipelines, and focus on designing the best features for their applications.

Hopsworks

Hopsworks.ai is a data platform for ML with a Python-centric Feature Store and MLOps capabilities. Hopsworks is a modular platform: you can use it as a standalone Feature Store, use it to manage, govern, and serve your models, or use it to develop and operate feature pipelines and training pipelines. Hopsworks brings collaboration to ML teams, providing a secure, governed platform for developing, managing, and sharing ML assets - features, models, training data, batch scoring data, logs, and more.

Architecture

In the rest of this article, we will build a feature engineering pipeline for real-time credit card fraud detection using Feldera and Hopsworks.

Real-time feature pipeline using Feldera and Hopsworks

Feldera processes inputs in real-time and writes computed features to Hopsworks using the Hopsworks storage API. Internally, Hopsworks stores feature vectors in the offline store where they are available for model training. In addition, the most recent feature vectors are kept in the online store, where they can be looked up during real-time inference.

Prerequisites

This demo is structured as a series of Python notebooks. To run the notebooks, you will need a Hopsworks account, which you can create for free at https://app.hopsworks.ai/.

You will also need access to an instance of Feldera. You can either install it
locally using Docker or create a free account at try.feldera.com.

Step 0. Data preparation

We use a synthetic data generator provided by Hopsworks to populate a Kafka topic with data that simulates a stream of credit card transactions. In reality, this stream would arrive in real time from point-of-sale terminals. The generator also creates a set of user profiles, which we store as a feature group (see below) inside the Hopsworks feature store. For the complete data preparation code, see the Python notebook.
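For intuition, here is roughly what a single generated transaction looks like when published to the input topic. This is an illustrative sketch, not the generator's actual code: the broker address and topic name are placeholders, and the fields mirror the TRANSACTIONS table we define in Step 1.2.

import json
from confluent_kafka import Producer

# Illustrative synthetic transaction (fields mirror the TRANSACTIONS table).
transaction = {
    "tid": "11e8e424-a5f7-4f9b-8c3e-2f0b1c9d7a42",
    "date_time": "2024-07-09 12:34:56",
    "cc_num": "4473593503484549",
    "category": "Grocery",
    "amount": 86.21,
    "latitude": 59.33,
    "longitude": 18.06,
    "city": "Stockholm",
    "country": "SE",
    "fraud_label": 0,
}

# Placeholder broker; the demo uses Hopsworks' public Kafka servers instead.
producer = Producer({"bootstrap.servers": "localhost:9092"})
# Placeholder topic name; the demo appends the Hopsworks project id.
producer.produce("transactions_topic_<project_id>", json.dumps(transaction).encode())
producer.flush()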

Step 1. Real-time feature computation

See the Python notebook for the complete implementation of this section.

Step 1.1. Create Hopsworks feature groups

Hopsworks organizes features into **feature groups**. A feature group is a set of related features with a common primary key. In this example, we define two feature groups:

  • COMBINED - features that extend credit card transaction records with attributes extracted from the card holder's profile, such as their age at the time of the transaction and the number of days until the credit card expires.
  • WINDOWED - frequency of transactions and other metrics in the span of a few hours, modeled as hopping window aggregates.
import hopsworks
from hsfs import engine
from hsfs.feature import Feature
import json

# Connect to Hopsworks. KEY holds your Hopsworks API key.
project = hopsworks.login(host="c.app.hopsworks.ai", api_key_value=KEY)

kafka_api = project.get_kafka_api()
KAFKA_OUTPUT_TOPICS = ["transactions_fraud_streaming_fg_" + str(project.id), "transactions_aggs_fraud_streaming_fg_" + str(project.id)]

fs = project.get_feature_store()

# Create feature groups to store Feldera outputs.

# COMBINED - features that extend credit card transaction records with attributes extracted from the card 
# holder's profile, such as their age at the time of the transaction and the number of days until the credit card expires.
combined_fg = fs.get_or_create_feature_group(
        name=KAFKA_OUTPUT_TOPICS[0],
        primary_key=["cc_num"],
        online_enabled=True,
        version=1,
        topic_name=KAFKA_OUTPUT_TOPICS[0],
        event_time="date_time",
        stream=True,
        features=[
            Feature("tid", type="string"),
            Feature("date_time", type="timestamp"),
            Feature("cc_num", type="string"),
            Feature("category", type="string"),
            Feature("amount", type="double"),
            Feature("latitude", type="double"),
            Feature("longitude", type="double"),
            Feature("city", type="string"),
            Feature("country", type="string"),
            Feature("fraud_label", type="int"),
            Feature("age_at_transaction", type="int"),
            Feature("days_until_card_expires", type="int"),
            Feature("cc_expiration_date", type="timestamp"),
        ],
)

try:
    combined_fg.save()
except Exception as e:
    print(e)

if KAFKA_OUTPUT_TOPICS[0] not in [topic.name for topic in kafka_api.get_topics()]:
    kafka_api.create_schema(KAFKA_OUTPUT_TOPICS[0], json.loads(combined_fg.avro_schema))
    kafka_api.create_topic(KAFKA_OUTPUT_TOPICS[0], KAFKA_OUTPUT_TOPICS[0], 1, replicas=1, partitions=1)

# WINDOWED - frequency of transactions and other metrics in the span of a few hours, modeled as hopping window aggregates.
windowed_fg = fs.get_or_create_feature_group(
    name=KAFKA_OUTPUT_TOPICS[1],
    primary_key=["cc_num"],
    online_enabled=True,
    version=1,
    topic_name=KAFKA_OUTPUT_TOPICS[1],
    event_time="date_time",
    stream=True,
    features=[
        Feature("avg_amt", type="double"),
        Feature("trans", type="bigint"),
        Feature("stddev_amt", type="double"),
        Feature("date_time", type="timestamp"),
        Feature("cc_num", type="string"),
    ],
)

try:
    windowed_fg.save()
except Exception as e:
    print(e)

if KAFKA_OUTPUT_TOPICS[1] not in [topic.name for topic in kafka_api.get_topics()]:
    kafka_api.create_schema(KAFKA_OUTPUT_TOPICS[1], json.loads(windowed_fg.avro_schema))
    kafka_api.create_topic(KAFKA_OUTPUT_TOPICS[1], KAFKA_OUTPUT_TOPICS[1], 1, replicas=1, partitions=1)

Step 1.2. Create Feldera pipeline

We build a Feldera pipeline to transform raw transaction and profile data into
features. In Feldera, feature groups are modeled as SQL views. Thus, we create
a SQL program with two input tables (`TRANSACTIONS` and `PROFILES`), and two
output views, one for each feature group.

Feldera pipeline
# Create SQL program parameterized by source and sink connector configurations.
def build_sql(transaction_source_config: str, combined_sink_config: str, windowed_sink_config: str) -> str:
    return f"""
    CREATE TABLE TRANSACTIONS(
        tid STRING,
        date_time TIMESTAMP,
        cc_num STRING,
        category STRING,
        amount DOUBLE,
        latitude DOUBLE,
        longitude DOUBLE,
        city STRING,
        country STRING,
        fraud_label INT
    ) WITH (
        'connectors' = '[{transaction_source_config}]'
    );

    CREATE TABLE PROFILES(
        cc_num STRING,
        cc_provider STRING,
        cc_type STRING,
        cc_expiration_date STRING,
        name STRING,
        mail STRING,
        birthdate TIMESTAMP,
        age INT,
        city STRING,
        country_of_residence STRING
    );

    -- Convert credit card expiration date from MM/YY formatted string to a TIMESTAMP,
    -- so that we can perform computations on it.
    CREATE LOCAL VIEW CC_EXPIRATION as
        SELECT
            cc_num,
            CAST(
                CONCAT(
                    '20',
                    SUBSTRING(
                        cc_expiration_date,
                        4,
                        2
                    ),
                    '-',
                    SUBSTRING(
                        cc_expiration_date,
                        1,
                        2
                    ),
                    '-01 00:00:00'
                ) AS TIMESTAMP
            ) AS cc_expiration_date
        FROM PROFILES;

    -- Compute the age of the individual during the transaction, and the number of days until the
    -- credit card expires from `PROFILES` and `TRANSACTIONS` tables.
    CREATE VIEW COMBINED 
    WITH (
        'connectors' = '[{combined_sink_config}]'
    )
    AS
        SELECT
            T1.*,
            T2.cc_expiration_date,
            TIMESTAMPDIFF(YEAR, T3.birthdate, T1.date_time) age_at_transaction,
            TIMESTAMPDIFF(DAY, T1.date_time, T2.cc_expiration_date) days_until_card_expires
        FROM
            TRANSACTIONS T1 JOIN cc_expiration T2
            ON
                T1.cc_num = T2.cc_num
            JOIN PROFILES T3
        ON
            T1.cc_num = T3.cc_num;
    
    -- Create a 4-hour hopping window aggregation over the TRANSACTIONS table
    CREATE LOCAL VIEW HOP as 
        SELECT * 
        FROM TABLE(HOP(TABLE TRANSACTIONS, DESCRIPTOR(date_time), INTERVAL 4 HOURS, INTERVAL 1 HOURS));

    -- Compute aggregates from it
    CREATE LOCAL VIEW AGG as
        SELECT
            AVG(amount) AS avg_amt,
            STDDEV(amount) as stddev_amt,
            COUNT(cc_num) as trans,
            ARRAY_AGG(date_time) as moments,
            cc_num
        FROM hop
        GROUP BY cc_num, window_start;
    
    -- Final output view
    CREATE VIEW WINDOWED
    WITH (
        'connectors' = '[{windowed_sink_config}]'
    )
    AS
        SELECT
            avg_amt,
            trans,
            COALESCE(stddev_amt, 0) as stddev_amt,
            date_time,
            cc_num
        FROM agg CROSS JOIN UNNEST(moments) as date_time;
    """

We use the Kafka topic created during the data prep step as the input for the TRANSACTIONS table. The output views are also connected to the Hopsworks feature store via Kafka. Hopsworks ingests data from Kafka using the Avro format, so we configure Feldera Kafka connectors with Avro schemas generated by Hopsworks for each feature group.

from feldera import FelderaClient, PipelineBuilder

# Connect to the Feldera API

# Use Feldera online sandbox
# client = FelderaClient("https://try.feldera.com", api_key = get_secret('FELDERA_API_KEY'))

# Use local Feldera instance
client = FelderaClient("http://localhost:8080")

# Get Hopsworks public Kafka servers.
kafka_config = engine.get_instance()._get_kafka_config(fs.id, {})

KAFKA_INPUT_TOPIC = "transactions_topic_" + str(project.id)

# Connect the Kafka topic created during data prep to the TRANSACTIONS table.
transaction_source_config = json.dumps({
    "transport": {
        "name": "kafka_input",
        "config": kafka_config | {"topics": [KAFKA_INPUT_TOPIC], "auto.offset.reset": "earliest"}
    },
    "format": {
        "name": "json",
        "config": {
            "update_format": "raw",
            "array": False
        }
    }
})

def create_sink_config(kafka_config: dict, fg, project_id):
    return kafka_config | {
        "topic": fg.topic_name,
        "auto.offset.reset": "earliest",
        "headers": [
            {
                'key': 'projectId',
                'value': str(project_id),
            },
            {
                'key': 'featureGroupId',
                'value': str(fg.id),
            },
            {
                'key': 'subjectId',
                'value': str(fg.subject["id"]),
            },
        ]
    }

# Set the output format to use the avro schema from the feature group.
combined_sink_config = json.dumps({
    "transport": {
        "name": "kafka_output",
        "config": create_sink_config(kafka_config, combined_fg, project.id)
    },
    "format": {
        "name": "avro",
        "config": {
            "schema": combined_fg.avro_schema,
            "skip_schema_id": True
        }
    }
})

windowed_sink_config = json.dumps({
    "transport": {
        "name": "kafka_output",
        "config": create_sink_config(kafka_config, windowed_fg, project.id)
    },
    "format": {
        "name": "avro",
        "config": {
            "schema": windowed_fg.avro_schema,
            "skip_schema_id": True
        }
    }
})

sql = build_sql(transaction_source_config, combined_sink_config, windowed_sink_config)
pipeline = PipelineBuilder(client, "hopsworks_kafka", sql).create_or_replace()

Step 1.3. Run the pipeline

We are now ready to start the Feldera pipeline. Once the pipeline is initialized, we populate the PROFILES table from a Pandas dataframe. The pipeline continuously ingests credit card transactions from Kafka and sends computed features to Hopsworks. Hopsworks accumulates all historical features in the offline store, while the latest features for each primary key are instantly available for lookup in the online store.

# Start the Feldera pipeline.
pipeline.start()

# Read profile data from the feature store and write it to the `PROFILES` table.
profile_fg = fs.get_or_create_feature_group(
    name="profile",
    version=1
)

profile_df = profile_fg.read()

pipeline.input_pandas("PROFILES", profile_df)
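To sanity-check that features are flowing end to end, we can read a few rows back from the feature groups. This is a quick check rather than part of the demo notebooks, and it assumes Hopsworks has already ingested some records from the output topics:

# Latest feature vector per credit card, served from the online store.
print(combined_fg.read(online=True).head())

# Historical features from the offline store (available once the
# Hopsworks ingestion job has materialized the data).
print(windowed_fg.read().head())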

Steps 2 and 3. Training and inference

We can now train an ML model using feature vectors accumulated in the offline store and run real-time inference using the online store. These steps are identical to those in other demos available from Hopsworks. For completeness, we provide training and inference notebooks as part of the demo.
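As a rough sketch of what those notebooks do (the feature view name and split parameters below are illustrative; see the notebooks for the exact code):

# Join the two feature groups into a feature view for training.
query = combined_fg.select_except(["tid"]).join(
    windowed_fg.select_except(["date_time"]), on=["cc_num"]
)
fv = fs.create_feature_view(
    name="fraud_model_fv",  # illustrative name
    version=1,
    query=query,
    labels=["fraud_label"],
)

# Step 2: training data is assembled from the offline store.
X_train, X_test, y_train, y_test = fv.train_test_split(test_size=0.2)

# Step 3: real-time inference looks up the latest features by primary key
# from the online store.
fv.init_serving()
feature_vector = fv.get_feature_vector(entry={"cc_num": "4473593503484549"})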

Takeaways

Feature engineering poses a unique challenge: how does one build a data pipeline that can operate in both batch and streaming modes? Feldera solves this problem for the compute stage of the pipeline by offering a unified engine for stream and batch analytics. In this demo we ingested transaction data from a streaming data source (Kafka), while reading profile data as a batch from the Hopsworks feature store. In general, we can use arbitrary combinations of streaming and batch inputs without changing a line in our SQL code.

In a similar vein, the Hopsworks team has built a unified storage engine optimized for ML workloads. By combining the two platforms, we obtain an end-to-end solution that:

  • Reduces the costly platform engineering required for real-time feature
    pipelines
  • Eliminates the need for writing two implementations of the same feature
    queries
  • Allows users to concentrate on designing the best features for their
    applications

We will continue exploring the Hopsworks integration in a future blog post; in particular, we will examine the performance achieved with Feldera and provide a comparison with Flink.
