
# Kafka - 3 # Spring Boot - Kafka - Elasticsearch Hands-On Example

skysoo1111 2019. 11. 12. 17:06

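Following the previous post (# Kafka - 2 # Kafka Multi Cluster Setup), this post builds a simple example that writes data to the Kafka cluster set up there and then verifies the result. The test application that produces the data is written with Spring Boot, and the overall setup is shown below.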

 

Lab architecture diagram

[Prerequisites]

1. A running Kafka server

2. A running Elasticsearch server

3. A basic understanding of how Spring works

 

Step 1. Create a Gradle project

 

Step 2. Add the Gradle dependencies

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    // 'compile' is deprecated and was removed in Gradle 7; 'implementation' replaces it
    implementation 'org.elasticsearch.client:elasticsearch-rest-high-level-client:6.5.4'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}

Step 3. Write application.properties

spring.kafka.bootstrap-servers=<Broker1 IP>:9092,<Broker2 IP>:9092,<Broker3 IP>:9092
spring.kafka.topic-name=testkafka
logging.level.root=info

# The keys below match the @Value placeholders used in EsConfiguration and EsKafkaService.
# Comments sit on their own lines: a trailing "#..." after a value would become part of the value.
elasticsearch.host=<ES Master IP>
# ES port
elasticsearch.port=9200
# ES cluster name
elasticsearch.clusterName=docker-cluster
# ES index name
elasticsearch.index.name=kafka_test
# ES type name
elasticsearch.index.type=kafka_test
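
One thing to keep in mind with `.properties` files: `#` starts a comment only at the beginning of a line. Anything after the `=` belongs to the value, including trailing text that looks like a comment, which would break the conversion of the port to an `Integer`. The sketch below shows the effect with the JDK's `java.util.Properties`. Spring Boot uses its own loader, but it applies the same comment rule; the class and method names here are only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class PropertiesCommentDemo {

    // Parses the given .properties text and returns the value stored under key.
    public static String valueOf(String propertiesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // A trailing "#..." is NOT a comment: it ends up inside the value.
        String inline = valueOf("port=9200   #ES Port", "port");
        // A comment on its own line is ignored as expected.
        String ownLine = valueOf("# ES Port\nport=9200", "port");

        System.out.println("[" + inline + "]");   // [9200   #ES Port]
        System.out.println("[" + ownLine + "]");  // [9200]
    }
}
```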

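Step 4-1. Write MainApplication.class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

/**
 * @author skysoo
 * @version 1.0.0
 * @since 2019-11-12 3:33 PM
 **/
// This example uses no JDBC DataSource, so its auto-configuration is excluded.
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})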
public class MainApplication {
    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class,args);
    }
}

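Step 4-2. Write KafkaSender.class => the Kafka producer class

import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author skysoo
 * @version 1.0.0
 * @since 2019-11-12 3:34 PM
 **/
@Slf4j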
@RestController
@Configuration
public class KafkaSender {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value(value = "${spring.kafka.topic-name}")
    private String topic;

    @Bean
    public Map<String,Object> producerConfig(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,100000000);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"gzip");
        return props;
    }

    @Bean
    public ProducerFactory<String,String> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String,String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }

    @PostMapping("/send")
    public void sender(@RequestBody String msg){
        log.info(">>> kafka sender data : {}",msg);
        kafkaTemplate().send(topic,msg);
    }
}

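Step 4-3. Write KafkaReceiver.class => the Kafka consumer class

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j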
@EnableKafka
@RestController
@Configuration
public class KafkaReceiver {

    @Autowired
    private EsKafkaService esKafkaService;

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value(value = "${spring.kafka.topic-name}")
    private String topic;

    @Bean
    public Map<String,Object> consumerConfig(){
        Map<String, Object> props  = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,topic+"-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,100000000);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,100000000);
        return props;
    }

    @Bean
    public ConsumerFactory<String,String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // NOTE: this handler loops forever, so the HTTP request that triggers it never returns
    // and keeps one request thread busy. That is acceptable for this test only.
    @PostMapping("/consumer")
    public void consumer(){
        // try-with-resources closes the consumer if the loop exits with an exception
        try (KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(consumerConfig())) {
            kafkaConsumer.subscribe(Arrays.asList(topic));
            while (true){
                // One map per poll: key = hash code of the message, value = the message itself
                Map<String,String> map = new HashMap<>();
                ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String,String> record : records){
                    if (record.value()!= null){
                        map.put(String.valueOf(record.value().hashCode()),record.value());
                        log.info("Topic : {}, Partition : {}, Offset : {}, Key : {}", record.topic(),record.partition(),record.offset(),record.key());
                    }
                }
                // Nothing was polled in this round, so there is nothing to save
                if (map.isEmpty())
                    continue;
                List<Map<String,String>> list = new ArrayList<>();
                list.add(map);
                try {
                    esKafkaService.bulk(list);
                } catch (Exception e) {
                    log.error("Failed to save the polled records to ES",e);
                }
            }
        }
    }
}
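
To make the shape of the data handed to `esKafkaService.bulk(...)` concrete: every poll produces one map, keyed by the hash code of each message, and that single map becomes one Elasticsearch document. The stand-alone sketch below reproduces only that mapping step, with plain strings standing in for `ConsumerRecord` values (the class and method names are made up for illustration). It also shows a side effect of the hash key: identical messages within one poll collapse into a single entry.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PollBatchSketch {

    // Mirrors the loop in consumer(): key = hashCode of the value, value = the message.
    public static Map<String, String> toDocument(List<String> values) {
        Map<String, String> doc = new HashMap<>();
        for (String value : values) {
            if (value != null) {
                doc.put(String.valueOf(value.hashCode()), value);
            }
        }
        return doc;
    }

    public static void main(String[] args) {
        // Three messages arrive in one poll; two of them are identical.
        Map<String, String> doc = toDocument(Arrays.asList("a", "b", "a"));
        System.out.println(doc.size());     // 2, because the duplicate "a" overwrites itself
        System.out.println(doc.get("97"));  // a  ("a".hashCode() is 97)
    }
}
```

If every message should become its own document, one option is to add one map per record to the list instead of one map per poll.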

Step 4-4. Write EsConfiguration.class => the Elasticsearch configuration class

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author skysoo
 * @version 1.0.0
 * @since 2019-11-12 3:40 PM
 **/
@Configuration
public class EsConfiguration {
    @Value("${elasticsearch.host}")
    private String clusterNodes;
    @Value("${elasticsearch.port}")
    private Integer clusterPort;

    // The Spring context manages the client's lifecycle and calls close() on shutdown.
    // This class must not take a RestHighLevelClient in its own constructor,
    // because it would then depend on the very bean it creates.
    @Bean(destroyMethod = "close")
    public RestHighLevelClient createInstance() {
        return new RestHighLevelClient(RestClient.builder(
                new HttpHost(clusterNodes,clusterPort,"http")
        ));
    }
}

Step 4-5. Write EsKafkaService.class => the class that saves data to Elasticsearch

import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * @author skysoo
 * @version 1.0.0
 * @since 2019-11-12 3:35 PM
 **/
@Slf4j
@Service
public class EsKafkaService {
    // index.name goes into the index field and index.type into the type field
    @Value("${elasticsearch.index.name}")
    private String index;
    @Value("${elasticsearch.index.type}")
    private String type;

    private final ObjectMapper objectMapper;

    /**
     * @author skysoo
     * Date : 2019-11-11
     * If a circular-reference error occurs between EsKafkaService and EsConfiguration,
     * inject this dependency through a setter instead.
     **/
    private final RestHighLevelClient restHighLevelClient;

    public EsKafkaService(RestHighLevelClient restHighLevelClient, ObjectMapper objectMapper) {
        this.restHighLevelClient = restHighLevelClient;
        this.objectMapper = objectMapper;
    }

    public void bulk(List<Map<String,String>> listData) throws Exception {
        // A BulkRequest inserts many documents over a single request.
        BulkRequest bulkRequest = new BulkRequest();
        for (Map<String,String> data : listData) {
            bulkRequest.add(new IndexRequest(index, type)
                    .source(objectMapper.convertValue(data, Map.class)));
        }
        log.info(">>> bulk count = {}",bulkRequest.numberOfActions());

        BulkResponse response = restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
        // A bulk call can succeed as a whole while individual items fail, so check the response.
        if (response.hasFailures()) {
            log.error(">>> bulk save failed : {}",response.buildFailureMessage());
        } else {
            log.info(">>> Successful data save with ES <<<");
        }
    }
}
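
For reference, this is roughly what a bulk request looks like on the wire: the REST `_bulk` endpoint takes newline-delimited JSON, with one action line followed by one source line per document. The sketch below builds such a body by hand using only the JDK, purely to illustrate the format. `RestHighLevelClient` does this for you; the helper names and the minimal JSON escaping (quotes and backslashes only) are my own simplifications.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BulkBodySketch {

    // Minimal escaping for the sketch: backslashes and double quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Serializes a flat String map as a JSON object.
    static String toJson(Map<String, String> doc) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : doc.entrySet()) {
            if (!first) sb.append(',');
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append('}').toString();
    }

    // Builds the NDJSON body: an "index" action line, then the document, each ending in \n.
    public static String bulkBody(String index, String type, List<Map<String, String>> docs) {
        StringBuilder body = new StringBuilder();
        for (Map<String, String> doc : docs) {
            body.append("{\"index\":{\"_index\":").append(quote(index))
                .append(",\"_type\":").append(quote(type)).append("}}\n");
            body.append(toJson(doc)).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, String> doc = new LinkedHashMap<>();
        doc.put("97", "a");
        // Prints two lines: the action metadata, then the document source.
        System.out.print(bulkBody("kafka_test", "kafka_test", Collections.singletonList(doc)));
    }
}
```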

Step 5. Application Run

With that, all the required files are in place. When you start the application, it comes up on port 8080 as shown below.

 

Step 6. Load data through the Kafka producer

          => I used RESTED, a Firefox add-on, to send REST requests to the application.
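
If you would rather not install a browser add-on, the same request can be sent from Java. The sketch below uses the JDK 11 `java.net.http.HttpClient`. The base URL `http://localhost:8080` assumes the application runs locally on the default port, and the class name and message text are placeholders.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class SendMessageSketch {

    // Builds a POST request whose raw body is the message, matching @RequestBody String msg.
    public static HttpRequest buildSendRequest(String baseUrl, String message) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/send"))
                .timeout(Duration.ofSeconds(5))
                .header("Content-Type", "text/plain; charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(message))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = buildSendRequest("http://localhost:8080", "hello kafka");
        try {
            // The controller method returns void, so the response body is empty on success.
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Reached when the application is not running or does not answer in time.
            System.out.println("Could not reach the application: " + e);
        }
    }
}
```

A similar request to `/consumer` starts Step 7. Because that handler loops forever, expect the request to time out on the client side rather than return.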

Check the Kafka receive log in the application

Step 7. Consume the data with the Kafka consumer and load it into ES

Check the ES load log in the application

Step 8. Check the data loaded into ES

# You can also search the contents of the kafka_test index stored in ES with curl.
curl -X GET "<ES Master server IP>:9200/kafka_test/_search"

 
