Apache Kafka는 Apache 재단의 오픈소스 Event Streaming Platform 입니다.
Kafka를 사용하면 기존의 end-to-end로 묶여있던 복잡한 아키텍쳐를 단순한 이벤트 버스 중심의 아키텍쳐로 탈바꿈할 수 있다는 장점이 있습니다.
그렇다면 어떻게 Kafka가 이런 일을 할 수 있을까요?
Apache Kafka의 공식 홈페이지에 따르면 Kafka는 Event Streaming을 위한 Platform이라고 말합니다. 그리고 Event Streaming이란 인간으로 따지면 바로 중추신경계에 해당하는 디지털 요소라고 정의하고 있습니다.
왜냐하면 Event streaming Platform이 DB, IoT, App 등 다양한 곳에서 오는 Event들을 real-time으로 모아서 그 Event가 필요한 어딘가로 안전하게 보내줄 수 있기 때문입니다.
즉 Apache Kafka는 Event streaming Platform으로써 여러 곳에서 들어오고 나가는 Event들을 한 곳에서 처리할 수 있도록 묶어주는 Event Bus의 역할을 할 수 있기 때문에 아키텍쳐를 간결하게 만들 수 있습니다.
이번 포스팅에서는 이 Apache Kafka를 Kubernetes 환경에서 구성해보고 어떻게 사용하는지 검증해보겠습니다.
1. Apache kafka 구성 요소
1. Producer : Kafka에서 Producer는 Kafka Producer API와 Kafka로 메세지를 보내는 클라이언트를 뜻합니다. Producer는 애플리케이션이 보내는 이벤트 메세지를 받아서 직렬화한 후, Kafka가 원하는 대로 메세지를 변경하는 역할을 합니다.
2. Cousumer : Kafka에서 Consumer는 Kafka가 저장한 메세지를 꺼내가서 소비하는 클라이언트를 뜻합니다. 여러 개의 Consumer들을 묶어서 하나의 Consumer Group을 형성할 수 있습니다.
3. Broker : Producer와 Consumer 사이에서 메세지를 저장 및 전달하는 Kafka 서버를 Broker라고 합니다. Kafka를 이루는 중추이며 우리가 일반적으로 "Kafka"라고 부르는 시스템은 이 Broker를 지칭합니다.
4. Topic : Kafka는 특정한 Producer가 생성한 메세지를 원하는 Consumer가 가져갈 수 있도록 메세지를 Topic이라는 주제로 구분합니다. Topic은 Kafka 내에서 Unique하며, 우리는 Topic을 통해 꺼내쓰고자 하는 메세지가 어떤 메세지인지 알 수 있습니다.
5. Partition : 각각의 Topic은 여러 개의 Partition으로 나뉘어져 있습니다. Partition은 병렬 처리와 고성능을 얻기 위해 메세지를 분리해 쌓으며 Partition마다 고유한 Offset을 가집니다.
6. Offset : 각 Partition에서 메세지가 저장된 순서를 말합니다. 순서는 Incremental하게 증가하며 이 순서를 통해 데이터가 어떤 순번으로 들어왔는지 알 수 있습니다.
7. Apache Zookeeper : Apache Kafka는 가용성이 매우 중요한 플랫폼이지만 Kafka만으로는 가용성을 보장할 수 없습니다. 그래서 Kafka의 메타데이터를 관리하고 브로커의 헬스 체크를 담당하는 Coordinator인 Zookeeper를 같이 사용합니다. Kubernetes 환경에서 Kafka를 사용할때도 Zookeeper를 같이 구성해서 사용합니다.
2. Apache Kafka의 동작
Apache Kafka의 자세한 동작은 위 다이어그램과 같습니다.
1. Producer는 Message를 Topic으로 발신(혹은 Publish)합니다.
2. Topic은 1개 이상의 복제된 Partition으로 이루어져 있어 Message를 제공합니다.
3. Consumer는 특정 Consumer Group에 속하며 Topic을 구독(Subscribe)해 Partition에서 Message를 Pull합니다.
4. Producer와 Consumer를 잇는 Topic들은 kafka 서버인 Broker 위에서 동작합니다.
5. Broker들의 집합은 Cluster의 일부분으로 동작합니다.
3. Apache Kafka의 특징
동작 과정을 살펴보며 알 수 있었던 중요한 점은,
Kafka가 Topic을 중심으로 Publisher와 Subscriber가 나뉘어져 있는 Pub/Sub 모델을 차용하고 있다는 것,
Consumer가 Message를 가져오는 Pull-based 모델이라는 것입니다.
3-1. Pub/Sub 모델
Pub/Sub 모델은 Message를 제공하는 Publisher와 소비하는 Subscriber가 서로의 존재를 모른 채 비동기적으로 Message를 소비하는 패턴을 말합니다.
Pub/Sub 모델을 사용하면 Publisher와 Subscriber 두 관계자는 서로의 존재를 신경쓸 필요가 없기 때문에 Loosley-coupled되며, 둘 중 하나의 이상상태가 나머지 하나에게 영향을 거의 미치지 않게 됩니다.
이를 통해 이 모델을 채용한 아키텍쳐는 확장성과 내구성을 얻을 수 있게 됩니다.
3-2. Pull-based 모델
Pull-based 모델은 Consumer가 직접 Broker에게서 필요한 만큼의 Message를 요청해 소비하는 패턴을 사용합니다.
이를 통해 Broker의 연산 부담을 줄여줄 수 있으며, Consumer가 자신이 낼 수 있는 최선의 처리능력 만큼의 Message를 소비할 수 있기 때문에 효율적입니다.
또한 Consumer를 특정 offset만큼의 Message를 요청하거나 Consumer를 Batch consumer 형태로도 운영할 수 있는 등의 다양한 기능을 가지고 있습니다.
이러한 Kafka의 모델은 Smart consumer, Dumb broker라고도 하여 Broker보다는 Consumer의 연산 능력에 더 중점을 둔 모델이라고 할 수 있습니다.
3-3. 그 외의 특징
3-3-1. Ordering
Apache Kafka의 Message는 순서 보장이 가능합니다. 단, Partition을 1개만 구독할때만 가능합니다. Partition을 2개 이상 구독할 경우 Message의 순서는 보장되지 않습니다.
3-3-2. Message Retention
Apache Kafka의 Message 보관 기한은 지정한 Policy를 따라갑니다. 이는 Ack를 받으면 Message가 즉시 사라지는 다른 메세지 큐 시스템과는 다른 차별점인데요. Kafka는 Message를 지속적으로 디스크에 저장하기 때문에 가능합니다. 이 덕분에 Kafka에서는 이미 소비한 Message를 다시 볼 수 있는 Message replay 기능도 사용할 수 있습니다.
4. Apache Kafka 구성
이제 본격적으로 Kubernetes Cluster 위에 Apache kafka를 구성해보겠습니다.
Apache kafka는 helm 패키지 매니저를 이용해서 설치하며, Strimzi operator를 사용해서 운영하는 방향으로 구성해보겠습니다.
Strimzi라는 Operator 패턴으로 Kafka를 배포하는 이유는 Operator 패턴이 Kafka Cluster를 배포하기 편리하게 구성되어 있다는 점이 있고, Kafka의 가용성 확보에 필요한 Zookeeper 또한 Operator가 관리해준다는 편리함도 있으며, Kafka Bridge, Exporter 등의 다른 구성요소도 쉽게 추가할 수 있다는 장점이 있기 때문입니다.
준비된 Kubernetes Cluster에서 아래 명령어를 입력해 strimzi-kafka-operator를 설치합니다.
1
2
|
helm repo add strimzi https://strimzi.io/charts/
helm install kafka-operator strimzi/strimzi-kafka-operator --namespace kafka --create-namespace
|
cs |
설치를 성공적으로 완료하고 나면, 아래와 같이 2개의 Statefulsets, 2개의 Deployments로 이루어진 오브젝트들이 생성된 것을 볼 수 있습니다.
Statefulset은 각각 Kafka Cluster와 Zookeeper Cluster를, Deployment는 각각 Entity Operator와 Cluster Operator를 의미합니다.
Kafka Cluster와 Zookeeper Cluster는 Kafka Broker와 Kafka의 가용성을 보장하기 위한 Zookeeper Cluster의 역할을 합니다.
Operator들은 아래와 같은 역할을 합니다.
- Cluster Operator : Kafka cluster, Connect, MirrorMaker, Entity Operator 등의 Kafka를 구성하는 전체 요소들을 관리하고 배포합니다.
- Entity Operator : Topic Operator와 User Operator로 나뉘어집니다.
- Topic Operator : Kafka Topic을 관리합니다.
- User Operator : Kafka User를 관리합니다.
그 외에 Strimzi Operator가 지원하는 CRD 목록은 다음과 같습니다. 아래와 같이 kafkas라는 이름으로 kafka cluster를 배포 및 관리할 수 있으며, kafka bridge, connect, topic 등의 요소들도 CR로 관리할 수 있음을 확인할 수 있습니다.
Kafka broker의 설정을 확인해보겠습니다. 아래 명령어를 입력해 Kafka broker의 설정을 확인합니다. 아래와 같이 id 0,1,2로 명명된 my-cluster-kafka-0,1,2 Broker들의 정보를 볼 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
kubectl exec my-cluster-kafka-0 -it -- bin/kafka-broker-api-versions.sh --bootstrap-server my-cluster-kafka-bootstrap:9092
my-cluster-kafka-2.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2 rack: null) -> (
Produce(0): 0 to 9 [usable: 9],
Fetch(1): 0 to 13 [usable: 13],
ListOffsets(2): 0 to 7 [usable: 7],
Metadata(3): 0 to 12 [usable: 12],
LeaderAndIsr(4): 0 to 6 [usable: 6],
StopReplica(5): 0 to 3 [usable: 3],
UpdateMetadata(6): 0 to 7 [usable: 7],
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): 0 to 8 [usable: 8],
OffsetFetch(9): 0 to 8 [usable: 8],
FindCoordinator(10): 0 to 4 [usable: 4],
JoinGroup(11): 0 to 9 [usable: 9],
Heartbeat(12): 0 to 4 [usable: 4],
LeaveGroup(13): 0 to 5 [usable: 5],
SyncGroup(14): 0 to 5 [usable: 5],
DescribeGroups(15): 0 to 5 [usable: 5],
ListGroups(16): 0 to 4 [usable: 4],
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 7 [usable: 7],
DeleteTopics(20): 0 to 6 [usable: 6],
DeleteRecords(21): 0 to 2 [usable: 2],
InitProducerId(22): 0 to 4 [usable: 4],
OffsetForLeaderEpoch(23): 0 to 4 [usable: 4],
AddPartitionsToTxn(24): 0 to 3 [usable: 3],
AddOffsetsToTxn(25): 0 to 3 [usable: 3],
EndTxn(26): 0 to 3 [usable: 3],
WriteTxnMarkers(27): 0 to 1 [usable: 1],
TxnOffsetCommit(28): 0 to 3 [usable: 3],
DescribeAcls(29): 0 to 2 [usable: 2],
CreateAcls(30): 0 to 2 [usable: 2],
DeleteAcls(31): 0 to 2 [usable: 2],
DescribeConfigs(32): 0 to 4 [usable: 4],
AlterConfigs(33): 0 to 2 [usable: 2],
AlterReplicaLogDirs(34): 0 to 2 [usable: 2],
DescribeLogDirs(35): 0 to 3 [usable: 3],
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 3 [usable: 3],
CreateDelegationToken(38): 0 to 2 [usable: 2],
RenewDelegationToken(39): 0 to 2 [usable: 2],
ExpireDelegationToken(40): 0 to 2 [usable: 2],
DescribeDelegationToken(41): 0 to 2 [usable: 2],
DeleteGroups(42): 0 to 2 [usable: 2],
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): 0 [usable: 0],
DescribeClientQuotas(48): 0 to 1 [usable: 1],
AlterClientQuotas(49): 0 to 1 [usable: 1],
DescribeUserScramCredentials(50): 0 [usable: 0],
AlterUserScramCredentials(51): 0 [usable: 0],
AlterPartition(56): 0 to 1 [usable: 1],
UpdateFeatures(57): 0 [usable: 0],
DescribeCluster(60): 0 [usable: 0],
DescribeProducers(61): 0 [usable: 0],
DescribeTransactions(65): 0 [usable: 0],
ListTransactions(66): 0 [usable: 0],
AllocateProducerIds(67): 0 [usable: 0]
)
my-cluster-kafka-1.my-cluster-kafka-brokers.kafka.svc:9092 (id: 1 rack: null) -> (
Produce(0): 0 to 9 [usable: 9],
Fetch(1): 0 to 13 [usable: 13],
ListOffsets(2): 0 to 7 [usable: 7],
Metadata(3): 0 to 12 [usable: 12],
LeaderAndIsr(4): 0 to 6 [usable: 6],
StopReplica(5): 0 to 3 [usable: 3],
UpdateMetadata(6): 0 to 7 [usable: 7],
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): 0 to 8 [usable: 8],
OffsetFetch(9): 0 to 8 [usable: 8],
FindCoordinator(10): 0 to 4 [usable: 4],
JoinGroup(11): 0 to 9 [usable: 9],
Heartbeat(12): 0 to 4 [usable: 4],
LeaveGroup(13): 0 to 5 [usable: 5],
SyncGroup(14): 0 to 5 [usable: 5],
DescribeGroups(15): 0 to 5 [usable: 5],
ListGroups(16): 0 to 4 [usable: 4],
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 7 [usable: 7],
DeleteTopics(20): 0 to 6 [usable: 6],
DeleteRecords(21): 0 to 2 [usable: 2],
InitProducerId(22): 0 to 4 [usable: 4],
OffsetForLeaderEpoch(23): 0 to 4 [usable: 4],
AddPartitionsToTxn(24): 0 to 3 [usable: 3],
AddOffsetsToTxn(25): 0 to 3 [usable: 3],
EndTxn(26): 0 to 3 [usable: 3],
WriteTxnMarkers(27): 0 to 1 [usable: 1],
TxnOffsetCommit(28): 0 to 3 [usable: 3],
DescribeAcls(29): 0 to 2 [usable: 2],
CreateAcls(30): 0 to 2 [usable: 2],
DeleteAcls(31): 0 to 2 [usable: 2],
DescribeConfigs(32): 0 to 4 [usable: 4],
AlterConfigs(33): 0 to 2 [usable: 2],
AlterReplicaLogDirs(34): 0 to 2 [usable: 2],
DescribeLogDirs(35): 0 to 3 [usable: 3],
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 3 [usable: 3],
CreateDelegationToken(38): 0 to 2 [usable: 2],
RenewDelegationToken(39): 0 to 2 [usable: 2],
ExpireDelegationToken(40): 0 to 2 [usable: 2],
DescribeDelegationToken(41): 0 to 2 [usable: 2],
DeleteGroups(42): 0 to 2 [usable: 2],
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): 0 [usable: 0],
DescribeClientQuotas(48): 0 to 1 [usable: 1],
AlterClientQuotas(49): 0 to 1 [usable: 1],
DescribeUserScramCredentials(50): 0 [usable: 0],
AlterUserScramCredentials(51): 0 [usable: 0],
AlterPartition(56): 0 to 1 [usable: 1],
UpdateFeatures(57): 0 [usable: 0],
DescribeCluster(60): 0 [usable: 0],
DescribeProducers(61): 0 [usable: 0],
DescribeTransactions(65): 0 [usable: 0],
ListTransactions(66): 0 [usable: 0],
AllocateProducerIds(67): 0 [usable: 0]
)
my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null) -> (
Produce(0): 0 to 9 [usable: 9],
Fetch(1): 0 to 13 [usable: 13],
ListOffsets(2): 0 to 7 [usable: 7],
Metadata(3): 0 to 12 [usable: 12],
LeaderAndIsr(4): 0 to 6 [usable: 6],
StopReplica(5): 0 to 3 [usable: 3],
UpdateMetadata(6): 0 to 7 [usable: 7],
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): 0 to 8 [usable: 8],
OffsetFetch(9): 0 to 8 [usable: 8],
FindCoordinator(10): 0 to 4 [usable: 4],
JoinGroup(11): 0 to 9 [usable: 9],
Heartbeat(12): 0 to 4 [usable: 4],
LeaveGroup(13): 0 to 5 [usable: 5],
SyncGroup(14): 0 to 5 [usable: 5],
DescribeGroups(15): 0 to 5 [usable: 5],
ListGroups(16): 0 to 4 [usable: 4],
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 7 [usable: 7],
DeleteTopics(20): 0 to 6 [usable: 6],
DeleteRecords(21): 0 to 2 [usable: 2],
InitProducerId(22): 0 to 4 [usable: 4],
OffsetForLeaderEpoch(23): 0 to 4 [usable: 4],
AddPartitionsToTxn(24): 0 to 3 [usable: 3],
AddOffsetsToTxn(25): 0 to 3 [usable: 3],
EndTxn(26): 0 to 3 [usable: 3],
WriteTxnMarkers(27): 0 to 1 [usable: 1],
TxnOffsetCommit(28): 0 to 3 [usable: 3],
DescribeAcls(29): 0 to 2 [usable: 2],
CreateAcls(30): 0 to 2 [usable: 2],
DeleteAcls(31): 0 to 2 [usable: 2],
DescribeConfigs(32): 0 to 4 [usable: 4],
AlterConfigs(33): 0 to 2 [usable: 2],
AlterReplicaLogDirs(34): 0 to 2 [usable: 2],
DescribeLogDirs(35): 0 to 3 [usable: 3],
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 3 [usable: 3],
CreateDelegationToken(38): 0 to 2 [usable: 2],
RenewDelegationToken(39): 0 to 2 [usable: 2],
ExpireDelegationToken(40): 0 to 2 [usable: 2],
DescribeDelegationToken(41): 0 to 2 [usable: 2],
DeleteGroups(42): 0 to 2 [usable: 2],
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): 0 [usable: 0],
DescribeClientQuotas(48): 0 to 1 [usable: 1],
AlterClientQuotas(49): 0 to 1 [usable: 1],
DescribeUserScramCredentials(50): 0 [usable: 0],
AlterUserScramCredentials(51): 0 [usable: 0],
AlterPartition(56): 0 to 1 [usable: 1],
UpdateFeatures(57): 0 [usable: 0],
DescribeCluster(60): 0 [usable: 0],
DescribeProducers(61): 0 [usable: 0],
DescribeTransactions(65): 0 [usable: 0],
ListTransactions(66): 0 [usable: 0],
AllocateProducerIds(67): 0 [usable: 0]
)
|
cs |
모든 kafka pod에는 Kafka를 운영하는데 유용하게 사용할 수 있는 스크립트 파일들이 내장되어 있습니다.
그 중 하나가 "kafka-broker-api-versions.sh" 파일로 --bootstrap-server 플래그에 kafka broker의 endpoint를 넣으면 해당 broker의 정보를 확인할 수 있습니다.
다음으로 Kafka topic의 목록을 확인하기 위해 다음 명령어를 입력합니다.
1
|
kubectl get kafkatopics -n kafka
|
cs |
명령어를 입력하면 아래와 같이 Kubernetes CR로 관리되고 있는 kafka topic들을 확인할 수 있습니다.
이제 Kubernetes Cluster에서 Apache Kafka를 사용할 준비가 끝났습니다.
5. Apache Kafka 사용해보기
이제 Kafka에서 직접 Message를 생성하고 소비해보는 것으로 Kafka의 동작을 직접 보도록 하겠습니다.
5-1. Kafka Topic 생성
우선 아래 yaml 파일로 KafkaTopic CR을 이용해 Kafka topic을 생성하겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
|
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: mytopic
labels:
strimzi.io/cluster: "my-cluster"
spec:
partitions: 1
replicas: 3
config:
retention.ms: 7200000
segment.bytes: 1073741824
min.insync.replicas: 2
|
cs |
yaml 파일 적용 후 kubectl get kafkatopic 명령어로 topic이 정상적으로 생성되었는지 확인합니다.
5-2. Producer & Consumer 생성 및 Message 주고 받기
이제 Message를 주고받기 위해 Producer와 Consumer를 생성하겠습니다. 원래라면 각 역할을 하는 어플리케이션이 있어야 겠지만 편리하게도 스크립트 파일 중 Producer와 Consumer를 흉내낼 수 있는 스크립트가 존재합니다. 이 스크립트 파일을 사용해서 Message를 주고받아 보겠습니다.
먼저 메세지를 생성할 Producer를 생성해보겠습니다. 새 터미널을 연 뒤 아래 명령어를 입력해 Producer 역할의 터미널을 생성합니다.
1
|
kubectl exec my-cluster-kafka-0 -it -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic mytopic
|
cs |
정상적으로 명령어가 실행되면 아래와 같이 Message를 입력할 수 있는 프롬프트가 나타납니다. 여기서 원하는 Message를 입력해 생성할 수 있습니다.
다음으로 Message를 소비할 Consumer를 생성합니다. 새 터미널을 연 뒤 아래 명령어를 입력해 Consumer 역할의 터미널을 생성합니다.
1
|
kubectl exec my-cluster-kafka-0 -it -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic mytopic --from-beginning
|
cs |
정상적으로 명령어가 실행되면 아래와 같이 Producer에서 생성한 Message가 출력되는 것을 확인할 수 있습니다.
같은 Producer 터미널에서 계속 Message를 생성하면 Consumer에서 해당 Message가 계속 출력되는 것을 볼 수 있습니다.
이렇게 Producer가 생성한 Message는 특정 Topic을 중심으로 저장되고, 이를 같은 Topic으로 구독한 Consumer가 해당 Message를 소비하는 것을 확인할 수 있었습니다.
5-3. Consumer Group 생성
Consumer를 단일 객체로만 운영하는 것이 아니라, Consumer group이라는 집단 객체으로 운영할 수도 있습니다.
Consumer group을 사용하게 되면 여러 Consumer들을 하나의 객체처럼 운영할 수 있기 때문에 가용성을 더 확보할 수 있다는 장점이 있습니다.
Consumer group을 사용하는 예제는 아래 명령어를 사용합니다. 기존 명령어에 --group 플래그를 추가합니다.
1
|
kubectl exec my-cluster-kafka-0 -it -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic mytopic --from-beginning --group mygroup
|
cs |
5-4. LAG 확인
Apache Kafka를 사용하기 위해 확인해야 할 가장 중요한 수치는 바로 LAG입니다.
LAG이란 Producer가 생성한 Message의 offset과 Consumer가 소비한 Message의 offset의 차이를 말하는데요.
이 LAG이 늘어난다는 것은 Producer가 Message를 발행하는 속도보다 Consumer가 Message를 가져가는 속도가 느리다는 뜻이므로 LAG은 Consumer의 비정상 상태나, 성능 저하를 유추할 수 있는 중요한 지표입니다.
내장된 스크립트 파일을 통해 LAG을 직접 확인해보겠습니다. 아래 명령어를 실행합니다.
1
|
while true; do kubectl exec my-cluster-kafka-0 -it -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --group mygroup --describe;echo "-----"; sleep 0.5; done
|
cs |
명령어를 정상적으로 실행 후에, Producer 터미널에서 Message를 빠르게 생성하면 LAG 수치가 올라가는 것을 확인할 수 있습니다.
지금은 LAG의 개념을 확인하기 위해 스크립트 파일로 수치를 확인했지만, Production 환경에서 LAG은 Kafka의 상태를 확인할 수 있는 중요한 지표이므로, 별도의 모니터링 솔루션을 통해 확인하는 것을 권장합니다.
5-5. Log Dump 확인
Kafka의 Message는 디스크에 로그 형태로 저장됩니다. 때문에 실제로 로그 덤프를 통해 Message를 확인할 수 있는데요.
Kafka pod에 저장된 Message 파일의 Log dump를 통해 실제 발송한 Message 내역을 확인해보겠습니다.
Message log는 Kafka pod 볼륨 내의 /var/lib/kafka/data-0/kafka-log0 디렉토리에 존재합니다.
하지만 디렉토리 내의 로그 파일을 열어보면 알 수 없는 문자만 출력되는데요. 이는 로그 파일이 일반적인 문자 형식이 아닌 다른 형태 쓰여있기 때문입니다.
로그 파일을 읽기 위해 kafka pod에 존재하는 스크립트 파일인 kafka-run-class.sh를 사용합니다. 스크립트의 메소드 중 하나인 kafka.tools.DumpLogSegments를 사용하면 로그를 정상적으로 읽을 수 있습니다. 아래와 같이 명령어를 입력합니다.
1
2
|
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /var/lib/kafka/data-0/kafka-log0/mytopic-0/00000000000000000000.log --deep-iteration --print-data-log
|
cs |
명령어를 실행하면 아래와 같이 Log dump의 내용이 출력되는 것을 확인할 수 있습니다.
Log dump에는 offset, Createtime부터 Message의 실제 내용을 의미하는 payload까지 다양한 정보를 담고 있습니다.
이렇게 Log dump를 사용해서 Kafka가 저장한 Message를 확인했는데요.
이 것이 가능한 이유는 Apache Kafka가 Message를 메모리에 저장해 ack된 것은 바로 폐기하는 Message queue가 아니라, Message를 디스크에 로그로 저장해 사용하는 분산 로그 저장소의 특성을 가지기 때문입니다.
이러한 특성 덕분에 Kafka는 ack된 Message를 다시 볼 수 있는 Message replay같은 기능을 제공하거나 Message의 가용성을 보장하는데 탁월하다는 특징이 있습니다.
6. Apache Kafka 더 잘 사용해 보기
지금까지 Topic을 생성하고 Message를 주고 받아보는 간단한 동작으로 Kafka를 사용해봤습니다.
지금까지가 Kafka의 기본 동작을 이해하기 위한 단계였다면, 이제부터는 Apache kafka를 실제 Production 환경에서 운영하기 위해 사용할 수 있는 패턴들을 소개해 드리고자 합니다.
6-1. Kafka scaling with KEDA
앞서 LAG이라는 개념을 말씀드리며 Consumer의 Message 소비 속도가 느릴 경우 증가하는 중요한 지표라고 말씀드렸는데요.
그렇다면 이 LAG이 증가하면 Consumer를 Scaling해야 정상적인 상태를 유지할 수 있을 것입니다.
Scaling에는 Vertical Scaling과 Horizontal Scaling이 있는데요. 두 전략 중 Kafka는 Consumer를 Group 형태로 운영할 수 있기 때문에 Horizontal Scaling을 통해 확장하는 것이 더 효율적인 전략으로 보입니다.
하지만 현재 Kubernetes는 CPU와 RAM 등 리소스 기반의 Scaling만 가능한데요.
그래서 이 같은 한계를 극복하기 위해 Event 기반의 Scaling을 가능하게 해주는 KEDA(Kubernets Event Driven Autoscaling)를 이용해서 Consumer의 Lag 기반 Horizontal Autoscaling을 구성해보겠습니다.
먼저 Helm 패키지 매니저를 사용해 KEDA를 설치하겠습니다. 아래 명령어를 실행합니다.
1
2
3
|
kubectl create namespace keda
helm repo add kedacore https://kedacore.github.io/charts
helm install keda kedacore/keda --version 2.7.2 --namespace keda
|
cs |
설치가 정상적으로 완료되었으면 아래와 같이 2개의 Deployment가 생성된 것을 확인할 수 있습니다.
다음으로 Consumer 역할을 할 Deployment 오브젝트를 생성합니다. 아래 yaml 파일 기반으로 작성한 매니페스트를 적용합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
apiVersion: apps/v1
kind: Deployment
metadata:
name: consumer
spec:
replicas: 1
selector:
matchLabels:
app: consumer-service
template:
metadata:
labels:
app: consumer-service
spec:
containers:
- name: simple-consumer
image: pravinmagdum/simple-consumer
ports:
- containerPort: 8090
args: ["java", "-jar", "target/simple-consumer-1.0.0-jar-with-dependencies.jar", "my-cluster-kafka-bootstrap:9092", "mygroup", "mytopic"]
|
cs |
적용을 완료하면 아래와 같이 "consumer" Deployment가 생성된 것을 확인할 수 있습니다. Lag을 기반으로 Autoscaling을 할 때 이 consumer Deployment가 Scaling되도록 설정할 것입니다.
다음으로 아래 매니페스트로 KEDA의 scaledobject CR을 생성합니다. 주의해야 할 값은 다음과 같습니다.
- spec.scaleTargetRef.name : Event에 따라 Scaling할 대상 Deployment 이름을 넣습니다. 여기서는 Consumer Deployment 이름 값을 넣어야 합니다.
- spec.triggers.type : kafka의 LAG을 기반으로 Scaling할 것이기 때문에 type 값에 "kafka"를 넣습니다.
- spec.triggers.metadata.consumerGroup, topic : LAG을 추적할 ConsumerGroup과 Topic 값을 넣습니다.
- spec.triggers.metadata.lagThreshold: Scaling을 Trigger할 LAG 임계값을 넣습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: kafka-scaledobject
namespace: kafka
spec:
scaleTargetRef:
name: consumer
pollingInterval: 30
triggers:
- type: kafka
metadata:
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc:9092
consumerGroup: mygroup # Make sure that this consumer group name is the same one as the one that is consuming topics
topic: mytopic
# Optional
lagThreshold: "1"
offsetResetPolicy: latest
|
cs |
위 매니페스트를 적용하고 나면 consumer Deployment에 속한 pod가 Terminating되는 것을 확인할 수 있습니다.
이는 KEDA의 Scale to 0 기능에 의한 것으로 KEDA의 ScaledObject는 Event가 없을 시 Target 오브젝트의 개수를 0개로 줄여버립니다.
아래와 같이 KEDA는 1 -> n , n -> 1 Scaling 시에는 기존의 HPA를 사용하지만 0 -> 1 , 1 -> 0 와 같은 Scale to 0 기능을 구현하기 위해 KEDA의 Scaler와 같은 구성 요소들을 활용합니다.
KEDA의 이 같은 기능을 통해서 불필요한 리소스 사용을 방지하고 Event driven 아키텍쳐를 효율적으로 구성할 수 있다는 이점이 있습니다.
이후 Producer를 통해 LAG을 증가시키면 다시 Consumer Pod가 생성되는 것을 확인할 수 있습니다.
이렇게 KEDA를 통해 Kafka를 LAG 수치에 따라 Autoscaling이 가능한 Event-driven 패턴으로 구성할 수 있습니다.
기존의 HPA를 사용하면 CPU, Memory와 같은 간접적인 메트릭으로만 Consumer를 Scaling할 수 밖에 없었겠지만 KEDA를 사용하면 LAG이라는 특수한 Event를 기반으로 Scaling이 가능해 더 효율적인 리소스 사용이 가능해집니다.
6-2. Kafka monitoring with Prometheus
어떤 플랫폼이던 마찬가지겠지만, Kafka라는 이벤트 스트리밍 플랫폼 또한 적절한 가시성을 확보하기 위해 Observability를 위한 도구를 사용하는 것이 좋습니다.
특히 Kafka는 Lag 수치, Message 소비량, Consumer 인스턴스 개수 등 이벤트 스트리밍에 직접적으로 영향을 끼치는 메트릭이 많기 때문에 Monitoring 도구를 적절히 사용하는 것이 운영에 큰 영향을 미칠 것입니다.
Apache Kafka Operator인 Kafka strimzi operator는 Prometheus 연동을 위한 옵션을 제공하고 있습니다. 그래서 이번 포스팅에서는 대표적인 Monitoring 도구인 Prometheus를 통해 Apache Kafka의 메트릭 가시성을 확보하는 방법에 대해 알아보도록 하겠습니다.
우선 Prometheus에 Kafka 메트릭을 전달할 Kafka Exporter를 생성하도록 "kafka" 오브젝트를 아래와 같이 수정합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
entityOperator:
topicOperator: {}
userOperator: {}
kafka:
config:
default.replication.factor: 3
min.insync.replicas: 2
offsets.topic.replication.factor: 3
transaction.state.log.min.isr: 2
transaction.state.log.replication.factor: 3
listeners:
- name: plain
port: 9092
tls: false
type: internal
- name: tls
port: 9093
tls: false
type: internal
- name: external
port: 9094
tls: false
type: nodeport
metricsConfig: ## 추가할 부분
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
key: kafka-metrics-config.yml
name: kafka-metrics
replicas: 3
storage:
type: jbod
volumes:
- deleteClaim: true
id: 0
size: 10Gi
type: persistent-claim
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: kubernetes.io/hostname
kafkaExporter: ## 추가할 부분
groupRegex: .*
topicRegex: .*
zookeeper:
metricsConfig: ## 추가할 부분
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
key: zookeeper-metrics-config.yml
name: kafka-metrics
replicas: 3
storage:
deleteClaim: true
size: 10Gi
type: persistent-claim
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- zookeeper
topologyKey: kubernetes.io/hostname
|
cs |
주의해야 할 부분은 "spec.kafka"와 "spec.zookeeper" 하위에 metricsConfig 어트리뷰트를 넣는 것입니다.
strimzi 공식 문서(https://strimzi.io/docs/operators/latest/deploying.html#proc-metrics-kafka-deploy-options-str)에 따르면 이 속성을 추가하는 것으로 Kafka 및 zookeeper 클러스터에 Prometheus 메트릭을 활성화하고 구성할 수 있습니다.
해당 어트리뷰트는 kafka-metrics라는 이름의 configmap을 참조하도록 되어있습니다. 참조할 "kafka-metrics" configmap을 아래 매니페스트를 기반으로 생성하도록 합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
kind: ConfigMap
apiVersion: v1
metadata:
name: kafka-metrics
labels:
app: strimzi
data:
kafka-metrics-config.yml: |
# See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
lowercaseOutputName: true
rules:
# Special cases and very specific rules
- pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
topic: "$4"
partition: "$5"
- pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
name: kafka_server_$1_$2
type: GAUGE
labels:
clientId: "$3"
broker: "$4:$5"
- pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
name: kafka_server_$1_connections_tls_info
type: GAUGE
labels:
cipher: "$2"
protocol: "$3"
listener: "$4"
networkProcessor: "$5"
- pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
name: kafka_server_$1_connections_software
type: GAUGE
labels:
clientSoftwareName: "$2"
clientSoftwareVersion: "$3"
listener: "$4"
networkProcessor: "$5"
- pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
name: kafka_server_$1_$4
type: GAUGE
labels:
listener: "$2"
networkProcessor: "$3"
- pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
name: kafka_server_$1_$4
type: GAUGE
labels:
listener: "$2"
networkProcessor: "$3"
# Some percent metrics use MeanRate attribute
# Ex) kafka.server<type=(KafkaRequestHandlerPool), name=(RequestHandlerAvgIdlePercent)><>MeanRate
- pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
name: kafka_$1_$2_$3_percent
type: GAUGE
# Generic gauges for percents
- pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
name: kafka_$1_$2_$3_percent
type: GAUGE
- pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
name: kafka_$1_$2_$3_percent
type: GAUGE
labels:
"$4": "$5"
# Generic per-second counters with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
name: kafka_$1_$2_$3_total
type: COUNTER
# Generic gauges with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
name: kafka_$1_$2_$3
type: GAUGE
# Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
# Note that these are missing the '_sum' metric!
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
labels:
"$4": "$5"
"$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
"$6": "$7"
quantile: "0.$8"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
labels:
"$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
"$4": "$5"
quantile: "0.$6"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
name: kafka_$1_$2_$3_count
type: COUNTER
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
name: kafka_$1_$2_$3
type: GAUGE
labels:
quantile: "0.$4"
zookeeper-metrics-config.yml: |
# See https://github.com/prometheus/jmx_exporter for more info about JMX Prometheus Exporter metrics
lowercaseOutputName: true
rules:
# replicated Zookeeper
- pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d+)><>(\\w+)"
name: "zookeeper_$2"
type: GAUGE
- pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d+), name1=replica.(\\d+)><>(\\w+)"
name: "zookeeper_$3"
type: GAUGE
labels:
replicaId: "$2"
- pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d+), name1=replica.(\\d+), name2=(\\w+)><>(Packets\\w+)"
name: "zookeeper_$4"
type: COUNTER
labels:
replicaId: "$2"
memberType: "$3"
- pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d+), name1=replica.(\\d+), name2=(\\w+)><>(\\w+)"
name: "zookeeper_$4"
type: GAUGE
labels:
replicaId: "$2"
memberType: "$3"
- pattern: "org.apache.ZooKeeperService<name0=ReplicatedServer_id(\\d+), name1=replica.(\\d+), name2=(\\w+), name3=(\\w+)><>(\\w+)"
name: "zookeeper_$4_$5"
type: GAUGE
labels:
replicaId: "$2"
memberType: "$3"
|
cs |
다음으로 helm 패키지매니저를 통해 Prometheus Operator를 설치합니다. Prometheus 또한 Operator 패턴으로 사용할 수 있는데, Strimzi 공식 문서에서 Core OS의 Prometheus Operator를 사용한 예제로 설명하고 있으니 같은 Operator의 Prometheus를 설치합니다.
1
2
3
|
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm install prometheus prometheus-community/kube-prometheus-stack -n monitoring --create-namespace
|
cs |
정상적으로 배포 후 Prometheus Operator 및 Prometheus 인스턴스, Grafana, Node exporter 등 다양한 요소들이 설치되는 것을 확인할 수 있습니다.
다음으로 Kafka 모니터링에 필요한 prometheus, prometheusRule,podMonitor 등의 Prometheus Operator 요소들을 배포하도록 하겠습니다.
배포할 요소들은 https://github.com/strimzi/strimzi-kafka-operator/tree/main/examples/metrics/prometheus-install 링크에 첨부된 yaml파일들을 배포하면 됩니다.
helm으로 Prometheus Operator 설치 시 "prometheus" 오브젝트가 하나 배포되어 있을 수도 있으니 이미 설치된 오브젝트는 삭제하고 진행해야 합니다.
그리고 아래와 같이 namespace를 수정하는 작업이 필요합니다. 적절한 namespace를 적도록 주의합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: cluster-operator-metrics
labels:
app: strimzi
spec:
selector:
matchLabels:
strimzi.io/kind: cluster-operator
namespaceSelector:
matchNames:
- kafka ## PodMonitor의 matchNames 값에는 Kafka cluster가 속한 ns를
podMetricsEndpoints:
- path: /metrics
port: http
|
cs |
PodMonitor 오브젝트의 spec.selector.namespaceSelector.matchNames값은 Kafka cluster를 배포한 namespace로 수정해야 합니다.
PodMonitor는 Prometheus가 메트릭을 수집할 대상을 지정하는 Prometheus Operator CR입니다. PodMonitor를 이용하면 kafka 클러스터의 pod를 일일이 Prometheus 수집 대상에 추가할 필요 없이 Selector를 통해 자동으로 해당 Pod를 추가할 수 있기 때문에 운영이 편리해집니다.
위 매니페스트의 경우에는 pod의 label과 pod가 속한 namespace를 정의해 이에 일치하는 Kafka pod를 자동으로 Prometheus 메트릭 수집 대상에 추가합니다. 이는 Prometheus UI의 Tartget 페이지에서도 확인 가능합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: prometheus-server
labels:
app: strimzi
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: prometheus-server
subjects:
- kind: ServiceAccount
name: prometheus-server
namespace: monitoring
---
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
name: prometheus
labels:
app: strimzi
spec:
replicas: 1
serviceAccountName: prometheus-server
podMonitorSelector:
matchLabels:
app: strimzi
serviceMonitorSelector: {}
resources:
requests:
memory: 400Mi
enableAdminAPI: false
ruleSelector:
matchLabels:
role: alert-rules
app: strimzi
alerting:
alertmanagers:
- namespace: monitoring
name: alertmanager
port: alertmanager
additionalScrapeConfigs:
name: additional-scrape-configs
key: prometheus-additional.yaml
|
cs |
각각 ClusterRoleBinding과 Prometheus 오브젝트의 subjects.namespace 값과 spec.alerting.namespace 값은 Prometheus를 배포한 namespace로 수정해야 합니다.
Prometheus 오브젝트는 Prometheus Operator에서 Prometheus 인스턴스 자체를 정의하는 CR입니다.
아래의 additionalSrapeConfigs는 Podmonitor와 같이 Node단의 메트릭을 바로 수집하는 리소스가 없기 때문에 이를 해결하기 위해 추가적인 yaml 파일을 이용해서 ScrapConfig를 설정해야 합니다.
아래 매니페스트를 "monitoring" namespace에 적용해 additionalScrapeConfigs에 필요한 Secret 오브젝트를 생성합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
apiVersion: v1
kind: Secret
metadata:
name: additional-scrape-configs
type: Opaque
stringData:
prometheus-additional.yaml: |
- job_name: kubernetes-cadvisor
honor_labels: true
scrape_interval: 10s
scrape_timeout: 10s
metrics_path: /metrics/cadvisor
scheme: https
kubernetes_sd_configs:
- role: node
namespaces:
names: []
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
insecure_skip_verify: true
relabel_configs:
- separator: ;
regex: __meta_kubernetes_node_label_(.+)
replacement: $1
action: labelmap
- separator: ;
regex: (.*)
target_label: __address__
replacement: kubernetes.default.svc:443
action: replace
- source_labels: [__meta_kubernetes_node_name]
separator: ;
regex: (.+)
target_label: __metrics_path__
replacement: /api/v1/nodes/${1}/proxy/metrics/cadvisor
action: replace
- source_labels: [__meta_kubernetes_node_name]
separator: ;
regex: (.*)
target_label: node_name
replacement: $1
action: replace
- source_labels: [__meta_kubernetes_node_address_InternalIP]
separator: ;
regex: (.*)
target_label: node_ip
replacement: $1
action: replace
metric_relabel_configs:
- source_labels: [container, __name__]
separator: ;
regex: POD;container_(network).*
target_label: container
replacement: $1
action: replace
- source_labels: [container]
separator: ;
regex: POD
replacement: $1
action: drop
- source_labels: [container]
separator: ;
regex: ^$
replacement: $1
action: drop
- source_labels: [__name__]
separator: ;
regex: container_(network_tcp_usage_total|tasks_state|memory_failures_total|network_udp_usage_total)
replacement: $1
action: drop
- job_name: kubernetes-nodes-kubelet
scrape_interval: 10s
scrape_timeout: 10s
scheme: https
kubernetes_sd_configs:
- role: node
namespaces:
names: []
bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
tls_config:
ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
insecure_skip_verify: true
relabel_configs:
- action: labelmap
regex: __meta_kubernetes_node_label_(.+)
- target_label: __address__
replacement: kubernetes.default.svc:443
- source_labels: [__meta_kubernetes_node_name]
regex: (.+)
target_label: __metrics_path__
replacement: /api/v1/nodes/${1}/proxy/metrics
|
cs |
이제 모든 오브젝트의 배포가 완료되었습니다. Grafana service 주소를 통해 Grafana UI로 접속합니다.
첫 접속시 ID,PW는 보통 admin, prom-operator지만, 아래와 같이 secret 값을 base64로 decoding하는 것으로 값을 확인할 수도 있습니다.
Grafana UI에 접속했다면 우측의 Create -> Import 버튼을 클릭해 Dashboard를 import할 수 있는 페이지로 진입합니다.
Grafana는 생성된 Custom dashboard를 ID 번호로 import할 수 있는 기능이 있습니다. Kafka exporter Dashboard의 ID인 "7589"를 적고 Load 버튼을 클릭합니다.
Dashboard를 import하면 kafka exporter를 통해 수집한 메트릭을 이용해서 만들어진 Dashboard를 볼 수 있습니다.
실제로 Message를 발행하거나 Lag를 늘려보는 것으로 Dashboard의 그래프가 달라지는 것을 실시간으로 확인할 수 있습니다.
만약 값이 제대로 안보인다면 Prometheus에서 Status -> Targets에 진입한 뒤 "kafka-resources-metrics" podmonitor가 정상적으로 Up 되어있는지 확인해보시길 바랍니다. 안되어있다면 보통 Prometheus Operator의 Log에서 원인을 찾을 수 있습니다.
이렇게 Apache Kafka와 Prometheus를 연동해서 실시간으로 메트릭을 확인할 수 있는 대쉬보드를 구성해봤습니다.
Kafka는 스트리밍 이벤트를 관장하는 플랫폼인 만큼 모니터링 도구를 통해 실시간으로 상태를 확인하는 것이 매우 중요합니다.
그래서 프로덕션 환경에서는 꼭 적절한 도구를 통해 가시성을 확보하는 것을 권장드립니다.
번외. Apache Kafka vs RabbitMQ
지금까지 Apache Kafka를 소개해드리고 사용하는 방법까지 같이 알아봤는데요.
처음에 Apache Kafka는 Event Streaming Platform이라고 명명했습니다. 하지만 Message Broker라고 불리우는 RabbitMQ와는 어떤 점이 다를까요?
두 도구의 공통점과 차이점을 알아보도록 하겠습니다.
- 공통점
두 플랫폼 모두 Loosely-coupled 아키텍쳐를 구현하기 위한 비동기 메세지 큐 시스템입니다.
비동기(asynchronous)라는 것은 Message를 받는 자와 보내는 자가 둘 다 활성화되어 있을 필요는 없다는 말입니다. 둘 중 하나가 다운되어 있는 상태이더라도 메세지 큐는 Message를 보관하고 있다가 다시 연결되는 순간 Message를 전송합니다.
또한 Pub/Sub 모델을 사용해 Message를 발신하는 Publisher와 Message를 소비하는 Subscriber의 역할을 나누어 놓았기 때문에 N:N 관계에서의 복잡한 아키텍쳐에서 효율적으로 Message를 주고받을 수 있습니다.
이 같은 특징들 덕분에 비동기 메세지 큐 시스템은 애플리케이션의 신뢰도를 높여줄 수 있다는 장점을 가지고 있습니다.
Apache Kafka의 Event Streaming Platform과 RabbitMQ와 같은 Message Broker는 모두 이 같은 장점을 가지고 있습니다. - 차이점
두 플랫폼의 첫 번째 차이는 Message를 보내는 방식입니다.
Apache Kafka는 Consumer가 주도적으로 Message를 가져오는 Pull-based model입니다. 때문에 Message의 소비량은 순전히 Consumer의 처리 역량에 달려있게 되어 효율적으로 Message를 소비할 수 있습니다.
하지만 RabbitMQ는 Broker가 Message를 보내는데 역량을 쏟는 Push-based model입니다. 덕분에 Message를 분배하고 발신하는데 빠르지만 Consumer가 Message를 받을 수 없는 상황이 되어도 신경쓰지 않는다는 단점이 있습니다.
두 번째 차이는 Message의 저장 방식입니다.
Apache Kafka는 Message를 메모리가 아닌 디스크에 저장해 영속성(persistency)을 가지고 있습니다. 보통 성능을 위해서 Message를 메모리에만 저장하는 다른 메세지 큐와 달리 Kafka는 이러한 방식을 택했는데요. 이 같은 특징 덕분에 Kafka를 분산 로그 시스템이라고도 부릅니다. Kafka의 Message는 로그 형태로 디스크에 남기 때문에 log replay, retention policy 등의 기능을 사용할 수 있습니다.
하지만 Rabbitmq는 Message를 메모리에만 저장합니다. 그래서 Rabbitmq Message의 생애주기(lifetime)은 Producer가 Message를 생산하고 Consumer가 소비해 ACK를 보내기까지가 전부입니다. 한 번 소비되면 Message는 저장매체에서 사라지기 때문에 다시 볼 수 없다는 차이점이 있습니다.
세 번째 차이는 Message의 전달 방식입니다.
Apache Kafka는 Broker가 Topic이라는 매체를 사용해 Producer와 Consumer를 중개합니다. 또 Topic은 여러 개의 Partition으로 나뉘어져 있어 Message를 Parallel하게 관리할 수 있습니다. 각 Partition에는 offset이라고 하는 순서 정보가 들어가 있기 때문에 Partition 별로 Message의 순서를 보장할 수도 있다는 특징이 있습니다.
하지만 RabbitMQ는 Topic이 아닌 메세지 큐 방식을 사용하기 때문에 Producer와 Consumer 간의 Message 중개 보다는 Routing, Spanning같은 발신에 특화되어 있습니다. Kafka와 달리 Partition이 없기 때문에 철저하게 FIFO(First In, First Out) 방식의 큐 모델을 가지게 된다는 특징이 있습니다.
'Devops' 카테고리의 다른 글
CKA(Certified Kubernetes Administrator) 자격증 시험 및 합격 후기 (2) | 2022.08.13 |
---|---|
Kubernetes에 존재하는 Metrics Server란 무엇일까? 그리고 어떻게 해야 잘 사용할 수 있을까? (1) | 2022.06.26 |
Mysql Operator로 Kubernetes 환경에서 Mysql DB 운영하기 (3) | 2022.05.31 |
오픈소스 컨테이너 레지스트리 Harbor로 컨테이너 레지스트리 간 복제 수행하기 (0) | 2022.05.15 |
Skaffold + buildpack 으로 쉽게 CI/CD Kubernetes pipeline 구성하기 (0) | 2021.07.30 |