OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

How to get pd.Dataframe from ClickHouseHook()?

  • Thread starter Thread starter John Doe
  • Start date Start date
J

John Doe

Guest
I got this code in my DAG for Airflow:

Code:
import pandas as pd
import datetime
import io
import httpx
from airflow.decorators import dag, task
from airflow.models import Variable
from clickhouse_driver import Client
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
from airflow.models import Connection
from airflow.utils.db import create_session


default_args = {
    'owner': 'd-chernovol',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2024, 6, 20)
}
schedule_interval = '*/20 * * * *'
host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')
connections_name = 'clickhouse_default'
try:
    # create Connection
    clickhouse_conn = Connection(
        conn_id=connections_name,
        conn_type='ClickHouse',
        host=host,
        schema=database_name,
        login=user_name,
        password=password_for_db,
        port=8443
    )
    # create connection
    with create_session() as session:
        session.add(clickhouse_conn)
        session.commit()
    # make ClickHouseHook
    ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)
except:
    ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)

@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=3)
def test_dag_4():
    @task
    def get_table_from_db(sql_query):
        result = ch_hook.execute(sql_query)
        return result

    @task
    def send_msg(bot_token: str, chat_id: str, message: str):
        # send_message
        url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
        client = httpx.Client()
        return client.post(url)

get_table_from_db_task >> send_msg_task
st_dag = test_dag_4()

My code is working, but get_table_from_db(sql_query) returns:

Code:
class 'airflow.models.xcom_arg.PlainXComArg'>

If I convert result to string I get list with values from table:

Code:
[[111, 'name_1'], [222, 'name_2'], [222, 'name_3']]

But I need to get table from my database as pandas.Dataframe. How I can achieve it? Or I should write function for this? The main problem, that I get only values without column names.
<p>I got this code in my DAG for Airflow:</p>
<pre><code>import pandas as pd
import datetime
import io
import httpx
from airflow.decorators import dag, task
from airflow.models import Variable
from clickhouse_driver import Client
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook
from airflow.models import Connection
from airflow.utils.db import create_session


default_args = {
'owner': 'd-chernovol',
'depends_on_past': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2024, 6, 20)
}
schedule_interval = '*/20 * * * *'
host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')
connections_name = 'clickhouse_default'
try:
# create Connection
clickhouse_conn = Connection(
conn_id=connections_name,
conn_type='ClickHouse',
host=host,
schema=database_name,
login=user_name,
password=password_for_db,
port=8443
)
# create connection
with create_session() as session:
session.add(clickhouse_conn)
session.commit()
# make ClickHouseHook
ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)
except:
ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)

@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=3)
def test_dag_4():
@task
def get_table_from_db(sql_query):
result = ch_hook.execute(sql_query)
return result

@task
def send_msg(bot_token: str, chat_id: str, message: str):
# send_message
url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
client = httpx.Client()
return client.post(url)

get_table_from_db_task >> send_msg_task
st_dag = test_dag_4()
</code></pre>
<p>My code is working, but get_table_from_db(sql_query) returns:</p>
<pre><code>class 'airflow.models.xcom_arg.PlainXComArg'>
</code></pre>
<p>If I convert result to string I get list with values from table:</p>
<pre><code>[[111, 'name_1'], [222, 'name_2'], [222, 'name_3']]
</code></pre>
<p>But I need to get table from my database as pandas.Dataframe.
How I can achieve it? Or I should write function for this?
The main problem, that I get only values without column names.</p>
 

Latest posts

A
Replies
0
Views
1
Adrian-Mihai Enache
A
Top