Apache Beam + Kotlin 開発 実践入門
どうも、こんにちは。Re:ゼロ 2 期 始まりましたね 👏、 @silverbirder です。 最近、仕事の関係上、Apache Beam + Kotlin を使うことになりました。それらの技術が一切知らなかったので、この記事に学んだことを書いていきます ✍️。
サンプルリポジトリは、下記に載せています。
https://github.com/silverbirder/apache-beam-kotlin-example/tree/master/src/main/kotlin
Apache Beam とは
https://www.st-hakky-blog.com/entry/2020/04/29/172220
** Batch や Streaming を 1 つのパイプライン処理 ** として実現できるデータパイプライン、それが Apache Beam です。(Batch + Stream → Beam)
言語は、Java, Python, Go(experimental)が選べます。 また、パイプライン上で実行する環境のことをランナーと呼び、Cloud Dataflow や Apache Flink、Apache Spark などがあります。
※ Streaming 処理は、サーバーの能力がボトルネックになりがちです。そこで、Cloud Dataflow という GCP のマネージドサービスを使用すると、その問題が解消されます。
機械学習など豊富な ** 分析ライブラリ ** を使いたい場合は、Python、 ** 型安全な ** 開発をしたい場合は、Java を選べば良いかなと思います。
今回は、Java を選びました。モダンな書き方ができる Kotlin でコーディングします。
セットアップ
ソフトウェアバージョンは、次のとおりです。
$ java -version
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.252-b09, mixed mode)
IDE として intelliJ を使用しており、Kotlin SDK(1.3.72)が内蔵しています。
$ git clone https://github.com/silverbirder/apache-beam-kotlin-example.git && cd apache-beam-kotlin-example
$ ./gradlew build
パイプライン処理の概要
1. データの入力する(input → PCollection)
2. 入力されたデータを変形させる (PCollection → PTransform → PCollection)
3. 加工したデータを出力する (PCollection → output)
PCollection は、ひとかたまりのデータセットだと思って下さい。
よくあるサンプルコード WordCount を例に進めます。
※ 元々は、ApacheBeam 公式の WordCountがあったのですが、ローカルマシン単体で動かせないため、多少アレンジしました。WordCount は、ある文章から単語を抽出しカウントを取るだけです。
メインのコードは、こちらです。動かすときは、IDE からデバッグ実行します。(この辺りは省略します。詳しくは Makefile を見て下さい 🙇♂️)
@JvmStatic
fun main(args: Array<String>) {
val options = (PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(WordCountOptions::class.java))
runWordCount(options)
}
@JvmStatic
fun runWordCount(options: WordCountOptions) {
// パイプラインを作る(空っぽ)
val p = Pipeline.create(options)
// Textファイルからデータを入力する → PCollection
p.apply("ReadLines", TextIO.read().from(options.inputFile))
// PCollectionをPTransformで変形させる
.apply(CountWords())
.apply(MapElements.via(FormatAsTextFn()))
// Textファイルにデータ(PCollection)を出力する
.apply<PDone>("WriteCounts", TextIO.write().to(options.output))
// パイプラインを実行する
p.run().waitUntilFinish()
}
PTransform
Apache Beam のコアとなる PTransform についてサンプルコードを載せます。
ParDo
ParDo は、PCollection を好きなように加工することができます。 最も、柔軟に処理を書くことができます。
// PTransformによる変形処理
public class CountWords : PTransform<PCollection<String>, PCollection<KV<String, Long>>>() {
override fun expand(lines: PCollection<String>): PCollection<KV<String, Long>> {
// 文章を単語に分割する
val words = lines.apply(ParDo.of(ExtractWordsFn()))
// 分割された単語をカウントする
val wordCounts = words.apply(Count.perElement())
return wordCounts
}
}
public class ExtractWordsFn : DoFn<String, String>() {
@ProcessElement
fun processElement(@Element element: String, receiver: DoFn.OutputReceiver<String>) {
...
}
GroupByKey
Key-Value(KV)の PCollection を Key でグルーピングします。
import java.lang.Iterable as JavaIterable
// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())
// PCollection<KV<String, JavaIterable<Long>>>
val groupByWord = wordCounts.apply(GroupByKey.create<String, Long>()) as PCollection<KV<String, JavaIterable<Long>>>
Kotlin では、Iterable が動作できないため、Java の Iterable を使う必要があります。
Flatten
複数の PCollection を 1 つの PCollection に結合します。
// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())
// PCollectionList<KV<String, Long>>
val wordCountsDouble = PCollectionList.of(wordCounts).and(wordCounts)
// PCollection<KV<String, Long>>
val flattenWordCount = wordCountsDouble.apply(Flatten.pCollections())
Combine
PCollection の要素を結合します。 GroupByKey の Key 毎に要素を結合する方法と、PCollection 毎に要素を結合する方法があります。 今回は、GroupByKey のサンプルコードです。
// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())
// PCollection<KV<String, Long>>
val sumWordsByKey = wordCounts.apply(Sum.longsPerKey())
Partition
PCollection を任意の数でパーティション分割します。
// PCollection<KV<String, Long>>
val wordCounts = words.apply(Count.perElement())
// PCollection<KV<String, Long>>
var 10wordCounts = wordCounts.apply(Partition.of(10, PartitionFunc()))
Streaming と Windowing
パイプラインを、そのまま使えば Batch 実行となります。 Batch は、有限のデータに対し、Streaming は無限のデータに対して使います。 無限のデータを処理するのは、Windowing というものを使い、無限を有限のデータにカットして、処理します。
Streaming 処理するためには、下記のようにコードにします。
@JvmStatic
fun main(args: Array<String>) {
val options = (PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(WordCountOptions::class.java))
runWordCount(options)
}
@JvmStatic
fun runWordCount(options: WordCountOptions) {
val p = Pipeline.create(options)
p.apply("ReadLines",
TextIO
.read()
.from("./src/main/kotlin/*.json")
// fromで指定したファイルがないか監視する。(入力値は無限)
// 10秒ごとに監視、5分間変更がなければ終了。
.watchForNewFiles(standardSeconds(10), afterTimeSinceNewOutput(standardMinutes(5)))
)
// 30秒間毎にWindowingする。(無限のデータを、有限のデータにカットする)
.apply(Window.into<String>(FixedWindows.of(standardSeconds(30))))
.apply(CountWords())
.apply(MapElements.via(FormatAsTextFn()))
.apply<PDone>("WriteCounts", TextIO.write().to(options.output).withWindowedWrites().withNumShards(1))
p.run().waitUntilFinish()
}
テストコード
Apache Beam もテストコードが書けます。 サンプルコードは、こちらです。
実行するパイプラインを TestPipeline にすることで、テストができます。
import org.apache.beam.sdk.testing.TestPipeline
fun countWordsTest() {
// Arrange
val p: Pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false)
val input: PCollection<String> = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of())
val output: PCollection<KV<String, Long>>? = input.apply(CountWords())
// Act
p.run()
// Assert
PAssert.that<KV<String, Long>>(output).containsInAnyOrder(COUNTS_ARRAY)
}
companion object {
val WORDS: List<String> = listOf(
"hi there", "hi", "hi sue bob",
"hi sue", "", "bob hi"
)
val COUNTS_ARRAY = listOf(
KV.of("hi", 5L),
KV.of("there", 2L),
KV.of("sue", 2L),
KV.of("bob", 2L)
)
}
終わりに
Apache Beam は、他にも Side input や Additional outputs などがあります。 使いこなせるためにも、これからも頑張っていきます!
さて、Re:ゼロ 2 期を見ましょう 👍
Tags
2021-09-04
コンパイラ基盤であるLLVMについて、全く知識がない私が、javascriptソースコードをパースしLLVMでコンパイルできるようになりました。LLVMの記事は数多くありますが、初心者向けの記事が少なく感じたため、本記事では、できる限り分かりやすくLLVMについて紹介できる記事を書こうと思います。ソースコードは、こちらに置いています。...
2019-06-10
前回 一足遅れて Kubernetes を学び始める - 15. セキュリティ -では、RBACによる権限について学習しました。今回は最後にKubernetesのコンポーネントについて学習します。...
2019-06-07
前回 一足遅れて Kubernetes を学び始める - 14. スケジューリング -では、AffinityなどでPodのスケジューリングについて学習しました。今回は、セキュリティについて学習します。...
2019-06-05
前回 一足遅れて Kubernetes を学び始める - 13. ヘルスチェックとコンテナライフサイクル -では、requestsやlimitといったヘルスチェックの仕方を学びました。今回は、Affinityなどによるスケジューリングについて学習します。...
2019-05-30
前回 一足遅れて Kubernetes を学び始める - 12. リソース制限 -では、requestsやlimitなどのリソース制限について学習しました。今回は、ヘルスチェックとコンテナライフサイクルについて学習します。...
2019-05-29
前回 一足遅れて Kubernetes を学び始める - 11. config&storage その2 -では、storageについて学習しました。今回は、リソース制限について学習します。...
2019-05-27
前回 一足遅れて Kubernetes を学び始める - 10. config&storage その1 -では、configについて学習しました。今回は、storageを学びます。...
2019-05-23
前回 一足遅れて Kubernetes を学び始める - 09. discovery&LB その2 -では、様々なserviceを学習しました。今回は、config&storageのconfigを学びます。...
2019-05-22
今回、k8sの体験を目的として参加したのですが、意外な収穫があったので、共有したく、記事を書くことにしました。...
2019-05-15
前回 一足遅れて Kubernetes を学び始める - 08. discovery&LB その1 -でServiceについての概要を学びました。今回は下記を一気に学びます。...
2019-05-07
前回 一足遅れて Kubernetes を学び始める - 07. workloads その3 -でようやくworkloadsが終了しました。今回は、discovery&LBを進めようと思います。...
2019-05-06
前回 一足遅れて Kubernetes を学び始める - 06. workloads その2 -にて、DaemonSetとStatefulSet(一部)を学習しました。今回は、StatefulSetの続きとJob,CronJobを学習します。...
2019-05-05
前回 一足遅れて Kubernetes を学び始める - 05. workloads その1 -では、Pod,ReplicaSet,Deploymentの3つを学習しました。今回はDaemonSet,StatefulSet(一部)を学びます。...
2019-05-03
前回 一足遅れて Kubernetes を学び始める - 04. kubectl -では、kubenetesのCLIツールkubectlを学習しました。今回は、目玉機能であるworkloadsについて学習します。...
2019-05-02
前回 一足遅れて Kubernetes を学び始める - 03. Raspberry Pi -では、RaspberryPiの環境にKubernetesを導入しました。無事、動作確認ができたので、さっそく学習していきたいです。...
2019-04-28
前回 一足遅れて Kubernetes を学び始める - 02. Docker For Mac -では、MacでKubernetesを軽く動かしてみました。DockerForMacでは、NodeがMasterのみだったため、Kubernetesを学習するには、ものたりない感がありました。そこで、RaspberryPiを使っておうちKubernetesを構築することになりました。...
2019-04-27
前回 一足遅れて Kubernetes を学び始める - 01. 環境選択編 -にて、Kubernetesを学ぶ環境を考えてみました。いきなりGKEを使うんじゃなくて、お手軽に試せるDockerForMacを使おうとなりました。...
2019-04-18
経緯 Kubernetesを使えるようになりたいな〜(定義不明) けど、他にやりたいこと(アプリ開発)あるから後回しにしちゃえ〜!!と、今までずっと、ちゃんと学ばなかったKubernetesを、本腰入れて使ってみようと思います。...