# File: flink-cdc-hdfs-test.yml (GitHub Actions workflow)
# SPDX-FileCopyrightText: 2023 LakeSoul Contributors
#
# SPDX-License-Identifier: Apache-2.0
name: CI with flink cdc Test on HDFS

# Triggers:
#   * push          - only for the main branch
#   * pull_request  - for PRs targeting main or any release/** branch
#   * manual runs   - via workflow_dispatch
# For both push and pull_request, changes that touch only the ignored paths
# (generated docs, website, C++/Python bindings, Markdown) do not start a run.
on:
  push:
    paths-ignore:
      - 'javadoc/**'
      - 'website/**'
      - 'cpp/**'
      - 'python/**'
      - '**.md'
    branches:
      - 'main'
  pull_request:
    paths-ignore:
      - 'javadoc/**'
      - 'website/**'
      - 'cpp/**'
      - 'python/**'
      - '**.md'
    branches:
      - 'main'
      - 'release/**'
  workflow_dispatch:
jobs:
  # End-to-end test job. It builds the native (Rust) libraries and the
  # Flink/Spark jars, starts HDFS on the runner plus the docker-compose test
  # environment, launches a Spark compaction task and three Flink jobs, then
  # runs four rounds of "mutate MySQL -> verify" checks before collecting logs.
  build:
    runs-on: ubuntu-latest
    steps:
      # ------------------------------------------------------------------
      # Toolchain and build
      # ------------------------------------------------------------------
      # Free up runner disk space; the tool-cache and large-packages options
      # are switched off, the remaining options are switched on.
      - name: Free Disk Space (Ubuntu)
        uses: jlumbroso/free-disk-space@main
        with:
          tool-cache: false
          android: true
          dotnet: true
          haskell: true
          large-packages: false
          docker-images: true
          swap-storage: true
      - uses: actions/checkout@v4
      - name: Set up JDK 11
        uses: actions/setup-java@v4
        with:
          java-version: '11'
          distribution: 'temurin'
          cache: maven
      - name: Set up Python 3.9
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      # Python packages used by the benchmark scripts, plus three jars that
      # are downloaded into $HOME.
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip setuptools wheel
          pip install pymysql cryptography jproperties --no-cache-dir
          wget https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.17.1/flink-s3-fs-hadoop-1.17.1.jar -O $HOME/flink-s3-fs-hadoop-1.17.1.jar
          wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop-bundle/1.12.3/parquet-hadoop-bundle-1.12.3.jar -O $HOME/parquet-hadoop-bundle-1.12.3.jar
          wget https://repo1.maven.org/maven2/org/apache/flink/flink-parquet/1.17.1/flink-parquet-1.17.1.jar -O $HOME/flink-parquet-1.17.1.jar
      - name: Install Protoc
        uses: arduino/setup-protoc@v2
        with:
          version: "23.x"
          repo-token: ${{ secrets.GITHUB_TOKEN }}
      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
          default: true
      - uses: Swatinem/rust-cache@v2
        with:
          workspaces: "./rust -> target"
      # Cache key changes whenever the cross-build config or the compose file
      # changes.
      - name: Cache Docker images
        uses: ScribeMD/docker-cache@0.4.0
        with:
          key: docker-${{ runner.os }}-${{ hashFiles('rust/Cross.toml', 'docker/lakesoul-docker-compose-env/docker-compose.yml') }}
      - name: Pull images
        run: |
          docker pull -q bitnami/spark:3.3.1
      # Release build of the Rust workspace through `cross`.
      - uses: actions-rs/cargo@v1
        with:
          use-cross: true
          command: build
          args: '--manifest-path rust/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features'
      # Copy the cross-built shared libraries into rust/target/release before
      # running the Maven package build (tests skipped).
      - name: Build with Maven
        run: |
          mkdir -p rust/target/release
          cp rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_io_c.so rust/target/release
          cp rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_metadata_c.so rust/target/release
          MAVEN_OPTS="-Xmx4000m" mvn -q -B clean package -f pom.xml -Pcross-build -DskipTests
      # Export the jar file names for later steps. The "-tests" variants are
      # derived by rewriting the trailing ".jar"; the dot is escaped and the
      # pattern is anchored so only the file extension can match.
      - name: Get jar names
        run: |
          echo "FLINK_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink)" >> $GITHUB_ENV
          echo "FLINK_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-flink | sed -e 's/\.jar$/-tests.jar/')" >> $GITHUB_ENV
          echo "SPARK_JAR_NAME=$(python script/get_jar_name.py lakesoul-spark)" >> $GITHUB_ENV
          echo "SPARK_TEST_JAR_NAME=$(python script/get_jar_name.py lakesoul-spark | sed -e 's/\.jar$/-tests.jar/')" >> $GITHUB_ENV
      - name: Copy built jar to work-dir
        run: |
          cp ./lakesoul-flink/target/$FLINK_JAR_NAME ./script/benchmark/work-dir
          cp ./lakesoul-flink/target/$FLINK_TEST_JAR_NAME ./script/benchmark/work-dir
          cp ./lakesoul-spark/target/$SPARK_JAR_NAME ./script/benchmark/work-dir
          cp ./lakesoul-spark/target/$SPARK_TEST_JAR_NAME ./script/benchmark/work-dir

      # ------------------------------------------------------------------
      # HDFS and test cluster
      # ------------------------------------------------------------------
      - uses: beyondstorage/setup-hdfs@master
        with:
          hdfs-version: '3.3.6'
      # Insert hadoop.proxyuser.spark.hosts/groups = * just before the closing
      # </configuration> tag of core-site.xml.
      - name: Modify HDFS User Group Mapping
        run: |
          sed -i '/^<\/configuration>/i <property><name>hadoop.proxyuser.spark.hosts</name><value>*</value></property><property><name>hadoop.proxyuser.spark.groups</name><value>*</value></property>' $HADOOP_HOME/etc/hadoop/core-site.xml
      # Replace localhost with 172.17.0.1 in the HDFS config, restart DFS so
      # the change takes effect, then open up permissions on the HDFS root.
      - name: Modify HDFS Host
        run: |
          sed -i 's/localhost/172.17.0.1/g' $HADOOP_HOME/etc/hadoop/core-site.xml
          sed -i 's/localhost/172.17.0.1/g' $HADOOP_HOME/etc/hadoop/hdfs-site.xml
          $HADOOP_HOME/sbin/stop-dfs.sh
          $HADOOP_HOME/sbin/start-dfs.sh
          $HADOOP_HOME/bin/hadoop fs -chmod -R 777 /
      # Bring up the docker-compose environment and give it 30s to settle.
      - name: Deploy cluster
        run: |
          cd ./docker/lakesoul-docker-compose-env
          docker compose pull -q
          docker compose up -d
          sleep 30s

      # ------------------------------------------------------------------
      # Long-running background tasks
      # ------------------------------------------------------------------
      # Runs in the background for the rest of the job; its output goes to
      # script/benchmark/work-dir/compaction.log, which is uploaded at the end.
      - name: Start compaction task
        run: |
          cd ./script/benchmark/work-dir
          nohup docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --proxy-user flink --driver-memory 2G --executor-memory 2G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.hadoop.fs.s3.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.buffer.dir=/tmp --conf spark.hadoop.fs.s3a.fast.upload.buffer=disk --conf spark.hadoop.fs.s3a.fast.upload=true --conf spark.dmetasoul.lakesoul.native.io.enable=true --class com.dmetasoul.lakesoul.spark.compaction.CompactionTask --master local[4] /opt/spark/work-dir/$SPARK_JAR_NAME --threadpool.size=10 --database="" --file_num_limit=5 --file_size_limit=10KB > compaction.log 2>&1 &
      - name: Start flink mysql cdc task-1
        run: |
          docker exec -t -u flink lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.entry.MysqlCdc /opt/flink/work-dir/$FLINK_JAR_NAME --source_db.host mysql --source_db.port 3306 --source_db.db_name test_cdc --source_db.user root --source_db.password root --source.parallelism 2 --sink.parallelism 4 --use.cdc true --warehouse_path hdfs://172.17.0.1:9000/lakesoul-test-bucket/data/ --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/chk --flink.savepoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/svp --job.checkpoint_interval 5000 --server_time_zone UTC
          sleep 30s
      - name: Start flink source to sink task-2
        run: |
          docker exec -t -u flink lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulSourceToSinkTable -C file:///opt/flink/work-dir/$FLINK_JAR_NAME /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --source.database.name test_cdc --source.table.name default_init --sink.database.name flink_sink --sink.table.name default_init --use.cdc true --hash.bucket.number 2 --job.checkpoint_interval 10000 --server_time_zone UTC --warehouse.path hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-sink/data --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-sink/chk
          sleep 30s
      - name: Start flink DataGenSource without primary key task-3
        run: |
          docker exec -t -u flink lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulDataGenSourceTable -C file:///opt/flink/work-dir/$FLINK_JAR_NAME /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --sink.database.name flink --sink.table.name sink_table --job.checkpoint_interval 10000 --server_time_zone UTC --warehouse.path hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink/ --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink/chk --sink.parallel 2 --data.size 1000 --write.time 5
      # Jars needed by the data-check jobs; each is fetched only if it is not
      # already present in work-dir.
      - name: Download task dependency jar
        run: |
          cd ./script/benchmark/work-dir
          if [ ! -e mysql-connector-java-8.0.30.jar ]; then wget -q https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar; fi
          if [ ! -e flink-connector-jdbc-3.1.1-1.17.jar ]; then wget -q https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar; fi
          if [ ! -e assertj-core-3.23.1.jar ]; then wget -q https://repo1.maven.org/maven2/org/assertj/assertj-core/3.23.1/assertj-core-3.23.1.jar; fi

      # ------------------------------------------------------------------
      # Round 1: create tables, insert data, verify
      # ------------------------------------------------------------------
      - name: Create table and insert data
        run: |
          cd ./script/benchmark
          python 1_create_table.py
          docker exec -i lakesoul-docker-compose-env-mysql-1 bash /2_insert_table_data.sh
          sleep 30s
      - name: Start flink engine data check task
        run: |
          docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulFlinkDataCheck -C file:///opt/flink/work-dir/$FLINK_JAR_NAME -C file:///opt/flink/work-dir/mysql-connector-java-8.0.30.jar -C file:///opt/flink/work-dir/flink-connector-jdbc-3.1.1-1.17.jar -C file:///opt/flink/work-dir/assertj-core-3.23.1.jar /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --mysql.hostname mysql --mysql.database.name test_cdc --mysql.table.name default_init_1 --mysql.username root --mysql.password root --mysql.port 3306 --server.time.zone UTC --single.table.contract true --lakesoul.database.name test_cdc --lakesoul.table.name default_init_1 --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-engine/chk
          sleep 30s
      - name: Mysql cdc data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
      - name: Flink source to sink data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --cdc.contract false --single.table.contract true

      # ------------------------------------------------------------------
      # Round 2: add columns, delete rows, re-insert, verify
      # ------------------------------------------------------------------
      - name: Adding columns for tables and deleting some data from tables
        run: |
          cd ./script/benchmark
          python3 3_add_column.py
          python3 delete_data.py
          docker exec -i lakesoul-docker-compose-env-mysql-1 bash /2_insert_table_data.sh
          sleep 30s
      - name: Start flink engine data check task
        run: |
          docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulFlinkDataCheck -C file:///opt/flink/work-dir/$FLINK_JAR_NAME -C file:///opt/flink/work-dir/mysql-connector-java-8.0.30.jar -C file:///opt/flink/work-dir/flink-connector-jdbc-3.1.1-1.17.jar -C file:///opt/flink/work-dir/assertj-core-3.23.1.jar /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --mysql.hostname mysql --mysql.database.name test_cdc --mysql.table.name default_init_1 --mysql.username root --mysql.password root --mysql.port 3306 --server.time.zone UTC --single.table.contract true --lakesoul.database.name test_cdc --lakesoul.table.name default_init_1 --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-engine/chk
          sleep 30s
      - name: Mysql cdc data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
      - name: Flink source to sink data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --cdc.contract false --single.table.contract true

      # ------------------------------------------------------------------
      # Round 3: update rows, verify
      # ------------------------------------------------------------------
      - name: Updating data in tables
        run: |
          cd ./script/benchmark
          python3 4_update_data.py
          sleep 30s
      - name: Start flink engine data check task
        run: |
          docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulFlinkDataCheck -C file:///opt/flink/work-dir/$FLINK_JAR_NAME -C file:///opt/flink/work-dir/mysql-connector-java-8.0.30.jar -C file:///opt/flink/work-dir/flink-connector-jdbc-3.1.1-1.17.jar -C file:///opt/flink/work-dir/assertj-core-3.23.1.jar /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --mysql.hostname mysql --mysql.database.name test_cdc --mysql.table.name default_init_1 --mysql.username root --mysql.password root --mysql.port 3306 --server.time.zone UTC --single.table.contract true --lakesoul.database.name test_cdc --lakesoul.table.name default_init_1 --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-engine/chk
          sleep 30s
      # From this round on the MySQL CDC verification additionally passes
      # spark.dmetasoul.lakesoul.native.io.enable=true.
      - name: Mysql cdc data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
      - name: Flink source to sink data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --cdc.contract false --single.table.contract true

      # ------------------------------------------------------------------
      # Round 4: drop columns, delete rows, re-insert, verify
      # ------------------------------------------------------------------
      - name: Dropping columns and deleting some data in tables
        run: |
          cd ./script/benchmark
          python3 6_drop_column.py
          python3 delete_data.py
          docker exec -i lakesoul-docker-compose-env-mysql-1 bash /2_insert_table_data.sh
          sleep 30s
      - name: Start flink engine data check task
        run: |
          docker exec -t lakesoul-docker-compose-env-jobmanager-1 flink run -d -c org.apache.flink.lakesoul.test.benchmark.LakeSoulFlinkDataCheck -C file:///opt/flink/work-dir/$FLINK_JAR_NAME -C file:///opt/flink/work-dir/mysql-connector-java-8.0.30.jar -C file:///opt/flink/work-dir/flink-connector-jdbc-3.1.1-1.17.jar -C file:///opt/flink/work-dir/assertj-core-3.23.1.jar /opt/flink/work-dir/$FLINK_TEST_JAR_NAME --mysql.hostname mysql --mysql.database.name test_cdc --mysql.table.name default_init_1 --mysql.username root --mysql.password root --mysql.port 3306 --server.time.zone UTC --single.table.contract true --lakesoul.database.name test_cdc --lakesoul.table.name default_init_1 --flink.checkpoint hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink-engine/chk
          sleep 30s
      - name: Mysql cdc data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --conf spark.dmetasoul.lakesoul.native.io.enable=true --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME
      - name: Flink source to sink data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.Benchmark --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --cdc.contract false --single.table.contract true
      # Verifies the output written by task-3 (the table without primary key).
      - name: Table without primary key data accuracy verification task
        run: |
          cd ./script/benchmark
          docker run --cpus 2 -m 5000m --net lakesoul-docker-compose-env_default --rm -t -v $HADOOP_HOME:/opt/hadoop --env HADOOP_HOME=/opt/hadoop -v ${PWD}/work-dir:/opt/spark/work-dir --env lakesoul_home=/opt/spark/work-dir/lakesoul.properties bitnami/spark:3.3.1 spark-submit --driver-memory 4G --executor-memory 4G --conf spark.driver.memoryOverhead=1500m --conf spark.executor.memoryOverhead=1500m --jars /opt/spark/work-dir/$SPARK_JAR_NAME,/opt/spark/work-dir/mysql-connector-java-8.0.30.jar --class org.apache.spark.sql.lakesoul.benchmark.FlinkWriteDataCheck --master local[4] /opt/spark/work-dir/$SPARK_TEST_JAR_NAME --csv.path hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink/csv --lakesoul.table.path hdfs://172.17.0.1:9000/lakesoul-test-bucket/flink/sink_table --server.time.zone UTC

      # ------------------------------------------------------------------
      # Log collection (runs even when an earlier step failed)
      # ------------------------------------------------------------------
      # `docker logs` replays the container's stderr on its own stderr, so both
      # streams are redirected to make sure the file contains everything.
      - name: Print Flink Log
        if: always()
        run: |
          docker logs lakesoul-docker-compose-env-jobmanager-1 > flink-session-cluster.log 2>&1
      - name: Upload Log
        if: always()
        continue-on-error: true
        uses: actions/upload-artifact@v4
        with:
          name: flink-cluster-log
          path: flink-session-cluster.log
          retention-days: 5
          if-no-files-found: error
      # Same conditions as "Upload Log": the compaction log matters most when
      # the job has failed, and a missing file must not add a second failure.
      - name: Upload Compaction Log
        if: always()
        continue-on-error: true
        uses: actions/upload-artifact@v4
        with:
          name: lakesoul-compaction-log
          path: ./script/benchmark/work-dir/compaction.log
          retention-days: 3
          if-no-files-found: error