Pour illustrer le principe des applications stateful, nous allons déployer un cluster Kafka à l'aide d'un ReplicaSet.
Kafka est un broker de messages distribué qui s'appuie sur Zookeeper pour stocker sa configuration. Zookeeper lui-même gère des données persistantes et nécessite son propre StatefulSet. Pour notre exercice, Zookeeper est déjà déployer et nous allons donc simplement configurer Kafka pour qu'il se connecte à l'instance existante.
Récupérer le nom DNS de Zookeeper
Les StatefulSets ont la particularité de déployer des Pods dont les noms ne varient jamais, basés sur des index ordinaux. Notre première tâche va être de trouver le nom de domaine interne du cluster zookeeper, ce qui nous permettra de configurer Kafka. On attribut généralement un Service headless aux StafulSets (un Service n'ayant pas d'IP propre, et pas de load balancing).
Les Pods obtiennent un nom DNS de la forme suivante :
<nom du pod>.<nom du service headless>.<namespace>.svc.dtl-train.k8s.datailor.lo
- Pour commencer, afficher les Pods dans le namespace zk (zookeeper)
kubectl get pods -n <trigramme>
NAME READY STATUS RESTARTS
zk-zookeeper-0 1/1 Running 0
- Ensuite, afficher les services
kubectl get service -n <trigramme>
AME TYPE CLUSTER-IP EXTERNAL-IP PORT(S)
zk-zookeeper ClusterIP 10.43.162.49 <none> 2181/TCP
zk-zookeeper-headless ClusterIP None <none> 2181/TCP,3888/TCP,2888/TCP
Nous pouvons donc déduire que Zookeeper est joignable sur le nom DNS suivant à l'intérieur du cluster : zk-zookeeper-0.zk-zookeeper-headless.<trigramme>.svc.dtl-train.k8s.datailor.lo ou encore `zk-zookeeper-0.zk-zookeeper-headless.
Déployer Kafka
Service headless
Avant de nous intéresser au StatefulSet Kafka en lui-même, commencer par créer son Service headless.
Un Service headless est un Service comme les autres muni de l'attribut ClusterIP: none
- Déployer le Service suivant
apiVersion: v1
kind: Service
metadata:
name: kafka-headless
labels:
app: kafka
spec:
ports:
- port: 9093
name: server
clusterIP: None
selector:
app: kafka
StatefulSet
Nous allons construire le manifeste du StatefulSet Kafka brique par brique. Les StatefulSet ont une structure similaire aux ReplicaSets avec la possibilité supplémentaire de définir un template de PersistentVolumeClaim.
- Pour commencer, créer un fichier de manifeste contenant les informations de base du StatefulSet.
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
spec:
serviceName: kafka-headless # nom du Service headless
replicas: 3
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
- Ajouter le template de PersistentVolumeClaim.
volumeClaimTemplates:
- metadata:
name: datadir
spec:
storageClassName: capacitive-fs
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 500Mi
- Nous pouvons maintenant ajouter le template de Pod.
template:
metadata:
labels:
app: kafka
spec:
securityContext:
runAsUser: 1000
fsGroup: 1000
containers:
- name: k8skafka
image: gcr.io/google_samples/k8skafka:v1
ports:
- containerPort: 9093
name: server
- Ainsi qu'un montage de volume en provenance du PVC.
volumeMounts:
- name: ... # nom du pvc
mountPath: /var/lib/kafka
- Et quelques variables d'environnement pour configurer la JVM.
env:
- name: KAFKA_HEAP_OPTS
value : "-Xmx512M -Xms512M"
- name: KAFKA_OPTS
value: "-Dlogging.level=INFO"
- Enfin, nous devons indiquer la commande de démarrage pour l'image
k8skafka. Les nombreux arguments servent à configurer Kafka. Indiquer le nom DNS et le port de connexion des 3 pods Zookeeper dans la variablezookeeper.connect, sous la forme<pod 1>:2181,<pod 2>:2181,<pod 3>:2181
command:
- sh
- -c
- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
--override zookeeper.connect=zk-zookeeper-0.zk-zookeeper-headless \
--override listeners=PLAINTEXT://:9093 \
--override log.dir=/var/lib/kafka \
--override auto.create.topics.enable=true \
--override auto.leader.rebalance.enable=true \
--override background.threads=10 \
--override compression.type=producer \
--override delete.topic.enable=false \
--override leader.imbalance.check.interval.seconds=300 \
--override leader.imbalance.per.broker.percentage=10 \
--override log.flush.interval.messages=9223372036854775807 \
--override log.flush.offset.checkpoint.interval.ms=60000 \
--override log.flush.scheduler.interval.ms=9223372036854775807 \
--override log.retention.bytes=-1 \
--override log.retention.hours=168 \
--override log.roll.hours=168 \
--override log.roll.jitter.hours=0 \
--override log.segment.bytes=1073741824 \
--override log.segment.delete.delay.ms=60000 \
--override message.max.bytes=1000012 \
--override min.insync.replicas=1 \
--override num.io.threads=8 \
--override num.network.threads=3 \
--override num.recovery.threads.per.data.dir=1 \
--override num.replica.fetchers=1 \
--override offset.metadata.max.bytes=4096 \
--override offsets.commit.required.acks=-1 \
--override offsets.commit.timeout.ms=5000 \
--override offsets.load.buffer.size=5242880 \
--override offsets.retention.check.interval.ms=600000 \
--override offsets.retention.minutes=1440 \
--override offsets.topic.compression.codec=0 \
--override offsets.topic.num.partitions=50 \
--override offsets.topic.replication.factor=3 \
--override offsets.topic.segment.bytes=104857600 \
--override queued.max.requests=500 \
--override quota.consumer.default=9223372036854775807 \
--override quota.producer.default=9223372036854775807 \
--override replica.fetch.min.bytes=1 \
--override replica.fetch.wait.max.ms=500 \
--override replica.high.watermark.checkpoint.interval.ms=5000 \
--override replica.lag.time.max.ms=10000 \
--override replica.socket.receive.buffer.bytes=65536 \
--override replica.socket.timeout.ms=30000 \
--override request.timeout.ms=30000 \
--override socket.receive.buffer.bytes=102400 \
--override socket.request.max.bytes=104857600 \
--override socket.send.buffer.bytes=102400 \
--override unclean.leader.election.enable=true \
--override zookeeper.session.timeout.ms=6000 \
--override zookeeper.set.acl=false \
--override broker.id.generation.enable=true \
--override connections.max.idle.ms=600000 \
--override controlled.shutdown.enable=true \
--override controlled.shutdown.max.retries=3 \
--override controlled.shutdown.retry.backoff.ms=5000 \
--override controller.socket.timeout.ms=30000 \
--override default.replication.factor=1 \
--override fetch.purgatory.purge.interval.requests=1000 \
--override group.max.session.timeout.ms=300000 \
--override group.min.session.timeout.ms=6000 \
--override inter.broker.protocol.version=0.10.2-IV0 \
--override log.cleaner.backoff.ms=15000 \
--override log.cleaner.dedupe.buffer.size=134217728 \
--override log.cleaner.delete.retention.ms=86400000 \
--override log.cleaner.enable=true \
--override log.cleaner.io.buffer.load.factor=0.9 \
--override log.cleaner.io.buffer.size=524288 \
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
--override log.cleaner.min.cleanable.ratio=0.5 \
--override log.cleaner.min.compaction.lag.ms=0 \
--override log.cleaner.threads=1 \
--override log.cleanup.policy=delete \
--override log.index.interval.bytes=4096 \
--override log.index.size.max.bytes=10485760 \
--override log.message.timestamp.difference.max.ms=9223372036854775807 \
--override log.message.timestamp.type=CreateTime \
--override log.preallocate=false \
--override log.retention.check.interval.ms=300000 \
--override max.connections.per.ip=2147483647 \
--override num.partitions=1 \
--override producer.purgatory.purge.interval.requests=1000 \
--override replica.fetch.backoff.ms=1000 \
--override replica.fetch.max.bytes=1048576 \
--override replica.fetch.response.max.bytes=10485760 \
--override reserved.broker.max.id=1000 "
- Appliquer le manifeste.
Une fois le StatefulSet créé, les Pods Kafka vont être créés et provisionnés les uns après les autres. À la fin de leur démarrage, nous devons voir 3 Pods avec des noms indexés.
kubectl get pods
NAME READY STATUS RESTARTS
kafka-0 1/1 Running 0
kafka-1 1/1 Running 0
kafka-2 1/1 Running 0
Tests
Pour vérifier le bon fonctionnement de Kafka, nous allons créer un topic et un producteur Kafka, puis nous consommerons les messages à l'aide d'un Job.
Commençons par mettre en place le Job. Nous allons réutiliser l'image k8skafka qui contient et script consommateur, et configurer le Job pour consommer 5 messages avant de s'arrêter. À noter, dans ce mode de fonctionnement, chaque message sera consommé par un nouveau Pod.
- Appliquer le manifeste suivant.
apiVersion: batch/v1
kind: Job
metadata:
name: kafka-consumer-job
spec:
completions: 5
parallelism: 1
template:
spec:
containers:
- name: kafka-consumer
image: gcr.io/google_samples/k8skafka:v1
command: ["bash"]
args: ["-c", "kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-headless:9093 --max-messages 1"]
restartPolicy: Never
- Afficher les pods.
NAME READY STATUS RESTARTS
kafka-0 1/1 Running 0
kafka-1 1/1 Running 0
kafka-2 1/1 Running 0
kafka-consumer-job-nbx48 1/1 Running 0
Notre Job a bien créé un premier Pod, celui-ci est en attente.
- Se connecter à l'intérieur d'un des Pods Kafka.
kubectl exec -it kafka-0 /bin/bash
- Créer un producteur et envoyer quelques données.
> kafka-console-producer.sh --topic test --broker-list localhost:9093
bonjour
- Afficher les Pods
NAME READY STATUS RESTARTS
kafka-0 1/1 Running 0
kafka-1 1/1 Running 0
kafka-2 1/1 Running 0
kafka-consumer-job-2jrdx 1/1 Running 0
kafka-consumer-job-nbx48 0/1 Completed 0
Le Pod consommateur initialement créé a maintenant le status Completed
- Afficher ses logs
kubectl logs kafka-consumer-job-nbx48
bonjour
Processed a total of 1 messages
Notre cluster Kafka fonctionne !