InfluxDB: Check for existing data before writing to database

  • Thread starter: p.luck (Guest)

Goal:

When writing time-series data from a pandas dataframe to an InfluxDB bucket, check whether a specific row of data already exists in the bucket (and thus prevent the same data from being written twice).
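
For context, my understanding is that InfluxDB identifies a point by its measurement, tag set, and timestamp, and a second write with the same identity overwrites the first rather than duplicating it. If each row's epoch were used as the point's timestamp, re-running the import would be idempotent even without an existence check. A minimal sketch of a timestamped point, assuming the influxdb_client Point API and epoch values in Unix seconds (the row dict here is hypothetical):

Code:
from influxdb_client import Point, WritePrecision

# Hypothetical single row; the epoch seconds become the point's timestamp.
row = {"epoch": 1332374520.0, "open": 2.341, "close": 2.341}

point = (
    Point("ohlcv")
    .field("open", row["open"])
    .field("close", row["close"])
    # Same measurement + tags + time on a re-run -> overwrite, not a duplicate.
    .time(int(row["epoch"]), WritePrecision.S)
)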

Format of the time-series data in the pandas dataframe (sample):

Code:
epoch,open,high,low,close,volume
1332374520.0,2.341,2.341,2.341,2.341,1.0
1332374700.0,2.343,2.343,2.343,2.343,1.0
1332374940.0,2.344,2.344,2.344,2.344,1.0
1332375420.0,2.344,2.344,2.344,2.344,2.0
1332375660.0,2.344,2.344,2.344,2.344,2.0
1332376080.0,2.344,2.344,2.344,2.344,1.0
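
Aside: the epoch column holds Unix seconds as floats. If proper datetimes were needed, a minimal pandas sketch of the conversion would look roughly like this (sample.csv is a hypothetical file containing the rows above):

Code:
import pandas as pd

df = pd.read_csv("sample.csv")  # hypothetical file with the sample rows above
df["time"] = pd.to_datetime(df["epoch"], unit="s", utc=True)
print(df[["epoch", "time"]].head())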

Current output:

The Python program below isn't detecting that the same data has already been written to the bucket. If the program is run repeatedly, the print statement should report that duplicate data was detected, but no such output ever appears.

Current program:

Code:
import os
import pandas as pd
from tqdm import tqdm
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# Example OHLCV data
data = {
    "epoch": [1330902000, 1330902060, 1330902120],
    "open": [2.55, 2.532, 2.537],
    "high": [2.55, 2.538, 2.549],
    "low": [2.521, 2.531, 2.537],
    "close": [2.534, 2.538, 2.548],
    "volume": [150, 69, 38]
}
concat_of_all_dfs = pd.DataFrame(data)

def data_point_exists(epoch, bucket, org):
    query = f'''
    from(bucket: "{bucket}")
      |> range(start: 0)
      |> filter(fn: (r) => r["_measurement"] == "ohlcv")
      |> filter(fn: (r) => r["epoch"] == {epoch})
    '''
    result = query_api.query(org=org, query=query)
    return len(result) > 0

if __name__ == "__main__":

    # Database credentials
    token = os.getenv('INFLUXDB_TOKEN')
    bucket = "bucket_test"
    org = "organisation_test"
    url = "http://localhost:8086"

    # Initialize InfluxDB Client
    client = InfluxDBClient(url=url, token=token, org=org)
    write_api = client.write_api(write_options=SYNCHRONOUS)
    query_api = client.query_api()

    # Write data points one by one
    for index, row in tqdm(concat_of_all_dfs.iterrows(), total=len(concat_of_all_dfs)):
        epoch = row['epoch']
        if not data_point_exists(epoch, bucket, org):
            point = Point("ohlcv") \
                    .field("epoch", row['epoch']) \
                    .field("open", row['open']) \
                    .field("high", row['high']) \
                    .field("low", row['low']) \
                    .field("close", row['close']) \
                    .field("volume", row['volume'])
            write_api.write(bucket=bucket, org=org, record=point)
        else:
            print(f"Data point for epoch {epoch} already exists. Skipping...")

    client.close()

Summary:

Are there any mistakes in the above code that would prevent repeat data from being detected (possibly in the Flux script inside the querying function, or anywhere else)?
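
One possibility I'm considering, as a sketch rather than a confirmed diagnosis: epoch is written as a field, and Flux exposes fields as _field/_value pairs rather than as a column named epoch unless the table is pivoted, so the filter on r["epoch"] may never match anything. A variant of the query that filters on the field's value instead, under that assumption (it reuses the global query_api from the main program, as the original does):

Code:
def data_point_exists(epoch, bucket, org):
    # "epoch" is a field, so match it via _field/_value rather than r["epoch"].
    query = f'''
    from(bucket: "{bucket}")
      |> range(start: 0)
      |> filter(fn: (r) => r["_measurement"] == "ohlcv")
      |> filter(fn: (r) => r["_field"] == "epoch" and r["_value"] == {float(epoch)})
    '''
    result = query_api.query(org=org, query=query)
    return len(result) > 0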