OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

Nodejs and Bull queue - Issue with moveToFailed function reprocessing jobs and blocking queue

  • Thread starter Thread starter Haris Ali
  • Start date Start date
H

Haris Ali

Guest
I was dabbling into bull queue because I was going to incorporate it into my project. The scenario in my project is that every job can be retried once (i.e. if it fails a second time, it should be removed from the queue). The job is basically handling some API call to a third party server which may or may not respond always. Which is why I have a setInterval to check if certain time has elapsed since job started processing. I have to fail the job and send it to retry if this is the first attempt otherwise remove it/mark as failed. Below is a minimum test code for the issue I am facing. Basically, moveToFailed function takes the job into waiting state (which doesn't make sense? Should it not move to failed state?) It then retries the job once more after which no other job is being processed.

I have to manually call moveToFailed and/or moveToCompleted because of my desired functionality. If there's something I should do differently please do let me know as it is pretty essential I incorporate this feature.

Code:
import Bull from "bull";
import dotenv from "dotenv";

dotenv.config();
const { REDIS_HOST, REDIS_PORT } = process.env;

const jobInactivityMap = new Map();
const jobTimeout = 180000; // 3 Minutes

const queueOptions = {
  redis: { host: REDIS_HOST, port: REDIS_PORT },
  defaultJobOptions: {
    attempts: 2,
  },
};

const testQueue = new Bull("testQueue", queueOptions);

testQueue.process(async (payload, done) => {
    console.log(`Processing job ${payload.id} with data: ${JSON.stringify(payload.data)}`);
});

testQueue.on('completed', (job, result) => {
    console.log(`Job ${job.id} completed with result ${result}`);
});

testQueue.on('failed', (job, err) => {
    console.log(`Job ${job.id} failed with error ${err.message || err}`);
});

testQueue.on("active", (job) => {
    console.log("jobInActivityMap", jobInactivityMap);
    jobInactivityMap.set(job.id, Date.now());
    console.log("jobInActivityMap", jobInactivityMap);
});

const jobs = [...new Array(3)].map((_) => ({
    text: "Hello World!"
}));

jobs.forEach((job) => testQueue.add(job));

setInterval(async () => {
    const currentTimestamp = Date.now();
    console.log('jobInactivityMap inside setInterval', jobInactivityMap)
    for (const [jobId, startTime] of jobInactivityMap.entries()) {
        // Check if job elapsed time is greater than or equal to 3 minutes
        console.log(`Job ID: ${jobId} with start time: ${startTime}`);
        if (currentTimestamp - startTime >= jobTimeout) {
            console.log(`Job elapsed time is greater than job timeout`);
            const job = await testQueue.getJob(jobId);
            if (job && job.attemptsMade === 0) {
                // If this is first attempt, retry the job
                await job.moveToFailed(new Error('first attempt failed'), true);
                jobInactivityMap.set(jobId, Date.now());
            } else {
                // Job has reached/is on max attempts or has received no response
                console.log(`Job's last attempt`);
                jobInactivityMap.delete(jobId);
                await job.discard();
            }
        }
    }
}, 60000);

I am using bull version 4.12.9

<p>I was dabbling into bull queue because I was going to incorporate it into my project. The scenario in my project is that every job can be retried once (i.e. if it fails a second time, it should be removed from the queue). The job is basically handling some API call to a third party server which may or may not respond always. Which is why I have a setInterval to check if certain time has elapsed since job started processing. I have to fail the job and send it to retry if this is the first attempt otherwise remove it/mark as failed. Below is a minimum test code for the issue I am facing. Basically, <strong>moveToFailed</strong> function takes the job into waiting state (which doesn't make sense? Should it not move to failed state?) It then retries the job once more after which no other job is being processed.</p>
<p>I have to manually call <strong>moveToFailed</strong> and/or <strong>moveToCompleted</strong> because of my desired functionality. If there's something I should do differently please do let me know as it is pretty essential I incorporate this feature.</p>
<pre><code>import Bull from "bull";
import dotenv from "dotenv";

dotenv.config();
const { REDIS_HOST, REDIS_PORT } = process.env;

const jobInactivityMap = new Map();
const jobTimeout = 180000; // 3 Minutes

const queueOptions = {
redis: { host: REDIS_HOST, port: REDIS_PORT },
defaultJobOptions: {
attempts: 2,
},
};

const testQueue = new Bull("testQueue", queueOptions);

testQueue.process(async (payload, done) => {
console.log(`Processing job ${payload.id} with data: ${JSON.stringify(payload.data)}`);
});

testQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result ${result}`);
});

testQueue.on('failed', (job, err) => {
console.log(`Job ${job.id} failed with error ${err.message || err}`);
});

testQueue.on("active", (job) => {
console.log("jobInActivityMap", jobInactivityMap);
jobInactivityMap.set(job.id, Date.now());
console.log("jobInActivityMap", jobInactivityMap);
});

const jobs = [...new Array(3)].map((_) => ({
text: "Hello World!"
}));

jobs.forEach((job) => testQueue.add(job));

setInterval(async () => {
const currentTimestamp = Date.now();
console.log('jobInactivityMap inside setInterval', jobInactivityMap)
for (const [jobId, startTime] of jobInactivityMap.entries()) {
// Check if job elapsed time is greater than or equal to 3 minutes
console.log(`Job ID: ${jobId} with start time: ${startTime}`);
if (currentTimestamp - startTime >= jobTimeout) {
console.log(`Job elapsed time is greater than job timeout`);
const job = await testQueue.getJob(jobId);
if (job && job.attemptsMade === 0) {
// If this is first attempt, retry the job
await job.moveToFailed(new Error('first attempt failed'), true);
jobInactivityMap.set(jobId, Date.now());
} else {
// Job has reached/is on max attempts or has received no response
console.log(`Job's last attempt`);
jobInactivityMap.delete(jobId);
await job.discard();
}
}
}
}, 60000);
</code></pre>
<p>I am using bull version <strong>4.12.9</strong></p>
 

Latest posts

Top