Lambdaを使って S3 Select の結果をS3に保存する

こんにちは、コネクトムのコジマです。 コネクトムでは、一部のデータの前処理にS3 SELECTを使ってます。 そのなかで調べたことを少しまとめます。

S3 SELECT API について

(2019-03時点で) 今のところ S3 Select 関連のAPISELECT Object Content だけです。

残念ながら結果を直接S3に出力するような、気の利いた機能はありません。

上記の SELECT Object Content ですが、↑のリンク先のとおり結果をストリームで取得するようなAPIになっています。 このストリームは次の5種類のメッセージが流れてきます。

  • Records message
    • SELECT 結果のレコードを含むメッセージ
  • Continuation message
    • KeepAliveパケット的なもの
  • Progress message
    • 処理の進捗状況を伝えるもの。ファイルをスキャンしたバイト数とかが入ってる。
  • Stats message
    • 処理の最後にトータルでスキャンしたバイト数とかを伝えるもの 。メッセージの内容はProgress messageと同じ
  • End message
    • 処理が完了したことを示すメッセージ

JavaScript から 利用する

AWS-SDK for JavaScript には次のAPIが用意されています。

JavaScript AWS-SDK

このメソッド selectObjectContent は 5種類のメッセージを、そのまんまイベントとして受け取るような作りになってます。

とりあえずファイルとして保存するだけであれば、Records だけを処理すればよいです

S3 Selectの結果をS3に保存するサンプル

AWS Lambdaで S3 SELECT の結果をS3にそのまま保存するサンプルを作りました。

S3の保存の処理では、 s3-upload-stream を使ってます。

import { S3 } from "aws-sdk";
import { default as s3Stream } from "s3-upload-stream";
import { Handler, Context, Callback } from "aws-lambda";
import { StreamingEventStream } from "aws-sdk/lib/event-stream/event-stream";
import { ProgressEvent } from "aws-sdk/clients/s3";

const PART_SIZE: number = 16 * 1024 * 1024;

interface S3SelectEvent { Records?: S3.RecordsEvent; Stats?: S3.StatsEvent; Progress?: ProgressEvent; Cont?: S3.ContinuationEvent; End?: S3.EndEvent; }

export const handler: Handler = (event: any, context: Context, callback: Callback) => {
    const sql = event.sql
    const bucket = event.bucket;
    const key = event.key;
    const putBucket = event.putBucket;
    const putKey = event.putKey;
    console.log(sql, bucket, key, putBucket, putKey);

    const s3 = new S3();
    const uploadParams = { Bucket: putBucket, Key: putKey };
    const upload = s3Stream(s3).upload(uploadParams);
    upload.maxPartSize(PART_SIZE);

    const params = selectObjectContentParams(bucket, key, sql);
    s3.selectObjectContent(params, (err, data) => {
        if (err) { throw err; }
        const stream = data.Payload! as StreamingEventStream<S3SelectEvent>;
        stream.on("data", (event) => {
            if (event.Records) { upload.write(event.Records.Payload); }
        });
        stream.on("end", () => { upload.end(); });
        stream.on("error", (err) => { throw err; });
    });
    upload.on("uploaded", () => { console.log("end"); callback(null, "End"); });
    upload.on("error", (err) => { console.log(err); callback(err); });
}
// パラメータ作成
function selectObjectContentParams(bucketName: string, objectKey: string, sql: string): S3.SelectObjectContentRequest {
    return {
        Bucket: bucketName,
        Key: objectKey,
        ExpressionType: "SQL",
        Expression: sql,
        InputSerialization: {
            CSV: {
                FileHeaderInfo: "USE",
            },
        },
        OutputSerialization: {
            CSV: {},
        },
    };
}

次のようなJsonをLambdaの引数で渡すと、s3://putBucket/putKey に処理結果が保存されます。

{
    "sql": "SELECT * FROM s3Object",
    "bucket": "<処理したいファイルのあるバケット>",
    "key": "<処理したいファイルパス>",
    "putBucket": "<保存先のバケット>",
    "putKey": "<保存先のファイルパス>"
}

以上です。

ReagentでLeafletを試す

こんにちは。analysisチームの河野です。
趣味的な記事が続いたので、今回は業務に関係したことを書きます。

弊社は位置情報を利用したWebサービスを提供しているので、地図ライブラリは避けては通れないものなのですが、私自身はこの分野にものすごく疎いので、個人的な勉強を兼ねて、Leafletを試してみました。業務ではおそらくGoogle Maps APIを使うので、直接的には役に立たないかと思いますが。

Leafletとは

公式ページ: https://leafletjs.com/
詳細は公式ページを見てください。 ざっくり言うと、オープンソースの地図ライブラリです。wikipediaによると、「Leafletを使うと、GISの知識のない開発者でも容易にタイルベースのWeb地図を表示できる。」とのことですが、まさにその通りでした。

今回作ったもの

リポジトリこれです。
Reactも試してみたかったし、Clojureも使いたかったので、Reagentを使いました。 ちなみに、弊社というかanalysisチームのフロントエンドはVueとTypeScriptを使用しております。

解説

ReactとReagentについてはReagentのチュートリアルを見れば雰囲気は掴めます。
私自身、Reactを触るのは初めてでしたが何となく雰囲気は伝わりました。
またReactではLeafletを扱いやすくするReact-Leafletというライブラリがあるので、これを使います。
React界隈はライブラリが色々あって便利そうです。

今回はReact-Leafletの公式ページのトップにある、単にマップを表示するだけのことをしました。

import React from 'react'
import { render } from 'react-dom'
import { Map, Marker, Popup, TileLayer } from 'react-leaflet'

const position = [51.505, -0.09]
const map = (
  <Map center={position} zoom={13}>
    <TileLayer
      url="https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png"
      attribution="&copy; <a href=&quot;http://osm.org/copyright&quot;>OpenStreetMap</a> contributors"
    />
    <Marker position={position}>
      <Popup>A pretty CSS3 popup.<br />Easily customizable.</Popup>
    </Marker>
  </Map>
)

render(map, document.getElementById('map-container'))

Reagentで書くとこんな感じになります
positionは弊社のお世話になっている東急番町ビルの緯度経度を指定しています。

;; src/leaflet_tutorial/core.cljs

(ns leaflet-tutorial.core
  (:require [reagent.core :as r]
            [react-leaflet :refer [Map TileLayer Marker Popup]]))

(def position [35.689236 139.735744])

;; -------------------------
;; Views
(defn leaflet-map
  []
  [:> Map {:center position :zoom 16}
   [:> TileLayer {:url "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png"
                    :attribution "&copy; <a href=&quot;http://osm.org/copyright&quot;>OpenStreetMap</a> contributors"}]
   [:> Marker {:position position}
    [:> Popup "A pretty CSS3 popup.<br />Easily customizable." ]]])



;; -------------------------
;; Initialize app

(defn mount-root
  []
  (r/render [leaflet-map] (.getElementById js/document "app")))

(defn init!
  []
  (mount-root))

ただ、これだけでは地図が崩れるのでまだ手入れが必要です。
LeafletのスタイルをCDNから取得するようにします。

<!-- public/index.html -->

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8">
    <meta content="width=device-width, initial-scale=1" name="viewport">
    <link href="/css/site.css" rel="stylesheet" type="text/css">
    <!--追加 ここから-->
    <link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/leaflet/1.4.0/leaflet.css" />
    <!--追加 ここまで-->
  </head>
  <body>
    <div id="app">
      <h3>ClojureScript has not been compiled!</h3>
      <p>please run <b>lein figwheel</b> in order to start the compiler</p>
    </div>
    <script src="/js/app.js" type="text/javascript"></script>
  </body>
</html>

CSSにも手を加え、Leafletの描画領域を指定します。
.leaflet-containerheight100%にすると、地図が表示されないので注意してください。

/* public/css/site.css */

body {
  margin: 0;
  padding: 0;
  height: 100%;
  width: 100%;
}

#app {
  width: inherit;
  height: inherit;
}

.leaflet-container {
  width: inherit;
  height: 600px;
  margin: 10px;
}

これでlein figwheelすると、下記のような地図が表示されます。
f:id:t-kono0912:20190329124935p:plain

まとめ

Leafletを使うと、本当に簡単に地図の表示ができます。
しかし地図表示はまだ序の口で、ヒートマップやコロプレスマップ等を表示できないと分析ツールとしては使えないので、今度はそのへんのことを試してみたいです。

コネクトムでは地図ライブラリを使用したアプリケーションを一緒に作るエンジニアを募集しています。
https://open.talentio.com/1/c/connectom/requisitions/detail/6956

朝起きる技術

こんにちは。analysisチームの河野です。
皆さん、朝は得意でしょうか?私は子供の頃から苦手です。休みの日は14:00ぐらいまで寝たりしてます。
そんな感じで朝が苦手なので、遅刻をたまにしてしまいます。

目覚まし時計は大学生の頃に買ったのですが、強く叩きすぎたのか、音が出なくなってしまいました。
携帯はさすがにそんな乱暴に扱ったりしないのですが、アラームが鳴っているのか鳴っていなのか、よく分からないことが多々あります。たまに設定を変えて、戻さずにそのままになってたりすることがあるので、簡単に設定を変更できてしまうのが良くないのだと思います。

なので、簡単に設定の変更ができなく、かつ、起きたい時間の30分前から5分おきで通知してくれる仕組みを作りました。

作ったもの

これです。
先日、LINE Notifyなるものを知ったので、それと、AWS Lambdaを組み合わせて作りました。
ServerlessのTemplate で aws-clojurescript-gradleaws-clojure-gradleなるものがあったので、Clojureで作りたかったのですが、gralde-clojurenreplへつなぐ方法が分からなかったので、仕事で使い慣れているTypescriptを使いました。

仕組みは単純で、AWS Lambda から LINE Notifyへ、8:30から8:59まで5分おきに通知するだけのシンプルなアプリです。
ひとまず自分で使えれば良いので、かなり手抜きです。
CloudWatchのcronではGMTしか使えないようなので、時間を設定するときは注意が必要です。

解説

src/handler/wake-up.tsからnotify関数を実行しているだけです。notify関数は、headerとbodyに値を入れて、LINE Notify APIを実行しているだけなので、特に難しいことはしていないです。 ちなみにLINE Notify APIcurlで実行すると、こんな感じです。

$ curl -X POST -H 'Authorization: Bearer <API TOKEN>' -F 'message=<YOUR MESSAGE>' https://notify-api.line.me/api/notify

src/handler/wake-up.tsnotify関数です。

// src/handler/wake-up.ts

import { ScheduledHandler, ScheduledEvent, Context, Callback } from "aws-lambda";
import "source-map-support/register";
import { notify } from "../lib/util";

export const handler: ScheduledHandler = async (event: ScheduledEvent, context: Context, cb: Callback) => {
    try {
        await notify("起きなさい");
    } catch (err) {
        const result = err;
        console.log("Error: ", result);
    }
    cb(null, "send!!");
}
// src/lib/util.ts

import axios from "axios";
import * as querystring from "querystring";

export async function notify(message: string) {
    const url = "https://notify-api.line.me/api/notify"
    const token = process.env.TOKEN;

    const headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": `Bearer ${token}`,
    };

    const params = querystring.stringify({
        message
    });

    axios.post(url, params, { headers })
}

API TOKENはserverless.yml環境変数で指定しています。
と言っても、dev環境しかないので、特に意味はありませんが。要は外部から指定できるようにしたかっただけなので、こうしています。
使う場合は、<API TOKEN>の部分に自分のAPI TOKENを記載してください。

# serverless.yml

service:
  name: scheduled-notify

# Add the serverless-webpack plugin
plugins:
  - serverless-webpack

provider:
  name: aws
  stage: dev
  region: ap-northeast-1
  runtime: nodejs8.10
  timeout: 60
  environment:
    TOKEN: <API TOKEN>

functions:
  wake-up:
    handler: src/handler/wake-up.handler
    events:
      - schedule: cron(30-59/5 23 ? * 1-5 *)

まとめ

これで毎朝ちゃんと起きられるようになりたいです。

コネクトムではServerlessなアプリケーションを一緒に作るエンジニアを募集しています。
https://open.talentio.com/1/c/connectom/requisitions/detail/6956

AWS LambdaでClojureを使いたかった

はじめまして。analysisチームの河野です。ここ2年ぐらい趣味でLispを勉強しています。
最近はClojureでwebサーバみたいなものを作ったりして遊んでいます。

業務ではTypeScriptとAWS Lambdaでサーバレスなアプリケーションを作っていますが
AWS LambdaでもClojureを使いたい、Javaも使えるのだからClojureも使えるはずと思ったので、調べてみると、古い記事ですが、公式ブログで解説されていました。
https://aws.amazon.com/jp/blogs/compute/clojure/

ちょっと大変そう。。。

Nodeが実行できるのだから、ClojureScriptも使えるのでは?
調べてみると、Serverless Frameworkのプラグインがありました。
https://github.com/nervous-systems/serverless-cljs-plugin

ClojureScriptは使ったことはないのですが、とりあえずREADMEにしたがって進めてみます。

$ lein new serverless-cljs example
$ cd example
$ lein deps

依存関係の解決ができたらひとまずデプロイします。

$ serverless deploy

デプロイに成功したら、エンドポイントが表示されるの試してみます。 おそらく、echoというハンドラ名からして受け取った値をそのまま返すのだと思います。

$ curl -X POST -d '{"hello": "world"}' https://xxxx/dev/echo
-> {"hello": "world"}

bodyの値がそのまま返ってくる。jsonでなくても良いみたい。

$ curl -X POST -d "hello" https://xxxx/dev/echo
-> hello

ひとまずコードを見てみる。

(defgateway echo [event ctx]
  {:status  200
   :headers {:content-type (-> event :headers :content-type)}
   :body    (event :body)})

おそらく、defgatewayというのでハンドラを定義できるのでしょう。 中身はMapになっており、キーが、status, header, bodyなので、HTTPと対応しているように見えます。 よくわからないですが、bodyの値に何か別の値を入れれば、別の値を返せるっぽいので試してみます。

core.cljsにハンドラを追加します。

(defgateway hello [event ctx]
  {:status 200
   :headers {:content-type (-> event :headers :content-type)}
   :body "hello world"})

serverless.ymlにも設定を追加。 functionshelloを追加。

functions:
  echo:
    cljs: example.core/echo
    events:
      - http:
          path: echo
          method: post
  hello:
    cljs: example.core/hello
    events:
      - http:
          path: hello
          method: get

デプロイ。

$ serverless deploy

成功したらhelloのエンドポイントが表示されるので、ブラウザからアクセスしてみます。

f:id:t-kono0912:20190215162240p:plain
hello_wolrd

表示されてる。

せっかくなので、クエリストリングの値を返してみましょう。 こんな感じで送って

curl https://xxxx/dev/hello?name="your-name"

こんな感じで返したい。

{hello: "your-name"}

クエリストリングがなければ"world"と返す。

{hello: "world"}

helloハンドラを修正します。

(defgateway hello
  [event ctx]
  (let [name (get-in event [:query :name])]
    {:status 200
     :headers {:content-type "application/json"}
     :body (JSON/stringify (clj->js {:hello (or name "world")}))}))

serverless.ymlも修正。

  hello:
    cljs: example.core/hello
    events:
      - http:
          path: hello
          method: get
          request:
            parameters:
              querystrings:
                name: false

試してみる。

# パラメータなし
$ curl https://xxxx/dev/hello -v
-> {"hello": "world"}
# パラメータあり
$ curl https://xxxx/dev/hello?name="hoge" -v
-> {"hello": "hoge"}

できてる。 次回はパスパラメータを取れるようにしたいです。

コネクトムではエンジニアを募集しています。 https://open.talentio.com/1/c/connectom/requisitions/550

AWS S3のオブジェクトの ETag を MD5ハッシュにしたい

S3にアップロードされたファイルの同一性をチェックしたりするために、ハッシュ値を使いたく、 いろいろ調べたのでまとめます。

先に結論

ETagがMD5になる

ETagの値は、次の条件の場合 MD5ハッシュ が設定されます。

  • PUT、POST、またはCOPYで作成された、SSE-S3または平文で暗号化されたオブジェクト

ETagがMD5にならない

次の場合は MD5ハッシュにはなりません。

  • マルチパートアップロードでオブジェクトを作成した場合
  • 暗号化タイプが SSE-C または SSE-KMS の場合
  • 16MB以上のファイルを、AWSマネジメントコンソールで「コピー」->「貼り付け」した場合

マルチパートアップロードしたファイルの ETag にMD5を設定するには?

s3apiの CopyObject を実行するとよいです。

$ aws s3api copy-object --copy-source mybucket/myobject --bucket mybucket --key myobject --metadata-directive REPLACE --metadata a=b

ただし、単純な同一名での上書きはできず、次のいずれかの変更が必要となります。

(なので適当なa=bメタデータに設定してます。)

  • metadata
  • storage class
  • website redirect location
  • encryption attributes.

ちなみに上のコマンドをメタデータの変更をせずに実行すると...

$ aws s3api copy-object --copy-source mybucket/myobject --bucket mybucket --key myobject --metadata-directive REPLACE --metadata copied=true

An error occurred (InvalidRequest) when calling the CopyObject operation: This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes.

このようにエラーになります。

その他もろもろ

AWSマネジメントコンソール でのコピーの挙動

(2018-12 時点で) AWSマネジメントコンソールで「コピー」->「貼り付け」をした場合は、 CopyObject を実行せずに、16MBのパートに分割して内部でマルチパートアップロードしているようです。

16MBを超えるファイルは、コピー元のETagが MD5ハッシュ であっても、ETagの値がハイフン付きの値に変更されてしまいます。

aws-cli のファイルコピーについて

8MBを超えるファイルについては、8MBのパートに分割するようです。 つまり8MBを超えるファイルを s3 cp でアップロードしたらETagの値がハイフン付きの値になります。

これを避けるには、s3apiのPutObjectを使うとよいでしょう。

aws s3api put-object --bucket mybucket --key myobject --body myobject

マルチパートアップロード時のETagの値って??

公式な文書は見当たりませんが、次のように作成しているようです。

アップロードした各パートのMD5ハッシュを連結したもののMD5 + '-' + パート数

https://stackoverflow.com/questions/12186993/what-is-the-algorithm-to-compute-the-amazon-s3-etag-for-a-file-larger-than-5gb/19896823#19896823

参考

https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html

Apache CarbonData を試す

今回はSparkからCarbonDataを作成して UPDATE と DELETE をしてみたいと思います。

CarbonDataの UPDATE と DELETE の動きは次のWikiにまとめられています。QuicStart を参考に試してみます。

Update and Delete Support

前準備

サンプルデータ作成

サンプルのCSVを作成します。

cd carbondata
cat > sample.csv << EOF
id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
EOF

SparkShell起動

Downloadから apache-carbondata-1.4.0-bin-spark2.2.1-hadoop2.7.2.jar をダウンロードして、spark-shellを起動します。

$ spark-shell --jars apache-carbondata-1.4.0-bin-spark2.2.1-hadoop2.7.2.jar

CarbonSession作成

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

// SparkのHiveMetastoreと被らないように ./catbon-meta 配下にmetastore_dbが作られるようにする
// データ類は carbon-warehouse ディレクトリに作成されるようにする
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("carbon-warehouse", "carbon-meta")

TABLE 作成

DATABASE と TABLE を作成します。

carbon.sql("CREATE DATABASE IF NOT EXISTS mydb")
carbon.sql("CREATE TABLE IF NOT EXISTS mydb.test_table(id string, name string, city string, age Int) STORED BY 'carbondata'")

別のシェルでディレクトリの構成を確認します。

$ tree carbon-warehouse/
carbon-warehouse/
└── mydb
    └── test_table
        └── Metadata
            └── schema

空のディレクトリ carbon-warehouse/mydb/test_table がされました。

INSERTとDELETEを試す

サンプルCSVの LOAD

続いて最初に作ったサンプルデータをLOADします。

carbon.sql("LOAD DATA INPATH 'sample.csv' INTO TABLE mydb.test_table")
carbon.sql("SELECT * FROM mydb.test_table").show
+---+-----+--------+---+
| id| name|    city|age|
+---+-----+--------+---+
|  1|david|shenzhen| 31|
|  2|eason|shenzhen| 27|
|  3|jarry|   wuhan| 35|
+---+-----+--------+---+
$ tree carbon-warehouse/
carbon-warehouse/
└── mydb
    └── test_table
        ├── Fact
        │   └── Part0
        │       └── Segment_0
        │           ├── 0_batchno0-0-1532873451153.carbonindex
        │           └── part-0-0_batchno0-0-1532873451153.carbondata
        ├── LockFiles
        │   ├── Segment_0.lock
        │   └── tablestatus.lock
        └── Metadata
            ├── schema
            ├── segments
            │   └── 0_1532873451153.segment
            └── tablestatus

Segment_0が作成されました。INSERTの塊ごとにSegmentでまとめてファイルが作成されます。

INSERTの実施

さらにデータをINSERTしてみます。

carbon.sql("INSERT INTO  mydb.test_table VALUES(4, 'duke', 'menlopark', 28)")
carbon.sql("SELECT * FROM mydb.test_table").show
+---+-----+---------+---+
| id| name|     city|age|
+---+-----+---------+---+
|  1|david| shenzhen| 31|
|  2|eason| shenzhen| 27|
|  3|jarry|    wuhan| 35|
|  4| duke|menlopark| 28|
+---+-----+---------+---+
$ tree carbon-warehouse/
carbon-warehouse/
└── mydb
    └── test_table
        ├── Fact
        │   └── Part0
        │       ├── Segment_0
        │       │   ├── 0_batchno0-0-1532873451153.carbonindex
        │       │   └── part-0-0_batchno0-0-1532873451153.carbondata
        │       └── Segment_1
        │           ├── 0_batchno0-0-1532873814293.carbonindex
        │           └── part-0-0_batchno0-0-1532873814293.carbondata
        ├── LockFiles
        │   ├── Segment_0.lock
        │   ├── Segment_1.lock
        │   └── tablestatus.lock
        └── Metadata
            ├── schema
            ├── segments
            │   ├── 0_1532873451153.segment
            │   └── 1_1532873814293.segment
            └── tablestatus

新しいSegmentが追加されました。

DELETEの実施

では削除を試してみます。

carbon.sql("DELETE FROM mydb.test_table WHERE id = 3")
scala> carbon.sql("SELECT * FROM mydb.test_table").show
+---+-----+---------+---+
| id| name|     city|age|
+---+-----+---------+---+
|  1|david| shenzhen| 31|
|  2|eason| shenzhen| 27|
|  4| duke|menlopark| 28|
+---+-----+---------+---+

削除もできました。

$ tree carbon-warehouse/
carbon-warehouse/
└── mydb
    └── test_table
        ├── Fact
        │   └── Part0
        │       ├── Segment_0
        │       │   ├── 0_batchno0-0-1532873451153.carbonindex
        │       │   ├── part-0-0_batchno0-0-1532873451153.carbondata
        │       │   └── part-0-0_batchno0-0-1532873985129.deletedelta
        │       └── Segment_1
        │           ├── 0_batchno0-0-1532873814293.carbonindex
        │           └── part-0-0_batchno0-0-1532873814293.carbondata
        ├── LockFiles
        │   ├── Segment_0.lock
        │   ├── Segment_1.lock
        │   ├── meta.lock
        │   ├── tablestatus.lock
        │   └── tableupdatestatus.lock
        └── Metadata
            ├── schema
            ├── segments
            │   ├── 0_1532873451153.segment
            │   └── 1_1532873814293.segment
            ├── tablestatus
            └── tableupdatestatus-1532873985129

削除対象のデータがあるSegment0のディレクトリ内に part-0-0_batchno0-0-1532873985129.deletedelta が作成されました。

シンプルなしくみでUPDATEとDELETEを実現していますが便利そうですね。

Apache CarbonData の紹介

こんにちは。コネクトムの児島です。

弊社では広告配信のログなどの分析のために Apache Spark などのOSSビッグデータ分析基盤 を活用しています。

実運用では使っていませんが、列指向フォーマットの CarbonData を調査したので紹介します。

CarbonData って何?

CarbonData はビッグデータ向けの列指向フォーマットです。

公式サイトはこちら

元は2013年から Huawei によって開発が開始されて、2015年にApacheに寄贈され、2017年4月に Apache のトップレベルプロジェクトに昇格しました。*1

他のオープンソースのファイルフォーマットよりも10倍速いと言ってます。

他のフォーマットとの違いは?

同じような列指向フォーマットといえば、同じApacheのプロジェクトだけでも

がありますが、CarbonDataは インデックスを持っているのが最大の特徴です。 インデックスを持っているので、ファイルの中の必要なデータだけにすばやくアクセスできます。

他のすごいところ

UPDATEとDELETEがサポートされてます。

UPDATEとDELETEの際は、既存のファイルを書き換えずに、新規の差分ファイルを作成することで実現しています。

差分のファイルを統合してキレイなファイルにする命令もあります。 (ALTER TABLE テーブル名 COMPACT 'MINOR/MAJOR/CUSTOM')

どうやったら使えるの?

現在(バージョン 1.4.0)のところ

  • Spark
  • Hive
  • Presto

から利用できますが、デフォルトで組み込まれているわけではないので、自分でCarbonDataのJARを読み込んであげる必要があります。

CarbonDataが普及していくためには、これらのプロダクトにデフォルトで組み込まれる必要がありそうです。

次回は

実際に動作させてみて、どのようなファイルができるか見ていきたいと思います。