Apache Kafka

Learn how to use the Jikkou Extension Provider for Apache Kafka.

Here, you will find information on how to use the Apache Kafka extensions.

More information:

1 - Configuration

Learn how to configure the extensions for Apache Kafka.

Here, you will find information on how to configure the extensions for Apache Kafka.

Configuration

The Apache Kafka extension is built on top of the Kafka Admin Client. You can configure the properties passed to the Kafka client through the Jikkou client configuration property jikkou.kafka.client.

Example:

jikkou {
  kafka {
    client {
      bootstrap.servers = "localhost:9092"
      security.protocol = "SSL"
      ssl.keystore.location = "/tmp/client.keystore.p12"
      ssl.keystore.password = "password"
      ssl.keystore.type = "PKCS12"
      ssl.truststore.location = "/tmp/client.truststore.jks"
      ssl.truststore.password = "password"
      ssl.key.password = "password"
    }
  }
}

In addition, the extension supports configuration settings to wait for at least a minimal number of brokers to be available before processing.

jikkou {
  kafka {
    brokers {
      # If 'true', the CLI will wait for a minimal number of brokers to be available before processing.
      waitForEnabled = true
      waitForEnabled = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_ENABLED}
      # The minimum number of brokers that must be available before the CLI stops waiting.
      waitForMinAvailable = 1
      waitForMinAvailable = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_MIN_AVAILABLE}
      # The amount of time to wait between each attempt to verify that brokers are available.
      waitForRetryBackoffMs = 1000
      waitForRetryBackoffMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_RETRY_BACKOFF_MS}
      # Wait until brokers are available or this timeout is reached.
      waitForTimeoutMs = 60000
      waitForTimeoutMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_TIMEOUT_MS}
    }
  }
}
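
Because every setting above falls back to an environment variable (HOCON ${?...} substitution), you can override it at runtime without editing the configuration file. For example, a sketch that disables the wait-for-brokers check for a single run:

export JIKKOU_KAFKA_BROKERS_WAIT_FOR_ENABLED=false
jikkou get kafkabrokers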

2 - Resources

Learn how to use the built-in resources provided by the Extension Provider for Apache Kafka.

Here, you will find the list of resources supported for Apache Kafka.

Apache Kafka Resources

More information:

2.1 - Kafka Brokers

Learn how to manage Kafka Brokers.

This section describes the resource definition format for KafkaBroker entities, which can be used to define the brokers you plan to manage on a specific Kafka cluster.

Listing KafkaBroker

You can retrieve the state of Kafka Brokers using the jikkou get kafkabrokers (or jikkou get kb) command.

Usage

$ jikkou get kafkabrokers --help

Usage:

Get all 'KafkaBroker' resources.

jikkou get kafkabrokers [-hV] [--default-configs] [--dynamic-broker-configs]
                        [--list] [--static-broker-configs]
                        [--logger-level=<level>] [-o=<format>]
                        [-s=<expressions>]...

DESCRIPTION:

Use jikkou get kafkabrokers when you want to describe the state of all
resources of type 'KafkaBroker'.

OPTIONS:

      --default-configs   Describe built-in default configuration for configs
                            that have a default value.
      --dynamic-broker-configs
                          Describe dynamic configs that are configured as
                            default for all brokers or for specific broker in
                            the cluster.
  -h, --help              Show this help message and exit.
      --list              Get resources as ResourceListObject.
      --logger-level=<level>
                          Specify the log level verbosity to be used while
                            running a command.
                          Valid level values are: TRACE, DEBUG, INFO, WARN,
                            ERROR.
                          For example, `--logger-level=INFO`
  -o, --output=<format>   Prints the output in the specified format. Allowed
                            values: json, yaml (default yaml).
  -s, --selector=<expressions>
                          The selector expression used for including or
                            excluding resources.
      --static-broker-configs
                          Describe static configs provided as broker properties
                            at start up (e.g. server.properties file).
  -V, --version           Print version information and exit.

(The output from your current Jikkou version may be different from the above example.)

Examples

(command)

$ jikkou get kafkabrokers --static-broker-configs

(output)

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaBroker"
metadata:
  name: "101"
  labels: {}
  annotations:
    kafka.jikkou.io/cluster-id: "xtzWWN4bTjitpL3kfd9s5g"
spec:
  id: "101"
  host: "localhost"
  port: 9092
  configs:
    advertised.listeners: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
    authorizer.class.name: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
    broker.id: "101"
    controller.listener.names: "CONTROLLER"
    controller.quorum.voters: "101@kafka:29093"
    inter.broker.listener.name: "PLAINTEXT"
    listener.security.protocol.map: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
    listeners: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://kafka:29093"
    log.dirs: "/var/lib/kafka/data"
    node.id: "101"
    offsets.topic.replication.factor: "1"
    process.roles: "broker,controller"
    transaction.state.log.replication.factor: "1"
    zookeeper.connect: ""

2.2 - Kafka Consumer Groups

Learn how to manage Kafka Consumer Groups.

This section describes the resource definition format for KafkaConsumerGroup entities, which can be used to define the consumer groups you plan to manage on a specific Kafka cluster.

Listing KafkaConsumerGroup

You can retrieve the state of Kafka Consumer Groups using the jikkou get kafkaconsumergroups (or jikkou get kcg) command.

Usage

$ jikkou get kafkaconsumergroups --help

Usage:

Get all 'KafkaConsumerGroup' resources.

jikkou get kafkaconsumergroups [-hV] [--list] [--offsets]
                               [--logger-level=<level>] [-o=<format>]
                               [--in-states=PARAM]... [-s=<expressions>]...

DESCRIPTION:

Use jikkou get kafkaconsumergroups when you want to describe the state of all
resources of type 'KafkaConsumerGroup'.

OPTIONS:

  -h, --help              Show this help message and exit.
      --in-states=PARAM   If states is set, only groups in these states will be
                            returned. Otherwise, all groups are returned. This
                            operation is supported by brokers with version
                            2.6.0 or later
      --list              Get resources as ResourceListObject.
      --logger-level=<level>
                          Specify the log level verbosity to be used while
                            running a command.
                          Valid level values are: TRACE, DEBUG, INFO, WARN,
                            ERROR.
                          For example, `--logger-level=INFO`
  -o, --output=<format>   Prints the output in the specified format. Allowed
                            values: json, yaml (default yaml).
      --offsets           Specify whether consumer group offsets should be
                            described.
  -s, --selector=<expressions>
                          The selector expression used for including or
                            excluding resources.
  -V, --version           Print version information and exit.

(The output from your current Jikkou version may be different from the above example.)

Examples

(command)

$ jikkou get kafkaconsumergroups --in-states STABLE --offsets

(output)

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
  name: "my-group"
  labels:
    kafka.jikkou.io/is-simple-consumer: false
status:
  state: "STABLE"
  members:
    - memberId: "console-consumer-b103994e-bcd5-4236-9d03-97065057e594"
      clientId: "console-consumer"
      host: "/127.0.0.1"
      assignments:
        - "my-topic-0"
      offsets:
        - topic: "my-topic"
          partition: 0
          offset: 0
  coordinator:
    id: "101"
    host: "localhost"
    port: 9092

2.3 - Kafka Topics

Learn how to manage Kafka Topics.

KafkaTopic resources are used to define the topics you want to manage on your Kafka Cluster(s). A KafkaTopic resource defines the number of partitions, the replication factor, and the configuration properties to be associated with a topic.

KafkaTopic

Specification

Here is the resource definition file for defining a KafkaTopic.

apiVersion: "kafka.jikkou.io/v1beta2"  # The api version (required)
kind: "KafkaTopic"                     # The resource kind (required)
metadata:
  name: <The name of the topic>        # (required)
  labels: { }
  annotations: { }
spec:
  partitions: <Number of partitions>   # (optional)
  replicas: <Number of replicas>       # (optional)
  configs:
    <config_key>: <Config Value>       # The topic config properties keyed by name to override (optional)
  configMapRefs: [ ]                   # The list of ConfigMap to be applied to this topic (optional)

The metadata.name property is mandatory and specifies the name of the Kafka topic.

To use the cluster default values for the number of partitions and replicas, set the spec.partitions and spec.replicas properties to -1.
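
For instance, a minimal sketch of a topic definition (with a hypothetical name) that delegates both settings to the cluster defaults:

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
  name: 'my-topic-with-defaults'  # hypothetical topic name
spec:
  partitions: -1                  # use the cluster default number of partitions
  replicas: -1                    # use the cluster default replication factor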

Example

Here is a simple example that shows how to define a single YAML file containing two topic definitions using the KafkaTopic resource type.

file: kafka-topics.yaml

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
  name: 'my-topic-p1-r1'  # Name of the topic
  labels:
    environment: example
spec:
  partitions: 1           # Number of topic partitions (use -1 to use cluster default)
  replicas: 1             # Replication factor per partition (use -1 to use cluster default)
  configs:
    min.insync.replicas: 1
    cleanup.policy: 'delete'
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
  name: 'my-topic-p2-r1'   # Name of the topic 
  labels:
    environment: example
spec:
  partitions: 2             # Number of topic partitions (use -1 to use cluster default)
  replicas: 1               # Replication factor per partition (use -1 to use cluster default)
  configs:
    min.insync.replicas: 1
    cleanup.policy: 'delete'

See official Apache Kafka documentation for details about the topic-level configs.
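
Once defined, the topics can be validated and applied with the Jikkou CLI. A sketch, assuming the resource file is passed with the --files option:

# Validate the resource definitions without touching the cluster:
jikkou validate --files ./kafka-topics.yaml

# Apply the changes to the cluster:
jikkou apply --files ./kafka-topics.yaml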

KafkaTopicList

If you need to define multiple topics (e.g. using a template), it may be easier to use a KafkaTopicList resource.

Specification

Here is the resource definition file for defining a KafkaTopicList.

apiVersion: "kafka.jikkou.io/v1beta2"  # The api version (required)
kind: "KafkaTopicList"                 # The resource kind (required)
metadata: # (optional)
  labels: { }
  annotations: { }
items: [ ]                             # An array of KafkaTopic

Example

Here is a simple example that shows how to define a single YAML file containing two topic definitions using the KafkaTopicList resource type. In addition, the example uses a ConfigMap object to define the topic configuration only once.

file: kafka-topics.yaml

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopicList
metadata:
  labels:
    environment: example
items:
  - metadata:
      name: 'my-topic-p1-r1'
    spec:
      partitions: 1
      replicas: 1
      configMapRefs: [ "TopicConfig" ]

  - metadata:
      name: 'my-topic-p2-r1'
    spec:
      partitions: 2
      replicas: 1
      configMapRefs: [ "TopicConfig" ]
---
apiVersion: "core.jikkou.io/v1beta2"
kind: ConfigMap
metadata:
  name: 'TopicConfig'
data:
  min.insync.replicas: 1
  cleanup.policy: 'delete'

2.4 - Kafka Users

Learn how to manage Kafka Users.

This section describes the resource definition format for KafkaUser entities, which can be used to manage SCRAM Users for Apache Kafka.

Definition Format of KafkaUser

Below is the overall structure of the KafkaUser resource.

---
apiVersion: kafka.jikkou.io/v1  # The api version (required)
kind: KafkaUser                 # The resource kind (required)
metadata:
  name: <string>
  annotations:
    # force update
    kafka.jikkou.io/force-password-renewal: <boolean>
spec:
  authentications:
    - type: <enum>        # One of: scram-sha-256, scram-sha-512 (required)
      password: <string>  # Leave empty to generate a secure password (optional)

See below for details about all these fields.

Metadata

metadata.name [required]

The name of the User.

kafka.jikkou.io/force-password-renewal [optional]

Set this annotation to true to force the renewal of the user's password, even when no change is detected in the resource definition.
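
A minimal sketch of a resource using this annotation:

---
apiVersion: kafka.jikkou.io/v1
kind: KafkaUser
metadata:
  name: "Bob"
  annotations:
    kafka.jikkou.io/force-password-renewal: true
spec:
  authentications:
    - type: scram-sha-256
      password: null   # a new secure password will be generated
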
Specification

spec.authentications [required]

The list of authentications to manage for the user.

spec.authentications[].type [required]

The authentication type:

  • scram-sha-256
  • scram-sha-512

spec.authentications[].password [optional]

The password of the user. If no password is provided, a secure password is automatically generated.

Examples

The following is an example of a resource describing a User:

---
# Example: file: kafka-scram-users.yaml
apiVersion: "kafka.jikkou.io/v1"
kind: "User"
metadata:
  name: "Bob"
spec:
  authentications:
    - type: scram-sha-256
      password: null
    - type: scram-sha-512
      password: null

Listing Kafka Users

You can retrieve the SCRAM users of a Kafka cluster using the jikkou get kafkausers (or jikkou get ku) command.

Usage

$ jikkou get kafkausers --help

Usage:

Get all 'KafkaUser' resources.

jikkou get kafkausers [-hV] [--list] [--logger-level=<level>] [--name=<name>]
                      [-o=<format>]
                      [--selector-match=<selectorMatchingStrategy>]
                      [-s=<expressions>]...

DESCRIPTION:

Use jikkou get kafkausers when you want to describe the state of all resources
of type 'KafkaUser'.

OPTIONS:

  -h, --help              Show this help message and exit.
      --list              Get resources as ResourceListObject (default: false).
      --logger-level=<level>
                          Specify the log level verbosity to be used while
                            running a command.
                          Valid level values are: TRACE, DEBUG, INFO, WARN,
                            ERROR.
                          For example, `--logger-level=INFO`
      --name=<name>       The name of the resource.
  -o, --output=<format>   Prints the output in the specified format. Valid
                            values: JSON, YAML (default: YAML).
  -s, --selector=<expressions>
                          The selector expression used for including or
                            excluding resources.
      --selector-match=<selectorMatchingStrategy>
                          The selector matching strategy. Valid values: NONE,
                            ALL, ANY (default: ALL)
  -V, --version           Print version information and exit.

(The output from your current Jikkou version may be different from the above example.)

Examples

(command)

$ jikkou get ku

(output)

apiVersion: "kafka.jikkou.io/v1"
kind: "KafkaUser"
metadata:
  name: "Bob"
  labels: {}
  annotations:
    kafka.jikkou.io/cluster-id: "xtzWWN4bTjitpL3kfd9s5g"
spec:
  authentications:
    - type: "scram-sha-256"
      iterations: 8192
    - type: "scram-sha-512"
      iterations: 8192

2.5 - Kafka Authorizations

Learn how to manage Kafka Authorizations and ACLs.

KafkaPrincipalAuthorization resources are used to define Access Control Lists (ACLs) for principals authenticated to your Kafka Cluster.

Jikkou can be used to describe all ACL policies that need to be created on the Kafka cluster.

KafkaPrincipalAuthorization

Specification

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
  name: "User:Alice"
spec:
  roles: [ ]                     # List of roles to be added to the principal (optional)
  acls:                          # List of KafkaPrincipalACL (required)
    - resource:
        type: <The type of the resource>                            #  (required)
        pattern: <The pattern to be used for matching resources>    #  (required) 
        patternType: <The pattern type>                             #  (required) 
      type: <The type of this ACL>     # ALLOW or DENY (required) 
      operations: [ ]                  # Operation that will be allowed or denied (required) 
      host: <HOST>                     # IP address from which the principal will be allowed or denied access (optional)

For more information on how to define authorization and ACLs, see the official Apache Kafka documentation: Security

Operations

The list below describes the valid values for the spec.acls.[].operations property:

  • READ
  • WRITE
  • CREATE
  • DELETE
  • ALTER
  • DESCRIBE
  • CLUSTER_ACTION
  • DESCRIBE_CONFIGS
  • ALTER_CONFIGS
  • IDEMPOTENT_WRITE
  • CREATE_TOKEN
  • DESCRIBE_TOKENS
  • ALL

For more information see official Apache Kafka documentation: Operations in Kafka

Resource Types

The list below describes the valid values for the spec.acls.[].resource.type property:

  • TOPIC
  • GROUP
  • CLUSTER
  • USER
  • TRANSACTIONAL_ID

For more information see official Apache Kafka documentation: Resources in Kafka

Pattern Types

The list below describes the valid values for the spec.acls.[].resource.patternType property:

  • LITERAL: Used to allow or deny a principal access to a resource with a specific name.
  • MATCH: Used to allow or deny a principal access to all resources matching the given regex.
  • PREFIXED: Used to allow or deny a principal access to all resources having the given prefix.

Example

---
apiVersion: "kafka.jikkou.io/v1beta2"    # The api version (required)
kind: "KafkaPrincipalAuthorization"      # The resource kind (required)
metadata:
  name: "User:Alice"
spec:
  acls:
    - resource:
        type: 'topic'
        pattern: 'my-topic-'
        patternType: 'PREFIXED'
      type: "ALLOW"
      operations: [ 'READ', 'WRITE' ]
      host: "*"
    - resource:
        type: 'topic'
        pattern: 'my-other-topic-.*'
        patternType: 'MATCH'
      type: 'ALLOW'
      operations: [ 'READ' ]
      host: "*"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
  name: "User:Bob"
spec:
  acls:
    - resource:
        type: 'topic'
        pattern: 'my-topic-'
        patternType: 'PREFIXED'
      type: 'ALLOW'
      operations: [ 'READ', 'WRITE' ]
      host: "*"

KafkaPrincipalRole

Specification

apiVersion: "kafka.jikkou.io/v1beta2"    # The api version (required)
kind: "KafkaPrincipalRole"               # The resource kind (required)
metadata:
  name: <Name of role>                   # The name of the role (required)  
spec:
  acls: [ ]                              # A list of KafkaPrincipalACL (required)

Example

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalRole"
metadata:
  name: "KafkaTopicPublicRead"
spec:
  acls:
    - type: "ALLOW"
      operations: [ 'READ' ]
      resource:
        type: 'topic'
        pattern: '/public-([.-])*/'
        patternType: 'MATCH'
      host: "*"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalRole"
metadata:
  name: "KafkaTopicPublicWrite"
spec:
  acls:
    - type: "ALLOW"
      operations: [ 'WRITE' ]
      resource:
        type: 'topic'
        pattern: '/public-([.-])*/'
        patternType: 'MATCH'
      host: "*"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
  name: "User:Alice"
spec:
  roles:
    - "KafkaTopicPublicRead"
    - "KafkaTopicPublicWrite"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
  name: "User:Bob"
spec:
  roles:
    - "KafkaTopicPublicRead"

2.6 - Kafka Quotas

Learn how to manage Kafka Client Quotas.

KafkaClientQuota resources are used to define the quota limits to be applied on Kafka consumers and producers. A KafkaClientQuota resource can be used to apply limits to consumers and/or producers identified by a client-id or a user principal.

KafkaClientQuota

Specification

Here is the resource definition file for defining a KafkaClientQuota.

apiVersion: "kafka.jikkou.io/v1beta2" # The api version (required)
kind: "KafkaClientQuota"              # The resource kind (required)
metadata: # (optional)
  labels: { }
  annotations: { }
spec:
  type: <The quota type> # (required)       
  entity:
    clientId: <The id of the client>    # (required depending on the quota type).
    user: <The principal of the user>   # (required depending on the quota type).
  configs:
    requestPercentage: <The quota in percentage (%) of total requests>      # (optional)
    producerByteRate: <The quota in bytes for restricting data production>  # (optional)
    consumerByteRate: <The quota in bytes for restricting data consumption> # (optional)

Quota Types

The list below describes the supported quota types:

  • USERS_DEFAULT: Set default quotas for all users.
  • USER: Set quotas for a specific user principal.
  • USER_CLIENT: Set quotas for a specific user principal and a specific client-id.
  • USER_ALL_CLIENTS: Set default quotas for a specific user and all clients.
  • CLIENT: Set default quotas for a specific client.
  • CLIENTS_DEFAULT: Set default quotas for all clients.

Example

Here is a simple example that shows how to define a single YAML file containing two quota definitions using the KafkaClientQuota resource type.

file: kafka-quotas.yaml

---
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuota'
metadata:
  labels: { }
  annotations: { }
spec:
  type: 'CLIENT'
  entity:
    clientId: 'my-client'
  configs:
    requestPercentage: 10
    producerByteRate: 1024
    consumerByteRate: 1024
---
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuota'
metadata:
  labels: { }
  annotations: { }
spec:
  type: 'USER'
  entity:
    user: 'my-user'
  configs:
    requestPercentage: 10
    producerByteRate: 1024
    consumerByteRate: 1024

KafkaClientQuotaList

If you need to define multiple quotas (e.g. using a template), it may be easier to use a KafkaClientQuotaList resource.

Specification

Here is the resource definition file for defining a KafkaClientQuotaList.

apiVersion: "kafka.jikkou.io/v1beta2"  # The api version (required)
kind: "KafkaClientQuotaList"           # The resource kind (required)
metadata: # (optional)
  labels: { }
  annotations: { }
items: [ ]                              # An array of KafkaClientQuota

Example

Here is a simple example that shows how to define a single YAML file containing two KafkaClientQuota definitions using the KafkaClientQuotaList resource type.

apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuotaList'
metadata:
  labels: { }
  annotations: { }
items:
  - spec:
      type: 'CLIENT'
      entity:
        clientId: 'my-client'
      configs:
        requestPercentage: 10
        producerByteRate: 1024
        consumerByteRate: 1024

  - spec:
      type: 'USER'
      entity:
        user: 'my-user'
      configs:
        requestPercentage: 10
        producerByteRate: 1024
        consumerByteRate: 1024

2.7 - Kafka Table Records

Learn how to manage a KTable Topic Records

A KafkaTableRecord resource can be used to produce a key/value record into a given compacted topic, i.e., a topic with cleanup.policy=compact (a.k.a. KTable).
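
For reference, here is a sketch of such a compacted topic declared with the KafkaTopic resource from section 2.3 (the topic name is hypothetical):

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
  name: 'my-ktable-topic'      # hypothetical topic name
spec:
  partitions: -1               # use cluster defaults
  replicas: -1
  configs:
    cleanup.policy: 'compact'  # KafkaTableRecord assumes a compacted topic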

KafkaTableRecord

Specification

Here is the resource definition file for defining a KafkaTableRecord.

apiVersion: "kafka.jikkou.io/v1beta1" # The api version (required)
kind: "KafkaTableRecord"              # The resource kind (required)
metadata:
  labels: { }
  annotations: { }
spec:
  topic: <string>                    # The topic name (required)
  headers: # The list of headers
    - name: <string>
      value: <string>
  key: # The record-key (required)
    type: <string>                   # The record-key type. Must be one of: BINARY, STRING, JSON (required)
    data: # The record-key in JSON serialized form.
      $ref: <url or path>            # Or a URL/path to a local file containing the JSON string value.
  value: # The record-value (required)
    type: <string>                   # The record-value type. Must be one of: BINARY, STRING, JSON (required)
    data: # The record-value in JSON serialized form.
      $ref: <url or path>            # Or a URL/path to a local file containing the JSON string value.

Usage

The KafkaTableRecord resource has been designed primarily to manage reference data published and shared via Kafka. Therefore, it is highly recommended to use this resource only with compacted Kafka topics containing a small amount of data.

Examples

Here are some examples that show how to define a KafkaTableRecord using the different supported data types.

STRING:

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
  topic: "my-topic"
  headers:
    - name: "content-type"
      value: "application/text"
  key:
    type: STRING
    data: |
      "bar"      
  value:
    type: STRING
    data: |
      "foo"      

JSON:

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
  topic: "my-topic"
  headers:
    - name: "content-type"
      value: "application/text"
  key:
    type: STRING
    data: |
      "bar"      
  value:
    type: JSON
    data: |
      {
        "foo": "bar"
      }      

BINARY:

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
  topic: "my-topic"
  headers:
    - name: "content-type"
      value: "application/text"
  key:
    type: STRING
    data: |
      "bar"      
  value:
    type: BINARY
    data: |
      "eyJmb28iOiAiYmFyIn0K"      

3 - Transformations

Learn how to use the built-in transformations provided by the Extension Provider for Apache Kafka.

Here, you will find information on how to use the built-in transformations for Apache Kafka resources.

More information:

3.1 - KafkaTopicMaxNumPartitions

This transformation can be used to enforce a maximum value for the number of partitions of Kafka topics.

Configuration

Name              Type  Description                                                               Default
maxNumPartitions  Int   Maximum value for the number of partitions to be used for Kafka Topics.

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMaxNumPartitions
      priority = 100
      config = {
        maxNumPartitions = 50
      }
    }
  ]
}

3.2 - KafkaTopicMaxRetentionMs

This transformation can be used to enforce a maximum value for the retention.ms property of Kafka topics.

Configuration

Name            Type  Description                                                  Default
maxRetentionMs  Int   Maximum value of retention.ms to be used for Kafka Topics.

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMaxRetentionMsTransformation
      priority = 100
      config = {
        maxRetentionMs = 2592000000 # 30 days
      }
    }
  ]
}

3.3 - KafkaTopicMinInSyncReplicas

This transformation can be used to enforce a minimum value for the min.insync.replicas property of Kafka topics.

Configuration

Name               Type  Description                                                         Default
minInSyncReplicas  Int   Minimum value of min.insync.replicas to be used for Kafka Topics.

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinInSyncReplicasTransformation
      priority = 100
      config = {
        minInSyncReplicas = 2
      }
    }
  ]
}

3.4 - KafkaTopicMinReplicas

This transformation can be used to enforce a minimum value for the replication factor of Kafka topics.

Configuration

Name                  Type  Description                                                        Default
minReplicationFactor  Int   Minimum value of replication factor to be used for Kafka Topics.

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinReplicasTransformation
      priority = 100
      config = {
        minReplicationFactor = 3
      }
    }
  ]
}

3.5 - KafkaTopicMinRetentionMs

This transformation can be used to enforce a minimum value for the retention.ms property of Kafka topics.

Configuration

Name            Type  Description                                                  Default
minRetentionMs  Int   Minimum value of retention.ms to be used for Kafka Topics.

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinRetentionMsTransformation
      priority = 100
      config = {
        minRetentionMs = 604800000 # 7 days
      }
    }
  ]
}

4 - Validations

Learn how to use the built-in validations provided by the Extension Provider for Apache Kafka.

Jikkou ships with the following built-in validations:

Topics

NoDuplicateTopicsAllowedValidation

(auto registered)

TopicConfigKeysValidation

(auto registered)

type = io.streamthoughts.jikkou.kafka.validation.TopicConfigKeysValidation

The TopicConfigKeysValidation allows checking if the specified topic configs are all valid.

TopicMinNumPartitions

type = io.streamthoughts.jikkou.kafka.validation.TopicMinNumPartitionsValidation

The TopicMinNumPartitions allows checking if the specified number of partitions for a topic is not less than the minimum required.

Configuration

Name                   Type  Description                            Default
topicMinNumPartitions  Int   Minimum number of partitions allowed.
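
Unlike the validations marked "(auto registered)", this validation must be declared in the configuration. A sketch mirroring the transformations pattern above (the threshold value is an assumption):

jikkou {
  validations: [
    {
      type = io.streamthoughts.jikkou.kafka.validation.TopicMinNumPartitionsValidation
      priority = 100
      config = {
        topicMinNumPartitions = 3   # hypothetical minimum
      }
    }
  ]
}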

TopicMaxNumPartitions

type = io.streamthoughts.jikkou.kafka.validation.TopicMaxNumPartitions

The TopicMaxNumPartitions allows checking if the number of partitions for a topic is not greater than the maximum configured.

Configuration

Name                   Type  Description                            Default
topicMaxNumPartitions  Int   Maximum number of partitions allowed.

TopicMinReplicationFactor

type = io.streamthoughts.jikkou.kafka.validation.TopicMinReplicationFactor

The TopicMinReplicationFactor allows checking if the specified replication factor for a topic is not less than the minimum required.

Configuration

Name                       Type  Description                          Default
topicMinReplicationFactor  Int   Minimum replication factor allowed.

TopicMaxReplicationFactor

type = io.streamthoughts.jikkou.kafka.validation.TopicMaxReplicationFactor

The TopicMaxReplicationFactor allows checking if the specified replication factor for a topic is not greater than the maximum configured.

Configuration

Name                       Type  Description                          Default
topicMaxReplicationFactor  Int   Maximum replication factor allowed.

TopicNamePrefix

type = io.streamthoughts.jikkou.kafka.validation.TopicNamePrefix

The TopicNamePrefix allows checking if the specified name for a topic starts with one of the configured prefixes.

Configuration

Name               Type  Description                            Default
topicNamePrefixes  List  List of topic name prefixes allowed.

TopicNameSuffix

type = io.streamthoughts.jikkou.kafka.validation.TopicNameSuffix

The TopicNameSuffix allows checking if the specified name for a topic ends with one of the configured suffixes.

Configuration

Name               Type  Description                            Default
topicNameSuffixes  List  List of topic name suffixes allowed.
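
A configuration sketch registering both naming validations, mirroring the transformations pattern above (the prefix and suffix values are assumptions):

jikkou {
  validations: [
    {
      type = io.streamthoughts.jikkou.kafka.validation.TopicNamePrefix
      priority = 100
      config = {
        topicNamePrefixes = [ "app-" ]    # hypothetical allowed prefixes
      }
    },
    {
      type = io.streamthoughts.jikkou.kafka.validation.TopicNameSuffix
      priority = 100
      config = {
        topicNameSuffixes = [ ".avro" ]   # hypothetical allowed suffixes
      }
    }
  ]
}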

ACLs

NoDuplicateUsersAllowedValidation

(auto registered)

NoDuplicateRolesAllowedValidation

(auto registered)

Quotas

QuotasEntityValidation

(auto registered)

5 - Annotations

Learn how to use the metadata annotations provided by the extensions for Apache Kafka.

Here, you will find information about the annotations provided by the Apache Kafka extension for Jikkou.

List of built-in annotations

kafka.jikkou.io/cluster-id

Used by jikkou.

This annotation is automatically added by Jikkou to any described object that is part of an Apache Kafka cluster.

6 - Actions

Learn how to use the actions provided by the Extension Provider for Apache Kafka.

Here, you will find the list of actions provided by the Extension Provider for Apache Kafka.

Apache Kafka Actions

More information:

6.1 - KafkaConsumerGroupsResetOffsets

Learn how to use the KafkaConsumerGroupsResetOffsets action.

The KafkaConsumerGroupsResetOffsets action allows resetting the offsets of a consumer group. It supports one consumer group at a time, and the group should be in the EMPTY state.

Usage (CLI)

Usage:

Execute the action.

jikkou action KafkaConsumerGroupsResetOffsets execute [-hV] [--all] [--dry-run]
[--to-earliest] [--to-latest] [--group=PARAM] [--logger-level=<level>]
[-o=<format>] [--to-datetime=PARAM] [--to-offset=PARAM] [--excludes=PARAM]...
[--groups=PARAM]... [--includes=PARAM]... --topic=PARAM [--topic=PARAM]...

DESCRIPTION:

Reset offsets of consumer group. Supports multiple consumer groups, and groups
should be in EMPTY state.
You must choose one of the following reset specifications: to-datetime,
by-duration, to-earliest, to-latest, to-offset.


OPTIONS:

      --all                 Specifies to act on all consumer groups.
      --dry-run             Only show results without executing changes on
                              Consumer Groups.
      --excludes=PARAM      List of patterns to match the consumer groups that
                              must be excluded from the reset-offset action.
      --group=PARAM         The consumer group to act on.
      --groups=PARAM        The consumer groups to act on.
  -h, --help                Show this help message and exit.
      --includes=PARAM      List of patterns to match the consumer groups that
                              must be included in the reset-offset action.
      --logger-level=<level>
                            Specify the log level verbosity to be used while
                              running a command.
                            Valid level values are: TRACE, DEBUG, INFO, WARN,
                              ERROR.
                            For example, `--logger-level=INFO`
  -o, --output=<format>     Prints the output in the specified format. Allowed
                              values: JSON, YAML (default YAML).
      --to-datetime=PARAM   Reset offsets to offset from datetime. Format:
                              'YYYY-MM-DDTHH:mm:SS.sss'
      --to-earliest         Reset offsets to earliest offset.
      --to-latest           Reset offsets to latest offset.
      --to-offset=PARAM     Reset offsets to a specific offset.
      --topic=PARAM         The topic whose partitions must be included in the
                              reset-offset action.
  -V, --version             Print version information and exit.

Examples

Reset Single Consumer Group to the earliest offsets

jikkou action KafkaConsumerGroupsResetOffsets execute \
--group my-group \
--topic test \
--to-earliest

(output)

---
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
  labels: {}
  annotations:
    configs.jikkou.io/to-earliest: "true"
    configs.jikkou.io/group: "my-group"
    configs.jikkou.io/dry-run: "false"
    configs.jikkou.io/topic: 
        - "test"
results:
- status: "SUCCEEDED"
  errors: []
  data:
    apiVersion: "kafka.jikkou.io/v1beta1"
    kind: "KafkaConsumerGroup"
    metadata:
      name: "my-group"
      labels:
        kafka.jikkou.io/is-simple-consumer: false
      annotations: {}
    status:
      state: "EMPTY"
      members: []
      offsets:
      - topic: "test"
        partition: 1
        offset: 0
      - topic: "test"
        partition: 0
        offset: 0
      - topic: "test"
        partition: 2
        offset: 0
      - topic: "--test"
        partition: 0
        offset: 0
      coordinator:
        id: "101"
        host: "localhost"
        port: 9092

Reset All Consumer Groups to the earliest offsets

jikkou action KafkaConsumerGroupsResetOffsets execute \
--all \
--topic test \
--to-earliest
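
Reset a Consumer Group to a point in time

A sketch using the --to-datetime option with the format shown in the options above (the timestamp is an assumption):

jikkou action KafkaConsumerGroupsResetOffsets execute \
--group my-group \
--topic test \
--to-datetime 2024-01-01T00:00:00.000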

7 - Compatibility

Compatibility for Apache Kafka.

The Apache Kafka extension for Jikkou utilizes the Kafka Admin Client, which is compatible with any Kafka infrastructure, such as:

  • Aiven
  • Apache Kafka
  • Confluent Cloud
  • MSK
  • Redpanda
  • etc.

In addition, the Kafka protocol has a “bidirectional” client compatibility policy: new clients can talk to old servers, and old clients can talk to new servers.