Spring Cloud Stream Binder Kafka를 위한 글.
버전이 올라오면서 @OUTPUT 어노테이션이 Deprecated 돼서 찾아봤는데 StreamBridge를 이용하면 된다 한다.
일단 처음부터 봐봅시다.
implementation group: 'org.springframework.cloud', name: 'spring-cloud-stream-binder-kafka', version: '3.2.1'
를 추가하면 5개의 라이브러리가 추가 된다.
Gradle: org.springframework.cloud:spring-cloud-function-context:3.2.1
Gradle: org.springframework.cloud:spring-cloud-function-core:3.2.1
Gradle: org.springframework.cloud:spring-cloud-stream:3.2.1
Gradle: org.springframework.cloud:spring-cloud-stream-binder-kafka:3.2.1
Gradle: org.springframework.cloud:spring-cloud-stream-binder-kafka-core:3.2.1
그럼 시작해보자.
우선 application.yml에 cloud kafka를 설정해줍시다.
cloud:
function:
definition: consumer;producer
stream:
kafka:
bindings:
producer-out-0:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
consumer-in-0:
consumer:
configuration:
key:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
value:
deserializer: com.example.studyspringboot.cloud.stream.binder.kafka.MessageDeserializer # (2)
binder:
brokers: localhost:9094
bindings:
producer-out-0: # (1)
destination: carbtoon.test.service # topic
contentType: application/json
consumer-in-0:
destination: carbtoon.test.service # topic
consumer:
use-native-decoding: true # (3) 커스텀 deserializer를 사용하기 위해 활성화를 위해 true로 설정합니다.
설정이 끝난다면 Producer를 위한 클래스를 하나 작성.
@RequiredArgsConstructor
@Component
public class KafkaProducer {
private final StreamBridge streamBridge; // StreamBridge를 주입해주자.
@Async
public void send(SpringbootKRPayload payload) {
streamBridge.send("producer-out-0", MessageBuilder
// ↑ application.yml의 (1)과 같아야합니다.
// send의 첫번째 String parameter 이름은 String bindingName 입니다.
.withPayload(payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, UUID.randomUUID().toString())
.build());
}
}
이렇게 Producer가 생성이 끝나면 다음으로는 Consumer 클래스를 생성.
@Component
public class KafkaConsumer {
@Bean
public Consumer<Message> consumer() {
return message -> System.out.println(message);
}
}
그리고 테스트를위한 Payload를 하나 만듭니다.
@Getter
@Setter
@ToString
public class SpringbootKRPayload {
private String host;
private String title;
private String name;
@JsonProperty("payload_type")
private PayloadType payloadType;
@Builder
public SpringbootKRPayload(String host, String title, String name, PayloadType payloadType) {
this.host = host;
this.title = title;
this.name = name;
this.payloadType = payloadType;
}
}
public enum PayloadType {
CREATED, UPDATED, DELETED
}
그리고 application.yml의 (2)에 필요한 커스텀 deserializer 클래스를 하나 만듭니다.
public class MessageDeserializer implements Deserializer<SpringbootKRPayloadDeserializer> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public SpringbootKRPayloadDeserializer deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(new String(data), SpringbootKRPayloadDeserializer.class);
} catch (IOException e) {
throw new SerializationException(e);
}
}
}
위의 클래스를 사용하기 위해서는 application.yml의 (3)을 true로 설정해 주어야합니다.
그리고 다음으로 위의 코드중 Deserializer를 위한 SpringbootKRPayloadDeserializer를 만들어 줍니다.
public class SpringbootKRPayloadDeserializer {
@JsonProperty("payload_type")
private String payloadType;
@JsonProperty("host")
private String host;
@JsonProperty("name")
private String name;
@JsonProperty("title")
private String title;
}
그리고 테스트를 위한 스케줄러를 하나 만들어 봅니다. (스케줄러를 만들었으니 application에 @EnableScheduling 추가)
@RequiredArgsConstructor
@Component
public class Scheduler {
private KafkaProducer kafkaProducer;
@Scheduled(cron = "*/5 * * * * *")
public void sendMessage() {
kafkaProducer.send(SpringbootKRPayload.builder()
.host("SPRINGBOOT.KR")
.title("SPRING CLOUD STREAM KAFKA")
.name("JINWOO-LEE")
.payloadType(PayloadType.CREATED)
.build());
}
}
이제 실행하면??
GenericMessage
[payload=SpringbootKRPayloadDeserializer(
payloadType=CREATED,
host=SPRINGBOOT.KR,
name=JINWOO-LEE,
title=SPRING CLOUD STREAM KAFKA),
headers={deliveryAttempt=1,
kafka_timestampType=CREATE_TIME,
kafka_receivedTopic=carbtoon.test.service,
skip-input-type-conversion=true,
kafka_offset=158,
scst_nativeHeadersPresent=true,
kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@15d20e35,
source-type=streamBridge,
id=6a8cd101-3ac7-59ef-3d54-0429e761a4e8,
kafka_receivedPartitionId=0,
contentType=application/json,
kafka_receivedTimestamp=1642531182377,
kafka_groupId=anonymous.fa9c0bcf-ba1b-4a0e-be4e-4f997785cf91,
timestamp=1642531182464}]
성공이다.
참조