How to avoid a memory leak when using Pub/Sub to call a function?
I am stuck on a performance issue when using Pub/Sub to trigger a function.
// This is called from index.ts
export function downloadService() {
  // Reference an existing subscription
  const subscription = pubsub.subscription("DOWNLOAD-sub");
  // Create an event handler to handle messages
  // let messageCount = 0;
  const messageHandler = async (message: any) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes.type}`);
    // "Ack" (acknowledge receipt of) the message
    message.ack();
    await exportExcel(message); // my function
    // messageCount += 1;
  };
  // Listen for new messages (no timeout is set, so this keeps listening)
  subscription.on("message", messageHandler);
}
async function exportExcel(message: any) {
  // Get data from the database
  const movies = await Sales.findAll({
    attributes: [
      "SALES_STORE",
      "SALES_CTRNO",
      "SALES_TRANSNO",
      "SALES_STATUS",
    ],
    raw: true,
  });
  // ... processing to Excel (800k rows)
  // ... bucket.upload to GCS
}
The function above works fine if I trigger ONLY one Pub/Sub message. However, it hits a memory leak or a database connection timeout if I trigger many Pub/Sub messages in a short period of time.
The problem I found is that the first run has not finished yet when further messages from Pub/Sub call the function again, so several exports are processed at the same time.
I have no idea how to resolve this. Would implementing a queue worker or Google Cloud Tasks solve the problem?
Solution 1:[1]
As mentioned by @chovy in the comments, the exportExcel function calls need to be queued, since the function's execution is not keeping up with the rate of invocation. One of the modules that can be used to queue function calls is async.
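The queuing idea can be sketched without any library. The following is a minimal illustration, not the async module's API: `SerialQueue`, `AckableMessage`, and `makeQueuedHandler` are hypothetical names introduced here, and the `work` callback stands in for the question's `exportExcel`. Each task starts only after the previous one has settled, and the message is acknowledged after the work finishes (a design choice that differs from the question's code, which acks first).

```typescript
// Hypothetical minimal shape of a message: only what this sketch needs.
interface AckableMessage {
  id: string;
  ack(): void;
  nack(): void;
}

// Runs async tasks strictly one at a time, in the order they were enqueued.
class SerialQueue {
  // Tail of the promise chain; it never rejects, so one failure
  // does not block the tasks queued behind it.
  private tail: Promise<unknown> = Promise.resolve();

  enqueue<T>(task: () => Promise<T>): Promise<T> {
    const run = this.tail.then(() => task());
    this.tail = run.catch(() => undefined);
    return run;
  }
}

// Wraps a long-running job so that concurrent messages wait their turn.
// Acks on success and nacks on failure so the message can be redelivered.
function makeQueuedHandler<M extends AckableMessage>(
  queue: SerialQueue,
  work: (message: M) => Promise<void>,
): (message: M) => Promise<void> {
  return (message: M) =>
    queue.enqueue(() => work(message)).then(
      () => message.ack(),
      () => message.nack(),
    );
}
```

In the question's code this would presumably be wired up as `subscription.on("message", makeQueuedHandler(new SerialQueue(), exportExcel))`. Note that messages waiting in the queue are still held in memory by the subscriber, so this limits concurrent exports rather than the number of messages pulled.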
As an alternative, you can employ flow control features on the subscriber side. Data pipelines often receive sporadic spikes in published traffic, which can overwhelm subscribers trying to catch up. The usual response to high published throughput on a subscription would be to dynamically autoscale subscriber resources to consume more messages. However, this can incur unwanted costs (for instance, you may need to use more VMs) and can require additional capacity planning. Flow control features on the subscriber side can help control the unhealthy behavior of these tasks on the pipeline by allowing the subscriber to regulate the rate at which messages are ingested. Please refer to this blog for more information on flow control features.
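A minimal sketch of subscriber-side flow control follows. It assumes the Node.js client's `flowControl` subscriber option with `maxMessages` and `allowExcessMessages`. The interfaces `MessageLike`, `SubscriptionLike`, and `PubSubLike` and the function `listenOneAtATime` are hypothetical stand-ins declared here so the snippet compiles on its own; in a real project the `pubsub` argument would be the client instance from the question. Flow control limits messages that are still outstanding, so the sketch acknowledges only after the work completes; acking first, as the question's code does, would likely free the slot immediately and let the next message in.

```typescript
// Hypothetical structural stand-ins for the Pub/Sub client types.
interface MessageLike {
  id: string;
  ack(): void;
  nack(): void;
}

interface SubscriptionLike {
  on(event: "message", listener: (message: MessageLike) => void): unknown;
}

interface PubSubLike {
  subscription(
    name: string,
    options: { flowControl: { maxMessages: number; allowExcessMessages: boolean } },
  ): SubscriptionLike;
}

// Subscribes so that at most one message is outstanding at a time.
function listenOneAtATime(
  pubsub: PubSubLike,
  name: string,
  work: (message: MessageLike) => Promise<void>,
): SubscriptionLike {
  const subscription = pubsub.subscription(name, {
    // Assumed option names from the Node.js client's subscriber options.
    flowControl: { maxMessages: 1, allowExcessMessages: false },
  });

  subscription.on("message", (message) => {
    // Ack only once the work has finished; nack on failure for redelivery.
    void work(message).then(
      () => message.ack(),
      () => message.nack(),
    );
  });

  return subscription;
}
```

For the question's case the call would presumably look like `listenOneAtATime(pubsub, "DOWNLOAD-sub", exportExcel)`. A value larger than 1 for `maxMessages` could be chosen if the database and memory can handle a few exports in parallel.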
Solution 2:[2]
You can run the command using bash (or sh) to use the redirection operator:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class StdErr {
    public static void main(String[] args) throws IOException {
        ProcessBuilder pb = new ProcessBuilder("sh", "-c", "echo a >&2");
        Process p = pb.start();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { // I need this to return "a".
            r.lines().forEach(System.err::println);
        }
    }
}
Outputs: a (to the Java process's standard error stream)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Tim Moore |
