How to use port forwarding to load Kafka Avro data from a different VLAN

Article ID: 296465

Products

VMware Tanzu Greenplum

Issue/Introduction

Before reading this article, make sure you have a good understanding of Kafka, Greenplum Database (GPDB), and Greenplum Streaming Server (GPSS). If you are not familiar with these concepts, the key-value loading example in the following documentation is a good starting point: https://gpdb.docs.pivotal.io/5210/greenplum-kafka/load-key-value-example.html

Consider an environment with three hosts: GPDB <==> ETL <==> Kafka

In this setup, GPDB can communicate with the ETL server directly, but it cannot reach Kafka directly because Kafka is in another VLAN. To load Avro data, however, GPDB needs to connect to Kafka's schema registry service, which provides the schemas that describe the data model of the Kafka messages. In this scenario, the remaining option is to use port forwarding on the Linux ETL server so that GPDB can reach the schema registry.

Environment

Product Version: 5.21

Resolution

1. Set up GPSS. Refer to the following:

https://docs.vmware.com/en/VMware-Greenplum-Streaming-Server/1.11/greenplum-streaming-server/load-json-example-gpss.html 


2. Set up the YAML file to load the data similar to the following:

DATABASE: analytics
USER: kafkagpuser
PASSWORD: zzz
HOST: gpdb
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
     SOURCE:
        BROKERS: kafka:9092
        TOPIC: jiff_pre.jiff_pre.external_members
     KEY:
        COLUMNS:
          - NAME: id
            TYPE: json
        FORMAT: avro
        AVRO_OPTION:
          SCHEMA_REGISTRY_ADDR: http://ETL:8080
     VALUE:
        COLUMNS:
          - NAME: rec
            TYPE: json
        FORMAT: avro
        AVRO_OPTION:
          SCHEMA_REGISTRY_ADDR: http://ETL:8080
     ERROR_LIMIT: 0
   OUTPUT:
     SCHEMA: kafka_raw
     TABLE: jsontest
     MAPPING:
        - NAME: id
          EXPRESSION: (id->>'id')::int
        - NAME: rec
          EXPRESSION: rec
   COMMIT:
     MAX_ROW: 1000
     MINIMAL_INTERVAL: 1000
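
The MAPPING section decides what lands in each target column: `(id->>'id')::int` takes the key, which arrives as JSON, extracts its `id` field as text, and casts it to an integer, while `rec` is stored unchanged. The following Python sketch mimics that logic on the sample message used later in this article. The function name `map_row` is hypothetical and only illustrates the expression; it is not how GPSS evaluates it.

```python
import json


def map_row(key_json: str, value_json: str) -> tuple:
    """Mimic the MAPPING section: id := (id->>'id')::int, rec := rec."""
    key = json.loads(key_json)
    # ->> yields the field as text, ::int casts that text to an integer.
    id_as_text = str(key["id"])
    return int(id_as_text), value_json


print(map_row('{"id": 1}', '{"f1": "value1"}'))
```

A key without an `id` field raises `KeyError` in this sketch, whereas the SQL expression would yield NULL.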

3. Create the target table in GPDB:

psql analytics

CREATE SCHEMA kafka_raw;
SET search_path TO kafka_raw;
CREATE TABLE jsontest(id int, rec json);
CREATE EXTENSION gpss;

4. Set up port forwarding using the socat command on the ETL server:

socat TCP-LISTEN:8080,fork TCP:kafka:8081

With the above command, the ETL server forwards all traffic arriving on its port 8080 to port 8081 on the Kafka host, where the schema registry that serves the Avro schemas listens. From GPDB's point of view, the ETL server then acts as the Kafka schema registry server.
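
To make the forwarding concrete, the following Python sketch does roughly what the socat command does: it accepts connections on one port and relays bytes in both directions to a target host and port. It handles each connection in threads, whereas socat's fork option uses a child process. This is an illustration only, not a replacement for socat; the names `serve_forward`, `_handle`, and `_pipe` are hypothetical.

```python
import socket
import threading


def _pipe(src: socket.socket, dst: socket.socket) -> None:
    """Copy bytes from src to dst until src reaches end of stream."""
    try:
        while True:
            data = src.recv(65536)
            if not data:
                break
            dst.sendall(data)
    except OSError:
        pass  # one side went away; stop relaying in this direction
    finally:
        try:
            dst.shutdown(socket.SHUT_WR)  # pass end of stream on to the receiver
        except OSError:
            pass  # receiver is already closed


def _handle(client: socket.socket, target_host: str, target_port: int) -> None:
    """Relay one client connection to the target in both directions."""
    try:
        upstream = socket.create_connection((target_host, target_port))
    except OSError:
        client.close()
        return
    with client, upstream:
        back = threading.Thread(target=_pipe, args=(upstream, client), daemon=True)
        back.start()
        _pipe(client, upstream)
        back.join()


def serve_forward(listen_port: int, target_host: str, target_port: int) -> socket.socket:
    """Start forwarding in background threads and return the listening socket."""
    listener = socket.create_server(("", listen_port))

    def accept_loop() -> None:
        while True:
            try:
                client, _addr = listener.accept()
            except OSError:
                break  # listener was closed
            threading.Thread(
                target=_handle, args=(client, target_host, target_port), daemon=True
            ).start()

    threading.Thread(target=accept_loop, daemon=True).start()
    return listener
```

Calling `serve_forward(8080, "kafka", 8081)` would mirror the socat command in this step. The worker threads are daemon threads, so the calling program has to stay alive for forwarding to continue.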


5. Use the curl command to test if port forwarding is set up properly: 

curl -X GET http://ETL:8080/subjects 
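
The same check can be scripted, for example to run it before starting a load job. The following is a minimal Python sketch using only the standard library. The function name `list_subjects` and the 5-second timeout are assumptions; the `/subjects` path is the one used in the curl command.

```python
import json
import urllib.request


def list_subjects(registry_url: str, timeout: float = 5.0) -> list:
    """Return the subject names reported by the schema registry at registry_url."""
    url = registry_url.rstrip("/") + "/subjects"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `list_subjects("http://ETL:8080")` should return a JSON list of subject names, which is empty if no schema has been registered yet. A connection error (a subclass of `OSError`) suggests that the forward on the ETL server is not running or not reachable.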

6. On the Kafka server, create the topic by starting an Avro console producer, similar to the following:

kafka-avro-console-producer --broker-list kafka:9092 --topic jiff_pre.jiff_pre.external_members \
  --property schema.registry.url=http://kafka:8081 \
  --property parse.key=true \
  --property key.schema='{"type":"record","name":"keyrecord","fields":[{"name":"id","type":"int"}]}' \
  --property value.schema='{"type":"record","name":"valuerecord","fields":[{"name":"f1","type":"string"}]}'

At the producer prompt, type the following:

{"id":1}\t{"f1":"value1"}

{"id":1} is the key data

{"f1":"value1"} is the value data

\t means pressing the TAB key
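
With parse.key=true, the console producer treats the text before the tab separator as the key and the text after it as the value. The following Python sketch shows that split on the sample line. The helper name `split_key_value` is hypothetical, and this is not the producer's actual code.

```python
import json


def split_key_value(line: str, separator: str = "\t"):
    """Split one producer input line into parsed key and value JSON objects."""
    key_text, value_text = line.split(separator, 1)
    return json.loads(key_text), json.loads(value_text)


key, value = split_key_value('{"id":1}\t{"f1":"value1"}')
print(key)
print(value)
```

If the line contains no tab, the unpacking in this sketch raises `ValueError`, which corresponds to a message typed without a key.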

7. On the Kafka server, run the following command to check whether the data has been written to Kafka:

kafka-avro-console-consumer --topic jiff_pre.jiff_pre.external_members \
  --bootstrap-server kafka:9092 \
  --property schema.registry.url=http://kafka:8081 \
  --property print.key=true --from-beginning

8. Run the following job:

gpsscli load custerm_avro.yaml

Reference: https://docs.vmware.com/en/VMware-Greenplum-Streaming-Server/1.11/greenplum-streaming-server/ref-gpsscli-load.html


Appendix

Some other error messages you might see while configuring this:


Error message 1:

panic:gpss:gpadmin:sdw2:009230-[CRITICAL]:-pq: formatter function "kafka_in" of type readable was not found: Check executor failed

You need to execute 'CREATE EXTENSION gpss;' in the current database.


Error message 2:

Parse data failed Invalid CP1 magic byte 123, expected 0

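The data written to Kafka is not in Avro format, or the Avro data is invalid. A leading byte of 123 is the ASCII character '{', which usually means that plain JSON text was produced instead of Avro.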
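
The magic byte refers to the framing that schema-registry-aware Avro serializers put in front of each message: one byte with value 0, then a 4-byte big-endian schema ID, then the Avro payload. The following Python sketch classifies raw message bytes on that basis. The function name `describe_message` is hypothetical, and how the raw bytes are obtained from the topic is left open.

```python
import struct

MAGIC_BYTE = 0  # first byte of a schema-registry-framed Avro message


def describe_message(raw: bytes) -> str:
    """Classify raw Kafka message bytes by their first five bytes."""
    if len(raw) < 5:
        return "too short to carry the 5-byte schema registry header"
    if raw[0] != MAGIC_BYTE:
        return f"not registry-framed Avro: first byte is {raw[0]} ({chr(raw[0])!r})"
    # Bytes 1-4 hold the schema ID as a big-endian unsigned integer.
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return f"registry-framed Avro, schema ID {schema_id}"


print(describe_message(b'{"f1":"value1"}'))             # plain JSON text
print(describe_message(b"\x00\x00\x00\x00\x07\x0cvalue1"))  # framed Avro record
```

The second sample is the Avro encoding of a record with one string field "value1", framed with an arbitrary schema ID of 7 chosen for illustration.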

 
