Logstash JSON from multi source to single source
I have the following log file coming from Filebeat to Logstash as JSON (the file being harvested is named application.log). When I view it in OpenSearch, I get each line as a separate message.
I want each block, delimited by }{ (a block starts right after one }{ line and ends at the next), to be a separate _source, instead of each line being an independent _source.
Right now this is what I am getting in the response:
{
  "_index": "spark-cluster-bravo-2022.05.15",
  "_type": "_doc",
  "_id": "EK4FyIABzZJwTsgsGfzY",
  "_version": 1,
  "_score": null,
  "_source": {
    "message": " \"message\" : \"Waiting to receive Application Queue Message on smc_queue......0.0.10-SNAPSHOT\",",
    "@version": "1",
    "shortHostname": "shahar1",
    "log": {
      "file": {
        "path": "/mnt/nfs/var/nfsshare/logs/Spark-Cluster-bravo/reduce-component.log"
      }
    },
    "@timestamp": "2022-05-15T14:00:19.042Z",
    "host": {
      "name": "10.192.20.124"
    },
    "input": {},
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "Environment": {
      "name": "Spark-Cluster-bravo"
    },
    "ecs": {},
    "agent": {}
  },
  "fields": {
    "@timestamp": [
      "2022-05-15T14:00:19.042Z"
    ]
  },
  "sort": [
    1652623219042
  ]
},
{
  "_index": "spark-cluster-bravo-2022.05.15",
  "_type": "_doc",
  "_id": "Ha4FyIABzZJwTsgsGfzY",
  "_version": 1,
  "_score": null,
  "_source": {
    "message": " \"context\" : \"default\"",
    "@version": "1",
    "shortHostname": "shahar1",
    "log": {
      "file": {
        "path": "/mnt/nfs/var/nfsshare/logs/Spark-Cluster-bravo/reduce-component.log"
      }
    },
    "@timestamp": "2022-05-15T14:00:19.042Z",
    "host": {
      "name": "10.192.20.124"
    },
    "input": {},
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "Environment": {
      "name": "Spark-Cluster-bravo"
    },
    "ecs": {},
    "agent": {}
  },
  "fields": {
    "@timestamp": [
      "2022-05-15T14:00:19.042Z"
    ]
  },
  "sort": [
    1652623219042
  ]
},
This is a snippet from the log file that Filebeat is scanning:
}{
"timestamp" : "2022-05-15 13:17:00",
"level" : "ERROR",
"thread" : "main",
"logger" : "com.crossix.safemine.cloud.SMCFlow",
"message" : "[reduce-jane-phi-no-stats_120_34_1652620469879] Fail SMCFlow on(1) ",
"context" : "default"
}{
"timestamp" : "2022-05-15 13:17:00",
"level" : "ERROR",
"thread" : "main",
"logger" : "com.crossix.safemine.cloud.utils.JmsUtils",
"message" : "failed to handle request {\"sentTime\":\"May 15, 2022 1:17:00 PM\",\"messageType\":\"SMC_REQUEST\",\"taskID\":120,\"taskName\":\"reduce-jane-phi-no-stats\",\"flowID\":34,\"flowMessage\":{\"configName\":\"reduce-jane-phi-no-stats\",\"description\":\"create PHI record for Jane\",\"inputFiles\":[\"s3://smc-input/input\"],\"outputFolder\":\"s3://smc/results/assaf/\",\"fileFlow\":{\"configName\":\"reduce-jane-phi-no-stats\",\"flowActions\":[\"NORMALIZE_ROWS\",\"CREATE_NON_PHI\",\"CREATE_PHI\"],\"headerLines\":0,\"delimiter\":\"|\",\"readPartitioned\":false,\"selectedFields\":[\"CLAIM_NUMBER\",\"PATIENT_LNAME\",\"PATIENT_FNAME\",\"PATIENT_DOB\",\"PATIENT_GENDER\",\"PATIENT_ZIP\",\"SUBSCRIBER_ID\",\"GROUP_NUMBER\",\"GROUP_NAME\",\"SUBSCRIBER_FNAME\",\"SUBSCRIBER_LNAME\",\"SUBSCRIBER_GENDER\",\"SUBSCRIBER_DOB\",\"PATIENT_ADDRESS_LINE1\",\"PATIENT_ADDRESS_LINE2\",\"PATIENT_ADR_CITY\",\"PATIENT_ID\",\"PATIENT_CONTROL\"],\"normalizers\":[{\"normalizerClass\":\"SfmHashDSNormalizer\",\"inColumns\":[\"CLAIM_NUMBER\"],\"outColumn\":\"CLAIM_NUMBER\"},{\"normalizerClass\":\"SfmHashDSNormalizer\",\"inColumns\":[\"PATIENT_CONTROL\"],\"outColumn\":\"PATIENT_CONTROL\"},{\"normalizerClass\":\"CanonicalFieldsNormalizer\",\"inColumns\":[\"PATIENT_LNAME\"],\"outColumn\":\"LastName\",\"params\":\"map/identity_keys_fields.csv\"},{\"normalizerClass\":\"CanonicalFieldsNormalizer\",\"inColumns\":[\"PATIENT_FNAME\"],\"outColumn\":\"FirstName\",\"params\":\"map/identity_keys_fields.csv\"},{\"normalizerClass\":\"CanonicalFieldsNormalizer\",\"inColumns\":[\"PATIENT_DOB\"],\"outColumn\":\"DOB\",\"params\":\"map/identity_keys_fields.csv\"},{\"normalizerClass\":\"CanonicalFieldsNormalizer\",\"inColumns\":[\"PATIENT_ZIP\"],\"outColumn\":\"ZipCode\",\"params\":\"map/identity_keys_fields.csv\"},{\"normalizerClass\":\"CanonicalFieldsNormalizer\",\"inColumns\":[\"SUBSCRIBER_ID\"],\"outColumn\":\"CardHolderID\",\"params\":\"map/identity_keys_fields.csv\"},{\"normalizerClass\":\"EmptyValueDSNormalizer\",\"inColumns\":[\"SM_RID\"],\"outColumn\":\"SM_PID\"},{\"normalizerClass\":\"SfmHashDSNormalizer\",\"inColumns\":[\"PATIENT_ID\"],\"outColumn\":\"Hashed_SupplierPatientID\"},{\"normalizerClass\":\"ConditionalDSNormalizer\",\"inColumns\":[\"PATIENT_GENDER\"],\"outColumn\":\"Gender\",\"params\":\"[U,U][0,U][u,U][M,M][m,M][1,M][F,F][f,F][2,F][default,empty]\"},{\"normalizerClass\":\"DateFormatterDSNormalizer\",\"inColumns\":[\"PATIENT_DOB\"],\"outColumn\":\"DOBYear\",\"params\":\"[yyyyMMdd][yyyy]\"},{\"normalizerClass\":\"DOBYearDSNormalizer\",\"inColumns\":[\"DOBYear\"],\"outColumn\":\"DOBYear\"},{\"normalizerClass\":\"TruncateValueDSNormalizer\",\"inColumns\":[\"PATIENT_ZIP\"],\"outColumn\":\"Zip3\",\"params\":\"3\"},{\"normalizerClass\":\"ClearForbiddenInFileDSNormalizer\",\"inColumns\":[\"Zip3\"],\"outColumn\":\"Zip3\",\"params\":\"[map/ZipCode3-layout-exclusions.map][,]\"},{\"normalizerClass\":\"EmptyValueDSNormalizer\",\"inColumns\":[\"CardHolderID\"],\"outColumn\":\"Hashed_CardHolderID\"}],\"phiFields\":[\"SM_RID\",\"SM_PID\",\"Hashed_SupplierPatientID\",\"Gender\",\"DOBYear\",\"Zip3\",\"Hashed_CardHolderID\"],\"transactionFields\":[\"CLAIM_NUMBER\",\"GROUP_NUMBER\",\"GROUP_NAME\",\"PATIENT_CONTROL\"],\"validations\":{\"headerColumnsCount\":18,\"fileConvention\":[\"\\\\d{4}\\\\d{2}\\\\d{2}_Claims_US_CF_Hash_File.*\"],\"fileNameDateRegex\":\"\\\\d{4}\\\\d{2}\\\\d{2}\",\"fileNameDateFormat\":\"yyyyMMdd\"}},\"inputRootDir\":\"s3://smc-input/input\",\"ingestFolder\":\"hash\",\"dsId\":\"JN\",\"nodeAlias\":\"Jane\"},\"config
Name\":\"smc-jane1\",\"version\":100}, sending message to error queue with error message: failed to handle request: Exception- Class: [class com.crossix.safemine.cloud.utils.SafemineCloudException]\nException Msg: [[reduce-jane-phi-no-stats_120_34_1652620469879]Fail to execute SMCFlow]\n--> com.crossix.safemine.cloud.SMCFlow.process(SMCFlow.java:184)\n--> com.crossix.safemine.cloud.SMCFlow.execute(SMCFlow.java:77)\n--> com.crossix.safemine.cloud.SMCApplication.run(SMCApplication.java:114)\n--> com.crossix.safemine.cloud.SMCApplication.main(SMCApplication.java:32)\n",
"context" : "default"
}{
"timestamp" : "2022-05-15 13:17:00",
"level" : "INFO",
"thread" : "main",
"logger" : "com.crossix.safemine.cloud.SMCApplication",
"message" : "Waiting to receive Application Queue Message on smc_queue......0.0.10-SNAPSHOT",
"context" : "default"
}
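To make the goal concrete: a block such as the last one above should end up as a single document, with a _source roughly like the sketch below (illustrative only; the metadata fields that Filebeat adds, such as host and log.file.path, are omitted):
"_source": {
  "timestamp" : "2022-05-15 13:17:00",
  "level" : "INFO",
  "thread" : "main",
  "logger" : "com.crossix.safemine.cloud.SMCApplication",
  "message" : "Waiting to receive Application Queue Message on smc_queue......0.0.10-SNAPSHOT",
  "context" : "default"
}
Whether the keys end up parsed into top-level fields like this, or the whole block is kept as one multi-line message string, is secondary; the point is one document per block.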
I've tried to use the json filter plugin in Logstash, but it had no effect.
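For reference, the json filter attempt was roughly the minimal form below (a sketch; it assumes the default message field as the source):
filter {
  json {
    # parse the JSON text held in the message field into top-level event fields
    source => "message"
  }
}
Since each line arrives as its own event, no single message field contains a complete JSON object, which is presumably why the filter has nothing to parse (or just tags the event with _jsonparsefailure).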
This is my Logstash configuration:
input {
  beats {
    port => 5044
  }
  gelf {
    use_tcp => true
    port => 9000
  }
}

## Add your filters / logstash plugins configuration here
filter {
  if [log][file][path] =~ "reduce-component" {
    mutate {
      add_field => { "shortHostname" => "shawarma" }
    }
  }
}

output {
  opensearch {
    hosts => ["https://csxlogs:443"]
    index => "spark-cluster-bravo-%{+yyyy.MM.dd}"
    ssl_certificate_verification => true
  }
}
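From what I understand, the grouping has to happen before the stream is split into per-line events, i.e. on the Filebeat side rather than in this pipeline. A sketch of the direction I'm considering (untested; it assumes Filebeat's log input type and that every block boundary is a line starting with }{ or {):
filebeat.inputs:
- type: log
  paths:
    - /mnt/nfs/var/nfsshare/logs/Spark-Cluster-bravo/reduce-component.log
  # any line that does not start a new block ("}{" or "{") is appended to the previous event
  multiline.pattern: '^\}?\{'
  multiline.negate: true
  multiline.match: after
Even if that groups the lines correctly, the leading }{ on each event would still need cleaning up (for example with mutate/gsub) before a json filter can parse it, so I'm not sure this is the complete answer.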
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow