Lambdaを使って S3 Select の結果をS3に保存する

こんにちは、コネクトムのコジマです。 コネクトムでは、一部のデータの前処理にS3 SELECTを使ってます。 そのなかで調べたことを少しまとめます。

S3 SELECT API について

(2019-03時点で) 今のところ S3 Select 関連のAPISELECT Object Content だけです。

残念ながら結果を直接S3に出力するような、気の利いた機能はありません。

上記の SELECT Object Content ですが、↑のリンク先のとおり結果をストリームで取得するようなAPIになっています。 このストリームは次の5種類のメッセージが流れてきます。

  • Records message
    • SELECT 結果のレコードを含むメッセージ
  • Continuation message
    • KeepAliveパケット的なもの
  • Progress message
    • 処理の進捗状況を伝えるもの。ファイルをスキャンしたバイト数とかが入ってる。
  • Stats message
    • 処理の最後にトータルでスキャンしたバイト数とかを伝えるもの 。メッセージの内容はProgress messageと同じ
  • End message
    • 処理が完了したことを示すメッセージ

JavaScript から 利用する

AWS-SDK for JavaScript には次のAPIが用意されています。

JavaScript AWS-SDK

このメソッド selectObjectContent は 5種類のメッセージを、そのまんまイベントとして受け取るような作りになってます。

とりあえずファイルとして保存するだけであれば、Records だけを処理すればよいです

S3 Selectの結果をS3に保存するサンプル

AWS Lambdaで S3 SELECT の結果をS3にそのまま保存するサンプルを作りました。

S3の保存の処理では、 s3-upload-stream を使ってます。

import { S3 } from "aws-sdk";
import { default as s3Stream } from "s3-upload-stream";
import { Handler, Context, Callback } from "aws-lambda";
import { StreamingEventStream } from "aws-sdk/lib/event-stream/event-stream";
import { ProgressEvent } from "aws-sdk/clients/s3";

const PART_SIZE: number = 16 * 1024 * 1024;

interface S3SelectEvent { Records?: S3.RecordsEvent; Stats?: S3.StatsEvent; Progress?: ProgressEvent; Cont?: S3.ContinuationEvent; End?: S3.EndEvent; }

export const handler: Handler = (event: any, context: Context, callback: Callback) => {
    const sql = event.sql
    const bucket = event.bucket;
    const key = event.key;
    const putBucket = event.putBucket;
    const putKey = event.putKey;
    console.log(sql, bucket, key, putBucket, putKey);

    const s3 = new S3();
    const uploadParams = { Bucket: putBucket, Key: putKey };
    const upload = s3Stream(s3).upload(uploadParams);
    upload.maxPartSize(PART_SIZE);

    const params = selectObjectContentParams(bucket, key, sql);
    s3.selectObjectContent(params, (err, data) => {
        if (err) { throw err; }
        const stream = data.Payload! as StreamingEventStream<S3SelectEvent>;
        stream.on("data", (event) => {
            if (event.Records) { upload.write(event.Records.Payload); }
        });
        stream.on("end", () => { upload.end(); });
        stream.on("error", (err) => { throw err; });
    });
    upload.on("uploaded", () => { console.log("end"); callback(null, "End"); });
    upload.on("error", (err) => { console.log(err); callback(err); });
}
// パラメータ作成
function selectObjectContentParams(bucketName: string, objectKey: string, sql: string): S3.SelectObjectContentRequest {
    return {
        Bucket: bucketName,
        Key: objectKey,
        ExpressionType: "SQL",
        Expression: sql,
        InputSerialization: {
            CSV: {
                FileHeaderInfo: "USE",
            },
        },
        OutputSerialization: {
            CSV: {},
        },
    };
}

次のようなJsonをLambdaの引数で渡すと、s3://putBucket/putKey に処理結果が保存されます。

{
    "sql": "SELECT * FROM s3Object",
    "bucket": "<処理したいファイルのあるバケット>",
    "key": "<処理したいファイルパス>",
    "putBucket": "<保存先のバケット>",
    "putKey": "<保存先のファイルパス>"
}

以上です。