Kafka Schema Registry Upload Avro Schema From Command Line
This example shows how to use the Kafka Schema Registry to store data schemas for Kafka topics which we will generate using Apache Avro. The example will also demonstrate how to use the Schema Registry to produce and consume generated Apache Avro objects using an Instaclustr Kafka cluster.
Creating an Apache Kafka cluster with the Kafka Schema Registry add-on
Instaclustr now offers Kafka Schema Registry as an add-on for our Apache Kafka Managed Service. To take advantage of this offering, you can now select 'Kafka Schema Registry' as an option when creating a new Apache Kafka cluster.
If you wish to add the Kafka Schema Registry to an existing Apache Kafka cluster, you can contact [email protected].
Using the Schema Registry
Now that the Schema Registry is up and running, you can use it in your applications to store data schemas for your Kafka topics. The following example is a Java application that uses the Schema Registry and Apache Avro to produce and consume some simulated product order events.
Allow access to your client application
Before we can access our schema registry application, we need to open the firewall to our client application IP address. Once your cluster is up and running, go to Firewall Rules and add your IP address to the Kafka Schema Registry Allowed Addresses.
Client Dependencies
Add the kafka_2.12, avro, and kafka-avro-serializer packages to your application. These packages are available via Maven (kafka_2.12, avro, kafka-avro-serializer). To add these dependencies using Maven, add the following to your pom.xml file:
<repositories>
  <repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
  </dependency>
</dependencies>
You will also need the avro-tools utility in order to compile the data schema into a Java class. The avro-tools utility is available here.
Create the Avro Schema
Before you can produce or consume messages using Avro and the Schema Registry you first need to define the data schema. Create a file orderEventSchema.avsc with the following content:
{
  "namespace": "orderEventSchema.avro",
  "type": "record",
  "name": "OrderEvent",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "timestamp", "type": {"type": "string", "logicalType": "timestamp-millis"}},
    {"name": "product", "type": "string"},
    {"name": "price", "type": "float"}
  ]
}
This file specifies a simple OrderEvent data serialization schema for product orders, with each OrderEvent containing an id, timestamp, product name, and price. For more information on the Avro serialization format see the documentation here.
Generate the Avro Object Class
With the schema file created, use the avro-tools utility to compile the schema file into an actual Java class:
java -jar ~/Downloads/avro-tools-1.8.2.jar compile schema orderEventSchema.avsc src/main/java/
Note: The src/main/java file path at the end of the command can be wherever you want, just make sure the generated class will be accessible by your application code. An example file structure is:
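(A sketch based on the schema namespace orderEventSchema.avro and the src/main/java output directory used above; avro-tools creates a directory for each namespace component.)

src/
└── main/
    └── java/
        └── orderEventSchema/
            └── avro/
                └── OrderEvent.java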
Create Kafka Topic
Use the guide here to create a new topic called orders.
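Alternatively, if you have the Kafka command line tools available, you can create the topic from a shell. The following is only a sketch: it assumes a recent Kafka distribution (2.2 or later, where kafka-topics.sh accepts --bootstrap-server and --command-config) and a client.properties file containing the same security settings as the producer.properties file created below; adjust the partition count and replication factor to suit your cluster.

kafka-topics.sh --create \
  --topic orders \
  --partitions 3 \
  --replication-factor 3 \
  --bootstrap-server <broker ip 1>:9092 \
  --command-config client.properties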
Producing Avro Objects
Client configuration
Before creating a Kafka producer client, you first need to define the configuration properties for the producer client to use. In this example we provide only the required properties for the producer client. See here for the full list of configuration options.
The Connection Info page in the Instaclustr Console has these example settings pre-configured with your cluster's IP addresses, username and password.
If your cluster has client ⇆ broker encryption enabled, create a new file named producer.properties with the following content, ensuring the password, truststore location, and bootstrap servers list are correct:
bootstrap.servers=<broker ip 1>:9092,<broker ip 2>:9092,<broker ip 3>:9092
schema.registry.url=https://ickafkaschema:<schema-registry-password>@kafka-schema.<assigned-hosted-zone-id>.cnodes.io:8085
basic.auth.credentials.source=URL
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location=truststore.jks
ssl.truststore.password=instaclustr
ssl.protocol=TLS
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ickafka" password="<password>";
If your cluster does not have client ⇆ broker encryption enabled, create a new file named producer.properties with the following content, ensuring the password and bootstrap servers list are correct:
bootstrap.servers=<broker ip 1>:9092,<broker ip 2>:9092,<broker ip 3>:9092
schema.registry.url=https://ickafkaschema:<schema-registry-password>@kafka-schema.<assigned-hosted-zone-id>.cnodes.io:8085
basic.auth.credentials.source=URL
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ickafka" password="<password>";
Make sure the password and bootstrap servers list are correct.
Important Notes:
- To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
- Instaclustr's Kafka Schema Registry is configured with basic authentication credentials in the format 'user:password@schema-registry-url:8085'
- basic.auth.credentials.source=URL is necessary for this basic authentication to work correctly.
Java Code
Now that the configuration properties have been set up, you can create a Kafka producer.
First, load the properties:
Properties props = new Properties();
try {
    props.load(new FileReader("producer.properties"));
} catch (IOException e) {
    e.printStackTrace();
}
Once you've loaded the properties you can create the producer itself:
KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);
Next, create some OrderEvents to produce:
ArrayList<OrderEvent> orderEvents = new ArrayList<>();
orderEvents.add(OrderEvent.newBuilder().setId(1).setTimestamp(getTimestamp()).setProduct("Black Gloves").setPrice(12).build());
orderEvents.add(OrderEvent.newBuilder().setId(2).setTimestamp(getTimestamp()).setProduct("Black Hat").setPrice(30).build());
orderEvents.add(OrderEvent.newBuilder().setId(3).setTimestamp(getTimestamp()).setProduct("Gold Hat").setPrice(35).build());
Where the getTimestamp() function is:
public static String getTimestamp() {
    return new Timestamp(System.currentTimeMillis()).toString();
}
Now turn each OrderEvent into a ProducerRecord to be produced to the orders topic, and send them:
for (OrderEvent orderEvent : orderEvents) {
    ProducerRecord<String, OrderEvent> record = new ProducerRecord<>("orders", orderEvent);
    producer.send(record);
    System.out.println("sent " + record);
}
Finally, use the producer's flush() method to ensure all messages get sent to Kafka:
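producer.flush();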
Full code example:
import orderEventSchema.avro.OrderEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.FileReader;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Properties;

public class AvroProducer {

    // Return the current date and time as a String
    public static String getTimestamp() {
        return new Timestamp(System.currentTimeMillis()).toString();
    }

    public static void main(String[] args) {
        // Load the properties file
        Properties props = new Properties();
        try {
            props.load(new FileReader("producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Create the producer from the properties
        KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props);

        // Create some OrderEvents to produce
        ArrayList<OrderEvent> orderEvents = new ArrayList<>();
        orderEvents.add(OrderEvent.newBuilder().setId(1).setTimestamp(getTimestamp()).setProduct("Black Gloves").setPrice(12).build());
        orderEvents.add(OrderEvent.newBuilder().setId(2).setTimestamp(getTimestamp()).setProduct("Black Hat").setPrice(30).build());
        orderEvents.add(OrderEvent.newBuilder().setId(3).setTimestamp(getTimestamp()).setProduct("Gold Hat").setPrice(35).build());

        // Turn each OrderEvent into a ProducerRecord for the orders topic, and send them
        for (OrderEvent orderEvent : orderEvents) {
            ProducerRecord<String, OrderEvent> record = new ProducerRecord<>("orders", orderEvent);
            producer.send(record);
            System.out.println("sent " + record);
        }

        // Ensure all messages get sent to Kafka
        producer.flush();
    }
}
Consuming Avro Objects
Client configuration
As in the producer example, before creating a Kafka consumer client you first need to define the configuration properties for the consumer client to use. In this example we provide only the required properties for the consumer client. See here for the full list of configuration options.
If your cluster has client ⇆ broker encryption enabled, create a new file named consumer.properties with the following content, ensuring the password, truststore location, and bootstrap servers list are correct:
bootstrap.servers=<broker ip 1>:9092,<broker ip 2>:9092,<broker ip 3>:9092
schema.registry.url=https://ickafkaschema:<schema-registry-password>@kafka-schema.<assigned-hosted-zone-id>.cnodes.io:8085
basic.auth.credentials.source=URL
group.id=avro
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location=truststore.jks
ssl.truststore.password=instaclustr
ssl.protocol=TLS
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ickafka" password="<password>";
If your cluster does not have client ⇆ broker encryption enabled, create a new file named consumer.properties with the following content, ensuring the password and bootstrap servers list are correct:
bootstrap.servers=<broker ip 1>:9092,<broker ip 2>:9092,<broker ip 3>:9092
schema.registry.url=https://ickafkaschema:<schema-registry-password>@kafka-schema.<assigned-hosted-zone-id>.cnodes.io:8085
basic.auth.credentials.source=URL
group.id=avro
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ickafka" password="<password>";
Make sure the password and bootstrap servers list are correct.
Important Notes:
- To connect to your Kafka cluster over the private network, use port 9093 instead of 9092.
- Instaclustr's Kafka Schema Registry is configured with basic authentication credentials in the format 'user:password@schema-registry-url:8085'
- basic.auth.credentials.source=URL is necessary for this basic authentication to work correctly.
Java Code
Now that the configuration properties have been set up, you can create a Kafka consumer.
First, load the properties:
Properties props = new Properties();
try {
    props.load(new FileReader("consumer.properties"));
} catch (IOException e) {
    e.printStackTrace();
}
Once you've loaded the properties you can create the consumer itself:
KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(props);
Before you can consume messages, you need to subscribe the consumer to the topic(s) you wish to receive messages from, in this case the orders topic:
consumer.subscribe(Collections.singletonList("orders"));
Finally, continually poll Kafka for new messages, and print each OrderEvent received:
try {
    while (true) {
        ConsumerRecords<String, OrderEvent> records = consumer.poll(1000);
        for (ConsumerRecord<String, OrderEvent> record : records) {
            System.out.println(record.value());
        }
    }
} finally {
    consumer.close();
}
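Note: this example targets the kafka_2.12 1.1.0 client used in the Maven dependencies above, where poll(long) is the current API. In Kafka clients 2.0 and later, poll(long) is deprecated in favour of poll(Duration), so if you upgrade the client dependency the poll call would instead look something like this (requires an additional import of java.time.Duration):

ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(1000));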
Full code example:
import orderEventSchema.avro.OrderEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileReader;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {

    public static void main(String[] args) {
        // Load the properties file
        Properties props = new Properties();
        try {
            props.load(new FileReader("consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Create the consumer from the properties
        KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(props);

        // Subscribe the consumer to the orders topic
        consumer.subscribe(Collections.singletonList("orders"));

        // Continually poll Kafka for new messages, and print each OrderEvent received
        try {
            while (true) {
                ConsumerRecords<String, OrderEvent> records = consumer.poll(1000);
                for (ConsumerRecord<String, OrderEvent> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
Putting Them Together
Now that you have a consumer and producer ready, it's time to combine them.
Start the Consumer
Start the consumer before starting the producer, because by default, consumers only consume messages that were produced after the consumer started.
Start the Producer
Now that the consumer is set up and ready to consume messages, you can start your producer.
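As a sketch, assuming the AvroProducer and AvroConsumer classes are in the default package and the exec-maven-plugin is available, you could start each application from its own terminal like this (any other way of launching the classes works just as well):

# Terminal 1: start the consumer first
mvn compile exec:java -Dexec.mainClass=AvroConsumer

# Terminal 2: start the producer
mvn compile exec:java -Dexec.mainClass=AvroProducer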
If the consumer and producer are set up correctly the consumer should output the messages sent by the producer shortly after they were produced, for example:
{ "id" : 1 , "timestamp" : "2018-06-27 15:02:17.253" , "product" : "Black Gloves" , "price" : 12.0 } { "id" : ii , "timestamp" : "2018-06-27 15:02:17.253" , "product" : "Black Chapeau" , "toll" : 30.0 } { "id" : 3 , "timestamp" : "2018-06-27 15:02:17.253" , "product" : "Gold Hat" , "price" : 35.0 } |
Source: https://www.instaclustr.com/support/documentation/kafka-add-ons/using-the-kafka-schema-registry/