18 июл. 2015 г.

Unix: Обработка отсутствия читателей FIFO, потоко-безопасная обработка SIGPIPE

Ссылки

 

Открытие при отсутствии читателей


open(O_RDWR) 
open() не блокируется и сразу открывает FIFO, так как вызывающий процесс становится читателем.

open(O_WRONLY | O_NONBLOCK)
open() не блокируется и сразу возвращает ошибку ENXIO.

open(O_WRONLY)
open() блокируется до тех пор, пока не появится читатель.

 

Запись при отсутствии читателей (обработка SIGPIPE)


write() в FIFO без читателей генерирует SIGPIPE независимо от флага O_NONBLOCK. Обработка SIGPIPE может вызвать определенные затруднения. Ниже перечислены решения проблемы.

1. Использовать сокеты вместо FIFO


Если использовать сокеты (например UNIX domain sockets), то генерацию SIGPIPE можно отключить, если:
  • вместо write() использовать send(), и
  • передавать в send() флаг MSG_NOSIGNAL (Linux) или установить в setsockopt() опцию SO_NOSIGPIPE (BSD).

Замечание: опция SO_NOSIGPIPE работает для send(), но не для write().

2. Использовать режим открытия O_RDWR


При использовании флага O_RDWR, SIGPIPE никогда не будет генерироваться.

В этом случае пропадает возможность получить уведомление о завершении читателя.

3. Игнорировать или перехватывать SIGPIPE


Если SIGPIPE игнорируется или перехватывается, write() будет возвращать ошибку EPIPE.

В этом случае возникает проблема потоко-безопасности. Т.к. игнорирование или перехват сигналов устанавливаются только для всего процесса, нет потоко-безопасного способа временно изменить диспозицию сигнала.

4. Использовать select()/poll()


Если вместо блокирующего write() использовать select() или poll(), то об отсутствии читателей можно узнать, не вызывая write():
  • select() добавит дескриптор в exceptfds;
  • poll() взведет для дескриптора флаг POLLERR.

В этом случае можно не вызывать write() и избежать генерации SIGPIPE. Такой подход может работать, однако здесь присутствует гонка: читатель может завершиться за небольшой промежуток времени между вызовами select()/poll() и write(), и тогда SIGPIPE все равно будет сгенерирован.

5. Использовать синхронную обработку сигналов


Наконец, существует потоко-безопасный способ обработать SIGPIPE, используя функции для синхронной обработки сигналов. Этот способ требует больше всего кода. Он подробно описан здесь.

Схема работы следующая:
  1. Перед вызовом write() блокируем SIGPIPE для текущего потока с помощью pthread_sigmask(). Нужно заметить, что ядро всегда доставляет SIGPIPE тому потоку, который вызвал write (см. signal(7)).
  2. После блокировки сигнала с помощью sigpending() проверяем, не ожидает ли SIGPIPE обработки еще до вызова write().
  3. Вызываем write(). Если читатель отсутствует, SIGPIPE будет добавлен в маску сигналов, ожидающих обработки, для текущего потока, а write() вернет ошибку EPIPE.
  4. Если write() вернул EPIPE, удаляем сигнал с помощью неблокирующего вызов sigtimedwait(), но только если сигнал не ожидал обработки до вызова write() (в противном случае наш код "съедал" бы чужой сигнал, т.к. один и тот же сигнал, пришедший несколько раз, не ставится в очередь, а доставляется только один раз).
  5. Восстанавливаем изначальную маску сигналов с помощью pthread_sigmask().
Пример реализации

#include <unistd.h>
#include <time.h>
#include <errno.h>
#include <sys/signal.h>

ssize_t safe_write(int fd, const void* buf, size_t bufsz)
{
    sigset_t sig_block, sig_restore, sig_pending;

    sigemptyset(&sig_block);
    sigaddset(&sig_block, SIGPIPE);

    /* Block SIGPIPE for this thread.
     *
     * This works since kernel sends SIGPIPE to the thread that called write(),
     * not to the whole process.
     */
    if (pthread_sigmask(SIG_BLOCK, &sig_block, &sig_restore) != 0) {
        return -1;
    }

    /* Check if SIGPIPE is already pending.
     */
    int sigpipe_pending = -1;
    if (sigpending(&sig_pending) != -1) {
        sigpipe_pending = sigismember(&sig_pending, SIGPIPE);
    }

    if (sigpipe_pending == -1) {
        pthread_sigmask(SIG_SETMASK, &sig_restore, NULL);
        return -1;
    }

    ssize_t ret;
    while ((ret = write(fd, buf, bufsz)) == -1) {
        if (errno != EINTR)
            break;
    }

    /* Fetch generated SIGPIPE if write() failed with EPIPE.
     *
     * However, if SIGPIPE was already pending before calling write(), it was also
     * generated and blocked by caller, and caller may expect that it can fetch it
     * later. Since signals are not queued, we don't fetch it in this case.
     */
    if (ret == -1 && errno == EPIPE && sigpipe_pending == 0) {
        struct timespec ts;
        ts.tv_sec = 0;
        ts.tv_nsec = 0;

        int sig;
        while ((sig = sigtimedwait(&sig_block, 0, &ts)) == -1) {
            if (errno != EINTR)
                break;
        }
    }

    pthread_sigmask(SIG_SETMASK, &sig_restore, NULL);
    return ret;
}