Step 1 - Creating the Lambda project (Java)

We will create a Lambda function that is invoked from the AWS CLI using the aws lambda invoke command and writes records to a Kinesis data stream in batches.

In this section, I will discuss only the core application logic and leave out supporting code such as imports, error handling, and Javadoc comments. The complete working code is provided in the code files that accompany this book.

Let's start by defining the dependency for the Kinesis module of the AWS SDK for Java in the POM file, as shown in the following code:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>${aws.sdk.version}</version>
</dependency>

The POM file also declares a dependency on aws-lambda-java-core. The aws.sdk.version property is defined, along with other properties, in the parent project POM, serverless-cookbook-parent-aws-java.

We can create the Java Lambda project with the following package structure:

The Request.java class will correspond to our input JSON, as shown in the following code:

@Data
public class Request {
    private String streamName;
    private String partitionKey;
    private String payload;
    private int count;
    private int batchSize;
}
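For readers unfamiliar with Lombok, @Data generates getters, setters, equals(), hashCode(), and toString() at compile time. The following is a rough hand-written sketch of what that amounts to, reduced to two of the fields. The class name PlainRequest is hypothetical and exists only to avoid clashing with Request.

```java
import java.util.Objects;

// Hypothetical hand-written counterpart of a Lombok @Data class,
// reduced to two of the Request fields to keep the sketch short.
class PlainRequest {
    private String streamName;
    private int count;

    public String getStreamName() { return streamName; }
    public void setStreamName(String streamName) { this.streamName = streamName; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlainRequest)) return false;
        PlainRequest other = (PlainRequest) o;
        return count == other.count && Objects.equals(streamName, other.streamName);
    }

    @Override
    public int hashCode() { return Objects.hash(streamName, count); }

    @Override
    public String toString() {
        return "PlainRequest(streamName=" + streamName + ", count=" + count + ")";
    }
}
```

The generated accessors matter here because Lambda's built-in Java serialization works with classes that follow the usual bean naming conventions.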

The Response.java class corresponds to the output JSON from the Lambda function and contains the fields that are sent back to the invoker, as shown in the following code:

@Data
@AllArgsConstructor
public class Response {
    private String message;
    private int count;
}

KinesisService.java is the interface for our service class. An interface is not a requirement; you can use the implementation class directly or even embed all of the logic within the Lambda handler class itself. The interface is shown in the following code:

public interface KinesisService {
    Response addRecords(Request request, LambdaLogger logger);
}
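One reason to keep the interface is that the handler can then be exercised without a real stream. The following is a minimal sketch of that idea using simplified stand-ins so that it compiles without the AWS SDK: RecordWriter, StubRecordWriter, and WriterClient are hypothetical names, an int replaces Request and Response, and Consumer<String> replaces LambdaLogger.

```java
import java.util.function.Consumer;

// Simplified stand-in (assumption) for the KinesisService interface:
// a record count replaces Request, and Consumer<String> replaces LambdaLogger.
interface RecordWriter {
    int addRecords(int count, Consumer<String> logger);
}

// Stub used in place of a real, Kinesis-backed implementation.
class StubRecordWriter implements RecordWriter {
    @Override
    public int addRecords(int count, Consumer<String> logger) {
        logger.accept("Pretending to write " + count + " records");
        return count;
    }
}

// Handler-like class that depends only on the interface,
// so any implementation can be passed in.
class WriterClient {
    private final RecordWriter writer;

    WriterClient(RecordWriter writer) {
        this.writer = writer;
    }

    String handle(int count) {
        int written = writer.addRecords(count, System.out::println);
        return "Wrote " + written + " records";
    }
}
```

Swapping StubRecordWriter for a real implementation would not require any change to WriterClient.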

The KinesisServiceImpl.java class is the actual service implementation.

The following steps walk through its implementation:

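  1. We first define and initialize the Kinesis client and a list of PutRecordsRequestEntry, along with two fields that track the outcome of the writes, as shown in the following code:
private final AmazonKinesis kinesisClient;
private final List<PutRecordsRequestEntry> kinesisBatch;
// Outcome tracking: number of records written and whether any write failed.
private int documentAddedCount;
private boolean isError;

public KinesisServiceImpl(final AmazonKinesis kinesisClient) {
    this.kinesisClient = kinesisClient;
    this.kinesisBatch = new ArrayList<>();
}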
  2. Check the stream status at the start (optional) using the following code:
public final Response addRecords(final Request request, final LambdaLogger logger) {

    this.documentAddedCount = request.getCount();

    DescribeStreamResult result = this.kinesisClient.describeStream(request.getStreamName());
    logger.log("Stream Status: " + result.getStreamDescription().getStreamStatus() + ". ");
  3. Put the records into the stream in batches using the following code:
    for (int i = 1; i <= request.getCount(); i++) {

        // Append the loop index so that each record has a distinct payload.
        final String payload = request.getPayload() + i;

        this.kinesisBatch.add(new PutRecordsRequestEntry()
                .withPartitionKey(request.getPartitionKey())
                .withData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))));

        if (this.kinesisBatch.size() >= request.getBatchSize()) {

            try {
                logger.log("Flushing records to Stream...");
                flushBatch(request.getStreamName(), logger);
            } catch (Exception e) {
                logger.log("Exception occurred: " + e);
                // A failed flush must mark the whole run as failed.
                this.isError = true;
            } finally {
                this.kinesisBatch.clear();
            }
        }

    }
  4. The flushBatch() method actually writes to the stream, as shown in the following code:
private void flushBatch(final String streamName, final LambdaLogger logger) {
    final PutRecordsResult result = this.kinesisClient.putRecords(new PutRecordsRequest()
            .withStreamName(streamName)
            .withRecords(this.kinesisBatch));
    // putRecords can fail for individual records, so inspect each result entry.
    result.getRecords().forEach(r -> {
        if (!(StringUtils.hasValue(r.getErrorCode()))) {
            String successMessage = "Successfully processed record with sequence number: " + r.getSequenceNumber()
                    + ", shard id: " + r.getShardId();
            logger.log(successMessage);
        } else {
            this.documentAddedCount--;
            // Failed entries carry an error code and message, but no sequence number or shard ID.
            String errorMessage = "Did not process record, error code: " + r.getErrorCode()
                    + ", error message: " + r.getErrorMessage();
            logger.log(errorMessage);
            this.isError = true;
        }
    });
}

You can also implement retry logic for failed records. Check the code files for additional suggestions. 
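As a rough illustration of such retry logic, the sketch below resends only the entries that were reported as failed, for a bounded number of attempts. It relies on the fact that PutRecords returns one result entry per request entry, in the same order. To stay runnable without the AWS SDK, the sender parameter is a hypothetical stand-in for the putRecords call that returns one success flag per record; RetrySketch and sendWithRetry are also illustrative names, not part of the book's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class RetrySketch {
    // Sends `records` via `sender`, resending only the entries reported as failed,
    // for at most `maxAttempts` rounds. Returns the records that never succeeded.
    // `sender` stands in for a PutRecords call: it returns one success flag
    // per input record, in the same order as the input.
    static List<String> sendWithRetry(List<String> records,
                                      Function<List<String>, List<Boolean>> sender,
                                      int maxAttempts) {
        List<String> pending = new ArrayList<>(records);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<Boolean> outcome = sender.apply(pending);
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < pending.size(); i++) {
                if (!outcome.get(i)) {
                    failed.add(pending.get(i));
                }
            }
            // Only the failed entries are carried into the next attempt.
            pending = failed;
        }
        return pending;
    }
}
```

A production version would normally also wait between attempts, for example with an exponential backoff, which is left out here for brevity.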

  5. Finally, return the Response object from the addRecords method using the following code:
    if (this.isError) {
        return new Response(ERROR_MESSAGE, documentAddedCount);
    } else {
        return new Response(SUCCESS_MESSAGE, documentAddedCount);
    }
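One point to watch in the batching loop of addRecords: as excerpted here, a flush happens only when the list reaches batchSize, so when count is not a multiple of batchSize the last few records stay in the list unless they are flushed after the loop (the complete code files may already handle this). The following is a minimal sketch of the batching pattern with that final flush. BatchingSketch, writeInBatches, and the flusher parameter are hypothetical stand-ins so that the example runs without the AWS SDK. The cap of 500 reflects the PutRecords limit on records per request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchingSketch {
    // PutRecords accepts at most 500 records per request.
    static final int MAX_RECORDS_PER_REQUEST = 500;

    // Builds `count` payloads and hands them to `flusher` in groups of at most
    // `batchSize`, including a final, smaller group. Returns the number of flushes.
    static int writeInBatches(String payload, int count, int batchSize,
                              Consumer<List<String>> flusher) {
        // Keep the effective batch size between 1 and the service limit.
        int size = Math.max(1, Math.min(batchSize, MAX_RECORDS_PER_REQUEST));
        List<String> batch = new ArrayList<>();
        int flushes = 0;
        for (int i = 1; i <= count; i++) {
            batch.add(payload + i);
            if (batch.size() >= size) {
                flusher.accept(new ArrayList<>(batch)); // copy, because batch is reused
                batch.clear();
                flushes++;
            }
        }
        if (!batch.isEmpty()) {
            // Leftover records that did not fill a complete batch.
            flusher.accept(new ArrayList<>(batch));
            flushes++;
        }
        return flushes;
    }
}
```

With count set to 10 and batchSize set to 4, this sketch flushes three groups of 4, 4, and 2 records.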

LambdaKinesisSdkWriteHandler.java is our Lambda handler class and has the following code:

public final class LambdaKinesisSdkWriteHandler implements RequestHandler<Request, Response> {
    private final AmazonKinesis kinesisClient;

    public LambdaKinesisSdkWriteHandler() {
        this.kinesisClient = AmazonKinesisClientBuilder.standard()
                .withRegion(System.getenv("AWS_REGION"))
                .build();
    }

    @Override
    public Response handleRequest(final Request request, final Context context) {
        context.getLogger().log("Received Request: " + request);

        final KinesisService kinesisService = new KinesisServiceImpl(this.kinesisClient);
        return kinesisService.addRecords(request, context.getLogger());
    }
}