schema registry kafka что это
Schema Registry TutorialsВ¶
OverviewВ¶
These tutorials provide a step-by-step workflow for using Confluent Schema Registry on-premises and in Confluent Cloud. You will learn how to enable client applications to read and write Avro data and check compatibility as schemas evolve.
Schema Registry BenefitsВ¶
Apache Kafka® producers write data to Kafka topics and Kafka consumers read data from Kafka topics. There is an implicit “contract” that producers write data with a schema that can be read by consumers, even as producers and consumers evolve their schemas. Schema Registry helps ensure that this contract is met with compatibility checks.
It is useful to think about schemas as APIs. Applications depend on APIs and expect any changes made to APIs are still compatible and applications can still run. Similarly, streaming applications depend on schemas and expect any changes made to schemas are still compatible and they can still run. Schema evolution requires compatibility checks to ensure that the producer-consumer contract is not broken. This is where Schema Registry helps: it provides centralized schema management and compatibility checks as schemas evolve.
Target AudienceВ¶
The target audience is a developer writing Kafka streaming applications who wants to build a robust application leveraging Avro data and Schema Registry. The principles in this tutorial apply to any Kafka client that interacts with Schema Registry.
Terminology ReviewВ¶
First let us levelset on terminology, and answer the question: What is a topic versus a schema versus a subject?
Starting with Confluent Platform 5.5.0, you can modify the subject name strategy on a per-topic basis. See Change the subject naming strategy for a topic to learn more.
Choose Your DeploymentВ¶
There are two Schema Registry tutorials you can choose from:
Как мы Schema Registry для Kafka настраивали, и что могло пойти не так…
В статье я опишу, как мы настраивали реестр схем данных для того, чтобы использовать его для сериализации и десериализации сообщений Kafka.
Исходные данные (небольшое оправдание)
Что? Почему? Зачем?
Если кратко, то это:
компактность представления бинарного формата данных (а также есть возможность кодировки в JSON);
поддержка “логических” типов данных: BigDecimal, дата, дата/время в нативном виде;
Этих пунктов хватило, чтобы выбрать Avro как схему данных. А версию взяли 1.8.2, которая вышла в мае 2017 года.
До версии 1.9 в Avro используется библиотека Joda Time для обработки логических типов, связанных с датой и временем. А начиная с версии 1.9 для этого используются нативные Java 8 библиотеки.
Перед непосредственно разработкой функциональности мы все-таки попробовали новую версию Avro, но испытали проблемы и конверсиями дат (например, сериализатор не смог корректно обработать LocalDateTime). Точных примеров я не вспомню, но нам нужно было очень аккуратно передавать время в разных временных зонах, а это не самая простая часть для конвертации.
Несомненно, можно найти еще преимущества и недостатки этой схемы данных, но мы остановились на ней.
Итак, с системой сериализации определились.
Далее речь пойдет о реестре схем данных.
Реестр схем данных, или The Confluent Schema Registry for Kafka. Благодаря SR можно обеспечить совместимость схем данных между продюсером и консьюмером Kafka. И, чего нам очень хотелось, SR поможет не потерять сообщения из-за ошибок сериализации и десериализации во время эволюции схемы данных.
В процессе настройки и при тестовых прогонах мы выявили некоторые, скажем так, особенности использования SR, о которых я хочу рассказать вам и попытаюсь собрать все неувязки в одном месте.
Настройка деплоя моделей
Прежде всего, модели нужно описать в JSON-формате. Мы создали репозиторий и описали там нужные нам модели. Описывать модели в JSON просто и понятно, например:
Если хотим nullable-поле, то добавляем в тип возможность null:
Модели описали, теперь воспользуемся maven-плагином для взаимодействия с SR:
Далее смотрим здесь описание возможностей плагина, и как мы можем управлять реестром схем:
проверить (validate) на корректность локальную схему;
проверить изменение схемы на совместимость (test-compatibility);
скачать схему с SR (download);
зарегистрировать схему (register).
Мы настроили в Gitlab CI stage с деплоем схем так (ручной запуск и только на master-ветке):
Возможно, стоит добавить фазу validate в самое начало скрипта:
Итак, после деплоя наши схемы успешно хранятся в Kafka. Стоп, скажете вы, в смысле в Kafka? А как же SR?! Ну, SR живет отдельно от Kafka брокеров. Это дополнительный компонент, который может быть настроен на любом Kafka кластере и который использует Kafka для хранения в том числе и схем. И, SR предоставляет пользователю REST API для взаимодействия со схемами данных.
Упрощенно взаимодействие с SR выглядит так:
Настройка использования Schema Registry
Чтобы использовать SR, достаточно добавить свойство с URL подключения к SR и назначить в качестве сериализатора и десериализатора соответствующие классы: KafkaAvroSerializer и KafkaAvroDeserializer
И для консьюмера, например, так:
Есть несколько нюансов, которые следует учесть перед тестом и тем более перед деплоем на бой.
Стратегия наименования сабжектов (subjects)
Если не кратко, то: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#sr-schemas-subject-name-strategy
А если кратко, то существует 3 стратегии:
Для себя мы выбрали стратегию наименования по названию модели данных:
Выбрали такой вариант потому что:
топики для тестовых стендов и боевого названы по разному, а значит, выбери мы другую стратегию, в реестре схем данных копилось бы много схем для каждого топика, это привело бы к дополнительным сложностям при анализе работы и вообще поиску багов;
возможность на разных топиках протестировать именно те схемы, которые будут использоваться в боевом окружении.
Имейте это ввиду, если тоже зададитесь вопросом «А какую стратегию выбрать?».
Аналогично, можно задать стратегию наименования схемы для ключа (key) сообщения, но мы не используем ключ для сообщений (гарантируя очередность сообщений тем, что у нас задана только 1 партиция на брокерах), поэтому и задавать стратегию для ключа смысла нет.
auto.register.schemas
Свойство, которое отвечает за то, чтобы клиентские приложения автоматически регистрировали новые схемы в SR. Для dev-окружения это может быть полезно, но для боевого сервера все-таки лучше отключить. И да, по умолчанию это свойство активировано, имейте ввиду.
Ну а мы продюсерам это свойство отключили на первых этапах внедрения SR (еще в тестовом режиме):
use.latest.version
Тип совместимости схем
В SR доступно несколько типов совместимости схем. Подробно о них написано, например, в документации Confluent.
По умолчанию используется тип BACKWARD, который разрешает удалять поля из модели данных и добавлять optional-поля (те, которые могут быть null). Сравнение схемы идет только с последней версией, а при обновлении модели в первую очередь нужно обновлять консьюмеры, чтобы не получить ошибку десериализации (например, удалили поле, обновили продюсер, который послал сообщение без этого поля, а консьюмер будет его ожидать и свалится с ошибкой десериализации).
Нас устроил тип по умолчанию, не меняли.
specific.avro.reader
Мы захотели использовать типы BigDecimal и Timestamp в моделях. Чтобы такие типы корректно сериализовать и десериализовать, следует добавить это свойство:
А так же добавить в класс SpecificData при запуске приложения нужные конверторы, например, так:
Вот пример описания модели с такими логическими типами:
Кэширование
В KafkaAvroSerializer и KafkaAvroDeserializer используется локальное кэширование, а значит клиенты не будут каждый раз запрашивать схему у SR, только в случае отсутствия нужной схемы нужной версии в локальном кэше.
И все?
В процессе тестирования и разработки мы столкнулись с не совсем очевидными вещами, о которых я расскажу, возможно, это сэкономит вам времени.
Joda Time
Avro 1.8.2 использует Joda Time библиотеку для работы с полями дат и времени. А мы пишем на Java 8 и используем нативные классы для работы с датами.
В процессе разработки был составлен класс утилитных методов для конвертаций дат. Мы нашли это наименьшей головной болью при передаче дат с часовым поясом с помощью Avro по Kafka.
Вот так мы, например, конвертируем даты перед отправкой:
А вот так, например, принимаем:
Привожу класс целиком, собрали методы из разных уголков вселенной
«avro.java.string»: «String»
Сразу кину ссылку на issue: https://issues.apache.org/jira/browse/AVRO-2702
Если грубо, то в самой схеме, допустим, нет такой строчки. Но maven-плагин пишет в схему внутри модели именно так:
Workaround: добавить «avro.java.string»: «String» для каждого string-поля в моделях (тогда будет совпадение с тем, как схему опишет maven-плагин).
Выводы
В итоге, мы внедрили Schema Registry, и успешно его используем. Инструмент очень полезный, но есть определенные подводные грабли, которые мы успешно протестировали и отложили в сторону еще на этапе разработки.
Schema registry kafka что это
Confluent Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving your Avro®, JSON Schema, and Protobuf schemas. It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings and allows evolution of schemas according to the configured compatibility settings and expanded support for these schema types. It provides serializers that plug into Apache Kafka® clients that handle schema storage and retrieval for Kafka messages that are sent in any of the supported formats.
This README includes the following sections:
Here are a few links to Schema Registry pages in the Confluent Documentation.
Quickstart API Usage examples
The following assumes you have Kafka and an instance of the Schema Registry running using the default settings. These examples, and more, are also available at API Usage examples on docs.confluent.io.
You can download prebuilt versions of the schema registry as part of the Confluent Platform. To install from source, follow the instructions in the Development section.
The REST interface to schema registry includes a built-in Jetty server. The wrapper scripts bin/schema-registry-start and bin/schema-registry-stop are the recommended method of starting and stopping the service.
To build a development version, you may need a development versions of common and rest-utils. After installing these, you can build the Schema Registry with Maven.
This project uses the Google Java code style to keep code clean and consistent.
To run the unit and integration tests:
To run an instance of Schema Registry against a local Kafka cluster (using the default configuration included with Kafka):
To create a packaged version, optionally skipping the tests:
Each of the produced contains a directory layout similar to the packaged binary versions.
You can also produce a standalone fat JAR of schema registry using the standalone profile:
OpenAPI (formerly known as Swagger) specifications are built automatically using swagger-maven-plugin on compile phase.
Thanks for helping us to make Schema Registry even better!
The project is licensed under the Confluent Community License, except for the client and avro-* libs, which are under the Apache 2.0 license. See LICENSE file in each subfolder for detailed license agreement.
Schema Registry Security OverviewВ¶
FeaturesВ¶
Confluent Schema Registry currently supports all Kafka security features, including:
Schema Registry to Kafka ClusterВ¶
Kafka StoreВ¶
Kafka is used as Schema Registry storage backend. The special Kafka topic (default _schemas ), with a single partition, is used as a highly available write ahead log. All schemas, subject/version and ID metadata, and compatibility settings are appended as messages to this log. A Schema Registry instance therefore both produces and consumes messages under the _schemas topic. It produces messages to the log when, for example, new schemas are registered under a subject, or when updates to compatibility settings are registered. Schema Registry consumes from the _schemas log in a background thread, and updates its local caches on consumption of each new _schemas message to reflect the newly added schema or compatibility setting. Updating local state from the Kafka log in this manner ensures durability, ordering, and easy recoverability.
The Schema Registry topic is compacted and therefore the latest value of every key is retained forever, regardless of the Kafka retention policy. You can validate this with kafka-configs :
Your output should resemble:
All Kafka security features are supported by Schema Registry.
Relatively few services need access to Schema Registry, and they are likely internal, so you can restrict access to the Schema Registry itself via firewall rules and/or network segmentation.
ZooKeeperВ¶
Schema Registry supports both unauthenticated and SASL authentication to ZooKeeper.
Setting up ZooKeeper SASL authentication for Schema Registry is similar to Kafka’s setup. Namely, create a keytab for Schema Registry, create a JAAS configuration file, and set the appropriate JAAS Java properties.
If Schema Registry has a different service name tha Kafka, zookeeper.set.acl must be set to false in both Schema Registry and Kafka.
Clients to Schema RegistryВ¶
Configuring the REST API for HTTP or HTTPSВ¶
By default Schema Registry allows clients to make REST API calls over HTTP. You may configure Schema Registry to allow either HTTP or HTTPS or both at the same time.
The following configuration determines the protocol used by Schema Registry:
Comma-separated list of listeners that listen for API requests over HTTP or HTTPS or both. If a listener uses HTTPS, the appropriate SSL configuration parameters need to be set as well.
On the clients, configure schema.registry.url to match the configured Schema Registry listener.
Additional configurations for HTTPSВ¶
If you configure an HTTPS listener, there are several additional configurations for Schema Registry.
First, configure the appropriate SSL configurations for the keystore and optionally truststore for the Schema Registry cluster (for example, in schema-registry.properties ). The truststore is required only when ssl.client.auth is set to true.
You may specify which protocol to use while making calls between the instances of Schema Registry. The secondary to primary node calls for writes and deletes will use the specified protocol.
Starting with 5.4, Confluent Platform provides the Schema Registry dedicated client configuration properties, as shown in the example.
Clients to Schema Registry include both:
To configure clients to use HTTPS to Schema Registry, set the following properties or environment variables:
On the client, configure the schema.registry.url to match the configured listener for HTTPS.
On the client, configure the environment variables to set the SSL keystore and truststore in one of two ways:
(Recommended) Use the Schema Registry dedicated properties to configure the client:
The naming conventions for Confluent Control Center configuration differ slightly from the other clients. To configure Control Center as an HTTPS client to Schema Registry, specify these dedicated properties in the Control Center config file:
Schema Registry under Configuring SSL for Control Center provides a detailed explanation of the naming conventions used in this configuration.
If you use the legacy method of defining SSL values in system environment variables, SSL settings will apply to every Java component running on this JVM. For example on Connect, every connector will use the given truststore. Consider a scenario where you are using an Amazon Web Servces (AWS) connector such as S3 or Kinesis, and do not have the AWS certificate chain in the given truststore. The connector will fail with the following error:
This does not apply if you use the dedicated Schema Registry client configurations.
Migrating from HTTP to HTTPSВ¶
To upgrade Schema Registry to allow REST API calls over HTTPS in an existing cluster:
This process enables HTTPS, but still defaults to HTTP so Schema Registry instances can still communicate before all nodes have been restarted. They will continue to use HTTP as the default until configured not to. To switch to HTTPS as the default and disable HTTP support, perform the following steps:
Configuring the REST API for Basic HTTP AuthenticationВ¶
Schema Registry can be configured to require users to authenticate using a username and password via the Basic HTTP authentication mechanism.
Use the following settings to configure Schema Registry to require authentication:
The authentication.roles config defines a comma-separated list of user roles. To be authorized to access Schema Registry, an authenticated user must belong to at least one of these roles.
An example jaas_config.conf is:
Assign the SchemaRegistry-Props section to the authentication.realm config setting:
The file parameter is the location of the password file. The format is:
Here’s an example:
Get the password hash for a user by using the org.eclipse.jetty.util.security.Password utility:
Your output should resemble:
Each line of the output is the password encrypted using different mechanisms, starting with plain text.
Once Schema Registry is configured to use Basic authentication, clients must be configured with suitable valid credentials, for example:
The schema.registry prefixed versions of these properties were deprecated in Confluent Platform 5.0.
GovernanceВ¶
To provide data governance with the Confluent Schema Registry :
Disabling Auto Schema RegistrationВ¶
By default, client applications automatically register new schemas. If they produce new messages to a new topic, then they will automatically try to register new schemas. This is very convenient in development environments, but in production environments we recommend that client applications do not automatically register new schemas. Best practice is to register schemas outside of the client application to control when schemas are registered with Schema Registry and how they evolve.
Setting auto.register.schemas to false disables auto-registration of the event type, so that it does not override the latest schema in the subject. Setting use.latest.version to true causes the serializer to look up the latest schema version in the subject and use that for serialization. If use.latest.version is set to false (which is the default), the serializer will look for the event type in the subject and fail to find it.
See also, Schema Registry Configuration Options for Kafka Connect.
The configuration option auto.register.schemas is a Confluent Platform feature; not available in Apache KafkaВ®.
Once a client application disables automatic schema registration, it will no longer be able to dynamically register new schemas from within the application. However, it will still be able to retrieve existing schemas from the Schema Registry, assuming proper authorization.
Authorizing Access to the Schemas TopicВ¶
Authorizing Schema Registry Operations with the Security PluginВ¶
The Schema Registry security plugin provides authorization for various Schema Registry operations. It authenticates the incoming requests and authorizes them via the configured authorizer. This allows schema evolution management to be restricted to administrative users, with application users provided with read-only access only.
This is the fourth post in this series where we go through the basics of using Kafka. We saw in the previous posts how to produce and consume data in JSON format. We will now see how to serialize our data with Avro.
Avro and the Schema Registry
Apache Avro is a binary serialization format. It relies on schemas (defined in JSON format) that define what fields are present and their type. Nested fields are supported as well as arrays.
Avro supports schema evolutivity: you can have multiple versions of your schema, by adding or removing fields. A little care needs to be taken to indicate fields as optional to ensure backward or forward compatibility.
Since Avro converts data into arrays of bytes, and that Kafka messages also contain binary data, we can ship Avro messages with Kafka. The real question is: where to store the schema?
The Schema Registry is the answer to this problem: it is a server that runs in your infrastructure (close to your Kafka brokers) and that stores your schemas (including all their versions). When you send Avro messages to Kafka, the messages contain an identifier of a schema stored in the Schema Registry.
A library allows you to serialize and deserialize Avro messages, and to interact transparently with the Schema Registry:
Both the Schema Registry and the library are under the Confluent umbrella: open source but not part of the Apache project. This means you will want to use the Confluent distribution to use the Schema Registry, not the Apache distribution.
Defining the Avro schema
Let’s start by defining an Avro schema. As a reminder, our model looks like this:
Let’s save this under src/main/resources/persons.avsc ( avsc = AVro SChema).
Starting the Schema Registry and registering the schema
We have our schema. Now we need to register it in the Schema Registry.
Make sure you have downloaded the Confluent Platform, then start the Schema Registry:
The Schema Registry is running on port 8081. It offers a REST API with which you can interact with Curl, for instance. Registering a schema is not very easy, though, because you have to embed the JSON schema into another JSON object, meaning you have to do some escaping… Instead, I have a small Python scripts to register a schema:
The equivalent Curl command would have been:
We can check that our schema has been registered:
Producing Avro records
Now, we want to change our producer code to send Avro data. The first thing to know is that there are two flavors of Avro records:
Let’s go ahead and modify our producer:
Three things to note:
Now, because we are going to use generic records, we need to load the schema. We could get it from the Schema Registry, but this is not very safe because you don’t know what version of the schema has been registered. Instead, it is a good practice to store the schema alongside the code. That way, your code always produces the right type of data, even if someone else changes the schema registered in the Schema Registry.
We can now create GenericRecord objects using a GenericRecordBuilder :
As in the first post, we used Kotlin’s apply method to avoid repeating code.
We can now send the record:
Testing the code
So far, we still haven’t created a new topic for our messages. Let’s go ahead and create one:
Notice that we’re just creating a normal topic. Nothing here indicates the format of the messages.
Now, start the code in your IDE and launch a console consumer:
A better option is to use kafka-avro-console-consumer instead, which deserializes Avro records and prints them as JSON objects:
Conclusion
We have seen how to produce Kafka messages in Avro format. This is fairly simple to do with the help of the Schema Registry and of the associated library.
One of the common mistakes is for a producer to fetch the schema from the Schema Registry prior to producing data. As I wrote above, I’d rather know exactly what schema I am producing data with, so keeping a copy of the schema alongside the code gives you that guarantee.
Now, notice that the Kafka Avro serializer will by default register your schema against the Schema Registry if it doesn’t already exist, or if your schema differs from an already registered version. While this can be convenient in development, I suggest disabling this functionality in production ( auto.register.schemas property).
We will see in the next post how to consume the Avro messages we have produced!
The code of this tutorial can be found here.