
[FLINK-39605][Flink-Formats] Add avro variant confluent format #28115

Open
swapna267 wants to merge 1 commit into apache:master from swapna267:FLINK-39605

Conversation

@swapna267 (Contributor)

What is the purpose of the change

This pull request adds a new avro-variant-confluent deserialization format that reads Avro binary data from Confluent Schema Registry and converts it into Flink's VARIANT type. Unlike the existing avro-confluent format, which requires a fixed reader schema at table creation time, this format uses the writer schema dynamically per record, enabling schema-agnostic ingestion where the Avro schema varies across records.

Example usage:

    CREATE TABLE kafka_source (
      data VARIANT,
      avro_schema STRING METADATA FROM 'schema'
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',
      'properties.bootstrap.servers' = 'server1',
      'format' = 'avro-variant-confluent',
      'avro-variant-confluent.url' = ''
    );
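For background on how a writer schema can be resolved per record: Confluent's Schema Registry wire format prepends a magic byte and a 4-byte schema ID to every Avro payload, which is what lets a deserializer look up the writer schema dynamically. A minimal sketch of reading that header (the class and method names here are illustrative, not from this PR):

```java
import java.nio.ByteBuffer;

/**
 * Sketch of parsing the Confluent wire format header: a 0x00 magic byte,
 * then a 4-byte big-endian schema ID, then the Avro binary payload.
 * This is background illustration, not code from the PR itself.
 */
public class WireFormat {
    public static int readSchemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        // The schema ID is used to fetch the writer schema from the registry.
        return buf.getInt();
    }

    public static void main(String[] args) {
        // Header: magic 0x00, schema ID 42, followed by payload bytes.
        byte[] msg = {0, 0, 0, 0, 42, 1, 2};
        System.out.println(readSchemaId(msg)); // prints 42
    }
}
```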

Brief change log

  • Added RegistryWriterAvroDeserializationSchema that deserializes Avro binary using the writer schema as both reader and writer, preserving all fields as-is
  • Added AvroVariantDeserializationSchema that converts GenericRecord to Variant with an LRU converter cache keyed by writer schema
  • Added AvroVariantDecodingFormat with support for optional schema metadata column exposing the writer schema string
  • Added ConfluentRegistryAvroVariantFormatFactory as the SPI entry point (identifier: avro-variant-confluent), deserialization only
  • Added test utility fixedRoundRobinSchemaCoderProvider for testing multi-schema scenarios
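The converter cache mentioned above can be sketched with a standard access-ordered LinkedHashMap. This is a hypothetical illustration of the LRU idea only; the capacity of 16, the String key, and the class name are assumptions, not the PR's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch of an LRU cache keyed by writer schema string,
 * in the spirit of the converter cache in AvroVariantDeserializationSchema.
 * MAX_ENTRIES and all names here are illustrative assumptions.
 */
public class ConverterCache<V> {
    private static final int MAX_ENTRIES = 16;

    // accessOrder = true makes iteration order least-recently-accessed first,
    // so removeEldestEntry evicts the LRU entry once capacity is exceeded.
    private final Map<String, V> cache =
            new LinkedHashMap<String, V>(MAX_ENTRIES, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                    return size() > MAX_ENTRIES;
                }
            };

    public V computeIfAbsent(String writerSchema, Function<String, V> factory) {
        return cache.computeIfAbsent(writerSchema, factory);
    }

    public int size() {
        return cache.size();
    }
}
```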

Verifying this change

This change adds tests and can be verified as follows:

  • AvroVariantDeserializationSchemaTest — parameterized deserialization with/without schema metadata, converter cache eviction, null message handling
  • RegistryWriterAvroDeserializationSchemaTest — verifies writer-schema-preserving deserialization of GenericRecord
  • ConfluentRegistryAvroVariantFormatFactoryTest — SPI discovery, optional SSL/auth properties, missing URL validation, schema metadata column via format readable metadata
  • RegistryAvroVariantDeserializationSchemaTest — end-to-end serialize (RowData→Avro) then deserialize (Avro→Variant) with MockSchemaRegistryClient, schema evolution across v1/v2, parameterized with/without schema metadata
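The fixedRoundRobinSchemaCoderProvider test utility is described as enabling multi-schema scenarios. A round-robin provider of this kind can be sketched as a supplier cycling over a fixed list; the class below is an illustrative stand-in, not the utility's actual code:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of a fixed round-robin provider: each call returns
 * the next element of a fixed list, wrapping around, so successive test
 * records can be deserialized against different writer schemas.
 */
public class RoundRobin<T> implements Supplier<T> {
    private final List<T> items;
    private final AtomicInteger next = new AtomicInteger();

    public RoundRobin(List<T> items) {
        this.items = List.copyOf(items);
    }

    @Override
    public T get() {
        // Math.floorMod keeps the index valid even after counter overflow.
        return items.get(Math.floorMod(next.getAndIncrement(), items.size()));
    }
}
```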

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (new deserialization path for VARIANT type)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs; Flink docs (similar to the avro-confluent format page) will be added once this PR is reviewed and ready.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code (Claude Opus 4.6)

@flinkbot (Collaborator)

flinkbot commented May 5, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build
