When loading data from the Greenplum database into Spark via the spark-connector, you might observe the following OutOfMemoryError (OOM):
20/10/13 10:03:17 WARN TaskSetManager: Lost task 1.1 in stage 2.0 (TID 5, 192.168.6.198, executor 2): java.lang.OutOfMemoryError: GC overhead limit exceeded
When you see this issue, consider tuning the values of "partitionColumn" and "partitions" in the spark-connector job.
Make sure you always choose a column with a high proportion of unique values as the "partitionColumn".
If the table does not have such a column, consider adding a column with sequential data (e.g. 1, 2, 3, 4, 5, ...) to the table and using that column as your "partitionColumn".
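One way to add such a column in Greenplum is to build a new table that carries a generated sequence number. The sketch below is only illustrative: "my_table", "my_table_with_id", and "seq_id" are placeholder names, and you should adapt the distribution clause to your own schema.

-- Illustrative sketch only; "my_table", "my_table_with_id", and "seq_id" are placeholders.
-- row_number() assigns a unique, sequential value to every row.
CREATE TABLE my_table_with_id AS
    SELECT row_number() OVER () AS seq_id, t.*
    FROM my_table t
DISTRIBUTED BY (seq_id);

You can then point "partitionColumn" at the new sequential column when reading the table.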
Greenplum and Spark connector
The spark-connector has an option named "partitions". When loading data, the spark-connector splits the load into batch jobs and uses this value to determine how many rows each job reads. See Reading from Greenplum Database into Spark for more information.
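One rough way to pick a starting value for "partitions" (this heuristic is an assumption of this article, not an official connector recommendation) is to divide the table size by the amount of data you want each batch job to read:

-- Hypothetical sizing query; the 256 MB per-job target is an assumption, adjust it to your executor memory.
SELECT ceil(pg_total_relation_size('big_table') / (256 * 1024 * 1024.0)) AS suggested_partitions;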
Example
Create a table with 43GB of data where the column "id" is the "partitionColumn". In this example, 93.7% of the values in this column are unique.
gpadmin=# select pg_size_pretty(pg_total_relation_size('big_table'));
 pg_size_pretty
----------------
 43 GB

gpadmin=# SELECT count(id) from big_table ;
  count
---------
 1950000

gpadmin=# SELECT count(distinct(id)) from big_table ;
  count
---------
 1827444      << 93.7% are unique data
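For reference, a similar test table could be generated with generate_series. The sketch below is only an approximation of the setup above: "payload" is a placeholder column, and the resulting size and uniqueness ratio will not match the 43 GB example exactly.

-- Hypothetical test-data sketch; adjust the row count and columns to your needs.
CREATE TABLE big_table AS
    SELECT g AS id, md5(g::text) AS payload
    FROM generate_series(1, 1950000) g
DISTRIBUTED BY (id);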
Loading the data with the default value of "partitions":
spark-shell --master spark://192.168.6.198:7077

scala> val gscReadOptionMap = Map(
     |   "url" -> "jdbc:postgresql://192.168.6.200:5432/gpadmin",
     |   "user" -> "gpadmin",
     |   "password" -> "abc123",
     |   "dbschema" -> "public",
     |   "dbtable" -> "big_table",
     |   "partitionColumn" -> "id"
     | )

scala> val gpdf = spark.read.format("greenplum").options(gscReadOptionMap).load()

scala> gpdf.count
res2: Long = 1950000

scala> gpdf.write.csv("/data/big_table1")
By default, the "partitions" is the value of the total count of primary segments. In this example environment, there are 2 primary segments. In the GPDB log, we can see the spark-connect created 2 batch jobs to load data:
### job#1
INSERT INTO "public"."spark_d880ae721e69cd77_2f609c8cf10fa734_0_30" ... WHERE id < 14966796 OR id IS NULL

### job#2
INSERT INTO "public"."spark_d880ae721e69cd77_2f609c8cf10fa734_2_29" ... WHERE id >= 14966796
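You can confirm the number of primary segments in your cluster (and therefore the default "partitions" value) by querying the gp_segment_configuration catalog:

-- Count primary segments; content >= 0 excludes the master (content = -1).
SELECT count(*) FROM gp_segment_configuration WHERE role = 'p' AND content >= 0;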
Since each job reads too much data (roughly half of the 43 GB table, ~20 GB per job), we get the OOM error from the spark-connector:
20/10/13 10:03:17 WARN TaskSetManager: Lost task 1.1 in stage 2.0 (TID 5, 192.168.6.198, executor 2): java.lang.OutOfMemoryError: GC overhead limit exceeded
"2" is not a good value in this example, so we change the partitions to 1000:
# spark-shell --master spark://192.168.6.198:7077

scala> val gscReadOptionMap = Map(
     |   "url" -> "jdbc:postgresql://192.168.6.200:5432/gpadmin",
     |   "user" -> "gpadmin",
     |   "password" -> "abc123",
     |   "dbschema" -> "public",
     |   "dbtable" -> "big_table",
     |   "partitionColumn" -> "id",
     |   "partitions" -> "1000"
     | )

scala> val gpdf = spark.read.format("greenplum").options(gscReadOptionMap).load()
Run the job again. In the GPDB log, we can see that the spark-connector now splits the "WHERE" clause into much smaller ranges:
# tail -f gpdb-2020-10-13_075650.csv | grep "WHERE id"
WHERE id >= 7528611 AND id < 7543577",0,,"postgres.c",2756,
WHERE id >= 7588475 AND id < 7603441",,,,,,"
WHERE id >= 7588475 AND id < 7603441",0,,"postgres.c",2756,
WHERE id >= 7573509 AND id < 7588475",,,,,,"
WHERE id >= 7573509 AND id < 7588475",0,,"postgres.c",2756,
WHERE id >= 7618407 AND id < 7633373",,,,,,"
WHERE id >= 7618407 AND id < 7633373",0,,"postgres.c",2756,
WHERE id >= 7603441 AND id < 7618407",,,,,,"
As a result, the loading job goes smoothly without an OOM error:
scala> gpdf.write.csv("/data/big_table2")
20/10/13 10:26:35 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
[Stage 0:===============>                                  (290 + 6) / 1000]