Read a CSV file with an unknown number of columns in Flink

I need to read a CSV file using Flink's file source. I am currently reading it with the code below:

    final TypeInformation[] fieldTypes = IntStream.range(0, 4)
        .mapToObj(i -> BasicTypeInfo.STRING_TYPE_INFO)
        .toArray(TypeInformation[]::new);

    RowCsvInputFormat rowCsvInputFormat =
        new RowCsvInputFormat(new Path(lookupPath), fieldTypes,
            System.getProperty(LOOKUP_RECORD_SEPARATOR, LookupSeparators.LINE_SEPARATOR.getSeparator()),
            lookUpProcessingData.getDelimiter().toString());
    rowCsvInputFormat.setSkipFirstLineAsHeader(true);

    DataStream<Row> lookupStream =
        Context.getEnvironment()
            .readFile(
                rowCsvInputFormat,
                lookupPath,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                refreshIntervalinMS);

The code above hard-codes the number of columns in each Row as 4. My problem is that I do not know the number of columns in the CSV file beforehand.

Every column is of type String, but the number of fields is unknown.

Is there a way to give RowCsvInputFormat a dynamic number of columns?

I also tried TextInputFormat and split each line on my CSV delimiter, but TextInputFormat does not have a setSkipFirstLineAsHeader API.

How can I split each record on a delimiter and also skip the header line, as setSkipFirstLineAsHeader does, without knowing the number of columns in the CSV file beforehand?
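
One direction I am considering, as a rough sketch only: read just the header line before building the job, count its fields, and use that count instead of the literal 4 when creating fieldTypes. The class and method names below (CsvHeaderProbe, splitFields, countColumns) are hypothetical helpers of my own, not part of Flink. The sketch assumes the file is on a local file system reachable through java.nio (note that java.nio.file.Path is not Flink's Path class), that the header has the same number of fields as the data lines, and that fields are never quoted with the delimiter inside them.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Flink class) for probing a CSV header. */
final class CsvHeaderProbe {

    private CsvHeaderProbe() {}

    /** Splits one line on a literal delimiter, keeping trailing empty fields. */
    static String[] splitFields(String line, String delimiter) {
        // Pattern.quote makes delimiters such as "|" or "." literal instead of regex syntax.
        // The -1 limit keeps trailing empty columns, e.g. "a,b," yields three fields.
        return line.split(Pattern.quote(delimiter), -1);
    }

    /** Reads only the first line of the file and returns how many columns it declares. */
    static int countColumns(Path csvFile, String delimiter) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(csvFile, StandardCharsets.UTF_8)) {
            String header = reader.readLine();
            if (header == null) {
                throw new IOException("Empty CSV file: " + csvFile);
            }
            return splitFields(header, delimiter).length;
        }
    }
}
```

The value returned by countColumns would then replace the 4 in IntStream.range(0, 4), so setSkipFirstLineAsHeader could still be used on RowCsvInputFormat. The limitation I can see is that the column count is fixed when the job is built, so with PROCESS_CONTINUOUSLY a file that later arrives with a different number of columns would not match.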



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
