<Niek/>

Run asynchronous tasks in batches in NodeJS

29 Nov 2020

Say you have a large list of items and you want to run an asynchronous task for each of them. You can run these tasks one at a time, in parallel, or in batches. In this blog post, I explain one way to run asynchronous tasks in batches.

Task

To run tasks in batches we need a task. So let's create a task that returns a Promise and resolves it after a random delay of up to 5 seconds.

function task() {
  return new Promise((resolve) => {
    setTimeout(resolve, Math.floor(Math.random() * 5000 + 1));
  });
}

Run in parallel

Now we need a function which we can feed a list of items and run the task we just created for every item:

function executeTasksConcurrently(list) {
  for (const item of list) {
    task();
  }
}

When we feed this function a list of numbers, it starts task() for each item without waiting for the previous one to finish, so all tasks run concurrently.
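Note that the loop above starts every task but does not wait for any of them to finish. If the caller needs to know when all tasks are done, one option is to collect the promises and hand them to Promise.all. The following is a minimal sketch of that idea; `runAll` and the fixed-delay `delayTask` helper are hypothetical names used for illustration and are not part of the code in this post.

```javascript
// Hypothetical stand-in for task(): resolves with the item after `ms` milliseconds.
function delayTask(item, ms) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(item), ms);
  });
}

// Starts one task per item right away and resolves once all of them have resolved.
// Results come back in the order of `list`, not in the order of completion.
function runAll(list, ms = 50) {
  return Promise.all(list.map((item) => delayTask(item, ms)));
}

runAll([1, 2, 3]).then((results) => {
  console.log(results); // [ 1, 2, 3 ]
});
```

Keep in mind that Promise.all rejects as soon as one of the promises rejects, so tasks that can fail may need their own error handling.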

To show in which order tasks run and finish we can add some console.log statements. Check the following code and console output.

Code:

function task(item) {
  return new Promise((resolve) => {
    setTimeout(() => {
      console.log(`End task: ${item}`);
      resolve();
    }, Math.floor(Math.random() * 5000 + 1));
  });
}

const list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

async function executeTasksConcurrently(list) {
  for (const item of list) {
    console.log(`Start task: ${item}`);
    task(item);
  }
}

executeTasksConcurrently(list);

Console output:

$ yarn start
Start task: 1
Start task: 2
Start task: 3
Start task: 4
Start task: 5
Start task: 6
Start task: 7
Start task: 8
Start task: 9
Start task: 10
End task: 7
End task: 9
End task: 6
End task: 2
End task: 3
End task: 8
End task: 5
End task: 10
End task: 1
End task: 4
✨  Done in 5.12s.

Run in batches

To run tasks in batches we first need to keep track of the currently active tasks. This list should be updated when starting a task and when a task is finished.

async function executeTasksConcurrently(list) {
  let activeTasks = [];

  for (const item of list) {
    console.log(`Start task: ${item}`);
    const activeTask = task()
      .then(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      })
      .catch(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      });
    activeTasks.push(activeTask);
  }
}

Now we have to decide how many tasks we want to run concurrently. In this example, we allow three. When the number of currently active tasks reaches this limit, the for loop has to wait until one of them finishes before starting a new task. Since our activeTasks list is an array of promises, we can await Promise.race, which settles as soon as the first of those promises settles.
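To see how Promise.race behaves in isolation, here is a small sketch. The `wait` helper, its labels and the delays are made up for illustration. Promise.race settles with the outcome of whichever promise settles first; the other promises are not cancelled and simply keep running.

```javascript
// Hypothetical helper: resolves with `label` after `ms` milliseconds.
function wait(label, ms) {
  return new Promise((resolve) => setTimeout(() => resolve(label), ms));
}

// Resolves with the value of whichever promise settles first.
function firstToFinish() {
  return Promise.race([wait('slow', 200), wait('fast', 20)]);
}

firstToFinish().then((winner) => {
  console.log(winner); // fast
});
```

The function below applies the same idea to the activeTasks array.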

async function executeTasksConcurrently(
  list,
  concurrencyLimit = 3
) {
  let activeTasks = [];

  for (const item of list) {
    if (activeTasks.length >= concurrencyLimit) {
      await Promise.race(activeTasks);
    }

    console.log(`Start task: ${item}`);
    const activeTask = task()
      .then(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      })
      .catch(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      });
    activeTasks.push(activeTask);
  }
}
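One thing to be aware of: the function above returns once the last task has been started, not once it has finished. If the caller needs to wait for everything, a possible variant is sketched below. It is an assumption-laden sketch rather than the code from this post: `runWithLimit` and the `taskFn` parameter are hypothetical names, and it uses a Set with `.finally()` instead of an array with `splice`.

```javascript
// Variant that resolves only after every task has settled.
// `runWithLimit` and `taskFn` are hypothetical names for this sketch.
async function runWithLimit(list, taskFn, concurrencyLimit = 3) {
  const activeTasks = new Set();

  for (const item of list) {
    if (activeTasks.size >= concurrencyLimit) {
      // Wait until at least one active task has settled and removed itself.
      await Promise.race(activeTasks);
    }

    const activeTask = taskFn(item)
      .catch(() => {}) // ignore failures so one rejected task does not stop the loop
      .finally(() => activeTasks.delete(activeTask));
    activeTasks.add(activeTask);
  }

  // Wait for the tasks that are still running after the loop ends.
  await Promise.all(activeTasks);
}

// Usage: five short tasks, at most three running at the same time.
const shortTask = () => new Promise((resolve) => setTimeout(resolve, 20));
runWithLimit([1, 2, 3, 4, 5], shortTask).then(() => {
  console.log('All tasks finished');
});
```

Swallowing errors in `.catch()` keeps the sketch short; real code would probably want to collect or report them.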

Check out the complete code and console output when we run the script now:

Code:

function task() {
  return new Promise((resolve) => {
    setTimeout(resolve, Math.floor(Math.random() * 5000 + 1));
  });
}

const list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

async function executeTasksConcurrently(
  list,
  concurrencyLimit = 3
) {
  let activeTasks = [];

  for (const item of list) {
    if (activeTasks.length >= concurrencyLimit) {
      await Promise.race(activeTasks);
    }

    console.log(`Start task: ${item}`);
    const activeTask = task()
      .then(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      })
      .catch(() => {
        activeTasks.splice(activeTasks.indexOf(activeTask), 1);
        console.log(`End task: ${item}`);
      });
    activeTasks.push(activeTask);
  }
}

executeTasksConcurrently(list);

Console output:

$ yarn start
Start task: 1
Start task: 2
Start task: 3
End task: 3
Start task: 4
End task: 2
Start task: 5
End task: 1
Start task: 6
End task: 4
Start task: 7
End task: 5
Start task: 8
End task: 8
Start task: 9
End task: 6
Start task: 10
End task: 10
End task: 7
End task: 9
✨  Done in 11.27s.
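As the output shows, this approach starts a new task as soon as one finishes, so there are always up to three tasks running. A stricter reading of "batches" would be to split the list into fixed-size chunks and wait for a whole chunk to finish before starting the next one. For comparison, here is a sketch of that alternative; it is a different technique from the one used above, and `runInChunks`, `taskFn` and `batchSize` are hypothetical names.

```javascript
// Alternative sketch: fixed-size batches instead of refilling free slots.
// `runInChunks`, `taskFn` and `batchSize` are hypothetical names, not from the post.
async function runInChunks(list, taskFn, batchSize = 3) {
  const results = [];

  for (let i = 0; i < list.length; i += batchSize) {
    const batch = list.slice(i, i + batchSize);
    // Start the whole batch, then wait for all of it before moving on.
    const batchResults = await Promise.all(batch.map((item) => taskFn(item)));
    results.push(...batchResults);
  }

  return results;
}

// Usage: doubles each number after a short delay, three items at a time.
const double = (n) => new Promise((resolve) => setTimeout(() => resolve(n * 2), 20));
runInChunks([1, 2, 3, 4, 5], double).then((results) => {
  console.log(results); // [ 2, 4, 6, 8, 10 ]
});
```

With fixed chunks, the slowest task in a batch holds up the start of the next batch, which is the main reason to prefer the Promise.race approach when task durations vary a lot.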

Conclusion

Running tasks in batches helps prevent overloading your resources and is fairly simple to implement. If you don't want to write or maintain this code yourself, you can use a third-party library that implements this pattern, for example Supercharge's Promise Pool.

If you want to run this script yourself you can find the code on GitHub.

If you have any questions or feedback feel free to comment or contact me on Twitter!