Meeting Data (and Analytics) Engineers Where They Are: Introducing the dbt Adapter for Confluent Cloud

dbt is the tool data engineers most commonly use to define SQL transformations (as models), write tests, generate documentation, and deploy through CI/CD, and now it's available with Confluent Cloud too! The magic of dbt is that it brings engineering rigor to modern data work, regardless of the underlying compute engine: Snowflake, BigQuery, Databricks, Redshift, or Confluent. You can find out more about the launch in our Q2 Confluent Cloud Launch post and the keynote.

Why Streaming

Today's trend, driven largely by AI and by customer demand for fresh data, is to move processing closer to the source. For many organizations the source is a stream, so batch transformations need to become streaming jobs that deliver fresher data and reduce pipeline latency. As a knock-on effect, this shift tends to reduce architectural complexity. This pattern, which we call "shifting left," positions stream processing as the primary transformation layer rather than purely a loading and ingestion mechanism.

Confluent Cloud for Apache Flink provides the streaming engine, with a declarative SQL API and a Table API. But the engine and APIs alone aren't enough. Data engineers also need the developer experience around them: a way to manage SQL as code, test it before it reaches production, and deploy it through the same review process they use for everything else.

When we set out to build the dbt-confluent adapter, we focused on three things data engineers told us matter most: a familiar interface for managing transformations, reliable and deterministic testing, and seamless CI/CD integration. 

We built the adapter from the ground up for production workloads on Confluent Cloud. The dbt-confluent adapter is backed by confluent-sql, an open-source, purpose-built Python driver that complies with DB-API v2 (PEP 249). It communicates directly with the Confluent Cloud REST API, which eliminates the need for a proxy or middleware layer. Statement lifecycle management, result pagination, type conversion, and automatic retries on transient infrastructure errors are built into the driver. The result is an adapter that behaves the way data engineers expect a dbt adapter to behave.
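Automatic retry on transient errors is a general pattern. The sketch below shows one common way to implement it, exponential backoff. It is not the confluent-sql driver's code. `TransientError`, `call_with_retry`, the attempt count, and the delays are hypothetical choices for this example.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for retryable failures (e.g. a 503 from a REST API)."""


def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on TransientError with exponential backoff.

    Any other exception propagates immediately. After max_attempts transient
    failures, the last TransientError is re-raised.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts:
                raise
            # Wait base_delay, then 2x, 4x, ... between attempts.
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

Injecting `sleep` keeps the helper testable without real waiting. Non-transient errors, such as a SQL syntax error, deliberately fail fast instead of being retried.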

Note: The adapter and the driver are open source under the Apache 2.0 license and available on GitHub. It currently supports dbt Core 1.11 and is in active development.

Streaming-Native Materializations

Most dbt adapters are built for batch: you run a query, it produces a table, and you're done. Streaming is fundamentally different in that queries run continuously and results keep updating. The adapter needed new materializations to account for that:

  • view — Standard Flink SQL views (default).

  • streaming_table — Creates a table with a continuous INSERT INTO...SELECT statement, providing explicit control over changelog semantics.

  • streaming_source — Creates a Kafka topic-backed source table whose schema is defined in the model SQL. If you use connectors, this is the materialization you would use to define the schema for a connector's topic.

Note: Future materializations like materialized_tables (the newest Flink abstraction) and standard tables (generated with snapshot queries) are on the roadmap. 

Data engineers configure these materializations through dbt's standard config() block, and the adapter manages the distinct execution modes under the hood. The complexity of each materialization stays behind the interface; to the user, it's just a config option. This design also separates the SQL logic from table creation, which is what makes dbt's unit testing possible.
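The sketch below illustrates that separation: one model's SELECT is wrapped in different statements depending on the materialization, and the SELECT text itself never changes. `render_statements` and the exact statement shapes are assumptions for illustration, not the adapter's real templates. The explicit column list for streaming_table is also an assumption of this sketch.

```python
from typing import Dict, List, Optional


def render_statements(
    name: str,
    select_sql: str,
    materialized: str = "view",
    columns: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Hypothetical: wrap one model's SELECT in materialization-specific SQL."""
    if materialized == "view":
        return [f"CREATE VIEW `{name}` AS {select_sql}"]
    if materialized == "streaming_table":
        if not columns:
            raise ValueError("this sketch needs column definitions for streaming_table")
        cols = ", ".join(f"`{col}` {typ}" for col, typ in columns.items())
        return [
            f"CREATE TABLE `{name}` ({cols})",
            # The long-running statement that keeps the table continuously updated.
            f"INSERT INTO `{name}` {select_sql}",
        ]
    raise ValueError(f"materialization not covered by this sketch: {materialized}")
```

Because the SELECT is a standalone string, a test run can execute just that query against mock inputs and skip the DDL entirely.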

Deterministic Testing for Streaming Pipelines

Testing has historically been one of the biggest challenges with streaming SQL. When a query runs continuously and results are unbounded, how do you write a test with a deterministic pass/fail?

The adapter solves this by automatically switching to bounded execution mode (snapshot queries) for tests. Rather than setting a timeout and hoping enough data arrives before it expires (an approach that can produce false positives, because no data means a silent pass), the adapter runs tests against finite, deterministic result sets. The outcome is reliable and repeatable, and your CI/CD pipeline won't hang waiting on an unbounded stream. This functionality is available through dbt's native unit tests: you provide mock input data, run your model logic, and verify that the output matches expectations. For example, the configuration below tests the stg_orders model. It feeds the test a single mock record (an order for $29.99) and asserts that the same row is emitted unchanged, with no data loss or unwanted mutation:

unit_tests:
    - name: test_stg_orders
      model: stg_orders
      given:
        - input: ref('raw_orders')
          rows:
            - { order_id: '1', price: 29.99, order_time: '2025-01-15 10:00:00' }
      expect:
        rows:
          - { order_id: '1', price: 29.99, order_time: '2025-01-15 10:00:00' }
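Conceptually, this unit test feeds the mock rows through the model's logic and compares the result with the expected rows. The plain-Python sketch below shows that comparison. `stg_orders_logic` is a hypothetical stand-in for the model's SELECT, reduced to a pass-through of three columns. dbt itself runs the real SQL on the target engine, not Python.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def stg_orders_logic(rows: Iterable[Row]) -> List[Row]:
    """Hypothetical stand-in for the model's SELECT: pass three columns through unchanged."""
    return [
        {"order_id": r["order_id"], "price": r["price"], "order_time": r["order_time"]}
        for r in rows
    ]


def rows_match(actual: List[Row], expected: List[Row]) -> bool:
    """Order-insensitive comparison that still counts duplicate rows."""
    def key(r: Row) -> tuple:
        return tuple(sorted(r.items()))
    return Counter(map(key, actual)) == Counter(map(key, expected))
```

Because the input is finite, an empty or altered output fails the comparison instead of passing silently.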

Combined with dbt's standard data quality tests (not_null, unique, custom assertions), teams get a robust testing framework for streaming pipelines.
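dbt's generic data tests work by selecting the rows that violate a rule, and a test passes when that query returns zero rows. The Python functions below are a hypothetical, in-memory illustration of the not_null and unique rules over a bounded set of rows. They are not dbt code. As with dbt's unique test, nulls are ignored when checking for duplicates.

```python
from collections import Counter
from typing import Any, Dict, List

Row = Dict[str, Any]


def not_null_failures(rows: List[Row], column: str) -> List[Row]:
    """Rows where column is missing or None; an empty list means the test passes."""
    return [r for r in rows if r.get(column) is None]


def unique_failures(rows: List[Row], column: str) -> List[Any]:
    """Non-null values of column that occur more than once."""
    counts = Counter(r[column] for r in rows if r.get(column) is not None)
    return [value for value, n in counts.items() if n > 1]
```

For example, rows with order_id values "1", "2", "2", and None would fail both tests: not_null on the None row, and unique on "2".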

From Models to Production

Here's what a dbt project on Confluent Cloud looks like in practice. Start by configuring your connection to Confluent Cloud in profiles.yml:

my_project:
  target: dev
  outputs:
    dev:
      type: confluent
      cloud_provider: AWS
      cloud_region: us-west-2
      organization_id: org-xxxxx
      environment_id: env-xxxxx
      compute_pool_id: lfcp-xxxxx
      flink_api_key: "{{ env_var('CONFLUENT_FLINK_API_KEY') }}"
      flink_api_secret: "{{ env_var('CONFLUENT_FLINK_API_SECRET') }}"
      dbname: my_kafka_cluster
      threads: 1
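The profile reads the API key and secret with env_var(). dbt stops with an error if a variable referenced this way has no value and no default. The small pre-flight check below is a hypothetical sketch that fails fast with a clearer message, for example in a CI step before dbt run. `missing_env_vars` and `require_env` are illustrative helpers, not part of the adapter. They also treat empty strings as missing.

```python
import os
import sys
from typing import List, Mapping, Sequence

# Variable names taken from the profiles.yml example.
REQUIRED = ("CONFLUENT_FLINK_API_KEY", "CONFLUENT_FLINK_API_SECRET")


def missing_env_vars(names: Sequence[str], env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or empty in env."""
    return [n for n in names if not env.get(n)]


def require_env(names: Sequence[str] = REQUIRED, env: Mapping[str, str] = os.environ) -> None:
    """Exit with a readable message if any required variable is missing."""
    missing = missing_env_vars(names, env)
    if missing:
        sys.exit("Set these before running dbt: " + ", ".join(missing))
```

Calling `require_env()` at the top of a CI script turns a missing secret into a one-line error instead of a failed dbt run.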

A staging model references a Kafka topic, using a watermark to define event-time semantics for downstream windowed operations:

-- models/staging/stg_orders.sql
{{ config(materialized='streaming_table') }}

SELECT
  `order_id`,
  `customer_id`,
  `price`,
  CAST(`$rowtime` AS TIMESTAMP(3)) AS order_time
FROM `examples`.`marketplace`.`orders`

# models/staging/schema.yml
models:
  - name: stg_orders
    constraints:
      - type: custom
        expression: "WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND"
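This watermark declares that order_time values may arrive up to five seconds out of order. The sketch below models that idea in plain Python. The watermark trails the largest order_time seen so far by five seconds, and a window counts as complete once the watermark reaches its end. This is a simplification of Flink's behavior: it ignores periodic watermark emission, idle partitions, and millisecond boundary details. The function names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional

# Mirrors WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND.
DELAY = timedelta(seconds=5)


def watermarks(event_times: Iterable[datetime], delay: timedelta = DELAY) -> List[datetime]:
    """Return the watermark after each event: max event time seen so far minus delay."""
    result: List[datetime] = []
    max_seen: Optional[datetime] = None
    for ts in event_times:
        # The watermark never moves backwards, even when an event arrives out of order.
        max_seen = ts if max_seen is None else max(max_seen, ts)
        result.append(max_seen - delay)
    return result


def window_closed(window_end: datetime, watermark: datetime) -> bool:
    """Simplified rule: a window ending at window_end is final once the watermark reaches it."""
    return watermark >= window_end
```

Take orders stamped 10:00:58, 10:01:02, 10:01:01, and 10:01:06, arriving in that order. They produce watermarks of 10:00:53, 10:00:57, 10:00:57, and 10:01:01, so the 10:00 window closes only after the fourth order arrives.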

Next, we build a downstream revenue model to process this stream. This model (fct_revenue.sql) uses a Flink tumbling window to group the incoming orders into one-minute buckets, continuously calculating the total revenue and order volume:

-- models/marts/fct_revenue.sql
{{ config(materialized='streaming_table') }}

SELECT
  window_start,
  window_end,
  SUM(price) AS total_revenue,
  COUNT(*) AS order_count
FROM TABLE(
  TUMBLE(TABLE {{ ref('stg_orders') }},
         DESCRIPTOR(order_time),
         INTERVAL '1' MINUTE)
  )
GROUP BY window_start, window_end
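The Python code below is a bounded, in-memory equivalent of what this TUMBLE aggregation computes. Each order goes to the one-minute window containing its order_time, aligned to the epoch as Flink's tumbling windows are by default. SUM(price) and COUNT(*) are then computed per window. It is only a sketch for intuition: Flink emits each window's row once the watermark passes the window end, rather than processing a finished list.

```python
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Tuple

WINDOW = timedelta(minutes=1)  # mirrors INTERVAL '1' MINUTE
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """Floor ts to the start of its tumbling window, aligned to the Unix epoch."""
    return ts - ((ts - EPOCH) % size)


def tumble_revenue(
    orders: Iterable[Tuple[datetime, float]], size: timedelta = WINDOW
) -> List[dict]:
    """Bounded equivalent of SUM(price) and COUNT(*) grouped by window_start, window_end."""
    buckets: Dict[datetime, Tuple[float, int]] = {}
    for order_time, price in orders:
        start = window_start(order_time, size)
        total, count = buckets.get(start, (0.0, 0))
        buckets[start] = (total + price, count + 1)
    return [
        {"window_start": s, "window_end": s + size, "total_revenue": t, "order_count": c}
        for s, (t, c) in sorted(buckets.items())
    ]
```

Orders at 10:00:05 ($29.99) and 10:00:59 ($10.00) land in the 10:00 to 10:01 window. An order at 10:01:01 starts the next window.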

Run dbt run, and both statements are deployed to a Flink compute pool on Confluent Cloud.

The adapter submits each statement, confirms it's running on the compute pool, and returns control to the CLI; your terminal doesn't block waiting on an infinite stream. The revenue model then runs continuously on Flink: as new orders arrive on the Kafka topic, the tumbling window aggregates them into one-minute revenue buckets. This is a live, always-updating pipeline, deployed with the same command a data engineer would use to materialize a batch table on any data warehouse.
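The submit-then-confirm step can be pictured as a short polling loop. The sketch below uses a hypothetical `StatementClient` interface with `submit` and `status` methods, plus assumed status names (PENDING, RUNNING, FAILED). It does not reflect the real confluent-sql or Confluent Cloud API. It only shows the shape of the logic: wait until a long-running statement reaches RUNNING, then return instead of waiting for it to finish.

```python
import time
from typing import Callable, Protocol


class StatementClient(Protocol):
    """Hypothetical interface; not the real confluent-sql API."""

    def submit(self, sql: str) -> str: ...

    def status(self, statement_id: str) -> str: ...


def deploy(
    client: StatementClient,
    sql: str,
    timeout_s: float = 60.0,
    poll_s: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Submit sql and return its id once it is RUNNING (never waits for completion)."""
    statement_id = client.submit(sql)
    deadline = clock() + timeout_s
    while True:
        phase = client.status(statement_id)  # assumed status names
        if phase == "RUNNING":
            return statement_id
        if phase == "FAILED":
            raise RuntimeError(f"statement {statement_id} failed")
        if clock() >= deadline:
            raise TimeoutError(f"statement {statement_id} still {phase}")
        sleep(poll_s)
```

Returning at RUNNING rather than at completion keeps the CLI from blocking on an unbounded query. The timeout guards against a statement that never leaves its pending state.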

Integrating with the Broader Ecosystem

The integration doesn't stop at deployment. Because you are using dbt, the rest of your standard workflow naturally extends to your streaming data:

  • Validation: Running dbt test validates data quality against the live, running pipeline.

  • Documentation: Running dbt docs generate produces data lineage and documentation across all your models.

  • Automation: The entire lifecycle of model changes, test runs, and deployments integrates seamlessly with GitHub Actions or any other CI/CD system.

Ultimately, your streaming pipelines move through the exact same pull request, review, and deploy process as any standard software code.

Getting Started

The adapter is available today for teams already on Confluent Cloud:

  1. Install: pip install dbt-confluent

  2. Initialize: dbt init my_project and select confluent

  3. Configure: Set your Confluent Cloud credentials in profiles.yml

  4. Deploy: dbt run

Full documentation is in the Confluent Cloud docs. Get in touch to let us know how you get on and discuss any features you’d like to see!
