Сокеты как средство ipc

Делаем вебсокеты на PHP с нуля. Часть 2. IPC

После написания моей предыдущей статьи Делаем вебсокеты на PHP с нуля я понял, что у сообщества есть некоторый интерес к поднятой мною теме.

И, как обычно, — получившийся код и ссылка на демонстрационный чат в конце статьи.

Запуск нескольких процессов для обработки соединений

Для работы простого сервера вебсокетов достаточно одного процесса, но чтобы увеличить количество одновременных соединений (и обойти ограничение 1024 одновременных соединения), а также для использования ресурсов всего процессора (а не только одного ядра), необходимо, чтобы сервер вебсокетов использовал несколько процессов (оптимально — количество процессов = количество ядер процессора).

Про отличие родительского процесса от дочернего можно почитать на википедии.

Мы можем в цикле создавать столько дочерних процессов, сколько нам необходимо:

Межпроцессное взаимодействие

Для взаимодействия между родительским и дочерним процессом мы будем использовать сокеты, а именно связанные сокеты:
Функция stream_socket_pair() создаёт пару связанных неразличимых потоковых сокетов. Таким образом мы можем писать в один сокет, а считывать данные из второго.

Теперь совмещаем этот код с форками и получаем:

Итоговый код для создания множества дочерних процессов:

Разделение процессов на мастера и воркеров

Проксирование вебсокетов с помощью nginx

Nginx поддерживает проксирование вебсокетов начиная с версии 1.3.13. Благодаря nginx можно обрабатывать соединения к серверу вебсокетов на том же порту, что и сайт, а также ограничить количество открытых вебсокетов с одного ip и другие полюбившиеся вам плюшки.

Пример nginx-конфига, который это позволяет:

Запуск из консоли

Интеграция с вашим фреймворком на примере yii

Так как наш мастер прослушивает дополнительный сокет для связи с нашими скриптами (в примере выше был unix:///tmp/websocket.sock ), мы можем в любом месте нашего сайта или в кроне соединиться с этим сокетом и отправить сообщение, которое мастер разошлёт всем воркерам, а они, в свою очередь, все клиентам:

С использованием компонента yii это будет выглядеть вот так:

Скачиваем экстеншн, кладём его в папку extensions/websocket
В папку components кладём Websocket.php, WebsocketMasterHandler.php и WebsocketWorkerHandler.php из папки sample/yii.
В папку commands кладём из WebsocketCommand.php из папки sample/yii.
В конфиги main.php и console.php вставляем в секцию components:

В конфиг console.php также вставляем в секцию import:

Демонстрация

Все исходники я оформил в виде библиотеки и выложил на github

Update: Если сообществу интересна эта тема, то следующая статья будет про то как сделать простую игру, в которой все участники будут находиться на одном игровом поле и взаимодействовать друг с другом в реальном времени (демка уже почти готова).

Источник

Знакомство с межпроцессным взаимодействием на Linux

Межпроцессное взаимодействие (Inter-process communication (IPC)) — это набор методов для обмена данными между потоками процессов. Процессы могут быть запущены как на одном и том же компьютере, так и на разных, соединенных сетью. IPC бывают нескольких типов: «сигнал», «сокет», «семафор», «файл», «сообщение»…

Отступление: данная статья является учебной и расчитана на людей, только еще вступающих на путь системного программирования. Ее главный замысел — познакомиться с различными способами взаимодействия между процессами на POSIX-совместимой ОС.

Именованный канал

Для передачи сообщений можно использовать механизмы сокетов, каналов, D-bus и другие технологии. Про сокеты на каждом углу можно почитать, а про D-bus отдельную статью написать. Поэтому я решил остановиться на малоозвученных технологиях отвечающих стандартам POSIX и привести рабочие примеры.

Рассмотрим передачу сообщений по именованным каналам. Схематично передача выглядит так:
Сокеты как средство ipc. Смотреть фото Сокеты как средство ipc. Смотреть картинку Сокеты как средство ipc. Картинка про Сокеты как средство ipc. Фото Сокеты как средство ipc
Для создания именованных каналов будем использовать функцию, mkfifo():

Примечание: mode используется в сочетании с текущим значением umask следующим образом: (mode &

umask). Результатом этой операции и будет новое значение umask для создаваемого нами файла. По этой причине мы используем 0777 (S_IRWXO | S_IRWXG | S_IRWXU), чтобы не затирать ни один бит текущей маски.

Как только файл создан, любой процесс может открыть этот файл для чтения или записи также, как открывает обычный файл. Однако, для корректного использования файла, необходимо открыть его одновременно двумя процессами/потоками, одним для получение данных (чтение файла), другим на передачу (запись в файл).

Пример

mkfifo.c

Мы открываем файл только для чтения (O_RDONLY). И могли бы использовать O_NONBLOCK модификатор, предназначенный специально для FIFO файлов, чтобы не ждать когда с другой стороны файл откроют для записи. Но в приведенном коде такой способ неудобен.

Компилируем программу, затем запускаем ее:

В соседнем терминальном окне выполняем:

В результате мы увидим следующий вывод от программы:

Разделяемая память

Следующий тип межпроцессного взаимодействия — разделяемая память (shared memory). Схематично изобразим ее как некую именованную область в памяти, к которой обращаются одновременно два процесса:
Сокеты как средство ipc. Смотреть фото Сокеты как средство ipc. Смотреть картинку Сокеты как средство ipc. Картинка про Сокеты как средство ipc. Фото Сокеты как средство ipc
Для выделения разделяемой памяти будем использовать POSIX функцию shm_open():

Функция возвращает файловый дескриптор, который связан с объектом памяти. Этот дескриптор в дальнейшем можно использовать другими функциями (к примеру, mmap() или mprotect()).

Целостность объекта памяти сохраняется, включая все данные связанные с ним, до тех пор пока объект не отсоединен/удален (shm_unlink()). Это означает, что любой процесс может получить доступ к нашему объекту памяти (если он знает его имя) до тех пор, пока явно в одном из процессов мы не вызовем shm_unlink().

После создания общего объекта памяти, мы задаем размер разделяемой памяти вызовом ftruncate(). На входе у функции файловый дескриптор нашего объекта и необходимый нам размер.

Пример

Следующий код демонстрирует создание, изменение и удаление разделяемой памяти. Так же показывается как после создания разделяемой памяти, программа выходит, но при следующем же запуске мы можем получить к ней доступ, пока не выполнен shm_unlink().

shm_open.c

После создания объекта памяти мы установили нужный нам размер shared memory вызовом ftruncate(). Затем мы получили доступ к разделяемой памяти при помощи mmap(). (Вообще говоря, даже с помощью самого вызова mmap() можно создать разделяемую память. Но отличие вызова shm_open() в том, что память будет оставаться выделенной до момента удаления или перезагрузки компьютера.)

Компилировать код на этот раз нужно с опцией -lrt:

Смотрим что получилось:

Аргумент «create» в нашей программе мы используем как для создания разделенной памяти, так и для изменения ее содержимого.

Зная имя объекта памяти, мы можем менять содержимое разделяемой памяти. Но стоит нам вызвать shm_unlink(), как память перестает быть нам доступна и shm_open() без параметра O_CREATE возвращает ошибку «No such file or directory».

Семафор

Семафор — самый часто употребляемый метод для синхронизации потоков и для контролирования одновременного доступа множеством потоков/процессов к общей памяти (к примеру, глобальной переменной). Взаимодействие между процессами в случае с семафорами заключается в том, что процессы работают с одним и тем же набором данных и корректируют свое поведение в зависимости от этих данных.

Семафор со счетчиком

Смысл семафора со счетчиком в том, чтобы дать доступ к какому-то ресурсу только определенному количеству процессов. Остальные будут ждать в очереди, когда ресурс освободится.

Итак, для реализации семафоров будем использовать POSIX функцию sem_open():

В функцию для создания семафора мы передаем имя семафора, построенное по определенным правилам и управляющие флаги. Таким образом у нас получится именованный семафор.
Имя семафора строится следующим образом: в начале идет символ «/» (косая черта), а следом латинские символы. Символ «косая черта» при этом больше не должен применяться. Длина имени семафора может быть вплоть до 251 знака.

Если нам необходимо создать семафор, то передается управляющий флаг O_CREATE. Чтобы начать использовать уже существующий семафор, то oflag равняется нулю. Если вместе с флагом O_CREATE передать флаг O_EXCL, то функция sem_open() вернет ошибку, в случае если семафор с указанным именем уже существует.

Параметр mode задает права доступа таким же образом, как это объяснено в предыдущих главах. А переменной value инициализируется начальное значение семафора. Оба параметра mode и value игнорируются в случае, когда семафор с указанным именем уже существует, а sem_open() вызван вместе с флагом O_CREATE.

Для быстрого открытия существующего семафора используем конструкцию:
, где указываются только имя семафора и управляющий флаг.

Пример семафора со счетчиком

Рассмотрим пример использования семафора для синхронизации процессов. В нашем примере один процесс увеличивает значение семафора и ждет, когда второй сбросит его, чтобы продолжить дальнейшее выполнение.

sem_open.c

В одной консоли запускаем:

В соседней консоли запускаем:

Бинарный семафор

Вместо бинарного семафора, для которого так же используется функция sem_open, я рассмотрю гораздо чаще употребляемый семафор, называемый «мьютекс» (mutex).

Мьютекс по существу является тем же самым, чем является бинарный семафор (т.е. семафор с двумя состояниями: «занят» и «не занят»). Но термин «mutex» чаще используется чтобы описать схему, которая предохраняет два процесса от одновременного использования общих данных/переменных. В то время как термин «бинарный семафор» чаще употребляется для описания конструкции, которая ограничивает доступ к одному ресурсу. То есть бинарный семафор используют там, где один процесс «занимает» семафор, а другой его «освобождает». В то время как мьютекс освобождается тем же процессом/потоком, который занял его.

Без мьютекса не обойтись в написании, к примеру базы данных, к которой доступ могут иметь множество клиентов.

Для использования мьютекса необходимо вызвать функцию pthread_mutex_init():

Функция инициализирует мьютекс (перемнную mutex) аттрибутом mutexattr. Если mutexattr равен NULL, то мьютекс инициализируется значением по умолчанию. В случае успешного выполнения функции (код возрата 0), мьютекс считается инициализированным и «свободным».

Функция pthread_mutex_lock(), если mutex еще не занят, то занимает его, становится его обладателем и сразу же выходит. Если мьютекс занят, то блокирует дальнейшее выполнение процесса и ждет освобождения мьютекса.
Функция pthread_mutex_trylock() идентична по поведению функции pthread_mutex_lock(), с одним исключением — она не блокирует процесс, если mutex занят, а возвращает EBUSY код.
Фунция pthread_mutex_unlock() освобождает занятый мьютекс.

Пример mutex

mutex.c

Данный пример демонстрирует совместный доступ двух потоков к общей переменной. Один поток (первый поток) в автоматическом режиме постоянно увеличивает переменную counter на единицу, при этом занимая эту переменную на целую секунду. Этот первый поток дает второму доступ к переменной count только на 10 миллисекунд, затем снова занимает ее на секунду. Во втором потоке предлагается ввести новое значение для переменной с терминала.

Если бы мы не использовали технологию «мьютекс», то какое значение было бы в глобальной переменной, при одновременном доступе двух потоков, нам не известно. Так же во время запуска становится очевидна разница между pthread_mutex_lock() и pthread_mutex_trylock().

Компилировать код нужно с дополнительным параметром -lpthread:

Запускаем и меняем значение переменной просто вводя новое значение в терминальном окне:

Вместо заключения

В следующих статьях я хочу рассмотреть технологии d-bus и RPC. Если есть интерес, дайте знать.
Спасибо.

UPD: Обновил 3-ю главу про семафоры. Добавил подглаву про мьютекс.

Источник

IPC: сокеты против именованных каналов

Абсолютные числа большого смысла не имеют, но как сравнение информация представляет некоторую ценность

Условия

Результаты

Сокеты
Скорость односторонней передачи — 160 мегабайт в секунду (105 с антивирусом)
Загрузка процессора — 90%, из них примерно 2/3 в ядре
Оперативная память — 4-5 мегабайт на процесс, со временем не росла

Именованные каналы
Скорость односторонней передачи — 755 мегабайт в секунду (антивирус не влияет)
Загрузка процессора — 80%, из них примерно 2/3 в ядре
Оперативная память — 4-5 мегабайт на процесс, со временем не росла

Исходный код

using System;
using System.IO.Pipes;
using System.Net;
using System.Net.Sockets;

namespace Server
<
public static class Program
<
private static byte [] buffer = new byte [512 * 1024];

private static void OnSocketSend(IAsyncResult ar)
<
Socket client = (Socket)ar.AsyncState;

client.EndSend(ar);
client.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, OnSocketSend, client);
>

server.Bind( new IPEndPoint(IPAddress.Loopback, 5000));
server.Listen(10);

while ( true )
<
Socket client = server.Accept();

client.SendBufferSize = 2 * buffer.Length;
client.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, OnSocketSend, client);

using System;
using System.Diagnostics;
using System.IO.Pipes;
using System.Net;
using System.Net.Sockets;

namespace Client
<
public static class Program
<
private static byte [] buffer = new byte [512 * 1024];
private static Stopwatch watch = new Stopwatch();
private static long traffic = 0;
private static int step = 0;

private static void OnSocketReceive(IAsyncResult ar)
<
Socket client = (Socket)ar.AsyncState;

traffic += client.EndReceive(ar);
step++;

if ((step % 1000) == 0)
<
watch.Stop();

client.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, OnSocketReceive, client);
>

client.ReceiveBufferSize = 2 * buffer.Length;
client.Connect( new IPEndPoint(IPAddress.Loopback, 5000));

client.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, OnSocketReceive, client);

using System;
using System.IO.Pipes;
using System.Net;
using System.Net.Sockets;

namespace Server
<
public static class Program
<
private static byte [] buffer = new byte [256 * 1024];

private static void OnPipeSend(IAsyncResult ar)
<
NamedPipeServerStream server = (NamedPipeServerStream)ar.AsyncState;

server.EndWrite(ar);
server.BeginWrite(buffer, 0, buffer.Length, OnPipeSend, server);
>

server.WaitForConnection();
server.BeginWrite(buffer, 0, buffer.Length, OnPipeSend, server);

using System;
using System.Diagnostics;
using System.IO.Pipes;
using System.Net;
using System.Net.Sockets;

namespace Client
<
public static class Program
<
private static byte [] buffer = new byte [256 * 1024];
private static Stopwatch watch = new Stopwatch();
private static long traffic = 0;
private static int step = 0;

private static void OnPipeReceive(IAsyncResult ar)
<
NamedPipeClientStream client = (NamedPipeClientStream)ar.AsyncState;

traffic += client.EndRead(ar);
step++;

if ((step % 1000) == 0)
<
watch.Stop();

client.BeginRead(buffer, 0, buffer.Length, OnPipeReceive, client);
>

client.BeginRead(buffer, 0, buffer.Length, OnPipeReceive, client);

Источник

Сокеты¶

Сокеты (англ. socket — разъём) — название программного интерфейса для обеспечения обмена данными между процессами. Процессы при таком обмене могут исполняться как на одной ЭВМ, так и на различных ЭВМ, связанных между собой сетью. Сокет — абстрактный объект, представляющий конечную точку соединения.

Принципы сокетов¶

Каждый процесс может создать слушающий сокет (серверный сокет) и привязать его к какому-нибудь порту операционной системы (в UNIX непривилегированные процессы не могут использовать порты меньше 1024). Слушающий процесс обычно находится в цикле ожидания, то есть просыпается при появлении нового соединения. При этом сохраняется возможность проверить наличие соединений на данный момент, установить тайм-аут для операции и т.д.

Каждый сокет имеет свой адрес. ОС семейства UNIX могут поддерживать много типов адресов, но обязательными являются INET-адрес и UNIX-адрес. Если привязать сокет к UNIX-адресу, то будет создан специальный файл (файл сокета) по заданному пути, через который смогут сообщаться любые локальные процессы путём чтения/записи из него (см. Доменный сокет Unix). Сокеты типа INET доступны из сети и требуют выделения номера порта.

Обычно клиент явно подсоединяется к слушателю, после чего любое чтение или запись через его файловый дескриптор будут передавать данные между ним и сервером.

Основные функции¶

Общие
SocketСоздать новый сокет и вернуть файловый дескриптор
SendОтправить данные по сети
ReceiveПолучить данные из сети
CloseЗакрыть соединение
Серверные
BindСвязать сокет с IP-адресом и портом
ListenОбъявить о желании принимать соединения. Слушает порт и ждет когда будет установлено соединение
AcceptПринять запрос на установку соединения
Клиентские
ConnectУстановить соединение

socket()¶

Создаёт конечную точку соединения и возвращает файловый дескриптор. Принимает три аргумента:

domain указывающий семейство протоколов создаваемого сокета

type

protocol

Протоколы обозначаются символьными константами с префиксом IPPROTO_* (например, IPPROTO_TCP или IPPROTO_UDP). Допускается значение protocol=0 (протокол не указан), в этом случае используется значение по умолчанию для данного вида соединений.

Функция возвращает −1 в случае ошибки. Иначе, она возвращает целое число, представляющее присвоенный дескриптор.

Связывает сокет с конкретным адресом. Когда сокет создается при помощи socket(), он ассоциируется с некоторым семейством адресов, но не с конкретным адресом. До того как сокет сможет принять входящие соединения, он должен быть связан с адресом. bind() принимает три аргумента:

Возвращает 0 при успехе и −1 при возникновении ошибки.

Автоматическое получение имени хоста.

listen()¶

Подготавливает привязываемый сокет к принятию входящих соединений. Данная функция применима только к типам сокетов SOCK_STREAM и SOCK_SEQPACKET. Принимает два аргумента:

После принятия соединения оно выводится из очереди. В случае успеха возвращается 0, в случае возникновения ошибки возвращается −1.

accept()¶

Используется для принятия запроса на установление соединения от удаленного хоста. Принимает следующие аргументы:

Функция возвращает дескриптор сокета, связанный с принятым соединением, или −1 в случае возникновения ошибки.

connect()¶

Устанавливает соединение с сервером.

Некоторые типы сокетов работают без установления соединения, это в основном касается UDP-сокетов. Для них соединение приобретает особое значение: цель по умолчанию для посылки и получения данных присваивается переданному адресу, позволяя использовать такие функции как send() и recv() на сокетах без установления соединения.

Загруженный сервер может отвергнуть попытку соединения, поэтому в некоторых видах программ необходимо предусмотреть повторные попытки соединения.

Возвращает целое число, представляющее код ошибки: 0 означает успешное выполнение, а −1 свидетельствует об ошибке.

Передача данных¶

Для передачи данных можно пользоваться стандартными функциями чтения/записи файлов read и write, но есть специальные функции для передачи данных через сокеты:

Нужно обратить внимание, что при использовании протокола TCP (сокеты типа SOCK_STREAM) есть вероятность получить меньше данных, чем было передано, так как ещё не все данные были переданы, поэтому нужно либо дождаться, когда функция recv возвратит 0 байт, либо выставить флаг MSG_WAITALL для функции recv, что заставит её дождаться окончания передачи. Для остальных типов сокетов флаг MSG_WAITALL ничего не меняет (например, в UDP весь пакет = целое сообщение).

Источник

C: сокеты и пример модели client-server

Перевод с дополнениями. Оригинал – тут>>>.

Как правило – два процесса общаются друг с другом с помощью одного из Inter Process Communication (IPC) механизма ядра, таких как:

Кроме перечисленных IPC – в ядре присутствует много других возможностей, но что если процессам необходимо обмениваться данными по сети?

Тут используется ещё один механизм IPC – сокеты.

Что такое сокет?

Сокеты (англ. socket — разъём) — название программного интерфейса для обеспечения обмена данными между процессами. Процессы при таком обмене могут исполняться как на одной ЭВМ, так и на различных ЭВМ, связанных между собой сетью. Сокет — абстрактный объект, представляющий конечную точку соединения.

Кратко говоря – существует два типа сокетов – UNIX-сокеты (или сокеты домена UNIXUnix domain sockets) и INET-сокеты (IP-сокеты, network sockets).

UNIX-сокеты чвляются частью механизма IPC и позволяют обмен данными в обоих направлениях между процессами, работающими на одной машине.

INET-сокеты в свою очередь представляют собой механизм, позволяющий выполнять коммуникацию между процессами по сети.

Грубо говоря – если UNIX-сокет использует файл в файловой системе, то INET-сокет – требует присваивания сетевого адреса и порта.

Коммуникация в среде TCP/IP происходит по клиент-серверной модели, т.е. – клиент инициализирует связь, а сервер его принимает.

Ниже – пример сервера, который будет работать как демон и ожидать подключения клиента, а при инициализации клиентом соединения – передаст ему дату и время.

Socket сервер

Наш сервер будет выглядеть следующим образом:

Теперь – давайте рассмотрим сам код сервера.

Далее – вызывается функция bind() :

Socket клиент

Перейдём ко второй программе – клиенту.

Код её будет выглядеть следующим образом:

Кратко рассмотрим его:

И в конце-концов – клиент с помощью read() получает данные из своего сокета, в который поступают данные от сокета на сервере.

Собираем клиент, и пробуем подключиться к нашему серверу:

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *