# 주키퍼 및 브로커 실행
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
# hello.kafka 토픽 생성
bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 --topic hello.kafka
# hello.kafka.2 토픽생성
--partitions 3 : 파티션 갯수 3개
--replication-factor 1 : 토픽의 파티션을 복제할 복제 개수를 지정, 1은 복제를 하지 않고 사용한다는 의미
2이면 1개의 복제본을 사용하겠다는 의미, replication-factor의 최대 설정은 클러스터에 존재하는브로커에 갯수까지 가능하다.(즉, 브로커가 1개면 1개, 2개면 2개까지 replication-factor 설정 가능)
--config retention.ms : 토픽에 데이터를 유지하는 기간을 뜻한다. 172800000는 2일을 ms 단위로 나타낸것 이다.
bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 --partitions 3 --replication-factor 1 --config retention.ms=172800000 --topic hello.kafka.2
# kafka 리스트 확인하기
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --list
# 토픽 상세(describe) 조회하기
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --describe --topic hello.kafka.2
guru@master:~/kafka_2.12-2.5.0$ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --describe --topic hello.kafka.2
Topic: hello.kafka.2 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=172800000
Topic: hello.kafka.2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
# 토픽 partition 갯수 변경 하기(기존 1개 -> 4개로 변경)
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --alter --partitions 4
# 토픽 retention.ms(데이터가 존재하는 시간) 변경하기
bin/kafka-configs.sh --bootstrap-server my-kafka:9092 --entity-type topics --entity-name hello.kafka --alter --add-config retention.ms=86400000
# kafka-console-producer.sh를 통해서 브로커에 메세지 전달 하기
bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka
bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka
>hello
>kafka
>0
>1
>2
>3
>4
>5
# kafka-console-producer.sh를 이용하여 메세지키로 전송하기
parse.key=true로 두면 메세지키를 추가 할 수 있다. key.separator=: 는 ':'를 구분로 하겠다는 뜻이다.(없을시 TAB delimiter(\t)이 구분자다.
※ 메세지키가 동일할 경우 동일 파티션(1번이면 1번으로만)만 저장된다.(키값을 해쉬값으로 만드는데 그 값이 파티션과 맵핑을 한다.)
bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"
guru@master:~/kafka_2.12-2.5.0$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3
# 토픽에 전송한 데이터 확인하기
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --from-beginning
# 토픽에 전송한 데이터 키값 형태로 확인하기
--property print.key=true #메세지 키값 형태로 출력
--property key.seperator="-" #'-'형태로 출력
--group #컨슈머 그룹을 생성, 컨슈머 그룹은 1개 이상의 컨슈머로 구성. 컨슈머가 가져가 topic 메세지에 대해서는 commit을 한다. 커밋정보는 __consumer_offsets 이름의 내부 토픽에 저장된다.
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --property print.key=true --property key.seperator="-" --group hello-group --from-beginning
# 컨슈머 그룹 확인하기
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --list
# 컨슈머 그룹 상세적으로 확인하기
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --group hello-group --describe
guru@master:~/kafka_2.12-2.5.0$ bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --group hello-group --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
hello-group hello.kafka 3 0 0 0 - - -
hello-group hello.kafka 2 3 3 0 - - -
hello-group hello.kafka 1 3 3 0 - - -
hello-group hello.kafka 0 7 7 0 - - -
# 테스트를 위한 커맨드라인툴 명령어
bin/kafka-verifiable-producer.sh --bootstrap-server my-kafka:9092 --max-message 10 --topic verify-test
{"timestamp":1635969375937,"name":"startup_complete"}
[2021-11-04 04:56:16,267] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {verify-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"timestamp":1635969376418,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"verify-test","partition":0}
{"timestamp":1635969376423,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"verify-test","partition":0}
{"timestamp":1635969376423,"name":"producer_send_success","key":null,"value":"2","offset":2,"topic":"verify-test","partition":0}
{"timestamp":1635969376423,"name":"producer_send_success","key":null,"value":"3","offset":3,"topic":"verify-test","partition":0}
{"timestamp":1635969376423,"name":"producer_send_success","key":null,"value":"4","offset":4,"topic":"verify-test","partition":0}
{"timestamp":1635969376424,"name":"producer_send_success","key":null,"value":"5","offset":5,"topic":"verify-test","partition":0}
{"timestamp":1635969376426,"name":"producer_send_success","key":null,"value":"6","offset":6,"topic":"verify-test","partition":0}
{"timestamp":1635969376426,"name":"producer_send_success","key":null,"value":"7","offset":7,"topic":"verify-test","partition":0}
{"timestamp":1635969376426,"name":"producer_send_success","key":null,"value":"8","offset":8,"topic":"verify-test","partition":0}
{"timestamp":1635969376427,"name":"producer_send_success","key":null,"value":"9","offset":9,"topic":"verify-test","partition":0}
{"timestamp":1635969376448,"name":"shutdown_complete"}
{"timestamp":1635969376450,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":19.41747572815534}
# 토픽의 데이터를 삭제하기
bin/kafka-delete-records.sh --bootstrap-server my-kafka:9092
'G.Code > kafka' 카테고리의 다른 글
[kafka] 5.카프카 실전 프로젝트 (0) | 2021.12.09 |
---|---|
[kafka] 4.카프카 상세 개념 설명 (0) | 2021.11.23 |
[kafka] 3.카프카 기본개념 설명 (0) | 2021.11.04 |
[kafka] kafka ec2 설치 (0) | 2021.10.28 |
[kafka] kafka가 데이터 파이프 라인에 적합한 이유 (0) | 2021.10.27 |