This is an example of loading Kafka metadata into Greenplum as part of a GPSS data-loading job.
When loading data from Kafka via GPSS, the metadata is a JSON document such as {"partition": 0, "offset": 2, "topic": "test_kafka_topic"}.
We can load this metadata by adding a "META" section to the GPSS YAML configuration file.
For the usage of "META", please refer to the document:
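As a quick sanity check outside of GPSS, the metadata document can be parsed like any other JSON. A minimal Python sketch (using the sample values above) showing the three fields the META column carries:

```python
import json

# The Kafka metadata document as GPSS exposes it (sample values from above)
meta_data = '{"partition": 0, "offset": 2, "topic": "test_kafka_topic"}'

meta = json.loads(meta_data)
print(meta["partition"], meta["offset"], meta["topic"])
# → 0 2 test_kafka_topic
```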
Product Version: 6.20
1. create the target table in Greenplum

create table test_meta_kafka (id int, m_offset int, m_partition int, m_topic text);
2. create the YAML file for the GPSS job (test_meta.yaml)

DATABASE: gpadmin
USER: gpadmin
PASSWORD: abc123
HOST: mdw
PORT: 6767
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: smdw:9092
      TOPIC: test_kafka_topic
    KEY:
      COLUMNS:
        - NAME: key
          TYPE: bytea
      FORMAT: BINARY
    VALUE:
      COLUMNS:
        - NAME: json_value
          TYPE: json
      FORMAT: JSON
    META:
      COLUMNS:
        - NAME: meta_data   # the Kafka metadata, e.g. {"partition": 0, "offset": 2, "topic": "test_kafka_topic"}
          TYPE: json
      FORMAT: JSON
    ERROR_LIMIT: 999999999
  OUTPUT:
    SCHEMA: public
    TABLE: test_meta_kafka
    MAPPING:
      - NAME: id
        EXPRESSION: (json_value->>'id')::int
      - NAME: m_offset
        EXPRESSION: (meta_data->>'offset')::int      # the offset from the metadata
      - NAME: m_partition
        EXPRESSION: (meta_data->>'partition')::int   # the partition from the metadata
      - NAME: m_topic
        EXPRESSION: (meta_data->>'topic')::text      # the topic from the metadata
  COMMIT:
    MINIMAL_INTERVAL: 2000
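The MAPPING expressions rely on the PostgreSQL `->>` operator, which extracts a JSON field as text, which is why each value is then cast (`::int`, `::text`) to the column type. A minimal Python sketch of the same "extract as text, then cast" logic, using the sample documents from this article (not run against a database):

```python
import json

json_value = json.loads('{"id": 1313131, "month": 11, "expenses": 1313.13}')
meta_data  = json.loads('{"partition": 0, "offset": 2, "topic": "test_kafka_topic"}')

def extract_text(doc, key):
    # ->> returns the field as text; mimic that here
    return str(doc[key])

row = {
    "id":          int(extract_text(json_value, "id")),        # (json_value->>'id')::int
    "m_offset":    int(extract_text(meta_data, "offset")),     # (meta_data->>'offset')::int
    "m_partition": int(extract_text(meta_data, "partition")),  # (meta_data->>'partition')::int
    "m_topic":     extract_text(meta_data, "topic"),           # (meta_data->>'topic')::text
}
print(row)
```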
3. submit and start the job
gpsscli submit --name test_gpsscli_job --gpss-port 5019 ./test_meta.yaml
gpsscli --gpss-port 5019 start test_gpsscli_job
4. produce some data to the Kafka topic; here we produced one message:
{ "id": 1313131, "month": 11, "expenses": 1313.13 }
5. check the table in Greenplum; we can see the Kafka metadata has been loaded:
gpadmin=# select * from test_meta_kafka ;
id | m_offset | m_partition | m_topic
---------+----------+-------------+------------------
1313131 | 284 | 0 | test_kafka_topic
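The row above can be reconstructed by hand: `id` comes from the message value produced in step 4, while `m_offset`, `m_partition`, and `m_topic` come from the metadata the broker assigned to that message (partition 0, offset 284 in this run). A small Python sketch of that split:

```python
import json

# Message value produced in step 4 and the metadata Kafka assigned to it
value = json.loads('{"id": 1313131, "month": 11, "expenses": 1313.13}')
meta  = {"partition": 0, "offset": 284, "topic": "test_kafka_topic"}

# id is taken from the value; the other three columns come from the metadata
row = (value["id"], meta["offset"], meta["partition"], meta["topic"])
print(row)
# → (1313131, 284, 0, 'test_kafka_topic')
```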