Kafka retries many times when I download a large file

I am new to Kafka and am trying to build a service that sends mail with attached files. Execution flow:

  • Kafka receives a message to send mail
  • The get-file function downloads the file from a URL, scales the image, and saves the file
  • When sending the mail, I get the files from the folder and attach them to the form

Issues:

  • When I send mail with large files many times, Kafka retries many times and I receive many mails

kafka error: "kafka server: The provided member is not known in the current generation"

I looked into MaxProcessingTime, but when I tested a mail with a large file, it still worked fine.

Kafka info: 1 broker, 3 consumers

func (s *customerMailService) SendPODMail() error {
    filePaths, err := DownloadFiles(podURLs, orderInfo.OrderCode)

    // Retry the download a limited number of times if the first attempt fails.
    if err != nil {
        countRetry := 0
        for countRetry <= NUM_OF_RETRY {
            filePaths, err = DownloadFiles(podURLs, orderInfo.OrderCode)
            if err == nil {
                break
            }
            countRetry++
        }
    }

    err = s.sendMailService.Send(ctx, orderInfo.CustomerEmail, tmsPod, content, filePaths)
    return err
}

Download function:

func DownloadFiles(files []string, orderCode string) ([]string, error) {
var filePaths []string

err := os.Mkdir(tempDir, 0750)
if err != nil && !os.IsExist(err) {
    return nil, err
}

tempDirPath := tempDir + "/" + orderCode
err = os.Mkdir(tempDirPath, 0750)
if err != nil && !os.IsExist(err) {
    return nil, err
}

for _, fileUrl := range files {
    fileUrlParsed, err := url.ParseRequestURI(fileUrl)
    if err != nil {
        logrus.WithError(err).Infof("Pod url is invalid %s", orderCode)
        return nil, err
    }

    extFile := filepath.Ext(fileUrlParsed.Path)
    dir, err := os.MkdirTemp(tempDirPath, "tempDir")

    if err != nil {
        return nil, err
    }

    f, err := os.CreateTemp(dir, "tmpfile-*"+extFile)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    response, err := http.Get(fileUrl)
    if err != nil {
        return nil, err
    }
    defer response.Body.Close()

    contentTypes := response.Header["Content-Type"]
    isTypeAllow := false
    for _, contentType := range contentTypes {
        if contentType == "image/png" || contentType == "image/jpeg" {
            isTypeAllow = true
        }
    }

    if !isTypeAllow {
        logrus.Infof("Pod image type is invalid %s", orderCode)
        return nil, errors.New("Pod image type is invalid")
    }

    decodedImg, err := imaging.Decode(response.Body)
    if err != nil {
        return nil, err
    }

    resizedImg := imaging.Resize(decodedImg, 1024, 0, imaging.Lanczos)

    if err := imaging.Save(resizedImg, f.Name()); err != nil {
        return nil, err
    }

    filePaths = append(filePaths, f.Name())
}
return filePaths, nil
}

Send mail function:

func (s *tikiMailService) SendFile(ctx context.Context, receiver string, templateCode string, data interface{}, filePaths []string) error {
path := "/v1/emails"
fullPath := fmt.Sprintf("%s%s", s.host, path)

formValue := &bytes.Buffer{}
writer := multipart.NewWriter(formValue)
_ = writer.WriteField("template", templateCode)
_ = writer.WriteField("to", receiver)

if data != nil {
    b, err := json.Marshal(data)
    if err != nil {
        return errors.Wrapf(err, "Cannot marshal mail data to json with object %+v", data)
    }

    _ = writer.WriteField("params", string(b))
}

for _, filePath := range filePaths {
    part, err := writer.CreateFormFile(filePath, filepath.Base(filePath))

    if err != nil {
        return err
    }


    // Copy the file straight into the multipart part; the pipe and goroutine
    // are not needed, and errors are no longer silently dropped.
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    _, err = io.Copy(part, file)
    file.Close()
    if err != nil {
        return err
    }
}

err := writer.Close()
if err != nil {
    return err
}

request, err := http.NewRequest("POST", fullPath, formValue)
if err != nil {
    return err
}
request.Header.Set("Content-Type", writer.FormDataContentType())

resp, err := s.doer.Do(request)
if err != nil {
    return errors.Wrap(err, "Cannot send request to send email")
}
defer resp.Body.Close()

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
    return err
}

if resp.StatusCode != http.StatusOK {
    return errors.Errorf("Send email with code %s error: status code %d, response %s",
        templateCode, resp.StatusCode, string(b))
}
logrus.Infof("Send email with attachment, code %s success with response %s, files %v", templateCode, string(b), filePaths)
return nil
}

Thanks



Solution 1:[1]

My team found that the problem occurred when I redeployed the k8s pods, which led to a partition-leader conflict and triggered a rebalance. After the rebalance, the pods try to process the messages remaining in their buffer again.

Solution: instead of fetching many messages into the buffer, I fetch one message at a time and process it, using this config:

ChannelBufferSize = 0
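
To see why the buffer size matters, here is a minimal sketch that uses a plain Go channel as a stand-in for the consumer's internal message buffer. It assumes the client is sarama (a guess based on the `ChannelBufferSize` and `MaxProcessingTime` names in the question). The `prefetched` helper and its parameters are hypothetical and are not part of any Kafka client.

```go
package main

// prefetched models a fetcher that keeps pushing messages into the consumer's
// buffer while the handler is still busy with an earlier message (nothing is
// receiving from the channel). It returns how many messages end up queued
// ahead of processing.
func prefetched(bufferSize, available int) int {
	buffer := make(chan int, bufferSize)
	queued := 0
	for offset := 0; offset < available; offset++ {
		select {
		case buffer <- offset: // room in the buffer: the message is fetched early
			queued++
		default: // buffer full (or unbuffered): the fetcher has to wait
			return queued
		}
	}
	return queued
}
```

In this model `prefetched(256, 1000)` returns 256 (256 being, as far as I know, sarama's default `ChannelBufferSize`) and `prefetched(0, 1000)` returns 0. With a buffer, a pod may hold many messages it has not processed yet when a rebalance happens. With a buffer of 0, nothing is queued ahead of the handler.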

Example of a partition-leader conflict:

consumer A and B start up at the same time
consumer A registers itself as leader and owns the topic with all partitions
consumer B registers itself as leader, then begins to rebalance and owns all partitions
consumer A rebalances and obtains all partitions, but cannot consume because its memberId is old and it needs a new one
consumer B rebalances again and owns the topic with all partitions, but they have already been obtained by consumer A
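
The "memberId is old" step can be illustrated with a toy model. This is a simplified sketch, not Kafka's actual group protocol: the `coordinator` type and its `join`, `evict`, and `commit` methods are hypothetical names. The sketch only shows that a consumer dropped during a rebalance gets the error from the question until it rejoins under a new member ID.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnknownMember mirrors the wording of the error in the question.
var errUnknownMember = errors.New("the provided member is not known in the current generation")

// coordinator is a toy stand-in for the group coordinator: it tracks only the
// generation number and the member IDs that are valid in it.
type coordinator struct {
	generation int
	members    map[string]bool
}

// join adds a consumer under a fresh member ID and starts a new generation.
func (c *coordinator) join(name string) string {
	c.generation++
	id := fmt.Sprintf("%s-%d", name, c.generation)
	if c.members == nil {
		c.members = make(map[string]bool)
	}
	c.members[id] = true
	return id
}

// evict drops a member that did not rejoin in time and starts a new generation.
func (c *coordinator) evict(memberID string) {
	c.generation++
	delete(c.members, memberID)
}

// commit accepts an offset commit only from a member of the current generation.
func (c *coordinator) commit(memberID string) error {
	if !c.members[memberID] {
		return errUnknownMember
	}
	return nil
}
```

For example, if A and B both `join` and A is then evicted, `commit` with A's old ID returns `errUnknownMember` while B's commit succeeds. A has to `join` again, which gives it a new ID, before its commits are accepted.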

Solution 2:[2]

My two cents: in case of very big attachments, the consumer takes quite a lot of time to read the file and to send it as an attachment.

This increases the amount of time between two poll() calls. If that time is greater than max.poll.interval.ms, the consumer is considered failed and the partition offset is not committed. As a result, the message is processed again, and the offset is only committed once the execution time happens to stay below the poll interval. The effect is that the same email is sent multiple times.

Try increasing the max.poll.interval.ms on the consumer side.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 H?ng hoàng
Solution 2 fcracker79