I have created the external stage in Snowflake as below:
CREATE OR REPLACE STAGE sk_demo_stage
  URL = 's3://raw-data/'
  FILE_FORMAT = (TYPE = CSV)
  STORAGE_INTEGRATION = s3_int
  DIRECTORY = (ENABLE = TRUE);
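For reference, the stage can be verified as below (the sample/ path matches the COPY commands further down):

LIST @sk_demo_stage/sample/;
-- and, since DIRECTORY is enabled, the directory table is queryable as well:
SELECT * FROM DIRECTORY(@sk_demo_stage);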
For a full load, I am using the syntax below:
COPY INTO sk_demo.sk_schema.sk_employees
FROM @sk_demo_stage/sample/employees;
Since CDC is enabled for MySQL and the first column of each file indicates the type of change, I have modified the query slightly, as below:
COPY INTO sk_demo.sk_schema.sk_employees
FROM (select $2, $3 from @sk_demo_stage/sample/employees);
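To double-check the column positions, the same projection can be tested directly against the stage ($1 here is the CDC flag column; this is the standard staged-file query syntax and picks up the CSV file format attached to the stage):

SELECT $1, $2, $3
FROM @sk_demo_stage/sample/employees;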
I created the stream as below:
CREATE STREAM sk_stream ON STAGE sk_demo_stage;
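As a quick manual check that the stream is picking up new files, the following can be run (SYSTEM$STREAM_HAS_DATA is the same function the task condition below uses):

SELECT SYSTEM$STREAM_HAS_DATA('sk_stream');  -- returns TRUE when the stream has unconsumed rows
SELECT * FROM sk_stream;                     -- directory-table rows plus METADATA$ columns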
Everything works fine when I trigger the commands manually. So, to automate the process, I created a task as below:
CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'COMPUTE_WH'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the table.'
WHEN
  SYSTEM$STREAM_HAS_DATA('sk_stream')
AS
  INSERT INTO sk_demo.sk_schema.sk_employees
  SELECT $2, $3
  FROM sk_stream
  WHERE METADATA$ACTION = 'INSERT';
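As a side note, my understanding is that the task body can also be triggered once manually, independent of the schedule:

EXECUTE TASK load_new_file_data;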
I also ran the command below to resume the task:
ALTER TASK load_new_file_data RESUME;
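For reference, the task's state and run history can be checked with SHOW TASKS and the INFORMATION_SCHEMA.TASK_HISTORY table function:

SHOW TASKS LIKE 'load_new_file_data';

SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'LOAD_NEW_FILE_DATA'))
ORDER BY SCHEDULED_TIME DESC;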
But the state of the task still shows as suspended, and when I push new records via MySQL they are not inserted automatically as I expected. The data files are listed properly when I query the stage, and I want to automate this whole process.
What am I missing here?