How to avoid a memory leak when using Pub/Sub to call a function?

I am stuck on a performance issue when using Pub/Sub to trigger a function.

// this is called from index.ts
import { PubSub, Message } from "@google-cloud/pubsub";

const pubsub = new PubSub();

export function downloadService() {
  // References an existing subscription
  const subscription = pubsub.subscription("DOWNLOAD-sub");

  // Create an event handler to handle messages
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes.type}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
    await exportExcel(message); // my function
  };

  // Listen for new messages until the process exits
  subscription.on("message", messageHandler);
}

async function exportExcel(message: Message) {
  // get data from the database
  const sales = await Sales.findAll({
    attributes: [
      "SALES_STORE",
      "SALES_CTRNO",
      "SALES_TRANSNO",
      "SALES_STATUS",
    ],
    raw: true,
  });
  // ... process the rows (~800k) into an Excel file
  // ... bucket.upload the file to GCS
}

The function above works fine if I trigger only ONE Pub/Sub message. However, it hits a memory leak or a database connection timeout if I trigger many Pub/Sub messages in a short period of time.

The problem I found is that the first message has not finished processing before further Pub/Sub messages invoke the function again, so many exports run at the same time.

I have no idea how to resolve this, but I was thinking that implementing a queue worker or Google Cloud Tasks might solve the problem?



Solution 1:[1]

As mentioned by @chovy in the comments, the exportExcel calls need to be queued up, since the function's execution is not keeping up with its rate of invocation. One module that can be used to queue function calls is async.
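
For illustration, here is a minimal sketch of a queued handler using async.queue with a concurrency of 1, so only one export runs at a time. It assumes it sits alongside the question's downloadService and exportExcel; the queue variable name and the concurrency value are illustrative choices, not from the original post:

import async from "async";
import { Message } from "@google-cloud/pubsub";

// Process at most one export at a time; raise the concurrency
// (the second argument) once memory usage is under control.
const exportQueue = async.queue(async (message: Message) => {
  await exportExcel(message); // the question's export function
  // Ack only after the export succeeds, so unprocessed
  // messages are redelivered by Pub/Sub
  message.ack();
}, 1);

// The subscription handler now only enqueues; the heavy work
// happens one message at a time inside the queue worker.
const messageHandler = (message: Message) => {
  exportQueue.push(message);
};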

As an alternative, you can employ flow control features on the subscriber side. Data pipelines often receive sporadic spikes in published traffic that can overwhelm subscribers trying to catch up. The usual response to high published throughput on a subscription is to dynamically autoscale subscriber resources to consume more messages. However, this can incur unwanted costs (for instance, you may need more VMs) and additional capacity planning. Flow control features on the subscriber side let the subscriber regulate the rate at which messages are ingested, preventing this unhealthy behavior in the pipeline. Please refer to this blog for more information on flow control features.
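
With the Node.js Pub/Sub client, flow control is configured when the subscription object is created. A minimal sketch, assuming the question's subscription name and export function; the maxMessages value is an illustrative assumption, not a recommendation:

import { PubSub, Message } from "@google-cloud/pubsub";

const pubsub = new PubSub();

// Hold at most 5 unacknowledged messages in this subscriber at
// a time; further messages wait in Pub/Sub until earlier ones
// are acked.
const subscription = pubsub.subscription("DOWNLOAD-sub", {
  flowControl: {
    maxMessages: 5,
    allowExcessMessages: false,
  },
});

subscription.on("message", async (message: Message) => {
  await exportExcel(message); // the question's export function
  message.ack(); // ack after processing, so flow control takes effect
});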

Solution 2:[2]

You can run the command through bash (or sh) so that the redirection operator is interpreted by the shell:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class StdErr {
    public static void main(String[] args) throws IOException {
        // sh interprets the redirection, sending "a" to the child's stderr
        ProcessBuilder pb = new ProcessBuilder("sh", "-c", "echo a >&2");
        Process p = pb.start();
        // getErrorStream() reads the child process's stderr, so this prints "a"
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
            r.lines().forEach(System.err::println);
        }
    }
}

Outputs: a (to the Java process's standard error stream)

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: (unattributed)
[2] Solution 2: Tim Moore