RAM buildup after incrementally extracting, preprocessing and feature-engineering data

Thread starter: AIS TESS (Guest)
I have one year's worth of transaction data in an MSSQL database table. I extract the table day by day incrementally using pyodbc, then preprocess the extracted data, feature engineer it, and save the result to a CSV file. For example, the transaction data ranges from 2022-01-01 to 2022-12-31. As the date advances, each day's extraction takes longer and longer: extracting 2022-01-01 takes only about 10 seconds, but by 2022-01-29 a single day takes more than 3 hours. I don't know why this happens or how to solve it. My current workaround is to stop the Python program at 9 pm, and a task scheduler starts it again the next day from a checkpoint.

My main.py

Code:
# Standard-library / third-party imports used by this script; the project-specific
# modules (utils, ais_read_save_data, ais_eod_fe, log_to_file) and the connection
# settings (server, database, table names, start/end date strings) are assumed to
# be defined elsewhere in the project.
import time
from datetime import datetime, timedelta

import pandas as pd


def should_stop_execution():
    current_time = datetime.now().time()
    stop_time = datetime.strptime('21:00', '%H:%M').time()
    return current_time >= stop_time

# Checkpoint Functions
def save_checkpoint(last_processed_date, checkpoint_file='last_processed_date.txt'):
    with open(checkpoint_file, 'w') as f:
        f.write(last_processed_date.strftime('%Y%m%d'))

def load_checkpoint(checkpoint_file='last_processed_date.txt'):
    try:
        with open(checkpoint_file, 'r') as f:
            return datetime.strptime(f.read().strip(), '%Y%m%d')
    except FileNotFoundError:
        return None

@log_to_file('AIS_log.log')
def main(server, database, table, start_date_str, end_date_str):
    cif_path = "./Data/raw/cif"
    acc_path = "./Data/raw/acc"
    utils.delete_folder_contents(cif_path)
    ais_read_save_data.cifextraction_eod(server, database, cif_table)
    utils.delete_folder_contents(acc_path)
    ais_read_save_data.accextraction_eod(server, database, acc_table)

    # Attempt to load from checkpoint
    checkpoint_start_date = load_checkpoint()
    if checkpoint_start_date:
        start_date = checkpoint_start_date
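        # Note: the checkpoint stores the last date that was already processed,
        # so that day is re-extracted once after each restart.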
    else:
        start_date = pd.to_datetime(start_date_str, format='%Y%m%d')
    end_date = pd.to_datetime(end_date_str, format='%Y%m%d')
    
    print(start_date)
    print(end_date)
    all_time = []

    try:
        while start_date <= end_date:
            # Check if it's time to stop
            if should_stop_execution():
                print("Stopping execution at", datetime.now())
                break
            
            start = time.time()
            txn_path = "./data/raw/txn/"
            utils.delete_folder_contents(txn_path)
            txn = ais_read_save_data.bftextraction_initial(server, database, table, start_date) 
            
            # Check if it's time to stop
            if should_stop_execution():
                print("Stopping execution at", datetime.now())
                break
            
            
            print('read save complete')
            if txn is not None:
                del txn
                # Start processing - Assuming ais_eod_fe.main() is another process you want to call
                ais_eod_fe.main()

            end = time.time() - start
            print(end)
            all_time.append((str(start_date), end))

            # Save checkpoint after successful day processing
            save_checkpoint(start_date)
            
            # Move to next date
            start_date += timedelta(days=1)
            
    except Exception as e:
        print(e)
    finally:
        df = pd.Series(all_time)
        df.to_csv('timelog.csv', index=False)


if __name__ == '__main__':
    main(server, database, bftranhist, bftranhist_startdate, bftranhist_enddate)
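
Since the symptom is both a slowdown and RAM buildup, it can help to record process memory alongside the per-day timing that main.py already collects. This is not part of the original code; it is a minimal sketch assuming psutil is installed, with dates and process_one_day as hypothetical stand-ins for the per-day while loop and the extraction/feature-engineering calls above.

Code:
import time

import psutil

proc = psutil.Process()
metrics = []

for day in dates:  # `dates` stands in for the per-day while loop above
    t0 = time.time()
    process_one_day(day)  # hypothetical stand-in for extraction + ais_eod_fe.main()
    metrics.append({
        'date': str(day),
        'seconds': time.time() - t0,
        'rss_mb': proc.memory_info().rss / 1024 ** 2,  # resident memory after this day
    })

If rss_mb climbs steadily from one date to the next, something is being retained between iterations; if it stays flat while the seconds grow, the slowdown is more likely on the SQL Server side of the extraction.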

My extraction.py

Code:
# Imports used by this module; parent_dir, user, password and the dh parquet helper
# are assumed to be defined elsewhere in the project.
import gc
import urllib.parse
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine, text


def bftextraction_initial(server, database, table, start_date, save_to_raw=True):
    parquet_path = f"{parent_dir}/data/raw/txn/"
    if not user:
        SQL_SERVER_ENGINE_URL = f"mssql+pyodbc:///?odbc_connect={urllib.parse.quote_plus('DRIVER={SQL Server};SERVER=' + server + ';DATABASE=' + database + ';Trusted_Connection=yes;')}"
    else:
        SQL_SERVER_ENGINE_URL = f"mssql+pyodbc:///?odbc_connect={urllib.parse.quote_plus(f'DRIVER={{SQL Server}};SERVER={server};DATABASE={database};UID={user};PWD={password};')}"
    engine = create_engine(SQL_SERVER_ENGINE_URL)

    columns_to_select = [
        'TRAN_NO',
        'SEQ_AUTO',
        'ACC_NO',
        'NO_CIF',
        'TRAN_PDATE',
        'TRAN_CODE1',
        'PROD_TYPE',
        'TRAN_LOC',
        'AMT_CR',
        'AMT_DR',
        'TRAN_TYPE',
        'AMT_CUR',
        'AMT_RATE',
        'SEND_NAME',
        'BENE_NAME',
        'TRAN_CODE2',
        'TRAN_CHAN',
        'SEND_BCTY',
        'BENE_BCTY',
        'UNIT_PRICE'
    ]

    columns_str = ', '.join(columns_to_select)
    
    end_date_int = datetime.strftime(start_date, '%Y%m%d')
    print(end_date_int)
    # REMOVE TOP 100000 AFTER TEST
    stmt = text(f"SELECT {columns_str} from {urllib.parse.quote_plus(table)} where TRAN_PDATE = {end_date_int}")

    try:
        with engine.connect() as conn:
            df = pd.read_sql(stmt, conn)
            if df is not None:
                print(df.shape)
                df = df.drop_duplicates(['TRAN_NO', 'SEQ_AUTO', 'ACC_NO', 'NO_CIF', 'TRAN_PDATE'])
                # print(df.shape)
                if save_to_raw:
                    dh.df_to_parquet(df, parquet_path)

            return df
        
    finally:
        engine.dispose()
        gc.collect()
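
Not part of the original module, but relevant to the memory side of the question: pandas can stream the same query in pieces via the chunksize argument of read_sql, so a whole day never has to be held as one DataFrame. A minimal variant sketch, reusing engine, stmt and parquet_path from the function above and assuming pyarrow or fastparquet is available for the parquet writes:

Code:
with engine.connect() as conn:
    # chunksize makes read_sql yield DataFrames of up to 100,000 rows each
    for i, chunk in enumerate(pd.read_sql(stmt, conn, chunksize=100_000)):
        chunk = chunk.drop_duplicates(
            ['TRAN_NO', 'SEQ_AUTO', 'ACC_NO', 'NO_CIF', 'TRAN_PDATE'])
        # one parquet file per chunk; duplicates that span chunk boundaries
        # would still need a later de-duplication pass
        chunk.to_parquet(f"{parquet_path}part_{i:05d}.parquet", index=False)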

If someone could help me, that would be great. Thank you so much