본문 바로가기
G.Code/kafka

[kafka] kafka 기본 명령어

by 한선배 2021. 11. 3.
반응형

# 주키퍼 및 브로커 실행

$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties

 

# hello.kafka 토픽 생성

bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 --topic hello.kafka 

 

# hello.kafka.2 토픽생성
--partitions 3 : 파티션 갯수 3개

--replication-factor 1 : 토픽의 파티션을 복제할 복제 개수를 지정, 1은 복제를 하지 않고 사용한다는 의미

                             2이면 1개의 복제본을 사용하겠다는 의미, replication-factor의 최대 설정은 클러스터에 존재하는브로커에 갯수까지 가능하다.(즉, 브로커가 1개면 1개, 2개면 2개까지 replication-factor 설정 가능)
--config retention.ms : 토픽에 데이터를 유지하는 기간을 뜻한다. 172800000는 2일을 ms 단위로 나타낸것 이다.

bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 --partitions 3 --replication-factor 1 --config retention.ms=172800000 --topic hello.kafka.2

 

# kafka 리스트 확인하기

bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --list

 

# 토픽 상세(describe) 조회하기

bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --describe --topic hello.kafka.2

guru@master:~/kafka_2.12-2.5.0$ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --describe --topic hello.kafka.2
Topic: hello.kafka.2	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824,retention.ms=172800000
	Topic: hello.kafka.2	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka.2	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka.2	Partition: 2	Leader: 0	Replicas: 0	Isr: 0

 

# 토픽 partition 갯수 변경 하기(기존 1개 -> 4개로 변경)

bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --alter --partitions 4

 

# 토픽 retention.ms(데이터가 존재하는 시간) 변경하기

bin/kafka-configs.sh --bootstrap-server my-kafka:9092 --entity-type topics --entity-name hello.kafka --alter --add-config retention.ms=86400000

 

# kafka-console-producer.sh를 통해서 브로커에 메세지 전달 하기

bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka

bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka
>hello
>kafka
>0
>1
>2
>3
>4
>5

# kafka-console-producer.sh를 이용하여 메세지키로 전송하기

parse.key=true로 두면 메세지키를 추가 할 수 있다. key.separator=: 는 ':'를 구분로 하겠다는 뜻이다.(없을시 TAB delimiter(\t)이 구분자다.

※ 메세지키가 동일할 경우 동일 파티션(1번이면 1번으로만)만 저장된다.(키값을 해쉬값으로 만드는데 그 값이 파티션과 맵핑을 한다.)

bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"

guru@master:~/kafka_2.12-2.5.0$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3

# 토픽에 전송한 데이터 확인하기

bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --from-beginning

 

# 토픽에 전송한 데이터 키값 형태로 확인하기

--property print.key=true #메세지 키값 형태로 출력

--property key.seperator="-" #'-'형태로 출력

--group #컨슈머 그룹을 생성, 컨슈머 그룹은 1개 이상의 컨슈머로 구성. 컨슈머가 가져가 topic 메세지에 대해서는 commit을 한다. 커밋정보는 __consumer_offsets 이름의 내부 토픽에 저장된다.

bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic hello.kafka --property print.key=true --property key.seperator="-" --group hello-group --from-beginning

 

# 컨슈머 그룹 확인하기

bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --list

 

# 컨슈머 그룹 상세적으로 확인하기

bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --group hello-group --describe

guru@master:~/kafka_2.12-2.5.0$ bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --group hello-group --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
hello-group     hello.kafka     3          0               0               0               -               -               -
hello-group     hello.kafka     2          3               3               0               -               -               -
hello-group     hello.kafka     1          3               3               0               -               -               -
hello-group     hello.kafka     0          7               7               0               -               -               -

 

# 테스트를 위한 커맨드라인툴 명령어

bin/kafka-verifiable-producer.sh --bootstrap-server my-kafka:9092 --max-message 10 --topic verify-test

{"timestamp":1635969375937,"name":"startup_complete"}
[2021-11-04 04:56:16,267] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {verify-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"timestamp":1635969376418,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"verify-test","partition":0}
{"timestamp":1635969376423,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"verify-test","partition":0}
{"timestamp":1635969376423,"name":"producer_send_success","key":null,"value":"2","offset":2,"topic":"verify-test","partition":0}
{"timestamp":1635969376423,"name":"producer_send_success","key":null,"value":"3","offset":3,"topic":"verify-test","partition":0}
{"timestamp":1635969376423,"name":"producer_send_success","key":null,"value":"4","offset":4,"topic":"verify-test","partition":0}
{"timestamp":1635969376424,"name":"producer_send_success","key":null,"value":"5","offset":5,"topic":"verify-test","partition":0}
{"timestamp":1635969376426,"name":"producer_send_success","key":null,"value":"6","offset":6,"topic":"verify-test","partition":0}
{"timestamp":1635969376426,"name":"producer_send_success","key":null,"value":"7","offset":7,"topic":"verify-test","partition":0}
{"timestamp":1635969376426,"name":"producer_send_success","key":null,"value":"8","offset":8,"topic":"verify-test","partition":0}
{"timestamp":1635969376427,"name":"producer_send_success","key":null,"value":"9","offset":9,"topic":"verify-test","partition":0}
{"timestamp":1635969376448,"name":"shutdown_complete"}
{"timestamp":1635969376450,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":19.41747572815534}

 

# 토픽의 데이터를 삭제하기

bin/kafka-delete-records.sh --bootstrap-server my-kafka:9092 

반응형