In this section, we will create a Lambda function to process Kinesis events.
For more detailed coverage of this topic, refer to the blog post by Sunil Dalal at: https://www.polyglotdeveloper.com/lambda/2017-07-05-Using-Lambda-as-Kinesis-events-processor/. For details on packaging the function, refer to Creating a .jar Deployment Package Using Maven and Eclipse IDE (Java): https://docs.aws.amazon.com/lambda/latest/dg/java-create-jar-pkg-maven-and-eclipse.html.
- Create a Maven project. Create a package named example and add the following Java code in a class named ProcessKinesisEvents:
package example;

import java.nio.charset.StandardCharsets;

import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;

public class ProcessKinesisEvents {

    // Entry point; referenced later as example.ProcessKinesisEvents::handleRequest
    public String handleRequest(KinesisEvent event) {
        System.out.println("Record Size - " + event.getRecords().size());
        for (KinesisEventRecord rec : event.getRecords()) {
            // The sequence number is already a String
            System.out.println(rec.getKinesis().getSequenceNumber());
            // Decode the payload explicitly as UTF-8 instead of the platform default charset
            System.out.println(new String(rec.getKinesis().getData().array(), StandardCharsets.UTF_8));
        }
        return "success";
    }
}
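The handler reads each payload with getData().array(), which returns the whole backing array of the ByteBuffer and throws a ReadOnlyBufferException if the buffer is read-only. The following is a minimal sketch, assuming the payload is UTF-8 text, of a more defensive way to turn a record's ByteBuffer into a String. The class RecordDataDecoder and its decode method are hypothetical helpers written for illustration; they are not part of the AWS libraries.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecordDataDecoder {

    // Hypothetical helper: copies only the readable bytes (position to limit)
    // and decodes them as UTF-8, leaving the caller's buffer untouched.
    static String decode(ByteBuffer data) {
        ByteBuffer view = data.asReadOnlyBuffer(); // shares content, has its own position and limit
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.wrap("Test Record 1".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(data));
        // Also works on a read-only buffer, where array() would throw
        System.out.println(decode(data.asReadOnlyBuffer()));
    }
}
```

Inside the handler, such a helper could be called as decode(rec.getKinesis().getData()) in place of the new String(...) expression.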
- The pom.xml file is listed as follows for reference:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>doc-examples</groupId>
    <artifactId>lambda-java-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>lambda-java-example</name>
    <description>AWS Lambda Java Example</description>
    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-core</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-lambda-java-events</artifactId>
            <version>1.3.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
            </plugin>
        </plugins>
    </build>
</project>
- In the IAM console, create a new role and select Lambda as the service that will use the role:
- Search for AWSLambdaKinesisExecutionRole and attach the permissions policy to the AWS Lambda role:
- Use the create-function command as shown to create the Lambda function:
aws lambda create-function \
> --region us-west-2 \
> --function-name ProcessKinesisEvents \
> --zip-file fileb:///Users/aurobindosarkar/Downloads/IWebApps/workspace/lambda-java-example/target/lambda-java-example-0.0.1-SNAPSHOT.jar \
> --role arn:aws:iam::450394462648:role/lambda-kinesis-execution-role \
> --handler example.ProcessKinesisEvents::handleRequest \
> --runtime java8
{
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "CodeSha256": "jQQ9jo18W57ngH7HFDDv4S0DF9EYNFkYs2NoUQBFz1Q=",
    "FunctionName": "ProcessKinesisEvents",
    "CodeSize": 7763705,
    "MemorySize": 128,
    "FunctionArn": "arn:aws:lambda:us-west-2:450394462648:function:ProcessKinesisEvents",
    "Version": "$LATEST",
    "Role": "arn:aws:iam::450394462648:role/lambda-kinesis-execution-role",
    "Timeout": 3,
    "LastModified": "2018-01-28T18:52:25.963+0000",
    "Handler": "example.ProcessKinesisEvents::handleRequest",
    "Runtime": "java8",
    "Description": ""
}
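The --handler value used in the create-function command follows the package.Class::method form. As a rough illustration of how that string breaks down, the following sketch splits it into its class and method parts. The HandlerSpec class is hypothetical and handles only the two-part form used here; it does not represent how the Lambda runtime itself resolves handlers.

```java
public class HandlerSpec {
    final String className;
    final String methodName;

    HandlerSpec(String className, String methodName) {
        this.className = className;
        this.methodName = methodName;
    }

    // Splits "package.Class::method" at the "::" separator.
    static HandlerSpec parse(String handler) {
        int sep = handler.indexOf("::");
        if (sep <= 0 || sep + 2 >= handler.length()) {
            throw new IllegalArgumentException("Expected package.Class::method, got: " + handler);
        }
        return new HandlerSpec(handler.substring(0, sep), handler.substring(sep + 2));
    }

    public static void main(String[] args) {
        HandlerSpec spec = parse("example.ProcessKinesisEvents::handleRequest");
        System.out.println("class  = " + spec.className);
        System.out.println("method = " + spec.methodName);
    }
}
```

The class part must match the package and class name in the deployed .jar file, and the method part must match the method that accepts the KinesisEvent.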
The newly created function will be listed in the Lambda Functions console:
- Create an input file (input.txt) with the following records to test the Lambda function:
{
    "Records": [
        {
            "kinesis": {
                "partitionKey": "partitionKey-3",
                "kinesisSchemaVersion": "1.0",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
                "sequenceNumber": "49545115243490985018280067714973144582180062593244200961"
            },
            "eventSource": "aws:kinesis",
            "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
            "invokeIdentityArn": "arn:aws:iam::account-id:role/testLEBRole",
            "eventVersion": "1.0",
            "eventName": "aws:kinesis:record",
            "eventSourceARN": "arn:aws:kinesis:us-west-2:35667example:stream/examplestream",
            "awsRegion": "us-west-2"
        }
    ]
}
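The data field in the test event is Base64-encoded, so it is not obvious what the function should print for this record. The following sketch, which uses only the standard java.util.Base64 class, decodes the value from input.txt so that you know what to look for in the logs. The class name DecodeSampleData is a hypothetical name chosen for this example, and the payload is assumed to be UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DecodeSampleData {

    // Decodes a Base64 string and interprets the bytes as UTF-8 text.
    static String decode(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Value of the "data" field in the test event
        System.out.println(decode("SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="));
        // prints: Hello, this is a test 123.
    }
}
```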
- Invoke the Lambda function. Specify the invocation type as Event, the function name as ProcessKinesisEvents, and the region, input file, and output file as shown:
Aurobindos-MacBook-Pro-2:Downloads aurobindosarkar$ aws lambda invoke \
> --invocation-type Event \
> --function-name ProcessKinesisEvents \
> --region us-west-2 \
> --payload file:///Users/aurobindosarkar/Downloads/input.txt \
> outputfile.txt
- For a synchronous response, specify invocation-type as RequestResponse in the preceding command.
- Add an event source in AWS Lambda so that your Lambda function can start polling the Amazon Kinesis stream:
Aurobindos-MacBook-Pro-2:Downloads aurobindosarkar$ aws lambda create-event-source-mapping \
> --region us-west-2 \
> --function-name ProcessKinesisEvents \
> --event-source-arn arn:aws:kinesis:us-west-2:450394462648:stream/KinesisTestStream \
> --batch-size 100 \
> --starting-position TRIM_HORIZON
- Using the following AWS CLI command, add event records to your Amazon Kinesis stream. You can run the same command more than once to add multiple records to the stream:
Aurobindos-MacBook-Pro-2:Downloads aurobindosarkar$ aws kinesis put-record \
> --stream-name KinesisTestStream \
> --data "Test Record 1" \
> --partition-key shardId-000000000000 \
> --region us-west-2
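Note that the --partition-key value is an arbitrary string, even though the one used here looks like a shard ID. Kinesis maps each partition key to a 128-bit integer using an MD5 hash and routes the record to the shard whose hash key range contains that value. The following is a minimal sketch of that mapping. The PartitionKeyHash class is hypothetical, UTF-8 encoding of the key is assumed, and the shardIndex method assumes shards that divide the hash key space evenly; a real stream reports its actual ranges through describe-stream.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PartitionKeyHash {

    // 2^128: exclusive upper bound of the hash key space
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 digest of the key, read as an unsigned 128-bit integer
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Assumption for illustration: shardCount shards split the key space evenly.
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger rangeSize = KEY_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        return Math.min(index, shardCount - 1); // leftover values go to the last shard
    }

    public static void main(String[] args) {
        String key = "shardId-000000000000";
        System.out.println("hash key    = " + hashKey(key));
        System.out.println("shard index = " + shardIndex(key, 1));
    }
}
```

With a single-shard stream, every partition key lands in the same shard, so the choice of key only starts to matter once the stream has more than one shard.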
- Validate that the Lambda function processed the records by checking the CloudWatch logs:
Aurobindos-MacBook-Pro-2:Downloads aurobindosarkar$ aws logs describe-log-streams --log-group-name '/aws/lambda/ProcessKinesisEvents' --region us-west-2 --order-by LastEventTime --descending