PyAirbyte - Load S3 files from a specific prefix

  • Thread starter: Bakerstreet Pi (Guest)
I'm using PyAirbyte and Airflow to load data from AWS S3 into BigQuery. This is my task code:

Code:
    @task.external_python(python='/usr/local/airflow/.pyairbyte-venv/bin/python')
    def extract():
        import airbyte as ab
        # from airbyte.caches import BigQueryCache

        # Configure the Airbyte S3 source to read CSV files from the bucket.
        source = ab.get_source(
            "source-s3",
            config={
                "bucket": "my-bucket",
                "region_name": "us-east-1",
                "path_prefix": "src_data",
                "streams": [
                    {
                        "name": "transaction",
                        "format": {
                            "filetype": "csv"
                        }
                    }
                ],
                "aws_access_key_id": "",
                "aws_secret_access_key": ""
            },
            install_if_missing=True,
        )
        source.check()               # verify the config and connectivity
        source.select_all_streams()  # select every configured stream for syncing
        result = source.read()       # read records into the default local cache
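The commented-out BigQueryCache import hints at the intended destination. For context, here is a minimal sketch of passing a PyAirbyte cache to source.read(); the BigQueryCache parameter names (project_name, dataset_name, credentials_path) and their values are assumptions based on the PyAirbyte docs and may differ between versions, and the config is shortened.

Code:
    import airbyte as ab
    from airbyte.caches import BigQueryCache

    # Same S3 source as in the task above, shown compactly (config shortened).
    source = ab.get_source(
        "source-s3",
        config={
            "bucket": "my-bucket",
            "path_prefix": "src_data",
            "streams": [{"name": "transaction", "format": {"filetype": "csv"}}],
            "aws_access_key_id": "",
            "aws_secret_access_key": "",
        },
        install_if_missing=True,
    )
    source.select_all_streams()

    # Parameter names below are assumptions; check the BigQueryCache docs
    # for your PyAirbyte version.
    cache = BigQueryCache(
        project_name="my-gcp-project",
        dataset_name="airbyte_raw",
        credentials_path="/path/to/service_account.json",
    )

    result = source.read(cache=cache)  # records land in BigQuery via the cache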

In my bucket I have multiple folders, and each folder can contain different file types. What I'm trying to do is copy only the files from my src_data folder (which contains only CSV files). However, I get an error because the connector tries to discover all the files in the bucket instead of only those under the specified prefix. Has anyone run into the same problem? How did you solve it?

I've already tried multiple options, like adding "streams.globs": ["src_data/*.csv"] inside the streams config, but without success.
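For what it's worth, in the file-based source-s3 connector the glob patterns are normally given as a "globs" list inside each stream object rather than as a dotted "streams.globs" key; the sketch below shows that placement. Whether path_prefix is still needed alongside globs, and the exact pattern, are assumptions to verify against the connector spec for your version.

Code:
    # Sketch: "globs" nested inside the stream entry, assumed to restrict
    # discovery to objects under src_data/. Verify against your connector's spec.
    config = {
        "bucket": "my-bucket",
        "region_name": "us-east-1",
        "streams": [
            {
                "name": "transaction",
                "globs": ["src_data/*.csv"],
                "format": {"filetype": "csv"}
            }
        ],
        "aws_access_key_id": "",
        "aws_secret_access_key": ""
    }

A pattern like "src_data/**/*.csv" would also match CSV files in nested subfolders of src_data.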