OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

Importing directory of files with psycopg2 gives the first data table import over and over but with different updates

  • Thread starter Thread starter aconti74
  • Start date Start date
A

aconti74

Guest
I am still inexperienced with programming as you will see in my code below. I am stuck and can't see why this is happening. I have a directory of text files that I am trying to import into postgres. Each file is in the same format. I am trying to import all files and want to add an extra column with the date of file. I keep getting the last file data repeated but with the dates of the of the other files.

Example:

file_01/01/2024​


Code:
VDS_ID,FWY,co
1001,1,LA
1002,1,LA
1003,2,LA
1004,1,LA
1005,1,LA

file_03/01/2024​


Code:
VDS_ID,FWY,co
1001,1,LA
1002,1,LA
1003,2,LA
1005,1,LA

file_05/01/2024​


Code:
VDS_ID,FWY,co
1001,1,LA
1002,1,LA
1003,2,LA

Result after the import:​


Code:
VDS_ID,FWY,config_date
1001,1,LA,01/01/2024
1002,1,LA,01/01/2024
1003,2,LA,01/01/2024
1001,1,LA,03/01/2024
1002,1,LA,03/01/2024
1003,2,LA,03/01/2024
1001,1,LA,05/01/2024
1002,1,LA,05/01/2024
1003,2,LA,05/01/2024


import psycopg2
from psycopg2.extras import execute_values
import time
import os
import subprocess
import csv

def main():
    mypath = r'C:\Downloads\PEMS\Meta'
    tableName = 'pems_config'
    temptableName = 'temp_pems_config'


    # Connect to an existing database (password blanked out question
    conn = psycopg2.connect(database='Test', user='postgres', password ='****')

    # Open a cursor to perform database operations
    cur = conn.cursor()

    start_time = time.time()
    print ("Start time: {0}", time.asctime(time.localtime(start_time)))

    textfiles = []

    for file in os.listdir(mypath):
        if file.endswith(".txt"):
##           print(os.path.join(mypath, file))
           textfiles.append(file)

    create_table_sql = """CREATE TABLE pems_config
(
    "VDS_ID" integer,
    "Fwy" smallint,
    "Dir" character(1),
    "District" smallint,
    "County" integer,
    "City" integer,
    "State_PM" varchar(9),
    "Abs_PM" real,
    "Latitude" real,
    "Longitude" real,
    "Length" real,
    "Type" character(2),
    "Lanes" smallint,
    "Name" text,
    "User_ID_1" text,
    "User_ID_2" text,
    "User_ID_3" text,
    "User_ID_4" text,
    "pems_config_date" date
);"""

    create_temp_table_sql = """CREATE TEMPORARY TABLE temp_pems_config
    (
        "VDS_ID" integer,
        "Fwy" smallint,
        "Dir" character(1),
        "District" smallint,
        "County" integer,
        "City" integer,
        "State_PM" varchar(9),
        "Abs_PM" real,
        "Latitude" real,
        "Longitude" real,
        "Length" real,
        "Type" character(2),
        "Lanes" smallint,
        "Name" text,
        "User_ID_1" text,
        "User_ID_2" text,
        "User_ID_3" text,
        "User_ID_4" text
    );"""

    drop_table_sql = """DROP TABLE IF EXISTS temp_pems_config"""

    copy_sql = """COPY temp_pems_config FROM stdin WITH DELIMITER '\t' NULL AS '' csv HEADER"""

    add_column_sql = """ALTER TABLE temp_pems_config ADD COLUMN pems_config_date date;"""

    update_date_sql = """UPDATE temp_pems_config SET pems_config_date = %s ;"""

    append_table_sql = """INSERT INTO pems_config select * FROM temp_pems_config;"""

    final_table_sql = """CREATE TABLE final_pems_config AS Select "VDS_ID","Dir","District","County","City","State_PM","Abs_PM","Latitude","Longitude","Length","Type",
    "Lanes","Name","User_ID_1","User_ID_2","User_ID_3","User_ID_4",max("pems_config_date") AS "pems_config_date" from pems_config
    GROUP BY "VDS_ID","Dir","District","County","City","State_PM","Abs_PM","Latitude","Longitude","Length","Type",
    "Lanes","Name","User_ID_1","User_ID_2","User_ID_3","User_ID_4";"""

    cur.execute("SELECT EXISTS (SELECT 1 AS result FROM pg_tables WHERE schemaname = 'public' AND tablename = 'pems_config');")
    tableExists = cur.fetchone()[0]
    print ("{0} Exists: {1}".format(tableName, str(tableExists)))

    if tableExists == False:
        print ("Creating {0} Table:".format(tableName))
        #Execute a command: this creates a new table
        cur.execute(create_table_sql)
        print (cur.statusmessage)
        print ("Time to create {0} Table: {1} s".format(tableName,time.time()-start_time))


    for tfile in textfiles:

        tfile_name = os.path.splitext(tfile)[0]
        config_date = tfile_name[14:18]+"-"+tfile_name[19:21]+"-"+tfile_name[22:25]
##        #check if table exists
        cur.execute("SELECT EXISTS (SELECT 1 AS result FROM pg_tables WHERE schemaname = 'public' AND tablename = 'temp_pems_config');")
        tableExists = cur.fetchone()[0]
        print ("{0} Exists: {1}".format(temptableName, str(tableExists)))
    ##
    ##        #if table does not exist then create it
        if tableExists == False:
           print ("Creating {0} Table:".format(temptableName))
           #Execute a command: this creates a new table
           cur.execute(drop_table_sql)
           cur.execute(create_temp_table_sql)
           print (cur.statusmessage)
           print ("Time to create {0} Table: {1} s".format(tableName,time.time()-start_time))
        else:
            print ("Deleting {0} Table:".format(tableName))
            cur.execute(drop_table_sql)
            print ("Creating {0} Table:".format(tableName))
            cur.execute(create_table_sql)
            print (cur.statusmessage)
            print ("Time to create {0} Table: {1} s".format(tableName,time.time()-start_time))

        with open(os.path.join(mypath,file), 'r') as f:
            copyDataFile_time = time.time()
             #copy zfile into CHTV
            execute_values 
            cur.copy_expert(sql=copy_sql, file=f)
            cur.execute(add_column_sql)
            cur.execute(update_date_sql, (config_date,))
            cur.execute(append_table_sql)
            print ("Time to copy {0} Table: {1} mins".format(tableName,(time.time()-copyDataFile_time)/60))
        print (cur.statusmessage)
    cur.execute(final_table_sql)
    conn.commit()
    cur.close()
    Endtime = time.time()
    print ("Start time: {0}".format(time.asctime(time.localtime(start_time))))
    print ("End time: {0} ".format(time.asctime(time.localtime(Endtime))))
    print ("Total processing time: {0} Minutes".format((Endtime-start_time)/60))

if __name__ == '__main__':
    main()
<p>I am still inexperienced with programming as you will see in my code below. I am stuck and can't see why this is happening. I have a directory of text files that I am trying to import into postgres. Each file is in the same format. I am trying to import all files and want to add an extra column with the date of file. I keep getting the last file data repeated but with the dates of the of the other files.</p>
<p>Example:</p>
<h2>file_01/01/2024</h2>
<pre><code>VDS_ID,FWY,co
1001,1,LA
1002,1,LA
1003,2,LA
1004,1,LA
1005,1,LA
</code></pre>
<h2>file_03/01/2024</h2>
<pre><code>VDS_ID,FWY,co
1001,1,LA
1002,1,LA
1003,2,LA
1005,1,LA
</code></pre>
<h2>file_05/01/2024</h2>
<pre><code>VDS_ID,FWY,co
1001,1,LA
1002,1,LA
1003,2,LA
</code></pre>
<h2>Result after the import:</h2>
<pre><code>VDS_ID,FWY,config_date
1001,1,LA,01/01/2024
1002,1,LA,01/01/2024
1003,2,LA,01/01/2024
1001,1,LA,03/01/2024
1002,1,LA,03/01/2024
1003,2,LA,03/01/2024
1001,1,LA,05/01/2024
1002,1,LA,05/01/2024
1003,2,LA,05/01/2024


import psycopg2
from psycopg2.extras import execute_values
import time
import os
import subprocess
import csv

def main():
mypath = r'C:\Downloads\PEMS\Meta'
tableName = 'pems_config'
temptableName = 'temp_pems_config'


# Connect to an existing database (password blanked out question
conn = psycopg2.connect(database='Test', user='postgres', password ='****')

# Open a cursor to perform database operations
cur = conn.cursor()

start_time = time.time()
print ("Start time: {0}", time.asctime(time.localtime(start_time)))

textfiles = []

for file in os.listdir(mypath):
if file.endswith(".txt"):
## print(os.path.join(mypath, file))
textfiles.append(file)

create_table_sql = """CREATE TABLE pems_config
(
"VDS_ID" integer,
"Fwy" smallint,
"Dir" character(1),
"District" smallint,
"County" integer,
"City" integer,
"State_PM" varchar(9),
"Abs_PM" real,
"Latitude" real,
"Longitude" real,
"Length" real,
"Type" character(2),
"Lanes" smallint,
"Name" text,
"User_ID_1" text,
"User_ID_2" text,
"User_ID_3" text,
"User_ID_4" text,
"pems_config_date" date
);"""

create_temp_table_sql = """CREATE TEMPORARY TABLE temp_pems_config
(
"VDS_ID" integer,
"Fwy" smallint,
"Dir" character(1),
"District" smallint,
"County" integer,
"City" integer,
"State_PM" varchar(9),
"Abs_PM" real,
"Latitude" real,
"Longitude" real,
"Length" real,
"Type" character(2),
"Lanes" smallint,
"Name" text,
"User_ID_1" text,
"User_ID_2" text,
"User_ID_3" text,
"User_ID_4" text
);"""

drop_table_sql = """DROP TABLE IF EXISTS temp_pems_config"""

copy_sql = """COPY temp_pems_config FROM stdin WITH DELIMITER '\t' NULL AS '' csv HEADER"""

add_column_sql = """ALTER TABLE temp_pems_config ADD COLUMN pems_config_date date;"""

update_date_sql = """UPDATE temp_pems_config SET pems_config_date = %s ;"""

append_table_sql = """INSERT INTO pems_config select * FROM temp_pems_config;"""

final_table_sql = """CREATE TABLE final_pems_config AS Select "VDS_ID","Dir","District","County","City","State_PM","Abs_PM","Latitude","Longitude","Length","Type",
"Lanes","Name","User_ID_1","User_ID_2","User_ID_3","User_ID_4",max("pems_config_date") AS "pems_config_date" from pems_config
GROUP BY "VDS_ID","Dir","District","County","City","State_PM","Abs_PM","Latitude","Longitude","Length","Type",
"Lanes","Name","User_ID_1","User_ID_2","User_ID_3","User_ID_4";"""

cur.execute("SELECT EXISTS (SELECT 1 AS result FROM pg_tables WHERE schemaname = 'public' AND tablename = 'pems_config');")
tableExists = cur.fetchone()[0]
print ("{0} Exists: {1}".format(tableName, str(tableExists)))

if tableExists == False:
print ("Creating {0} Table:".format(tableName))
#Execute a command: this creates a new table
cur.execute(create_table_sql)
print (cur.statusmessage)
print ("Time to create {0} Table: {1} s".format(tableName,time.time()-start_time))


for tfile in textfiles:

tfile_name = os.path.splitext(tfile)[0]
config_date = tfile_name[14:18]+"-"+tfile_name[19:21]+"-"+tfile_name[22:25]
## #check if table exists
cur.execute("SELECT EXISTS (SELECT 1 AS result FROM pg_tables WHERE schemaname = 'public' AND tablename = 'temp_pems_config');")
tableExists = cur.fetchone()[0]
print ("{0} Exists: {1}".format(temptableName, str(tableExists)))
##
## #if table does not exist then create it
if tableExists == False:
print ("Creating {0} Table:".format(temptableName))
#Execute a command: this creates a new table
cur.execute(drop_table_sql)
cur.execute(create_temp_table_sql)
print (cur.statusmessage)
print ("Time to create {0} Table: {1} s".format(tableName,time.time()-start_time))
else:
print ("Deleting {0} Table:".format(tableName))
cur.execute(drop_table_sql)
print ("Creating {0} Table:".format(tableName))
cur.execute(create_table_sql)
print (cur.statusmessage)
print ("Time to create {0} Table: {1} s".format(tableName,time.time()-start_time))

with open(os.path.join(mypath,file), 'r') as f:
copyDataFile_time = time.time()
#copy zfile into CHTV
execute_values
cur.copy_expert(sql=copy_sql, file=f)
cur.execute(add_column_sql)
cur.execute(update_date_sql, (config_date,))
cur.execute(append_table_sql)
print ("Time to copy {0} Table: {1} mins".format(tableName,(time.time()-copyDataFile_time)/60))
print (cur.statusmessage)
cur.execute(final_table_sql)
conn.commit()
cur.close()
Endtime = time.time()
print ("Start time: {0}".format(time.asctime(time.localtime(start_time))))
print ("End time: {0} ".format(time.asctime(time.localtime(Endtime))))
print ("Total processing time: {0} Minutes".format((Endtime-start_time)/60))

if __name__ == '__main__':
main()
</code></pre>
 

Latest posts

Top