Перейти к основному содержимому

Открытое System Design интервью на Senior Go-разработчика

· 53 мин. чтения

Сегодня мы разберём содержательное и технически глубокое собеседование по системному дизайну, в ходе которого кандидат и интервьюер совместно проектируют архитектуру мессенджера уровня Telegram с 500 млн пользователей. Обсуждение охватывает ключевые аспекты высоконагруженных систем: от выбора баз данных (ScyllaDB, PostgreSQL) и поисковых движков (Elasticsearch) до механизмов доставки сообщений через WebSocket, сервиса Discovery на базе ZooKeeper, а также стратегий отказоустойчивости и обработки медиа-контента с разграничением на «горячее» и «холодное» хранилища.

Вопрос 1. Какая базовая функциональность мессенджера должна быть реализована?

Таймкод: 00:07:01

Ответ собеседника: правильный. Отправка и получение сообщений, приватные чаты, групповые чаты до 500 человек, отправка текста, картинок и видео, перманентная история сообщений, индикатор онлайн-статуса, пуш-уведомления о новых сообщениях, поиск по истории сообщений (полнотекстовый и по дате).

Правильный ответ:

Ответ собеседника покрывает основные сценарии, но для полноты картины стоит расширить список и структурировать функциональность по категориям.

1. Ядро мессенджера — обмен сообщениями

  • Отправка и получение текстовых сообщений — базовый сценарий. Сообщения должны доставляться в реальном времени (WebSocket, long polling, SSE).
  • Приватные чаты (1-on-1) — переписка между двумя пользователями.
  • Групповые чаты — поддержка групп от небольших (5–10 человек) до крупных (500+). Для больших групп нужна оптимизация: пагинация участников, lazy-loading истории, ограничение на частоту сообщений.
  • Медиа-контент — отправка изображений, видео, файлов, голосовых сообщений. Требуется хранение в object storage (S3-совместимое), генерация превью, ограничение размера файлов.
  • Перманентная история сообщений — все сообщения сохраняются на сервере и доступны при переустановке приложения или смене устройства. Это ключевое отличие от эфемерных мессенджеров вроде Signal по умолчанию.

2. Статусы и индикаторы доставки

  • Индикаторы доставки и прочтения — галочки (sent / delivered / read). Для групповых чатов — список прочитавших.
  • Онлайн-статус — отображение «в сети / был(а) N минут назад». Требует аккуратной работы с presence-системой (heartbeat, TTL).
  • Набор текста (typing indicator) — «печатает...» в реальном времени.

3. Уведомления

  • Push-уведомления — APNs для iOS, FCM для Android. Должны работать даже когда приложение свёрнуто. Важно корректно обрабатывать токены, обновлять их при переустановке.
  • Бейджи непрочитанных сообщений — счётчик на иконке приложения и внутри списка чатов.
  • Группировка уведомлений — по чату, чтобы не заваливать пользователя сотней отдельных пушей.

4. Поиск и навигация

  • Полнотекстовый поиск — по содержимому сообщений. Требует индексации (Elasticsearch, Meilisearch, или встроенные возможности PostgreSQL через tsvector).
  • Поиск по дате — навигация к сообщениям за конкретный день.
  • Поиск по вложениям — фильтрация сообщений с файлами, изображениями, ссылками.

5. Управление чатами и контактами

  • Список чатов (inbox) — сортировка по времени последнего сообщения, поддержка закреплённых чатов.
  • Добавление / удаление участников в групповые чаты.
  • Модерация — назначение администраторов, бан пользователей, ограничение прав на отправку сообщений.
  • Поиск пользователей — по имени, телефону, username.

6. Безопасность и приватность

  • End-to-end шифрование (E2EE) — опционально, но ожидаемый стандарт для приватных чатов (Signal Protocol, Double Ratchet).
  • Аутентификация — по телефону (SMS/звонок), по email, поддержка 2FA.
  • Блокировка пользователей — запрет на получение сообщений от конкретного пользователя.

7. Дополнительная функциональность (часто ожидаемая)

  • Цитирование и ответы на сообщения — reply с привязкой к конкретному сообщению.
  • Реакции (emoji reactions) — быстрый отклик без отправки отдельного сообщения.
  • Редактирование и удаление сообщений — с ограничением по времени и пометкой «edited».
  • Синхронизация между устройствами — мультидевайс поддержка (телефон + десктоп + веб).
  • Оффлайн-режим — очередь сообщений при потере соединения, отправка при восстановлении.

С точки зрения архитектуры на Go, для реализации этих функций потребуется: WebSocket-сервер (gorilla/websocket или nhooyr/websocket), брокер сообщений (NATS, Kafka или Redis Streams) для маршрутизации между инстансами, отдельный presence-сервис, асинхронные воркеры для обработки медиа и пуш-уведомлений.

Вопрос 2. Какую нагрузку ожидаем на мессенджер (количество пользователей, сообщений)?

Таймкод: 00:10:57

Ответ собеседника: неполный. Мессенджер предположительно очень популярный, но конкретные цифры по количеству пользователей и сообщений не были названы.

Правильный ответ:

Ответ слишком абстрактный. При проектировании мессенджера необходимо опираться на конкретные числа — даже если они гипотетические, потому что от них зависит выбор архитектуры, технологий и инфраструктуры. Правильный подход — задать интервьюеру уточняющие вопросы, а если цифр нет, предложить реалистичные оценки для обсуждения.

Уточняющие вопросы интервьюеру:

  • Какое ожидаемое количество пользователей при запуске и через год?
  • Какой процент пользователей будет активен одновременно (DAU/MAU ratio)?
  • Какое среднее количество сообщений в день на пользователя?
  • Какие пиковые часы нагрузки?
  • География аудитории — один регион или глобально?

Типичные ориентиры для масштабного мессенджера:

1. Пользователи

  • Зарегистрированные: 10–50 млн
  • DAU (daily active users): 20–30% от зарегистрированных = 2–15 млн
  • MAU (monthly active users): 50–60% от зарегистрированных = 5–30 млн
  • Одновременно онлайн (CCU): 5–10% от DAU = 100K–1.5M одновременных соединений

2. Сообщения

  • Среднее количество сообщений на пользователя в день: 20–50 (для активного мессенджера)
  • Общий объём сообщений в день: 100M–750M
  • Пиковый RPS на отправку: если пик приходится на 4 часа, то 750M / (4 × 3600) ≈ 50K–100K msg/s в пике
  • С учётом доставки в групповые чаты (одно сообщение → N получателей) — реальный RPS на доставку может быть в 3–5 раз выше: 150K–500K delivery/s

3. Медиа-контент

  • Доля сообщений с медиа: 20–40%
  • Средний размер вложения: 200KB (фото) – 5MB (видео)
  • Трафик медиа: десятки ТБ в день при масштабе 10M+ DAU

4. WebSocket-соединения

При 1M одновременно онлайн пользователей и мультидевайс-поддержке (в среднем 2 устройства на активного пользователя):

  • 2M+ одновременных WebSocket-соединений
  • Каждое соединение потребляет ~10–50KB RAM на сервере
  • Итого: 20–100GB RAM только на хранение состояний соединений

5. Хранение данных

  • Средний размер текстового сообщения: ~200 байт
  • Текстовые сообщения в день: 500M × 200B = 100GB/день
  • За год: ~36TB только текстовых сообщений
  • С метадатами (ID, timestamp, chat_id, sender_id): ~500 байт на сообщение → ~90TB/год
  • Медиа: ещё десятки ТБ в год

Почему это важно для архитектуры:

От этих цифр напрямую зависит:

  • Количество инстансов WebSocket-серверов — один сервер удерживает ~50K–100K соединений
  • Выбор базы данных — PostgreSQL справится на уровне до ~50M сообщений/день с шардированием, далее нужны специализированные решения (ScyllaDB, CockroachDB, или кастомное хранилище)
  • Брокер сообщений — Kafka для гарантированной доставки и высокой пропускной способности, или NATS JetStream для более простых сценариев
  • Шардирование — по user_id или chat_id, в зависимости от паттернов доступа
  • CDN — обязателен для медиа-контента при глобальной аудитории

Если интервьюер не даёт конкретных цифр, правильная стратегия — назвать диапазоны и обсудить, какие архитектурные решения подходят под каждый масштаб. Это демонстрирует системное мышление и понимание связи между нагрузкой и техническими решениями.

Вопрос 3. Какая нагрузка ожидается на мессенджер — сколько пользователей и сообщений в день?

Таймкод: 00:11:08

Ответ собеседника: правильный. Ожидается около 500 млн пользователей, каждый пишет примерно 50 сообщений в день, что даёт около 25 млрд сообщений в день.

Правильный ответ:

Это нагрузка уровня WhatsApp или Telegram. Дачим полный разбор масштаба и его последствий для архитектуры.

1. Ключевые цифры

  • Зарегистрированные пользователи: 500M
  • DAU (daily active): 20–30% = 100–150M активных в день
  • Сообщений в день: 25 млрд (500M × 50)
  • Одновременно онлайн (CCU): 5–10% от DAU = 5–15M WebSocket-соединений
  • С учётом мультидевайс: в среднем 1.5–2 устройства на активного пользователя = 10–30M одновременных соединений

2. Пропускная способность (RPS)

Распределение нагрузки неравномерно. Пиковые часы — утро и вечер в основных часовых поясах. Если пик длится ~6 часов и в него приходит 40% всех сообщений:

Пиковый RPS на отправку = (25B × 0.4) / (6 × 3600) ≈ 370K msg/s

Но одно сообщение в групповой чат нужно доставить каждому участнику. При среднем коэффициенте fan-out ×3 (с учёта групповых чатов):

Пиковый RPS на доставку ≈ 1M msg/s

3. Трафик и хранение

  • Текст: 25B × ~300 байт (с метаданными) = ~7.5TB/день
  • За год: ~2.7PB текстовых данных
  • Медиа (30% сообщений): 7.5B медиа-сообщений × 500KB средний размер = ~3.75PB/день
  • Итого хранение в год: несколько петабайт

4. Последствия для архитектуры Go-сервисов

WebSocket-серверы:

  • Один инстанс Go удерживает ~50K–100K соединений (зависит от RAM и паттернов)
  • При 20M соединений: 200–400 инстансов только для WebSocket
  • Каждый инстанс: 4–8 CPU, 8–16GB RAM

Брокер сообщений:

  • Kafka — стандарт индустрии для такого масштаба
  • Пропускная способность: 1M msg/s × ~500 байт = 500MB/s через брокер
  • Партиционирование по chat_id или user_id для параллелизма

База данных сообщений:

  • Ни одна СУБД не справится с 25B INSERT в день на одном кластере
  • Обязательно шардирование — по chat_id или по временному диапазону
  • ScyllaDB (Cassandra-совместимая, но на C++) или кастомное хранилище
  • Горячие данные (последние 30 дней) на SSD, архив — на HDD/S3

5. Пример архитектуры на Go

// Упрощённая схема обработки входящего сообщения

type MessageService struct {
wsHub *WebSocketHub // управление соединениями
broker *kafka.Writer // публикация в брокер
shardRouter ShardRouter // маршрутизация по шардам
}

func (s *MessageService) SendMessage(ctx context.Context, msg *Message) error {
// 1. Валидация и обогащение
msg.ID = uuid.New()
msg.CreatedAt = time.Now().UTC()

// 2. Публикация в Kafka для гарантированной доставки
payload, _ := json.Marshal(msg)
err := s.broker.WriteMessages(ctx, kafka.Message{
Key: []byte(msg.ChatID), // партиционирование по чату
Value: payload,
})
if err != nil {
return fmt.Errorf("publish message: %w", err)
}

// 3. Асинхронная доставка онлайн-получателям через WebSocket
// (делается consumer-ом из Kafka)
return nil
}

// Consumer обрабатывает сообщения из Kafka
func (s *MessageService) ConsumeAndDeliver(ctx context.Context) {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
Topic: "messages",
GroupID: "message-delivery",
})

for {
m, err := reader.ReadMessage(ctx)
if err != nil {
log.Error("read from kafka", err)
continue
}

var msg Message
json.Unmarshal(m.Value, &msg)

// Параллельная доставка всем участникам чата
members := s.getChatMembers(msg.ChatID)

var wg sync.WaitGroup
for _, memberID := range members {
wg.Add(1)
go func(uid string) {
defer wg.Done()

// Проверяем, онлайн ли пользователь
if conn := s.wsHub.GetConnection(uid); conn != nil {
conn.Send(msg)
} else {
// Пользователь оффлайн — отправляем push
s.pushNotifier.Send(ctx, uid, msg)
}
}(memberID)
}
wg.Wait()

// Сохраняем в персистентное хранилие
shard := s.shardRouter.GetShard(msg.ChatID)
shard.Save(ctx, msg)
}
}

6. Инфраструктурные требования (оценка)

КомпонентКоличествоРесурсы на инстанс
WebSocket-серверы3008 CPU, 16GB RAM
Message API504 CPU, 8GB RAM
Kafka брокеры1516 CPU, 64GB RAM, NVMe
Consumer-ы доставки1004 CPU, 8GB RAM
Шарды БД (ScyllaDB)30 нод32 CPU, 128GB RAM, NVMe
Redis (presence, кэш)108 CPU, 32GB RAM

7. Ключевые выводы

При 25 млрд сообщений в день невозможно обойтись монолитом и одной базой. Обязательно нужны: шардирование по чатам, асинхронная обработка через брокер, горизонтальное масштабирование каждого слоя, и чёткое разделение между hot path (доставка сообщения) и cold path (сохранение в историю, аналитика). Go отлично подходит для этой задачи благодаря эффективной работе с большим количеством горутин и сетевых соединений при умерленном потреблении памяти.

Вопрос 4. Какой объём хранилища потребуется для текстовых сообщений и медиа (видео) в день?

Таймкод: 00:12:39

Ответ собеседника: правильный. Для текста: среднее сообщение — 150 байт, 25 млрд сообщений × 150 байт ≈ 3,7 ТБ в день. Для видео: в среднем 1 видео на пользователя в день, размер видео около 3 МБ, 500 млн × 3 МБ ≈ 1,5 ПБ (около 1500 ТБ) в день.

Правильный ответ:

Расчёты собеседателя в целом верны. Детализируем и расширим оценку для полноты картины.

1. Текстовые сообщения

Базовый расчёт без метаданных:

  • 25 млрд × 150 байт = 3.75 ТБ/день

Но в реальности каждое сообщение хранится с метаданными, которые значительно увеличивают размер:

message_id: 16 байт (UUID)
chat_id: 16 байт
sender_id: 16 байт
created_at: 8 байт (timestamp)
text: ~150 байт (средний текст)
status: 1 байт (sent/delivered/read)
reply_to_id: 16 байт (nullable, но место зарезервировано)
---
Итого: ~220-250 байт на сообщение

С метаданными:

  • 25 млрд × 250 байт = ~6.25 ТБ/день
  • С индексами (B-tree на chat_id + created_at, sender_id): индексы добавляют 30–50% → ~8–9 ТБ/день
  • За год: ~3 ПБ только текстовых сообщений с индексами

2. Медиа-контент

Собеседатель оценил только видео. Рассмотрим все типы медиа:

Видео:

  • 1 видео на пользователя в день × 500M = 500M видео
  • Средний размер 3 МБ (с учётом сжатия, короткие видео преобладают)
  • 500M × 3 МБ = 1.5 ПБ/день

Изображения:

  • В среднем 3–5 изображений на пользователя в день
  • 500M × 4 × 200KB (с учётом сжатия и превью) = ~400 ТБ/день

Голосовые сообщения:

  • 1–2 на пользователя в день
  • 500M × 1.5 × 100KB (opus, ~30 сек) = ~75 ТБ/день

Файлы (документы, прочее):

  • 0.2 файла на пользователя в день
  • 500M × 0.2 × 1MB = ~100 ТБ/день

Итого медиа: ~2 ПБ/день

3. Превью и транскодирование

Каждое видео и изображение генерирует дополнительные файлы:

  • Превью видео (thumbnail): ~50KB × 500M = 25 ТБ/день
  • Несколько разрешений видео (360p, 720p): ×1.5 к оригиналу → +750 ТБ/день
  • Превью изображений (thumbnail, medium): ×1.2 к оригиналу → +80 ТБ/день

С транскодированием: ~2.8–3 ПБ/день медиа-хранилища

4. Общий итог

Тип данныхВ деньВ год
Текст + метаданные + индексы~9 ТБ~3 ПБ
Медиа (оригиналы)~2 ПБ~730 ПБ
Превью и транскодирование~1 ПБ~365 ПБ
Итого~3–4 ПБ~1.1 ЭБ

5. Стратегия хранения

При таких объёмах критически важна политика хранения:

  • Горячее хранилище (SSD/NVMe): последние 30 дней → ~100–120 ПБ
  • Тёплое хранилиство (HDD): 30–365 дней → ~1–2 ЭБ
  • Холодное хранилище (S3 Glacier / лента): старше года → архив

Для медиа используется S3-совместимое хранилище (MinIO, AWS S3) с lifecycle-политиками для автоматического перемещения в холодные типы хранения.

6. Оптимизация объёма

  • Дедупликация медиа: если одно и то же видео отправлено 1000 раз, хранить один файл с 1000 ссылками. Экономия до 30–50% на медиа.
  • Сжатие текста: при хранении в БД использовать TOAST (PostgreSQL) или сжатие на уровне СУБД.
  • TTL для медиа: возможность автоудаления старых файлов по политике (например, через 2 года).

Вопрос 5. Какие основные компоненты должны быть в высокоуровневой архитектуре мессенджера и какие технологии хранения данных для них предлагаются?

Таймкод: 00:17:19

Ответ собеседника: неполный. Выделены компоненты: сервис авторизации, сервис пользователей и чатов, сервис хранения сообщений, сервис медиа. Для авторизации и пользователей предлагается обычная реляционная БД. Для сообщений — отдельное хранилище сообщений плюс поисковый движок (например, Elasticsearch) для полнотекстового поиска. Для медиа — отдельное хранилище большого объёма. Кандидат высказал сомнение, что Elasticsearch подойдёт как основная БД для сообщений из-за сложности обновления записей, предложил разделять основное хранилище и поисковый индекс.

Правильный ответ:

Кандидат верно выделил ключевые компоненты и правильно отметил проблему Elasticsearch как primary store. Но архитектура мессенджера на масштабе 500M пользователей значительно сложнее. Приведём полный разбор.

1. Полный список компонентов

Клиентский слой:

  • Мобильные приложения (iOS, Android)
  • Веб-клиент
  • Десктоп-клиент
  • API Gateway / Load Balancer (Envoy, NGINX, HAProxy)

WebSocket-слой:

  • WebSocket-серверы — поддержание постоянных соединений с клиентами
  • Connection Registry — реестр «какой пользователь на каком сервере» (Redis)
  • Presence Service — отслеживание онлайн/оффлайн статусов

Сервисный слой (бизнес-логика):

СервисОтветственность
Auth ServiceРегистрация, логин, токены, 2FA
User ServiceПрофили, настройки, контакты
Chat ServiceСоздание чатов, управление участниками, метаданные
Message ServiceОтправка, получение, редактирование, удаление сообщений
Media ServiceЗагрузка, транскодирование, хранение медиа
Search ServiceПолнотекстовый поиск по сообщениям
Notification ServicePush-уведомления (APNs, FCM)
Sync ServiceСинхронизация состояния между устройствами

Инфраструктурный слой:

  • Message Broker (Kafka / NATS JetStream) — асинхронная маршрутизация сообщений
  • CDN — раздача медиа-контента
  • Object Storage (S3 / MinIO) — файлы медиа

2. Хранение данных по компонентам

Auth Service и User Service:

  • PostgreSQL — ACID-транзакции критичны для авторизации
  • Шардирование по user_id при масштабе 500M
  • Репликация: master для записи, read-replicas для чтения профилей
  • Для сессий и токенов: Redis с TTL
-- Пример схемы для пользователей
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
phone VARCHAR(20) UNIQUE NOT NULL,
username VARCHAR(50) UNIQUE,
display_name VARCHAR(100),
avatar_url TEXT,
last_seen_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_users_phone ON users(phone);
CREATE INDEX idx_users_username ON users(username);

Chat Service:

  • PostgreSQL — метаданные чатов, участники, настройки
  • Относительно небольшой объём данных (миллионы чатов, не миллиарды)
CREATE TABLE chats (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
type VARCHAR(10) NOT NULL CHECK (type IN ('private', 'group')),
title VARCHAR(200),
creator_id UUID REFERENCES users(id),
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE chat_members (
chat_id UUID REFERENCES chats(id),
user_id UUID REFERENCES users(id),
role VARCHAR(20) DEFAULT 'member', -- member, admin, owner
joined_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (chat_id, user_id)
);

CREATE INDEX idx_chat_members_user ON chat_members(user_id);

Message Service — самый критичный компонент по нагрузке:

Здесь кандидат абсолютно прав: Elasticsearch не должен быть основным хранилищем сообщений. Правильный подход — разделение:

  • Primary Store: ScyllaDB (или Cassandra) — оптимизирована для высокой записи, горизонтальное масштабирование, tunable consistency
  • Поисковый индекс: Elasticsearch — синхронизируется из primary store через CDC (Change Data Capture) или двойную запись
  • Кэш горячих сообщений: Redis — последние N сообщений каждого чата
-- Схема для ScyllaDB (CQL)
CREATE TABLE messages (
chat_id UUID,
bucket TEXT, -- партиция по времени: '2025-01'
message_id TIMEUUID,
sender_id UUID,
text TEXT,
media_urls LIST<TEXT>,
reply_to UUID,
created_at TIMESTAMP,
status TEXT,
PRIMARY KEY ((chat_id, bucket), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_size': 7,
'compaction_window_unit': 'DAYS'};

Ключевое решение — составной ключ партиции (chat_id, bucket):

  • chat_id обеспечивает локальность данных чата на одной ноде
  • bucket (месяц) предотвращает рост партиций до бесконечности
  • message_id как TIMEUUID даёт натуральную сортировку по времени

Media Service:

  • Object Storage (S3/MinIO) — оригиналы и транскодированные версии
  • PostgreSQL — метаданные файлов (owner, chat, content_type, size, storage_key)
  • Redis — кэш горячих превью

Search Service:

  • Elasticsearch — только как поисковый индекс, не как source of truth
  • Синхронизация через Kafka Connect или кастомный consumer
// Пример: двойная запись — в ScyllaDB и в очередь для Elasticsearch
func (s *MessageService) SaveAndIndex(ctx context.Context, msg *Message) error {
// 1. Сохраняем в primary store (ScyllaDB)
if err := s.messageStore.Save(ctx, msg); err != nil {
return fmt.Errorf("save message: %w", err)
}

// 2. Публикуем событие для индексации в Elasticsearch
event := &MessageIndexedEvent{
MessageID: msg.ID,
ChatID: msg.ChatID,
SenderID: msg.SenderID,
Text: msg.Text,
CreatedAt: msg.CreatedAt,
}

if err := s.broker.Publish(ctx, "search-index", event); err != nil {
// Не фейлим всю операцию — индексация может быть восстановлена
log.Error("publish search index event", err)
s.dlq.Send(event) // Dead Letter Queue для ручного восстановления
}

return nil
}

Presence Service:

  • Redis с TTL — ключ presence:{user_id} с expire 60 секунд
  • Pub/Sub для рассылки изменений статуса контактам

3. Поток данных (data flow)

Client → WebSocket → API Gateway → Message Service
├──→ ScyllaDB (primary write)
├──→ Kafka (async delivery + indexing)
│ ├──→ Consumer → WebSocket delivery
│ ├──→ Consumer → Elasticsearch
│ └──→ Consumer → Push notification
└──→ Redis (cache recent messages)

4. Почему именно такие технологии

  • PostgreSQL для пользователей и чатов: данные относительно небольшие, важна консистентность, сложные запросы с JOIN
  • ScyllaDB для сообщений: написана на C++ (в отличие от Cassandra на Java), даёт в 10× пропускную способность на ноду, идеальна для write-heavy workload, автоматическое шардирование
  • Redis для presence и кэша: субмиллисекундная латентность, pub/subscribe, нативный TTL
  • Kafka для брокера: гарантированная доставка, возможность ребалансировки consumer group, retention policy
  • Elasticsearch только для поиска: инвертированные индексы, полнотекстовый поиск, но плохо подходит для frequent updates и не является source of truth

Вопрос 6. Как обеспечить порядок сообщений в чате?

Таймкод: 00:22:46

Ответ собеседника: правильный. Предложено использовать время отправки сообщения для установления порядка. Для текстового чата допустимо, если пара сообщений будет слегка перемешана по порядку.

Правильный ответ:

Ответ в целом верный, но требует существенного углубления. Порядок сообщений — это нетривиальная проблема распределённых систем, и у каждого подхода есть компромиссы.

1. Проблема времени в распределённых системах

Использование NOW() или time.Now() ненадёжно:

  • Часы на разных серверах расходятся (clock skew) — даже с NTP разница может быть 10–100мс
  • Два сообщения, отправленные одновременно разными пользователями, могут получить неправильный порядок
  • Сообщение, записанное позже из-за сетевой задержки, получит более поздний timestamp

2. Подходы к обеспечению порядка

A. Логические часы (Lamport Timestamps)

Каждый узел поддерживает монотонно возрастающий счётчик. При получении сообщения от другого узла — обновляет max(local, received) + 1.

type LamportClock struct {
mu sync.Mutex
counter uint64
}

func (c *LamportClock) Tick() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
c.counter++
return c.counter
}

func (c *LamportClock) Update(received uint64) uint64 {
c.mu.Lock()
defer c.mu.Unlock()
if received > c.counter {
c.counter = received
}
c.counter++
return c.counter
}

Плюсы: не зависит от физических часов. Минусы: не даёт глобального порядка между разными чатами, только частичный.

B. Hybrid Logical Clocks (HLC)

Комбинация физического времени и логического счётчика. Используется в CockroachDB и Spanner-подобных системах.

type HLC struct {
mu sync.Mutex
wallTime int64 // физическое время в наносекундах
logical uint16 // логический счётчик
}

func (h *HLC) Now() (int64, uint16) {
h.mu.Lock()
defer h.mu.Unlock()

now := time.Now().UnixNano()
if now > h.wallTime {
h.wallTime = now
h.logical = 0
} else {
h.logical++
}
return h.wallTime, h.logical
}

Плюсы: близко к реальному времени, монотонно возрастает. Минусы: сложнее реализовать, всё ещё не гарантирует глобальный порядок без координации.

C. Централизованный серийный номер (Sequence Service)

Единый сервис выдаёт монотонно возрастающие ID для каждого чата. Это наиболее надёжный подход.

type SequenceService struct {
db *sql.DB
}

func (s *SequenceService) NextMessageID(ctx context.Context, chatID string) (int64, error) {
var seqID int64
err := s.db.QueryRowContext(ctx,
`UPDATE chat_sequences
SET last_id = last_id + 1
WHERE chat_id = $1
RETURNING last_id`, chatID,
).Scan(&seqID)

if err == sql.ErrNoRows {
// Первое сообщение в чате
_, err = s.db.ExecContext(ctx,
`INSERT INTO chat_sequences (chat_id, last_id) VALUES ($1, 1)`,
chatID,
)
return 1, err
}
return seqID, err
}
CREATE TABLE chat_sequences (
chat_id UUID PRIMARY KEY,
last_id BIGINT NOT NULL DEFAULT 0
);

Плюсы: гарантированный строгий порядок внутри чата. Минусы: единая точка контенции, дополнительный запрос при каждом сообщении.

D. Snowflake / UUIDv7 (рекомендуемый подход для мессенджера)

ID генерируется на стороне сервиса и содержит встроенный timestamp. Позволяет сортировку без дополнительных запросов.

// Используем UUIDv7 (timestamp-based UUID)
import "github.com/gofrs/uuid"

func GenerateMessageID() uuid.UUID {
// UUIDv7 содержит timestamp в первых 48 битах
// Естественно сортируется по времени
id, _ := uuid.NewV7()
return id
}

Для ScyllaDB встроен тип TIMEUUID, который идеально подходит:

CREATE TABLE messages (
chat_id UUID,
bucket TEXT,
message_id TIMEUUID, -- содержит timestamp + уникальность
sender_id UUID,
text TEXT,
PRIMARY KEY ((chat_id, bucket), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC);

3. Рекомендуемая стратегия для мессенджера

Комбинированный подход:

  1. В пределах одного чата — используем TIMEUUID или Snowflake ID как message_id. Они содержат timestamp и гарантируют уникальность. При равных timestamp-ах (маловероятно на наносекундном уровне) — логическая часть UUID разрешает коллизию.

  2. Для кросс-чатовой сортировки (список чатов по последнему сообщению) — отдельный last_message_at в таблице чатов.

  3. Допустимость пересортировки — кандидат прав: для текстового мессенджера допустимо, если 2–3 сообщения окажутся в «неправильном» порядке из-за сетевой задержки. Это не критично для UX. Но важно, чтобы в 99.9% случаев порядок был корректным.

  4. Клиентская дедупликация и сортировка — клиент должен сортировать сообщения по message_id (или created_at) и дедуплицировать по message_id, потому что при retry одно сообщение может прийти дважды.

// Клиентская сортировка при получении
type Message struct {
ID string `json:"id"`
ChatID string `json:"chat_id"`
SenderID string `json:"sender_id"`
Text string `json:"text"`
CreatedAt time.Time `json:"created_at"`
SeqNum int64 `json:"seq_num,omitempty"` // опционально
}

// Сервис на стороне клиента
func (c *ChatView) OnMessageReceived(msg Message) {
// Дедупликация
if c.seenIDs.Contains(msg.ID) {
return
}
c.seenIDs.Add(msg.ID)

// Вставка в правильную позицию (бинарный поиск по времени)
idx := sort.Search(len(c.messages), func(i int) bool {
return c.messages[i].CreatedAt.After(msg.CreatedAt)
})
c.messages = append(c.messages, Message{})
copy(c.messages[idx+1:], c.messages[idx:])
c.messages[idx] = msg
}

4. Обработка граничных случаев

  • Сообщения с одинаковым timestamp: UUIDv7 гарантирует уникальность даже при одинаковом timestamp через random bits
  • Пользователь с неправильными часами: timestamp берётся с сервера, а не от клиента
  • Retry при потере соединения: клиент отправляет сообщение с тем же client-generated ID, сервер дедуплицирует
  • Сообщения из разных партиций (при шардировании): каждая партиция сортируется локально, клиент выполняет merge при пагинации

Вопрос 7. Как организовать хранение медиафайлов (видео, картинки) при объёме около 1,5 ПБ в день?

Таймкод: 00:25:23

Ответ собеседника: неполный. Предложено разделение на горячее хранилище (object storage / S3-подобное) для свежих данных, которые активно просматриваются в первые часы, и холодное хранилище на более дешёвых медленных дисках для старых данных. Также предложено пережимать медиа (сжатие с потерей качества) при загрузке и дополнительно сжимать для холодного хранилища. Кандидат честно признал, что не имеет опыта работы с такими объёмами и не может точно сказать, какое именно решение лучше подойдёт для холодного хранилища.

Правильный ответ:

Направление мысли верное — tiered storage и пережатие медиа это правильные подходы. Раскроем тему детально.

1. Архитектура медиа-хранилища

A. Горячее хранилище (0–7 дней)

Свежие медиа просматриваются чаще всего — 80% просмотров приходится на контент младше 3 дней.

  • Технология: NVMe SSD через S3-совместимое хранилище
  • Варианты: AWS S3 Standard, MinIO на NVMe, Ceph с SSD-pool
  • Объём: 1.5 ПБ/день × 7 дней = ~10 ПБ
  • Репликация: 3 реплики (или erasure coding 4+2 для экономии)
  • CDN: CloudFront / Cloudflare CDN перед хранилищем для снижения latency

B. Тёплое хранилище (7–90 дней)

Контент всё ещё иногда запрашивается, но значительно реже.

  • Технология: HDD массивы через S3
  • Варианты: AWS S3 Infrequent Access, MinIO на HDD, Ceph с HDD-pool
  • Объём: 1.5 ПБ/день × 83 дня = ~125 ПБ
  • Erasure coding: 8+3 (экономия ~40% по сравнению с 3 репликами)

C. Холодное хранилище (90+ дней)

Практически не запрашивается, но должен быть доступен (перманентная история).

  • Технология: AWS S3 Glacier Deep Archive, tape storage
  • Стоимость: ~1/TB/месяц(против1/TB/месяц (против 23/TB для S3 Standard)
  • Объём: накапливается годами — десятки петабайт
  • Время доступа: минуты для Glacier, часы для tape

2. Lifecycle-политики автоматического перемещения

// Пример lifecycle-политики в формате AWS S3 (аналогично для других хранилищ)
lifecycleConfig := &s3types.LifecycleRule{
ID: aws.String("media-tiered-storage"),
Status: s3types.ExpirationStatusEnabled,
Transitions: []s3types.Transition{
{
Days: 7,
StorageClass: s3types.TransitionStorageClassStandardIa, // горячее → тёплое
},
{
Days: 90,
StorageClass: s3types.TransitionStorageClassGlacier, // тёплое → холодное
},
{
Days: 365,
StorageClass: s3types.TransitionStorageClassDeepArchive, // холодное → архив
},
},
}

3. Оптимизация объёма — транскодирование и сжатие

A. Видео — адаптивный bitrate (HLS/DASH)

Вместо хранения одного файла — набор разрешений:

original_1080p.mp4 → 3 МБ
720p.mp4 → 1.5 МБ
480p.mp4 → 0.7 МБ
360p.mp4 → 0.3 МБ
thumbnail.jpg → 50 КБ
---
Итого: ~5.5 МБ вместо 3 МБ (но пользователь скачивает только нужное разрешение)

Экономия трафика: пользователь с плохим соединением грузит 360p вместо 1080p — экономия 90%.

B. Изображения — множественные размеры

original.jpg → 2 МБ
large.jpg (1200px) → 300 КБ
medium.jpg (600px) → 100 КБ
thumbnail.jpg (150px) → 20 КБ
---
Итого: ~3.4 МБ на изображение

Но клиент загружает только thumbnail в списке чатов и medium для предпросмотра — экономия 95% трафика.

C. Пережатие для холодного хранилища

При перемещении в холодное хранилище (90+ дней):

  • Удаляем оригинал и 1080p, оставляем только 480p + thumbnail
  • Изображения конвертируем в WebP/AVIF (на 30–50% меньше JPEG)
  • Экономия: 60–70% объёма
type TranscodingWorker struct {
s3Client *s3.Client
ffmpeg *ffmpeg.FFmpeg
}

func (w *TranscodingWorker) ProcessMedia(ctx context.Context, key string) error {
// 1. Загружаем оригинал
original, err := w.s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String("media-hot"),
Key: aws.String(key),
})

// 2. Генерируем варианты
variants := []Variant{
{Suffix: "_360p", Width: 640, Bitrate: "400k"},
{Suffix: "_480p", Width: 854, Bitrate: "800k"},
{Suffix: "_720p", Width: 1280, Bitrate: "1500k"},
{Suffix: "_thumb", Width: 150, Bitrate: "50k"},
}

for _, v := range variants {
output, err := w.ffmpeg.Transcode(original.Body, v)
if err != nil {
return fmt.Errorf("transcode %s: %w", v.Suffix, err)
}

_, err = w.s3Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String("media-hot"),
Key: aws.String(key + v.Suffix),
Body: output,
ContentType: aws.String("video/mp4"),
})
if err != nil {
return fmt.Errorf("upload %s: %w", v.Suffix, err)
}
}

return nil
}

4. Дедупликация медиа

Критическая оптимизация при масштабе 500M пользователей. Один и тот же мем может быть отправлен миллионы раз.

type MediaDeduplicator struct {
s3Client *s3.Client
db *sql.DB
}

func (d *MediaDeduplicator) StoreMedia(ctx context.Context, fileData []byte, uploaderID string) (string, error) {
// 1. Вычисляем хеш содержимого
hash := sha256.Sum256(fileData)
hashKey := hex.EncodeToString(hash[:])

// 2. Проверяем, есть ли уже такой файл
var existingMediaID string
err := d.db.QueryRowContext(ctx,
`SELECT media_id FROM media_dedup WHERE content_hash = $1`,
hashKey,
).Scan(&existingMediaID)

if err == nil {
// Файл уже существует — просто создаём ссылку
_, err = d.db.ExecContext(ctx,
`INSERT INTO media_references (media_id, chat_id, uploader_id) VALUES ($1, $2, $3)`,
existingMediaID, chatID, uploaderID,
)
return existingMediaID, nil
}

// 3. Новый файл — загружаем в storage
mediaID := uuid.New().String()
storageKey := fmt.Sprintf("media/%s/%s", mediaID[:2], mediaID)

_, err = d.s3Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String("media-hot"),
Key: aws.String(storageKey),
Body: bytes.NewReader(fileData),
})
if err != nil {
return "", err
}

// 4. Записываем хеш и метаданные
_, err = d.db.ExecContext(ctx,
`INSERT INTO media_dedup (media_id, content_hash, storage_key, size) VALUES ($1, $2, $3, $4)`,
mediaID, hashKey, storageKey, len(fileData),
)

return mediaID, err
}

Экономия: при масштабе мессенджера дедупликация может сократить объём хранилища на 30–50%, потому что популярный контент (мемы, стикеры, рассылки) отправляется многократно.

5. Метаданные медиа

CREATE TABLE media_dedup (
media_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content_hash VARCHAR(64) UNIQUE NOT NULL, -- SHA-256
storage_key TEXT NOT NULL,
content_type VARCHAR(50),
size_bytes BIGINT,
ref_count INT DEFAULT 1, -- счётчик ссылок
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE media_references (
id BIGSERIAL PRIMARY KEY,
media_id UUID REFERENCES media_dedup(media_id),
message_id UUID REFERENCES messages(message_id),
chat_id UUID NOT NULL,
uploader_id UUID NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_media_dedup_hash ON media_dedup(content_hash);
CREATE INDEX idx_media_refs_media ON media_references(media_id);

6. Экономика хранения (примерные цифры)

При 1.5 ПБ/день с учётом всех оптимизаций:

ОптимизацияЭкономия
Транскодирование (только нужные разрешения)-40% объёма, но +90% экономия трафика
Дедупликация-30–50% объёма
Пережатие в холодном хранилище-60% для старых данных
Erasure coding вместо реплик-40% overhead

Реальный объём хранения: ~0.8–1 ПБ/день вместо 1.5 ПБ/день.

Стоимость хранения в год (при смешанном tiered-подходе):

  • Горячее (10 ПБ × 23/TB/мес×12)23/TB/мес × 12) ≈ 2.8M/год
  • Тёплое (40 ПБ × 12.5/TB/мес×12)12.5/TB/мес × 12) ≈ 6M/год
  • Холодное (400 ПБ × 1/TB/мес×12)1/TB/мес × 12) ≈ 4.8M/год
  • Итого: ~$13–15M/год только на хранение медиа

Вопрос 8. Как реализовать индикатор онлайн-статуса пользователей?

Таймкод: 00:29:26

Ответ собеседника: правильный. Предложено хранить в отдельной БД время последней активности каждого пользователя (last seen). Если пользователь был активен менее минуты назад — он считается онлайн, иначе отображается «был в сети N минут назад». Клиент отправляет heartbeat-пакеты раз в несколько секунд для обновления статуса.

Правильный ответ:

Подход с last_seen рабочий, но при масштабе 500M пользователей и 10–15M одновременно онлайн возникает ряд нетривиальных проблем. Разберём полную архитектуру.

1. Проблема с простым last_seen подходом

Если каждый из 10M онлайн-пользователей отправляет heartbeat каждые 10 секунд:

  • 1M записей в секунду в БД — это колоссальная нагрузка на запись
  • Даже при использовании Redis — 1M ops/s требует кластера из 10–20 нод
  • При этом большая часть записей — обновление одного и того же ключа для пользователя, который и так онлайн

2. Оптимизированная архитектура — Presence Service

A. Два механизма: heartbeat + connection events

Вместо непрерывного heartbeat — комбинация:

  • При подключении WebSocket: пользователь точно онлайн → записываем в Redis
  • При отключении WebSocket: пользователь точно оффлайн → удаляем из Redis
  • Heartbeat только для last_seen: обновляем timestamp раз в 30–60 секунд (не каждые 10 секунд)

Это снижает нагрузку в 3–6 раз.

B. Хранение в Redis — два уровня

type PresenceService struct {
redis *redis.ClusterClient
db *sql.DB // PostgreSQL для персистентного last_seen
}

// Уровень 1: Redis Set для онлайн-пользователей
// Ключ: "presence:online" → Set из user_id
// O(1) для проверки, O(N) для получения всех (но не используется)

// Уровень 2: Redis String для last_seen
// Ключ: "presence:last_seen:{user_id}" → timestamp
// Обновляется при disconnect и периодически при heartbeat
func (s *PresenceService) OnUserConnect(ctx context.Context, userID string) error {
pipe := s.redis.Pipeline()

// Добавляем в множество онлайн
pipe.SAdd(ctx, "presence:online", userID)

// Устанавливаем last_seen с TTL 120 секунд
pipe.Set(ctx,
fmt.Sprintf("presence:last_seen:%s", userID),
time.Now().Unix(),
120*time.Second,
)

// Публикуем событие для подписчиков
pipe.Publish(ctx, fmt.Sprintf("presence:channel:%s"), "online")

_, err := pipe.Exec(ctx)
return err
}

func (s *PresenceService) OnUserDisconnect(ctx context.Context, userID string) error {
now := time.Now().Unix()

pipe := s.redis.Pipeline()

// Удаляем из множества онлайн
pipe.SRem(ctx, "presence:online", userID)

// Сохраняем финальный last_seen (без TTL — персистентно)
pipe.Set(ctx,
fmt.Sprintf("presence:last_seen:%s", userID),
now,
0, // без TTL
)

// Публикуем событие
pipe.Publish(ctx, fmt.Sprintf("presence:channel:%s"),
fmt.Sprintf("offline:%d", now))

_, err := pipe.Exec(ctx)
if err != nil {
return err
}

// Асинхронно сохраняем в PostgreSQL для истории
go s.saveLastSeenToDB(context.Background(), userID, now)

return nil
}

func (s *PresenceService) IsOnline(ctx context.Context, userID string) (bool, error) {
// Проверяем наличие в Redis Set — O(1)
return s.redis.SIsMember(ctx, "presence:online", userID).Result()
}

func (s *PresenceService) GetLastSeen(ctx context.Context, userID string) (time.Time, error) {
// Пробуем Redis
ts, err := s.redis.Get(ctx, fmt.Sprintf("presence:last_seen:%s", userID)).Int64()
if err == nil {
return time.Unix(ts, 0), nil
}

// Fallback на PostgreSQL
var lastSeen time.Time
err = s.db.QueryRowContext(ctx,
"SELECT last_seen FROM user_presence WHERE user_id = $1", userID,
).Scan(&lastSeen)

return lastSeen, err
}

C. Подписка на изменение статусов

Пользователь не должен опрашивать статус каждого контакта — это слишком дорого. Вместо этого — pub/sub:

type PresenceSubscriber struct {
redis *redis.ClusterClient
wsHub *WebSocketHub
userContacts map[string]map[string]bool // user_id → set of contact_ids
}

func (s *PresenceSubscriber) SubscribeToContacts(ctx context.Context, userID string, contactIDs []string) {
// Подписываемся на каналы всех контактов
pubsub := s.redis.Subscribe(ctx, contactChannels(contactIDs)...)
defer pubsub.Close()

ch := pubsub.Channel()
for msg := range ch {
// Получили изменение статуса контакта
// Рассылаем всем подписчикам этого контакта
s.broadcastStatusChange(msg.Channel, msg.Payload)
}
}

func (s *PresenceSubscriber) broadcastStatusChange(contactID string, status string) {
// Находим всех, кто имеет этого контакта в друзьях
subscribers := s.wsHub.GetSubscribersForContact(contactID)

for _, conn := range subscribers {
conn.Send(PresenceEvent{
UserID: contactID,
Status: status, // "online" или "offline:1705286400"
Timestamp: time.Now(),
})
}
}

3. Оптимизация для масштаба

A. Батчинг heartbeat-ов

Вместо индивидуального обновления — собираем heartbeat-ы и пишем пачками:

type HeartbeatBatcher struct {
redis *redis.ClusterClient
buffer map[string]int64 // user_id → last_seen


#### **Вопрос 9**. Какой протокол использовать для доставки сообщений в реальном времени и как решить задачу оповещения всех участников чата при горизонтальном масштабировании WebSocket-сервиса?

**Таймкод:** <YouTubeSeekTo id="IDSOAPPZsdU" time="00:31:54"/>

**Ответ собеседника:** **неполный**. Предложено использовать WebSocket как наиболее подходящий протокол для двусторонней связи клиент-сервер в реальном времени, особенно для веб-клиентов. Отмечено, что WebSocket-сервис будет запущен на нескольких экземплярах (нотах), и ключевая проблема — при поступлении нового сообщения определить, на какой именно нот отправить оповещение, чтобы уведомить нужного пользователя. Кандидат осознаёт проблему маршрутизации сообщений между шардами WebSocket-сервиса, но конкретного решения (например, через брокер сообщений / pub-sub / consistent hashing) не предложил.

**Правильный ответ:**

Кандидат верно определил проблему, но не довёл до решения. Это одна из ключевых архитектурных задач мессенджера. Разберём полностью.

**1. Выбор протокола**

**WebSocket** — правильный выбор для основного канала:
- Двусторонняя связь через одно постоянное TCP-соединение
- Низкий overhead по сравнению с HTTP polling
- Поддерживается всеми браузерами и мобильными SDK
- Работает через прокси и CDN (в отличие от raw TCP)

**Альтернативы и дополнения:**
- **gRPC Streaming** — для сервер-серверной коммуникации между микросервисами
- **MQTT** — для IoT-устройств или очень слабых соединений (используется в Facebook Messenger изначально)
- **HTTP/2 Server-Sent Events (SSE)** — как fallback для сетей, блокирующих WebSocket
- **QUIC / HTTP/3** — перспективное направление, лучше работает при смене сети (WiFi → 4G)

**2. Проблема маршрутизации — суть**

┌─────────────┐ │ User A │ │ (connected │ │ to WS-1) │ └──────┬──────┘ │ ┌──────▼──────┐ │ WS Server 1│ └──────┬──────┘ │ User B ──── WS Server 2 ───┤── Как доставить сообщение от A к B, │ если они на разных серверах? ┌──────▼──────┐ │ WS Server 3│ └──────┬──────┘ │ ┌──────▼──────┐ │ User C │ │ (connected │ │ to WS-3) │ └─────────────┘


При 20M соединений и 300 серверах — в среднем каждый сервер знает только о 67K пользователях из 500M. Нужен механизм маршрутизации.

**3. Решение — трёхуровневая архитектура**

**Уровень 1: Connection Registry (Redis)**

Хранит маппинг `user_id → ws_server_id`:

```go
type ConnectionRegistry struct {
redis *redis.ClusterClient
}

func (r *ConnectionRegistry) Register(ctx context.Context, userID string, serverID string) error {
// Пользователь может быть подключен с нескольких устройств
key := fmt.Sprintf("conn:%s", userID)
return r.redis.SAdd(ctx, key, serverID).Err()
}

func (r *ConnectionRegistry) Unregister(ctx context.Context, userID string, serverID string) error {
key := fmt.Sprintf("conn:%s", userID)
return r.redis.SRem(ctx, key, serverID).Err()
}

func (r *ConnectionRegistry) GetServers(ctx context.Context, userID string) ([]string, error) {
key := fmt.Sprintf("conn:%s", userID)
return r.redis.SMembers(ctx, key).Result()
}

Уровень 2: Message Broker (Kafka / Redis Pub-Sub)

Для маршрутизации сообщений между WebSocket-серверами:

type WebSocketServer struct {
id string
connections map[string]*ClientConn // user_id → connection
registry *ConnectionRegistry
kafkaReader *kafka.Reader
kafkaWriter *kafka.Writer
mu sync.RWMutex
}

func (s *WebSocketServer) Start(ctx context.Context) error {
// Запускаем consumer для получения сообщений из Kafka
go s.consumeMessages(ctx)

// Запускаем HTTP-сервер для WebSocket
return s.startHTTPServer(ctx)
}

func (s *WebSocketServer) consumeMessages(ctx context.Context) {
for {
msg, err := s.kafkaReader.ReadMessage(ctx)
if err != nil {
log.Error("kafka read", err)
continue
}

var deliveryMsg DeliveryMessage
json.Unmarshal(msg.Value, &deliveryMsg)

// Проверяем, есть ли получатель на нашем сервере
s.mu.RLock()
conn, exists := s.connections[deliveryMsg.RecipientID]
s.mu.RUnlock()

if exists {
// Пользователь подключен к этому серверу — доставляем напрямую
conn.Send(deliveryMsg)
}
// Если нет — сообщение проигнорируется (другой сервер его обработает)
}
}

Уровень 3: Delivery Service (Consumer Group)

Отдельный сервис, который читает из Kafka и маршрутизирует:

type DeliveryService struct {
registry *ConnectionRegistry
kafkaReader *kafka.Reader
wsServers map[string]WSClient // ws_server_id → gRPC client
}

func (s *DeliveryService) ProcessDelivery(ctx context.Context, msg *Message) error {
// 1. Получаем участников чата
members, err := s.chatService.GetMembers(ctx, msg.ChatID)
if err != nil {
return err
}

// 2. Для каждого участника определяем сервер
for _, memberID := range members {
if memberID == msg.SenderID {
continue // не доставляем отправителю
}

// 3. Проверяем, онлайн ли пользователь
servers, err := s.registry.GetServers(ctx, memberID)
if err != nil || len(servers) == 0 {
// Пользователь оффлайн — отправляем push
s.pushNotifier.Queue(ctx, memberID, msg)
continue
}

// 4. Отправляем на каждый сервер, где пользователь подключен
for _, serverID := range servers {
delivery := &DeliveryMessage{
RecipientID: memberID,
Message: msg,
ServerID: serverID,
}

// Публикуем в топик, партиционированный по serverID
// Каждый WebSocket-сервер читает только свою партицию
s.kafkaWriter.WriteMessages(ctx, kafka.Message{
Key: []byte(serverID),
Value: mustMarshal(delivery),
})
}
}

return nil
}

4. Альтернативный подход — Consistent Hashing

Вместо брокера можно использовать consistent hashing для маршрутизации:

type ConsistentHashRouter struct {
ring *consistent.Consistent
wsServers map[string]*WSClientConn
}

func NewConsistentHashRouter() *ConsistentHashRouter {
cfg := consistent.Config{
PartitionCount: 271, // простое число для равномерности
ReplicationFactor: 3, // количество реплик
Load: 1.25, // коэффициент загрузки
Hasher: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
}

return &ConsistentHashRouter{
ring: consistent.New(nil, cfg),
}
}

func (r *ConsistentHashRouter) AddServer(serverID string) {
r.ring.Add(consistent.Member(serverID))
}

func (r *ConsistentHashRouter) Route(userID string) (string, error) {
// Определяем, на каком сервере должен быть пользователь
serverID := r.ring.LocateKey([]byte(userID))
return string(serverID), nil
}

Плюсы: детерминированная маршрутизация, не нужен lookup в Redis. Минусы: сложнее обрабатывать добавление/удаление серверов, нет нативной поддержки мультидевайс.

5. Рекомендуемая архитектура — гибрид

Для масштаба 500M пользователей рекомендуется комбинация:

Client → WebSocket → WS Server 1

├──→ Kafka (partitioned by server_id)
│ │
│ ┌────▼─────┐
│ │Consumer │
│ │Group │
│ └────┬─────┘
│ │
│ ┌────▼─────┐
│ │ WS │
│ │ Server 2 │──→ Client
│ └──────────┘

└──→ Redis (connection registry)

Ключевые решения:

  • Kafka партиционирован по ws_server_id — каждый WS-сервер читает только свою партицию, гарантируя что сообщение будет обработано ровно одним сервером
  • Redis хранит user_id → [server_ids] — для мультидевайс поддержки
  • Delivery Service определяет целевые серверы и публикует в нужные партиции
  • gRPC для сервер-серверной коммуникации внутри датацентра (ниже latency чем Kafka для внутренних вызовов)

6. Обработка граничных случаев

func (s *WebSocketServer) HandleClientDisconnect(userID string) {
s.mu.Lock()
delete(s.connections, userID)
s.mu.Unlock()

// Уведомляем registry
s.registry.Unregister(context.Background(), userID, s.id)

// Публикуем событие для presence service
s.kafkaWriter.WriteMessages(context.Background(), kafka.Message{
Key: []byte("presence"),
Value: mustMarshal(PresenceEvent{UserID: userID, Status: "offline"}),
})
}

// Обработка дублирующих соединений (один пользователь, два устройства)
func (s *WebSocketServer) HandleNewConnection(userID string, conn *ClientConn) {
s.mu.Lock()

// Проверяем, есть ли существующее соединение с этого же устройства
if existing, ok := s.connections[userID]; ok {
// Отправляем клиенту команду закрыть старое соединение
existing.Send(ClientCommand{Type: "force_disconnect"})
existing.Close()
}

s.connections[userID] = conn
s.mu.Unlock()

s.registry.Register(context.Background(), userID, s.id)
}

7. Fallback-стратегия

WebSocket может быть заблокирован корпоративными прокси или нестабильными сетями:

type TransportManager struct {
wsServer *WebSocketServer
sseServer *SSEServer
lpServer *LongPollingServer
}

func (t *TransportManager) HandleConnect(w http.ResponseWriter, r *http.Request) {
// Пробуем WebSocket
if isWebSocket(r) {
t.wsServer.Upgrade(w, r)
return
}

// Fallback на SSE (для односторонней доставки)
if acceptsSSE(r) {
t.sseServer.ServeHTTP(w, r)
return
}

// Последний fallback — long polling
t.lpServer.ServeHTTP(w, r)
}

Эта архитектура позволяет горизонтально масштабировать WebSocket-слой до сотен серверов, сохраняя гарантию доставки и поддерживая мультидевайс-подключения.

Вопрос 10. Как организовать доставку сообщений в реальном времени между двумя пользователями, подключёнными к разным нодам WebSocket-сервиса, и какой протокол для этого использовать?

Таймкод: 00:35:30

Ответ собеседника: неполный. Предложено использовать WebSocket для доставки сообщений в реальном времени. Для маршрутизации сообщений между нодами предложены два подхода: 1) Через очередь (Kafka) с ключом по получателю — простой, но требующий хранения большого объёма данных. 2) Через сервис обнаружения (Discovery), который хранит информацию о том, к какой ноде подключён каждый пользователь. Сервис сообщений при доставке сначала узнаёт у Discovery, где находится получатель, и отправляет сообщение напрямую. Discovery можно реализовать поверх ZooKeeper для обеспечения консистентности. Также отмечено, что если пользователь оффлайн, сообщение не нужно отправлять через WebSocket, и следует сохранить и/или отправить push-уведомление.

Правильный ответ:

Кандидат предложил два валидных подхода, но не описал их детально и не указал на ключевые проблемы. Разберём оба подхода и оптимальное решение.

1. Протокол доставки клиенту

WebSocket — стандарт для реального времени:

  • Полнодуплексная связь через одно TCP-соединение
  • ~2 байта overhead на фрейм (в отличие от HTTP-заголовков ~800 байт)
  • Поддержка бинарных данных и текста
  • Нативная поддержка в браузерах и мобильных SDK

Для мобильных клиентов — дополнительно учитывать:

  • Ping/Pong фреймы для поддержания соединения через NAT
  • Reconnection с экспоненциальным backoff при обрыве
  • Foreground/Background режимы — в background соединение может быть убито OS

2. Подход A: Kafka с партиционированием по server_id

Кандидат верно отметил этот подход, но не раскрыл детали.

Принцип работы:

  • Kafka топик message-delivery с партициями по ws_server_id
  • Каждый WebSocket-сервер является consumer-ом и читает только свои партиции
  • Delivery Service определяет целевой сервер и пишет в соответствующую партицию
type KafkaDeliveryRouter struct {
writer *kafka.Writer
registry *ConnectionRegistry
}

func (r *KafkaDeliveryRouter) RouteMessage(ctx context.Context, msg *Message, recipientID string) error {
// 1. Узнаём, на каких серверах подключён получатель
serverIDs, err := r.registry.GetServers(ctx, recipientID)
if err != nil || len(serverIDs) == 0 {
// Пользователь оффлайн — только push
return r.pushNotifier.Queue(ctx, recipientID, msg)
}

// 2. Отправляем в Kafka, партиционируя по server_id
delivery := &DeliveryMessage{
RecipientID: recipientID,
Message: msg,
}

for _, serverID := range serverIDs {
err := r.writer.WriteMessages(ctx, kafka.Message{
Topic: "message-delivery",
Key: []byte(serverID), // определяет партицию
Value: mustMarshal(delivery),
})
if err != nil {
return fmt.Errorf("write to kafka: %w", err)
}
}

return nil
}

// На стороне WebSocket-сервера
type WebSocketServer struct {
serverID string
reader *kafka.Reader
conns map[string]*Conn
}

func (s *WebSocketServer) StartConsumer(ctx context.Context) {
// Consumer читает только свою партицию
s.reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
Topic: "message-delivery",
GroupID: "ws-servers",
// Каждый сервер назначен на свои партиции через Assign
PartitionAssigner: s,
})

for {
msg, err := s.reader.ReadMessage(ctx)
if err != nil {
log.Error("kafka read", err)
continue
}

var delivery DeliveryMessage
json.Unmarshal(msg.Value, &delivery)

// Доставляем клиенту
if conn, ok := s.conns[delivery.RecipientID]; ok {
conn.Send(delivery.Message)
}
}
}

Плюсы Kafka-подхода:

  • Гарантия доставки (at-least-once) с ребалансировкой
  • Буферизация при перегрузке конкретного сервера
  • Масштабируемость — добавление серверов = добавление партиций
  • Аудит — все сообщения логируются в Kafka

Минусы Kafka-подхода:

  • Латентность: Kafka добавляет ~5–20мс к доставке
  • Сложность управления партициями при добавлении/удалении серверов
  • Overhead для каждого сообщения (сериализация, сетевой вызов)

3. Подход B: Discovery Service + прямая доставка через gRPC

Более низколатентный подход для hot path.

type DiscoveryService struct {
redis *redis.ClusterClient
}

func (d *DiscoveryService) FindServers(ctx context.Context, userID string) ([]ServerAddress, error) {
key := fmt.Sprintf("conn:%s", userID)
serverIDs, err := d.redis.SMembers(ctx, key).Result()
if err != nil {
return nil, err
}

// Преобразуем server_id в адреса
addrs := make([]ServerAddress, 0, len(serverIDs))
for _, id := range serverIDs {
addr, err := d.resolveServer(id)
if err != nil {
continue
}
addrs = append(addrs, addr)
}

return addrs, nil
}

type DirectDeliveryService struct {
discovery *DiscoveryService
grpcConns map[string]*grpc.ClientConn // server_id → gRPC connection
}

func (s *DirectDeliveryService) DeliverMessage(ctx context.Context, msg *Message, recipientID string) error {
// 1. Находим серверы получателя
servers, err := s.discovery.FindServers(ctx, recipientID)
if err != nil || len(servers) == 0 {
return s.pushNotifier.Queue(ctx, recipientID, msg)
}

// 2. Отправляем напрямую через gRPC
delivery := &pb.DeliveryRequest{
RecipientId: recipientID,
Message: convertToProto(msg),
}

var lastErr error
for _, server := range servers {
conn := s.grpcConns[server.ID]
client := pb.NewWebSocketDeliveryClient(conn)

_, err := client.Deliver(ctx, delivery)
if err != nil {
lastErr = err
log.Error("gRPC delivery failed", err, "server", server.ID)
}
}

return lastErr
}

// На стороне WebSocket-сервера — gRPC handler
type WSServer struct {
conns map[string]*ClientConn
}

func (s *WSServer) Deliver(ctx context.Context, req *pb.DeliveryRequest) (*pb.DeliveryResponse, error) {
conn, ok := s.conns[req.RecipientId]
if !ok {
return &pb.DeliveryResponse{Status: "user_not_here"}, nil
}

err := conn.Send(req.Message)
if err != nil {
return nil, err
}

return &pb.DeliveryResponse{Status: "delivered"}, nil
}

Плюсы gRPC-подхода:

  • Латентность: ~1–5мс (в пределах датацентра)
  • Прямая доставка без промежуточного брокера
  • Меньше инфраструктуры

Минусы gRPC-подхода:

  • Нет гарантии доставки при сбое — нужен retry и dead letter queue
  • Нет буферизации — если сервер перегружен, сообщения теряются
  • Сложнее при мультидевайс (нужно отправлять на несколько серверов)

4. Рекомендуемое решение — гибрид

Для production мессенджера рекомендуется комбинация:

Hot path (доставка онлайн-пользователям):
gRPC direct delivery для минимальной латентности

Guaranteed delivery (гарантия доставки):
Kafka для сохранения сообщения и retry

Offline users:
Push notification + сохранение в БД
type HybridDeliveryService struct {
direct *DirectDeliveryService
kafka *KafkaDeliveryService
push *PushNotificationService
}

func (s *HybridDeliveryService) Deliver(ctx context.Context, msg *Message, recipientID string) error {
// 1. Пробуем прямую доставку через gRPC (быстро)
err := s.direct.DeliverMessage(ctx, msg, recipientID)

if err == nil {
return nil // Успешно доставлено
}

// 2. Прямая доставка не удалась — публикуем в Kafka
// Kafka гарантирует доставку даже при временном сбое
err = s.kafka.Publish(ctx, msg, recipientID)
if err != nil {
return fmt.Errorf("both direct and kafka delivery failed: %w", err)
}

// 3. Если пользователь точно оффлайн — отправляем push
if !s.direct.IsOnline(recipientID) {
s.push.Queue(ctx, recipientID, msg)
}

return nil
}

5. Почему не ZooKeeper для Discovery

Кандидат предложил ZooKeeper — это рабочий вариант, но не оптимальный:

  • ZooKeeper: сильная консистентность, но высокая латентность записи (~10–50мс), сложность эксплуатации, ограниченная пропускная способность
  • Redis Cluster: субмиллисекундная латентность, нативный TTL (автоочистка при disconnect), pub/sub для уведомлений, проще в эксплуатации
  • etcd: компромисс между ZK и Redis, используется в Kubernetes, но избыточен для этой задачи

Для connection registry лучше подходит Redis — он проще, быстрее и имеет встроенный TTL для автоматической очистки записей при аварийном отключении сервера.

6. Обработка мультидевайс

Пользователь может быть онлайн на телефоне и десктопе одновременно:

func (s *HybridDeliveryService) DeliverToAllDevices(ctx context.Context, msg *Message, recipientID string) error {
servers, err := s.discovery.FindServers(ctx, recipientID)
if err != nil {
return err
}

if len(servers) == 0 {
// Все устройства оффлайн
return s.handleOffline(ctx, msg, recipientID)
}

// Отправляем на все устройства параллельно
var wg sync.WaitGroup
errChan := make(chan error, len(servers))

for _, server := range servers {
wg.Add(1)
go func(srv ServerAddress) {
defer wg.Done()
if err := s.direct.DeliverToServer(ctx, msg, recipientID, srv); err != nil {
errChan <- err
}
}(server)
}

wg.Wait()
close(errChan)

// Если хотя бы одно устройство получило — считаем доставленным
// Для остальных — retry через Kafka
for err := range errChan {
log.Warn("delivery to device failed", err)
}

return nil
}

7. Итоговая рекомендация

КритерийKafkagRPC DirectГибрид
Латентность5–20мс1–5мс1–5мс
Гарантия доставки
Буферизация
СложностьСредняяНизкаяВысокая
Масштабируемость

Для мессенджера уровня 500M пользователей рекомендуется гибридный подход: gRPC для hot path с fallback на Kafka для гарантии доставки. Это даёт минимальную латентность при сохранении надёжности.

Вопрос 11. Как обрабатывать доставку сообщений, если пользователь оффлайн, и как при повторном подключении подгрузить все пропущенные сообщения? Как учитывать наличие нескольких устройств у одного пользователя?

Таймкод: 00:46:54

Ответ собеседника: правильный. Если пользователь оффлайн — сообщение сохраняется в хранилище и при необходимости отправляется как push-уведомление. При повторном подключении клиент регистрируется в Discovery-сервисе, запрашивает актуальный список чатов, и по каждому чату запрашивает все сообщения, начиная с последнего известного ему номера сообщения. Сервис сообщений отдаёт все новые сообщения плюс информацию о прочтенных (в том числе с других устройств). Для поддержки нескольких устройств Discovery хранит список всех нод, к которым подключён пользователь. Также предложена альтирнатива — детерминированное вычисление ноды по ID пользователя (без Discovery), но это создаёт проблемы при отказе серверов и неоптимальной маршрутизации.

Правильный ответ:

Кандидат дал полный и правильный ответ. Детализируем и дополним техническими нюансами.

1. Механизм оффлайн-доставки

Когда пользователь оффлайн, сообщение проходит через несколько этапов:

type OfflineDeliveryHandler struct {
messageStore *MessageStore
pushNotifier *PushNotificationService
syncService *SyncService
}

func (h *OfflineDeliveryHandler) HandleOfflineUser(ctx context.Context, msg *Message, recipientID string) error {
// 1. Сохраняем сообщение в персистентное хранилище (ScyllaDB)
if err := h.messageStore.Save(ctx, msg); err != nil {
return fmt.Errorf("save message: %w", err)
}

// 2. Обновляем счётчик непрочитанных для получателя
if err := h.syncService.IncrementUnread(ctx, recipientID, msg.ChatID); err != nil {
log.Error("increment unread counter", err)
}

// 3. Отправляем push-уведомление
if err := h.pushNotifier.Send(ctx, recipientID, PushPayload{
Title: msg.SenderName,
Body:

#### **Вопрос 12**. Какую базу данных выбрать для хранения сообщений (источник истины) и какие к ней требования?

**Таймкод:** <YouTubeSeekTo id="IDSOAPPZsdU" time="00:54:33"/>

**Ответ собеседника:** **неполный**. К БД предъявлены требования: шардирование (желательно автоматическое), поддержка большого объёма записи (несколько ТБ в день только текст), основные запросы — добавление нового сообщения и чтение последних сообщений чата (доступ к «хвосту»). Сложные индексы не нужны, поиск вынесен в отдельный поисковый движок. Кандидат предположил, что PostgreSQL «из коробки» не подойдёт, но конкретную альтернативу назвать не смог, честно признавшись, что не имеет опыта работы с такими нагрузками.

**Правильный ответ:**

Требования кандидат сформулировал верно. Теперь разберём конкретные варианты и обоснование выбора.

**1. Ключевые требования к хранилищу сообщений**

| Требование | Значение |
|-----------|----------|
| Запись | ~370K msg/s в пике, 25B msg/день |
| Чтение | Пагинация по истории чата, last N messages |
| Шардирование | Обязательно автоматическое |
| Доступность | 99.99% — сообщения нельзя терять |
| Consistency | Eventual consistency допустима |
| Модель данных | Простая: key-value с сортировкой по timestamp |
| TTL | Опционально (автоудаление старых) |

**2. Почему не PostgreSQL «из коробки»**

PostgreSQL — отличная СУБД, но для 25B INSERT/день потребуется:
- Шардирование через Citus или ручное — добавляет сложность
- Каждый шард — отдельный инстанс PostgreSQL с настройкой
- VACUUM и bloat при таком объёме записи — серьёзная проблема
- Partition pruning помогает, но не решает проблему горизонтального масштабирования

PostgreSQL подходит для:
- Метаданных пользователей и чатов (относительно небольшой объём)
- Транзакционных операций (авторизация, платежи)

Но не как primary store для сообщений при таком масштабе.

**3. Кандидаты на роль primary store**

**A. ScyllaDB (рекомендуемый выбор)**

ScyllaDB — drop-in замена Apache Cassandra, написанная на C++:

- **Пропускная способность:** до 1M ops/s на ноду (в 10× быстрее Cassandra)
- **Автоматическое шардирование:** встроенный consistent hashing
- **Латентность:** p99 &lt; 10мс для чтения, &lt; 5мс для записи
- **Нет GC pause:** C++ вместо Java — нет проблем с garbage collection
- **Tunable consistency:** ONE, QUORUM, ALL для каждого запроса

```cql
-- Схема для ScyllaDB
CREATE KEYSPACE messages WITH replication = {
'class': 'NetworkTopologyStrategy',
'dc1': 3, 'dc2': 3
};

CREATE TABLE messages.by_chat (
chat_id UUID,
bucket TEXT, -- временной бакет: '2025-01'
message_id TIMEUUID, -- содержит timestamp + уникальность
sender_id UUID,
msg_type TEXT, -- 'text', 'image', 'video', 'file'
text TEXT,
media_urls LIST<TEXT>,
reply_to UUID,
status TEXT, -- 'sent', 'delivered', 'read'
created_at TIMESTAMP,
PRIMARY KEY ((chat_id, bucket), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)
AND compaction = {
'class': 'TimeWindowCompactionStrategy',
'compaction_window_size': 7,
'compaction_window_unit': 'DAYS'
}
AND gc_grace_seconds = 864000; -- 10 дней для удаления tombstones

Ключевые решения:

  • Составной ключ партиции (chat_id, bucket) — все сообщения чата за месяц на одной ноде
  • TIMEUUID — натуральная сортировка по времени, уникальность без дополнительных индексов
  • TimeWindowCompactionStrategy — оптимальна для time-series данных, старые данные компактифицируются пачками

B. Apache Cassandra

Та же модель данных, что и ScyllaDB, но:

  • Java-based → GC pauses при больших объёмах
  • Медленнее ScyllaDB в 5–10 раз на той же железе
  • Зрелая экосистема, но ScyllaDB предпочтительнее для нового проекта

C. TiDB

Горизонтально-масштабируемый SQL:

  • MySQL-совместимый протокол
  • Автоматическое шардирование (Region-based)
  • Strong consistency через Raft
  • Подходит если нужны сложные запросы и JOIN

Минусы:

  • Выше латентность записи чем ScyllaDB (из-за Raft consensus)
  • Сложнее в эксплуатации (TiKV + PD + TiDB компоненты)
  • Оверхед на strong consistency, который не нужен для сообщений

D. CockroachDB

  • PostgreSQL-совместимый протокол
  • Strong consistency, serializable isolation
  • Автоматическое шардирование

Минусы:

  • Латентность записи выше из-за distributed consensus
  • Не оптимизирован для write-heavy workload
  • Лучше подходит для OLTP, чем для мессенджера

4. Сравнительная таблица

КритерийScyllaDBCassandraTiDBCockroachDB
Запись (peak)✅ 1M/s на ноду~100K/s на ноду~50K/s~30K/s
ШардированиеАвтоматическоеАвтоматическоеАвтоматическоеАвтоматическое
Латентность p99< 10мс< 50мс< 30мс< 50мс
ConsistencyTunableTunableStrongStrong
МодельWide-columnWide-columnSQLSQL
ЭкосистемаЗрелаяЗрелаяРастущаяРастущая
Операционные затратыСредниеВысокиеВысокиеВысокие

5. Рекомендуемая архитектура хранения

┌──────────────────┐
│ Elasticsearch │ ← Поиск (secondary index)
│ (read-only) │
└────────▲─────────┘
│ CDC / Kafka Connect

┌──────────┐ ┌────────────┴───────────┐
│ Client │───→│ ScyllaDB │ ← Primary Store
│ │ │ (source of truth) │
└──────────┘ └────────────┬───────────┘

┌────────▼─────────┐
│ Redis │ ← Кэш горячих сообщений
│ (last 50 msg) │
└──────────────────┘

6. Реализация на Go

type MessageStore struct {
session *gocql.Session
cache *redis.Client
}

func NewMessageStore(scyllaHosts []string, redisAddr string) (*MessageStore, error) {
cluster := gocql.NewCluster(scyllaHosts...)
cluster.Keyspace = "messages"
cluster.Consistency = gocql.One // запись с минимальной латентностью
cluster.NumConns = 4 // соединения на ноду
cluster.Timeout = 500 * time.Millisecond

session, err := cluster.CreateSession()
if err != nil {
return nil, fmt.Errorf("connect to scylla: %w", err)
}

rdb := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{redisAddr},
})

return &MessageStore{session: session, cache: rdb}, nil
}

func (s *MessageStore) SaveMessage(ctx context.Context, msg *Message) error {
bucket := msg.CreatedAt.Format("2006-01")

query := `INSERT INTO by_chat
(chat_id, bucket, message_id, sender_id, msg_type, text, media_urls, created_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`

err := s.session.Query(query,
msg.ChatID, bucket, gocql.TimeUUID(), msg.SenderID,
msg.Type, msg.Text, msg.MediaURLs, msg.CreatedAt, "sent",
).WithContext(ctx).Exec()

if err != nil {
return fmt.Errorf("save message: %w", err)
}

// Инвалидируем кэш
cacheKey := fmt.Sprintf("chat:%s:recent", msg.ChatID)
s.cache.Del(ctx, cacheKey)

return nil
}

func (s *MessageStore) GetRecentMessages(ctx context.Context, chatID string, limit int) ([]Message, error) {
// 1. Проверяем кэш
cacheKey := fmt.Sprintf("chat:%s:recent", chatID)
cached, err := s.cache.Get(ctx, cacheKey).Result()
if err == nil {
var messages []Message
if json.Unmarshal([]byte(cached), &messages) == nil {
return messages, nil
}
}

// 2. Читаем из ScyllaDB (последний месяц)
now := time.Now()
bucket := now.Format("2006-01")

query := `SELECT message_id, sender_id, msg_type, text, media_urls, created_at, status
FROM by_chat
WHERE chat_id = ? AND bucket = ?
LIMIT ?`

iter := s.session.Query(query, chatID, bucket, limit).
WithContext(ctx).Iter()

var messages []Message
var msg Message
for iter.Scan(&msg.ID, &msg.SenderID, &msg.Type, &msg.Text,
&msg.MediaURLs, &msg.CreatedAt, &msg.Status) {
messages = append(messages, msg)
}

if err := iter.Close(); err != nil {
return nil, fmt.Errorf("query messages: %w", err)
}

// 3. Кэшируем на 30 секунд
if data, err := json.Marshal(messages); err == nil {
s.cache.Set(ctx, cacheKey, data, 30*time.Second)
}

return messages, nil
}

func (s *MessageStore) GetMessagesBefore(ctx context.Context, chatID string, before time.Time, limit int) ([]Message, error) {
// Пагинация: сообщения до определённого времени
bucket := before.Format("2006-01")

query := `SELECT message_id, sender_id, msg_type, text, media_urls, created_at, status
FROM by_chat
WHERE chat_id = ? AND bucket = ? AND message_id < ?
LIMIT ?`

iter := s.session.Query(query, chatID, bucket, gocql.UUIDFromTime(before), limit).
WithContext(ctx).Iter()

// ... аналогично GetRecentMessages
}

7. Инфраструктурные требования

Для 25B сообщений/день:

  • ScyllaDB кластер: 20–30 нод

    • Каждая нода: 32 CPU, 256GB RAM, 2× NVMe SSD (по 3.2TB)
    • Репликация: RF=3, 2 датацентра
    • Итого raw storage: ~200TB (с учётом репликации и запаса на compaction)
  • Redis Cluster: 6–10 нод

    • Каждая: 8 CPU, 64GB RAM
    • Хранит только горячие данные (кэш последних сообщений)

8. Итог

ScyllaDB — оптимальный выбор для primary store сообщений:

  • Автоматическое шардирование без операционных затрат
  • Линейное масштабирование записи
  • Низкая латентность
  • Tunable consistency (write ONE, read ONE для минимальной латентности)
  • TimeWindowCompaction для эффективного хранения time-series данных

PostgreSQL остаётся для метаданных (пользователи, чаты), Elasticsearch для поиска, Redis для кэширования. Каждая СУБД решает свою задачу — это классический polyglot persistence подход.

Вопрос 13. Как сервис сообщений общается с WebSocket-нодами — синхронно через RPC или асинхронно через очередь? Как решить проблему доставки на несколько устройств, если одно из них недоступно?

Таймкод: 01:03:56

Ответ собеседника: неполный. Кандидат предложил синхронный RPC между сервисом сообщений и WebSocket-нодами, чтобы избежать лишней инфраструктуры (Kafka) и обеспечить немедленную доставку. Однако при обсуждении доставки на несколько устройства возникла проблема: если одно устройство недоступно, сообщение может быть потеряно. Предложено использовать гибридный подход — очередь (Kafka) для каждого WebSocket-нода, куда сервис сообщений пишет одно сообщение, а нода сама раздаёт его всем подключённым устройствам пользователя. Это снимает проблему недоступности одного устройства — при переподключении оно запросит пропущенные сообщения. Кандидат согласился, что проблема решаема и такая схема приемлема.

Правильный ответ:

Кандидат пришёл к правильному решению через итерации. Разберём полную картину с обоснованием.

1. Сравнение подходов: RPC vs Message Queue

Синхронный RPC (gRPC):

// Прямая доставка через gRPC
func (s *MessageService) DeliverViaRPC(ctx context.Context, msg *Message, recipientID string) error {
servers, err := s.registry.GetServers(ctx, recipientID)
if err != nil {
return err
}

for _, serverID := range servers {
conn := s.grpcConns[serverID]
client := pb.NewWebSocketDeliveryClient(conn)

// Синхронный вызов — блокируемся до ответа
resp, err := client.Deliver(ctx, &pb.DeliveryRequest{
RecipientId: recipientID,
Message: convertToProto(msg),
})

if err != nil {
// Проблема: что делать при ошибке?
// Retry? Тогда латентность растёт
// Не retry? Тогда сообщение потеряно
return fmt.Errorf("delivery to %s failed: %w", serverID, err)
}

if resp.Status != "delivered" {
// Пользователь не на этом сервере
// Но мы уже потратили время на вызов
}
}

return nil
}

Плюсы: минимальная латентность, простота понимания. Минусы: нет гарантии доставки, нет буферизации, каскадные сбои при недоступности сервера.

Асинхронная очередь (Kafka):

// Доставка через Kafka
func (s *MessageService) DeliverViaKafka(ctx context.Context, msg *Message, recipientID string) error {
servers, err := s.registry.GetServers(ctx, recipientID)
if err != nil {
return err
}

delivery := &DeliveryMessage{
RecipientID: recipientID,
Message: msg,
}

for _, serverID := range servers {
// Публикуем в топик, партиционированный по server_id
err := s.kafkaWriter.WriteMessages(ctx, kafka.Message{
Topic: "ws-delivery",
Key: []byte(serverID),
Value: mustMarshal(delivery),
})

if err != nil {
return fmt.Errorf("kafka write failed: %w", err)
}
}

return nil
}

// WebSocket-сервер потребляет из своей партиции
func (ws *WebSocketServer) consumeDeliveries(ctx context.Context) {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
Topic: "ws-delivery",
GroupID: fmt.Sprintf("ws-%s", ws.id),
})

for {
msg, err := reader.ReadMessage(ctx)
if err != nil {
log.Error("kafka read", err)
continue
}

var delivery DeliveryMessage
json.Unmarshal(msg.Value, &delivery)

// Доставляем всем устройствам пользователя на этом сервере
ws.deliverToAllDevices(delivery.RecipientID, delivery.Message)
}
}

Плюсы: гарантия доставки, буферизация, развязка компонентов. Минусы: дополнительная латентность (5–20мс), сложность инфраструктуры.

2. Проблема мультидевайс и недоступного устройства

Это ключевая проблема, которую кандидат верно идентифицировал.

Сценарий:

  • Пользователь подключён с телефона (WS-сервер 1) и десктопа (WS-сервер 2)
  • Десктоп ушёл в sleep mode или потерял соединение, но WebSocket ещё не закрылся (half-open connection)
  • Сообщение доставлено на оба сервера, но десктоп не подтвердил получение
  • Через 30 секунд десктоп переподключается — сообщение потеряно

Решение: Acknowledgment + Sync протокол

// Сообщение содержит порядковый номер для каждого получателя
type Message struct {
ID string `json:"id"`
ChatID string `json:"chat_id"`
SenderID string `json:"sender_id"`
Text string `json:"text"`
SeqNum int64 `json:"seq_num"` // глобальный sequence в чате
CreatedAt time.Time `json:"created_at"`
}

// Клиент подтверждает получение
type AckMessage struct {
MessageID string `json:"message_id"`
ChatID string `json:"chat_id"`
SeqNum int64 `json:"seq_num"`
DeviceID string `json:"device_id"`
}

// При переподключении клиент отправляет SyncRequest
type SyncRequest struct {
ChatID string `json:"chat_id"`
LastSeqNum int64 `json:"last_seq_num"` // последний полученный
LastMessageID string `json:"last_message_id"` // для надёжности
DeviceID string `json:"device_id"`
}

type SyncResponse struct {
Messages []Message `json:"messages"`
UnreadCount int `json:"unread_count"`
LastSeqNum int64 `json:"last_seq_num"`
}

Реализация Sync-сервиса:

type SyncService struct {
messageStore *MessageStore
ackTracker *AckTracker
}

func (s *SyncService) HandleSync(ctx context.Context, req *SyncRequest) (*SyncResponse, error) {
// 1. Находим все сообщения, которые клиент пропустил
messages, err := s.messageStore.GetMessagesAfterSeqNum(
ctx, req.ChatID, req.LastSeqNum, 100, // limit 100
)
if err != nil {
return nil, fmt.Errorf("get messages: %w", err)
}

// 2. Фильтруем сообщения, которые уже были доставлены на другие устройства
// (опционально — если хотим избежать дублирования)
filtered := make([]Message, 0, len(messages))
for _, msg := range messages {
// Проверяем, было ли сообщение уже прочитано на другом устройстве
read, err := s.ackTracker.IsReadByOtherDevice(ctx, msg.ID, req.DeviceID)
if err != nil {
log.Error("check read status", err)
}

// Всегда отправляем, даже если прочитано на другом устройстве
// Клиент сам разберётся с дедупликацией по message_id
filtered = append(filtered, msg)
}

// 3. Получаем актуальный unread count
unreadCount, err := s.ackTracker.GetUnreadCount(ctx, req.ChatID, req.DeviceID)
if err != nil {
log.Error("get unread count", err)
}

// 4. Получаем последний sequence number в чате
lastSeqNum, err := s.messageStore.GetLastSeqNum(ctx, req.ChatID)
if err != nil {
log.Error("get last seq num", err)
}

return &SyncResponse{
Messages: filtered,
UnreadCount: unreadCount,
LastSeqNum: lastSeqNum,
}, nil
}

// Обработка ACK от клиента
func (s *SyncService) HandleAck(ctx context.Context, ack *AckMessage, deviceID string) error {
// 1. Сохраняем подтверждение
if err := s.ackTracker.RecordAck(ctx, ack.MessageID, ack.ChatID, deviceID); err != nil {
return err
}

// 2. Проверяем, все ли устройства подтвердили
allAcked, err := s.ackTracker.IsFullyDelivered(ctx, ack.MessageID)
if err != nil {
return err
}

if allAcked {
// 3. Обновляем статус сообщения на "delivered"
if err := s.messageStore.UpdateStatus(ctx, ack.MessageID, "delivered"); err != nil {
return err
}

// 4. Уведомляем отправителя о доставке
s.notifySender(ctx, ack.MessageID, "delivered")
}

return nil
}

3. Рекомендуемая архитектура — гибрид

type HybridDeliveryService struct {
rpcClient *RPCDeliveryClient // для hot path
kafkaWriter *KafkaWriter // для guaranteed delivery
ackTracker *AckTracker
syncService *SyncService
}

func (s *HybridDeliveryService) Deliver(ctx context.Context, msg *Message, recipientID string) error {
servers, err := s.registry.GetServers(ctx, recipientID)
if err != nil || len(servers) == 0 {
// Пользователь оффлайн — только push + сохранение в БД
return s.handleOffline(ctx, msg, recipientID)
}

// Пробуем быструю доставку через gRPC
delivered := false
var failedServers []string

for _, serverID := range servers {
err := s.rpcClient.Deliver(ctx, msg, recipientID, serverID)
if err != nil {
failedServers = append(failedServers, serverID)
} else {
delivered = true
}
}

if !delivered {
// Все серверы недоступны — пишем в Kafka для retry
return s.kafkaWriter.Publish(ctx, msg, recipientID)
}

// Часть серверов недоступна — пишем в Kafka для них
for _, serverID := range failedServers {
s.kafkaWriter.PublishToServer(ctx, msg, recipientID, serverID)
}

return nil
}

func (s *HybridDeliveryService) handleOffline(ctx context.Context, msg *Message, recipientID string) error {
// 1. Сохраняем сообщение (уже сохранено ранее в MessageService)

// 2. Увеличиваем unread counter
s.ackTracker.IncrementUnread(ctx, recipientID, msg.ChatID)

// 3. Отправляем push
return s.pushNotifier.Send(ctx, recipientID, PushPayload{
Title: msg.SenderName,
Body: truncate(msg.Text, 100),
Data: map[string]string{
"chat_id": msg.ChatID,
"message_id": msg.ID,
"seq_num": fmt.Sprintf("%d", msg.SeqNum),
},
})
}

4. Протокол взаимодействия при переподключении

Client Server
│ │
│──── WebSocket Connect ───────→│
│ │
│──── SyncRequest ─────────────→│
│ {chat_id, last_seq_num, │
│ last_message_id, │
│ device_id} │
│ │
│←──── SyncResponse ────────────│
│ {messages[], unread, │
│ last_seq_num} │
│ │
│──── Ack (msg_id_1) ──────────→│
│──── Ack (msg_id_2) ──────────→│
│ │
│←──── Real-time messages ──────│
│ │

5. Итоговые рекомендации

АспектРекомендация
Доставка onlinegRPC direct для минимальной латентности
Гарантия доставкиKafka fallback при сбое
МультидевайсSequence numbers + Sync протокол
ПодтверждениеACK от каждого устройства
ОффлайнPush + сохранение + sync при переподключении
ПорядокSeqNum на уровне чата, сортировка на клиенте

Гибридный подход даёт лучшее из двух миров: минимальную латентность для онлайн-пользователей через gRPC и гарантию доставки через Kafka при сбоях. Sync протокол с sequence numbers решает проблему мультидевайс и гарантирует, что ни одно сообщение не будет потеряно при переподключении.

Вопрос 14. Чем отличается доставка сообщений в групповые чаты от приватных и как реализовать эффективную доставку на большое количество участников?

Таймкод: 01:08:56

Ответ собеседника: правильный. Отличие в том, что сообщение нужно доставить всем участникам группы (до 500 человек), которые могут быть подключены к разным WebSocket-нодам. При пике ~50 млн одновременно подключённых пользователей и 5000 серверов, каждый участник группы может оказаться на отдельном сервере. Отправлять 500 отдельных RPC-запросов неэффективно. Предложено использовать брокер сообщений (Kafka): сервис сообщений пишет одно сообщение с полным списком получателей в очередь, и каждая WebSocket-нода читает это сообщение, фильтрует своих подключённых пользователей и доставляет им. Это позволяет записать одно сообщение и раздать его всем заинтересованным нодам.

Правильный ответ:

Кандидат верно описал проблему и предложил правильное решение. Детализируем и дополним техническими нюансами.

1. Ключевые отличия групповых чатов от приватных

АспектПриватный чатГрупповой чат
Получатели2 человекаДо 500 человек
Fan-out×2До ×500
Маршрутизация2 сервера максимумДо 500 серверов
Задержка доставкиМинимальнаяЗависит от размера группы
Нагрузка на брокерНизкаяВысокая

При 25B сообщений/день и среднем fan-out ×3 (с учётом групп):

  • Пиковый delivery rate: ~1M msg/s
  • Из них групповые: ~400K msg/s
  • Средний размер группы: ~50 человек
  • Итого: ~20M delivery-ев/секунду в пике

2. Проблема наивного подхода

// ❌ Плохо: 500 RPC-запросов на одно сообщение
func (s *MessageService) DeliverToGroupNaive(msg *Message, groupID string) error {
members := s.chatService.GetMembers(groupID) // 500 участников

for _, memberID := range members {
servers := s.registry.GetServers(memberID)
for _, serverID := range servers {
// До 500 отдельных вызовов!
s.rpcClient.Deliver(msg, memberID, serverID)
}
}
return nil
}

Проблемы:

  • 500 сетевых вызовов на одно сообщение
  • Латентность определяется самым медленным вызовом
  • Один сбой → всё сообщение не доставлено
  • Нагрузка на сервис сообщений растёт линейно с размером группы

3. Оптимальное решение — Fan-out через Kafka

Архитектура:

┌─────────────────┐
│ Message Service │
│ (один вызов) │
└────────┬────────┘

┌────────▼────────┐
│ Kafka Topic │
│ "group-delivery"│
│ (partitioned │
│ by chat_id) │
└───┬───┬───┬─────┘
│ │ │
┌───────────▼┐ ┌▼──────────┐ ┌▼───────────┐
│ WS Server 1│ │WS Server 2│ │ WS Server N│
│ (фильтрует │ │(фильтрует │ │ (фильтрует │
│ своих) │ │ своих) │ │ своих) │
└────────────┘ └───────────┘ └────────────┘

Реализация:

type GroupDeliveryService struct {
kafkaWriter *kafka.Writer
chatService *ChatService
}

func (s *GroupDeliveryService) DeliverToGroup(ctx context.Context, msg *Message) error {
// 1. Получаем участников группы
members, err := s.chatService.GetMemberIDs(ctx, msg.ChatID)
if err != nil {
return fmt.Errorf("get members: %w", err)
}

// 2. Формируем delivery message со списком получателей
delivery := &GroupDeliveryMessage{
Message: msg,
RecipientIDs: members, // полный список ID
ChatID: msg.ChatID,
TotalCount: len(members),
}

// 3. Публикуем ОДНО сообщение в Kafka
// Партиционирование по chat_id для порядка
err = s.kafkaWriter.WriteMessages(ctx, kafka.Message{
Topic: "group-delivery",
Key: []byte(msg.ChatID),
Value: mustMarshal(delivery),
})

if err != nil {
return fmt.Errorf("kafka write: %w", err)
}

return nil
}

WebSocket-сервер потребляет и фильтрует:

type WebSocketServer struct {
id string
connections map[string]*ClientConn // user_id → connection
localUsers map[string]bool // множество локальных пользователей
}

func (ws *WebSocketServer) ConsumeGroupDeliveries(ctx context.Context) {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka:9092"},
Topic: "group-delivery",
GroupID: "ws-servers",
})

for {
kafkaMsg, err := reader.ReadMessage(ctx)
if err != nil {
log.Error("kafka read", err)
continue
}

var delivery GroupDeliveryMessage
json.Unmarshal(kafkaMsg.Value, &delivery)

// Фильтруем: оставляем только пользователей на этом сервере
var localRecipients []string
for _, userID := range delivery.RecipientIDs {
if ws.localUsers[userID] {
localRecipients = append(localRecipients, userID)
}
}

// Доставляем локальным пользователям
for _, userID := range localRecipients {
if conn, ok := ws.connections[userID]; ok {
conn.Send(delivery.Message)
}
}
}
}

4. Оптимизация для больших групп

При 500 участниках список ID занимает ~500 × 16 байт = 8KB. При 400K msg/s это 3.2GB/сек только списков. Оптимизации:

A. Битовая маска вместо списка ID

Если пользователи пронумерованы последовательно:

type OptimizedGroupDelivery struct {
Message *Message
ChatID string
MemberBitmap []uint64 // битовая маска: bit[i] = 1 если пользователь i в группе
}

// Проверка принадлежности: O(1)
func (d *OptimizedGroupDelivery) IsMember(userIndex int) bool {
word := userIndex / 64
bit := uint(userIndex % 64)
return d.MemberBitmap[word]&(1<<bit) != 0
}

B. Fan-out on write vs Fan-out on read

Для очень больших групп (1000+ участников) — два подхода:

// Fan-out on write: пишем отдельное сообщение для каждого получателя
// Плюсы: простота потребления, персонализация
// Минусы: 500 записей в Kafka на одно сообщение

// Fan-out on read: пишем одно сообщение, клиент сам забирает
// Плюсы: одна запись
// Минусы: клиент должен опрашивать, не подходит для real-time

Для мессенджера с группами до 500 человек — fan-out on write через Kafka оптимален.

5. Гарантия доставки в группах

type GroupAckTracker struct {
redis *redis.ClusterClient
}

func (t *GroupAckTracker) TrackDelivery(ctx context.Context, messageID string, chatID string, totalMembers int) error {
key := fmt.Sprintf("group_ack:%s", messageID)

// Инициализируем счётчик
pipe := t.redis.Pipeline()
pipe.Set(ctx, key+":total", totalMembers, 24*time.Hour)
pipe.Set(ctx, key+":acked", 0, 24*time.Hour)
_, err := pipe.Exec(ctx)

return err
}

func (t *GroupAckTracker) RecordAck(ctx context.Context, messageID string, userID string) error {
key := fmt.Sprintf("group_ack:%s", messageID)

// Инкрементируем счётчик подтверждений
count, err := t.redis.Incr(ctx, key+":acked").Result()
if err != nil {
return err
}

// Проверяем, все ли подтвердили
total, err := t.redis.Get(ctx, key+":total").Int64()
if err != nil {
return err
}

if count >= total {
// Все подтвердили — обновляем статус
t.onFullyDelivered(ctx, messageID)
}

return nil
}

6. Обработка участников, которые покинули группу

func (s *GroupDeliveryService) DeliverToGroup(ctx context.Context, msg *Message) error {
// Получаем АКТУАЛЬНЫЙ список участников
members, err := s.chatService.GetActiveMemberIDs(ctx, msg.ChatID)
if err != nil {
return err
}

// Фильтруем: исключаем заблокированных и покинувших
filtered := make([]string, 0, len(members))
for _, m := range members {
if !s.isBlocked(m, msg.SenderID) && !s.hasLeft(m, msg.ChatID) {
filtered = append(filtered, m)
}
}

// ... доставка
}

7. Итоговая архитектура групповой доставки

Sender → Message Service → ScyllaDB (save)


Kafka "group-delivery"
(partitioned by chat_id)

┌───────────────┼───────────────┐
▼ ▼ ▼
WS Server 1 WS Server 2 WS Server N
(фильтрует (фильтрует (фильтрует
своих) своих) своих)
│ │ │
▼ ▼ ▼
User A, B User C, D User E, F

Ключевые принципы:

  • Одна запись в Kafka на сообщение, а не 500 RPC-вызовов
  • Фильтрация на стороне WebSocket-сервера — каждый сервер проверяет своих пользователей
  • Партиционирование по chat_id — гарантирует порядок сообщений в чате
  • ACK tracking — для индикатора доставки в группах