deploy flink

This commit is contained in:
Nicolas 2025-08-21 10:50:07 +08:00
parent f6f464dbae
commit c9f681e44b
7 changed files with 864 additions and 0 deletions

View File

@ -0,0 +1,235 @@
# Flink High Availability Cluster Deployment
## Overview
This project uses Apache Flink Kubernetes Operator to deploy a high availability Flink cluster with persistent storage and automatic failover capabilities.
## Component Architecture
- **JobManager**: 2 replicas with high availability configuration
- **TaskManager**: 3 replicas for distributed processing
- **High Availability**: Kubernetes-based HA with persistent storage
- **Checkpointing**: Persistent checkpoints and savepoints storage
## File Description
### 1. flink-operator-v2.yaml
Flink Kubernetes Operator deployment configuration:
- Operator deployment in `flink-system` namespace
- RBAC configuration for cluster-wide permissions
- Health checks and resource limits
- Enhanced CRD definitions with additional printer columns
### 2. flink-crd.yaml
Custom Resource Definitions for Flink:
- FlinkDeployment CRD
- FlinkSessionJob CRD
- Required for Flink Operator to function
### 3. ha-flink-cluster-v2.yaml
Production-ready HA Flink cluster configuration:
- 2 JobManager replicas with HA enabled
- 3 TaskManager replicas with anti-affinity rules
- Persistent storage for HA data, checkpoints, and savepoints
- Memory and CPU resource allocation
- Exponential delay restart strategy
- Proper volume mounts and storage configuration
### 4. simple-ha-flink-cluster.yaml
Simplified HA Flink cluster configuration:
- Uses ephemeral storage to avoid PVC binding issues
- Basic HA configuration for testing and development
- Minimal resource requirements
- Recommended for development and testing
### 5. flink-storage.yaml
Storage and RBAC configuration:
- PersistentVolumeClaims for HA data, checkpoints, and savepoints
- ServiceAccount and RBAC permissions for Flink cluster
- Azure Disk storage class configuration with correct access modes
### 6. flink-rbac.yaml
Enhanced RBAC configuration:
- Complete permissions for Flink HA functionality
- Both namespace-level and cluster-level permissions
- Includes watch permissions for HA operations
## Deployment Steps
### 1. Install Flink Operator
```bash
# Apply Flink Operator configuration
kubectl apply -f flink-operator-v2.yaml
# Verify operator installation
kubectl get pods -n flink-system
```
### 2. Create Storage Resources (Optional - for production)
```bash
# Apply storage configuration
kubectl apply -f flink-storage.yaml
# Verify PVC creation
kubectl get pvc -n freeleaps-data-platform
```
### 3. Deploy HA Flink Cluster
```bash
# Option A: Deploy with persistent storage (production)
kubectl apply -f ha-flink-cluster-v2.yaml
# Option B: Deploy with ephemeral storage (development/testing)
kubectl apply -f simple-ha-flink-cluster.yaml
# Check deployment status
kubectl get flinkdeployments -n freeleaps-data-platform
kubectl get pods -n freeleaps-data-platform -l app=flink
```
## High Availability Features
- **JobManager HA**: 2 JobManager replicas with Kubernetes-based leader election
- **Persistent State**: Checkpoints and savepoints stored on persistent volumes
- **Automatic Failover**: Exponential delay restart strategy with backoff
- **Pod Anti-affinity**: Ensures components are distributed across different nodes
- **Storage Persistence**: HA data, checkpoints, and savepoints persist across restarts
## Network Configuration
- **JobManager**: Port 8081 (Web UI), 6123 (RPC), 6124 (Blob Server)
- **TaskManager**: Port 6121 (Data), 6122 (RPC), 6126 (Metrics)
- **Service Type**: ClusterIP for internal communication
## Storage Configuration
- **HA Data**: 10Gi for high availability metadata
- **Checkpoints**: 20Gi for application checkpoints
- **Savepoints**: 20Gi for manual savepoints
- **Storage Class**: azure-disk-std-ssd-lrs
- **Access Mode**: ReadWriteOnce (Azure Disk limitation)
## Monitoring and Operations
- **Health Checks**: Built-in readiness and liveness probes
- **Web UI**: Accessible through JobManager service
- **Metrics**: Exposed on port 8080 for Prometheus collection
- **Logging**: Centralized logging through Kubernetes
## Configuration Details
### High Availability Settings
- **Type**: kubernetes (native Kubernetes HA)
- **Storage**: Persistent volume for HA metadata
- **Cluster ID**: ha-flink-cluster-v2
### Checkpointing Configuration
- **Interval**: 60 seconds
- **Timeout**: 10 minutes
- **Min Pause**: 5 seconds
- **Backend**: Filesystem with persistent storage
### Resource Allocation
- **JobManager**: 0.5 CPU, 1024MB memory (HA), 1.0 CPU, 1024MB memory (Simple)
- **TaskManager**: 0.5 CPU, 2048MB memory (HA), 2.0 CPU, 2048MB memory (Simple)
## Troubleshooting
### Common Issues and Solutions
#### 1. PVC Binding Issues
```bash
# Check PVC status
kubectl get pvc -n freeleaps-data-platform
# PVC stuck in Pending state - usually due to:
# - Insufficient storage quota
# - Wrong access mode (ReadWriteMany not supported by Azure Disk)
# - Storage class not available
# Solution: Use ReadWriteOnce access mode or ephemeral storage
```
#### 2. Pod CrashLoopBackOff
```bash
# Check pod status
kubectl get pods -n freeleaps-data-platform -l app=flink
# Check pod logs
kubectl logs <pod-name> -n freeleaps-data-platform
# Check pod events
kubectl describe pod <pod-name> -n freeleaps-data-platform
```
#### 3. ServiceAccount Issues
```bash
# Verify ServiceAccount exists
kubectl get serviceaccount -n freeleaps-data-platform
# Check RBAC permissions
kubectl get rolebinding -n freeleaps-data-platform
```
#### 4. Storage Path Issues
```bash
# Ensure storage paths match volume mounts
# For persistent storage: /opt/flink/ha-data, /opt/flink/checkpoints
# For ephemeral storage: /tmp/flink/ha-data, /tmp/flink/checkpoints
```
### Diagnostic Commands
```bash
# Check Flink Operator logs
kubectl logs -n flink-system -l app.kubernetes.io/name=flink-kubernetes-operator
# Check Flink cluster status
kubectl describe flinkdeployment <cluster-name> -n freeleaps-data-platform
# Check pod events
kubectl get events -n freeleaps-data-platform --sort-by='.lastTimestamp'
# Check storage status
kubectl get pvc -n freeleaps-data-platform
kubectl describe pvc <pvc-name> -n freeleaps-data-platform
# Check operator status
kubectl get pods -n flink-system
kubectl logs -n flink-system deployment/flink-kubernetes-operator
```
## Important Notes
1. **Storage Limitations**: Azure Disk storage class only supports ReadWriteOnce access mode
2. **ServiceAccount**: Ensure the correct ServiceAccount is specified in cluster configuration
3. **Resource Requirements**: Verify cluster has enough CPU/memory for all replicas
4. **Network Policies**: May need adjustment for inter-pod communication
5. **Ephemeral vs Persistent**: Use ephemeral storage for development/testing, persistent for production
## Quick Start (Recommended for Testing)
```bash
# 1. Deploy operator
kubectl apply -f flink-operator-v2.yaml
# 2. Wait for operator to be ready
kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=flink-kubernetes-operator -n flink-system
# 3. Deploy simple HA cluster (no persistent storage)
kubectl apply -f simple-ha-flink-cluster.yaml
# 4. Monitor deployment
kubectl get flinkdeployments -n freeleaps-data-platform
kubectl get pods -n freeleaps-data-platform -l app=flink
```
## Production Deployment
```bash
# 1. Deploy operator
kubectl apply -f flink-operator-v2.yaml
# 2. Deploy storage resources
kubectl apply -f flink-storage.yaml
# 3. Deploy production HA cluster
kubectl apply -f ha-flink-cluster-v2.yaml
# 4. Monitor deployment
kubectl get flinkdeployments -n freeleaps-data-platform
kubectl get pods -n freeleaps-data-platform -l app=flink
```

View File

@ -0,0 +1,43 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: flinkdeployments.flink.apache.org
spec:
group: flink.apache.org
versions:
- name: v1beta1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
x-kubernetes-preserve-unknown-fields: true
subresources:
status: {}
scope: Namespaced
names:
plural: flinkdeployments
singular: flinkdeployment
kind: FlinkDeployment
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: flinksessionjobs.flink.apache.org
spec:
group: flink.apache.org
versions:
- name: v1beta1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
x-kubernetes-preserve-unknown-fields: true
subresources:
status: {}
scope: Namespaced
names:
plural: flinksessionjobs
singular: flinksessionjob
kind: FlinkSessionJob

View File

@ -0,0 +1,298 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: flinkdeployments.flink.apache.org
spec:
group: flink.apache.org
names:
kind: FlinkDeployment
listKind: FlinkDeploymentList
plural: flinkdeployments
singular: flinkdeployment
shortNames:
- fd
- flinkdeploy
scope: Namespaced
versions:
- name: v1beta1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
x-kubernetes-preserve-unknown-fields: true
subresources:
status: {}
additionalPrinterColumns:
- name: Job Status
type: string
jsonPath: .status.jobStatus
- name: Flink Version
type: string
jsonPath: .spec.flinkVersion
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: flinksessionjobs.flink.apache.org
spec:
group: flink.apache.org
names:
kind: FlinkSessionJob
listKind: FlinkSessionJobList
plural: flinksessionjobs
singular: flinksessionjob
shortNames:
- fsj
- flinksessionjob
scope: Namespaced
versions:
- name: v1beta1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
x-kubernetes-preserve-unknown-fields: true
subresources:
status: {}
additionalPrinterColumns:
- name: Job Status
type: string
jsonPath: .status.jobStatus
- name: Flink Deployment
type: string
jsonPath: .spec.deploymentName
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
---
apiVersion: v1
kind: Namespace
metadata:
name: flink-system
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: flink-kubernetes-operator
namespace: flink-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: flink-kubernetes-operator
rules:
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
- get
- patch
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- namespaces
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- secrets
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- serviceaccounts
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- services
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- apps
resources:
- deployments
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- batch
resources:
- jobs
- cronjobs
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- flink.apache.org
resources:
- flinkdeployments
- flinkdeployments/status
- flinksessionjobs
- flinksessionjobs/status
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: flink-kubernetes-operator
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: flink-kubernetes-operator
subjects:
- kind: ServiceAccount
name: flink-kubernetes-operator
namespace: flink-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-kubernetes-operator
namespace: flink-system
labels:
app: flink-kubernetes-operator
spec:
replicas: 1
selector:
matchLabels:
app: flink-kubernetes-operator
template:
metadata:
labels:
app: flink-kubernetes-operator
spec:
serviceAccountName: flink-kubernetes-operator
containers:
- name: flink-kubernetes-operator
image: apache/flink-kubernetes-operator:1.8.0
command: ["/docker-entrypoint.sh"]
args: ["operator"]
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: OPERATOR_NAME
value: flink-kubernetes-operator
- name: LEADER_ELECTION_ID
value: flink-kubernetes-operator
- name: LEADER_ELECTION_NAMESPACE
value: flink-system
ports:
- containerPort: 8085
name: metrics
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
livenessProbe:
httpGet:
path: /healthz
port: 8085
initialDelaySeconds: 15
periodSeconds: 20
readinessProbe:
httpGet:
path: /readyz
port: 8085

View File

@ -0,0 +1,66 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: flink
namespace: freeleaps-data-platform
labels:
app: flink
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: flink-role
namespace: freeleaps-data-platform
rules:
- apiGroups: [""]
resources: ["configmaps", "secrets", "services", "pods", "events", "endpoints"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
- apiGroups: ["apps"]
resources: ["deployments", "statefulsets"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
- apiGroups: ["batch"]
resources: ["jobs", "cronjobs"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: flink-role-binding
namespace: freeleaps-data-platform
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: flink-role
subjects:
- kind: ServiceAccount
name: flink
namespace: freeleaps-data-platform
---
# Additional permissions for HA functionality
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: flink-ha-cluster-role
rules:
- apiGroups: [""]
resources: ["configmaps", "secrets", "services", "pods", "events", "endpoints"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
- apiGroups: ["apps"]
resources: ["deployments", "statefulsets"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
- apiGroups: ["batch"]
resources: ["jobs", "cronjobs"]
verbs: ["get", "list", "create", "update", "patch", "delete", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: flink-ha-cluster-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: flink-ha-cluster-role
subjects:
- kind: ServiceAccount
name: flink
namespace: freeleaps-data-platform

View File

@ -0,0 +1,82 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: ha-flink-ha-data
namespace: freeleaps-data-platform
labels:
app: flink
component: ha-storage
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
storageClassName: azure-disk-std-ssd-lrs
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: ha-flink-checkpoints
namespace: freeleaps-data-platform
labels:
app: flink
component: checkpoint-storage
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 20Gi
storageClassName: azure-disk-std-ssd-lrs
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: ha-flink-savepoints
namespace: freeleaps-data-platform
labels:
app: flink
component: savepoint-storage
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 20Gi
storageClassName: azure-disk-std-ssd-lrs
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: flink
namespace: freeleaps-data-platform
labels:
app: flink
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: flink-role
namespace: freeleaps-data-platform
rules:
- apiGroups: [""]
resources: ["configmaps", "secrets", "services", "pods"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: flink-role-binding
namespace: freeleaps-data-platform
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: flink-role
subjects:
- kind: ServiceAccount
name: flink
namespace: freeleaps-data-platform

View File

@ -0,0 +1,94 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: ha-flink-cluster-v2
namespace: freeleaps-data-platform
labels:
app: flink
component: streaming
cluster-type: ha
spec:
flinkVersion: v1_19
image: flink:1.19.0
flinkConfiguration:
# High Availability Configuration
high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/ha-data
# Checkpointing Configuration
state.backend.type: filesystem
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
execution.checkpointing.interval: 60s
execution.checkpointing.min-pause: 5s
execution.checkpointing.timeout: 10min
# JobManager Configuration
jobmanager.rpc.address: ha-flink-cluster-v2-jobmanager
jobmanager.rpc.port: "6123"
jobmanager.bind-host: "0.0.0.0"
# REST Configuration
rest.address: ha-flink-cluster-v2-jobmanager
rest.port: "8081"
rest.bind-address: "0.0.0.0"
# Blob Server Configuration
blob.server.port: "6124"
# TaskManager Configuration
taskmanager.numberOfTaskSlots: "2"
# Memory Configuration
taskmanager.memory.process.size: 2048m
jobmanager.memory.process.size: 1024m
# Restart Strategy
restart-strategy.type: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10s
restart-strategy.exponential-delay.max-backoff: 2min
restart-strategy.exponential-delay.backoff-multiplier: "2.0"
restart-strategy.exponential-delay.reset-backoff-threshold: 10min
restart-strategy.exponential-delay.jitter-factor: "0.1"
serviceAccount: flink
jobManager:
replicas: 2
resource:
memory: "1024m"
cpu: 0.5
podTemplate:
spec:
containers:
- name: flink-main-container
volumeMounts:
- name: ha-data
mountPath: /opt/flink/ha-data
- name: checkpoints
mountPath: /opt/flink/checkpoints
- name: savepoints
mountPath: /opt/flink/savepoints
volumes:
- name: ha-data
persistentVolumeClaim:
claimName: ha-flink-ha-data
- name: checkpoints
persistentVolumeClaim:
claimName: ha-flink-checkpoints
- name: savepoints
persistentVolumeClaim:
claimName: ha-flink-savepoints
taskManager:
replicas: 3
resource:
memory: "2048m"
cpu: 0.5
podTemplate:
spec:
containers:
- name: flink-main-container
volumeMounts:
- name: checkpoints
mountPath: /opt/flink/checkpoints
- name: savepoints
mountPath: /opt/flink/savepoints
volumes:
- name: checkpoints
persistentVolumeClaim:
claimName: ha-flink-checkpoints
- name: savepoints
persistentVolumeClaim:
claimName: ha-flink-savepoints

View File

@ -0,0 +1,46 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: simple-ha-flink-cluster
namespace: freeleaps-data-platform
labels:
app: flink
component: streaming
cluster-type: simple-ha
spec:
flinkVersion: v1_18
image: flink:1.18
flinkConfiguration:
# Basic Configuration
taskmanager.numberOfTaskSlots: "2"
# High Availability Configuration (using ephemeral storage)
high-availability.type: kubernetes
high-availability.storageDir: file:///tmp/flink/ha-data
# Checkpointing Configuration (using ephemeral storage)
state.backend.type: filesystem
state.checkpoints.dir: file:///tmp/flink/checkpoints
state.savepoints.dir: file:///tmp/flink/savepoints
execution.checkpointing.interval: 60s
execution.checkpointing.min-pause: 5s
execution.checkpointing.timeout: 10min
# Memory Configuration
taskmanager.memory.process.size: 2048m
jobmanager.memory.process.size: 1024m
# Restart Strategy
restart-strategy.type: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10s
restart-strategy.exponential-delay.max-backoff: 2min
restart-strategy.exponential-delay.backoff-multiplier: "2.0"
restart-strategy.exponential-delay.reset-backoff-threshold: 10min
restart-strategy.exponential-delay.jitter-factor: "0.1"
serviceAccount: flink
jobManager:
replicas: 2
resource:
memory: "1024m"
cpu: 1.0
taskManager:
replicas: 3
resource:
memory: "2048m"
cpu: 2.0