This is the multi-page printable view of this section. Click here to print.
Documentation
- 1: Overview
- 2: Install Jikkou
- 3: Jikkou Tutorials
- 4: Jikkou CLI Documentation
- 4.1: Basic CLI Features
- 4.2: CLI Configuration
- 4.3: Automating Jikkou
- 5: Jikkou API Server Documentation
- 5.1: Install Jikkou API Server
- 5.2: Configurations
- 5.2.1: API Server
- 5.2.2: Authentication
- 5.2.2.1: Basic Auth
- 5.2.2.2: JWT
- 5.2.3: CLI Proxy Mode
- 5.3: Jikkou - API References
- 6: Concepts
- 6.1: Resource
- 6.2: Labels and annotations
- 6.3: Reconciliation
- 6.4: Selectors
- 6.5: Transformations
- 6.6: Validations
- 6.7: Template
- 6.8: Collectors
- 6.9: Controllers
- 6.10: Extensions
- 6.11: Reporters
- 6.12: Actions
- 7: Extension Providers
- 7.1: Core
- 7.2: Apache Kafka
- 7.2.1: Configuration
- 7.2.2: Resources
- 7.2.2.1: Kafka Brokers
- 7.2.2.2: Kafka Consumer Groups
- 7.2.2.3: Kafka Topics
- 7.2.2.4: Kafka Users
- 7.2.2.5: Kafka Authorizations
- 7.2.2.6: Kafka Quotas
- 7.2.2.7: Kafka Table Records
- 7.2.3: Transformations
- 7.2.3.1: KafkaTopicMaxNumPartitions
- 7.2.3.2: KafkaTopicMaxRetentionMs
- 7.2.3.3: KafkaTopicMinInSyncReplicas
- 7.2.3.4: KafkaTopicMinReplicas
- 7.2.3.5: KafkaTopicMinRetentionMs
- 7.2.4: Validations
- 7.2.5: Annotations
- 7.2.6: Actions
- 7.2.6.1: KafkaConsumerGroupsResetOffsets
- 7.2.7: Compatibility
- 7.3: Apache Kafka Connect
- 7.3.1: Configuration
- 7.3.2: Resources
- 7.3.2.1: KafkaConnectors
- 7.3.3: Validations
- 7.3.4: Annotations
- 7.3.5: Labels
- 7.3.6: Actions
- 7.3.6.1: KafkaConnectRestartConnectors
- 7.4: Schema Registry
- 7.4.1: Configuration
- 7.4.2: Resources
- 7.4.2.1: Schema Registry Subjects
- 7.4.3: Validations
- 7.4.4: Annotations
- 7.5: Aiven
- 7.5.1: Configuration
- 7.5.2: Resources
- 7.5.2.1: ACL for Aiven Apache Kafka®
- 7.5.2.2: Quotas for Aiven Apache Kafka®
- 7.5.2.3: ACL for Aiven Schema Registry
- 7.5.2.4: Subject for Aiven Schema Registry
- 7.5.3: Validations
- 7.5.4: Annotations
- 8: Developer Guide
- 8.1: Extension Developer Guide
- 8.1.1: Package Extensions
- 8.1.2: Develop Custom Validations
- 8.1.3: Develop Custom Action
- 8.1.4: Develop Custom Transformations
- 9: Frequently Asked Questions
- 10: Community
- 10.1: Developer Guide
- 10.2: Contribution Guidelines
- 11: Releases
- 11.1: Release v0.32.0
- 11.2: Release v0.33.0
- 11.3: Release v0.34.0
- 12: Resources
1 - Overview
Welcome to the Jikkou documentation! Jikkou, means “execution (e.g. of a plan) or actual state (of things)” in Japanese.
What Is Jikkou ?
Jikkou is a powerful, flexible open-source framework that enables self-serve resource provisioning. It allows developers and DevOps teams to easily manage, automate, and provision all the resources needed for their Apache Kafka® platform.
Jikkou was born with the aim to streamline day-to-day operations on Apache Kafka®, ensuring that platform governance is no longer a tedious and boring task for both developers and administrators.
What Are The Use-Cases ?
Jikkou is primarily used as a GitOps solution for Kafka configuration management.
Here are some of the various use cases we’ve observed in different projects:
- Topic as a Service: Build a self-serve platform for managing Kafka topics.
- ACL Management: Centrally manage all ACLs of an Apache Kafka cluster.
- Kafka Connectors Management: Deploy and manage Kafka Connect connectors.
- Ad Hoc Changes: Apply ad hoc changes as needed.
- Audit: Easily check configurations of topics, brokers, or identify divergences between different environments.
- Kafka Configuration Backup: Periodically export all critical configurations of your Kafka cluster.
- Configuration Replication: Replicate the Kafka configuration from one cluster to another.
How Does Jikkou Work ?
Jikkou offers flexibility in deployment, functioning either as a simple CLI (Command Line Interface) or as a REST server, based on your requirements.
By adopting a stateless approach, Jikkou does not store any internal state. Instead, it leverages your platforms or services as the source of truth. This design enables seamless integration with other solutions, such as Ansible and Terraform, or allows for ad hoc use for specific tasks, making Jikkou incredibly flexible and versatile.
Is Jikkou For Me ?
Jikkou can be implemented regardless of the size of your team or data platform.
Small Development Team
Jikkou is particularly useful for small development teams looking to quickly automate the creation and maintenance of their topics without having to implement a complex solution that requires learning a new technology or language.
Centralized Infrastructure (DevOps) Team
Jikkou can be very effective in larger contexts, where the configuration of your Kafka Topics, ACLs, and Quotas for all your data platform is managed by a single and centralized devops team.
Decentralized Data Product Teams
In an organization adopting Data Mesh principles, Jikkou can be leveraged in a decentralized way by each of your Data Teams to manage all the assets (e.g. Topics, ACLs, Schemas, Connectors, etc.) necessary to expose and manage their Data Products.
Can I Use Jikkou with my Apache Kafka vendor ?
Jikkou can be used any Apache Kafka infrastructures, including:
2 - Install Jikkou
Jikkou can be installed either from source, or from releases.
From SDKMan! (recommended)
The latest stable release of jikkou (x86) for Linux, and macOS can be retrieved via SDKMan!:
sdk install jikkou
From The Jikkou Project
Releases
Every release
released versions of Jikkou is available:
- As a zip/tar.gz package from GitHub Releases (for Linux, MacOS)
- As a fatJar available from Maven Central
- As a docker image available from Docker Hub.
These are the official ways to get Jikkou releases that you manually downloaded and installed.
Install From Release distribution
- Download your desired version
- Unpack it (
unzip jikkou-0.34.0-linux-x86_64.zip
) - Move the unpacked directory to the desired destination (
mv jikkou-0.34.0-linux-x86_64 /opt/jikkou
) - Add the executable to your PATH (
export PATH=$PATH:/opt/jikkou/bin
)
From there, you should be able to run the client: jikkou help
.
It is recommended to install the bash/zsh completion script jikkou_completion
:
wget https://raw.githubusercontent.com/streamthoughts/jikkou/master/jikkou_completion . jikkou_completion
or alternatively, run the following command for generation the completion script.
$ source <(jikkou generate-completion)
Using Docker Image
# Create a Jikkou configfile (i.e., jikkouconfig)
cat << EOF >jikkouconfig
{
"currentContext" : "localhost",
"localhost" : {
"configFile" : null,
"configProps" : {
"kafka.client.bootstrap.servers" : "localhost:9092"
}
}
}
EOF
# Run Docker
docker run -it \
--net host \
--mount type=bind,source="$(pwd)"/jikkouconfig,target=/etc/jikkou/config \
streamthoughts/jikkou:latest -V
Development Builds
In addition to releases you can download or install development snapshots of Jikkou.
From Docker Hub
Docker images are built and push to Docker Hub from the latest main
branch.
They are not official releases, and may not be stable.
However, they offer the opportunity to test the cutting edge features.
$ docker run -it streamthoughts/jikkou:main
From Source (Linux, macOS)
Building Jikkou from source is slightly more work, but is the best way to go if you want to test the latest ( pre-release) Jikkou version.
Prerequisites
To build the project you will need:
- Java 21 (i.e.
$JAVA_HOME
environment variable is configured). - GraalVM 22.1.0 or newer to create native executable
- TestContainer to run integration tests
Create Native Executable
# Build and run all tests
./mvnw clean verify -Pnative
You can then execute the native executable with: ./jikkou-cli/target/jikkou-$PROJECT_VERSION-runner
Build Debian Package (.deb)
# Build and run all tests
./mvnw clean package -Pnative
./mvnw package -Pdeb
You can then install the package with: sudo dpkg -i ./dist/jikkou-$PROJECT_VERSION-linux-x86_64.deb
NOTE: Jikkou will install itself in the directory : /opt/jikkou
Build RPM Package
# Build and run all tests
./mvnw clean package -Pnative
./mvnw package -Prpm
The RPM package will available in the ./target/rpm/jikkou/RPMS/noarch/
directory.
3 - Jikkou Tutorials
Try the tutorials for common Jikkou tasks and use cases.
3.1 - Jikkou Getting Started
This document will guide you through setting up Jikkou in a few minutes and managing your first resources with Jikkou.
Prerequisites
The following prerequisites are required for a successful and properly use of Jikkou.
Make sure the following is installed:
- An Apache Kafka cluster.
- Using Docker, Docker Compose is the easiest way to use it.
- Java 21 (not required when using the binary version).
Start your local Apache Kafka Cluster
You must have access to an Apache Kafka cluster for using Jikkou. Most of the time, the latest version of Jikkou is always built for working with the most recent version of Apache Kafka.
Make sure the Docker is up and running.
Then, run the following commands:
$ git clone https://github.com/streamthoughts/jikkou
$ cd jikkou
$ ./up # use ./down for stopping the docker-compose stack
Run Jikkou
Download the latest distribution (For Linux)
Run the following commands to install the latest version:
wget https://github.com/streamthoughts/jikkou/releases/download/v0.34.0/jikkou-0.34.0-linux-x86_64.zip && \
unzip jikkou-0.34.0-linux-x86_64.zip && \
cp jikkou-0.34.0-linux-x86_64/bin/jikkou $HOME/.local/bin && \
source <(jikkou generate-completion) && \
jikkou --version
For more details, or for other options, see the installation guide.
Configure Jikkou for your local Apache Kafka cluster
Set configuration context for localhost
jikkou config set-context localhost --config-props=kafka.client.bootstrap.servers=localhost:9092
Show the complete configuration.
jikkou config view --name localhost
Finally, let’s check if your cluster is accessible:
jikkou health get kafka
(output)
If OK, you should get an output similar to :
---
name: "kafka"
status: "UP"
details:
resource: "urn:kafka:cluster:id:KRzY-7iRTHy4d1UVyNlcuw"
brokers:
- id: "1"
host: "localhost"
port: 9092
Create your first topics
First, create a resource YAML file describing the topics you want to create on your cluster:
file: kafka-topics.yaml
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopicList"
items:
- metadata:
name: 'my-first-topic'
spec:
partitions: 5
replicationFactor: 1
configs:
cleanup.policy: 'compact'
- metadata:
name: 'my-second-topic'
spec:
partitions: 4
replicationFactor: 1
configs:
cleanup.policy: 'delete'
Then, run the following Jikkou command to trigger the topic creation on the cluster:
jikkou create -f ./kafka-topics.yaml
(output)
TASK [ADD] Add topic 'my-first-topic' (partitions=5, replicas=-1, configs=[cleanup.policy=compact]) - CHANGED
{
"changed": true,
"end": 1683986528117,
"resource": {
"name": "my-first-topic",
"partitions": {
"after": 5
},
"replicas": {
"after": -1
},
"configs": {
"cleanup.policy": {
"after": "compact",
"operation": "ADD"
}
},
"operation": "ADD"
},
"failed": false,
"status": "CHANGED"
}
TASK [ADD] Add topic 'my-second-topic' (partitions=4, replicas=-1, configs=[cleanup.policy=delete]) - CHANGED
{
"changed": true,
"end": 1683986528117,
"resource": {
"name": "my-second-topic",
"partitions": {
"after": 4
},
"replicas": {
"after": -1
},
"configs": {
"cleanup.policy": {
"after": "delete",
"operation": "ADD"
}
},
"operation": "ADD"
},
"failed": false,
"status": "CHANGED"
}
EXECUTION in 772ms
ok:
0, created:
2, altered:
0, deleted:
0 failed: 0
Tips
In the above command, we chose to use thecreate
command to create the new topics.
But we could just as easily use the update
or apply
command to get the same result depending on our needs.Finally, you can verify that topics are created on the cluster
jikkou get kafkatopics --default-configs
Tips
We use the--default-configs
to export built-in default configuration for configs that have a default value.Update Kafka Topics
Edit your kafka-topics.yaml
to add a retention.ms: 86400000
property to the defined topics.
Then, run the following command.
jikkou update -f ./kafka-topics.yaml
Delete Kafka Topics
To delete all topics defines in the topics.yaml
, add an annotation jikkou.io/delete: true
as follows:
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopicList"
metadata:
annotations:
# Annotation to specify that all resources must be deleted.
jikkou.io/delete: true
items:
- metadata:
name: 'my-first-topic'
spec:
partitions: 5
replicationFactor: 1
configs:
cleanup.policy: 'compact'
- metadata:
name: 'my-second-topic'
spec:
partitions: 4
replicationFactor: 1
configs:
cleanup.policy: 'delete'
Then, run the following command:
$ jikkou apply \
--files ./kafka-topics.yaml \
--selector "metadata.name MATCHES (my-.*-topic)" \
--dry-run
Using the dry-run
option, give you the possibility to check the changes that will be made before applying them.
Now, rerun the above command without the --dry-run
option to definitively delete the topics.
Recommendation
When working in a production environment, we strongly recommend running commands with a--selector
option to ensure
that changes are only applied to a specific set of resources. Also, always run your command in --dry-run
mode to
verify the changes
that will be executed by Jikkou before continuing.Reading the Help
To learn more about the available Jikkou commands, use jikkou help
or type a command followed by the -h
flag:
$ jikkou help get
Next Steps
Now, you’re ready to use Jikkou!🚀
As next steps, we suggest reading the following documentation in this order:
4 - Jikkou CLI Documentation
Hands-on: Try the Jikkou: Get Started tutorials.
4.1 - Basic CLI Features
Hands-on: Try the Jikkou: Get Started tutorials.
The command line interface to Jikkou is the jikkou
command, which accepts a variety of subcommands such as
jikkou apply
or jikkou validate
.
To view a list of the commands available in your current Jikkou version, run jikkou
with no additional arguments:
Usage:
jikkou [-hV] [--logger-level=<level>] [COMMAND]
Jikkou CLI:: A command-line client designed to provide an efficient and easy way to manage, automate, and provision all the assets of your data infrastructure.
Find more information at: https://streamthoughts.github.io/jikkou/.
OPTIONS:
-h, --help Show this help message and exit.
--logger-level=<level>
Specify the log level verbosity to be used while running a command.
Valid level values are: TRACE, DEBUG, INFO, WARN, ERROR.
For example, `--logger-level=INFO`
-V, --version Print version information and exit.
CORE COMMANDS:
apply Update the resources as described by the resource definition files.
create Create resources from the resource definition files (only non-existing resources will be created).
delete Delete resources that are no longer described by the resource definition files.
diff Show changes required by the current resource definitions.
get Display one or many specific resources.
prepare Prepare the resource definition files for validation.
update Create or update resources from the resource definition files
validate Check whether the resources definitions meet all validation requirements.
SYSTEM MANAGEMENT COMMANDS:
action List/execute actions.
health Print or describe health indicators.
ADDITIONAL COMMANDS:
api-extensions Print the supported API extensions
api-resources Print the supported API resources
config Sets or retrieves the configuration of this client
generate-completion Generate bash/zsh completion script for jikkou.
help Display help information about the specified command.
(The output from your current Jikkou version may be different than the above example.)
Checking Jikkou Version
Run the jikkou --version
to display your current installation version:
Jikkou version "0.32.0" 2023-11-28
JVM: 21.0.1 (GraalVM Community Substrate VM 21.0.1+12)
Shell Tab-completion
It is recommended to install the bash/zsh completion script jikkou_completion
.
The completion script can be downloaded from the project Github repository:
wget https://raw.githubusercontent.com/streamthoughts/jikkou/main/jikkou_completion . jikkou_completion
or alternatively, you can run the following command to generate it.
source <(jikkou generate-completion)
4.2 - CLI Configuration
Hands-on: Try the Jikkou: Get Started tutorials.
Configuration
To set up the configuration settings used by Jikkou CLI, you will need create a jikkou config file, which is created automatically when you create a configuration context using:
jikkou config set-context <context-name> [--config-file=<config-gile>] [--config-props=<config-value>]
By default, the configuration of jikkou
is located under the path $HOME/.jikkou/config
.
This jikkou config file defines all the contexts that can be used by jikkou CLI.
For example, below is the config file created during the Getting Started.
{
"currentContext": "localhost",
"localhost": {
"configFile": null,
"configProps": {
"kafka.client.bootstrap.servers": "localhost:9092"
}
}
}
Most of the time, a context does not directly contain the configuration properties to be used, but rather points to a
specific HOCON (Human-Optimized Config Object Notation) through the configFile
property.
Then, the configProps
allows you to override some of the property define by this file.
In addition, if no configuration file path is specified, Jikkou will lookup for an application.conf
to
those following locations:
./application.conf
$HOME/.jikkou/application.conf
Finally, Jikkou always fallback to a reference.conf file that you can use as a template to define your own configuration.
reference.conf:
jikkou {
extension.providers {
# By default, disable all extensions
default.enabled: true
# Explicitly enabled/disable extensions
#<provider_name>.enabled: <boolean>
# schemaregistry.enabled = true
# kafka.enabled = true
# aiven.enabled = true
# kafkaconnect.enabled = true
}
# Configure Jikkou Proxy Mode
# proxy {
# url = "http://localhost:8080"
# }
# Kafka Extension
kafka {
# The default Kafka Client configuration
client {
bootstrap.servers = "localhost:9092"
bootstrap.servers = ${?JIKKOU_DEFAULT_KAFKA_BOOTSTRAP_SERVERS}
}
brokers {
# If 'True'
waitForEnabled = true
waitForEnabled = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_ENABLED}
# The minimal number of brokers that should be alive for the CLI stops waiting.
waitForMinAvailable = 1
waitForMinAvailable = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_MIN_AVAILABLE}
# The amount of time to wait before verifying that brokers are available.
waitForRetryBackoffMs = 1000
waitForRetryBackoffMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_RETRY_BACKOFF_MS}
# Wait until brokers are available or this timeout is reached.
waitForTimeoutMs = 60000
waitForTimeoutMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_TIMEOUT_MS}
}
}
schemaRegistry {
url = "http://localhost:8081"
url = ${?JIKKOU_DEFAULT_SCHEMA_REGISTRY_URL}
}
# The default custom transformations to apply on any resources.
transformations = []
# The default custom validations to apply on any resources.
validations = [
{
name = "topicMustHaveValidName"
type = io.streamthoughts.jikkou.kafka.validation.TopicNameRegexValidation
priority = 100
config = {
topicNameRegex = "[a-zA-Z0-9\\._\\-]+"
topicNameRegex = ${?VALIDATION_DEFAULT_TOPIC_NAME_REGEX}
}
},
{
name = "topicMustHavePartitionsEqualsOrGreaterThanOne"
type = io.streamthoughts.jikkou.kafka.validation.TopicMinNumPartitionsValidation
priority = 100
config = {
topicMinNumPartitions = 1
topicMinNumPartitions = ${?VALIDATION_DEFAULT_TOPIC_MIN_NUM_PARTITIONS}
}
},
{
name = "topicMustHaveReplicasEqualsOrGreaterThanOne"
type = io.streamthoughts.jikkou.kafka.validation.TopicMinReplicationFactorValidation
priority = 100
config = {
topicMinReplicationFactor = 1
topicMinReplicationFactor = ${?VALIDATION_DEFAULT_TOPIC_MIN_REPLICATION_FACTOR}
}
}
]
# The default custom reporters to report applied changes.
reporters = [
# Uncomment following lines to enable default kafka reporter
# {
# name = "default"
# type = io.streamthoughts.jikkou.kafka.reporter.KafkaChangeReporter
# config = {
# event.source = "jikkou/cli"
# kafka = {
# topic.creation.enabled = true
# topic.creation.defaultReplicationFactor = 1
# topic.name = "jikkou-resource-change-event"
# client = ${jikkou.kafka.client} {
# client.id = "jikkou-reporter-producer"
# }
# }
# }
# }
]
}
Listing Contexts
$ jikkou config get-contexts
NAME
localhost *
development
staging
production
Verify Current Context
You can use jikkou config current-context
command to show the context currently used by Jikkou CLI.
$ jikkou config current-context
Using context 'localhost'
KEY VALUE
ConfigFile
ConfigProps {"kafka.client.bootstrap.servers": "localhost:9092"}
Verify Current Configuration
You can use jikkou config view
command to show the configuration currently used by Jikkou CLI.
Tips
To debug the configuration use by Jikkou, you can run the following command:jikkou config view --comments
or jikkou config view --debug
4.3 - Automating Jikkou
4.3.1 - Automate Jikkou with GitHub Actions
Setup Jikkou
The streamthoughts/setup-jikkou
action is a JavaScript action that
sets up Jikkou in your GitHub Actions workflow by:
- Downloading a specific version of Jikkou CLI and adding it to the
PATH
. - Configuring JIKKOU CLI with a custom configuration file.
After you’ve used the action, subsequent steps in the same job can run arbitrary Jikkou commands using the GitHub Actions run syntax. This allows most Jikkou commands to work exactly like they do on your local command line.
Usage
steps:
- uses: streamthoughts/setup-jikkou@v1
A specific version of Jikkou CLI can be installed:
steps:
- uses: streamthoughts/setup-jikkou@v0.1.0
with:
jikkou_version: 0.29.0
A custom configuration file can be specified:
steps:
- uses: streamthoughts/setup-jikkou@v0.1.0
with:
jikkou_config: ./config/jikkouconfig.json
Inputs
This Action additionally supports the following inputs :
Property | Default | Description |
---|---|---|
jikkou_version | latest | The version of Jikkou CLI to install. A value of latest will install the latest version of Jikkou CLI. |
jikkou_config | The path to the Jikkou CLI config file. If set, Jikkou CLI will be configured through the JIKKOUCONFIG environment variable. |
5 - Jikkou API Server Documentation
Jikkou API Server provides a REST interface to any platform supported by Jikkou, making it even easier to manage, automate and visualise all your data platform assets.
Jikkou CLI can be used in combination with Jikkou API Server by configuring it in proxy mode. In this mode, the CLI no longer connects directly to your various platforms, but forwards all operations to the API server. This deployment method allows you to enhance the overall security of the platforms managed through Jikkou.
5.1 - Install Jikkou API Server
Releases
The latest stable release of Jikkou API Server is available:
- As a Java binary distribution (.zip) from GitHub Releases
- As a docker image available from Docker Hub.
Standalone Installation
Follow these few steps to download the latest stable versions and get started.
Prerequisites
To be able to run Jikkou API Server, the only requirement is to have a working Java 21 installation. You can check the correct installation of Java by issuing the following command:
java -version
Step 1: Download
Download the latest Java binary distribution from the GitHub Releases (e.g. jikkou-api-server-0.31.0.zip
)
Unpack the download distribution and move the unpacked directory to a desired destination
unzip jikkou-api-server-$LATEST_VERSION.zip
mv jikkou-api-server-$LATEST_VERSION /opt/jikkou
Step 2: Start the API Server
Launch the application with:
./bin/jikkou-api-server.sh
Step 3: Test the API Server
$ curl -sX GET http://localhost:28082 -H "Accept: application/json" | jq
{
"version": "0.31.0",
"build_time": "2023-11-14T18:07:38+0000",
"commit_id": "dae1be11c092256f36c18c8f1d90f16b0c951716",
"_links": {
"self": {
"href": "/",
"templated": false
},
"get-apis": {
"href": "/apis",
"templated": false
}
}
}
Step 4: Stop the API Server
PID=`ps -ef | grep -v grep | grep JikkouApiServer | awk '{print $2}'`
kill $PID
Docker
# Run Docker
docker run -it \
--net host \
streamthoughts/jikkou-api-server:latest
Development Builds
In addition to releases you can download or install development snapshots of Jikkou API Server.
From Docker Hub
Docker images are built and push to Docker Hub from the latest main
branch.
They are not official releases, and may not be stable. However, they offer the opportunity to test the cutting edge features.
$ docker run -it streamthoughts/jikkou-api-server:main
5.2 - Configurations
Jikkou API Server is built with Micronaut Framework.
The default configuration file is located in the installation directory of you server under the
path /etc/application.yaml
.
You can either modify this configuration file directly or create a new one.
Then, your configuration file path can be targeted through the MICRONAUT_CONFIG_FILES
environment variable.
A YAML Configuration file example can be found here: application.yaml
Note
For more information about how to configure the application, we recommend you to read the official Micronaut documentation (see: Application Configuration).5.2.1 - API Server
Running Server on a Specific Port
By default, the server runs on port 28082
. However, you can set the server to run on a specific port:
# ./etc/application.yaml
micronaut:
server:
port: 80 # Port used to access APIs
endpoints:
all:
port: 80 # Port used to access Health endpoints
Enabling Specific Extension Providers
By default, the server is configured to run only with the core
and kafka
extension providers.
However, you can enable (or disable) additional providers:
jikkou:
extensions.provider:
# By default, disable all extension providers.
default.enabled: false
# Explicitly enabled/disable an extension provider
#<provider_name>.enabled: <boolean>
core.enabled: true
kafka.enabled: true
# schemaregistry.enabled: true
# aiven.enabled: true
# kafkaconnect.enabled: true
5.2.2 - Authentication
Enable Security
To enable secure access to the API Server:
Configuration File
Update the configuration file (i.e., application.yaml
) of the server with:
micronaut:
security:
enabled: true
Environment Variable
As an alternative, you can set the following environment variable MICRONAUT_SECUTIRY_ENABLED=true
.
Note
For more information about how Micronaut binds environment variables and configuration property: https://docs.micronaut.io/latest/guide/index.html#_property_value_binding).Unauthorized Access
When accessing a secured path, the server will return the following response if access is not authorized:
{
"message": "Unauthorized",
"errors": [
{
"status": 401,
"error_code": "authentication_user_unauthorized",
"message": "Unauthorized"
}
]
}
5.2.2.1 - Basic Auth
Jikkou API Server can be secured using a Basic HTTP Authentication Scheme.
RFC7617 defines the “Basic” Hypertext Transfer Protocol (HTTP) authentication scheme, which transmits credentials as user-id/password pairs, encoded using Base64.
Basic Authentication should be used over a secured connection using HTTPS.
Configure Basic HTTP Authentication
Step1: Enable security
Add the following configuration to your server configuration.
# ./etc/application.yaml
micronaut:
security:
enabled: true
Step2: Configure the list of users
The list of username/password
authorized to connect to the API server can be configured as follows:
# ./etc/application.yaml
jikkou:
security:
basic-auth:
- username: "admin"
password: "{noop}password"
For production environment, password must not be configured in plaintext. Password can be passed encoded
in bcrypt
, scrypt
, argon2
, and sha256
.
Example
echo -n password | sha256sum
5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8
# ./etc/application.yaml
jikkou:
security:
basic-auth:
- username: "admin"
password: "{sha256}5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8"
Step3: Validate authentication
Encode credentials
echo -n "admin:password" | base64
YWRtaW46cGFzc3dvcmQ=
Send request
curl -IX GET http://localhost:28082/apis/kafka.jikkou.io/v1beta2/kafkabrokers \
-H "Accept: application/json" \
-H "Authorization: Basic YWRtaW46cGFzc3dvcmQ"
HTTP/1.1 200 OK
Content-Type: application/hal+json
content-length: 576
5.2.2.2 - JWT
Jikkou API Server can be secured using JWT (JSON Web Token) Authentication.
Configure JWT
Step1: Set JWT signature secret
Add the following configuration to your server configuration.
# ./etc/application.yaml
micronaut:
security:
enabled: true
authentication: bearer <1>
token:
enabled: true
jwt:
signatures:
secret:
generator:
secret: ${JWT_GENERATOR_SIGNATURE_SECRET:pleaseChangeThisSecretForANewOne} <2>
- <1> Set authentication to bearer to receive a JSON response from the login endpoint.
- <2> Change this to your own secret and keep it safe (do not store this in your VCS).
Step2: Generate a Token
Generate a valid JSON Web Token on https://jwt.io/
using your secret.
Example with pleaseChangeThisSecretForANewOne
as signature secret.
TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.6cD3MnZmX2xyEAWyh-GgGD11TX8SmvmHVLknuAIJ8yE
Step3: Validate authentication
$ curl -I -X GET http://localhost:28082/apis/kafka.jikkou.io/v1beta2/kafkabrokers \
-H "Accept: application/json" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.6cD3MnZmX2xyEAWyh-GgGD11TX8SmvmHVLknuAIJ8yE"
HTTP/1.1 200 OK
Content-Type: application/hal+json
content-length: 576
5.2.3 - CLI Proxy Mode
Configuration
Step 1: Enable Proxy Mode
To enable proxy mode so that the CLI communicates directly with your API Server, add the following parameters to your configuration:
jikkou {
# Proxy Configuration
proxy {
# Specify whether proxy mode is enabled (default: false).
enabled = true
# URL of the API Server
url = "http://localhost:28082"
# Specifcy whether HTTP request debugging should be enabled (default: false)
debugging = false
# The connect timeout in millisecond (if not configured used ` default-timeout` ).
connect-timeout = 10000
# The read timeout in millisecond (if not configured used ` default-timeout` ).
read-timeout = 10000
# The write timeout in millisecond (if not configured used ` default-timeout` ).
write-timeout = 10000
# The default timeout (i.e., for read/connect) in millisecond (default: 10000)
default-timeout = 10000
# Security settings to authenticate to the API Server.
security = {
# For Token based Authentication.
# access-token = ""
# For Username/Password Basic-Authentication.
# basic-auth = {
# username = ""
# password = ""
# }
}
}
}
Step 2: Check connection
When enabling Proxy Mode, Jikkou CLI provides the additional command server-info
. You can use it to verify the
connectivity with teh server.
$ jikkou server-info -o JSON | jq
{
"version": "0.31.0",
"build_time": "2023-11-15T10:35:22+0100",
"commit_id": "f3384d38e606fb32599c175895d0cbef28258540"
}
5.3 - Jikkou - API References
6 - Concepts
This section explains key concepts used within Jikkou:
6.1 - Resource
Jikkou Resources are entities that represent the state of a concrete instance of a concept that are part of the state of your system, like a Topic on an Apache Kafka cluster.
Resource Objects
All resources can be distinguished between persistent objects, which are used to describe the desired state of your system, and transient objects, which are only used to enrich or provide additional capabilities for the definition of persistent objects.
A resource is an object with a type (called a Kind) and a concrete model that describe the associated data. All resource are scoped by an API Group and Version.
Resource Definition
Resources are described in YAML format.
Here is a sample resource that described a Kafka Topic.
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
name: 'my-topic'
labels:
environment: test
annotations: {}
spec:
partitions: 1
replicas: 1
configs:
min.insync.replicas: 1
cleanup.policy: 'delete'
Resource Properties
The following are the properties that can be set to describe a resource:
Property | Description |
---|---|
apiVersion | The group/version of the resource type. |
kind | The type of the describe resource. |
metadata.name | An optional name to identify the resource. |
metadata.labels | Arbitrary metadata to attach to the resource that can be handy when you have a lot of resources and you only need to identity or filters some objects. |
metadata.annotations | Arbitrary non-identifying metadata to attach to the resource to mark them for a specific operation or to record some metadata. |
spec | The object properties describing a desired state |
6.2 - Labels and annotations
Labels
You can use labels to attach arbitrary identifying metadata to objects.
Labels are key/value maps:
metadata:
labels:
"key1": "value-1"
"key2": "value-2"
Note
The keys in the map must be string, but values can be any scalar types (string, boolean, or numeric).Labels are not persistent
Jikkou is completely stateless. In other words, it will not store any state about the describe resources objects. Thus, when retrieving objects from your system labels may not be reattached to the metadata objects.Example
metadata:
labels:
environment: "stating"
Annotations
You can use annotations to attach arbitrary non-identifying metadata to objects.
Annotations are key/value maps:
metadata:
annotations:
key1: "value-1"
key2: "value-2"
Note
The keys in the map must be string, but the values can be of any scalar types (string, boolean, or numeric).Built-in Annotations
jikkou.io/ignore
Used on: All Objects.
This annotation indicates whether the object should be ignored for reconciliation.
jikkou.io/bypass-validations
Used on: All Objects.
This annotation indicates whether the object should bypass the validation chain. In other words, no validations will be applied on the object.
jikkou.io/delete
Used on: All Objects.
This annotation indicates (when set to true
) that the object should be deleted from your system.
jikkou.io/resource-location
Used by jikkou.
This annotation is automatically added by Jikkou to an object when loaded from your local filesystem.
jikkou.io/items-count
Used by jikkou.
This annotation is automatically added by Jikkou to an object collection grouping several resources of homogeneous type.
6.3 - Reconciliation
In the context of Jikkou, reconciliation refers to the process of comparing the desired state of an object with the actual state of the system and making any necessary corrections or adjustments to align them.
Changes
A Change represents a difference, detected during reconciliation, between two objects that can reconciled or corrected by adding, updating, or deleting an object or property attached to the actual state of the system.
A Change represents a detected difference between two objects during the reconciliation process. These differences can be reconciled or corrected by adding, updating, or deleting an object or property associated with the actual state of the system
Jikkou identifies four types of changes:
ADD: Indicates the addition of a new object or property to an existing object.
UPDATE: Indicates modifications made to an existing object or property of an existing object.
DELETE: Indicates the removal of an existing object or property of an existing object.
NONE: Indicates that no changes were made to an existing object or property.
Reconciliation Modes
Depending on the chosen reconciliation mode, only specific types of changes will be applied.
Jikkou provides four distinct reconciliation modes that determine the types of changes to be applied:
CREATE
: This mode only applies changes that create new resource objects in your system.DELETE
: This mode only applies changes that delete existing resource objects in your system.UPDATE
: This mode only applies changes that create or update existing resource objects in your system.APPLY_ALL
: This mode applies all changes to ensure that the actual state of a resource in the cluster matches the desired state defined in your resource definition file, regardless of the specific type of change.
Each mode corresponds to a command offered by the Jikkou CLI (i.e., create
, update
, delete
, and apply
). Choose
the appropriate mode based on your requirements.
Using JIKKOU CLI
Some reconciliation modes might not be supported for all resources. Usejikkou extensions list --type Controller
to check which actions could be perfomed for each resources.Reconciliation Options
Depending on the type of resources being reconciled, the controller that will be involved in the reconciliation
process might accept some options (i.e., using --options
argument).
Mark Resource for Deletion
To delete all the states associated with resource’s entities, you must add the following annotation to the resource definition:
metadata:
annotations:
jikkou.io/delete: true
6.4 - Selectors
Selectors allows you to include or exclude some resource objects from being returned or reconciled by Jikkou.
Selector Expressions
Selectors are passed as arguments to Jikkou as expression strings in the following form:
<SELECTOR>: <KEY> <OPERATOR> VALUE
<SELECTOR>: <KEY> <OPERATOR> (VALUE[, VALUES])
or (using default field selector):
<KEY> <OPERATOR> VALUE
<KEY> <OPERATOR> (VALUE[, VALUES])
Selectors
Field (default)
Jikkou packs with a built-in FieldSelector
allowing to filter resource objects based on a field key.
For example, the expression below shows you how to select only resource having a label environement
equals to
either staging
or production
.
metadata.labels.environement IN (staging, production)
Note: In the above example, we have omitted the selector because field
is the default selector.
Expression Operators
Five kinds of operators are supported:
- IN
- NOTIN
- EXISTS
- MATCHES
- DOESNOTMATCH
Using JIKKOU CLI
Selectors can be specified via the Jikkou CLI option:--selector
.Matching Strategies
Jikkou allows you to use multiple selector expressions. To indicate how these expressions are to be combined, you can pass one of the following matching strategies:
ALL
: A resource is selected if it matches all selectors.ANY
: A resource is selected if it matches one of the selectors.NONE
: A resource is selected if it matches none of the selectors.
Example:
jikkou get kafkatopics \
--selector 'metadata.name MATCHES (^__.*)' \
--selector 'metadata.name IN (_schemas)' \
--selector-match ANY
6.5 - Transformations
Transformations are applied to inbound resources. Transformations are used to transform, enrich, or filter resource entities before they are validated and thus before the reconciliation process is executed on them.
Available Transformations
You can list all the available transformations using the Jikkou CLI command:
jikkou extensions list --type=Transformation [-kinds <a resource kind to filter returned results>]
Transformation chain
When using Jikkou CLI, you can configure a transformation chain that will be applied to every resource. This chain consists of multiple transformations, each designed to handle different types of resources. Jikkou ensures that a transformation is executed only for the resource types it supports. In cases where a resource is not accepted by a transformation, it is passed to the next transformation in the chain. This process continues until a suitable transformation is found or until all transformations have been attempted.
Configuration
jikkou {
# The list of transformations to execute
transformations: [
{
# Simple or fully qualified class name of the transformation extension.
type = ""
# Priority to be used for executing this transformation extension.
# The lowest value has the highest priority, so it's run first. Minimum value is -2^31 (highest) and a maximum value is 2^31-1 (lowest).
# Usually, values under 0 should be reserved for internal transformation extensions.
priority = 0
config = {
# Configuration properties for this transformation
}
}
]
}
Tips
Theconfig
object of a Transformation always fallback on the top-level jikkou
config. This allows you to globally
declare some properties of the validation configuration.Example
jikkou {
# The list of transformations to execute
transformations: [
{
# Enforce a minimum number of replicas for a kafka topic
type = KafkaTopicMinReplicasTransformation
priority = 100
config = {
minReplicationFactor = 4
}
},
{
# Enforce a {@code min.insync.replicas} for a kafka topic.
type = KafkaTopicMinInSyncReplicasTransformation
priority = 100
config = {
minInSyncReplicas = 2
}
}
]
}
6.6 - Validations
Validations are applied to inbound resources to ensure that the resource entities adhere to specific rules or constraints. These validations are carried out after the execution of the transformation chain and before the reconciliation process takes place.
Available Validations
You can list all the available validations using the Jikkou CLI command:
jikkou api-extensions list --category=validation [--kinds <a resource kind to filter returned results>]
Validation chain
When using Jikkou CLI, you can configure a validation chain that will be applied to every resource. This chain consists of multiple validations, each designed to handle different types of resources. Jikkou ensures that a validation is executed only for the resource types it supports. In cases where a resource is not accepted by a validation, it is passed to the next validation in the chain. This process continues until a suitable validation is found or until all validations have been attempted.
Configuration
jikkou {
# The list of validations to execute
validations: [
{
# Custom name for the validation rule
name = ""
# Simple or fully qualified class name of the validation extension.
type = ""
config = {
# Configuration properties for this validation
}
}
]
}
Tips
Theconfig
object of a Validation always fallback on the top-level jikkou
config. This allows you to globally
declare some properties of the validation configuration.Example
jikkou {
# The list of transformations to execute
validations: [
{
# Custom name for the validation rule
name = topicMustBePrefixedWithRegion
# Simple or fully qualified class name of the validation extension.
type = TopicNameRegexValidation
# The config values that will be passed to the validation.
config = {
topicNameRegex = "(europe|northamerica|asiapacific)-.+"
}
}
]
}
6.7 - Template
Template helps you to dynamically define resource definition files from external data.
Template Engine
Jikkou provides a simple templating mechanism based-on Jinjava, a Jinja template engine for Java.
Read the official documentation of Jinja to learn more about the syntax and semantics of the template engine.
How Does It Work ?
Jikkou performs the rendering of your template in two phases:
- First, an initial rendering is performed using only the
values
andlabels
passed through the command-lines arguments.- Thus, it is perfectly OK if your resource file is not initially a valid YAML file.
- Then, a second and final rendering is performed after parsing the YAML resource file using the additional
values
andlabels
as defined into the YAML resource file.- Therefore, it’s important that your resource file is converted into a valid YAML file after the first rendering.
Important
You should use{% raw %}...{% endraw %}
tags to ensure the variables defined into the template
are not be
interpreted during the first rendering.Variables
Jikkou defines a number of top-level variables that are passed to the template engine.
values
:- The values passed into the template through the command-line
--values-files
and/or--set-value
arguments - In addition, values can be defined into the
application.conf
file and directly into the template file using the propertytemplate.values
. - By default,
values
is empty.
- The values passed into the template through the command-line
labels
:- The labels passed into the template through the command-line argument:
--set-label
. - In addition, labels can be defined into the template file using the property
metadata.labels
. - By default,
labels
is empty.
- The labels passed into the template through the command-line argument:
system.env
:- This provides access to all environment variables.
system.props
:- This provides access to all system properties.
Template Values
When using templating, a resource definition file may contain the additional property template
.
fields:
apiVersion: The api version (required)
kind: The resource kind (required)
metadata:
labels: The set of key/value pairs that you can use to describe your resource file (optional)
annotations: The set of key/value pairs automatically generated by the tool (optional)
template:
values: The set of key/value pairs to be passed to the template engine (optional)
spec: Specification of the resource
Values Data File
Values Data File are used to define all the necessary values (i.e., the variables) to be used for generating a template.
Example
# file: ./values.yaml
topicConfigs:
partitions: 4
replicas: 3
topicPrefix: "{{ system.env.TOPIC_PREFIX | default('test', true) }}"
countryCodes:
- fr
- be
- de
- es
- uk
- us
Template Resource File
Example
# file: ./kafka-topics.tpl
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaTopicList'
items:
{ % for country in values.countryCodes % }
- metadata:
name: "{{ values.topicPrefix}}-iot-events-{{ country }}"
spec:
partitions: { { values.topicConfigs.partitions } }
replicas: { { values.topicConfigs.replicas } }
configMapRefs:
- TopicConfig
{ % endfor % }
---
apiVersion: "core.jikkou.io/v1beta2"
kind: "ConfigMap"
metadata:
name: TopicConfig
template:
values:
default_min_insync_replicas: "{{ values.topicConfigs.replicas | default(3, true) | int | add(-1) }}"
data:
retention.ms: 3600000
max.message.bytes: 20971520
min.insync.replicas: '{% raw %}{{ values.default_min_insync_replicas }}{% endraw %}'
Command
$ TOPIC_PREFIX=local jikkou validate --files topics.tpl --values-files values.yaml
(Output)
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopicList"
metadata:
labels: { }
annotations:
jikkou.io/resource-location: "file:///tmp/jikkou/topics.tpl"
spec:
topics:
- metadata:
name: "local-iot-events-fr"
spec:
partitions: 4
replicas: 3
configs:
min.insync.replicas: "2"
retention.ms: 3600000
max.message.bytes: 20971520
- metadata:
name: "local-iot-events-be"
spec:
partitions: 4
replicas: 3
configs:
min.insync.replicas: "2"
retention.ms: 3600000
max.message.bytes: 20971520
- metadata:
name: "local-iot-events-de"
spec:
partitions: 4
replicas: 3
configs:
min.insync.replicas: "2"
max.message.bytes: 20971520
retention.ms: 3600000
- metadata:
name: "local-iot-events-es"
spec:
partitions: 4
replicas: 3
configs:
min.insync.replicas: "2"
max.message.bytes: 20971520
retention.ms: 3600000
- metadata:
name: "local-iot-events-uk"
spec:
partitions: 4
replicas: 3
configs:
min.insync.replicas: "2"
max.message.bytes: 20971520
retention.ms: 3600000
- metadata:
name: "local-iot-events-us"
spec:
partitions: 4
replicas: 3
configs:
min.insync.replicas: "2"
max.message.bytes: 20971520
retention.ms: 3600000
Configuration
jinja {
# Enable/Disable recursive macro calls for rendering
enableRecursiveMacroCalls = false
}
6.8 - Collectors
Collectors are used to collect and describe all entities that exist into your system for a specific resource type.
Available Collectors
You can list all the available collectors using the Jikkou CLI command:
jikkou extensions list --type=Collector [-kinds <a resource kind to filter returned results>]
6.9 - Controllers
Controllers are used to compute and apply changes required to reconcile resources into a managed system.
Available Controllers
You can list all the available controllers using the Jikkou CLI command:
jikkou extensions list --type=Controller [-kinds <a resource kind to filter returned results>]
6.10 - Extensions
Extension Providers
Most of the Jikkou’s features are provided by Jikkou Extension Providers. A provider is a module providing a set of extensions used to manage one or more resources.
Built-in Extension Providers
Jikkou ships with a number of extension providers:
6.11 - Reporters
Reporters can be used to report changes applied by Jikkou to a third-party system.
Configuration
Jikkou allows you to configure multiple reporters as follows:
jikkou {
# The list of reporters to execute
reporters: [
{
# Custom name for the reporter
name = ""
# Simple or fully qualified class name of the transformation extension.
type = ""
config = {
# Configuration properties for this reporter
}
}
]
}
Tips
Theconfig
object passed to a reporter will fallback on the top-level jikkou
config.
This allows you to globally declare some configuration settings.Built-in implementations
Jikkou packs with some built-in ChangeReporter
implementations:
KafkaChangeReporter
The KafkaChangeReporter
can be used to send change results into a given kafka topic. Changes will be published
as Cloud Events.
Configuration
The below example shows how to configure the KafkaChangeReporter
.
jikkou {
# The default custom reporters to report applied changes.
reporters = [
{
name = "kafka-reporter"
type = io.streamthoughts.jikkou.kafka.reporter.KafkaChangeReporter
config = {
# The 'source' of the event that will be generated.
event.source = "jikkou/cli"
kafka = {
# If 'true', topic will be automatically created if it does not already exist.
topic.creation.enabled = true
# The default replication factor used for creating topic.
topic.creation.defaultReplicationFactor = 1
# The name of the topic the events will be sent.
topic.name = "jikkou-resource-change-event"
# The configuration settings for Kafka Producer and AdminClient
client = ${jikkou.kafka.client} {
client.id = "jikkou-reporter-producer"
}
}
}
}
]
}
6.12 - Actions
Actions allow a user to execute a specific and one-shot operation on resources.
Available Actions (CLI)
You can list all the available actions using the Jikkou CLI command:
jikkou api-extensions list --category=action [-kinds <a resource kind to filter returned results>]
Execution Actions (CLI)
You can execute a specific extension using the Jikkou CLI command:
jikkou action <ACTION_NAME> execute [<options>]
7 - Extension Providers
The section helps you learn more about the built-in Extension Providers for Jikkou.
7.1 - Core
Here, you will find information to use the Core extensions.
More information:
7.1.1 - Resources
Here, you will find the list of core resources supported for Jikkou.
Core Resources
More information:
7.1.1.1 - ConfigMap
You can use a ConfigMap
to define reusable data in the form of key/value pairs that can then be referenced and used by
other resources.
Specification
---
apiVersion: "core.jikkou.io/v1beta2"
kind: ConfigMap
metadata:
name: '<CONFIG-MAP-NAME>' # Name of the ConfigMap (required)
data: # Map of key-value pairs (required)
<KEY_1>: "<VALUE_1>"
Example
For example, the below ConfigMap
show how to define default config properties namedcKafkaTopicConfig
that can then
reference and used to define multiple KafkaTopic. resources.
---
apiVersion: "core.jikkou.io/v1beta2"
kind: ConfigMap
metadata:
name: 'KafkaTopicConfig'
data:
cleanup.policy: 'delete'
min.insync.replicas: 2
retention.ms: 86400000 # (1 day)
7.2 - Apache Kafka
Here, you will find information to use the Apache Kafka extensions.
More information:
7.2.1 - Configuration
Here, you will find the list of resources supported for Apache Kafka.
Configuration
The Apache Kafka extension is built on top of the Kafka Admin Client. You can configure the properties to be passed to
kafka client through the Jikkou client configuration property jikkou.kafka.client
.
Example:
jikkou {
kafka {
client {
bootstrap.servers = "localhost:9092"
security.protocol = "SSL"
ssl.keystore.location = "/tmp/client.keystore.p12"
ssl.keystore.password = "password"
ssl.keystore.type = "PKCS12"
ssl.truststore.location = "/tmp/client.truststore.jks"
ssl.truststore.password = "password"
ssl.key.password = "password"
}
}
}
In addition, the extension support configuration settings to wait for at least a minimal number of brokers before processing.
jikkou {
kafka {
brokers {
# If 'True'
waitForEnabled = true
waitForEnabled = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_ENABLED}
# The minimal number of brokers that should be alive for the CLI stops waiting.
waitForMinAvailable = 1
waitForMinAvailable = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_MIN_AVAILABLE}
# The amount of time to wait before verifying that brokers are available.
waitForRetryBackoffMs = 1000
waitForRetryBackoffMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_RETRY_BACKOFF_MS}
# Wait until brokers are available or this timeout is reached.
waitForTimeoutMs = 60000
waitForTimeoutMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_TIMEOUT_MS}
}
}
}
7.2.2 - Resources
Here, you will find the list of resources supported for Apache Kafka.
Apache Kafka Resources
More information:
7.2.2.1 - Kafka Brokers
This section describes the resource definition format for kafkabrokers
entities, which can be used to define the
brokers you plan to manage on a specific Kafka cluster.
Listing KafkaBroker
You can retrieve the state of Kafka Consumer Groups using the jikkou get kafkabrokers
(or jikkou get kb
) command.
Usage
Usage:
Get all 'KafkaBroker' resources.
jikkou get kafkabrokers [-hV] [--default-configs] [--dynamic-broker-configs]
[--list] [--static-broker-configs]
[--logger-level=<level>] [-o=<format>]
[-s=<expressions>]...
DESCRIPTION:
Use jikkou get kafkabrokers when you want to describe the state of all
resources of type 'KafkaBroker'.
OPTIONS:
--default-configs Describe built-in default configuration for configs
that have a default value.
--dynamic-broker-configs
Describe dynamic configs that are configured as
default for all brokers or for specific broker in
the cluster.
-h, --help Show this help message and exit.
--list Get resources as ResourceListObject.
--logger-level=<level>
Specify the log level verbosity to be used while
running a command.
Valid level values are: TRACE, DEBUG, INFO, WARN,
ERROR.
For example, `--logger-level=INFO`
-o, --output=<format> Prints the output in the specified format. Allowed
values: json, yaml (default yaml).
-s, --selector=<expressions>
The selector expression used for including or
excluding resources.
--static-broker-configs
Describe static configs provided as broker properties
at start up (e.g. server.properties file).
-V, --version Print version information and exit.
(The output from your current Jikkou version may be different from the above example.)
Examples
(command)
$ jikkou get kafkabrokers --static-broker-configs
(output)
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaBroker"
metadata:
name: "101"
labels: {}
annotations:
kafka.jikkou.io/cluster-id: "xtzWWN4bTjitpL3kfd9s5g"
spec:
id: "101"
host: "localhost"
port: 9092
configs:
advertised.listeners: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
authorizer.class.name: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
broker.id: "101"
controller.listener.names: "CONTROLLER"
controller.quorum.voters: "101@kafka:29093"
inter.broker.listener.name: "PLAINTEXT"
listener.security.protocol.map: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
listeners: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://kafka:29093"
log.dirs: "/var/lib/kafka/data"
node.id: "101"
offsets.topic.replication.factor: "1"
process.roles: "broker,controller"
transaction.state.log.replication.factor: "1"
zookeeper.connect: ""
7.2.2.2 - Kafka Consumer Groups
This section describes the resource definition format for KafkaConsumerGroup
entities, which can be used to define the
consumer groups you plan to manage on a specific Kafka cluster.
Listing KafkaConsumerGroup
You can retrieve the state of Kafka Consumer Groups using the jikkou get kafkaconsumergroups
(or jikkou get kcg
) command.
Usage
$ jikkou get kafkaconsumergroups --help
Usage:
Get all 'KafkaConsumerGroup' resources.
jikkou get kafkaconsumergroups [-hV] [--list] [--offsets]
[--logger-level=<level>] [-o=<format>]
[--in-states=PARAM]... [-s=<expressions>]...
DESCRIPTION:
Use jikkou get kafkaconsumergroups when you want to describe the state of all
resources of type 'KafkaConsumerGroup'.
OPTIONS:
-h, --help Show this help message and exit.
--in-states=PARAM If states is set, only groups in these states will be
returned. Otherwise, all groups are returned. This
operation is supported by brokers with version
2.6.0 or later
--list Get resources as ResourceListObject.
--logger-level=<level>
Specify the log level verbosity to be used while
running a command.
Valid level values are: TRACE, DEBUG, INFO, WARN,
ERROR.
For example, `--logger-level=INFO
-o, --output=<format> Prints the output in the specified format. Allowed
values: json, yaml (default yaml).
--offsets Specify whether consumer group offsets should be
described.
-s, --selector=<expressions>
The selector expression used for including or
excluding resources.
-V, --version Print version information and exit
(The output from your current Jikkou version may be different from the above example.)
Examples
(command)
$ jikkou get kafkaconsumergroups --in-states STABLE --offsets
(output)
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
name: "my-group"
labels:
kafka.jikkou.io/is-simple-consumer: false
status:
state: "STABLE"
members:
- memberId: "console-consumer-b103994e-bcd5-4236-9d03-97065057e594"
clientId: "console-consumer"
host: "/127.0.0.1"
assignments:
- "my-topic-0"
offsets:
- topic: "my-topic"
partition: 0
offset: 0
coordinator:
id: "101"
host: "localhost"
port: 9092
7.2.2.3 - Kafka Topics
KafkaTopic resources are used to define the topics you want to manage on your Kafka Cluster(s). A KafkaTopic resource defines the number of partitions, the replication factor, and the configuration properties to be associated to a topics.
KafkaTopic
Specification
Here is the resource definition file for defining a KafkaTopic
.
apiVersion: "kafka.jikkou.io/v1beta2" # The api version (required)
kind: "KafkaTopic" # The resource kind (required)
metadata:
name: <The name of the topic> # (required)
labels: { }
annotations: { }
spec:
partitions: <Number of partitions> # (optional)
replicas: <Number of replicas> # (optional)
configs:
<config_key>: <Config Value> # The topic config properties keyed by name to override (optional)
configMapRefs: [ ] # The list of ConfigMap to be applied to this topic (optional)
The metadata.name
property is mandatory and specifies the name of the kafka topic.
To use the cluster default values for the number of partitions
and replicas
you can set the property
spec.partitions
and spec.replicas
to -1
.
Example
Here is a simple example that shows how to define a single YAML file containing two topic definition using
the KafkaTopic
resource type.
file: kafka-topics.yaml
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
name: 'my-topic-p1-r1' # Name of the topic
labels:
environment: example
spec:
partitions: 1 # Number of topic partitions (use -1 to use cluster default)
replicas: 1 # Replication factor per partition (use -1 to use cluster default)
configs:
min.insync.replicas: 1
cleanup.policy: 'delete'
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
name: 'my-topic-p2-r1' # Name of the topic
labels:
environment: example
spec:
partitions: 2 # Number of topic partitions (use -1 to use cluster default)
replicas: 1 # Replication factor per partition (use -1 to use cluster default)
configs:
min.insync.replicas: 1
cleanup.policy: 'delete'
See official Apache Kafka documentation for details about the topic-level configs.
---
lines.KafkaTopicList
If you need to define multiple topics (e.g. using a template), it may be easier to use a KafkaTopicList
resource.
Specification
Here the resource definition file for defining a KafkaTopicList
.
apiVersion: "kafka.jikkou.io/v1beta2" # The api version (required)
kind: "KafkaTopicList" # The resource kind (required)
metadata: # (optional)
labels: { }
annotations: { }
items: [ ] # An array of KafkaTopic
Example
Here is a simple example that shows how to define a single YAML file containing two topic definitions using
the KafkaTopicList
resource type. In addition, the example uses a ConfigMap
object to define the topic configuration
only once.
file: kafka-topics.yaml
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopicList
metadata:
labels:
environment: example
items:
- metadata:
name: 'my-topic-p1-r1'
spec:
partitions: 1
replicas: 1
configMapRefs: [ "TopicConfig" ]
- metadata:
name: 'my-topic-p2-r1'
spec:
partitions: 2
replicas: 1
configMapRefs: [ "TopicConfig" ]
---
apiVersion: "core.jikkou.io/v1beta2"
kind: ConfigMap
metadata:
name: 'TopicConfig'
data:
min.insync.replicas: 1
cleanup.policy: 'delete'
7.2.2.4 - Kafka Users
This section describes the resource definition format for KafkaUser
entities, which can be used to
manage SCRAM Users for Apache Kafka.
Definition Format of KafkaUser
Below is the overall structure of the KafkaUser
resource.
---
apiVersion: kafka.jikkou.io/v1 # The api version (required)
kind: KafkaUser # The resource kind (required)
metadata:
name: <string>
annotations:
# force update
kafka.jikkou.io/force-password-renewal: <boolean>
spec:
authentications:
- type: <enum> # or
password: <string> # leave empty to generate secure password
See below for details about all these fields.
Metadata
metadata.name
[required]
The name of the User.
kafka.jikkou.io/force-password-renewal
[optional]
Specification
spec.authentications
[required]
The list of authentications to manage for the user.
spec.authentications[].type
[required]
The authentication type:
scram-sha-256
scram-sha-512
spec.authentications[].password
[required]
The password of the user.
Examples
The following is an example of a resource describing a User:
---
# Example: file: kafka-scram-users.yaml
apiVersion: "kafka.jikkou.io/v1"
kind: "User"
metadata:
name: "Bob"
spec:
authentications:
- type: scram-sha-256
password: null
- type: scram-sha-512
password: null
Listing Kafka Users
You can retrieve the SCRAM users of a Kafka cluster using the jikkou get kafkausers
(or jikkou get ku
) command.
Usage
$ jikkou get kc --help
Usage:
Get all 'KafkaUser' resources.
jikkou get kafkausers [-hV] [--list] [--logger-level=<level>] [--name=<name>]
[-o=<format>]
[--selector-match=<selectorMatchingStrategy>]
[-s=<expressions>]...
DESCRIPTION:
Use jikkou get kafkausers when you want to describe the state of all resources
of type 'KafkaUser'.
OPTIONS:
-h, --help Show this help message and exit.
--list Get resources as ResourceListObject (default: false).
--logger-level=<level>
Specify the log level verbosity to be used while
running a command.
Valid level values are: TRACE, DEBUG, INFO, WARN,
ERROR.
For example, `--logger-level=INFO`
--name=<name> The name of the resource.
-o, --output=<format> Prints the output in the specified format. Valid
values: JSON, YAML (default: YAML).
-s, --selector=<expressions>
The selector expression used for including or
excluding resources.
--selector-match=<selectorMatchingStrategy>
The selector matching strategy. Valid values: NONE,
ALL, ANY (default: ALL)
-V, --version Print version information and exit.
(The output from your current Jikkou version may be different from the above example.)
Examples
(command)
$ jikkou get ku
(output)
apiVersion: "kafka.jikkou.io/v1"
kind: "KafkaUser"
metadata:
name: "Bob"
labels: {}
annotations:
kafka.jikkou.io/cluster-id: "xtzWWN4bTjitpL3kfd9s5g"
spec:
authentications:
- type: "scram-sha-256"
iterations: 8192
- type: "scram-sha-512"
iterations: 8192
7.2.2.5 - Kafka Authorizations
KafkaPrincipalAuthorization resources are used to define Access Control Lists (ACLs) for principals authenticated to your Kafka Cluster.
Jikkou can be used to describe all ACL policies that need to be created on Kafka Cluster
KafkaPrincipalAuthorization
Specification
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
name: "User:Alice"
spec:
roles: [ ] # List of roles to be added to the principal (optional)
acls: # List of KafkaPrincipalACL (required)
- resource:
type: <The type of the resource> # (required)
pattern: <The pattern to be used for matching resources> # (required)
patternType: <The pattern type> # (required)
type: <The type of this ACL> # ALLOW or DENY (required)
operations: [ ] # Operation that will be allowed or denied (required)
host: <HOST> # IP address from which principal will have access or will be denied (optional)
For more information on how to define authorization and ACLs, see the official Apache Kafka documentation: Security
Operations
The list below describes the valid values for the spec.acls.[].operations
property :
READ
WRITE
CERATE
DELETE
ALTER
DESCRIBE
CLUSTER_ACTION
DESCRIBE_CONFIGS
ALTER_CONFIGS
IDEMPOTENT_WRITE
CREATE_TOKEN
DESCRIBE_TOKENS
ALL
For more information see official Apache Kafka documentation: Operations in Kafka
Resource Types
The list below describes the valid values for the spec.acls.[].resource.type
property :
TOPIC
GROUP
CLUSTER
USER
TRANSACTIONAL_ID
For more information see official Apache Kafka documentation: Resources in Kafka
Pattern Types
The list below describes the valid values for the spec.acls.[].resource.patternType
property :
LITERAL
: Use to allow or denied a principal to have access to a specific resource name.MATCH
: Use to allow or denied a principal to have access to all resources matching the given regex.PREFIXED
: Use to allow or denied a principal to have access to all resources having the given prefix.
Example
---
apiVersion: "kafka.jikkou.io/v1beta2" # The api version (required)
kind: "KafkaPrincipalAuthorization" # The resource kind (required)
metadata:
name: "User:Alice"
spec:
acls:
- resource:
type: 'topic'
pattern: 'my-topic-'
patternType: 'PREFIXED'
type: "ALLOW"
operations: [ 'READ', 'WRITE' ]
host: "*"
- resource:
type: 'topic'
pattern: 'my-other-topic-.*'
patternType: 'MATCH'
type: 'ALLOW'
operations: [ 'READ' ]
host: "*"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
name: "User:Bob"
spec:
acls:
- resource:
type: 'topic'
pattern: 'my-topic-'
patternType: 'PREFIXED'
type: 'ALLOW'
operations: [ 'READ', 'WRITE' ]
host: "*"
KafkaPrincipalRole
Specification
apiVersion: "kafka.jikkou.io/v1beta2" # The api version (required)
kind: "KafkaPrincipalRole" # The resource kind (required)
metadata:
name: <Name of role> # The name of the role (required)
spec:
acls: [ ] # A list of KafkaPrincipalACL (required)
Example
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalRole"
metadata:
name: "KafkaTopicPublicRead"
spec:
acls:
- type: "ALLOW"
operations: [ 'READ' ]
resource:
type: 'topic'
pattern: '/public-([.-])*/'
patternType: 'MATCH'
host: "*"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalRole"
metadata:
name: "KafkaTopicPublicWrite"
spec:
acls:
- type: "ALLOW"
operations: [ 'WRITE' ]
resource:
type: 'topic'
pattern: '/public-([.-])*/'
patternType: 'MATCH'
host: "*"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
name: "User:Alice"
spec:
roles:
- "KafkaTopicPublicRead"
- "KafkaTopicPublicWrite"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
name: "User:Bob"
spec:
roles:
- "KafkaTopicPublicRead"
7.2.2.6 - Kafka Quotas
KafkaClientQuota resources are used to define the quota limits to be applied on Kafka consumers and producers.
A KafkaClientQuota resource can be used to apply limit to consumers and/or producers identified by a client-id
or a
user principal
.
KafkaClientQuota
Specification
Here is the resource definition file for defining a KafkaClientQuota
.
apiVersion: "kafka.jikkou.io/v1beta2" # The api version (required)
kind: "KafkaClientQuota" # The resource kind (required)
metadata: # (optional)
labels: { }
annotations: { }
spec:
type: <The quota type> # (required)
entity:
clientId: <The id of the client> # (required depending on the quota type).
user: <The principal of the user> # (required depending on the quota type).
configs:
requestPercentage: <The quota in percentage (%) of total requests> # (optional)
producerByteRate: <The quota in bytes for restricting data production> # (optional)
consumerByteRate: <The quota in bytes for restricting data consumption> # (optional)
Quota Types
The list below describes the supported quota types:
USERS_DEFAULT
: Set default quotas for all users.USER
: Set quotas for a specific user principal.USER_CLIENT
: Set quotas for a specific user principal and a specific client-id.USER_ALL_CLIENTS
: Set default quotas for a specific user and all clients.CLIENT
: Set default quotas for a specific client.CLIENTS_DEFAULT
: Set default quotas for all clients.
Example
Here is a simple example that shows how to define a single YAML file containing two quota definitions using
the KafkaClientQuota
resource type.
file: kafka-quotas.yaml
---
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuota'
metadata:
labels: { }
annotations: { }
spec:
type: 'CLIENT'
entity:
clientId: 'my-client'
configs:
requestPercentage: 10
producerByteRate: 1024
consumerByteRate: 1024
---
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuota'
metadata:
labels: { }
annotations: { }
spec:
type: 'USER'
entity:
user: 'my-user'
configs:
requestPercentage: 10
producerByteRate: 1024
consumerByteRate: 1024
KafkaClientQuotaList
If you need to define multiple topics (e.g. using a template), it may be easier to use a KafkaClientQuotaList
resource.
Specification
Here the resource definition file for defining a KafkaTopicList
.
apiVersion: "kafka.jikkou.io/v1beta2" # The api version (required)
kind: "KafkaClientQuotaList" # The resource kind (required)
metadata: # (optional)
name: <The name of the topic>
labels: { }
annotations: { }
items: [ ] # An array of KafkaClientQuota
Example
Here is a simple example that shows how to define a single YAML file containing two KafkaClientQuota
definition using
the KafkaClientQuotaList
resource type.
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuotaList'
metadata:
labels: { }
annotations: { }
items:
- spec:
type: 'CLIENT'
entity:
clientId: 'my-client'
configs:
requestPercentage: 10
producerByteRate: 1024
consumerByteRate: 1024
- spec:
type: 'USER'
entity:
user: 'my-user'
configs:
requestPercentage: 10
producerByteRate: 1024
consumerByteRate: 1024
7.2.2.7 - Kafka Table Records
A KafkaTableRecord resource can be used to produce a key/value record into a given compacted topic, i.e., a topic
with cleanup.policy=compact
(a.k.a. KTable).
KafkaTableRecord
Specification
Here is the resource definition file for defining a KafkaTableRecord
.
apiVersion: "kafka.jikkou.io/v1beta1" # The api version (required)
kind: "KafkaTableRecord" # The resource kind (required)
metadata:
labels: { }
annotations: { }
spec:
type: <string> # The topic name (required)
headers: # The list of headers
- name: <string>
value: <string>
key: # The record-key (required)
type: <string> # The record-key type. Must be one of: BINARY, STRING, JSON (required)
data: # The record-key in JSON serialized form.
$ref: <url or path> # Or an url to a local file containing the JSON string value.
value: # The record-value (required)
type: <string> # The record-value type. Must be one of: BINARY, STRING, JSON (required)
data: # The record-value in JSON serialized form.
$ref: <url or path> # Or an url to a local file containing the JSON string value.
Usage
The KafkaTableRecord
resource has been designed primarily to manage reference data published and shared via Kafka.
Therefore, it
is highly recommended to use this resource only with compacted Kafka topics containing a small amount of data.
Examples
Here are some examples that show how to a KafkaTableRecord
using the different supported data type.
STRING:
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
topic: "my-topic"
headers:
- name: "content-type"
value: "application/text"
key:
type: STRING
data: |
"bar"
value:
type: STRING
data: |
"foo"
JSON:
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
topic: "my-topic"
headers:
- name: "content-type"
value: "application/text"
key:
type: STRING
data: |
"bar"
value:
type: JSON
data: |
{
"foo": "bar"
}
BINARY:
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
topic: "my-topic"
headers:
- name: "content-type"
value: "application/text"
key:
type: STRING
data: |
"bar"
value:
type: BINARY
data: |
"eyJmb28iOiAiYmFyIn0K"
7.2.3 - Transformations
Here, you will find information to use the built-in transformations for Apache Kafka resources.
More information:
7.2.3.1 - KafkaTopicMaxNumPartitions
This transformation can be used to enforce a maximum value for the number of partitions of kafka topics.
Configuration
Name | Type | Description | Default |
---|---|---|---|
maxNumPartitions | Int | maximum value for the number of partitions to be used for Kafka Topics |
Example
jikkou {
transformations: [
{
type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMaxNumPartitions
priority = 100
config = {
maxNumPartitions = 50
}
}
]
}
7.2.3.2 - KafkaTopicMaxRetentionMs
This transformation can be used to enforce a maximum value for the retention.ms
property of kafka topics.
Configuration
Name | Type | Description | Default |
---|---|---|---|
maxRetentionMs | Int | Minimum value of retention.ms to be used for Kafka Topics |
Example
jikkou {
transformations: [
{
type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinRetentionMsTransformation
priority = 100
config = {
maxRetentionMs = 2592000000 # 30 days
}
}
]
}
7.2.3.3 - KafkaTopicMinInSyncReplicas
This transformation can be used to enforce a minimum value for the min.insync.replicas
property of kafka topics.
Configuration
Name | Type | Description | Default |
---|---|---|---|
minInSyncReplicas | Int | Minimum value of min.insync.replicas to be used for Kafka Topics |
Example
jikkou {
transformations: [
{
type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinInSyncReplicasTransformation
priority = 100
config = {
minInSyncReplicas = 2
}
}
]
}
7.2.3.4 - KafkaTopicMinReplicas
This transformation can be used to enforce a minimum value for the replication factor of kafka topics.
Configuration
Name | Type | Description | Default |
---|---|---|---|
minReplicationFactor | Int | Minimum value of replication factor to be used for Kafka Topics |
Example
jikkou {
transformations: [
{
type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinReplicasTransformation
priority = 100
config = {
minReplicationFactor = 3
}
}
]
}
7.2.3.5 - KafkaTopicMinRetentionMs
This transformation can be used to enforce a minimum value for the retention.ms
property of kafka topics.
Configuration
Name | Type | Description | Default |
---|---|---|---|
minRetentionMs | Int | Minimum value of retention.ms to be used for Kafka Topics |
Example
jikkou {
transformations: [
{
type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinRetentionMsTransformation
priority = 100
config = {
minRetentionMs = 604800000 # 7 days
}
}
]
}
7.2.4 - Validations
Jikkou ships with the following built-in validations:
Topics
NoDuplicateTopicsAllowedValidation
(auto registered)
TopicConfigKeysValidation
(auto registered)
type = io.streamthoughts.jikkou.kafka.validation.TopicConfigKeysValidation
The TopicConfigKeysValidation
allows checking if the specified topic configs are all valid.
TopicMinNumPartitions
type = io.streamthoughts.jikkou.kafka.validation.TopicMinNumPartitionsValidation
The TopicMinNumPartitions
allows checking if the specified number of partitions for a topic is not less than the minimum required.
Configuration
Name | Type | Description | Default |
---|---|---|---|
topicMinNumPartitions | Int | Minimum number of partitions allowed |
TopicMaxNumPartitions
type = io.streamthoughts.jikkou.kafka.validation.TopicMaxNumPartitions
The TopicMaxNumPartitions
allows checking if the number of partitions for a topic is not greater than the maximum configured.
Configuration
Name | Type | Description | Default |
---|---|---|---|
topicMaxNumPartitions | Int | Maximum number of partitions allowed |
TopicMinReplicationFactor
type = io.streamthoughts.jikkou.kafka.validation.TopicMinReplicationFactor
The TopicMinReplicationFactor
allows checking if the specified replication factor for a topic is not less than the minimum required.
Configuration
Name | Type | Description | Default |
---|---|---|---|
topicMinReplicationFactor | Int | Minimum replication factor allowed |
TopicMaxReplicationFactor
type = io.streamthoughts.jikkou.kafka.validation.TopicMaxReplicationFactor
The TopicMaxReplicationFactor
allows checking if the specified replication factor for a topic is not greater than the maximum configured.
Configuration
Name | Type | Description | Default |
---|---|---|---|
topicMaxReplicationFactor | Int | Maximum replication factor allowed |
TopicNamePrefix
type = io.streamthoughts.jikkou.kafka.validation.TopicNamePrefix
The TopicNamePrefix
allows checking if the specified name for a topic starts with one of the configured suffixes.
Configuration
Name | Type | Description | Default |
---|---|---|---|
topicNamePrefixes | List | List of topic name prefixes allows |
TopicNameSuffix
type = io.streamthoughts.jikkou.kafka.validation.TopicNameSuffix
The TopicNameSuffix
allows checking if the specified name for a topic ends with one of the configured suffixes.
Configuration
Name | Type | Description | Default |
---|---|---|---|
topicNameSuffixes | List | List of topic name suffixes allows |
ACLs
NoDuplicateUsersAllowedValidation
(auto registered)
NoDuplicateRolesAllowedValidation
(auto registered)
Quotas
QuotasEntityValidation
(auto registered)
7.2.5 - Annotations
Here, you will find information about the annotations provided the Apache Kafka extension for Jikkou.
List of built-in annotations
kafka.jikkou.io/cluster-id
Used by jikkou.
The annotation is automatically added by Jikkou to a describe object part of an Apache Kafka cluster.
7.2.6 - Actions
Here, you will find the list of actions provided by the Extension Provider for Apache Kafka.
Apache Kafka Action
More information:
7.2.6.1 - KafkaConsumerGroupsResetOffsets
The KafkaConsumerGroupsResetOffsets
action allows resetting offsets of consumer group.
It supports one consumer group at the time, and group should be in EMPTY state.
Usage (CLI)
Usage:
Execute the action.
jikkou action KafkaConsumerGroupsResetOffsets execute [-hV] [--all] [--dry-run]
[--to-earliest] [--to-latest] [--group=PARAM] [--logger-level=<level>]
[-o=<format>] [--to-datetime=PARAM] [--to-offset=PARAM] [--excludes=PARAM]...
[--groups=PARAM]... [--includes=PARAM]... --topic=PARAM [--topic=PARAM]...
DESCRIPTION:
Reset offsets of consumer group. Supports multiple consumer groups, and groups
should be in EMPTY state.
You must choose one of the following reset specifications: to-datetime,
by-duration, to-earliest, to-latest, to-offset.
OPTIONS:
--all Specifies to act on all consumer groups.
--dry-run Only show results without executing changes on
Consumer Groups.
--excludes=PARAM List of patterns to match the consumer groups that
must be excluded from the reset-offset action.
--group=PARAM The consumer group to act on.
--groups=PARAM The consumer groups to act on.
-h, --help Show this help message and exit.
--includes=PARAM List of patterns to match the consumer groups that
must be included in the reset-offset action.
--logger-level=<level>
Specify the log level verbosity to be used while
running a command.
Valid level values are: TRACE, DEBUG, INFO, WARN,
ERROR.
For example, `--logger-level=INFO`
-o, --output=<format> Prints the output in the specified format. Allowed
values: JSON, YAML (default YAML).
--to-datetime=PARAM Reset offsets to offset from datetime. Format:
'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset=PARAM Reset offsets to a specific offset.
--topic=PARAM The topic whose partitions must be included in the
reset-offset action.
-V, --version Print version information and exit.
Examples
Reset Single Consumer Group to the earliest offsets
jikkou action kafkaconsumergroupresetoffsets execute \
--group my-group \
--topic test \
--to-earliest
(output)
---
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
labels: {}
annotations:
configs.jikkou.io/to-earliest: "true"
configs.jikkou.io/group: "my-group"
configs.jikkou.io/dry-run: "false"
configs.jikkou.io/topic:
- "test"
results:
- status: "SUCCEEDED"
errors: []
data:
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
name: "my-group"
labels:
kafka.jikkou.io/is-simple-consumer: false
annotations: {}
status:
state: "EMPTY"
members: []
offsets:
- topic: "test"
partition: 1
offset: 0
- topic: "test"
partition: 0
offset: 0
- topic: "test"
partition: 2
offset: 0
- topic: "--test"
partition: 0
offset: 0
coordinator:
id: "101"
host: "localhost"
port: 9092
Reset All Consumer Groups to the earliest offsets
jikkou action kafkaconsumergroupresetoffsets execute \
--all \
--topic test \
--to-earliest
7.2.7 - Compatibility
The Apache Kafka extension for Jikkou utilizes the Kafka Admin Client which is compatible with any Kafka infrastructure, such as :
- Aiven
- Apache Kafka
- Confluent Cloud
- MSK
- Redpanda
- etc.
In addition, Kafka Protocol has a “bidirectional” client compatibility policy. In other words, new clients can talk to old servers, and old clients can talk to new servers.
7.3 - Apache Kafka Connect
Here, you will find information to use the Kafka Connect extension for Jikkou.
More information:
7.3.1 - Configuration
This section describes how to configure the Kafka Connect extension.
Extension
The Kafka Connect extension can be enabled/disabled via the configuration properties:
# Example
jikkou {
extensions.provider.kafkaconnect.enabled = true
}
Configuration
You can configure the properties to be used to connect the Kafka Connect cluster
through the Jikkou client configuration property: jikkou.kafkaConnect
.
Example:
jikkou {
kafkaConnect {
# Array of Kafka Connect clusters configurations.
clusters = [
{
# Name of the cluster (e.g., dev, staging, production, etc.)
name = "locahost"
# URL of the Kafka Connect service
url = "http://localhost:8083"
# Method to use for authenticating on Kafka Connect. Available values are: [none, basicauth, ssl]
authMethod = none
# Use when 'authMethod' is 'basicauth' to specify the username for Authorization Basic header
basicAuthUser = null
# Use when 'authMethod' is 'basicauth' to specify the password for Authorization Basic header
basicAuthPassword = null
# Enable debug logging
debugLoggingEnabled = false
# Ssl Config: Use when 'authMethod' is 'ssl'
# The location of the key store file.
sslKeyStoreLocation = "/certs/registry.keystore.jks"
# The file format of the key store file.
sslKeyStoreType = "JKS"
# The password for the key store file.
sslKeyStorePassword = "password"
# The password of the private key in the key store file.
sslKeyPassword = "password"
# The location of the trust store file.
sslTrustStoreLocation = "/certs/registry.truststore.jks"
# The file format of the trust store file.
sslTrustStoreType = "JKS"
# The password for the trust store file.
sslTrustStorePassword = "password"
# Specifies whether to ignore the hostname verification.
sslIgnoreHostnameVerification = true
}
]
}
}
7.3.2 - Resources
Here, you will find the list of resources supported by the Kafka Connect Extension.
Kafka Connect Resources
More information:
7.3.2.1 - KafkaConnectors
This section describes the resource definition format for KafkaConnector
entities, which can be used to define the
configuration and status of connectors you plan to create and manage on specific Kafka Connect clusters.
Definition Format of KafkaConnector
Below is the overall structure of the KafkaConnector
resource.
---
apiVersion: "kafka.jikkou.io/v1beta1" # The api version (required)
kind: "KafkaConnector" # The resource kind (required)
metadata:
name: <string> # The name of the connector (required)
labels:
# Name of the Kafka Connect cluster to create the connector instance in (required).
kafka.jikkou.io/connect-cluster: <string>
annotations:
# Override client properties to connect to Kafka Connect cluster (optional).
jikkou.io/config-override: |
<json>
spec:
connectorClass: <string> # Name or alias of the class for this connector.
tasksMax: <integer> # The maximum number of tasks for the Kafka Connector.
config: # Configuration properties of the connector.
<key>: <value>
state: <string> # The state the connector should be in. Defaults to running.
See below for details about all these fields.
Metadata
metadata.name
[required]
The name of the connector.
labels.kafka.jikkou.io/connect-cluster
[required]
The name of the Kafka Connect cluster to create the connector instance in.
The cluster name must be configured through the kafkaConnect.clusters[]
Jikkou’s configuration setting (see: Configuration).
jikkou.io/config-override:
[optional]
The JSON client configurations to override for connecting to the Kafka Connect cluster. The configuration properties passed through this annotation override any cluster properties defined in the Jikkou’s configuration setting (see: Configuration).
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
name: "my-connector"
labels:
kafka.jikkou.io/connect-cluster: "my-connect-cluster"
annotations:
jikkou.io/config-override: |
{ "url": "http://localhost:8083" }
Specification
spec.connectorClass
[required]
The name or alias of the class for this connector.
spec.tasksMax
[optional]
The maximum number of tasks for the Kafka Connector. Default is 1
.
spec.config
[required]
The connector’s configuration properties.
spec.state
[optional]
The state the connector should be in. Defaults to running
.
Below are the valid values:
running
: Transition the connector and its tasks to RUNNING state.paused
: Pause the connector and its tasks, which stops message processing until the connector is resumed.stopped
: Completely shut down the connector and its tasks. The connector config remains present in the config topic of the cluster (if running in distributed mode), unmodified.
Examples
The following is an example of a resource describing a Kafka connector:
---
# Example: file: kafka-connector-filestream-sink.yaml
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
name: "local-file-sink"
labels:
kafka.jikkou.io/connect-cluster: "my-connect-cluster"
spec:
connectorClass: "FileStreamSink"
tasksMax: 1
config:
file: "/tmp/test.sink.txt"
topics: "connect-test"
state: "RUNNING"
Listing KafkaConnector
You can retrieve the state of Kafka Connector instances running on your Kafka Connect clusters using the jikkou get kafkaconnectors
(or jikkou get kc
) command.
Usage
$jikkou get kc --help
Usage:
Get all 'KafkaConnector' resources.
jikkou get kafkaconnectors [-hV] [--expand-status] [-o=<format>]
[-s=<expressions>]...
Description:
Use jikkou get kafkaconnectors when you want to describe the state of all
resources of type 'KafkaConnector'.
Options:
--expand-status Retrieves additional information about the status of
the connector and its tasks.
-h, --help Show this help message and exit.
-o, --output=<format> Prints the output in the specified format. Allowed
values: json, yaml (default yaml).
-s, --selector=<expressions>
The selector expression use for including or
excluding resources.
-V, --version Print version information and exit.
(The output from your current Jikkou version may be different from the above example.)
Examples
(command)
$ jikkou get kc --expand-status
(output)
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
name: "local-file-sink"
labels:
kafka.jikkou.io/connect-cluster: "localhost"
spec:
connectorClass: "FileStreamSink"
tasksMax: 1
config:
file: "/tmp/test.sink.txt"
topics: "connect-test"
state: "RUNNING"
status:
connectorStatus:
name: "local-file-sink"
connector:
state: "RUNNING"
worker_id: "localhost:8083"
tasks:
id: 1
state: "RUNNING"
worker_id: "localhost:8083"
The status.connectorStatus
provides the connector status, as reported by the Kafka Connect REST API.
7.3.3 - Validations
Jikkou ships with the following built-in validations:
7.3.4 - Annotations
This section lists a number of well known annotations, that have defined semantics. They can be attached
to KafkaConnect
resources through the metadata.annotations
field and consumed as needed by extensions (i.e., validations, transformations, controller,
collector, etc.).
List of built-in annotations
7.3.5 - Labels
This section lists a number of well known labels, that have defined semantics. They can be attached
to KafkaConnect
resources through the metadata.labels
field and consumed as needed by extensions (i.e., validations, transformations, controller,
collector, etc.).
Labels
kafka.jikkou.io/connect-cluster
# Example
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
labels:
kafka.jikkou.io/connect-cluster: 'my-connect-cluster'
The value of this label defined the name of the Kafka Connect cluster to create the connector instance in.
The cluster name must be configured through the kafkaConnect.clusters[]
Jikkou’s configuration setting (see: Configuration).
7.3.6 - Actions
Here, you will find the list of actions provided by the Extension Provider for Kafka Connect.
Kafka Connect Action
More information:
7.3.6.1 - KafkaConnectRestartConnectors
The KafkaConnectRestartConnectors
action allows a user to restart all or just the
failed Connector and Task instances for one or multiple named connectors.
Usage (CLI)
Usage:
Execute the action.
jikkou action KafkaConnectRestartConnectors execute [-hV] [--include-tasks]
[--only-failed] [--connect-cluster=PARAM] [--logger-level=<level>]
[-o=<format>] [--connector-name=PARAM]...
DESCRIPTION:
The KafkaConnectRestartConnectors action a user to restart all or just the
failed Connector and Task instances for one or multiple named connectors.
OPTIONS:
--connect-cluster=PARAM
The name of the connect cluster.
--connector-name=PARAM
The connector's name.
-h, --help Show this help message and exit.
--include-tasks Specifies whether to restart the connector instance
and task instances (includeTasks=true) or just the
connector instance (includeTasks=false)
--logger-level=<level>
Specify the log level verbosity to be used while
running a command.
Valid level values are: TRACE, DEBUG, INFO, WARN,
ERROR.
For example, `--logger-level=INFO`
-o, --output=<format> Prints the output in the specified format. Allowed
values: JSON, YAML (default YAML).
--only-failed Specifies whether to restart just the instances with
a FAILED status (onlyFailed=true) or all instances
(onlyFailed=false)
-V, --version Print version information and exit.
Examples
Restart all connectors for all Kafka Connect clusters.
jikkou action kafkaconnectrestartconnectors execute
(output)
---
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
labels: {}
annotations: {}
results:
- status: "SUCCEEDED"
data:
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
name: "local-file-sink"
labels:
kafka.jikkou.io/connect-cluster: "my-connect-cluster"
annotations: {}
spec:
connectorClass: "FileStreamSink"
tasksMax: 1
config:
file: "/tmp/test.sink.txt"
topics: "connect-test"
state: "RUNNING"
status:
connectorStatus:
name: "local-file-sink"
connector:
state: "RUNNING"
workerId: "connect:8083"
tasks:
- id: 0
state: "RUNNING"
workerId: "connect:8083"
Restart all connectors with a FAILED status on all Kafka Connect clusters.
jikkou action kafkaconnectrestartconnectors execute \
--only-failed
Restart specific connector and tasks for on Kafka Connect cluster
jikkou action kafkaconnectrestartconnectors execute \
--cluster-name my-connect-cluster
--connector-name local-file-sink \
--include-tasks
7.4 - Schema Registry
Here, you will find information to use the Schema Registry extensions.
More information:
7.4.1 - Configuration
Here, you will find the list of resources supported for SchemaRegistry.
Configuration
You can configure the properties to be used to connect the SchemaRegistry service
through the Jikkou client configuration property jikkou.schemaRegistry
.
Example:
jikkou {
schemaRegistry {
# Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas
url = "http://localhost:8081"
# The name of the schema registry implementation vendor - can be any value
vendor = generic
# Method to use for authenticating on Schema Registry. Available values are: [none, basicauth, ssl]
authMethod = none
# Use when 'schemaRegistry.authMethod' is 'basicauth' to specify the username for Authorization Basic header
basicAuthUser = null
# Use when 'schemaRegistry.authMethod' is 'basicauth' to specify the password for Authorization Basic header
basicAuthPassword = null
# Enable debug logging
debugLoggingEnabled = false
# Ssl Config: Use when 'authMethod' is 'ssl'
# The location of the key store file.
sslKeyStoreLocation = "/certs/registry.keystore.jks"
# The file format of the key store file.
sslKeyStoreType = "JKS"
# The password for the key store file.
sslKeyStorePassword = "password"
# The password of the private key in the key store file.
sslKeyPassword = "password"
# The location of the trust store file.
sslTrustStoreLocation = "/certs/registry.truststore.jks"
# The file format of the trust store file.
sslTrustStoreType = "JKS"
# The password for the trust store file.
sslTrustStorePassword = "password"
# Specifies whether to ignore the hostname verification.
sslIgnoreHostnameVerification = true
}
}
7.4.2 - Resources
Here, you will find the list of resources supported for Schema Registry.
Schema Registry Resources
More information:
7.4.2.1 - Schema Registry Subjects
SchemaRegistrySubject resources are used to define the subject schemas you want to manage on your SchemaRegistry. A SchemaRegistrySubject resource defines the schema, the compatibility level, and the references to be associated with a subject version.
Definition Format of SchemaRegistrySubject
Below is the overall structure of the SchemaRegistrySubject
resource.
apiVersion: "schemaregistry.jikkou.io/v1beta2" # The api version (required)
kind: "SchemaRegistrySubject" # The resource kind (required)
metadata:
name: <The name of the subject> # (required)
labels: { }
annotations: { }
spec:
schemaRegistry:
vendor: <vendor_name> # (optional) The vendor of the SchemaRegistry, e.g., Confluent, Karapace, etc
compatibilityLevel: <compatibility_level> # (optional) The schema compatibility level for this subject.
schemaType: <The schema format> # (required) Accepted values are: AVRO, PROTOBUF, JSON
schema:
$ref: <url or path> #
references: # Specifies the names of referenced schemas (optional array).
- name: <string> # The name for the reference.
subject: <string> # The subject under which the referenced schema is registered.
version: <string> # The exact version of the schema under the registered subject.
]
Metadata
The metadata.name
property is mandatory and specifies the name of the Subject.
Specification
To use the SchemaRegistry default values for the compatibilityLevel
you can omit the property.
Example
Here is a simple example that shows how to define a single subject AVRO schema for type using
the SchemaRegistrySubject
resource type.
file: subject-user.yaml
---
apiVersion: "schemaregistry.jikkou.io/v1beta2"
kind: "SchemaRegistrySubject"
metadata:
name: "User"
labels: { }
annotations:
schemaregistry.jikkou.io/normalize-schema: true
spec:
compatibilityLevel: "FULL_TRANSITIVE"
schemaType: "AVRO"
schema:
$ref: ./user-schema.avsc
file: user-schema.avsc
---
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": [ "null", "string" ],
"default": null,
},
{
"name": "favorite_number",
"type": [ "null", "int" ],
"default": null
},
{
"name": "favorite_color",
"type": [ "null", "string" ],
"default": null
}
]
}
Alternatively, we can directly pass the Avro schema as follows :
file: subject-user.yaml
---
apiVersion: "schemaregistry.jikkou.io/v1beta2"
kind: "SchemaRegistrySubject"
metadata:
name: "User"
labels: { }
annotations:
schemaregistry.jikkou.io/normalize-schema: true
spec:
compatibilityLevel: "FULL_TRANSITIVE"
schemaType: "AVRO"
schema: |
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": [ "null", "string" ],
"default": null
},
{
"name": "favorite_number",
"type": [ "null", "int" ],
"default": null
},
{
"name": "favorite_color",
"type": [ "null", "string"],
"default": null
}
]
}
SchemaRegistrySubjectList
If you need to manage multiple Schemas at once (e.g. using a template), it may be more suitable to use the resource collection SchemaRegistrySubjectList
.
Specification
Here the resource definition file for defining a SchemaRegistrySubjectList
.
apiVersion: "schemaregistry.jikkou.io/v1beta2" # The api version (required)
kind: "SchemaRegistrySubjectList" # The resource kind (required)
metadata: # (optional)
labels: { }
annotations: { }
items: [ ] # The array of SchemaRegistrySubject
7.4.3 - Validations
Jikkou ships with the following built-in validations:
Subject
SchemaCompatibilityValidation
type = io.streamthoughts.jikkou.schema.registry.validation.SchemaCompatibilityValidation
The SchemaCompatibilityValidation
allows testing the compatibility of the schema with the latest
version already registered in the Schema Registry using the provided compatibility-level.
AvroSchemaValidation
The AvroSchemaValidation
allows checking if the specified Avro schema matches some specific avro schema definition
rules;
type = io.streamthoughts.jikkou.schema.registry.validation.AvroSchemaValidation
Configuration
Name | Type | Description | Default |
---|---|---|---|
fieldsMustHaveDoc | Boolean | Verify that all record fields have a doc property | false |
fieldsMustBeNullable | Boolean | Verify that all record fields are nullable | false |
fieldsMustBeOptional | Boolean | Verify that all record fields are optional | false |
7.4.4 - Annotations
Here, you will find information about the annotations provided by the Schema Registry extension for Jikkou.
List of built-in annotations
schemaregistry.jikkou.io/url
Used by jikkou.
The annotation is automatically added by Jikkou to describe the SchemaRegistry URL from which a subject schema is retrieved.
schemaregistry.jikkou.io/schema-version
Used by jikkou.
The annotation is automatically added by Jikkou to describe the version of a subject schema.
schemaregistry.jikkou.io/schema-id
Used by jikkou.
The annotation is automatically added by Jikkou to describe the version of a subject id.
schemaregistry.jikkou.io/normalize-schema
Used on: schemaregistry.jikkou.io/v1beta2:SchemaRegistrySubject
This annotation can be used to normalize the schema on SchemaRegistry server side. Note, that Jikkou will attempt to normalize AVRO and JSON schema.
See: Confluent SchemaRegistry API Reference
schemaregistry.jikkou.io/permanante-delete
Used on: schemaregistry.jikkou.io/v1beta2:SchemaRegistrySubject
The annotation can be used to specify a hard delete of the subject, which removes all associated metadata including the schema ID. The default is false. If the flag is not included, a soft delete is performed. You must perform a soft delete first, then the hard delete.
See: Confluent SchemaRegistry API Reference
schemaregistry.jikkou.io/use-canonical-fingerprint
This annotation can be used to use a canonical fingerprint to compare schemas (only supported for Avro schema).
7.5 - Aiven
Here, you will find information to use the Aiven for Kafka extensions.
More information:
7.5.1 - Configuration
Here, you will find the list of resources supported by the extension for Aiven.
Configuration
You can configure the properties to be used to connect the Aiven service
through the Jikkou client configuration property jikkou.aiven
.
Example:
jikkou {
aiven {
# Aiven project name
project = "http://localhost:8081"
# Aiven service name
service = generic
# URL to the Aiven REST API.
apiUrl = "https://api.aiven.io/v1/"
# Aiven Bearer Token. Tokens can be obtained from your Aiven profile page
tokenAuth = null
# Enable debug logging
debugLoggingEnabled = false
}
}
7.5.2 - Resources
Here, you will find the list of resources supported by the extensions for Aiven.
Aiven for Apache Kafka Resources
More information:
7.5.2.1 - ACL for Aiven Apache Kafka®
The KafkaTopicAclEntry
resources are used to manage the Access Control Lists in Aiven for Apache Kafka®. A
KafkaTopicAclEntry
resource defines the permission to be granted to a user for one or more kafka topics.
KafkaTopicAclEntry
Specification
Here is the resource definition file for defining a KafkaTopicAclEntry
.
---
apiVersion: "kafka.aiven.io/v1beta1" # The api version (required)
kind: "KafkaTopicAclEntry" # The resource kind (required)
metadata:
labels: { }
annotations: { }
spec:
permission: <> # The permission. Accepted values are: READ, WRITE, READWRITE, ADMIN
username: <> # The username
topic: <> # Topic name or glob pattern
Example
Here is a simple example that shows how to define a single ACL entry using
the KafkaTopicAclEntry
resource type.
file: kafka-topic-acl-entry.yaml
---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "KafkaTopicAclEntry"
metadata:
labels: { }
annotations: { }
spec:
permission: "READWRITE"
username: "alice"
topic: "public-*"
KafkaTopicAclEntryList
If you need to define multiple ACL entries (e.g. using a template), it may be easier to use a KafkaTopicAclEntryList
resource.
Specification
Here the resource definition file for defining a KafkaTopicList
.
---
apiVersion: "kafka.aiven.io/v1beta1" # The api version (required)
kind: "KafkaTopicAclEntryList" # The resource kind (required)
metadata: # (optional)
name: <The name of the topic>
labels: { }
annotations: { }
items: [ ] # An array of KafkaTopicAclEntry
Example
Here is a simple example that shows how to define a single YAML file containing two ACL entry definitions using
the KafkaTopicAclEntryList
resource type.
---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "KafkaTopicAclEntryList"
items:
- spec:
permission: "READWRITE"
username: "alice"
topic: "public-*"
- spec:
permission: "READ"
username: "bob"
topic: "public-*"
7.5.2.2 - Quotas for Aiven Apache Kafka®
The KafkaQuota
resources are used to manage the Quotas in Aiven for Apache Kafka® service.
For more details, see https://docs.aiven.io/docs/products/kafka/concepts/kafka-quotas
KafkaQuota
Specification
Here is the resource definition file for defining a KafkaQuota
.
---
apiVersion: "kafka.aiven.io/v1beta1" # The api version (required)
kind: "KafkaQuota" # The resource kind (required)
metadata:
labels: { }
annotations: { }
spec:
user: <string> # The username: (Optional: 'default' if null)
clientId: <string> # The client-id
consumerByteRate: <number> # The quota in bytes for restricting data consumption
producerByteRate: <number> # The quota in bytes for restricting data production
requestPercentage: <number>
Example
Here is a simple example that shows how to define a single ACL entry using
the KafkaQuota
resource type.
file: kafka-quotas.yaml
---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "KafkaQuota"
spec:
user: "default"
clientId: "default"
consumerByteRate: 1048576
producerByteRate: 1048576
requestPercentage: 25
KafkaQuotaList
If you need to define multiple Kafka quotas (e.g. using a template), it may be easier to use a KafkaQuotaList
resource.
Specification
Here the resource definition file for defining a KafkaTopicList
.
---
apiVersion: "kafka.aiven.io/v1beta1" # The api version (required)
kind: "KafkaQuotaList" # The resource kind (required)
metadata: # (optional)
labels: { }
annotations: { }
items: [ ] # An array of KafkaQuotaList
Example
Here is a simple example that shows how to define a single YAML file containing two ACL entry definitions using
the KafkaQuotaList
resource type.
---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "KafkaQuotaList"
items:
- spec:
user: "default"
clientId: "default"
consumerByteRate: 1048576
producerByteRate: 1048576
requestPercentage: 5
- spec:
user: "avnadmin"
consumerByteRate: 5242880
producerByteRate: 5242880
requestPercentage: 25
7.5.2.3 - ACL for Aiven Schema Registry
The SchemaRegistryAclEntry
resources are used to manage the Access Control Lists in Aiven for Schema Registry. A
SchemaRegistryAclEntry
resource defines the permission to be granted to a user for one or more Schema Registry
Subjects.
SchemaRegistryAclEntry
Specification
Here is the resource definition file for defining a SchemaRegistryAclEntry
.
---
apiVersion: "kafka.aiven.io/v1beta1" # The api version (required)
kind: "SchemaRegistryAclEntry" # The resource kind (required)
metadata:
labels: { }
annotations: { }
spec:
permission: <> # The permission. Accepted values are: READ, WRITE
username: <> # The username
resource: <> # The Schema Registry ACL entry resource name pattern
NOTE: The resource name pattern should be Config:
or Subject:<subject_name>
where subject_name
must consist of
alpha-numeric characters, underscores, dashes, dots and glob characters *
and ?
.
Example
Here is an example that shows how to define a simple ACL entry using
the SchemaRegistryAclEntry
resource type.
file: schema-registry-acl-entry.yaml
---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "SchemaRegistryAclEntry"
spec:
permission: "READ"
username: "Alice"
resource: "Subject:*"
SchemaRegistryAclEntryList
If you need to define multiple ACL entries (e.g. using a template), it may be easier to use
a SchemaRegistryAclEntryList
resource.
Specification
Here the resource definition file for defining a SchemaRegistryAclEntryList
.
---
apiVersion: "kafka.aiven.io/v1beta1" # The api version (required)
kind: "SchemaRegistryAclEntryList" # The resource kind (required)
metadata: # (optional)
labels: { }
annotations: { }
items: [ ] # An array of SchemaRegistryAclEntry
Example
Here is a simple example that shows how to define a single YAML file containing two ACL entry definitions using
the SchemaRegistryAclEntryList
resource type.
---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "SchemaRegistryAclEntryList"
items:
- spec:
permission: "READ"
username: "alice"
resource: "Config:"
- spec:
permission: "WRITE"
username: "alice"
resource: "Subject:*"
7.5.2.4 - Subject for Aiven Schema Registry
SchemaRegistrySubject resources are used to define the subject schemas you want to manage on your Schema Registry. A SchemaRegistrySubject resource defines the schema, the compatibility level, and the references to be associated with a subject version.
SchemaRegistrySubject
Specification
Here is the resource definition file for defining a SchemaRegistrySubject
.
apiVersion: "kafka.aiven.io/v1beta1" # The api version (required)
kind: "SchemaRegistrySubject" # The resource kind (required)
metadata:
name: <The name of the subject> # (required)
labels: { }
annotations: { }
spec:
schemaRegistry:
vendor: 'Karapace' # (optional) The vendor of the Schema Registry
compatibilityLevel: <compatibility_level> # (optional) The schema compatibility level for this subject.
schemaType: <The schema format> # (required) Accepted values are: AVRO, PROTOBUF, JSON
schema:
$ref: <url or path> #
references: # Specifies the names of referenced schemas (optional array).
- name: <> # The name for the reference.
subject: <> # The subject under which the referenced schema is registered.
version: <> # The exact version of the schema under the registered subject.
]
The metadata.name
property is mandatory and specifies the name of the Subject.
To use the SchemaRegistry default values for the compatibilityLevel
you can omit the property.
Example
Here is a simple example that shows how to define a single subject AVRO schema for type using
the SchemaRegistrySubject
resource type.
file: subject-user.yaml
---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "SchemaRegistrySubject"
metadata:
name: "User"
labels: { }
annotations:
schemaregistry.jikkou.io/normalize-schema: true
spec:
compatibilityLevel: "FULL_TRANSITIVE"
schemaType: "AVRO"
schema:
$ref: ./user-schema.avsc
file: user-schema.avsc
---
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": [ "null", "string" ],
"default": null,
},
{
"name": "favorite_number",
"type": [ "null", "int" ],
"default": null
},
{
"name": "favorite_color",
"type": [ "null", "string" ],
"default": null
}
]
}
Alternatively, we can directly pass the Avro schema as follows :
file: subject-user.yaml
---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "SchemaRegistrySubject"
metadata:
name: "User"
labels: { }
annotations:
schemaregistry.jikkou.io/normalize-schema: true
spec:
compatibilityLevel: "FULL_TRANSITIVE"
schemaType: "AVRO"
schema: |
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": [ "null", "string" ],
"default": null
},
{
"name": "favorite_number",
"type": [ "null", "int" ],
"default": null
},
{
"name": "favorite_color",
"type": [ "null", "string"],
"default": null
}
]
}
7.5.3 - Validations
Jikkou ships with the following built-in validations:
No validation
7.5.4 - Annotations
Here, you will find information about the annotations provided by the Aiven extension for Jikkou.
List of built-in annotations
kafka.aiven.io/acl-entry-id
Used by jikkou.
The annotation is automatically added by Jikkou to describe the ID of an ACL entry.
8 - Developer Guide
Here, you will find the necessary information to develop with the Jikkou API.
More information:
8.1 - Extension Developer Guide
This guide describes how developers can write new extensions for Jikkou.
More information:
8.1.1 - Package Extensions
Packaging Extensions
You can extend Jikkou’s capabilities by developing custom extensions and resources.
An extension must be developed in Java and packaged as a tarball or ZIP archive. The archive must contain a single top-level directory containing the extension JAR files, as well as any resource files or third party libraries required by your extensions. An alternative approach is to create an uber-JAR that contains all the extension’s JAR files and other resource files needed.
An extension package is more commonly described as an Extension Provider.
Dependencies
Jikkou’s sources are available on Maven Central
To start developing custom extension for Jikkou, simply add the Core library to your project’s dependencies.
For Maven:
<dependency>
<groupId>io.streamthoughts</groupId>
<artifactId>jikkou-core</artifactId>
<version>${jikkou.version}</version>
</dependency>
For Gradle:
implementation group: 'io.streamthoughts', name: 'jikkou-core', version: ${jikkou.version}
Extension Discovery
Jikkou uses the standard Java ServiceLoader
mechanism to discover and registers custom extensions and resources. For this, you will need to the implement
the Service Provider Interface: io.streamthoughts.jikkou.spi.ExtensionProvider
/**
* <pre>
* Service interface for registering extensions and resources to Jikkou at runtime.
* The implementations are discovered using the standard Java {@link java.util.ServiceLoader} mechanism.
*
* Hence, the fully qualified name of the extension classes that implement the {@link ExtensionProvider}
* interface must be added to a {@code META-INF/services/io.streamthoughts.jikkou.spi.ExtensionProvider} file.
* </pre>
*/
public interface ExtensionProvider extends HasName, Configurable {
/**
* Registers the extensions for this provider.
*
* @param registry The ExtensionRegistry.
*/
void registerExtensions(@NotNull ExtensionRegistry registry);
/**
* Registers the resources for this provider.
*
* @param registry The ResourceRegistry.
*/
void registerResources(@NotNull ResourceRegistry registry);
}
Recommendations
If you are using Maven as project management tool, we recommended to use the Apache Maven Assembly Plugin to package your extensions as a tarball or ZIP archive.
Simply create an assembly descriptor in your project as follows:
src/main/assembly/package.xml
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.2.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.2.0 http://maven.apache.org/xsd/assembly-2.2.0.xsd">
<id>package</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>${project.basedir}</directory>
<outputDirectory>${organization.name}-${project.artifactId}/doc</outputDirectory>
<includes>
<include>README*</include>
<include>LICENSE*</include>
<include>NOTICE*</include>
</includes>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>${organization.name}-${project.artifactId}/lib</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<useTransitiveFiltering>true</useTransitiveFiltering>
<unpack>false</unpack>
<excludes>
<exclude>io.streamthoughts:jikkou-core</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
Then, configure the maven-assembly-plugin
in the pom.xml
file of your project:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<finalName>${organization.name}-${project.artifactId}-${project.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/assembly/package.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
<execution>
<id>test-make-assembly</id>
<phase>pre-integration-test</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
Finally, use the mvn clean package
to build your project and create the archive.
Installing Extension Providers
To install an Extension Provider, all you need to do is to unpacks the archive into a desired location (
e.g., /usr/share/jikkou-extensions
).
Also, you should ensure that the archive’s top-level directory name is unique, to prevent overwriting existing files or
extensions.
Configuring Extension Providers
Custom extensions can be supplied to the Jikkou’s API Server and Jikkou CLI (when running the Java Binary Distribution,
i.e., not the native version). For this, you simply need to configure the jikkou.extension.paths
property. The
property accepts a list of paths from which to load extension providers.
Example for the Jikkou API Server:
# application.yaml
jikkou:
extension.paths:
- /usr/share/jikkou-extensions
Once your extensions are configured you should be able to list your extensions using either :
- The Jikkou CLI:
jikkou api-extensions list
command, or - The Jikkou API Server:
GET /apis/core.jikkou.io/v1/extensions -H "Accept: application/json"
8.1.2 - Develop Custom Validations
This section covers the core classes to develop validation extensions.
Interface
To create a custom validation
, you will need to implement the Java
interface: io.streamthoughts.jikkou.core.validation.Validation
.
This interface defines two methods, with a default implementation for each, to give you the option of validating either all resources accepted by validation at once, or each resource one by one.
public interface Validation<T extends HasMetadata> extends Interceptor {
/**
* Validates the specified resource list.
*
* @param resources The list of resources to be validated.
* @return The ValidationResult.
*/
default ValidationResult validate(@NotNull final List<T> resources) {
// code omitted for clarity
}
/**
* Validates the specified resource.
*
* @param resource The resource to be validated.
* @return The ValidationResult.
*/
default ValidationResult validate(@NotNull final T resource) {
// code omitted for clarity
}
}
Examples
The validation class below shows how to validate that any resource has a specific non-empty label.
@Title("HasNonEmptyLabelValidation allows validating that resources have a non empty label.")
@Description("This validation can be used to ensure that all resources are associated to a specific label. The labe key is passed through the configuration of the extension.")
@Example(
title = "Validate that resources have a non-empty label with key 'owner'.",
full = true,
code = {"""
validations:
- name: "resourceMustHaveNonEmptyLabelOwner"
type: "com.example.jikkou.validation.HasNonEmptyLabelValidation"
priority: 100
config:
key: owner
"""
}
)
@SupportedResources(value = {}) // an empty list implies that the extension supports any resource-type
public final class HasNonEmptyLabelValidation implements Validation {
// The required config property.
static final ConfigProperty<String> LABEL_KEY_CONFIG = ConfigProperty.ofString("key");
private String key;
/**
* Empty constructor - required.
*/
public HasNonEmptyLabelValidation() {
}
/**
* {@inheritDoc}
*/
@Override
public void configure(@NotNull final Configuration config) {
// Get the key from the configuration.
this.key = LABEL_KEY_CONFIG
.getOptional(config)
.orElseThrow(() -> new ConfigException(
String.format("The '%s' configuration property is required for %s",
LABEL_KEY_CONFIG.key(),
TopicNamePrefixValidation.class.getSimpleName()
)
));
}
/**
* {@inheritDoc}
*/
@Override
public ValidationResult validate(final @NotNull HasMetadata resource) {
Optional<String> label = resource.getMetadata()
.findLabelByKey(this.key)
.map(NamedValue::getValue)
.map(Value::asString)
.filter(String::isEmpty);
// Failure
if (label.isEmpty()) {
String error = String.format(
"Resource for name '%s' have no defined or empty label for key: '%s'",
resource.getMetadata().getName(),
this.key
);
return ValidationResult.failure(new ValidationError(getName(), resource, error));
}
// Success
return ValidationResult.success();
}
}
8.1.3 - Develop Custom Action
This section covers the core classes to develop action extensions.
Interface
To create a custom action
, you will need to implement the Java
interface: io.streamthoughts.jikkou.core.action.Action
.
/**
* Interface for executing a one-shot action on a specific type of resources.
*
* @param <T> The type of the resource.
*/
@Category(ExtensionCategory.ACTION)
public interface Action<T extends HasMetadata> extends HasMetadataAcceptable, Extension {
/**
* Executes the action.
*
* @param configuration The configuration
* @return The ExecutionResultSet
*/
@NotNull ExecutionResultSet<T> execute(@NotNull Configuration configuration);
}
Examples
The Action
class below shows how to implement a custom action accepting options`.
@Named(EchoAction.NAME)
@Title("Print the input.")
@Description("The EchoAction allows printing the text provided in input.")
@ExtensionSpec(
options = {
@ExtensionOptionSpec(
name = INPUT_CONFIG_NAME,
description = "The input text to print.",
type = String.class,
required = true
)
}
)
public final class EchoAction extends ContextualExtension implements Action<HasMetadata> {
public static final String NAME = "EchoAction";
public static final String INPUT_CONFIG_NAME = "input";
@Override
public @NotNull ExecutionResultSet<HasMetadata> execute(@NotNull Configuration configuration) {
String input = extensionContext().<String>configProperty(INPUT_CONFIG_NAME).get(configuration);
return ExecutionResultSet
.newBuilder()
.result(ExecutionResult
.newBuilder()
.status(ExecutionStatus.SUCCEEDED)
.data(new EchoOut(input))
.build())
.build();
}
@Kind("EchoOutput")
@ApiVersion("core.jikkou.io/v1")
@Reflectable
record EchoOut(@JsonProperty("out") String out) implements HasMetadata {
@Override
public ObjectMeta getMetadata() {
return new ObjectMeta();
}
@Override
public HasMetadata withMetadata(ObjectMeta objectMeta) {
throw new UnsupportedOperationException();
}
}
}
8.1.4 - Develop Custom Transformations
This section covers the core classes to develop transformation extensions.
Interface
To create a custom transformation
, you will need to implement the Java
interface: io.streamthoughts.jikkou.core.transformation.Transformation
.
/**
* This interface is used to transform or filter resources.
*
* @param <T> The resource type supported by the transformation.
*/
public interface Transformation<T extends HasMetadata> extends Interceptor {
/**
* Executes the transformation on the specified {@link HasMetadata} object.
*
* @param resource The {@link HasMetadata} to be transformed.
* @param resources The {@link ResourceListObject} involved in the current operation.
* @param context The {@link ReconciliationContext}.
* @return The list of resources resulting from the transformation.
*/
@NotNull Optional<T> transform(@NotNull T resource,
@NotNull HasItems resources,
@NotNull ReconciliationContext context);
}
Examples
The transformation class below shows how to filter resource having an annotation exclude: true
.
import java.util.Optional;
@Named("ExcludeIgnoreResource")
@Title("ExcludeIgnoreResource allows filtering resources whose 'metadata.annotations.ignore' property is equal to 'true'")
@Description("The ExcludeIgnoreResource transformation is used to exclude from the"
+ " reconciliation process any resource whose 'metadata.annotations.ignore'"
+ " property is equal to 'true'. This transformation is automatically enabled."
)
@Enabled
@Priority(HasPriority.HIGHEST_PRECEDENCE)
public final class ExcludeIgnoreResourceTransformation implements Transformation<HasMetadata> {
/** {@inheritDoc}**/
@Override
public @NotNull Optional<HasMetadata> transform(@NotNull HasMetadata resource,
@NotNull HasItems resources,
@NotNull ReconciliationContext context) {
return Optional.of(resource)
.filter(r -> HasMetadata.getMetadataAnnotation(resource, "ignore")
.map(NamedValue::getValue)
.map(Value::asBoolean)
.orElse(false)
);
}
}
9 - Frequently Asked Questions
This section regroups all frequently asked questions about Jikkou.
Is Jikkou Free to Use?
Yes, Jikkou is developed and distributed under the Apache License 2.0.
Can I Use Jikkou with Any Kafka Implementation?
Yes, Jikkou can be used with a wide range of Apache Kafka infrastructures, including:
Why would I use Jikkou over Terraform?
What is Terraform and how is it typically used?
Terraform (OpenToFu) is widely recognized as the leading solution for infrastructure provisioning and management. It is commonly used by operations teams for managing cloud infrastructure through its HCL (HashiCorp Configuration Language) syntax.
What are the limitations of Terraform for Kafka Users ?
Many development teams find Terraform challenging to use because:
- They need to learn HCL syntax, which is not commonly known among developers.
- They often lack the necessary permissions to apply configuration files directly.
- They often struggle with Terraform states.
How does Jikkou address these limitations?
Jikkou is designed to be a straightforward CLI tool for both developers and operations teams. It simplifies the process of managing infrastructure, especially for development teams who may not have expertise in HCL or the permissions required for Terraform.
What are the benefits of using Jikkou for Kafka management?
On-Premises and Multi-Cloud Support: Unlike many Terraform providers which focus on cloud-based Kafka services ( e.g., Confluent Cloud), Jikkou supports on-premises, multi-cloud, and hybrid infrastructures.
Versatility: Jikkou can manage Kafka topics across various environments, including local Kafka clusters in Docker, ephemeral clusters in Kubernetes for CI/CD, and production clusters in Aiven Cloud.
Auditing and Backup: Beyond provisioning, Jikkou can audit Kafka platforms for configuration issues and create backups of Kafka configurations (Topics, ACLs, Quotas, etc.).
There are, of course, many reasons to use Terraform rather than Jikkou and vice versa. As usual, the choice of tool really depends on your needs, the organization you’re in, the skills of the people involved and so on.
10 - Community
10.1 - Developer Guide
Prerequisites
- Jdk 17 (see https://sdkman.io/ for installing java locally)
- Git
- Docker and Docker-Compose
- Your favorite IDE
Building Jikkou
We use Maven Wrapper to build our project. The simplest way to get started is:
For building distribution files.
$ ./mvnw clean package -Pdist -DskipTests
Alternatively, we also use Make to package and build the Docker image for Jikkou:
$ make
Running tests
For running all tests and checks:
$ ./mvnw clean verify
Code Format
This project uses the Maven plugin Spotless to format all Java classes and to apply some code quality checks.
Bugs & Security
This project uses the Maven plugin SpotBugs and FindSecBugs to run some static analysis to look for bugs in Java code.
Reported bugs can be analysed using SpotBugs GUI:
$ ./mvnw spotbugs:gui
10.2 - Contribution Guidelines
Jikkou is an open source project, and we love getting patches and contributions to make Jikkou and its docs even better.
Contributing to Jikkou
The Jikkou project itself lives in https://github.com/streamthoughts/jikkou
Code reviews
All submissions, including submissions by project members, require review. We use GitHub pull requests for this purpose. Consult GitHub Help for more information on using pull requests.
Creating issues
Alternatively, if there’s something you’d like to see in Jikkou (or if you’ve found something that isn’t working the way you’d expect), but you’re not sure how to fix it yourself, please create an issue.
11 - Releases
11.1 - Release v0.32.0
Jikkou 0.32.0: Moving Beyond Apache Kafka. Introducing new features: Extension Providers, Actions, etc.!
I’m thrilled to announce the release of Jikkou 0.32.0 which packs two major features: External Extension Providers and Actions. 🙂
Highlights: What’s new in Jikkou 0.32.0?
New External Extension Provider mechanism to extend Jikkou features.
New extension type Action to execute specific operations against resources.
New action for resetting consumer group offsets.
New action for restarting connector and tasks for Kafka Connect.
New option selector-match to exclude/include resources from being returned or reconciled by Jikkou.
New API to get resources by their name.
Extension Providers
Jikkou is a project that continues to reinvent and redefine itself with each new version. Initially developed exclusively to manage the configuration of Kafka topics, it can now be used to manage Schema Registries, Kafka Connect connectors, and more. But, the funny thing is that Jikkou isn’t coupled with Kafka. It was designed around a concept of pluggable extensions that enable new capabilities and kind of resources to be seamlessly added to the project. For this, Jikkou uses the Java Service Loader mechanism to automatically discover new extensions at runtime.
Unfortunately, until now there has been no official way of using this mechanism with Jikkou CLI or Jikkou API Server. For this reason, Jikkou 0.32.0 brings the capability to easily configuration external extensions.
So how does it work? Well, let’s imagine you want to be able to load Text Files from the local filesystem using Jikkou.
First, we need to create a new Java project and add the Jikkou Core library to your project’s dependencies (
io.streamthoughts:jikkou-core:0.32.0 dependency
).
Then, you will need to create some POJO classes to represent your resource (e.g., V1File.class) and to implement the Collector interface :
@SupportedResource(type = V1File.class)
@ExtensionSpec(
options = {
@ExtensionOptionSpec(
name = "directory",
description = "The absolute path of the directory from which to collect files",
type = String.class,
required = true
)
}
)
@Description("FileCollector allows listing all files in a given directory.")
public final class FileCollector
extends ContextualExtension
implements Collector<V1File> {
private static final Logger LOG = LoggerFactory.getLogger(FileCollector.class);
@Override
public ResourceListObject<V1File> listAll(@NotNull Configuration configuration,
@NotNull Selector selector) {
// Get the 'directory' option.
String directory = extensionContext().<String>configProperty("directory").get(configuration);
// Collect all files.
List<V1File> objects = Stream.of(new File(directory).listFiles())
.filter(file -> !file.isDirectory())
.map(file -> {
try {
Path path = file.toPath();
String content = Files.readString(path);
V1File object = new V1File(ObjectMeta
.builder()
.withName(file.getName())
.withAnnotation("system.jikkou.io/fileSize", Files.size(path))
.withAnnotation("system.jikkou.io/fileLastModifier", Files.getLastModifiedTime(path))
.build(),
new V1FileSpec(content)
);
return Optional.of(object);
} catch (IOException e) {
LOG.error("Cannot read content from file: {}", file.getName(), e);
return Optional.<V1File>empty();
}
})
.flatMap(Optional::stream)
.toList();
ObjectMeta objectMeta = ObjectMeta
.builder()
.withAnnotation("system.jikkou.io/directory", directory)
.build();
return new DefaultResourceListObject<>("FileList", "system.jikkou.io/v1", objectMeta, objects);
}
}
Next, you will need to implement the ExtensionProvider interface to register both your extension and your resource kind.
public final class FileExtensionProvider implements ExtensionProvider {
/**
* Registers the extensions for this provider.
*
* @param registry The ExtensionRegistry.
*/
public void registerExtensions(@NotNull ExtensionRegistry registry) {
registry.register(FileCollector.class, FileCollector::new);
}
/**
* Registers the resources for this provider.
*
* @param registry The ResourceRegistry.
*/
public void registerResources(@NotNull ResourceRegistry registry) {
registry.register(V1File.class);
}
}
Then, the fully qualified name of the class must be added to the resource file META-INF/service/io.streamthoughts.jikkou.spi.ExtensionProvider.
Finally, all you need to do is to package your project as a tarball or ZIP archive. The archive must contain a single top-level directory containing the extension JAR files, as well as any resource files or third-party libraries required by your extensions.
To install your Extension Provider, all you need to do is to unpacks the archive into a desired location (e.g., /usr/share/jikkou-extensions) and to configure the Jikkou’s API Server or Jikkou CLI (when running the Java Binary Distribution, i.e., not the native version) with the jikkou.extension.paths property (e.g., jikkou.extension.paths=/usr/share/jikkou-extensions). For people who are familiar with how Kafka Connect works, it’s more or less the same mechanism.
(The full code source of this example is available on GitHub).
And that’s it! 🙂
Extension Providers open up the possibility of infinitely expanding Jikkou to manage your own resources, and use it with systems other than Kafka.
Actions
Jikkou uses a declarative approach to manage the asset state of your data infrastructure using resource descriptors written in YAML. But sometimes, ops and development teams may need to perform specific operations on their resources that cannot be included in their descriptor files. For example, you may need to reset offsets for one or multiple Kafka Consumer Groups, restart failed connectors and tasks for Kafka Connect, etc. So instead of having to switch from one tool to another, why not just use Jikkou for this?
Well, to solve that need, Jikkou 0.32.0 introduces a new type of extension called “Actions” that allows users to perform specific operations on resources.
Combined with the External Extension Provider mechanism, you can now implement the Action interface to add custom operations to Jikkou.
@Category(ExtensionCategory.ACTION)
public interface Action<T extends HasMetadata> extends HasMetadataAcceptable, Extension {
/**
* Executes the action.
*
* @param configuration The configuration
* @return The ExecutionResultSet
*/
@NotNull
ExecutionResultSet<T> execute(@NotNull Configuration configuration);
}
Actions are fully integrated with Jikkou API Server through the new Endpoint: /api/v1/actions/{name}/execute{?[options]
Kafka Consumer Groups
Altering Consumer Group Offsets
Jikkou 0.32.0 introduces the new KafkaConsumerGroupsResetOffsets action allows resetting offsets of consumer groups.
Here is an example showing how to reset the group my-group to the earliest offsets for topic test.
$ jikkou action kafkaconsumergroupresetoffsets execute \
--group my-group \
--topic test \
--to-earliest
(output)
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
labels: { }
annotations:
configs.jikkou.io/to-earliest: "true"
configs.jikkou.io/group: "my-group"
configs.jikkou.io/dry-run: "false"
configs.jikkou.io/topic:
- "test"
results:
- status: "SUCCEEDED"
errors: [ ]
data:
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
name: "my-group"
labels:
kafka.jikkou.io/is-simple-consumer: false
annotations: { }
status:
state: "EMPTY"
members: [ ]
offsets:
- topic: "test"
partition: 1
offset: 0
coordinator:
id: "101"
host: "localhost"
port: 9092
This action is pretty similar to the kafka-consumer-group script that ships with Apache Kafka. You can use it to reset a consumer group to the earliest or latest offsets, to a specific datetime or specific offset.
In addition, it can be executed in a dry-run.
Deleting Consumer Groups
You can now add the core annotation jikkou.io/delete to a KafkaConsumerGroup resource to mark it for deletion:
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
name: "my-group"
labels:
kafka.jikkou.io/is-simple-consumer: false
annotations:
jikkou.io/delete: true
$ jikkou delete --files my-consumer-group.yaml -o wide
TASK [DELETE] Delete consumer group 'my-group' - CHANGED ************************************************
{
"status" : "CHANGED",
"changed" : true,
"failed" : false,
"end" : 1701162781494,
"data" : {
"apiVersion" : "core.jikkou.io/v1beta2",
"kind" : "GenericResourceChange",
"metadata" : {
"name" : "my-group",
"labels" : {
"kafka.jikkou.io/is-simple-consumer" : false
},
"annotations" : {
"jikkou.io/delete" : true,
"jikkou.io/managed-by-location" : "./my-consumer-group.yaml"
}
},
"change" : {
"before" : {
"apiVersion" : "kafka.jikkou.io/v1beta1",
"kind" : "KafkaConsumerGroup",
"metadata" : {
"name" : "my-group",
"labels" : {
"kafka.jikkou.io/is-simple-consumer" : false
},
"annotations" : { }
}
},
"operation" : "DELETE"
}
},
"description" : "Delete consumer group 'my-group'"
}
EXECUTION in 64ms
ok : 0, created : 0, altered : 0, deleted : 1 failed : 0
Kafka Connect
Restarting Connector and Tasks
This new release also packs with the new action KafkaConnectRestartConnectors action allows a user to restart all or just the failed Connector and Task instances for one or multiple named connectors.
Here are a few examples from the documentation:
- Restarting all connectors for all Kafka Connect clusters.
$ jikkou action kafkaconnectrestartconnectors execute
---
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
labels: {}
annotations: {}
results:
- status: "SUCCEEDED"
data:
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
name: "local-file-sink"
labels:
kafka.jikkou.io/connect-cluster: "my-connect-cluster"
annotations: {}
spec:
connectorClass: "FileStreamSink"
tasksMax: 1
config:
file: "/tmp/test.sink.txt"
topics: "connect-test"
state: "RUNNING"
status:
connectorStatus:
name: "local-file-sink"
connector:
state: "RUNNING"
workerId: "connect:8083"
tasks:
- id: 0
state: "RUNNING"
workerId: "connect:8083"
- Restarting a specific connector and tasks for on Kafka Connect cluster
$ jikkou action kafkaconnectrestartconnectors execute \
--cluster-name my-connect-cluster
--connector-name local-file-sink \
--include-tasks
New Selector Matching Strategy
Jikkou CLI allows you to provide one or multiple *selector expressions *in order to include or exclude resources from being returned or reconciled by Jikkou. In previous versions, selectors were cumulative, so resources had to match all selectors to be returned. Now, you can specify a selector matching strategy to determine how expressions must be combined using the option –selector-match=[ANY|ALL|NONE].
ALL: A resource is selected if it matches all selectors.
ANY: A resource is selected if it matches one of the selectors.
NONE: A resource is selected if it matches none of the selectors.
For example, the below command will only return topics matching the regex ^__.* or having a name equals to _schemas.
$ jikkou get kafkatopics \
--selector 'metadata.name MATCHES (^__connect-*)'
--selector 'metadata.name IN (_schemas)'
--selector-match ANY
New Get Resource by Name
In some cases, it may be necessary to retrieve only a specific resource for a specific name. In previous versions, the solution was to use selectors. Unfortunately, this approach isn’t very efficient, as it involves retrieving all the resources and then filtering them. To start solving that issue, Jikkou v0.32.0 adds a new API to retrieve a resource by its name.
Example (using Jikkou CLI):
$ jikkou get kafkatopics --name _schemas
Example (using Jikkou API Server):
$ curl -sX GET \
http://localhost:28082/apis/kafka.jikkou.io/v1/kafkatopics/_schemas \
-H 'Accept:application/json'
Note : Currently not all resources have been updated to use that new API, so it’s possible that selectors are used as a default implementation by internal Jikkou API.
11.2 - Release v0.33.0
Introducing Jikkou 0.33.0
We’re excited to unveil the latest release of Jikkou 0.33.0. 🎉
To install the new version, please visit the installation guide. For detailed release notes, check out the GitHub page.
What’s New in Jikkou 0.33.0?
- Enhanced resource change format.
- Added support for the patch command.
- Introduced the new
--status
option forKafkaTopic
resources. - Exported offset-lag to the status of
KafkaConsumerGroup
resources.
Below is a summary of these new features with examples.
Diff/Patch Commands
In previous versions, Jikkou provided the diff
command to display changes required to reconcile input resources.
However, this command lacked certain capabilities to be truly useful. This new version introduces a standardized change
format for all resource types, along with two new options for filtering changes:
--filter-resource-op=
: Filters out all state changes except those corresponding to the given operations.--filter-change-op=
: Filters out all resources except those corresponding to the given operations.
The new output format you can expect from the diff
command is as follows:
---
apiVersion: [ group/version of the change ]
kind: [ kind of the change ]
metadata: [ resource metadata ]
spec:
# Array of state changes
changes:
- name: [ name of the changed state ]
op: [ change operation ]
before: [ value of the state before the operation ]
after: [ value of the state after the operation ]
data: [ static data attached to the resource ]
op: [ resource change operation ]
The primary motivation behind this new format is the introduction of a new patch command. Prior to Jikkou 0.33.0,
when
using the apply
command after a dry-run
or a diff
command, Jikkou couldn’t guarantee that the applied changes
matched
those returned from the previous command. With Jikkou 0.33.0, you can now directly pass the result of the diff
command
to the new patch
command to efficiently apply the desired state changes.
Here’s a workflow to create your resources:
Step 1) Create a resource descriptor file
cat << EOF > my-topic.yaml
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
name: 'my-topic'
labels:
environment: example
spec:
partitions: 3
replicas: 1
configs:
min.insync.replicas: 1
cleanup.policy: 'delete'
EOF
Step 2) Run diff
jikkou diff -f ./my-topic.yaml > my-topic-diff.yaml
(output)
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopicChange"
metadata:
name: "my-topic"
labels:
environment: "example"
annotations:
jikkou.io/managed-by-location: "my-topic.yaml"
spec:
changes:
- name: "partitions"
op: "CREATE"
after: 3
- name: "replicas"
op: "CREATE"
after: 1
- name: "config.cleanup.policy"
op: "CREATE"
after: "delete"
- name: "config.min.insync.replicas"
op: "CREATE"
after: 1
op: "CREATE"
Step 3) Run patch
jikkou patch -f ./my-topic-diff.yaml --mode FULL --output compact
(output)
TASK [
CREATE
] Create topic 'my-topic' (partitions=3, replicas=1, configs=[cleanup.policy=delete, min.insync.replicas=1]) - CHANGED
EXECUTION in 3s 797ms
ok: 0, created: 1, altered: 0, deleted: 0 failed: 0
Attempting to apply the changes a second time may result in an error from the remote system:
{
"status": "FAILED",
"description": "Create topic 'my-topic' (partitions=3, replicas=1,configs=[cleanup.policy=delete,min.insync.replicas=1])",
"errors": [ {
"message": "TopicExistsException: Topic 'my-topic' already exists."
} ],
...
}
Resource Provider for Apache Kafka
Jikkou 0.33.0 also packs with some minor improvements for the Apache Kafka provider.
KafkaTopic Status
You can now describe the status of a topic-partitions by using the new --status
option
when getting a KafkaTopic
resource.
jikkou get kt --status --selector "metadata.name IN (my-topic)"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopic"
metadata:
name: "my-topic"
labels:
kafka.jikkou.io/topic-id: "UbZI2N2YQTqfNcbKKHps5A"
annotations:
kafka.jikkou.io/cluster-id: "xtzWWN4bTjitpL3kfd9s5g"
spec:
partitions: 1
replicas: 1
configs:
cleanup.policy: "delete"
configMapRefs: [ ]
status:
partitions:
- id: 0
leader: 101
isr:
- 101
replicas:
- 101
KafkaConsumerGroup OffsetLags
With Jikkou 0.33.0, you can export the offset-lag of a KafkaConsumerGroup
resource using the --offsets
option.
jikkou get kafkaconsumergroups --offsets
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
name: "my-group"
labels:
kafka.jikkou.io/is-simple-consumer: false
status:
state: "EMPTY"
members: [ ]
offsets:
- topic: "my-topic"
partition: 0
offset: 16
offset-lag: 0
coordinator:
id: "101"
host: "localhost"
port: 9092
Finally, all those new features are also completely available through the Jikkou REST Server.
Wrapping Up
We hope you enjoy these new features. If you encounter any issues with Jikkou v0.33.0, please feel free to open a GitHub issue on our project page. Don’t forget to give us a ⭐️ on Github to support the team, and join us on Slack.
11.3 - Release v0.34.0
Introducing Jikkou 0.34.0
We’re excited to unveil the latest release of Jikkou 0.34.0. 🎉
To install the new version, please visit the installation guide. For detailed release notes, check out the GitHub page.
What’s New in Jikkou 0.34.0?
- Enhanced Aiven provider with support for Kafka topics.
- Added SSL support for Kafka Connect and Schema Registry
- Introduced dynamic connection for Kafka Connect clusters
Below is a summary of these new features with examples.
Topic Aiven for Apache Kafka
Jikkou 0.34.0 adds a new KafkaTopic
kind
that can be used to manage kafka Topics directly though the Aiven API.
You can list kafka topics using the new command below:
jikkou get avn-kafkatopics
In addition, topics can be described, created and updated using the same resource model as the Apache Kafka provider.
# file:./aiven-kafkat-topics.yaml
---
apiVersion: "kafka.aiven.io/v1beta2"
kind: "KafkaTopic"
metadata:
name: "test"
labels:
tag.aiven.io/my-tag: "test-tag"
spec:
partitions: 1
replicas: 3
configs:
cleanup.policy: "delete"
compression.type: "producer"
The main advantages of using this new resource kind are the use of the Aiven Token API to authenticate to the Aiven API and the ability to manage tags for kafka topics.
SSL support for Kafka Connect and Schema Registry
Jikkou 0.34.0 also brings SSL support for the Kafka Connect and Schema Registry providers. Therefore, it’s now possible to configure the providers to authenticate using SSL certificate.
Example for the Schema Registry:
jikkou {
schemaRegistry {
url = "https://localhost:8081"
authMethod = "SSL"
sslKeyStoreLocation = "/certs/registry.keystore.jks"
sslKeyStoreType = "JKS"
sslKeyStorePassword = "password"
sslKeyPassword = "password"
sslTrustStoreLocation = "/certs/registry.truststore.jks"
sslTrustStoreType = "JKS"
sslTrustStorePassword = "password"
sslIgnoreHostnameVerification = true
}
}
Dynamically connection for Kafka Connect clusters
Before Jikkou 0.34.0, to deploy a Kafka Connect connector, it was mandatory to configure a connection to a target cluster:
jikkou {
extensions.provider.kafkaconnect.enabled = true
kafkaConnect {
clusters = [
{
name = "my-connect-cluster"
url = "http://localhost:8083"
}
]
}
}
This connection could then be referenced in a connector resource definition via the
annotation kafka.jikkou.io/connect-cluster
.
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
name: "mu-connector"
labels:
kafka.jikkou.io/connect-cluster: "my-connect-cluster"
This approach is suitable for most use cases, but can be challenging if you need to manage a large and dynamic number of Kafka Connect clusters.
To meet this need, it is now possible to provide connection information for the cluster to connect to directly, through
the new metadata annotation new metadata annotation: jikkou.io/config-override
.
Here is a simple example showing the use of the new annotation:
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
name: "mu-connector"
labels:
kafka.jikkou.io/connect-cluster: "my-connect-cluster"
annotations:
jikkou.io/config-override: |
{ "url": "http://localhost:8083" }
This new annotation can be used with the Jikkou’s Jinja template creation mechanism to define dynamic configuration.
Wrapping Up
We hope you enjoy these new features. If you encounter any issues with Jikkou v0.33.0, please feel free to open a GitHub issue on our project page. Don’t forget to give us a ⭐️ on Github to support the team, and join us on Slack.
12 - Resources
Blog
- GitOps & Kafka: Enabling smooth and seamless Data Schema management with Jikkou and GitHub Actions (by Florian Hussonnois)
- Jikkou: Declarative ACLs configuration for Apache Kafka® and Schema Registry on Aiven (by Florian Hussonnois)
- Kafka Connect + Jikkou- Easily manage Kafka connectors (by Florian Hussonnois)
- Why is Managing Kafka Topics Still Such a Pain? Introducing Jikkou! (Florian Hussonnois)