❗ Kafka를 실행하기 앞서 로컬 환경에 JAVA 8+ 이상이 설치되어야 합니다.
Step 1: Get Kafka
https://kafka.apache.org/downloads
이곳에서 최신 카프카 릴리즈를 다운로드 받습니다. 이 때 Source
가 아닌 Binary
를 다운로드 받습니다.
Scala 2.12, 2.13은 무슨차이가 있는지 모르겠지만 아무거나 받아도 무방할 듯 합니다.
1
2
3
4
$ mkdir /downloads
$ wget -P /downloads/ https://mirror.navercorp.com/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
$ tar -xzf /downloads/kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0
Step 2: Start the Kafka Environment
Zookeeper 실행
새로운 터미널 세션을 열어서 Zookeeper를 실행시킵니다.
1
2
3
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Brocker 서버 실행
터미널을 하나 더 열어서 Broker 서버를 실행시킵니다.
1
2
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Step 3: Create A Topic to store your events
quickstart-events
라는 Topic을 생성합니다.
1
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
새 Topic의 파티션 카운트 같은 세부정보도 확인할 수 있습니다.
1
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
- 실행 결과
1 2
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs: Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Step 4: Write some events into the Topic
터미널을 하나 더 열어서 Producer 클라이언트 실행 후 quickstart-events
토픽에 이벤트를 작성합니다.
1
2
3
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
> This is my first event
> This is my second event
Producer 클라이언트는 언제든지
ctrl-c
로 중지할 수 있습니다.
Step 5: Read the Events
터미널을 하나 더 열어서 Consumer 클라이언트를 실행해 방금 만든 이벤트를 읽습니다.
1
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
- 실행결과
1 2
This is my first event This is my second event
consumer 클라이언트는 언제든지
ctrl-c
로 중지할 수 있습니다.
이벤트는 kafka에 지속적으로 저장되기 떄문에, 필요한 만큼의 consumer로 원하는 횟수만큼 읽어갈 수 있습니다.
Step 6:Process Your Events with Kafka Streams
Kafka Streams에서 많이 사용하는 wordCount
알고리즘입니다.
1
2
3
4
5
6
7
8
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic"), Produced.with(Serdes.String(), Serdes.Long()));
Step 7: Terminate the Kafka environment
- producer와 consumer 클라이언트를 중단하지 않았다면
ctrl-c
로 중지합니다. - broker를
ctrl-c
로 중지합니다. - 마지막으로
ctrl-c
로 zookeeper를 중지합니다. - 앞에서 생성한 이벤트 등 로컬 Kafka 환경에 있는 모든 데이터를 삭제하고 싶다면 아래의 명령어를 실행합니다:
1
$ rm -rf /tmp/kafka-logs /tmp/zookeeper