Read a CSV file with an unknown number of columns in Flink
I need to read a CSV file using a Flink file source. I am using the code below to read it:

    // One STRING type per column; the column count (4) is hard-coded here.
    final TypeInformation[] fieldTypes = IntStream.range(0, 4)
        .mapToObj(i -> BasicTypeInfo.STRING_TYPE_INFO)
        .toArray(TypeInformation[]::new);

    RowCsvInputFormat rowCsvInputFormat =
        new RowCsvInputFormat(
            new Path(lookupPath),
            fieldTypes,
            System.getProperty(LOOKUP_RECORD_SEPARATOR, LookupSeparators.LINE_SEPARATOR.getSeparator()),
            lookUpProcessingData.getDelimiter().toString());
    rowCsvInputFormat.setSkipFirstLineAsHeader(true);

    DataStream<Row> lookupStream =
        Context.getEnvironment()
            .readFile(
                rowCsvInputFormat,
                lookupPath, // + "/"
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                refreshIntervalinMS);

The code above hard-codes the number of columns in each Row as 4, but I do not know the number of columns in the CSV file beforehand.
Every column is of type String; only the number of fields is unknown.
Is there a way to give RowCsvInputFormat a dynamic number of columns?
I also tried TextInputFormat and splitting each line on my CSV delimiter, but it has no setSkipFirstLineAsHeader API.
How can I split each record on a delimiter and also use setSkipFirstLineAsHeader without knowing the number of columns in the CSV file beforehand?
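
One possible direction, offered as a sketch rather than a confirmed Flink feature: read only the header line of the file before building the input format, count its fields, and use that count in place of the hard-coded 4. The class `CsvColumnProbe` and its methods are hypothetical helpers, not Flink API. The sketch assumes the file is readable through `java.nio.file` from wherever the job is built, that the header has the same number of fields as the data rows, and that fields never contain a quoted delimiter.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink): inspects the header line of a CSV file.
class CsvColumnProbe {

    // Counts the fields in one line. Pattern.quote makes the delimiter literal
    // (so "|" or "." are safe), and the -1 limit keeps trailing empty fields.
    // Assumption: no quoted fields that contain the delimiter.
    static int countFields(String line, String delimiter) {
        return line.split(Pattern.quote(delimiter), -1).length;
    }

    // Reads only the first line of the file and returns its field count.
    static int countColumns(Path file, String delimiter) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String header = reader.readLine();
            if (header == null) {
                throw new IOException("Empty CSV file: " + file);
            }
            return countFields(header, delimiter);
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo with a throwaway file; a real job would pass its own lookup path.
        Path tmp = Files.createTempFile("lookup", ".csv");
        Files.write(tmp, "id|name|city\n1|Ann|Oslo\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(countColumns(tmp, "|")); // prints 3
        Files.delete(tmp);
    }
}
```

The returned value could then feed `IntStream.range(0, columnCount)` when building `fieldTypes`, leaving `setSkipFirstLineAsHeader(true)` in place. Note that the count is fixed when the job is built, so with `PROCESS_CONTINUOUSLY` a later file with a different number of columns would not be covered by this approach, and a path on a remote file system would need to be opened through that file system's own client instead of `java.nio.file`.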
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
