'Epoll_wait() Does Not Recognize Client

I tried to make a simple server-client protocol where the server sends a set of fragmented files to clients, each client gets a fragmented file. I ran into an issue that my epoll_wait() blocks after it accepts a new client. It seems that epoll_wait() did not update the number of active events to be 2 (the server and the first client is active). When I printed the client ip address and port number, it did not match the server port number. Is this something that I should expect that the connection still can be established, even though the client is on a different port. How can I resolve this issue that it did not pick up the client event? I appreciate all help that I can get! Thank you!

Here is my server implementation:

#include <sys/epoll.h>
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <errno.h>

#define MAX_EVENTS 1024
#define LISTEN_BACKLOG 5
#define MAX_BUF_SIZE 256

typedef struct FRAG_FILE
{
    FILE *fp;
    int client_fd;
    int number_bytes;
    int number_bytes_sorted;
    char *line;
    int length;
    int capacity;
    int sorted;
    int assigned; // use it to keep track of the file has been assigned to a client
    int sent;     // use it to keep track of the file has been sent to a client
} frag_file;

FILE *fp_output = NULL;

int initialize_server(int server_port){
    struct sockaddr_in server_addr;
    int server_sock = socket(AF_INET, SOCK_STREAM, 0);
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(server_port);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(server_sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0){
        printf("bind error: %s(%d)\n", strerror(errno), errno);
        exit(1);
    }
    if(listen(server_sock, LISTEN_BACKLOG) < 0){
        printf("listen error: %s(%d)\n", strerror(errno), errno);
        exit(1);
    }
    return server_sock;
}

void add_epoll_event(struct epoll_event* event, int fd, int epoll_fd){
    event->events = EPOLLIN | EPOLLET;
    event->data.fd = fd;
    printf("add fd %d to epoll\n", fd);
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, event) < 0){
        printf("epoll_ctl error in add_poll_event: %s(%d)\n", strerror(errno), errno);
        exit(1);
    }
}

int handle_level_trigger(int epoll_fd, struct epoll_event event, int input_fragmented_files, struct FRAG_FILE *frag_file_array){
    int fd = event.data.fd;
    if (event.events & EPOLLIN){
        int i;
        int num_file_sorted = 0;
        for (i = 0; i < input_fragmented_files; i++)
        {
            // find which file is sorted and count them
            char buf[MAX_BUF_SIZE];
            if (frag_file_array[i].sorted)
            {
                num_file_sorted++;
                continue;
            }
            if (frag_file_array[i].fp != NULL && frag_file_array[i].sent == 0)
            {
                // read in every line of the fragment file
                while (fgets(buf, MAX_BUF_SIZE, frag_file_array[i].fp))
                {
                    if (strlen(buf) > 0)
                    {
                        if (buf[strlen(buf) - 1] != '\n')
                        {
                            buf[strlen(buf) - 1] = '\n';
                        }
                        frag_file_array[i].number_bytes += strlen(buf);
                        int ret_write = write(fd, buf, strlen(buf));
                        if (ret_write == -1)
                        {
                            printf("write error %s\n", strerror(errno));
                            exit(1);
                        }
                    }
                }
                // when it is done sending the data, send a closing delimiter
                if (sprintf(buf, "%s", "-1\n") < 0)
                {
                    printf("sprintf error %s\n", strerror(errno));
                    exit(1);
                }
                if (write(frag_file_array[i].client_fd, buf, strlen(buf)) == -1)
                {
                    printf("write error %s\n", strerror(errno));
                    exit(1);
                }
                frag_file_array[i].sent = 1;
            }
        }
        printf("%d sorted files out of %d\n", num_file_sorted, input_fragmented_files);
        if (num_file_sorted == input_fragmented_files)
        {
            // write to the output file and put into the tree???? I am not sure if this is the right way to do it
            fclose(fp_output);
            fp_output = NULL;
            printf("All files have been merged\n");
            // flag = 1; // all files have been merged
            // break;
        }
    }
    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0){
        printf("epoll_ctl error in edge trigger: %s(%d)\n", strerror(errno), errno);
        exit(1);
    }
    close(fd);
    return 0;
}

void handle_client(int server_sock, struct epoll_event *event, int epoll_fd){
    struct sockaddr_in client_addr;
    socklen_t client_addr_len = sizeof(client_addr);
    int client_sock = accept(server_sock, (struct sockaddr *)&client_addr, &client_addr_len);
    printf("accept a new client: %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
    printf("client sock: %d\n", client_sock);
    if(client_sock < 0){
        printf("accept error: %s(%d)\n", strerror(errno), errno);
        exit(1);
    }
    add_epoll_event(event, client_sock, epoll_fd); // fd = 9
}

void clean_up(frag_file *file, int num_fragmented_files)
{
    int i;
    for (i = 0; i < num_fragmented_files; i++)
    {
        if (file[i].line != NULL)
        {
            free(file[i].line);
        }
        if (file[i].fp != NULL)
        {
            fclose(file[i].fp);
        }
        if (file[i].client_fd >= 0)
        {
            close(file[i].client_fd);
        }
    }
}

int read_server_file(char *filename)
{
    // read in the file from the argv[1]. Then get the number of fragmented files.
    FILE *fp = fopen(filename, "r");
    if (fp == NULL)
    {
        printf("Error: Cannot open file %s because %s\n", filename, strerror(errno));
        exit(EXIT_FAILURE);
    }
    int line_num = 0, input_fragmented_files = 0;
    while (!feof(fp))
    {
        char buffer[1024];
        if (fscanf(fp, "%s\n", buffer) < 1)
        {
            printf("Fail to read file %s because %s\n", filename, strerror(errno));
            exit(EXIT_FAILURE);
        }
        // first line is the name of the output file
        if (line_num == 0)
        {
            if ((fp_output = fopen(buffer, "w")) == NULL)
            {
                printf("Fail to open file %s because %s\n", buffer, strerror(errno));
                exit(EXIT_FAILURE);
            }
        }
        // second line is the names of files that contain numbered lines for fragments of the main file
        else
        {
            FILE *fp_fragment = NULL;
            if ((fp_fragment = fopen(buffer, "r")) == NULL)
            {
                printf("Fail to open file %s because %s\n", buffer, strerror(errno));
                exit(EXIT_FAILURE);
            }
            input_fragmented_files++;
        }
        line_num++;
    }
    fclose(fp);
    if (input_fragmented_files <= 0)
    {
        printf("Error: Failed to read the server file to get the fragmented files\n");
        exit(1);
    }
    return input_fragmented_files;
}

void initialize_frag_file(frag_file *frag_file_array, int num_fragmented_files)
{
    int i;
    for (i = 0; i < num_fragmented_files; i++)
    {
        frag_file_array[i].fp = NULL;
        frag_file_array[i].client_fd = -1; // -1 means invalid
        frag_file_array[i].number_bytes = 0;
        frag_file_array[i].line = NULL;
        frag_file_array[i].length = 0;
        frag_file_array[i].capacity = 0;
        frag_file_array[i].sorted = 0;
        frag_file_array[i].assigned = 0;
        frag_file_array[i].sent = 0;
    }
}

void open_each_file(frag_file *frag_file_array, int num_fragmented_files, char *filename)
{
    // could do better here by combining the fuunction read_server_file and open_each_file into one function
    FILE *fp = fopen(filename, "r");
    int line_num = 0;
    while (!feof(fp))
    {
        char buffer[1024];
        if (fscanf(fp, "%s\n", buffer) < 1)
        {
            printf("Fail to read file %s because %s\n", filename, strerror(errno));
            exit(EXIT_FAILURE);
        }
        // first line is the name of the output file
        if (line_num == 0) continue;
        // second line is the names of files that contain numbered lines for fragments of the main file
        else
        {
            FILE *fp_fragment = NULL;
            if ((fp_fragment = fopen(buffer, "r")) == NULL)
            {
                printf("Fail to open file %s because %s\n", buffer, strerror(errno));
                exit(EXIT_FAILURE);
            }
            frag_file_array[line_num - 1].fp = fp_fragment;
        }
        line_num++;
    }
    fclose(fp);
}

int main(int argc, char *argv[]){
    struct epoll_event events[MAX_EVENTS];
    struct epoll_event event;

    // checking the arguments
    if (argc != 3)
    {
        printf("Incorrect number of arguments.\nUsage: lab3_server <filename> <port>\n");
        exit(1);
    }
    int server_port = atoi(argv[2]);
    char *filename = argv[1];
    
    // getting the file from the server
    int input_fragmented_files = read_server_file(filename);                                      // step 2
    frag_file *frag_file_array = (frag_file *)malloc(input_fragmented_files * sizeof(frag_file)); // step 3
    if (frag_file_array == NULL)
    {
        printf("Error: Failed to allocate memory for frag_file_array\n");
        exit(1);
    }
    initialize_frag_file(frag_file_array, input_fragmented_files); // step 3
    open_each_file(frag_file_array, input_fragmented_files, filename);

    int epoll_fd = epoll_create1(0);
    if(epoll_fd == -1){
        printf("epoll_create1 error: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }
    int server_sock = initialize_server(server_port);
    add_epoll_event(&event, server_sock, epoll_fd); // fd = 8
    int number_fds = 0;
    while(1){
        number_fds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        printf("number_fds: %d\n", number_fds);
        if(number_fds == -1){
            printf("epoll_wait error: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }
        int i;
        for(i = 0; i < number_fds; i++){
            if(events[i].data.fd == server_sock){
                printf("Server port %d is ready to accept\n", server_port);
                handle_client(server_sock, &event, epoll_fd);
            }
            else{
                handle_level_trigger(epoll_fd, events[i], input_fragmented_files, frag_file_array);
            }
        }
    }
    close(server_sock);
    close(epoll_fd);
    void clean_up(frag_file * file, int input_fragmented_files);
    return 0;
}

Here is my client implementation:

#define GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/epoll.h>

#define MAX_EVENTS 1
#define MAX_BUF_SIZE 1024

// create a client side
int create_client(char *ip, int port)
{
    int cfd;
    char hostname[100];
    char ip_addr[100];
    char port_num[100];
    struct sockaddr_in my_client_addr;
    cfd = socket(AF_INET, SOCK_STREAM, 0);
    if (cfd == -1)
    {
        perror("socket");
        exit(1);
    }
    memset(&my_client_addr, 0, sizeof(my_client_addr));
    my_client_addr.sin_family = AF_INET;
    my_client_addr.sin_port = htons(port);
    my_client_addr.sin_addr.s_addr = inet_addr(ip);
    gethostname(hostname, 100);
    printf("hostname: %s\n", hostname);
    int con_ret = connect(cfd, (const struct sockaddr *)&my_client_addr, sizeof(my_client_addr));
    printf("connect ret: %d\n", con_ret);
    // get name of the server
    getnameinfo((const struct sockaddr *)&my_client_addr, sizeof(my_client_addr), hostname, 100, NULL, 0, NI_NUMERICHOST | NI_NUMERICSERV);
    printf("server name: %s\n", hostname);
    // get ip address
    getnameinfo((const struct sockaddr *)&my_client_addr, sizeof(my_client_addr), ip_addr, 100, NULL, 0, NI_NUMERICHOST);
    printf("server ip address: %s\n", ip_addr);
    // get port number
    getnameinfo((const struct sockaddr *)&my_client_addr, sizeof(my_client_addr), NULL, 0, port_num, 100, NI_NUMERICSERV);
    printf("server port: %s\n", port_num);
    if (con_ret < 0)
    {
        printf("Error: Failed to connect to server: %s\n", strerror(errno));
        exit(1);
    }
    printf("Success: Connected to server\n");
    return cfd;
}

int main(int argc, char *argv[])
{
    char *line = NULL;
    if (argc != 3)
    {
        printf("Usage: %s <internet address> <port>\n", argv[0]);
        exit(1);
    }

    // read in the ip and the port number
    char *ip = argv[1];
    int port = atoi(argv[2]);
    if (port < 0 || port > 65535)
    {
        printf("Invalid port number\n");
        exit(1);
    }
    int cfd = create_client(ip, port);
    int epfd = epoll_create1(0);
    if (epfd < 0)
    {
        printf("Error: Failed to create epoll file descriptor: %s\n", strerror(errno));
        exit(1);
    }
    struct epoll_event ev, events[MAX_EVENTS];
    ev.events = EPOLLIN | EPOLLRDHUP;
    ev.data.fd = cfd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev) < 0)
    {
        printf("Error: Failed to add file descriptor to epoll: %s\n", strerror(errno));
        exit(1);
    }
    char buf[MAX_BUF_SIZE];
    int nfds;
    int length = 0;
    int capacity = 0;
    // connect to the server
    while (1)
    {
        nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
        printf("nfds: %d\n", nfds);
        if (nfds < 0)
        {
            printf("Error: Failed to wait on epoll: %s\n", strerror(errno));
            exit(1);
        }
        else if (nfds > 0)
        {
            int i;
            for (i = 0; i < nfds ; i++){
                printf("EPOLLIN: %s\n", events[i].events & EPOLLIN ? "true" : "false");
                if (events[i].events & EPOLLRDHUP)
                { // since there is only one connection, index it into the 0 index
                    int ret_ctl = epoll_ctl(epfd, EPOLL_CTL_DEL, cfd, NULL);
                    if (ret_ctl < 0)
                    {
                        printf("Error: Failed to delete file descriptor from epoll: %s\n", strerror(errno));
                        exit(1);
                    }
                }
                else if (events[i].events & EPOLLIN)
                {
                    int avail_bytes = capacity - length;
                    ssize_t num_bytes = read(cfd, buf, strlen(buf));
                    printf("read %ld bytes\n", num_bytes);
                    printf("Read in: %s\n", buf);
                    if (num_bytes < 0)
                    {
                        printf("Error: Failed to read from socket: %s\n", strerror(errno));
                    }
                    int i;
                    for (i = 0; i < num_bytes; i++)
                    {
                        int line_num;
                        char c = buf[i];
                        // create a dynamic array to store the lines
                        if (line == NULL)
                        {
                            line = (char *)malloc(MAX_BUF_SIZE);
                            capacity = MAX_BUF_SIZE;
                            length = 0;
                            avail_bytes = MAX_BUF_SIZE;
                        }
                        // save character into the array
                        // check for the reallocation of the dynamic memory
                        if (avail_bytes == 0)
                        {
                            line = (char *)realloc(line, capacity * 2);
                            capacity *= 2;
                            avail_bytes += MAX_BUF_SIZE; // reset the bytes count
                        }
                        line[length++] = c;
                        avail_bytes--;
                        if (c == '\n')
                        {
                            // end of a line
                            line[length++] = '\0';
                            avail_bytes--;
                            sscanf(line, "%d", &line_num); // the next line is the line number
                            if (line_num == -1)
                            {
                                // when there is no new line, get it from the data structuresend it back to the server
                                // right now I just have nothing here so it does not break
                                printf("%s\n", line);
                            }
                            else
                            {
                                // when there is a line, add it to the data structures
                                // right now I just have nothing here so it does not break
                                printf("%s\n", line);
                            }
                            avail_bytes += length;
                            length = 0; // reset the line length
                        }
                    }
                }
            }
        }
    }
    close(cfd);
    close(epfd);
    return 0;
}

Here is the output of the server when I ran it. The epoll_wait() call blocked the execution of the server and did not recognize the client that has been connected.

add fd 8 to epoll
number_fds: 1
Server port 36085 is ready to accept
accept a new client: 128.252.167.161:56486
client sock: 9
add fd 9 to epoll

Here is the output of the client side:

hostname: DESKTOP-34H42IU
connect ret: 0
server name: 127.0.0.1
server ip address: 127.0.0.1
server port: 36085
Success: Connected to server


Solution 1:[1]

  • After you connect to the server on the client side, you should send a request to the server, the server can receive the EPOLLIN event, or

  • add an EPOLLOUT event after accept on the server side, and send the content to the client when it can be written.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1