지난 글(# Kafka - 2 # Kafka Multi Cluster 구성)에 이어서, 구성된 Kafka 서버에 데이터를 넣고 확인하는 간단한 예제를 만들어 테스트를 진행해 보겠다. 예제의 테스트용 Application은 Spring Boot로 만들 것이고 전체 구성은 아래와 같다.
[실습 전 준비 사항]
1. kafka 서버 기동
2. elasticsearch 서버 기동
3. Spring 기본 동작 원리
Step 1. gradle 프로젝트 생성
Step 2. gradle dependencies 추가
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
    // 'compile' is deprecated (and removed in Gradle 7); 'implementation' is its replacement.
    // The client version is pinned explicitly to 6.5.4.
    implementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '6.5.4'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    // Lombok is only needed at compile time and as an annotation processor.
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}
Step 3. application.properties 작성
# Kafka brokers and the topic used by both the producer and the consumer
spring.kafka.bootstrap-servers=<Broker1 IP>:9092,<Broker2 IP>:9092,<Broker3 IP>:9092
spring.kafka.topic-name=testkafka
logging.level.root=info
# NOTE: in a .properties file '#' starts a comment only at the beginning of a line;
# text after a value would become part of the value, so comments go on their own lines.
# The keys below match the ${elasticsearch.*} placeholders read by the Java classes.
# Elasticsearch master host
elasticsearch.host=<ES Master IP>
# Elasticsearch HTTP port
elasticsearch.port=9200
# Elasticsearch cluster name
elasticsearch.clusterName=docker-cluster
# Elasticsearch index name
elasticsearch.index.name=kafka_test
# Elasticsearch type name
elasticsearch.index.type=kafka_test
Step 4-1. MainApplication.class 작성
/**
 * Entry point of the sample application.
 *
 * <p>{@code DataSourceAutoConfiguration} is excluded from Spring Boot auto-configuration.
 *
 * @author skysoo
 * @version 1.0.0
 * @since 2019-11-12 3:33 PM
 */
@Slf4j
@RestController
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class MainApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        final Class<?> primarySource = MainApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
Step 4-2. KafkaSender.class 작성 => Kafka Producer 기능 클래스
/**
 * Kafka producer side of the sample: declares the producer beans and exposes a
 * {@code POST /send} endpoint that publishes the request body to the configured topic.
 *
 * @author skysoo
 * @version 1.0.0
 * @since 2019-11-12 3:34 PM
 */
@Slf4j
@RestController
@Configuration
public class KafkaSender {

    /** Broker list read from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Target topic read from {@code spring.kafka.topic-name}. */
    @Value("${spring.kafka.topic-name}")
    private String topic;

    /**
     * Builds the producer settings: String key/value serializers, gzip compression
     * and a maximum request size of 100,000,000 bytes.
     *
     * @return mutable map of Kafka producer properties
     */
    @Bean
    public Map<String, Object> producerConfig() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 100_000_000);
        settings.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return settings;
    }

    /**
     * Creates the producer factory from {@link #producerConfig()}.
     *
     * @return factory for String/String producers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = producerConfig();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Creates the template used to publish messages.
     *
     * @return template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Publishes the raw request body to the configured topic.
     *
     * @param msg request body to send as the record value (no key is set)
     */
    @PostMapping("/send")
    public void sender(@RequestBody String msg) {
        log.info(">>> kafka sender data : {}", msg);
        final KafkaTemplate<String, String> template = kafkaTemplate();
        template.send(topic, msg);
    }
}
Step 4-3. KafkaReceiver.class 작성 => Kafka Consumer 기능 클래스
@Slf4j
@EnableKafka
@RestController
@Configuration
public class KafkaReceiver {
@Autowired
private EsKafkaService esKafkaService;
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value(value = "${spring.kafka.topic-name}")
private String topic;
@Bean
public Map<String,Object> consumerConfig(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG,topic+"-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,100000000);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,100000000);
return props;
}
@Bean
public ConsumerFactory<String,String> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@PostMapping("/consumer")
public void consumer(){
List<Map<String,String>> list;
Map<String,String> map;
KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig());
kafkaConsumer.subscribe(Arrays.asList(topic));
while (true){
list = new ArrayList<>();
map = new HashMap<>();
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String,String> record : records){
if (record.value()!= null){
map.put(String.valueOf(record.value().hashCode()),record.value());
log.info("Topic : {}, Partition : {}, Offset : {}, Key : {}", record.topic(),record.partition(),record.offset(),record.key());
}
}
list.add(map);
try {
if (!list.get(0).isEmpty())
esKafkaService.bulk(list);
} catch (IOException e) {
log.error("",e);
} catch (Exception e) {
log.error("",e);
}
}
}
}
Step 4-4. EsConfiguration.class 작성 => Elasticsearch Config 선언 클래스
/**
 * Elasticsearch configuration: declares the {@code RestHighLevelClient} bean that
 * connects over HTTP to the host and port read from the application properties.
 *
 * @author skysoo
 * @version 1.0.0
 * @since 2019-11-12 3:40 PM
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class EsConfiguration {

    /** Elasticsearch host read from {@code elasticsearch.host}. */
    @Value("${elasticsearch.host}")
    private String clusterNodes;

    /** Elasticsearch port read from {@code elasticsearch.port}. */
    @Value("${elasticsearch.port}")
    private Integer clusterPort;

    /** Most recently created client; overwritten by {@link #createInstance()}. */
    private RestHighLevelClient restHighLevelClient;

    /**
     * Stores an externally supplied client.
     *
     * <p>NOTE(review): this constructor takes the same type that {@link #createInstance()}
     * produces as a bean - confirm it is still needed.
     *
     * @param restHighLevelClient client to keep in this configuration
     */
    public EsConfiguration(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    /**
     * Builds the client bean for the configured host and port using the {@code http} scheme.
     * Original author's note: the lifecycle is managed by the Spring context.
     *
     * @return the newly created client, also stored in this configuration
     * @throws Exception declared by the original signature
     */
    @Bean
    public RestHighLevelClient createInstance() throws Exception {
        final HttpHost node = new HttpHost(clusterNodes, clusterPort, "http");
        this.restHighLevelClient = new RestHighLevelClient(RestClient.builder(node));
        return this.restHighLevelClient;
    }
}
Step 4-5. EsKafkaService.class 작성 => Elasticsearch Data Save 기능 클래스
/**
* @author skysoo
* @version 1.0.0
* @since 2019-11-12 오후 3:35
**/
@Slf4j
@Service
public class EsKafkaService {
@Value("${elasticsearch.host}")
private String clusterNodes;
@Value("${elasticsearch.port}")
private Integer clusterPort;
@Value(value = "${elasticsearch.index.name}")
String TYPE;
@Value(value = "${elasticsearch.index.type}")
String INDEX;
private final ObjectMapper objectMapper;
/**
* @author skysoo
* Date : 2019-11-11
* EsKafkaService와 EsConfiguration 클래스간의 순환참조 오류 발생시 setter로 의존성을 주입받아라.
**/
private final RestHighLevelClient restHighLevelClient;
public EsKafkaService(RestHighLevelClient restHighLevelClient, ObjectMapper objectMapper) {
this.restHighLevelClient = restHighLevelClient;
this.objectMapper = objectMapper;
}
public void bulk(List<Map<String,String>> listData) throws Exception {
Stream<IndexRequest> indexRequestStream = listData.stream().map(vibrationData -> new IndexRequest()
.index(INDEX)
.type(TYPE)
.source(objectMapper.convertValue(vibrationData,Map.class)));
IndexRequest[] indexRequests = indexRequestStream.toArray(IndexRequest[]::new);
// bulkRequest 사용시 connection 한번으로 많은 데이터를 Insert 할 수 있다.
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(indexRequests);
log.info(">>> bulk count = {}",indexRequests.length);
restHighLevelClient.bulk(bulkRequest,RequestOptions.DEFAULT);
log.info(">>> Successful data save with ES<<<");
}
}
Step 5. Application Run
여기까지 하면 필요한 파일은 다 작성한 것이고 기동을 하면 아래와 같이 8080 포트를 사용하는 Application이 뜰 것이다.
Step 6. Kafka Producer 데이터 적재
=> 나는 파이어폭스 플러그인 중에 RESTED라는 플러그인을 사용해서 REST 명령을 Application으로 보냈다.
Step 7. Kafka Consumer 데이터 꺼낸 후 ES에 적재
Step 8. ES에 적재된 데이터 확인
# curl로 ES에 저장된 kafka_test index의 내용을 검색할 수도 있다.
curl -X GET "<ES Master 서버 IP>:9200/kafka_test/_search"
끝
'Message Broker > Kafka' 카테고리의 다른 글
# Kafka - 2 # Kafka Multi Cluster 구성 (2) | 2019.11.11 |
---|---|
# Kafka - 1 # Kafka 개념 (0) | 2019.11.11 |