Spring Cloud Stream Binder Kafka를 위한 글.

버전이 올라오면서 @OUTPUT 어노테이션이 Deprecated 돼서 찾아봤는데 StreamBridge를 이용하면 된다 한다.

일단 처음부터 봐봅시다.

implementation group: 'org.springframework.cloud', name: 'spring-cloud-stream-binder-kafka', version: '3.2.1'

를 추가하면 5개의 라이브러리가 추가 된다.

Gradle: org.springframework.cloud:spring-cloud-function-context:3.2.1
Gradle: org.springframework.cloud:spring-cloud-function-core:3.2.1
Gradle: org.springframework.cloud:spring-cloud-stream:3.2.1
Gradle: org.springframework.cloud:spring-cloud-stream-binder-kafka:3.2.1
Gradle: org.springframework.cloud:spring-cloud-stream-binder-kafka-core:3.2.1

그럼 시작해보자.

우선 application.yml에 cloud kafka를 설정해줍시다.

cloud:
  function:
    definition: consumer;producer
  stream:
    kafka:
      bindings:
        producer-out-0:
          producer:
            configuration:
              key:
                serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer-in-0:
          consumer:
            configuration:
              key:
                deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value:
                deserializer: com.example.studyspringboot.cloud.stream.binder.kafka.MessageDeserializer # (2)
      binder:
        brokers: localhost:9094
    bindings:
      producer-out-0: # (1)
        destination: carbtoon.test.service # topic
        contentType: application/json
      consumer-in-0:
        destination: carbtoon.test.service  # topic
        consumer:
          use-native-decoding: true # (3) 커스텀 deserializer를 사용하기 위해 활성화를 위해 true로 설정합니다.

설정이 끝난다면 Producer를 위한 클래스를 하나 작성.


@RequiredArgsConstructor
@Component
public class KafkaProducer {

    private final StreamBridge streamBridge; // StreamBridge를 주입해주자.

    @Async
    public void send(SpringbootKRPayload payload) {
        streamBridge.send("producer-out-0", MessageBuilder
                //  ↑ application.yml의 (1)과 같아야합니다. 
                // send의 첫번째 String parameter 이름은 String bindingName 입니다.
                .withPayload(payload)
                .setHeader(KafkaHeaders.MESSAGE_KEY, UUID.randomUUID().toString())
                .build());
    }

}

이렇게 Producer가 생성이 끝나면 다음으로는 Consumer 클래스를 생성.


@Component
public class KafkaConsumer {

    @Bean
    public Consumer<Message> consumer() {
        return message -> System.out.println(message);
    }

}

그리고 테스트를위한 Payload를 하나 만듭니다.


@Getter
@Setter
@ToString
public class SpringbootKRPayload {

    private String host;

    private String title;

    private String name;

    @JsonProperty("payload_type")
    private PayloadType payloadType;

    @Builder
    public SpringbootKRPayload(String host, String title, String name, PayloadType payloadType) {
        this.host = host;
        this.title = title;
        this.name = name;
        this.payloadType = payloadType;
    }

}

public enum PayloadType {
    CREATED, UPDATED, DELETED
}

그리고 application.yml의 (2)에 필요한 커스텀 deserializer 클래스를 하나 만듭니다.

public class MessageDeserializer implements Deserializer<SpringbootKRPayloadDeserializer> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public SpringbootKRPayloadDeserializer deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(new String(data), SpringbootKRPayloadDeserializer.class);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }
}

위의 클래스를 사용하기 위해서는 application.yml의 (3)을 true로 설정해 주어야합니다.

그리고 다음으로 위의 코드중 Deserializer를 위한 SpringbootKRPayloadDeserializer를 만들어 줍니다.

public class SpringbootKRPayloadDeserializer {

    @JsonProperty("payload_type")
    private String payloadType;

    @JsonProperty("host")
    private String host;

    @JsonProperty("name")
    private String name;

    @JsonProperty("title")
    private String title;

}

그리고 테스트를 위한 스케줄러를 하나 만들어 봅니다. (스케줄러를 만들었으니 application에 @EnableScheduling 추가)


@RequiredArgsConstructor
@Component
public class Scheduler {

    private KafkaProducer kafkaProducer;

    @Scheduled(cron = "*/5 * * * * *")
    public void sendMessage() {
        kafkaProducer.send(SpringbootKRPayload.builder()
                .host("SPRINGBOOT.KR")
                .title("SPRING CLOUD STREAM KAFKA")
                .name("JINWOO-LEE")
                .payloadType(PayloadType.CREATED)
                .build());
    }

}

이제 실행하면??

images

GenericMessage 
[payload=SpringbootKRPayloadDeserializer(
payloadType=CREATED, 
host=SPRINGBOOT.KR, 
name=JINWOO-LEE, 
title=SPRING CLOUD STREAM KAFKA), 
headers={deliveryAttempt=1, 
kafka_timestampType=CREATE_TIME, 
kafka_receivedTopic=carbtoon.test.service, 
skip-input-type-conversion=true, 
kafka_offset=158, 
scst_nativeHeadersPresent=true, 
kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@15d20e35, 
source-type=streamBridge, 
id=6a8cd101-3ac7-59ef-3d54-0429e761a4e8, 
kafka_receivedPartitionId=0, 
contentType=application/json, 
kafka_receivedTimestamp=1642531182377, 
kafka_groupId=anonymous.fa9c0bcf-ba1b-4a0e-be4e-4f997785cf91, 
timestamp=1642531182464}]

성공이다.

images

참조