AWS Lambda Go 1.x, Kinesis, CloudSearch

  • Tutorial
In a previous article, I described how to create a simple Lambda in Go that takes a simple two-field object as input and returns the same object as output. Now let's complicate the task a little by connecting Kinesis to the Lambda as a data source, and sending the results of processing the Kinesis records to CloudSearch. To keep things simple, the Lambda will contain no special logic: it just accepts records from Kinesis, logs them to CloudWatch, converts them, and sends them to CloudSearch.

image


The Kinesis event that we expect to receive in the function is as follows:

{
    "Records": [
        {
            "awsRegion": "us-east-1",
            "eventID": "",
            "eventName": "aws:kinesis:record",
            "eventSource": "aws:kinesis",
            "eventSourceARN": "arn:aws:kinesis:us-east-1::stream/",
            "eventVersion": "1.0",
            "invokeIdentityArn": "arn:aws:iam:::role/",
            "kinesis": {
                "approximateArrivalTimestamp": ,
                "data": ,
                "partitionKey": "",
                "sequenceNumber": "",
                "kinesisSchemaVersion": "1.0"
            }
        }
    ]
}


Here we are interested in the data field. The Lambda function code that receives events from Kinesis and logs the data field is shown below (code taken from here):

package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	for _, record := range kinesisEvent.Records {
		kinesisRecord := record.Kinesis
		dataBytes := kinesisRecord.Data
		dataText := string(dataBytes)
		fmt.Printf("%s Data = %s \n", record.EventName, dataText)
	}
	return nil
}

func main() {
	lambda.Start(handler)
}


Now we need to add code that writes the transformed data to CloudSearch.
Here we turn the data received from Kinesis into our representation for the search domain (CloudSearch).

In the event JSON, the data field arrives base64-encoded; the aws-lambda-go SDK decodes it for you, so events.KinesisRecord.Data is already a plain byte slice. The decoded payload contains the following fields:


type KinesisEventData struct {
	FilePath string `json:"filePath"`
	Id       int    `json:"id"`
}


We send the following data to CloudSearch:


type CloudSearchDocument struct {
	Directory     string `json:"dir"`
	FileName      string `json:"name"`
	FileExtension string `json:"ext"`
}


In this case, the id field becomes the document identifier. You can read more about preparing data for upload to CloudSearch here. In short, we send CloudSearch JSON of the following form:

[
 {"type": "add",
  "id":   "12345",
  "fields": {
    "dir": "С:",
    "name": "file.txt",
    "ext": "txt"
  }
 }
]

where type is the request type, which takes one of two values: add or delete; id is the document identifier, in our case the value stored in the Id field of the object from Kinesis; and fields holds the name-value pairs we store in the search domain, in our case an object of type CloudSearchDocument.

The code below converts data from the Records collection of an object that came from Kinesis into a collection of data ready for upload to CloudSearch:


	var amasonDocumentsBatch []AmazonDocumentUploadRequest
	// Preparing data
	for _, record := range kinesisEvent.Records {
		kinesisRecord := record.Kinesis
		dataBytes := kinesisRecord.Data
		fmt.Printf("%s Data = %s \n", record.EventName, string(dataBytes))
		// Deserialize the data from Kinesis into KinesisEventData
		var eventData KinesisEventData
		err := json.Unmarshal(dataBytes, &eventData)
		if err != nil {
			return failed(), err
		}
		// Convert the data to the CloudSearch format
		document := ConvertToCloudSearchDocument(eventData)
		request := CreateAmazonDocumentUploadRequest(eventData.Id, document)
		amasonDocumentsBatch = append(amasonDocumentsBatch, request)
	}
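The failed() helper used above is not shown either; the two return values imply that this handler, unlike the first one, returns a response object along with the error. A minimal sketch, with a hypothetical Response type (the full code linked below defines its own):

```go
// Response is a hypothetical result type for the handler; the article's
// full code may shape this differently.
type Response struct {
	Message string `json:"message"`
	Ok      bool   `json:"ok"`
}

// failed builds the response returned when processing a batch fails.
func failed() Response {
	return Response{Message: "Processing failed", Ok: false}
}
```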


The following code connects to the search domain to load previously prepared data into it:


	if len(amasonDocumentsBatch) > 0 {
		fmt.Print("Connecting to cloudsearch...\n")
		svc := cloudsearchdomain.New(session.New(&aws.Config{
			Region:     aws.String(os.Getenv("SearchRegion")),
			Endpoint:   aws.String(os.Getenv("SearchEndpoint")),
			MaxRetries: aws.Int(6),
		}))
		fmt.Print("Creating request...\n")
		batch, err := json.Marshal(amasonDocumentsBatch)
		if err != nil {
			return failed(), err
		}
		fmt.Printf("Search document = %s \n", batch)
		params := &cloudsearchdomain.UploadDocumentsInput{
			ContentType: aws.String("application/json"),
			Documents:   strings.NewReader(string(batch)),
		}
		fmt.Print("Starting to upload...\n")
		req, resp := svc.UploadDocumentsRequest(params)
		fmt.Print("Send request...\n")
		err = req.Send()
		if err != nil {
			return failed(), err
		}
		fmt.Println(resp)
	}


To compile the code, you need to fetch the aws-sdk-go and aws-lambda-go libraries:

go get -u github.com/aws/aws-lambda-go/cmd/build-lambda-zip
go get -d github.com/aws/aws-sdk-go/

How to build and deploy the Lambda is described in the previous article; here you only need to add environment variables through the Lambda console and prepare new test data:


os.Getenv("SearchRegion")
os.Getenv("SearchEndpoint")


The full code is available here.

But first, open the CloudSearch console and create a search domain. For the domain, I will choose the smallest instance type and a replication count of 1. Next, you need to create the fields dir, name, and ext. For these fields I will choose the text type, but some of them could use a different type, for example literal; it all depends on how you intend to query these fields. For details, see the Amazon documentation.

Create a search domain (the Create a new Domain button), fill in the name, and select the instance type:

image

Create the fields:

image

Creating the domain takes about 10 minutes. Once it becomes active, you will have the search domain URL, which you need to enter into the environment variables in the Lambda console. Do not forget to include the protocol before the URL, as in the image below, and to specify the region the search domain is located in:

image

Now grant the Lambda rights, through the IAM console, to work with Kinesis, CloudWatch, and CloudSearch. Kinesis can be connected via the Lambda console: select it in the Add triggers block and fill in the fields, specifying an existing stream in the region, the batch size, and the position in the stream from which reading will start. We can test the Lambda without connecting it to Kinesis; to do this, create a test and add JSON of the following form:

{
  "Records": [
    {
      "awsRegion": "us-east-1",
      "eventID": "shardId-000000000001:1",
      "eventName": "aws:kinesis:record",
      "eventSource": "aws:kinesis",
      "eventSourceARN": "arn:aws:kinesis:us-east-1:xxx",
      "eventVersion": "1.0",
      "invokeIdentityArn": "arn:aws:iam::xxx",
      "kinesis": {
        "approximateArrivalTimestamp": 1522222222.06,
        "data": "eyJpZCI6IDEyMzQ1LCJmaWxlUGF0aCI6ICJDOlxcZmlsZS50eHQifQ==",
        "partitionKey": "key",
        "sequenceNumber": "1",
        "kinesisSchemaVersion": "1.0"
      }
    },
    {
      "awsRegion": "us-east-1",
      "eventID": "shardId-000000000001:1",
      "eventName": "aws:kinesis:record",
      "eventSource": "aws:kinesis",
      "eventSourceARN": "arn:aws:kinesis:us-east-1:xxx",
      "eventVersion": "1.0",
      "invokeIdentityArn": "arn:aws:iam::xxx",
      "kinesis": {
        "approximateArrivalTimestamp": 1522222222.06,
        "data": "eyJpZCI6IDEyMzQ2LCJmaWxlUGF0aCI6ICJDOlxcZm9sZGVyXFxmaWxlLnR4dCJ9",
        "partitionKey": "key",
        "sequenceNumber": "2",
        "kinesisSchemaVersion": "1.0"
      }
    }
  ]
}


You can use the link to generate other values for the data field. The result of the run can also be viewed in the search domain.

Additional materials: document search code for the search domain in Go.

In the next article, we will look at a CloudFormation script that automatically creates and connects Lambda, Kinesis, and CloudSearch.

image



image





