KafkaConsumer loses some alarms which are in other environments

I am facing an issue with a KafkaConsumer. Our scenario is the following: we have 5 environments, each with a KafkaConsumer implemented, all of them pointing to the same Kafka server and topic. All the consumers also have the same config and group.id.

I noticed that some of the environments are losing messages, but these lost messages do reach other environments. I think this is somehow related to my using the same group.id.

For example, if message 'A' is present in env1, it is not present in env2, 3, 4, or 5.

Could someone give me an idea of what could be the cause, or whether it is related to the group.id?



Solution 1:[1]

I suspect the bug is in this line: sc = malloc(sizeof(nbthreads));

You probably wanted sc = malloc(sizeof(string_count));

I'm also not sure if struct string_count* sc = malloc(sizeof(arg)); does what you intended in thread_main.
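To make the size problem concrete, here is a minimal sketch. The layout of struct string_count is not shown here, so the fields below are assumptions, and string_count_new, string_count_from_arg and pointer_size are hypothetical helpers. It also guesses that thread_main meant to reuse the object passed in through arg rather than allocate a new one.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical layout: the real definitions are not shown. */
struct string_count_entry { char *string; int count; };
struct string_count {
    size_t nb_string;
    struct string_count_entry *entries;
};

/* Allocate one struct string_count.
 * sizeof *sc is the size of the pointed-to struct, so the request
 * stays correct even if the type of sc changes later. */
struct string_count *string_count_new(void)
{
    struct string_count *sc = malloc(sizeof *sc);
    if (sc != NULL) {
        sc->nb_string = 0;
        sc->entries = NULL;
    }
    return sc; /* NULL on allocation failure; the caller must check */
}

/* What a thread entry point can do with its argument when the caller
 * passed a struct string_count *: no allocation, just a conversion. */
struct string_count *string_count_from_arg(void *arg)
{
    assert(arg != NULL); /* precondition: a real object was passed */
    return arg;          /* void * converts implicitly in C */
}

/* The number of bytes malloc(sizeof(arg)) asks for when arg is a void *. */
size_t pointer_size(void *arg)
{
    return sizeof(arg);  /* size of a pointer, not of the struct */
}
```

With this layout, pointer_size returns sizeof(void *), which is smaller than sizeof(struct string_count), so a buffer of that size cannot hold the struct.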

You probably need the sc in main to be an array, pass a different element of it to each thread, and then aggregate the results after the join.
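A rough sketch of that pattern, not your actual program: struct partial_count, NB_THREADS and parallel_sum are made-up names, and summing integers stands in for counting strings. Each thread writes only to its own array element, so no locking is needed, and main combines the elements once every thread has been joined.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define NB_THREADS 4 /* hypothetical thread count */

/* Hypothetical per-thread result; stands in for struct string_count. */
struct partial_count {
    const int *data; /* slice of the input this thread handles */
    size_t len;
    long total;      /* written only by the owning thread */
};

static void *thread_main(void *arg)
{
    struct partial_count *pc = arg; /* this thread's own element */
    pc->total = 0;
    for (size_t i = 0; i < pc->len; i++)
        pc->total += pc->data[i];
    return NULL;
}

/* Sum data[0..len) with NB_THREADS threads.
 * Returns 0 and stores the sum in *out, or -1 if a thread failed to start. */
int parallel_sum(const int *data, size_t len, long *out)
{
    pthread_t threads[NB_THREADS];
    struct partial_count parts[NB_THREADS]; /* one element per thread */
    size_t chunk = len / NB_THREADS;
    int started = 0;
    long total = 0;

    assert(out != NULL && (data != NULL || len == 0));
    for (int i = 0; i < NB_THREADS; i++) {
        size_t begin = (size_t)i * chunk;
        parts[i].data = data + begin;
        /* the last thread also takes the remainder */
        parts[i].len = (i == NB_THREADS - 1) ? len - begin : chunk;
        parts[i].total = 0;
        if (pthread_create(&threads[i], NULL, thread_main, &parts[i]) != 0)
            break;
        started++;
    }
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
        total += parts[i].total; /* aggregate only after the join */
    }
    if (started != NB_THREADS)
        return -1;
    *out = total;
    return 0;
}
```

Build with -pthread. For the real program, the aggregation step would merge the per-thread string tables instead of adding numbers.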

Solution 2:[2]

Here are the first few errors I have spotted, in no particular order.

  1. Unprotected modification of data structures from multiple threads. Read something about multithreading. Pay attention to the word "mutex".
  2. pthread_join(threads[nbthreads--], NULL); goes out of bounds.
  3. struct string_count* sc = malloc(sizeof(arg)); makes no sense. sizeof(arg) is the size of a pointer (8 on most PC-like systems). This is not enough to hold one struct string_count.
  4. struct string_count_entry* a = malloc(sizeof(pt2)); makes even less sense. You are allocating something in a string comparison function, using the wrong size, then using the allocated memory without initializing it, and never actually comparing the things passed to the function.
  5. while ((line == readline()) != '\0') does not assign anything.
  6. pt->entries = realloc(pt->entries, pt->nb_string + 1 * sizeof(pt->entries[0])); is missing a couple of parentheses.
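Points 1, 2 and 6 can be sketched together as below. This is an illustration under assumptions, not a fix of the original program: the struct layout is guessed from the field names quoted above, and string_count_init, string_count_add, worker and run_workers are hypothetical names. The table is only touched while the mutex is held, the realloc size is (n + 1) elements, and the join loop stays within indices 0..started-1.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical layout, modelled on the field names quoted in the list. */
struct string_count_entry { char *string; int count; };
struct string_count {
    pthread_mutex_t lock; /* guards nb_string and entries */
    size_t nb_string;
    struct string_count_entry *entries;
};

int string_count_init(struct string_count *pt)
{
    pt->nb_string = 0;
    pt->entries = NULL;
    return pthread_mutex_init(&pt->lock, NULL);
}

/* Record one occurrence of str; returns 0 on success, -1 if out of memory. */
int string_count_add(struct string_count *pt, const char *str)
{
    int rc = 0;
    size_t i;

    assert(pt != NULL && str != NULL);
    pthread_mutex_lock(&pt->lock);
    for (i = 0; i < pt->nb_string; i++) {
        if (strcmp(pt->entries[i].string, str) == 0) {
            pt->entries[i].count++;
            break;
        }
    }
    if (i == pt->nb_string) { /* not found: append a new entry */
        /* (n + 1) elements: the parentheses around the sum matter */
        struct string_count_entry *grown =
            realloc(pt->entries, (pt->nb_string + 1) * sizeof(pt->entries[0]));
        char *copy = malloc(strlen(str) + 1);
        if (grown != NULL)
            pt->entries = grown; /* keep the block even if copy failed */
        if (grown != NULL && copy != NULL) {
            strcpy(copy, str);
            pt->entries[pt->nb_string].string = copy;
            pt->entries[pt->nb_string].count = 1;
            pt->nb_string++;
        } else {
            free(copy);
            rc = -1;
        }
    }
    pthread_mutex_unlock(&pt->lock);
    return rc;
}

static void *worker(void *arg)
{
    for (int i = 0; i < 1000; i++)
        string_count_add(arg, "word");
    return NULL;
}

/* Start nbthreads workers and wait for them; returns how many were started. */
int run_workers(struct string_count *pt, int nbthreads)
{
    pthread_t *threads = malloc((size_t)nbthreads * sizeof *threads);
    int started = 0;

    if (threads == NULL)
        return 0;
    while (started < nbthreads &&
           pthread_create(&threads[started], NULL, worker, pt) == 0)
        started++;
    for (int i = 0; i < started; i++) /* valid indices are 0..started-1 */
        pthread_join(threads[i], NULL);
    free(threads);
    return started;
}
```

Build with -pthread. A single lock around the whole table is the simplest choice; giving each thread its own table and merging after the join avoids the contention entirely.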

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 littleadv
Solution 2 n. 1.8e9-where's-my-share m.