Configuration for the Image Resize Request Producer

The Image Resize Request Producer is responsible for listing images that need to be resized and submitting those to the Apache Kafka topic. The following is the configuration for the producer:

@SpringBootApplication
public class SpringBootAsyncProducerApplication {

...

@Bean
public ReplyingKafkaTemplate<String, String, String> kafkaTemplate(
ProducerFactory<String, String> pf,
KafkaMessageListenerContainer<String, String> replyContainer) {
return new ReplyingKafkaTemplate<>(pf, replyContainer);
}

@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(
ConsumerFactory<String, String> cf) {
ContainerProperties containerProperties = new ContainerProperties("asyncReplies");
containerProperties.setGroupId("async");
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}

@Bean
public NewTopic asyncRequests() {
return new NewTopic("asyncRequests", 10, (short) 2);
}

@Bean
public NewTopic asyncReplies() {
return new NewTopic("asyncReplies", 10, (short) 2);
}

}

In the preceding configuration, two Apache Kafka topics by the names asyncRequests and asyncReplies are created as Spring Beans with a number of replications 10 and replication factor of 2. The asyncRequests topic is responsible for sending the message to the consumer and the asyncReplies topic is used to get a response back from the consumer as an acknowledgment.

The kafkaTemplate Spring Bean is created with an instance of ReplyingKafkaTemplate to enable a request-response style of communication.

The replyContainer Spring Bean is created to configure the reply received from the asyncReplies topic.

Furthermore, the following ScheduledImageResizeRequestSubmitter is used to list images and submit those to the Apache Kafka topic to be consumed and processed:

@Component
public class ScheduledImageResizeRequestSubmitter {

private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledImageResizeRequestSubmitter.class);

private final ReplyingKafkaTemplate<String, String, String> template;
private final ObjectMapper objectMapper;
private final String imagesDirectory;

public ScheduledImageResizeRequestSubmitter(ReplyingKafkaTemplate<String, String, String> template, ObjectMapper objectMapper, @Value("${images.input.directory}") String imagesInputDirectory) {
this.template = template;
this.objectMapper = objectMapper;
this.imagesDirectory = imagesInputDirectory;
}

The preceding constructor injects and initializes ReplyingKafkaTemplate, ObjectMapper, and the image's input directory path.

The following code uses two reactor Flux objects:

public void scheduleTaskWithCronExpression() {
Flux.just(new File(imagesDirectory).listFiles()).filter(File::isFile).subscribe(
f -> {
Flux.just(new Dimension(800, 600), new Dimension(180, 180), new Dimension(1200, 630)).subscribe(d -> {
try {
ImageResizeRequest imageResizeRequest = new ImageResizeRequest((int) d.getWidth(), (int) d.getHeight(), f.getAbsolutePath());
ProducerRecord<String, String> record = new ProducerRecord<>("asyncRequests", objectMapper.writeValueAsString(imageResizeRequest));
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "asyncReplies".getBytes()));
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
ConsumerRecord<String, String> consumerRecord = replyFuture.get();
} catch (Exception e) {
LOGGER.error("Error while sending message", e);
}
},
e -> LOGGER.error("Error while running lambda"),
() -> f.renameTo(new File(f.getParent() + "/Done", f.getName())));
}
);
}

}

The initial one is to list files that are not directories inside of the directory specified by the variable imagesDirectory. The value for this variable is configurable and loaded from the application.properties file, as follows:

images.input.directory=C:\Users\Images

Then, it will create another Flux object to send dimensions so that image resizing can take place. So, it will be one image converted into multiple dimensions. For this scenario, the 800 x 600, 180 x 180, and 1,200 x 630 dimensions are used.

It will send ImageResizeRequest serialized into a JSON so that the consumers can deserialize and use it. ImageResizeRequest is shown as follows:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ImageResizeRequest {

private Integer width;

private Integer height;

private String inputFile;

}

Finally, the ImageResizeRequest JSON will be sent to the asyncRequests topic with the REPLY_TOPIC header set to asyncReplies and sent to the Apache Kafka using the ReplyingKafkaTemplate bean.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.17.164.34