Loading data from Kafka
Sample Python code for loading data from Kafka into Tengri.
Step 1: Function for reading messages
We define a function that reads messages from Kafka and uploads them to S3 as newline-delimited JSON.
The kafka-python package must be installed in the environment.
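If the packages are not already present, they can typically be installed with pip (boto3 is also required, for the S3 upload):

pip install kafka-python boto3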
import time
import itertools
import io
import json
import os
import boto3
from datetime import date
from kafka import KafkaConsumer


def consume_topic(
    broker_list: list[str], topic_list: list[str], conn_info: dict,
    consumer_group: str, timeout: int, max_messages: int,
    file_path: str,
):
    print(f"Listening for messages on topics: {topic_list}")
    consumer = KafkaConsumer(
        *topic_list,
        **conn_info,
        bootstrap_servers=broker_list,
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # commit manually after a successful upload
        group_id=consumer_group,
    )
    try:
        file = io.BytesIO()
        start_time = time.time()
        # Poll until the message budget or the time budget is exhausted.
        while max_messages > 0 and (time.time() - start_time) < timeout:
            records_map = consumer.poll(
                timeout_ms=5000,
                max_records=max_messages,
            )
            cnt = sum(len(messages) for messages in records_map.values())
            print(f"next batch polled: {cnt}")
            for message in itertools.chain(*records_map.values()):
                max_messages -= 1
                header = json.dumps({
                    "batch": int(start_time),
                    "topic": message.topic,
                    "partition": message.partition,
                    "offset": message.offset,
                    "timestamp": message.timestamp,
                })
                # Splice the service fields into the message's own JSON object:
                # drop the header's closing '}' and the payload's opening '{',
                # joining the two halves with a comma. One object per line.
                file.write(header.encode('ascii')[:-1] + b',' + message.value[1:])
                file.write(b'\n')
        # Rewind the in-memory file and upload the batch to the S3 staging area.
        file.seek(0)
        s3bucket = os.environ["TNGRI_S3_BUCKET_NAME"]
        s3path = f'Stage/{file_path}'
        print(f"write to {s3path}")
        s3 = boto3.client('s3')
        s3.upload_fileobj(file, s3bucket, s3path)
        # Commit offsets only after the file has landed in S3.
        consumer.commit()
    finally:
        consumer.close()
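Note that the splice assumes each message value is itself a JSON object (its first byte is '{'). Each line of the resulting file is then one merged JSON object; for a hypothetical clickstream payload with fields user_id and event, a line might look like:

{"batch": 1700000000, "topic": "clickstream_topic1", "partition": 0, "offset": 42, "timestamp": 1700000000123, "user_id": "u123", "event": "click"}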
Step 2: Connection parameters
Set the Kafka cluster connection parameters, the file name for saving the data, and the remaining parameters, then start reading.
TABLE_NAME = 'clickstream'
TOPIC_LIST = ['clickstream_topic1', 'clickstream_topic2', 'clickstream_topic3']
BROKER_LIST = ['ip1:9093', 'ip2:9093', 'ip3:9093']
CONN_INFO = {
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'SCRAM-SHA-256',
    'sasl_plain_username': '*****',
    'sasl_plain_password': '*****',
}

file_path = f'kafka/{TABLE_NAME}/{date.today()}/{int(time.time())}.json'

consume_topic(
    broker_list=BROKER_LIST,
    topic_list=TOPIC_LIST,
    conn_info=CONN_INFO,
    consumer_group='tengri-loader',
    timeout=180,
    max_messages=1000000,
    file_path=file_path,
)
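consume_topic reads the bucket name from the TNGRI_S3_BUCKET_NAME environment variable. If the platform does not set it for you, it can be set before the call; the bucket name below is a placeholder:

os.environ["TNGRI_S3_BUCKET_NAME"] = "my-tengri-bucket"  # placeholder, use your staging bucket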
Step 3: Creating the table and writing the data
Use the tngri.sql function to create a table and write the collected data into it.
Each line of the file (file_path) contains a JSON object. To read the file line by line, we use the read_csv function; the delimiter '\0' never occurs in the text, so each whole line lands in the single column column0. To parse the JSON objects, we use the json_extract_string function.
import tngri

tngri.sql(f"""
    create table if not exists raw.{TABLE_NAME} (
        "batch" BIGINT,
        "topic" VARCHAR,
        "partition" BIGINT,
        "offset" BIGINT,
        "timestamp" BIGINT,
        "raw_msg" VARCHAR
    )
""")

tngri.sql(f"""
    insert into raw.{TABLE_NAME}
    select
        json_extract_string(msg, 'batch')::BIGINT as "batch",
        json_extract_string(msg, 'topic')::VARCHAR as "topic",
        json_extract_string(msg, 'partition')::BIGINT as "partition",
        json_extract_string(msg, 'offset')::BIGINT as "offset",
        json_extract_string(msg, 'timestamp')::BIGINT as "timestamp",
        msg as raw_msg
    from (
        select column0::json as msg
        from read_csv(
            '{file_path}',
            header=false,
            all_varchar=true,
            delim='\\0'
        )
    )
""")