Многопоточный синхронный ввод-вывод

Предположим, что приложение может получать данные из нескольких потоков (файлов, сетвых соединений и т.д.), причем невозможно заранее сказать, в каком конкретно потоке появятся данные. Если вызвать функцию read, а в потоке нет данных, то вызов read заблокирует выполнение программы до поступления данных. Аналогично, если вызвать функцию write для потока, то выполнение может быть заблокировано, если системные буферы ввода-вывода заполнены (например, программа пытается передать данные через сетевое соединение с скоростью, превышающей пропускную способность сети или скорость чтения данных удаленной программы). Системный вызов select позволяет определить, какие потоки готовы для чтения (т.е. read не заблокирует выполнение программы), записи или находятся в исключительном состоянии (exception).

Потоки задаются файловыми дескрипторами, т.е. целыми числами, которые возвращают функции open, accept, socket. Содержательно функция select получает на вход три битовых множества файловых дескрипторов (чтение, запись и исключения), которые необходимо проверять, и величину таймаута. Если в течение заданного интервала времени ни один из отмеченных дескрипторов не удовлетворяет условиям select (например, мы следим только за наличием входных данных, но они не поступили), то select возвращает управление вызывающей программе, что позволяет избежать "зависания" программы на неопределенное время.

Для работы с битовыми множествами дескрипторов в стандартных заголовочных файлах объявлен специальный тип fd_set и набор функций для работы с ним. Эти функции и функция select имеют следующие прототипы.

#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

void FD_CLR(int fd, fd_set *set); /* снять отметку с дескриптора fd в множестве set */
int  FD_ISSET(int fd, fd_set *set); /* проверить, отмечен ли дескриптор fd в множестве set */
void FD_SET(int fd, fd_set *set);  /* отметить дескриптор fd в множестве set */
void FD_ZERO(fd_set *set); /* обнулить множество */
Первый аргумент select должен быть на единицу больше максимального (во всех трех множествах) номера дескриптора. Указатели на битовые множества могут быть равны NULL. В этом случае select не проверяет уловия этого типа (например, если нас не интересует только чтение данных, то в качестве аргументов writefds и exceptfds можно передавать NULL).

При успешном завершении, даже если достигнут таймаут, функция select изменяет входные битовые множества таким образом, что отмеченными остаются только те дескрипторы, которые удовлетворяют условию, и возвращает общее количество отмеченных дескрипторов. Таким образом, при выходе по таймауту если функция select везвращает значение 0. В случае ошибки select возвращает -1. Значение структры timeout в любом случае может быть изменено.

Рассмотрим в качестве примера функцию, которая получает данные либо из стандартного ввода (с клавиатуры), либо из заданного сетевого соединения.

#include <sys/select.h>

void
check_data(int sock)
{
    fd_set rfds; /* битовое множество файловых дескрипторов */
    struct timeval tv; /* структура для задания таймаута */
    int retval;

    FD_ZERO(&rfds); /* обнуляем все элемениы множества */
    FD_SET(0, &rfds); /* помечаем 0 — этот дескриптор соответствует stdin */
    FD_SET(sock, &rfds); /* помечаем дескриптор sock */

    /* Задаем таймаут 3.5 сек */
    tv.tv_sec = 3; /* секунды */
    tv.tv_usec = 500; /* микросекунды */

    retval = select(sock+1, &rfds, NULL, NULL, &tv); /* Нас интересуют только данные для чтения */

    /* Проверяем, где появились данные */
    if (retval == -1) {
        /* Ошибка; печатаем errno */
        perror("select()");
    } else if (retval > 0) { /* Где-то есть данные */
        if( FD_ISSET(0, &rfds) )
            printf("Данные с клавиатуры.\n");
        if( FD_ISSET(sock, &rfds) )
            printf("Данные в потоке %d.\n", sock);
    }
    else
        printf("Таймаут.\n");
}

Следует отметить, что стандартый ввод (файловый дескриптор 0) и сокет, который используется для приема соединений, ничем не отличаются от других дескрипторов. Схематично программа обработки запросов может выглядеть так:

#include <algorithm>
#include <vector>
#include <sys/select.h>
    // ...
    int as; // сокет для приема соединений
    std::vector<int> clients; // Все известные нам клиенты; 0-й элемент - as

    /*
    as = socket(...);
    bind(...);
    listen(as, 10);
    */

    clients.push_back(as);

    while(1) {
        fd_set rfds;
        FD_ZERO(&rfds); /* обнуляем все элемениы множества */

        FD_SET(as, &rfds);
        for(int i=0; i<clients.size(); i++) {
            FD_SET(clients[i], &rfds);
        }
        int max_fd = *std::max_element(clients.begin(), clients.end()); /* функция из algorithm */ 

        // Заполняем структуру tv. Это нужно делать каждый раз, так как она "портится"
        struct timeval tv;
        tv.tv_sec = 3; /* секунды */
        tv.tv_usec = 0; /* микросекунды */

        retval = select(max_fd+1, &rfds, NULL, NULL, &tv);

        if( retval > 0 ) {
            for(int i=1; i < clients.size(); i++) { // пропускаем 0-й элемент
                if( FD_ISSET(clients[i], &rfds)) {
                    // Обрабатываем сообщение от клиента clients[i]
                }
            }
            if( FD_ISSET(as, &rfds)) {
                /* новый клиент */
                int new_client_sock = accept(as, 0, 0);
                clients.push_back(new_client_sock);
            }
        }
        // ...
    }