Load EBCDIC-encoded data into Greenplum through GPSS and Kafka


Article ID: 417333


Products

VMware Tanzu Data Suite

Issue/Introduction

Some users may need to load EBCDIC-encoded data into Greenplum using GPSS and Kafka. Since Greenplum does not support the EBCDIC encoding, a workaround is required.

Environment

GPDB 7.5.1

GPSS 2.2

Kafka 2.13-2.8.0

ZooKeeper (bundled with Kafka)

Cause

Currently, neither Greenplum nor PostgreSQL officially supports the EBCDIC encoding, which means the database cannot directly read data in this format. If users want to read the original message from the database side, there are three directions:

1. Transform the data encoding from EBCDIC to UTF-8, then load the data into Greenplum.

2. Load the original data in binary format into Greenplum, where each message can only be saved in a single bytea column. Users can later use a Java program to read the binary data and convert its encoding.

3. Publish the original EBCDIC-encoded data in Avro format to Kafka (with a schema registry), then use GPSS to read those messages. When saving them to Greenplum, they must be stored as base64 strings (use a text or varchar column, and add "BYTES_TO_BASE64: true" to the GPSS config file, as shown below). Finally, use a Java program to decode the base64 string and convert it to a UTF-8 message; a sketch follows the snippet.

AVRO_OPTION:
            BYTES_TO_BASE64: true
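
The decode step on the Java side is small. Here is a minimal sketch for direction 3; the class name and argument handling are hypothetical, and IBM-930 is assumed because it is the code page used in this article's use case:

// Base64EbcdicDecode.java — hypothetical sketch for direction 3:
// decode a base64 string read from the text/varchar column back into
// raw EBCDIC bytes, then re-encode it as a UTF-8 Java string
import java.nio.charset.Charset;
import java.util.Base64;

public class Base64EbcdicDecode {
    public static void main(String[] args) {
        String base64FromTable = args[0]; // value selected from the text column
        byte[] ebcdicBytes = Base64.getDecoder().decode(base64FromTable);
        // requires a JDK with the extended charsets or the ICU4J charset provider
        String utf8Text = new String(ebcdicBytes, Charset.forName("IBM-930"));
        System.out.println(utf8Text);
    }
}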

 

Resolution

Here are the steps for loading a whole message as binary data into Greenplum (direction 2 above).

1. Download and unzip Kafka 2.13-2.8.0. Start ZooKeeper first, then start Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

2. Create a UTF-8 encoded file and add some messages. Next, use iconv to convert the encoding from UTF-8 to IBM-930. IBM-930 is chosen here because this use case involves traditional Chinese characters; a different use case may call for a different EBCDIC code page. If iconv is unavailable, see the Java sketch after the command.

iconv -f UTF-8 -t IBM-930 utf8_file.txt > ebcdic_file.txt
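
If iconv is not available, the same conversion can be done with a small Java program. This is a minimal sketch, not part of the original article; the file names match the iconv command above, and the IBM-930 charset must be supplied by the JDK's extended charsets or the ICU4J charset provider:

// Utf8ToEbcdic930.java — hypothetical equivalent of the iconv command above
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class Utf8ToEbcdic930 {
    public static void main(String[] args) throws Exception {
        // read the UTF-8 source file, then write it back out as IBM-930 bytes
        String text = new String(Files.readAllBytes(Paths.get("utf8_file.txt")), StandardCharsets.UTF_8);
        Files.write(Paths.get("ebcdic_file.txt"), text.getBytes(Charset.forName("IBM-930")));
    }
}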

3. Use a Python script (it uses the kafka-python package) to publish the message in this file to Kafka; run it with "python3 pub.py":

from kafka import KafkaProducer

# read the EBCDIC-encoded file produced by iconv in step 2
with open('ebcdic_file.txt', 'rb') as f:
    raw_bytes = f.read()

# create the producer; value_serializer=None means the value is sent as raw bytes
producer = KafkaProducer(
    bootstrap_servers=['pt1:9092'],
    value_serializer=None
)

# publish the message
producer.send('top1', value=raw_bytes)
producer.flush()

4. Use a Java program to read this message from Kafka, convert its encoding from IBM-930 to UTF-8, and display the original message:

./test_load_java.sh

SLF4J(W): No SLF4J providers were found.
SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.

original message (HEX): 0e494549480fc5c2c3c4c9c3400e486b4bb00f25

🟢 decoded (IBM-930 → UTF-8):
測試EBCDIC 消息

Java source:
// ConsumeAndDecode930.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.nio.charset.Charset;
import java.time.Duration;
import java.util.*;


class ConsumeAndDecode930 {
    static {
        try {
            // verify the ICU4J charset provider is available on the classpath
            Class.forName("com.ibm.icu.charset.CharsetProvider");
            System.out.println("✅ ICU4J charset provider loaded");
        } catch (ClassNotFoundException e) {
            System.err.println("❌ ICU4J CharsetProvider not found; please confirm icu4j-xx.jar is included in the classpath");
        }
    }

    private static final String TOPIC = "top1"; // Kafka topic to read from
    private static final String BOOTSTRAP_SERVERS = "pt1:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "decode-930-group-" + System.currentTimeMillis());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        System.out.println("waiting for Kafka message(5s timeout)...");

        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
        if (records.isEmpty()) {
            System.err.println("❌ not message received in 5s。please check topic and Kafka connection.");
            consumer.close();
            return;
        }

        for (ConsumerRecord<byte[], byte[]> record : records) {
            byte[] rawBytes = record.value();
            System.out.println("✅ message received, length: " + rawBytes.length + " bytes");
            System.out.println("original message (HEX): " + bytesToHex(rawBytes));

            // decode the EBCDIC bytes; "IBM-930" resolves through the ICU4J
            // charset provider (or the JDK's built-in x-IBM930 alias)
            String decoded = new String(rawBytes, Charset.forName("IBM-930"));
            System.out.println("🟢 decoded (IBM-930 → UTF-8):");
            System.out.println(decoded);
        }

        consumer.close();
    }

    // render a byte array as a lowercase hex string
    private static String bytesToHex(byte[] bytes) {
        if (bytes == null) return "null";
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }
}

5. Use GPSS to set up a gpkafka job that loads this message into GPDB. The target table (kafka_bytea_table, with an integer id column and a raw_data bytea column, per the output in step 6) must already exist. The gpss job config file is shown below; run the job with "gpkafka load official-test.yaml":

DATABASE: gpadmin
USER: gpadmin
HOST: cent81
PORT: 5472
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: pt1:9092
      TOPIC: top1
    COLUMNS:
      - NAME: k_msg
        TYPE: bytea             # the whole message lands in a single bytea column
    FORMAT: binary              # must be binary (not text/json)
  OUTPUT:
    TABLE: kafka_bytea_table    # table name only
    MAPPING:
      - NAME: raw_data
        EXPRESSION: k_msg       # map the Kafka payload to the raw_data column
  COMMIT:
    MINIMAL_INTERVAL: 1000      # commit interval in milliseconds

6. View the table contents from the database:

gpadmin=# select * from kafka_bytea_table;
 id |                  raw_data
----+--------------------------------------------
  1 | \x0e494549480fc5c2c3c4c9c3400e486b4bb00f25
  3 | \x0e494549480fc5c2c3c4c9c3400e486b4bb00f25
  4 | \x0e494549480fc5c2c3c4c9c3400e486b4bb00f25
  5 | \x0e494549480fc5c2c3c4c9c3400e486b4bb00f25
  2 | \x0e494549480fc5c2c3c4c9c3400e486b4bb00f25

7. Set up a Java program to read from this table and convert the encoding from IBM-930 to UTF-8. The PrintMessageAsHex utility below reads the raw message directly from Kafka and prints it as hex, which can be compared against the bytea values stored in the table; the table read and decode itself is done by the EBCDICToUTF8 class, sketched after it.

// PrintMessageAsHex.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.*;

public class PrintMessageAsHex {
    private static final String TOPIC = "top1"; // Kafka topic
    private static final String BOOTSTRAP_SERVERS = "pt1:9092"; // Kafka bootstrap server

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "hex-printer-" + System.currentTimeMillis());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        System.out.println("⏳ wait for 10 seconds ...");

        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
        if (records.isEmpty()) {
            System.err.println("❌ timeout, please check server name and port。");
        } else {
            for (ConsumerRecord<byte[], byte[]> record : records) {
                byte[] value = record.value();
                System.out.println("✅ message recieved:");
                System.out.println("   Partition: " + record.partition());
                System.out.println("   Offset:    " + record.offset());
                System.out.println("   Length:    " + (value != null ? value.length : 0) + " 字节");
                System.out.println("   HEX:       " + bytesToHex(value));
                break; // handle the first message
            }
        }

        consumer.close();
    }

   
    private static String bytesToHex(byte[] bytes) {
        if (bytes == null) return "null";
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }
}
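
The decoding itself is done by a class the article runs as EBCDICToUTF8; its source is not included above. A minimal sketch, assuming the kafka_bytea_table schema from step 6 and the connection details from the gpss config file (the empty password is an assumption), could look like this:

// EBCDICToUTF8.java — assumed reconstruction, not the original source:
// read each bytea row over JDBC and decode it from IBM-930 to UTF-8
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class EBCDICToUTF8 {
    public static void main(String[] args) throws Exception {
        // host, port, database, and user taken from the gpss config file
        String url = "jdbc:postgresql://cent81:5472/gpadmin";
        try (Connection conn = DriverManager.getConnection(url, "gpadmin", "");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT id, raw_data FROM kafka_bytea_table")) {
            Charset ibm930 = Charset.forName("IBM-930");
            while (rs.next()) {
                byte[] raw = rs.getBytes("raw_data"); // bytea comes back as raw bytes
                System.out.println("ID: " + rs.getInt("id"));
                System.out.println("Decoded UTF-8 text: " + new String(raw, ibm930));
                System.out.println();
                System.out.println("---");
            }
        }
    }
}

Compile it with the PostgreSQL JDBC driver on the classpath and run it; the output looks like this: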
java -cp ".:postgresql-jdbc.jar" EBCDICToUTF8
ID: 2
Decoded UTF-8 text: 測試EBCDIC 消息

---
ID: 3
Decoded UTF-8 text: 測試EBCDIC 消息

---
ID: 4
Decoded UTF-8 text: 測試EBCDIC 消息

---
ID: 5
Decoded UTF-8 text: 測試EBCDIC 消息

---
ID: 1
Decoded UTF-8 text: 測試EBCDIC 消息