File Path Filter for SFTP plugin in Data Fusion

I'm using the SFTP plugin in Google Cloud Data Fusion to download financial reports from the payment gateway operator; the reports are stored as CSV files on a server.

I'm trying to avoid ingesting all the files, to save on data processing costs, and it turns out the plugin offers a property called 'File Path Filter'.

The documentation has no specific example of how to use this property, and nothing I've tried actually works.

Files are stored and named like the following:

SettlementInvoice GS Companyname Limited 190024000478 20220315.csv

So my assumption was that I could use:

/20220315/g

Where I could then replace 20220315 with a macro holding the current date from another source (the plugin supports macros).

However, in testing the plugin ignores whatever I enter in the filter. The problem is probably the syntax, but I can't find any example of how to use this option properly. I'd appreciate any suggestions.
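
To sanity-check the syntax theory, I also ran both variants through java.util.regex locally. I'm assuming that's what the plugin uses under the hood, which I'm not certain of:

public class FilterSyntaxCheck {
    public static void main(String[] args) {
        String fileName = "SettlementInvoice GS Companyname Limited 190024000478 20220315.csv";

        // In java.util.regex the slashes and trailing "g" of a JavaScript-style
        // literal are ordinary characters, and matches() must consume the whole
        // string, so this filter matches nothing.
        System.out.println(fileName.matches("/20220315/g")); // false

        // A plain pattern with wildcards around the date does match.
        System.out.println(fileName.matches(".*20220315.*")); // true
    }
}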



Solution 1:[1]

I was able to find a pattern that filters the files correctly, constructed using the rules I mentioned in the comments.

I used the Java program below to test the pattern, and then deployed a sample Data Fusion pipeline with the SFTP Actions plugin from the CDAP Hub. The pipeline copies files from an SFTP source to GCS.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SftpCopyPluginRegex {
    public static void main(String[] args) {
        
        Pattern p = Pattern.compile("[a-zA-Z0-9 ]*20220320[.a-z]*");

        /*
            Explanation:
            [a-zA-Z0-9 ]* => zero or more letters, digits, or spaces
                             (the space inside the class matters for this file name)
            20220320      => the literal date to match
            [.a-z]*       => zero or more lower-case letters or "." ("." inside a
                             character class is a literal dot, not a wildcard)
        */
        
        Matcher m = p.matcher("SettlementInvoice GS Companyname Limited 190024000478 20220320.csv"); // file name to test
        // With a macro, the same pattern becomes: [a-zA-Z0-9 ]*${file.name}[.a-z]*

        boolean b = m.matches();
        System.out.println(b);

    }
}

The regex used is [a-zA-Z0-9 ]*${file.name}[.a-z]*, which filters the files based on the date supplied through the macro (see the sketch below for how the macro slots into the pattern). The Data Fusion pipeline JSON follows; to test it, import the JSON and edit the file system properties as needed. The Data Fusion instance version is 6.4.1.
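
To make the macro behaviour concrete, here is a small sketch of how the resolved pattern ends up matching at runtime. The replace() call is only my stand-in for CDAP's macro resolution, which actually happens inside the platform before the plugin sees the value:

import java.util.regex.Pattern;

public class MacroRegexSketch {
    public static void main(String[] args) {
        // Template as configured in the plugin; ${file.name} is the CDAP macro.
        String template = "[a-zA-Z0-9 ]*${file.name}[.a-z]*";

        // Stand-in for macro resolution: the runtime argument (e.g. today's
        // date supplied from another source) replaces the macro.
        String resolved = template.replace("${file.name}", "20220320");

        boolean matches = Pattern.compile(resolved)
                .matcher("SettlementInvoice GS Companyname Limited 190024000478 20220320.csv")
                .matches();
        System.out.println(matches); // true
    }
}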

{
    "artifact": {
        "name": "cdap-data-pipeline",
        "version": "6.4.1",
        "scope": "SYSTEM"
    },
    "description": "Data Pipeline Application",
    "name": "sftp-to-gcs_v1",
    "config": {
        "resources": {
            "memoryMB": 2048,
            "virtualCores": 1
        },
        "driverResources": {
            "memoryMB": 2048,
            "virtualCores": 1
        },
        "connections": [],
        "comments": [],
        "postActions": [],
        "properties": {},
        "processTimingEnabled": true,
        "stageLoggingEnabled": false,
        "stages": [
            {
                "name": "SFTPCopyTest",
                "plugin": {
                    "name": "SFTPCopy",
                    "type": "action",
                    "label": "SFTPCopyTest",
                    "artifact": {
                        "name": "sftp-actions",
                        "version": "1.5.1",
                        "scope": "USER"
                    },
                    "properties": {
                        "host": "<host-name>",
                        "port": "22",
                        "userName": "<user-name>",
                        "Authentication": "password-select",
                        "srcDirectory": "<sftp-source-directory>",
                        "destDirectory": "gs://<bucket-name>/<path-inside-the-bucket>",
                        "variableNameHoldingFileList": "sftp.copied.file.names",
                        "extractZipFiles": "false",
                        "password": "password",
                        "fileSystemProperties": "fs.gs.impl=>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem,fs.AbstractFileSystem.gs.impl=>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS,fs.gs.project.id=><project-id>,fs.gs.system.bucket=><bucket-name>,fs.gs.path.encoding=><path-inside-the-bucket>,fs.gs.working.dir=>/,fs.gs.impl.disable.cache=>true",
                        "fileNameRegex": "[a-zA-Z0-9 ]*${file.name}[.a-z]*"
                    }
                },
                "outputSchema": [
                    {
                        "name": "etlSchemaBody",
                        "schema": ""
                    }
                ],
                "id": "SFTPCopyTest"
            }
        ],
        "schedule": "0 * * * *",
        "engine": "spark",
        "numOfRecordsPreview": 100,
        "description": "Data Pipeline Application",
        "maxConcurrentRuns": 1
    }
}
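
One detail worth calling out in the pipeline JSON is the fileSystemProperties value, which is a comma-separated list of key=>value pairs. As a rough illustration of that shape (my own sketch with placeholder values, not the plugin's actual parsing code):

import java.util.LinkedHashMap;
import java.util.Map;

public class FileSystemPropertiesSketch {
    public static void main(String[] args) {
        // Same "key=>value,key=>value" format as in the pipeline JSON above.
        String props = "fs.gs.impl=>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem,"
                + "fs.gs.project.id=>my-project,"   // placeholder project id
                + "fs.gs.impl.disable.cache=>true";

        // Split on commas, then on the "=>" delimiter.
        Map<String, String> parsed = new LinkedHashMap<>();
        for (String pair : props.split(",")) {
            String[] kv = pair.split("=>", 2);
            parsed.put(kv[0].trim(), kv[1].trim());
        }

        parsed.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}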

Note: According to this JIRA, it is recommended to use the action plugins instead of the source plugins, since the SFTP/FTP source plugins are slated for removal.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1]: Solution 1