Deploy Flink on Kubernetes
In this post we are going to take a look at deploying Apache Flink on Kubernetes, specifically on GKE.
Once we are done we will end up with two folders with the following content:
> tree .
.
├── prod-flink
│   ├── flink-configuration-configmap.yaml
│   ├── flink-storage.yaml
│   ├── jobmanager-service.yaml
│   ├── jobmanager-session-deployment.yaml
│   ├── namespace.yaml
│   └── taskmanager-session-deployment.yaml
│
└── prod-nfs
    ├── namespace.yaml
    ├── nfs-server-service.yaml
    └── nfs-server.yaml
Flink supports different kinds of deployment models; we are going to focus on Session Mode.
In the end we will have 3 components running:
- A jobmanager
- A taskmanager
- A Service exposing the UI
The jobmanager and taskmanagers need access to the same volume so they can write checkpoints and savepoints and clean them up. Since GCEPersistentDisk does not support ReadWriteMany natively, we will add that capability via an NFS server.
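The NFS server below exports a GCE persistent disk, so that disk has to exist up front. Creating it is a one-liner; the name disk-1 matches what we reference later, while the size and zone here are just examples, so adjust them to your cluster:
gcloud compute disks create disk-1 --size=200GB --zone=europe-west1-b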
Let’s create a new directory prod-nfs:
mkdir prod-nfs
Then create the 3 files with the following content:
> cat namespace.yaml
---
apiVersion: v1
kind: Namespace
metadata:
  name: prod-nfs
> cat nfs-server.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-server
  namespace: prod-nfs
spec:
  replicas: 1
  selector:
    matchLabels:
      role: nfs-server
  template:
    metadata:
      labels:
        role: nfs-server
    spec:
      containers:
      - name: nfs-server
        image: gcr.io/google_containers/volume-nfs:0.8
        ports:
        - name: nfs
          containerPort: 2049
        - name: mountd
          containerPort: 20048
        - name: rpcbind
          containerPort: 111
        securityContext:
          privileged: true
        volumeMounts:
        - mountPath: /exports
          name: mypvc
      volumes:
      - name: mypvc
        gcePersistentDisk:
          pdName: disk-1
          fsType: ext4
> cat nfs-server-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: nfs-server
  namespace: prod-nfs
spec:
  ports:
  - name: nfs
    port: 2049
  - name: mountd
    port: 20048
  - name: rpcbind
    port: 111
  selector:
    role: nfs-server
Then deploy these files via kubectl apply -f, or via fluxctl if you are using Flux.
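With plain kubectl that boils down to the following (assuming you are in the directory that contains prod-nfs/):
kubectl apply -f prod-nfs/
kubectl -n prod-nfs get pods,svc
The second command is just a sanity check that the nfs-server pod is Running and the service got a ClusterIP before we point Flink at it.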
Now, let's start deploying Flink:
mkdir prod-flink
And then create the following files:
> cat namespace.yaml
---
apiVersion: v1
kind: Namespace
metadata:
  name: prod-flink
> cat flink-storage.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-storage
  namespace: prod-flink
spec:
  capacity:
    storage: 100Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: nfs-server.prod-nfs.svc.cluster.local
    path: "/"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: flink-storage
  namespace: prod-flink
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""
  resources:
    requests:
      storage: 100Gi
> cat flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: prod-flink
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 15
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 4g
    parallelism.default: 2
    state.backend: rocksdb
    state.savepoints.dir: file:///cache/savepoints
    state.checkpoints.dir: file:///cache/checkpoints
    state.backend.incremental: true
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
> cat jobmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: prod-flink
  labels:
    app: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      securityContext:
        fsGroup: 2000
      # Weird permission requirements due to NFS being mounted as root
      # only, so change the permissions to the flink user group.
      initContainers:
      - name: nfs-fixer
        image: alpine
        securityContext:
          runAsUser: 0
        volumeMounts:
        - name: cache
          mountPath: /cache/
        command:
        - sh
        - -c
        - (chmod 0775 /cache/; chgrp 2000 /cache/)
      containers:
      - name: jobmanager
        image: apache/flink:1.13.0-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: cache
          mountPath: /cache/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: cache
        persistentVolumeClaim:
          claimName: flink-storage
> cat taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: prod-flink
  labels:
    app: flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      securityContext:
        fsGroup: 2000
      # Weird permission requirements due to NFS being mounted as root
      # only, so change the permissions to the flink user group.
      initContainers:
      - name: nfs-fixer
        image: alpine
        securityContext:
          runAsUser: 0
        volumeMounts:
        - name: cache
          mountPath: /cache/
        command:
        - sh
        - -c
        - (chmod 0775 /cache/; chgrp 2000 /cache/)
      containers:
      - name: taskmanager
        image: apache/flink:1.13.0-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: cache
          mountPath: /cache/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: cache
        persistentVolumeClaim:
          claimName: flink-storage
> cat jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: prod-flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
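Deploying works exactly like before, and it is worth checking that the PersistentVolumeClaim actually bound to our NFS-backed volume (again assuming you are one level above prod-flink/):
kubectl apply -f prod-flink/
kubectl -n prod-flink get pvc flink-storage
The PVC should show up as Bound to the flink-storage PersistentVolume.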
Now, if you have deployed all those files as well, you should be able to run:
> k get po -n prod-flink
NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-679fd56977-mftpj    1/1     Running   0          10m
flink-taskmanager-7dc46c8475-ff6gs   1/1     Running   0          10m
flink-taskmanager-7dc46c8475-q5kbh   1/1     Running   0          10m
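If a pod gets stuck, the NFS volume is the usual suspect. A quick way to check that the init container fixed the permissions is to look at /cache in one of the pods (the pod name is of course just the one from the listing above):
kubectl -n prod-flink exec flink-taskmanager-7dc46c8475-ff6gs -- ls -ld /cache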
Then simply pick the jobmanager pod and port-forward it:
k -n prod-flink port-forward flink-jobmanager-679fd56977-mftpj 8081:8081
Navigate to localhost:8081 and you should be able to deploy your jars.
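If you would rather script this than click through the UI, the same port-forward also exposes Flink's REST API, so uploading and starting a job looks roughly like this (my-job.jar and the returned jar id are placeholders for your own artifact):
curl -X POST -F "jarfile=@my-job.jar" http://localhost:8081/jars/upload
curl -X POST http://localhost:8081/jars/<jar-id-from-upload>/run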
There is also a k8s operator maintained by Lyft at https://github.com/lyft/flinkk8soperator. I have not tested that out yet.