We will create a Lambda function that is invoked from the AWS CLI using the aws lambda invoke command and puts records into a Kinesis data stream in batches.
Let's start by defining the dependency for AWS Kinesis SDK for Java in the POM file, as shown in the following code:
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>${aws.sdk.version}</version>
</dependency>
The POM file also has a dependency on aws-lambda-java-core. The aws.sdk.version property is defined, along with other properties, in the parent project POM, serverless-cookbook-parent-aws-java.
We can create the Java Lambda project with the following package structure:
The Request.java class will correspond to our input JSON, as shown in the following code:
@Data
public class Request {
    private String streamName;
    private String partitionKey;
    private String payload;
    private int count;
    private int batchSize;
}
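Because count and batchSize are primitive ints, a field that is missing from the input JSON is typically left at 0, and a batchSize above 500 exceeds what a single PutRecords call accepts. The following is a minimal sketch of a guard, assuming you would rather clamp the value than reject the request; RequestLimits and effectiveBatchSize are hypothetical names that are not part of the recipe's code files.

```java
/** Hypothetical helper, not part of the recipe: keeps batchSize within usable bounds. */
final class RequestLimits {
    /** Kinesis PutRecords accepts at most 500 records per request. */
    static final int MAX_PUT_RECORDS_BATCH = 500;

    private RequestLimits() { }

    /** Clamps the requested batch size to the range 1..500. */
    static int effectiveBatchSize(final int requestedBatchSize) {
        return Math.max(1, Math.min(requestedBatchSize, MAX_PUT_RECORDS_BATCH));
    }
}
```

The service could call effectiveBatchSize(request.getBatchSize()) once at the start of addRecords and use the result in place of the raw value.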
The Response.java class corresponds to the output JSON from the Lambda and contains a message and a record count to send back to the invoker, as shown in the following code:
@Data
@AllArgsConstructor
public class Response {
    private String message;
    private int count;
}
KinesisService.java is the interface for our service class, as shown in the following code. The interface is not a requirement; you can use the implementation class directly or even embed all the logic within the Lambda handler class itself:
public interface KinesisService {
    Response addRecords(Request request, LambdaLogger logger);
}
The KinesisServiceImpl.java class is the actual service implementation.
The following steps show how we can use it:
- We first define and initialize the Kinesis client and a list of PutRecordsRequestEntry, as shown in the following code:
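private final AmazonKinesis kinesisClient;
private final List<PutRecordsRequestEntry> kinesisBatch;
// State used by addRecords() and flushBatch() in the steps that follow.
private int documentAddedCount;
private boolean isError;

public KinesisServiceImpl(final AmazonKinesis kinesisClient) {
    this.kinesisClient = kinesisClient;
    this.kinesisBatch = new ArrayList<>();
}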
- Check the stream status at the start (optional) using the following code:
public final Response addRecords(final Request request, final LambdaLogger logger) {
    this.documentAddedCount = request.getCount();
    DescribeStreamResult result = this.kinesisClient.describeStream(request.getStreamName());
    logger.log("Stream Status: " + result.getStreamDescription().getStreamStatus() + ". ");
- Put the records into the stream in batches using the following code:
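    for (int i = 1; i <= request.getCount(); i++) {
        // Append the counter so that each record carries a distinct payload.
        final String payload = request.getPayload() + i;
        this.kinesisBatch.add(new PutRecordsRequestEntry()
                .withPartitionKey(request.getPartitionKey())
                // Use an explicit charset (java.nio.charset.StandardCharsets), not the platform default.
                .withData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))));
        if (this.kinesisBatch.size() >= request.getBatchSize()) {
            try {
                logger.log("Flushing records to Stream...");
                flushBatch(request.getStreamName(), logger);
            } catch (Exception e) {
                logger.log("Exception occurred: " + e);
                // A failed flush is an error, so the final Response must report it.
                this.isError = true;
            } finally {
                this.kinesisBatch.clear();
            }
        }
    }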
- The flushBatch() method actually writes to the stream, as shown in the following code:
private void flushBatch(final String streamName, final LambdaLogger logger) {
    final PutRecordsResult result = this.kinesisClient.putRecords(new PutRecordsRequest()
            .withStreamName(streamName)
            .withRecords(this.kinesisBatch));
    // putRecords can partially fail, so inspect each result entry individually.
    result.getRecords().forEach(r -> {
        if (!(StringUtils.hasValue(r.getErrorCode()))) {
            String successMessage = "Successfully processed record with sequence number: " + r.getSequenceNumber()
                    + ", shard id: " + r.getShardId();
            logger.log(successMessage);
        } else {
            // Failed entries carry an error code and message but no sequence number or shard ID.
            this.documentAddedCount--;
            String errorMessage = "Did not process record, error code: " + r.getErrorCode()
                    + ", error message: " + r.getErrorMessage();
            logger.log(errorMessage);
            this.isError = true;
        }
    });
}
You can also implement retry logic for failed records. Check the code files for additional suggestions.
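As one possible shape for such retry logic, here is a minimal sketch that resends only the failed entries. It relies on the fact that the PutRecords result entries line up by position with the request entries. To keep the sketch self-contained, the sender function is a hypothetical stand-in for the putRecords call that returns one error code per entry (null for success); BatchRetry and sendWithRetry are illustrative names, not part of the recipe's code files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical retry helper; the sender stands in for a call to putRecords. */
final class BatchRetry {
    private BatchRetry() { }

    /**
     * Sends the batch, then resends only the entries that failed,
     * making at most maxAttempts calls in total.
     *
     * @param sender returns one error code per entry, in request order; null means success
     * @return the entries that still failed after the last attempt
     */
    static <T> List<T> sendWithRetry(final List<T> batch,
                                     final Function<List<T>, List<String>> sender,
                                     final int maxAttempts) {
        List<T> pending = new ArrayList<>(batch);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            final List<String> errorCodes = sender.apply(pending);
            final List<T> failed = new ArrayList<>();
            for (int i = 0; i < pending.size(); i++) {
                if (errorCodes.get(i) != null) {
                    failed.add(pending.get(i)); // keep only the entries that need another try
                }
            }
            pending = failed;
        }
        return pending;
    }
}
```

In flushBatch(), the sender would wrap the putRecords call and map each result entry to its error code. A production version would usually also pause between attempts, for example with exponential backoff.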
- Finally, return the Response object from the addRecords method using the following code:
    if (this.isError) {
        return new Response(ERROR_MESSAGE, documentAddedCount);
    } else {
        return new Response(SUCCESS_MESSAGE, documentAddedCount);
    }
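One detail to watch in the batching loop of addRecords: it flushes only when the batch reaches batchSize, so if count is not a multiple of batchSize, the last few records stay in the list unless they are flushed once more before returning (the full code files may already handle this). The following is a minimal, SDK-free sketch of that pattern. BatchingSketch and sendInBatches are hypothetical names, and the flush callback stands in for flushBatch().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of size-based batching with a final flush of the partial batch. */
final class BatchingSketch {
    private BatchingSketch() { }

    /**
     * Builds payloads payload1..payloadN, hands them to flush in groups of batchSize,
     * and flushes whatever remains after the loop.
     *
     * @return the number of flush calls made
     */
    static int sendInBatches(final String payload, final int count, final int batchSize,
                             final Consumer<List<String>> flush) {
        final List<String> batch = new ArrayList<>();
        int flushes = 0;
        for (int i = 1; i <= count; i++) {
            batch.add(payload + i);
            if (batch.size() >= batchSize) {
                flush.accept(new ArrayList<>(batch)); // pass a copy so clearing is safe
                batch.clear();
                flushes++;
            }
        }
        if (!batch.isEmpty()) {
            // Leftover records when count is not a multiple of batchSize.
            flush.accept(new ArrayList<>(batch));
            flushes++;
        }
        return flushes;
    }
}
```

For example, with a count of 10 and a batchSize of 4, the callback receives batches of 4, 4, and 2 records.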
The LambdaKinesisSdkWriteHandler.java is our Lambda handler class and has the following code:
public final class LambdaKinesisSdkWriteHandler implements RequestHandler<Request, Response> {
    private final AmazonKinesis kinesisClient;

    public LambdaKinesisSdkWriteHandler() {
        this.kinesisClient = AmazonKinesisClientBuilder.standard()
                .withRegion(System.getenv("AWS_REGION"))
                .build();
    }

    @Override
    public Response handleRequest(final Request request, final Context context) {
        context.getLogger().log("Received Request: " + request);
        final KinesisService kinesisService = new KinesisServiceImpl(this.kinesisClient);
        return kinesisService.addRecords(request, context.getLogger());
    }
}