Apache CarbonData を試す

今回はSparkからCarbonDataを作成して UPDATE と DELETE をしてみたいと思います。

CarbonDataの UPDATE と DELETE の動きは次のWikiにまとめられています。QuicStart を参考に試してみます。

Update and Delete Support

前準備

サンプルデータ作成

サンプルのCSVを作成します。

cd carbondata
cat > sample.csv << EOF
id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
EOF

SparkShell起動

Downloadから apache-carbondata-1.4.0-bin-spark2.2.1-hadoop2.7.2.jar をダウンロードして、spark-shellを起動します。

$ spark-shell --jars apache-carbondata-1.4.0-bin-spark2.2.1-hadoop2.7.2.jar

CarbonSession作成

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

// SparkのHiveMetastoreと被らないように ./catbon-meta 配下にmetastore_dbが作られるようにする
// データ類は carbon-warehouse ディレクトリに作成されるようにする
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("carbon-warehouse", "carbon-meta")

TABLE 作成

DATABASE と TABLE を作成します。

carbon.sql("CREATE DATABASE IF NOT EXISTS mydb")
carbon.sql("CREATE TABLE IF NOT EXISTS mydb.test_table(id string, name string, city string, age Int) STORED BY 'carbondata'")

別のシェルでディレクトリの構成を確認します。

$ tree carbon-warehouse/
carbon-warehouse/
└── mydb
    └── test_table
        └── Metadata
            └── schema

空のディレクトリ carbon-warehouse/mydb/test_table がされました。

INSERTとDELETEを試す

サンプルCSVの LOAD

続いて最初に作ったサンプルデータをLOADします。

carbon.sql("LOAD DATA INPATH 'sample.csv' INTO TABLE mydb.test_table")
carbon.sql("SELECT * FROM mydb.test_table").show
+---+-----+--------+---+
| id| name|    city|age|
+---+-----+--------+---+
|  1|david|shenzhen| 31|
|  2|eason|shenzhen| 27|
|  3|jarry|   wuhan| 35|
+---+-----+--------+---+
$ tree carbon-warehouse/
carbon-warehouse/
└── mydb
    └── test_table
        ├── Fact
        │   └── Part0
        │       └── Segment_0
        │           ├── 0_batchno0-0-1532873451153.carbonindex
        │           └── part-0-0_batchno0-0-1532873451153.carbondata
        ├── LockFiles
        │   ├── Segment_0.lock
        │   └── tablestatus.lock
        └── Metadata
            ├── schema
            ├── segments
            │   └── 0_1532873451153.segment
            └── tablestatus

Segment_0が作成されました。INSERTの塊ごとにSegmentでまとめてファイルが作成されます。

INSERTの実施

さらにデータをINSERTしてみます。

carbon.sql("INSERT INTO  mydb.test_table VALUES(4, 'duke', 'menlopark', 28)")
carbon.sql("SELECT * FROM mydb.test_table").show
+---+-----+---------+---+
| id| name|     city|age|
+---+-----+---------+---+
|  1|david| shenzhen| 31|
|  2|eason| shenzhen| 27|
|  3|jarry|    wuhan| 35|
|  4| duke|menlopark| 28|
+---+-----+---------+---+
$ tree carbon-warehouse/
carbon-warehouse/
└── mydb
    └── test_table
        ├── Fact
        │   └── Part0
        │       ├── Segment_0
        │       │   ├── 0_batchno0-0-1532873451153.carbonindex
        │       │   └── part-0-0_batchno0-0-1532873451153.carbondata
        │       └── Segment_1
        │           ├── 0_batchno0-0-1532873814293.carbonindex
        │           └── part-0-0_batchno0-0-1532873814293.carbondata
        ├── LockFiles
        │   ├── Segment_0.lock
        │   ├── Segment_1.lock
        │   └── tablestatus.lock
        └── Metadata
            ├── schema
            ├── segments
            │   ├── 0_1532873451153.segment
            │   └── 1_1532873814293.segment
            └── tablestatus

新しいSegmentが追加されました。

DELETEの実施

では削除を試してみます。

carbon.sql("DELETE FROM mydb.test_table WHERE id = 3")
scala> carbon.sql("SELECT * FROM mydb.test_table").show
+---+-----+---------+---+
| id| name|     city|age|
+---+-----+---------+---+
|  1|david| shenzhen| 31|
|  2|eason| shenzhen| 27|
|  4| duke|menlopark| 28|
+---+-----+---------+---+

削除もできました。

$ tree carbon-warehouse/
carbon-warehouse/
└── mydb
    └── test_table
        ├── Fact
        │   └── Part0
        │       ├── Segment_0
        │       │   ├── 0_batchno0-0-1532873451153.carbonindex
        │       │   ├── part-0-0_batchno0-0-1532873451153.carbondata
        │       │   └── part-0-0_batchno0-0-1532873985129.deletedelta
        │       └── Segment_1
        │           ├── 0_batchno0-0-1532873814293.carbonindex
        │           └── part-0-0_batchno0-0-1532873814293.carbondata
        ├── LockFiles
        │   ├── Segment_0.lock
        │   ├── Segment_1.lock
        │   ├── meta.lock
        │   ├── tablestatus.lock
        │   └── tableupdatestatus.lock
        └── Metadata
            ├── schema
            ├── segments
            │   ├── 0_1532873451153.segment
            │   └── 1_1532873814293.segment
            ├── tablestatus
            └── tableupdatestatus-1532873985129

削除対象のデータがあるSegment0のディレクトリ内に part-0-0_batchno0-0-1532873985129.deletedelta が作成されました。

シンプルなしくみでUPDATEとDELETEを実現していますが便利そうですね。