Pour illustrer le principe des applications stateful, nous allons déployer un cluster Kafka à l'aide d'un ReplicaSet.

Kafka est un broker de messages distribué qui s'appuie sur Zookeeper pour stocker sa configuration. Zookeeper lui-même gère des données persistantes et nécessite son propre StatefulSet. Pour notre exercice, Zookeeper est déjà déployer et nous allons donc simplement configurer Kafka pour qu'il se connecte à l'instance existante.

Récupérer le nom DNS de Zookeeper

Les StatefulSets ont la particularité de déployer des Pods dont les noms ne varient jamais, basés sur des index ordinaux. Notre première tâche va être de trouver le nom de domaine interne du cluster zookeeper, ce qui nous permettra de configurer Kafka. On attribut généralement un Service headless aux StafulSets (un Service n'ayant pas d'IP propre, et pas de load balancing).

Les Pods obtiennent un nom DNS de la forme suivante : <nom du pod>.<nom du service headless>.<namespace>.svc.dtl-train.k8s.datailor.lo

  • Pour commencer, afficher les Pods dans le namespace zk (zookeeper)
kubectl get pods -n <trigramme>
NAME             READY     STATUS    RESTARTS
zk-zookeeper-0   1/1       Running   0        
  • Ensuite, afficher les services
kubectl get service -n <trigramme>
AME                    TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                     
zk-zookeeper            ClusterIP   10.43.162.49   <none>        2181/TCP                   
zk-zookeeper-headless   ClusterIP   None           <none>        2181/TCP,3888/TCP,2888/TCP

Nous pouvons donc déduire que Zookeeper est joignable sur le nom DNS suivant à l'intérieur du cluster : zk-zookeeper-0.zk-zookeeper-headless.<trigramme>.svc.dtl-train.k8s.datailor.lo ou encore `zk-zookeeper-0.zk-zookeeper-headless.

Déployer Kafka

Service headless

Avant de nous intéresser au StatefulSet Kafka en lui-même, commencer par créer son Service headless.

Un Service headless est un Service comme les autres muni de l'attribut ClusterIP: none

  • Déployer le Service suivant
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka

StatefulSet

Nous allons construire le manifeste du StatefulSet Kafka brique par brique. Les StatefulSet ont une structure similaire aux ReplicaSets avec la possibilité supplémentaire de définir un template de PersistentVolumeClaim.

  • Pour commencer, créer un fichier de manifeste contenant les informations de base du StatefulSet.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless # nom du Service headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
  • Ajouter le template de PersistentVolumeClaim.
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      storageClassName: capacitive-fs
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 500Mi
  • Nous pouvons maintenant ajouter le template de Pod.
  template:
    metadata:
      labels:
        app: kafka
    spec:
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
      containers:
      - name: k8skafka
        image: gcr.io/google_samples/k8skafka:v1
        ports:
        - containerPort: 9093
          name: server
  • Ainsi qu'un montage de volume en provenance du PVC.
        volumeMounts:
        - name: ... # nom du pvc
          mountPath: /var/lib/kafka
  • Et quelques variables d'environnement pour configurer la JVM.
        env:
        - name: KAFKA_HEAP_OPTS
          value : "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
  • Enfin, nous devons indiquer la commande de démarrage pour l'image k8skafka. Les nombreux arguments servent à configurer Kafka. Indiquer le nom DNS et le port de connexion des 3 pods Zookeeper dans la variable zookeeper.connect, sous la forme <pod 1>:2181,<pod 2>:2181,<pod 3>:2181
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override zookeeper.connect=zk-zookeeper-0.zk-zookeeper-headless \
          --override listeners=PLAINTEXT://:9093 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
  • Appliquer le manifeste.

Une fois le StatefulSet créé, les Pods Kafka vont être créés et provisionnés les uns après les autres. À la fin de leur démarrage, nous devons voir 3 Pods avec des noms indexés.

kubectl get pods
NAME                       READY     STATUS      RESTARTS
kafka-0                    1/1       Running     0       
kafka-1                    1/1       Running     0       
kafka-2                    1/1       Running     0       

Tests

Pour vérifier le bon fonctionnement de Kafka, nous allons créer un topic et un producteur Kafka, puis nous consommerons les messages à l'aide d'un Job.

Commençons par mettre en place le Job. Nous allons réutiliser l'image k8skafka qui contient et script consommateur, et configurer le Job pour consommer 5 messages avant de s'arrêter. À noter, dans ce mode de fonctionnement, chaque message sera consommé par un nouveau Pod.

  • Appliquer le manifeste suivant.
apiVersion: batch/v1
kind: Job
metadata:
  name: kafka-consumer-job
spec:
  completions: 5
  parallelism: 1
  template:
    spec:
      containers:
      - name: kafka-consumer
        image: gcr.io/google_samples/k8skafka:v1
        command: ["bash"]
        args: ["-c",  "kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-headless:9093 --max-messages 1"]
      restartPolicy: Never
  • Afficher les pods.
NAME                       READY     STATUS      RESTARTS
kafka-0                    1/1       Running     0       
kafka-1                    1/1       Running     0       
kafka-2                    1/1       Running     0       
kafka-consumer-job-nbx48   1/1       Running     0    

Notre Job a bien créé un premier Pod, celui-ci est en attente.

  • Se connecter à l'intérieur d'un des Pods Kafka.
kubectl exec -it kafka-0 /bin/bash
  • Créer un producteur et envoyer quelques données.
> kafka-console-producer.sh --topic test --broker-list localhost:9093
bonjour
  • Afficher les Pods
NAME                       READY     STATUS      RESTARTS
kafka-0                    1/1       Running     0       
kafka-1                    1/1       Running     0       
kafka-2                    1/1       Running     0       
kafka-consumer-job-2jrdx   1/1       Running     0    
kafka-consumer-job-nbx48   0/1       Completed   0        

Le Pod consommateur initialement créé a maintenant le status Completed

  • Afficher ses logs
kubectl logs kafka-consumer-job-nbx48
bonjour
Processed a total of 1 messages

Notre cluster Kafka fonctionne !