
Databricks Managed Iceberg table configuration

For Bufstream to export data to Databricks Managed Iceberg tables, you'll need to update its configuration and configure topics.

TL;DR

This minimal example configures Bufstream for Databricks:

yaml
# Add a Databricks catalog as a REST catalog, using an OAuth secret or
# Personal Access Token (PAT):
iceberg:
  catalogs:
    - name: databricks
      rest:
        url: https://DATABRICKS_INSTANCE_NAME/api/2.1/unity-catalog/iceberg-rest
        warehouse: DATABRICKS_CATALOG_NAME
        oauth2:
          token_endpoint_url: https://DATABRICKS_INSTANCE_NAME/oidc/v1/token
          scope: all-apis
          # Names of environment variables containing secrets. `string` can be
          # used instead of env_var to store the credential's value directly
          # within the file.
          client_id:
            env_var: DATABRICKS_CLIENT_ID
          client_secret:
            env_var: DATABRICKS_CLIENT_SECRET
# Associate a schema registry and add a minimal "produce" policy.
data_enforcement:
  schema_registries:
    - name: csr
      confluent:
        url: https://YOUR_DOMAIN.buf.dev/integrations/confluent/instance-name
  produce:
    - schema_registry: csr
      values:
        on_parse_error: PASS_THROUGH

Update your configuration and restart Bufstream, then configure topic parameters: set bufstream.export.iceberg.catalog to databricks, bufstream.export.iceberg.table to a namespace and table name such as bufstream.my-topic, and bufstream.export.iceberg.commit.freq.ms to how often data should be flushed to a new snapshot (for example, 300000 for five minutes).
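
Expressed as topic configuration entries, the example values above look like this:

text
bufstream.export.iceberg.catalog=databricks
bufstream.export.iceberg.table=bufstream.my-topic
bufstream.export.iceberg.commit.freq.ms=300000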

Once the first commit interval elapses, you'll see new topic data appear in Databricks.

Overview

Configuring Bufstream's export to Databricks Managed Iceberg tables typically takes four steps:

  1. Gather necessary Databricks information.
  2. Add a catalog to Bufstream's configuration.
  3. Configure a schema registry.
  4. Set topic configuration parameters for catalog, table name, and export frequency.

Once you've set these options, Bufstream begins exporting topic data to Databricks.

Gather Databricks information

Start by signing in to Databricks and navigating to your workspace. Gather the following information:

  1. Your Databricks instance name. (If you log into https://acme.cloud.databricks.com/, your instance name is acme.cloud.databricks.com.)
  2. Your Databricks catalog name.
  3. OAuth credentials for a service principal or a personal access token.

Add a catalog

Before configuring topics, add at least one catalog to your top-level Bufstream configuration in bufstream.yaml or, for Kubernetes deployments, your Helm values.yaml file. Every catalog must be assigned a unique name.

To use a Databricks catalog with Bufstream, add a catalog with the rest key and your workspace's configuration.

The following example is a minimal configuration using OAuth for access. Personal access tokens work, too.

yaml
iceberg:
  catalogs:
    - name: databricks
      rest:
        url: https://DATABRICKS_INSTANCE_NAME/api/2.1/unity-catalog/iceberg-rest
        warehouse: DATABRICKS_CATALOG_NAME
        oauth2:
          token_endpoint_url: https://DATABRICKS_INSTANCE_NAME/oidc/v1/token
          scope: all-apis
          # Names of environment variables containing secrets. `string` can be
          # used instead of env_var to store the credential's value directly
          # within the file.
          client_id:
            env_var: DATABRICKS_CLIENT_ID
          client_secret:
            env_var: DATABRICKS_CLIENT_SECRET
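
To use a personal access token instead of OAuth, the catalog entry keeps the same shape but swaps the credential block. The bearer_token key below is an illustrative assumption, not a confirmed field name; check the reference documentation for the exact token-authentication fields.

yaml
iceberg:
  catalogs:
    - name: databricks
      rest:
        url: https://DATABRICKS_INSTANCE_NAME/api/2.1/unity-catalog/iceberg-rest
        warehouse: DATABRICKS_CATALOG_NAME
        # Illustrative sketch only: confirm the token field's name in the
        # reference documentation. As with OAuth credentials, `string` can be
        # used instead of `env_var`.
        bearer_token:
          env_var: DATABRICKS_PAT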

Bufstream's reference documentation describes all REST catalog configuration options for both bufstream.yaml and Helm values.yaml, including OAuth and bearer token authentication.

Configure a schema registry

Configuring a Confluent API-compatible schema registry allows Bufstream to translate your Protobuf messages into structured table schemas in Databricks. Without a schema registry providing Protobuf schema information, topic data can still be written to Iceberg tables, but message contents are stored as raw bytes in the __raw__ field within the key and value columns.

A minimal data enforcement configuration defining one schema registry is shown below. At least one produce policy must be created and associated with the schema registry; the policy below applies to all topics.

yaml
data_enforcement:
  schema_registries:
    - name: csr
      confluent:
        url: https://YOUR_DOMAIN.buf.dev/integrations/confluent/instance-name
  produce:
    # This policy applies to all topics and uses the schema registry
    # configured above. Use the "topics:" key to configure topic-specific
    # policies.
    - schema_registry: csr
      values:
        # The action to perform for messages that fail to parse with their
        # associated schema. PASS_THROUGH allows these messages into the
        # topic and Iceberg table, storing their bytes in the val.__raw__
        # column and any associated error message in val.__err__. Use
        # REJECT_BATCH to reject batches of messages with parsing errors.
        on_parse_error: PASS_THROUGH

Bufstream's reference documentation describes all data enforcement configuration options, including topic-specific configuration and semantic validation options, for both bufstream.yaml and Helm values.yaml.

Configure topics

Finally, set the following required topic configuration parameters. You can update these on an existing topic.

  • bufstream.export.iceberg.catalog: This must be the name of a catalog in your Bufstream configuration.

  • bufstream.export.iceberg.commit.freq.ms: How often data is flushed to a new snapshot, in milliseconds.

  • bufstream.export.iceberg.table: This must be a valid namespace and table for the configured catalog in the form namespace.table.

    There must be at least one component in the namespace; a value of table is considered invalid since it includes no namespace component.

You can also set the following optional properties:

  • bufstream.export.iceberg.granularity: The granularity to use for partitioning the table by date/time.

    Valid options are MONTHLY, DAILY (default), or HOURLY.

  • bufstream.export.iceberg.partition.fields: A comma-delimited list of dot-separated paths to scalar (leaf) fields in the value schema to use as partition keys.

  • bufstream.export.iceberg.use.ingest.time: Whether to use the ingestion timestamp of the record for date/time partitioning. If false, the record timestamp is used.

    Valid options are TRUE or FALSE (default).

Bufstream supports reading and updating topic configuration values from any Kafka API-compatible tool, including browser-based interfaces like AKHQ and Redpanda Console.
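
For example, here's a minimal sketch that sets the required parameters with the confluent-kafka Python client's admin API. The bootstrap address, topic name, and parameter values are placeholders to adjust for your own deployment.

python
from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({"bootstrap.servers": "bufstream.example.com:9092"})

# Set the required export parameters on an existing topic. The non-incremental
# AlterConfigs API replaces a topic's existing dynamic overrides, so include
# any other overrides you want to keep.
resource = ConfigResource(
    ConfigResource.Type.TOPIC,
    "my-topic",
    set_config={
        "bufstream.export.iceberg.catalog": "databricks",
        "bufstream.export.iceberg.table": "bufstream.my-topic",
        "bufstream.export.iceberg.commit.freq.ms": "300000",
    },
)

# alter_configs returns one future per resource; result() raises on failure.
admin.alter_configs([resource])[resource].result()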

Query your table

After your topic is configured, Bufstream waits up to 30 seconds to start exporting data. If you've set your commit frequency (bufstream.export.iceberg.commit.freq.ms) to five minutes, you should see records arrive in Databricks within about five and a half minutes.

Once you see your table in Databricks, you can start querying your Kafka records' keys and values:

Example Databricks query
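
The query below is a minimal sketch that assumes the TL;DR's names: the Unity Catalog catalog DATABRICKS_CATALOG_NAME, the schema (Iceberg namespace) bufstream, and the table my-topic. Adjust the names to match your own topic configuration.

sql
-- Preview records exported from the topic.
SELECT *
FROM DATABRICKS_CATALOG_NAME.bufstream.`my-topic`
LIMIT 10;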