1. Set up GPSS. Refer to the GPSS documentation for installation and setup.
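For reference, a minimal sketch of starting a GPSS server instance; the file name gpsscfg.json and the port numbers below are illustrative examples, not values prescribed by this article:

# Hypothetical GPSS server configuration; adjust ports for your environment
cat > gpsscfg.json <<'EOF'
{
    "ListenAddress": {
        "Host": "",
        "Port": 5019
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
EOF
# Start the GPSS server with this configuration
gpss gpsscfg.json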
2. Set up the YAML file to load the data, similar to the following:
DATABASE: analytics
USER: kafkagpuser
PASSWORD: zzz
HOST: gpdb
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: kafka:9092
      TOPIC: jiff_pre.jiff_pre.external_members
    KEY:
      COLUMNS:
        - NAME: id
          TYPE: json
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://ETL:8080
    VALUE:
      COLUMNS:
        - NAME: rec
          TYPE: json
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://ETL:8080
    ERROR_LIMIT: 0
  OUTPUT:
    SCHEMA: kafka_raw
    TABLE: jsontest
    MAPPING:
      - NAME: id
        EXPRESSION: (id->>'id')::int
      - NAME: rec
        EXPRESSION: rec
  COMMIT:
    MAX_ROW: 1000
    MINIMAL_INTERVAL: 1000
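Besides the one-shot gpsscli load used in step 8 below, the same YAML can be run as a managed job. A sketch, assuming the file is saved as custerm_avro.yaml and using avrojob as an example job name:

# Submit the job, start it, and list job status
gpsscli submit --name avrojob custerm_avro.yaml
gpsscli start avrojob
gpsscli list --all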
3. Create the target table in GPDB:
psql analytics
CREATE SCHEMA kafka_raw;
SET search_path TO kafka_raw;
CREATE TABLE jsontest(id int, rec json);
CREATE EXTENSION gpss;
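As an optional sanity check before starting the load, confirm that the schema, table, and extension were created:

# Verify the objects created above
psql analytics -c '\dn kafka_raw'
psql analytics -c '\d kafka_raw.jsontest'
psql analytics -c '\dx gpss'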
4. Set up port forwarding using the socat command on the ETL server:
socat TCP-LISTEN:8080,fork TCP:kafka:8081
With the above command, the ETL server forwards all traffic on its port 8080 to port 8081 on the Kafka host, where the Avro schema registry listens. The ETL server thereby acts as the Kafka schema registry server.
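To confirm the forwarder is actually listening, and optionally keep it running after logout, something like the following can be used (the ss invocation assumes a Linux host with iproute2):

# Check that port 8080 has a listener on the ETL server
ss -ltn | grep 8080
# Run the forwarder in the background, surviving logout (illustrative)
nohup socat TCP-LISTEN:8080,fork TCP:kafka:8081 >/dev/null 2>&1 &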
5. Use the curl command to test if port forwarding is set up properly:
curl -X GET http://ETL:8080/subjects
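If forwarding works, the forwarded response should match what the schema registry returns directly, so comparing the two is a quick check:

# Both commands should return the same JSON list of subjects
curl -s http://kafka:8081/subjects
curl -s http://ETL:8080/subjects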
6. In Kafka, create the topic with a command similar to the following:
kafka-avro-console-producer --broker-list kafka:9092 --topic jiff_pre.jiff_pre.external_members \
  --property schema.registry.url=http://kafka:8081 \
  --property parse.key=true \
  --property key.schema='{"type":"record","name":"keyrecord","fields":[{"name":"id","type":"int"}]}' \
  --property value.schema='{"type":"record","name":"valuerecord","fields":[{"name":"f1","type":"string"}]}'
In the shell, type in the following:
{"id":1}\t{"f1":"value1"} {“id”:1} is the data of key {"f1":"value1"} is the data of value \t means pressing TAB
7. On the Kafka server, run the following command to check whether the data has been written into Kafka:
kafka-avro-console-consumer --topic jiff_pre.jiff_pre.external_members \
  --bootstrap-server kafka:9092 \
  --property schema.registry.url=http://kafka:8081 \
  --property print.key=true --from-beginning
8. Run the following job:
gpsscli load custerm_avro.yaml
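After the job completes, verify that the rows landed in the target table:

# Count and inspect the loaded records
psql analytics -c 'SELECT count(*) FROM kafka_raw.jsontest;'
psql analytics -c 'SELECT * FROM kafka_raw.jsontest LIMIT 5;'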
Some other error messages you might see while configuring this:
Error message 1:
panic:gpss:gpadmin:sdw2:009230-[CRITICAL]:-pq: formatter function "kafka_in" of type readable was not found: Check executor failed
You need to execute 'CREATE EXTENSION gpss;' in the current database.
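For example, against the database used in this article:

# The gpss extension must exist in the database the job writes to
psql analytics -c 'CREATE EXTENSION gpss;'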
Error message 2:
Parse data failed Invalid CP1 magic byte 123, expected 0
The data written into Kafka is not in Avro format, or the Avro data is invalid. (Byte 123 is the ASCII code for '{', which suggests plain JSON text was written instead of schema-registry-framed Avro.)
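One way to see what was actually written is to read the topic with the plain (non-Avro) console consumer. Avro records framed for the schema registry start with a 0x00 magic byte, so output that begins with readable JSON text indicates the records were not produced as Avro:

# Dump one raw message from the topic
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic jiff_pre.jiff_pre.external_members \
  --from-beginning --max-messages 1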