Data upload from Kafka

Sample Python code for loading data from Kafka into Tengri.

Step 1: Function for reading messages

We define a function that reads messages from Kafka and uploads them to S3 as JSON, one record per line.

The kafka-python and boto3 packages must be installed in the environment.
import time
import itertools
import io
import json
import os
import boto3
from datetime import date
from kafka import KafkaConsumer

def _build_record_line(header: bytes, payload) -> bytes:
    """Merge the metadata header with a message payload into one JSON line.

    Args:
        header: Serialized JSON object holding the record metadata.
        payload: Raw message value; expected to be a serialized JSON object
            (bytes). May be ``None`` or empty, e.g. for a record that carries
            no value.

    Returns:
        A single-line JSON object (without a trailing newline) containing the
        header members followed by the payload members.
    """
    body = (payload or b'').strip()
    # In valid JSON, raw CR/LF bytes can only be whitespace between tokens
    # (inside strings they must be escaped), so flattening them keeps the
    # document equivalent while guaranteeing one record per output line.
    body = body.replace(b'\r', b' ').replace(b'\n', b' ')
    # No payload or an empty object: there is nothing to splice in, and
    # appending a comma would produce invalid JSON ("...,}").
    if not body or body[1:].strip() == b'}':
        return header
    # Drop the header's closing brace and the payload's opening brace, then
    # join the two member lists with a comma.
    return header[:-1] + b',' + body[1:]


def consume_topic(
    broker_list: list[str], topic_list: list[str], conn_info: dict,
    consumer_group: str, timeout: int, max_messages: int,
    file_path: str,
):
    """Read messages from Kafka and upload them to S3 as line-delimited JSON.

    Every consumed message becomes one line: a JSON object that starts with
    the metadata fields ``batch`` (the integer start time of this run),
    ``topic``, ``partition``, ``offset`` and ``timestamp``, followed by the
    members of the message payload. Payloads are expected to be JSON objects.

    The buffer is uploaded to the bucket named by the ``TNGRI_S3_BUCKET_NAME``
    environment variable under the key ``Stage/{file_path}``. Offsets are
    committed only after the upload call returns, and the consumer is closed
    in every case.

    Args:
        broker_list: Addresses passed as ``bootstrap_servers``.
        topic_list: Topics to subscribe to.
        conn_info: Extra keyword arguments forwarded to ``KafkaConsumer``.
        consumer_group: Consumer group id used for reading and committing.
        timeout: Time budget for the polling loop, in seconds.
        max_messages: Upper bound on the number of messages to consume.
        file_path: Object key suffix; the file is stored as ``Stage/{file_path}``.

    Raises:
        KeyError: If ``TNGRI_S3_BUCKET_NAME`` is not set.
    """
    print(f"Listening for messages on topics: {topic_list}")

    consumer = KafkaConsumer(
        *topic_list,
        **conn_info,
        bootstrap_servers=broker_list,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        group_id=consumer_group,
    )
    try:
        file = io.BytesIO()
        start_time = time.time()

        while max_messages > 0 and (time.time() - start_time) < timeout:

            # Never ask for more records than the remaining message budget.
            records_map = consumer.poll(
                timeout_ms=5000,
                max_records=max_messages,
            )
            cnt = sum(len(messages) for messages in records_map.values())
            print(f"next batch polled: {cnt}")

            for message in itertools.chain.from_iterable(records_map.values()):
                max_messages -= 1

                header = json.dumps({
                    "batch": int(start_time),
                    "topic": message.topic,
                    "partition": message.partition,
                    "offset": message.offset,
                    "timestamp": message.timestamp,
                })
                file.write(_build_record_line(header.encode('ascii'), message.value))
                file.write(b'\n')

        # Rewind so the upload reads the buffer from the beginning.
        file.seek(0)

        s3bucket = os.environ["TNGRI_S3_BUCKET_NAME"]
        s3path = f'Stage/{file_path}'
        print(f"write to {s3path}")

        s3 = boto3.client('s3')
        s3.upload_fileobj(file, s3bucket, s3path)
        # Commit only after the upload, so a failed upload does not advance
        # the consumer group's offsets.
        consumer.commit()
    finally:
        consumer.close()

Step 2: Connection parameters

Set the connection parameters for the Kafka cluster, the name of the file in which the data will be saved, and the other parameters, then start reading the data.

# Name of the target table; also used as a segment of the staged file path.
TABLE_NAME = 'clickstream'

# Topics to read and the brokers to bootstrap from.
TOPIC_LIST = [
    'clickstream_topic1',
    'clickstream_topic2',
    'clickstream_topic3',
]
BROKER_LIST = [
    'ip1:9093',
    'ip2:9093',
    'ip3:9093',
]

# Extra keyword arguments forwarded to the Kafka consumer
# (replace the masked credentials with real ones).
CONN_INFO = dict(
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='*****',
    sasl_plain_password='*****',
)

# kafka/<table>/<today's date>/<unix timestamp>.json
file_path = '/'.join((
    'kafka',
    TABLE_NAME,
    str(date.today()),
    f'{int(time.time())}.json',
))

# Read for at most 180 seconds or 1,000,000 messages, whichever comes first.
consume_topic(
    BROKER_LIST,
    TOPIC_LIST,
    CONN_INFO,
    consumer_group='tengri-loader',
    timeout=180,
    max_messages=1000000,
    file_path=file_path,
)

Step 3: Creating table and writing the data

Use the tngri.sql function to create a table and insert the data that was read into it.

Each line of the file (file_path) contains one JSON object. We use the read_csv function to read the file line by line, and the json_extract_string function to extract individual fields from each JSON object.

import tngri

# Create the landing table if it does not exist yet: the five metadata fields
# that Step 1 puts at the start of every record, plus the complete JSON record
# kept in "raw_msg".
tngri.sql(f"""
    create table if not exists raw.{TABLE_NAME} (
        "batch"         BIGINT,
        "topic"         VARCHAR,
        "partition"	    BIGINT,
        "offset"        BIGINT,
        "timestamp"	    BIGINT,
        "raw_msg"       VARCHAR
    )
""")

# Load the file written in Step 2: read_csv yields one text column (column0)
# per line, which is cast to JSON; the metadata fields are then extracted and
# cast to the table's column types, and the whole record goes into raw_msg.
# The Python literal '\\0' reaches the SQL text as '\0'.
# NOTE(review): presumably '\0' is used as a delimiter that never occurs in
# the data, so each line is not split into several columns — confirm.
# NOTE(review): Step 1 uploads to the key 'Stage/{file_path}' while this reads
# '{file_path}'; presumably paths here are resolved relative to Stage — confirm.
tngri.sql(f"""
    insert into raw.{TABLE_NAME}
    select
        json_extract_string(msg, 'batch')::BIGINT as "batch",
        json_extract_string(msg, 'topic')::VARCHAR as "topic",
        json_extract_string(msg, 'partition')::BIGINT as "partition",
        json_extract_string(msg, 'offset')::BIGINT as "offset",
        json_extract_string(msg, 'timestamp')::BIGINT as "timestamp",
        msg as raw_msg
    from (
        select column0::json as msg
        from read_csv(
            '{file_path}',
            header=false,
            all_varchar=true,
            delim='\\0'
        )
    )
""")