Загрузка данных из Kafka

Пример кода Python для загрузки данных в Tengri из Kafka.

Шаг 1: Функция для считывания сообщений

Описываем функцию для считывания сообщений из Kafka и загрузки их в S3 в виде JSON.

В среде должен быть установлен пакет kafka-python.
import time
import itertools
import io
import json
import os
import boto3
from datetime import date
from kafka import KafkaConsumer

def consume_topic(
    broker_list: list[str], topic_list: list[str], conn_info: dict,
    consumer_group: str, timeout: int, max_messages: int,
    file_path: str,
):
    print(f"Listening for messages on topics: {topic_list}")

    consumer = KafkaConsumer(
        *topic_list,
        **conn_info,
        bootstrap_servers=broker_list,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        group_id=consumer_group,
    )
    try:
        file = io.BytesIO()
        start_time = time.time()

        while max_messages > 0 and (time.time() - start_time) < timeout:

            records_map = consumer.poll(
                timeout_ms=5000,
                max_records=max_messages,
            )
            cnt = sum(len(messages) for messages in records_map.values())
            print(f"next batch polled: {cnt}")

            for message in itertools.chain(*records_map.values()):
                max_messages -= 1

                header = json.dumps({
                    "batch": int(start_time),
                    "topic": message.topic,
                    "partition": message.partition,
                    "offset": message.offset,
                    "timestamp": message.timestamp,
                })
                file.write(header.encode('ascii')[:-1] + b',' + message.value[1:])
                file.write(b'\n')

        file.seek(0)

        s3bucket = os.environ["TNGRI_S3_BUCKET_NAME"]
        s3path = f'Stage/{file_path}'
        print(f"write to {s3path}")

        s3 = boto3.client('s3')
        s3.upload_fileobj(file, s3bucket, s3path)
        consumer.commit()
    finally:
        consumer.close()

Шаг 2: Параметры подключения

Задаем параметры подключения к кластеру Kafka, имя файла для сохранения данных и другие параметры и запускаем считывание данных.

TABLE_NAME = 'clickstream'
TOPIC_LIST = ['clickstream_topic1','clickstream_topic2','clickstream_topic3']
BROKER_LIST = ['ip1:9093','ip2:9093','ip3:9093']
CONN_INFO = {
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'SCRAM-SHA-256',
    'sasl_plain_username': '*****',
    'sasl_plain_password': '*****',
}

file_path = f'kafka/{TABLE_NAME}/{date.today()}/{int(time.time())}.json'

consume_topic(
    broker_list=BROKER_LIST,
    topic_list=TOPIC_LIST,
    conn_info=CONN_INFO,
    consumer_group='tengri-loader',
    timeout=180,
    max_messages=1000000,
    file_path=file_path,
)

Шаг 3: Создание таблицы и запись данных

С помощью функции tngri.sql создаем таблицу и записываем в нее считанные данные.

В каждой строке файла (file_path) записана структура JSON. Чтобы построчно прочитать текст из файла, используем функцию read_csv. А чтобы распарсить структуру JSON, используем функцию json_extract_string.

import tngri

tngri.sql(f"""
    create table if not exists raw.{TABLE_NAME} (
        "batch"         BIGINT,
        "topic"         VARCHAR,
        "partition"	    BIGINT,
        "offset"        BIGINT,
        "timestamp"	    BIGINT,
        "raw_msg"       VARCHAR
    )
""")

tngri.sql(f"""
    insert into raw.{TABLE_NAME}
    select
        json_extract_string(msg, 'batch')::BIGINT as "batch",
        json_extract_string(msg, 'topic')::VARCHAR as "topic",
        json_extract_string(msg, 'partition')::BIGINT as "partition",
        json_extract_string(msg, 'offset')::BIGINT as "offset",
        json_extract_string(msg, 'timestamp')::BIGINT as "timestamp",
        msg as raw_msg
    from (
        select column0::json as msg
        from read_csv(
            '{file_path}',
            header=false,
            all_varchar=true,
            delim='\\0'
        )
    )
""")