GPSS Example: How to load the metadata of Kafka into Greenplum

Article ID: 296821


Products

VMware Tanzu Greenplum

Issue/Introduction

This article is an example of loading Kafka metadata into Greenplum as part of the GPSS data loading process.

When loading data from Kafka via GPSS, the metadata of each message is available in JSON format, for example: {"partition": 0, "offset": 2, "topic": "test_kafka_topic"}.
We can load this metadata into the target table by adding "META" settings to the GPSS job YAML file.
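
The mapping expressions shown in the Resolution below extract individual fields from this metadata document with the standard Greenplum JSON operator ->>. As a quick standalone illustration (this query is only a sketch against the sample metadata above and is not part of the GPSS job):
select (meta.doc ->> 'offset')::int    as m_offset,
       (meta.doc ->> 'partition')::int as m_partition,
       meta.doc ->> 'topic'            as m_topic
from (select '{"partition": 0, "offset": 2, "topic": "test_kafka_topic"}'::json as doc) meta;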

For the usage of "META", please refer to the GPSS documentation.


Environment

Product Version: 6.20

Resolution

Below is an example:

1. Create a table that will be used as the target of the data loading process:
create table test_meta_kafka (id int, m_offset int, m_partition int, m_topic text ); 
2. Create the YAML file for the GPSS job:
DATABASE: gpadmin
USER: gpadmin
PASSWORD: abc123
HOST: mdw
PORT:  6767
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: smdw:9092
      TOPIC: test_kafka_topic
    KEY:
        COLUMNS:
            - NAME: key
              TYPE: bytea
        FORMAT: BINARY
    VALUE:
        COLUMNS:
          - NAME: json_value
            TYPE: json
        FORMAT: JSON
    META:
        COLUMNS:
          - NAME: meta_data ### the metadata of Kafka, for example: {"partition": 0, "offset": 2, "topic": "test_kafka_topic"}
            TYPE: json
        FORMAT: JSON
    ERROR_LIMIT: 999999999
  OUTPUT:
    SCHEMA: public
    TABLE: test_meta_kafka
    MAPPING:
      - NAME: id
        EXPRESSION: (json_value->>'id')::int 
      - NAME: m_offset
        EXPRESSION: (meta_data->>'offset')::int    ### the offset in the metadata
      - NAME: m_partition
        EXPRESSION: (meta_data->>'partition')::int   ### the partition in the metadata
      - NAME: m_topic
        EXPRESSION: (meta_data->>'topic')::text   ### the topic in the metadata
  COMMIT:
      MINIMAL_INTERVAL: 2000
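
The gpsscli commands in the next step assume a GPSS server instance is already running and listening on port 5019. A minimal sketch of such a server configuration file is shown below; the file name (gpss.json) and the gpfdist port are assumptions, adjust them to your environment:
{
    "ListenAddress": {
        "Host": "",
        "Port": 5019
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
The server can then be started on the GPSS host with a command like: gpss ./gpss.json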
3. Submit and start the job:
gpsscli submit --name test_gpsscli_job --gpss-port 5019 ./test_meta.yaml
gpsscli --gpss-port 5019 start test_gpsscli_job 
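
To verify that the job was registered and is running, you can list the jobs known to this GPSS instance (a sketch; the exact output may differ between GPSS releases):
gpsscli list --gpss-port 5019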
4. Load some data into Kafka. Here we loaded one row, as below:
{ "id": 1313131, "month": 11, "expenses": 1313.13 } 
5. Check the table in Greenplum. We can see that the Kafka metadata has been loaded:
gpadmin=# select * from test_meta_kafka ;
   id    | m_offset | m_partition |     m_topic
---------+----------+-------------+------------------
 1313131 |      284 |           0 | test_kafka_topic
(1 row)
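
Once the metadata columns are populated, they can be used, for example, to check how far each topic partition has been loaded. A simple sketch against the table created in step 1:
select m_topic, m_partition, max(m_offset) as last_loaded_offset
from test_meta_kafka
group by m_topic, m_partition;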