Как пройти секцию system design на собеседовании по Go: проектируем YouTube | Эйч Навыки
Сегодня мы разберём реальное собеседование по системному дизайну, в котором интервьюер и кандидат совместно проектируют архитектуру видеохостинга уровня YouTube — от загрузки и транскодирования видео до доставки контента через CDN. В ходе диалога обсуждаются ключевые аспекты масштабируемых систем: асинхронная обработка через очереди, шардирование данных, выбор между SQL и NoSQL, отказоустойчивость и безопасность, а также оптимизация затрат на хранение и трафик. Это не просто теоретический разбор, а живой процесс принятия инженерных решений с обоснованием компромиссов и глубоким погружением в доменную область.
Вопрос 1. Расскажите немного о себе и своём опыте.
Таймкод: 00:07:15
Ответ собеседника: Правильный. Владимир, тимлид в команде Яндекс беспилотников, руководит инфраструктурной командой больших данных, где регулярно сталкивается с задачами System Design. Является ментором на Навыках и ведущим вебинара.
Правильный ответ:
Ответ собеседника является хорошим введением, но для позиции Golang-разработчика его стоит дополнить более техническими деталями. Вот пример того, как можно структурировать такой ответ:
Ключевые аспекты, которые стоит осветить:
1. Текущая роль и обязанности Указать конкретный стек технологий (Go, какие фреймворки, с какими базами данных работал), масштаб системы (RPS, объём данных, количество сервисов).
2. Опыт работы с Go Сколько лет пишет на Go, какие проекты — микросервисы, высоконагруженные системы, распределённые системы. Пример:
// Типичный паттерн, который используется в production-коде
type Server struct {
router *chi.Mux
db *sql.DB
logger *zap.Logger
}
func NewServer(cfg Config) (*Server, error) {
db, err := sql.Open("postgres", cfg.DSN)
if err != nil {
return nil, fmt.Errorf("failed to connect to db: %w", err)
}
return &Server{
router: chi.NewRouter(),
db: db,
logger: cfg.Logger,
}, nil
}
3. Опыт System Design Если есть опыт проектирования архитектуры — упомянуть конкретные решения: выбор между синхронной и асинхронной коммуникацией, паттерны Saga, CQRS, Event Sourcing.
4. Опыт менторства и лидерства Участие в code review, онбординг новых разработчиков, проведение технических интервью — всё это добавляет вес кандидату.
Рекомендация по структуре ответа: Кратко (1–2 минуты) рассказать о текущей роли, затем углубиться в технический стек и самые интересные/сложные задачи. Избегать излишне общих фраз без конкретики — интервьюер ожидает техническую глубину от опытного разработчика.
Вопрос 2. В чём главное отличие System Design интервью от обычных технических собеседований (алгоритмы, скрининги по технологиям)?
Таймкод: 00:09:29
Ответ собеседника: Правильный. Главное отличие — в System Design нет единственного правильного ответа. Одну и ту же систему, например YouTube, можно спроектировать разными способами, применяя разные паттерны, и все варианты могут быть корректными. Это требует перестройки мышления после привычных технических дисциплин, где обычно есть единственный верный ответ.
Правильный ответ:
Ответ собеседника точно отражает ключевое отличие. Дополним его более структурированно:
1. Отсутствие единственного правильного ответа
В алгоритмических задачах есть чёткий критерий — корректность и сложность O(n). В System Design интервьюер оценивает процесс мышления, а не конечный результат. Два кандидата могут предложить разные архитектуры для одной и той же задачи, и обе могут быть приняты.
2. Оценка коммуникации и аргументации
На алгоритмическом собеседовании кандидат пишет код молча. На System Design важно уметь:
- Задавать уточняющие вопросы (функциональные и нефункциональные требования)
- Аргументировать каждый выбор (почему Kafka, а не RabbitMQ; почему PostgreSQL, а не MongoDB)
- Обсуждать компромиссы (consistency vs availability, latency vs throughput)
3. Масштаб мышления
Алгоритмические задачи — это про один модуль или функцию. System Design — это про систему целиком: от клиента до базы данных, включая кэширование, балансировку, репликацию, мониторинг, деплой.
4. Требования к опыту
System Design проверяет практический опыт. Кандидат должен уметь:
- Оценивать нагрузку (QPS, объём хранилища, пропускная способность сети)
- Выбирать подходящие технологии под конкретные ограничения
- Проектировать с учётом эволюции системы (как система будет расти через год)
5. Структура оценки
Интервьюер оценивает по нескольким осям:
- Глубина знаний — понимание внутреннего устройства компонентов
- Широта знаний — знание различных технологий и паттернов
- Компромиссы — умение обосновать trade-offs
- Практичность — реалистичность предложенных решений
Пример компромисса, который ожидается на интервью:
При проектировании системы доставки сообщений кандидат должен обсудить:
- Гарантии доставки (at-most-once, at-least-once, exactly-once)
- Порядок сообщений (partition-level ordering vs global ordering)
- Задержка vs надёжность (acknowledgment сразу vs после записи на диск)
Именно способность вести такой диалог отличает сильного кандидата на System Design интервью.
Вопрос 3. На чём основывается System Design собеседование, что в нём важно и для каких грейдов оно обязательно?
Таймкод: 00:10:27
Ответ собеседника: Правильный. Собеседование опирается на опыт интервьюера, его предпочтения в технологиях и паттернах. Важен технический кругозор и насмотренность: понимание разных типов хранилищ, очередей, микросервисов, синхронного и асинхронного взаимодействия. Не обязательно глубоко погружаться в каждую технологию, но нужно знать области применения. System Design обязательно для middle+ и выше.
Правильный ответ:
Ответ собеседника в целом верен. Развернём его более детально:
1. Фундамент System Design интервью
System Design проверяет способность кандидата проектировать распределённые системы с учётом конкретных требований и ограничений. Основа — это понимание базовых строительных блоков и умение их комбинировать.
2. Что именно оценивается
А. Технический кругозор (technology breadth)
Кандидат должен ориентироваться в широком спектре технологий и понимать их назначение:
| Категория | Технологии | Ключевой критерий выбора |
|---|---|---|
| Хранища | PostgreSQL, MySQL, MongoDB, Cassandra, Redis, S3 | Модель данных, консистентность, паттерн доступа |
| Очереди | Kafka, RabbitMQ, SQS, NATS | Порядок сообщений, гарантии доставки, throughput |
| Кэширование | Redis, Memcached, CDN | Частота чтения/записи, инвалидация |
| Поиск | Elasticsearch, Meilisearch | Полнотекстовый поиск, фасеты, ранжирование |
Б. Понимание нефункциональных требований
- Scalability — горизонтальное и вертикальное масштабирование, шардирование, партиционирование
- Availability — SLA, failover, репликация, multi-region deployment
- Consistency — CAP-теорема, eventual consistency, strong consistency
- Latency — кэширование, CDN, географическое размещение
В. Процесс проектирования
Интервьюер смотрит на структуру мышления:
- Уточнение требований (функциональные и нефункциональные)
- Оценка масштаба (back-of-the-envelope calculations)
- High-level design (основные компоненты и связи)
- Deep dive в ключевые компоненты
- Обсуждение узких мест и компромиссов
3. Для каких грейдов обязательно
- Middle+ — базовое понимание архитектурных паттернов, умение спроектировать простую систему с обоснованием выбора технологий
- Senior — глубокое понимание распределённых систем, опыт проектирования высоконагруженных систем, знание паттернов (Circuit Breaker, Bulkhead, Saga, CQRS)
- Staff/Principal — проектирование систем уровня компании, стратегические решения, эволюция архитектуры, cost optimization
4. Практический пример оценки масштаба
Кандидат должен уметь быстро прикидывать:
Проектируем систему как Twitter:
- 300M MAU, 50% ежедневно = 150M DAU
- В среднем 2 твита на пользователя в день
- 150M * 2 = 300M твитов/день ≈ 3500 твитов/сек
- Пиковый х2-3x = ~10K твитов/сек
- Средний твит 300 байт → 300M * 300 bytes = 90GB/день текста
- Медиа (фото/видео): 10% твитов с медиа, 150M * 10% * 2MB = 30TB/день
5. Важный нюас
Даже если интервьюер имеет свои предпочтения (например, любит Kafka), кандидат должен уметь аргументировать выбор. Если он предлагает RabbitMQ вместо Kafka — нужно объяснить почему (нужна простая очередь задач, не нужен log-based подход, важна простота эксплуатации). Это демонстрирует зрелость инженера.
Вопрос 4. Какой лайфхак существует при подготовке к System Design собеседованию в конкретную компанию?
Таймкод: 00:12:42
Ответ собеседника: Правильный. Если идёте в компанию с ярко выраженной областью (СУБД, YouTube), заранее стоит представить высокоуровневые решения, которые там могут применяться. Люди, работающие непосредственно с продуктом, часто спрашивают что-то связанное с их спецификой, и предварительная подготовка поможет быть на одной волне.
Правильный ответ:
Ответ собеседника попадает в точку. Развернём стратегию подготовки к конкретной компании:
1. Изучите инженерный блог компании
Практически каждая крупная компания ведёт технический блог:
- Uber Engineering — про микросервисы, распределённые транзакции, геосервисы
- Netflix Tech Blog — про resilience, chaos engineering, CDN
- Meta Engineering — про AI-инфраструктуру, социальные графы
- Dropbox — про миграцию с AWS, хранение файлов
- Airbnb — про data infrastructure, поиск и рекомендации
Из этих статей можно понять, какие технологии использует компания, какие проблемы решает и какой стек предпочитает.
2. Изучите публичные доклады на YouTube/конференциях
Конференции QCon, StrangeLoop, Highload++ содержат десятки докладов от инженеров крупных компаний. Они раскрывают внутреннюю архитектуру и используемые паттерны.
3. Проанализируйте продукт компании как систему
Перед интервью мысленно разберите продукт компании на компоненты:
Пример — интервью в компанию-разработчика СУБД:
- Как работает storage engine (B-Tree vs LSM-Tree)?
- Как реализован WAL (Write-Ahead Log)?
- Как обеспечивается консистентность (MVCC)?
- Как работает распределённый консенсус (Raft/Paxos)?
- Как оптимизируются запросы (query planner)?
4. Используйте Glassdoor, LeetCode Discuss, Blind
На этих платформах кандидаты делятся вопросами, которые им задавали. Это даёт представление о формате и типичных темах для конкретной компании.
5. Подготовьте примеры из своего опыта, релевантные компании
Если идёте в компанию, работающую с потоковыми данными — подготовьте примеры про Kafka, Flink, обработку событий. Если идёте в e-commerce — примеры про каталог, поиск, корзину, рекомендации.
6. Спросите рекрутера о формате
Рекрутер часто может подсказать:
- Будет ли общий System Design или специфичный для команды?
- Какие темы чаще всего обсуждают?
- Какой фокус — код, архитектура или инфраструктура?
7. Практический пример подготовки к компании, работающей с видео
Компания — видеохостинг (аналог YouTube):
- Заранее продумать: хранение видео (object storage, tiered storage)
- Транскодирование (распределённая обработка, очереди)
- CDN и доставка контента
- Рекомендательная система (ML pipeline)
- Метаданные видео (реляционная БД + поисковый индекс)
- Система комментариев (eventual consistency, fan-out)
- Подсчёт просмотров (approximate counting, HyperLogLog)
Такая подготовка позволяет кандидату говорить на одном языке с интервьюером и демонстрировать не только знания, но и интерес к продукту компании.
Вопрос 5. Какой первый и самый важный шаг нужно сделать, получив задачу на проектировании (например, спроектировать YouTube)?
Таймкод: 00:14:00
Ответ собеседника: Правильный. Первый и важнейший шаг — выяснение обстоятельств и задавание уточняющих вопросов. Нельзя сразу бросаться рисовать квадратики и кубики. YouTube — огромная система с тысячами сервисов, нужно понять, что именно от вас хотят. Это фундамент, который проверяет умение конкретизировать требования и выделять нужные технические пути.
Правильный ответ:
Ответ собеседника полностью верен. Это действительно самый критический шаг, и его нельзя пропускать. Детализируем:
1. Зачем это важно
Задача «спроектируй YouTube» — это не задача. Это направление для разговора. Без уточнений кандидат может потратить 30 минут на проектирование системы рекомендаций, тогда как интервьюер хотел обсудить доставку видео через CDN. Или наоборот — кандидат углубится в инфраструктуру хранения, а интервьюер ждал обсуждения API и клиентских сценариев.
2. Какие вопросы нужно задавать
А. Функциональные требования (Features)
- Какие основные сценарии использования? (загрузка видео, просмотр, поиск, комментарии, рекомендации)
- Какие типы пользователей? (создатели контента, зрители, рекламодатели)
- Какие платформы? (web, mobile, smart TV)
- Нужна ли поддержка стриминга (live) или только VOD?
Б. Нефункциональные требования (Non-functional requirements)
- Какой ожидаемый масштаб? (MAU, DAU, объём загружаемого контента)
- Какие требования к доступности? (SLA 99.9%? 99.99%?)
- Какие требования к задержке? (latency для загрузки, начала воспроизведения)
- Какие требования к консистентности? (нужно ли мгновенное отражение лайков/просмотров?)
В. Ограничения (Constraints)
- Бюджет? (влияет на выбор между managed-решениями и self-hosted)
- Существующий стек технологий?
- Команда? (количество инженеров, которые будут поддерживать систему)
3. Пример диалога с интервьюером
Кандидат: «Прежде чем начать, я хотел бы уточнить несколько вещей.
Интересует ли нас весь YouTube или какой-то конкретный компонент?»
Интервьюер: «Давай сфокусируемся на загрузке и воспроизведении видео.»
Кандидат: «Отлично. Какой масштаб мы рассматриваем?
Будем ли мы проектировать для 10M пользователей или для 1B?»
Интервьюер: «Думай о масштабе реального YouTube.»
Кандидат: «Понял. Требуется ли поддержка адаптивного битрейта (HLS/DASH)
или достаточно простого потокового воспроизведения?»
Интервьюер: «Да, адаптивный битрейт важен.»
4. Чего нельзя делать
- Не начинайте рисовать диаграмму сразу — это сигнал о шаблонном мышлении
- Не делайте предположения молча — лучше спросить, чем угадать неверно
- Не пытайтесь охватить всё — лучше глубоко проработать 2–3 компонента, чем поверхностно все
5. Как это оценивается
Интервьюер на этом этапе проверяет:
- Способность к коммуникации
- Умение декомпозировать задачу
- Понимание того, что реальные задачи никогда не приходят в готовом виде
- Зрелость инженера — опытный разработчик всегда уточняет требования перед началом работы
Именно этот шаг отличает кандидата, который реальные задачи решал, от того, кто только теорию учил.
Вопрос 6. Какие уточняющие вопросы по функционалу и параметрам системы нужно задать, если дали задачу спроектировать YouTube?
Таймкод: 00:16:12
Ответ собеседника: Правильный. Нужно уточнить функционал (социальная часть, трансляции, модерация, форматы, максимальная длина, кто может загружать) и количественные параметры (DAU, RPS, среднее время просмотра, процент загружающих, частота загрузки). В примере получены параметры: 10M DAU, 30 минут просмотра в день, 1% загружают видео (2 в неделю), среднее видео 5 минут при 8 Мбит/с, без лайвов и модерации.
Правильный ответ:
Ответ собеседника полный и хорошо структурирован. Дополним его категоризацией и примерами расчётов:
1. Функциональные вопросы (Scope)
А. Основные фичи
- Загрузка и воспроизведение видео — базовый сценарий
- Поиск и рекомендации — нужна ли поисковая система?
- Комментарии, лайки, подписки — социальная составляющая
- Плейлисты, история просмотров — персонализация
- Монетизация, реклама — бизнес-логика
- Стриминг (live) — принципиально другая архитектура
Б. Контент
- Форматы: MP4, WebM, AVI? Или принимаем всё и транскодируем?
- Максимальная длина видео: 15 минут, 2 часа, без ограничений?
- Максимальный размер файла?
- Нужна ли модерация контента (автоматическая/ручная)?
- Нужна ли поддержка субтитров, автоматических субтитров?
В. Пользователи
- Кто может загружать — все или только верифицированные?
- Нужна ли система подписок (premium/free)?
- Есть ли разные роли (зритель, создатель, модератор)?
2. Количественные параметры (Scale)
А. Аудитория
- DAU (Daily Active Users) — сколько пользователей в день
- Соотношение читателей и писателей (обычно 1% создают контент, 99% потребляют)
- Географическое распределение (влияет на CDN и регионы размещения)
Б. Паттерны использования
- Среднее время просмотра на пользователя в день
- Среднее количество видео за сессию
- Пиковые часы и коэффициент пиковой нагрузки
В. Контент
- Средняя длительность видео
- Средний битрейт
- Количество загрузок в день/неделю
3. Расчёт нагрузки на основе полученных параметров
Дано:
- 10M DAU
- Средний пользователь смотрит 30 минут в день
- Среднее видео — 5 минут
- 1% пользователей загружают 2 видео в неделю
- Битрейт 8 Мбит/с
Расчёты:
Просмотры в день:
10M * (30 мин / 5 мин) = 60M просмотров/день
60M / 86400 ≈ 700 просмотров/сек (среднее)
Пиковый x3 ≈ 2100 RPS на воспроизведение
Загрузки в день:
10M * 1% * (2/7) ≈ 28,571 загрузок/день
≈ 0.33 загрузки/сек (среднее)
Объём хранилища в день:
28,571 * 5 мин * 60 сек * 8 Мбит/с / 8 = 85.7 TB/день
≈ 31 PB/год (без учёта транскодирования в разные качества)
Трафик на воспроизведение:
60M * 5 мин * 60 сек * 8 Мбит/с / 8 = 180 PB/день
4. Нефункциональные требования
- Доступность: SLA 99.95%? Допустим ли downtime?
- Задержка: время начала воспроизведения (time-to-first-frame) < 2 сек?
- Консистентность: нужно ли мгновенное отражение счётчика просмотров?
- Durability: гарантия сохранности видео (99.999999999% — eleven nines как у S3)?
Эти расчёты позволяют обосновать выбор технологий и архитектурных решений на следующих этапах проектирования.
Вопрос 7. Какие количественные параметры и ограничения нужно выяснить после уточнения функционала?
Таймкод: 00:20:27
Ответ собеседника: Правильный. Нужно выяснить DAU и RPS для технических сервисы. В примере дано 10M DAU. Эти базовые параметры необходимы для проектирования масштабируемой системы.
Правильный ответ:
Ответ собеседника верен, но его стоит расширить. DAU и RPS — это только верхушка айсберга. Полный набор количественных параметров включает:
1. Параметры аудитории
- DAU (Daily Active Users) — количество уникальных пользователей в день
- MAU (Monthly Active Users) — для понимания роста и сезонности
- Concurrent users — одновременно онлайн (обычно 10-20% от DAU)
- Географическое распределение — какие регионы, для выбора CDN и дата-центров
2. Параметры нагрузки
- RPS (Requests Per Second) — разбивка по типам запросов (чтение/запись)
- Read/Write ratio — соотношение чтений и записей (критично для выбора БД и кэширования)
- Peak-to-average ratio — обычно 2-5x от средней нагрузки
- Bandwidth — потребляемая пропускная способность (особенно важно для видео/медиа)
3. Параметры хранения
- Объём данных в день — сколько новых данных поступает
- Объём данных всего — общий размер хранилища
- Growth rate — скорость роста (линейная, экспоненциальная)
- Data retention policy — как долго хранить данные
4. Параметры контента (для медиа-платформ)
- Средний размер объекта — видео, изображение, документ
- Количество объектов в день — загрузки, создание записей
- Распределение размеров — важно для планирования хранилища
5. Пример полного расчёта для нашего YouTube
Дано: 10M DAU, 30 мин просмотра/день, среднее видео 5 мин,
битрейт 8 Мбит/с, 1% загружают 2 видео/неделю
Просмотры:
- 60M просмотров/день → ~700 RPS (avg) → ~2100 RPS (peak)
- Трафик: 60M * 5 мин * 60 сек * 8 Мбит/с = 144 PB/день
Загрузки:
- 28,571 загрузок/день → ~0.33 RPS (avg)
- Хранилище: 28,571 * 300 сек * 8 Мбит/с / 8 = 85.7 TB/день
- С транскодированием в 3 качества (1080p, 720p, 480p): ~257 TB/день
Метаданные:
- 60M просмотров * 1KB на собыс = 60 GB/день событий просмотра
- 28,571 видео * 5KB метаданных = ~143 MB/день метаданных видео
6. Ограничения, которые стоит выяснить
- Latency requirements — допустимая задержка для ключевых операций
- Availability SLA — 99.9%, 99.95%, 99.99%?
- Budget constraints — влияет на выбор между managed и self-hosted
- Compliance — GDPR, локальные законы о хранении данных
- Team size — сколько инженеров будут поддерживать систему
Все эти параметры напрямую влияют на архитектурные решения: выбор базы данных, стратегию кэширования, количество реплик, географическое размещение и бюджет инфраструктуры.
Вопрос 8. Какие ресурсные оценки можно сделать на основе собранных параметров и с какими проблемами столкнётся система?
Таймкод: 00:24:44
Ответ собеседника: Правильный. Прирост хранилища около 9 ТБ/сутки на новые видео (без репликации), в год — петабайты. Стоимость S3 — ~350K/день. Ключевые проблемы — трафик и объём хранилища, а не RPS. Для распределения нагрузки используется CDN.
Правильный ответ:
Ответ собеседника демонстрирует хорошее понимание масштабирования. Уточним расчёты и расширим анализ проблем:
1. Детализированные расчёты хранилища
Исходные данные:
- 10M DAU, 1% загружают видео = 100K активных загрузчиков/день
- 2 видео в неделю на загрузчика
- Среднее видео: 5 мин при 8 Мбит/с
Объём одного видео:
5 мин * 60 сек * 8 Мбит/с = 2400 Мбит = 300 МБ
Загрузки в день:
100K * (2/7) ≈ 28,571 видео/день
Сырой объём (одно качество):
28,571 * 300 МБ ≈ 8.6 ТБ/день
С транскодированием (1080p + 720p + 480p + 360p):
≈ 8.6 * 2.5 ≈ 21.5 ТБ/день (исходный + 3 рендера)
С репликацией (3 копии):
21.5 * 3 ≈ 64.5 ТБ/день
Годовой объём:
64.5 * 365 ≈ 23.5 ПБ/год
2. Расчёт трафика на отдачу (egress)
Просмотры в день:
10M DAU * (30 мин / 5 мин) = 60M просмотров/день
Трафик на отдачу:
60M * 300 МБ = 18,000 ТБ/день = 18 ПБ/день
Стоимость egress из AWS:
- Первые 10 ТБ/мес: $0.09/ГБ
- Следующие 40 ТБ/мес: $0.085/ГБ
- Свыше 50 ТБ/мес: $0.07/ГБ
18 ПБ/день = 540 ПБ/мес
Стоимость: ~$35-40M/месяц (при стандартных ценах AWS)
С CDN: значительно дешевле, ~$0.01-0.02/ГБ → ~$5-10M/мес
3. Ключевые проблемы системы
А. Трафик (Bandwidth) — главная проблема
Объём отдачи на порядки превышает объём загрузок. Решения:
- CDN — кэширование на edge-серверах, снижение origin egress
- P2P доставка — WebRTC-based (как делали в Peer5)
- Адаптивный битрейт — HLS/DASH для оптимизации под канал пользователя
Б. Хранилище (Storage)
Петабайты данных требуют:
- Tiered storage — горячие видео на SSD, тёплые на HDD, холодные на Glacier
- Erasure coding — вместо 3x репликации использовать Reed-Solomon (экономия ~50% места)
- Дедупликация — популярные видео хранить в одном экземпляре с множеством ссылок
В. Транскодирование (Compute)
CPU/GPU-ёмкая задача:
- 28,571 видео/день * 5 мин = ~143K минут видео/день
- Транскодирование обычно 0.5-2x реального времени
- Нужен распределённый кластер (Kubernetes + GPU nodes)
Г. Метаданные и RPS
RPS на чтение (просмотр):
- 60M просмотров / 86400 ≈ 700 RPS (avg)
- Peak x3 ≈ 2100 RPS
RPS на запись (загрузка):
- 28,571 / 86400 ≈ 0.33 RPS (avg)
- Peak x3 ≈ 1 RPS
Вывод: RPS — это НЕ проблема.
Проблема — bandwidth и storage.
4. Сводная таблица проблем и решений
| Проблема | Масштаб | Решение |
|---|---|---|
| Egress traffic | 18 ПБ/день | CDN, P2P, адаптивный битрейт |
| Storage | 64.5 ТБ/день | Tiered storage, erasure coding |
| Transcoding | ~2K GPU-часов/день | Kubernetes + GPU autoscaling |
| Metadata reads | ~2100 RPS (peak) | Redis cache + read replicas |
| Upload throughput | ~30 МБ/с (avg) | Multipart upload, resumable |
Именно такой анализ показывает интервьюеру, что кандидат понимает реальные инженерные вызовы, а не просто рисует квадратики на доске.
Вопрос 9. Как выглядит крупноблочная архитектура YouTube после этапа сбора требований и оценок, и какие компоненты в неё входят?
Таймкод: 00:32:49
Ответ собеседника: Правильный. Пользователь обращается к CDN для просмотра и к системе загрузки для заливки видео. За CDN — blob-хранилище (S3-совместимое). Для загрузки используется resumable upload с офсетом байтов. Метаинформация хранится в SQL-БД с транзакционными гарантиями. Система транскодирования преобразует видео в набор артефактов. Аутентификация вынесена как отдельный сервис.
Правильный ответ:
Ответ собеседника хорошо описывает ключевые компоненты. Дополним полной высокоуровневой архитектурой:
1. Общая схема компонентов
┌─────────────────────────┐
│ Load Balancer │
│ (L7 / API Gateway) │
└────────┬──┬──┬───────────┘
│ │ │
┌─────────────┘ │ └──────────────┐
▼ ▼ ▼
┌──────────────┐ ┌─────────────┐ ┌──────────────┐
│ Upload │ │ Metadata │ │ Streaming │
│ Service │ │ Service │ │ Service │
└──────┬───────┘ └──────┬──────┘ └──────┬───────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌─────────────┐ ┌──────────────┐
│ Raw Video │ │ PostgreSQL │ │ CDN │
│ Storage │ │ (metadata) │ │ (Edge PoP) │
└──────┬───────┘ └─────────────┘ └──────┬───────┘
│ ▲
▼ │
┌──────────────┐ ┌──────┴───────┐
│ Transcoding │ │ Transcoded │
│ Pipeline │─────────────────────▶│ Video Store │
└──────────────┘ └──────────────┘
2. Описание каждого компонента
А. Upload Service
Принимает видео от пользователя, реализует resumable upload:
type UploadSession struct {
UploadID string `json:"upload_id"`
UserID string `json:"user_id"`
FileName string `json:"file_name"`
TotalSize int64 `json:"total_size"`
UploadedBytes int64 `json:"uploaded_bytes"`
Status string `json:"status"` // "in_progress", "completed", "failed"
CreatedAt time.Time
UpdatedAt time.Time
}
func (s *UploadService) InitiateUpload(ctx context.Context, req InitUploadRequest) (*UploadSession, error) {
session := &UploadSession{
UploadID: uuid.New().String(),
UserID: req.UserID,
FileName: req.FileName,
TotalSize: req.TotalSize,
Status: "in_progress",
CreatedAt: time.Now(),
}
// Сохраняем сессию в PostgreSQL для гарантии durability
if err := s.db.CreateSession(ctx, session); err != nil {
return nil, fmt.Errorf("create session: %w", err)
}
return session, nil
}
func (s *UploadService) UploadChunk(ctx context.Context, uploadID string, offset int64, data []byte) error {
// Загружаем чанк в временное хранилище (S3 multipart upload)
if err := s.rawStorage.PutChunk(ctx, uploadID, offset, data); err != nil {
return fmt.Errorf("put chunk: %w", err)
}
// Обновляем офсет в БД
return s.db.UpdateOffset(ctx, uploadID, offset+int64(len(data)))
}
Б. Transcoding Pipeline
Распределённая система обработки видео:
Raw Video → Queue (Kafka/SQS) → Transcoder Workers → Transcoded Artifacts → S3
→ Update Metadata DB
type TranscodeJob struct {
UploadID string `json:"upload_id"`
SourceURL string `json:"source_url"`
Targets []Target `json:"targets"`
}
type Target struct {
Resolution string `json:"resolution"` // "1080p", "720p", "480p"
Bitrate int `json:"bitrate"` // kbps
Codec string `json:"codec"` // "h264", "h265", "vp9"
}
func (w *TranscoderWorker) ProcessJob(ctx context.Context, job TranscodeJob) error {
// Скачиваем исходное видео
sourcePath, err := w.downloadSource(ctx, job.SourceURL)
if err != nil {
return fmt.Errorf("download source: %w", err)
}
var wg sync.WaitGroup
errChan := make(chan error, len(job.Targets))
for _, target := range job.Targets {
wg.Add(1)
go func(t Target) {
defer wg.Done()
outputPath := fmt.Sprintf("%s_%s.mp4", job.UploadID, t.Resolution)
// FFmpeg транскодирование
if err := w.ffmpeg.Transcode(sourcePath, outputPath, t); err != nil {
errChan <- fmt.Errorf("transcode %s: %w", t.Resolution, err)
return
}
// Загружаем результат в S3
if err := w.outputStorage.Upload(ctx, outputPath); err != nil {
errChan <- fmt.Errorf("upload %s: %w", t.Resolution, err)
}
}(target)
}
wg.Wait()
close(errChan)
// Проверяем ошибки
for err := range errChan {
if err != nil {
return err
}
}
// Обновляем статус в метаданных
return w.metadataDB.MarkTranscoded(ctx, job.UploadID)
}
В. Metadata Service
Хранит информацию о видео, пользователях, сессиях загрузки:
CREATE TABLE videos (
video_id UUID PRIMARY KEY,
upload_id UUID NOT NULL,
user_id UUID NOT NULL,
title VARCHAR(500),
description TEXT,
status VARCHAR(20) NOT NULL DEFAULT 'uploading',
-- uploading, processing, ready, failed
duration_seconds INT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NULL
);
CREATE TABLE video_artifacts (
artifact_id UUID PRIMARY KEY,
video_id UUID REFERENCES videos(video_id),
resolution VARCHAR(10) NOT NULL, -- '1080p', '720p', '480p'
codec VARCHAR(10) NOT NULL, -- 'h264', 'vp9'
bitrate_kbps INT NOT NULL,
storage_path VARCHAR(500) NOT NULL,
size_bytes BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_videos_user_id ON videos(user_id);
CREATE INDEX idx_videos_status ON videos(status);
CREATE INDEX idx_artifacts_video_id ON video_artifacts(video_id);
Г. Streaming Service
Обрабатывает запросы на воспроизведение и направляет клиента к CDN:
func (s *StreamingService) GetPlaybackURL(ctx context.Context, videoID string, userID string) (*PlaybackResponse, error) {
// Проверяем наличие видео
video, err := s.metadataDB.GetVideo(ctx, videoID)
if err != nil {
return nil, fmt.Errorf("get video: %w", err)
}
if video.Status != "ready" {
return nil, fmt.Errorf("video not ready: %s", video.Status)
}
// Получаем список доступных артефактов
artifacts, err := s.metadataDB.GetArtifacts(ctx, videoID)
if err != nil {
return nil, fmt.Errorf("get artifacts: %w", err)
}
// Генерируем signed URL для CDN с TTL
var sources []PlaybackSource
for _, a := range artifacts {
signedURL, err := cdn.GenerateSignedURL(a.StoragePath, 1*time.Hour)
if err != nil {
continue
}
sources = append(sources, PlaybackSource{
Resolution: a.Resolution,
URL: signedURL,
Bitrate: a.BitrateKbps,
})
}
return &PlaybackResponse{
VideoID: videoID,
Sources: sources,
HLSURL: s.cdn.GetHLSManifestURL(videoID),
}, nil
}
Д. CDN (Content Delivery Network)
- Кэширует популярный контент на edge-серверах
- Снижает latency для пользователей
- Снижает нагрузку на origin-хранилище
- Поддерживает HLS/DASH для адаптивного битрейта
3. Поток данных (Data Flow)
Загрузка:
User → Upload Service → Raw Storage (S3)
→ Metadata DB (PostgreSQL)
→ Transcoding Queue (Kafka)
→ Transcoder Workers → Transcoded Storage (S3)
→ Update Metadata DB
→ CDN Invalidation/Prefetch
Просмотр:
User → Streaming Service → Metadata DB (проверка статуса)
→ CDN Signed URL
User → CDN Edge → (cache miss) → Transcoded Storage (S3)
Такая архитектура разделяет ответственность между сервисами и позволяет масштабировать каждый компонент независимо.
Вопрос 10. Как устроен процесс загрузки и транскодирования видео внутри системы — какие компоненты задействованы и как они взаимодействуют?
Таймкод: 00:43:12
Ответ собеседника: Правильный. После загрузки API отправляет сообщение в очередь (Kafka) с метаинформацией о задаче. Транскодер забирает задачу, читает видео из S3 (не из очереди), обрабатывает и записывает результаты в основное S3-хранилище. Используется асинхронное взаимодействие, так как транскодирование — длительный процесс. После обработки транскодер сообщает Video Manager (сервис с SQL-БД), который хранит метаинформацию и статусы. Клиент через API запрашивает ссылки. Для оптимизации чтения — кэш. Транскодер — performance-critical компонент, должен быть масштабируемым.
Правильный ответ:
Ответ собеседника подробный и корректный. Оформим его в виде полной диаграммы взаимодействия с кодом:
1. Полный поток загрузки и транскодирования
┌────────┐ ┌──────────┐ ┌─────────────┐ ┌──────────────┐
│ Client │────▶│ Upload │────▶│ Raw Video │────▶│ Kafka │
│ │ │ Service │ │ S3 Bucket │ │ (transcode │
│ │◀────│ │◀────│ │ │ jobs) │
└────────┘ └──────────┘ └─────────────┘ └──────┬───────┘
│
▼
┌──────────────┐
│ Transcoder │
│ Workers │
│ (K8s pods) │
└──────┬───────┘
│
▼
┌────────┐ ┌──────────┐ ┌─────────────┐ ┌──────────────┐
│ Client │────▶│ Streaming│────▶│ Video │────▶│ Transcoded │
│ │◀────│ Service │ │ Manager │ │ Video S3 │
│ │ │ │◀────│ (metadata) │◀────│ │
└────────┘ └──────────┘ └─────────────┘ └──────────────┘
│
▼
┌──────────────┐
│ CDN │
└──────────────┘
2. Детализация каждого шага
Шаг 1: Инициализация загрузки
// Client → Upload Service
type InitUploadRequest struct {
FileName string `json:"file_name"`
FileSize int64 `json:"file_size"`
ContentType string `json:"content_type"`
UserID string `json:"user_id"`
}
type InitUploadResponse struct {
UploadID string `json:"upload_id"`
ChunkSize int64 `json:"chunk_size"` // обычно 5-10 MB
MaxChunks int `json:"max_chunks"`
}
Шаг 2: Загрузка чанков (resumable upload)
// Multipart upload с поддержкой докачивания
func (s *UploadService) UploadChunk(ctx context.Context, req ChunkUploadRequest) error {
// Валидация: не превышен ли общий размер
session, err := s.db.GetSession(ctx, req.UploadID)
if err != nil {
return fmt.Errorf("session not found: %w", err)
}
if session.UploadedBytes+int64(len(req.Data)) > session.TotalSize {
return fmt.Errorf("exceeds total file size")
}
// Загрузка чанка в S3 multipart upload
partNumber := int(req.Offset / s.chunkSize) + 1
if err := s.rawStorage.UploadPart(ctx, req.UploadID, partNumber, req.Data); err != nil {
return fmt.Errorf("upload part: %w", err)
}
// Атомарное обновление офсета в БД
return s.db.UpdateUploadedBytes(ctx, req.UploadID, session.UploadedBytes+int64(len(req.Data)))
}
Шаг 3: Завершение загрузки и постановка в очередь
func (s *UploadService) CompleteUpload(ctx context.Context, uploadID string) error {
session, err := s.db.GetSession(ctx, uploadID)
if err != nil {
return err
}
// Завершаем multipart upload в S3
if err := s.rawStorage.CompleteMultipartUpload(ctx, uploadID); err != nil {
return fmt.Errorf("complete multipart: %w", err)
}
// Обновляем статус в БД
if err := s.db.UpdateStatus(ctx, uploadID, "uploaded"); err != nil {
return fmt.Errorf("update status: %w", err)
}
// Отправляем задачу на транскодирование в Kafka
job := TranscodeJob{
UploadID: uploadID,
SourceURL: s.rawStorage.GetURL(ctx, uploadID),
Targets: []Target{
{Resolution: "1080p", Bitrate: 8000, Codec: "h264"},
{Resolution: "720p", Bitrate: 4000, Codec: "h264"},
{Resolution: "480p", Bitrate: 2000, Codec: "h264"},
{Resolution: "360p", Bitrate: 1000, Codec: "h264"},
},
}
return s.producer.Publish(ctx, "transcode-jobs", job)
}
Шаг 4: Транскодирование
func (w *TranscoderWorker) HandleJob(ctx context.Context, job TranscodeJob) error {
// Обновляем статус видео
if err := w.videoManager.UpdateStatus(ctx, job.UploadID, "processing"); err != nil {
return fmt.Errorf("update status: %w", err)
}
// Скачиваем исходное видео
sourcePath, err := w.download(ctx, job.SourceURL)
if err != nil {
w.videoManager.UpdateStatus(ctx, job.UploadID, "failed")
return fmt.Errorf("download: %w", err)
}
defer os.Remove(sourcePath)
// Параллельное транскодирование в разные разрешения
var wg sync.WaitGroup
results := make([]TranscodeResult, len(job.Targets))
errChan := make(chan error, len(job.Targets))
for i, target := range job.Targets {
wg.Add(1)
go func(idx int, t Target) {
defer wg.Done()
outputPath := fmt.Sprintf("/tmp/%s_%s.mp4", job.UploadID, t.Resolution)
// FFmpeg транскодирование
if err := w.ffmpeg.Transcode(FFmpegInput{
Source: sourcePath,
Output: outputPath,
Resolution: t.Resolution,
Bitrate: t.Bitrate,
Codec: t.Codec,
}); err != nil {
errChan <- fmt.Errorf("transcode %s: %w", t.Resolution, err)
return
}
// Загрузка результата в S3
storagePath := fmt.Sprintf("videos/%s/%s.mp4", job.UploadID, t.Resolution)
size, err := w.transcodedStorage.Upload(ctx, outputPath, storagePath)
if err != nil {
errChan <- fmt.Errorf("upload %s: %w", t.Resolution, err)
return
}
results[idx] = TranscodeResult{
Resolution: t.Resolution,
StoragePath: storagePath,
SizeBytes: size,
Bitrate: t.Bitrate,
Codec: t.Codec,
}
os.Remove(outputPath)
}(i, target)
}
wg.Wait()
close(errChan)
// Проверяем ошибки
for err := range errChan {
if err != nil {
w.videoManager.UpdateStatus(ctx, job.UploadID, "failed")
return err
}
}
// Сохраняем информацию об артефактах
if err := w.videoManager.SaveArtifacts(ctx, job.UploadID, results); err != nil {
return fmt.Errorf("save artifacts: %w", err)
}
// Обновляем статус на "ready"
return w.videoManager.UpdateStatus(ctx, job.UploadID, "ready")
}
Шаг 5: Выдача ссылки на воспроизведение
func (s *StreamingService) GetVideo(ctx context.Context, videoID string) (*VideoResponse, error) {
// Сначала проверяем кэш (Redis)
cached, err := s.cache.Get(ctx, "video:"+videoID)
if err == nil {
return cached.(*VideoResponse), nil
}
// Если нет в кэше — идём в Video Manager
video, err := s.videoManager.GetVideo(ctx, videoID)
if err != nil {
return nil, fmt.Errorf("get video: %w", err)
}
if video.Status != "ready" {
return nil, fmt.Errorf("video is not ready, current status: %s", video.Status)
}
// Генерируем signed URLs для каждого качества
sources := make([]Source, 0, len(video.Artifacts))
for _, a := range video.Artifacts {
signedURL, _ := s.cdn.GenerateSignedURL(a.StoragePath, 2*time.Hour)
sources = append(sources, Source{
Resolution: a.Resolution,
URL: signedURL,
Bitrate: a.Bitrate,
})
}
resp := &VideoResponse{
VideoID: videoID,
Title: video.Title,
Sources: sources,
HLSURL: s.cdn.GetHLSManifestURL(videoID),
}
// Кэшируем на 5 минут
s.cache.Set(ctx, "video:"+videoID, resp, 5*time.Minute)
return resp, nil
}
3. Схема состояний видео (State Machine)
uploading → uploaded → processing → ready
│ │ │
└───────────┴───────────┴──────→ failed
CREATE TABLE video_status_log (
id BIGSERIAL PRIMARY KEY,
video_id UUID NOT NULL,
old_status VARCHAR(20),
new_status VARCHAR(20) NOT NULL,
changed_at TIMESTAMPTZ DEFAULT NOW(),
changed_by VARCHAR(100) -- какой сервис изменил статус
);
CREATE INDEX idx_status_log_video_id ON video_status_log(video_id);
4. Ключевые характеристики транскодирования
| Параметр | Значение |
|---|---|
| Среднее время транскодирования 5-мин видео | 2-10 минут (зависит от количества таргетов) |
| Параллелизм | Каждый worker обрабатывает 1 видео |
| Масштабирование | Kubernetes HPA по длине очереди Kafka |
| GPU vs CPU | GPU (NVENC) для массового транскодирования, CPU для премиум-качества |
5. Масштабирование транскодеров
# Kubernetes HPA для транскодеров
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: transcoder-worker
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: transcoder-worker
minReplicas: 5
maxReplicas: 500
metrics:
- type: External
external:
metric:
name: kafka_consumer_lag
selector:
matchLabels:
topic: transcode-jobs
target:
type: Value
value: "100" # масштабируемся если lag > 100 сообщений
Такая архитектура обеспечивает полный асинхронный пайплайн от загрузки до готового к воспроизведению видео, где каждый компонент масштабируется независимо.
Вопрос 11. Как детально устроен транскодер — из каких компонентов он состоит и как обеспечивается параллельная обработка видео?
Таймкод: 00:54:39
Ответ собеседника: Правильный. Транскодер построен по асинхронной модели с очередями. Сначала валидатор проверяет видео на битость и вредоносность. Затем Task Splitter разделяет видео на независимые кусочки (GOP) и генерирует задачи: перекодирование, нарезка аудио, генерация превью, субтитры. Каждая задача обрабатывается параллельно. Сплиттер пишет задачи в key-value БД (Redis), воркеры забирают через отдельные очереди. Для промежуточных данных — TMP storage. Комбайнер собирает кусочки воедино, используя key-value базу для отслеживания состояния и демон-сервис для мониторинга завершения задач.
Правильный ответ:
Ответ собеседника демонстрирует глубокое понимание архитектуры транскодера. Оформим это в виде полной детализированной схемы:
1. Полная архитектура транскодера
┌─────────────────────────────────────────────┐
│ TRANSCODER PIPELINE │
└─────────────────────────────────────────────┘
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────────────┐
│ Kafka │───▶│Validator │───▶│ Task │───▶│ Redis (State) │
│ Consumer │ │ │ │ Splitter │ │ job:{id}:chunks │
└──────────┘ └──────────┘ └──────────┘ │ job:{id}:status │
│ └──────────┬───────────┘
▼ │
┌──────────────┐ │
│ Task Queues │ │
│ (per type) │ │
└──────┬───────┘ │
│ │
┌──────────────┼──────────────┐ │
▼ ▼ ▼ │
┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ Video │ │ Audio │ │ Thumbnail │ │
│ Transcode │ │ Extract │ │ Generator │ │
│ Workers │ │ Workers │ │ Workers │ │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │
▼ ▼ ▼ │
┌─────────────────────────────────────────┐ │
│ TMP Storage (local/NFS) │ │
└─────────────────┬───────────────────────┘ │
│ │
▼ ▼
┌──────────┐ ┌──────────────┐
│ Combiner │◀─────────────│ Monitor │
│ │ │ Daemon │
└────┬─────┘ └──────────────┘
│
▼
┌──────────────┐
│ Final S3 │
│ + Metadata │
└──────────────┘
2. Компонент: Валидатор
type VideoValidator struct {
maxFileSize int64
allowedCodecs []string
clamav *ClamAVClient
}
type ValidationResult struct {
IsValid bool
Codec string
Duration time.Duration
Width int
Height int
Bitrate int
Errors []string
}
func (v *VideoValidator) Validate(ctx context.Context, sourcePath string) (*ValidationResult, error) {
result := &ValidationResult{}
// Проверка размера файла
info, err := os.Stat(sourcePath)
if err != nil {
return nil, err
}
if info.Size() > v.maxFileSize {
result.IsValid = false
result.Errors = append(result.Errors, "file too large")
return result, nil
}
// Проверка что файл не пустой
if info.Size() == 0 {
result.IsValid = false
result.Errors = append(result.Errors, "empty file")
return result, nil
}
// Извлечение метаданных через ffprobe
probe, err := v.ffprobe.Probe(ctx, sourcePath)
if err != nil {
result.IsValid = false
result.Errors = append(result.Errors, "corrupted video file")
return result, nil
}
result.Codec = probe.Codec
result.Duration = probe.Duration
result.Width = probe.Width
result.Height = probe.Height
result.Bitrate = probe.Bitrate
// Проверка поддерживаемого кодека
codecSupported := false
for _, c := range v.allowedCodecs {
if c == probe.Codec {
codecSupported = true
break
}
}
if !codecSupported {
result.IsValid = false
result.Errors = append(result.Errors, fmt.Sprintf("unsupported codec: %s", probe.Codec))
return result, nil
}
// Антивирусная проверка (на случай вредоносных вложений)
scanResult, err := v.clamav.Scan(ctx, sourcePath)
if err != nil || scanResult.Infected {
result.IsValid = false
result.Errors = append(result.Errors, "security check failed")
return result, nil
}
result.IsValid = true
return result, nil
}
3. Компонент: Task Splitter
type TaskSplitter struct {
redis *redis.Client
tmpStore Storage
}
type SplitJob struct {
JobID string `json:"job_id"`
SourcePath string `json:"source_path"`
Validation ValidationResult `json:"validation"`
Tasks []SubTask `json:"tasks"`
TotalChunks int `json:"total_chunks"`
Status string `json:"status"`
}
type SubTask struct {
TaskID string `json:"task_id"`
Type string `json:"type"` // "video_transcode", "audio_extract", "thumbnail"
InputPath string `json:"input_path"`
OutputPath string `json:"output_path"`
Params map[string]interface{} `json:"params"`
Status string `json:"status"` // "pending", "processing", "done", "failed"
}
func (s *TaskSplitter) Split(ctx context.Context, job TranscodeJob, validation ValidationResult) (*SplitJob, error) {
splitJob := &SplitJob{
JobID: job.UploadID,
SourcePath: job.SourcePath,
Validation: validation,
Status: "splitting",
}
// Нарезаем видео на чанки по GOP (Group of Pictures)
// Типичный чанк — 2-10 секунд
chunkDuration := 5 * time.Second
numChunks := int(validation.Duration / chunkDuration)
// Создаём задачи на транскодирование каждого чанка для каждого разрешения
for i := 0; i < numChunks; i++ {
chunkPath := fmt.Sprintf("chunks/%s/chunk_%04d.ts", job.UploadID, i)
for _, target := range job.Targets {
task := SubTask{
TaskID: fmt.Sprintf("%s_chunk_%d_%s", job.UploadID, i, target.Resolution),
Type: "video_transcode",
InputPath: chunkPath,
OutputPath: fmt.Sprintf("output/%s/%s/chunk_%04d.m4s", job.UploadID, target.Resolution, i),
Params: map[string]interface{}{
"resolution": target.Resolution,
"bitrate": target.Bitrate,
"codec": target.Codec,
"chunk_index": i,
},
Status: "pending",
}
splitJob.Tasks = append(splitJob.Tasks, task)
}
}
// Отдельная задача на извлечение аудио
splitJob.Tasks = append(splitJob.Tasks, SubTask{
TaskID: fmt.Sprintf("%s_audio", job.UploadID),
Type: "audio_extract",
InputPath: job.SourcePath,
OutputPath: fmt.Sprintf("output/%s/audio.m4a", job.UploadID),
Params: map[string]interface{}{"codec": "aac", "bitrate": 128},
Status: "pending",
})
// Задачи на генерацию превью (каждые 10 секунд)
for t := 0; t < int(validation.Duration.Seconds()); t += 10 {
splitJob.Tasks = append(splitJob.Tasks, SubTask{
TaskID: fmt.Sprintf("%s_thumb_%d", job.UploadID, t),
Type: "thumbnail",
InputPath: job.SourcePath,
OutputPath: fmt.Sprintf("output/%s/thumb_%d.jpg", job.UploadID, t),
Params: map[string]interface{}{"timestamp": t, "width": 320, "height": 180},
Status: "pending",
})
}
splitJob.TotalChunks = len(splitJob.Tasks)
splitJob.Status = "split"
// Сохраняем состояние в Redis
if err := s.saveState(ctx, splitJob); err != nil {
return nil, fmt.Errorf("save state: %w", err)
}
// Публикуем задачи в соответствующие очереди
for _, task := range splitJob.Tasks {
queueName := s.getQueueName(task.Type)
if err := s.publishTask(ctx, queueName, task); err != nil {
return nil, fmt.Errorf("publish task %s: %w", task.TaskID, err)
}
}
return splitJob, nil
}
func (s *TaskSplitter) getQueueName(taskType string) string {
switch taskType {
case "video_transcode":
return "transcode-video"
case "audio_extract":
return "transcode-audio"
case "thumbnail":
return "transcode-thumbnail"
default:
return "transcode-default"
}
}
4. Компонент: Worker (обработчик задач)
type TranscodeWorker struct {
workerID string
taskType string
ffmpeg *FFmpeg
tmpStore Storage
redis *redis.Client
}
func (w *TranscodeWorker) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Блокирующее чтение из очереди
task, err := w.fetchTask(ctx, w.taskType)
if err != nil {
log.Error("fetch task failed", "error", err)
time.Sleep(time.Second)
continue
}
if task == nil {
time.Sleep(100 * time.Millisecond)
continue
}
// Обновляем статус задачи
w.updateTaskStatus(ctx, task.TaskID, "processing")
// Обработка в зависимости от типа
var processErr error
switch task.Type {
case "video_transcode":
processErr = w.transcodeChunk(ctx, task)
case "audio_extract":
processErr = w.extractAudio(ctx, task)
case "thumbnail":
processErr = w.generateThumbnail(ctx, task)
}
if processErr != nil {
w.updateTaskStatus(ctx, task.TaskID, "failed")
w.redis.Publish(ctx, "task-failed", task.TaskID)
continue
}
// Помечаем задачу как выполненную
w.updateTaskStatus(ctx, task.TaskID, "done")
// Уведомляем монитор о завершении
w.redis.Publish(ctx, "task-completed", task.TaskID)
// Увеличиваем счётчик выполненных задач для job
w.redis.HIncrBy(ctx, "job:"+task.JobID+":completed", "count", 1)
}
}
func (w *TranscodeWorker) transcodeChunk(ctx context.Context, task SubTask) error {
params := task.Params
return w.ffmpeg.Transcode(FFmpegInput{
Source: w.tmpStore.GetLocalPath(task.InputPath),
Output: w.tmpStore.GetLocalPath(task.OutputPath),
Resolution: params["resolution"].(string),
Bitrate: int(params["bitrate"].(float64)),
Codec: params["codec"].(string),
})
}
5. Компонент: Monitor Daemon
type MonitorDaemon struct {
redis *redis.Client
combiner *Combiner
videoMgr *VideoManager
}
func (m *MonitorDaemon) Run(ctx context.Context) error {
// Подписываемся на события завершения задач
sub := m.redis.Subscribe(ctx, "task-completed", "task-failed")
defer sub.Close()
for msg := range sub.Channel() {
taskID := msg.Payload
// Получаем информацию о задаче
task, err := m.getTask(ctx, taskID)
if err != nil {
log.Error("get task failed", "task_id", taskID, "error", err)
continue
}
// Проверяем, все ли задачи job выполнены
if m.isJobComplete(ctx, task.JobID) {
log.Info("all tasks completed, starting combine", "job_id", task.JobID)
// Запускаем комбайнер
if err := m.combiner.Combine(ctx, task.JobID); err != nil {
log.Error("combine failed", "job_id", task.JobID, "error", err)
m.videoMgr.UpdateStatus(ctx, task.JobID, "failed")
continue
}
m.videoMgr.UpdateStatus(ctx, task.JobID, "ready")
}
}
return nil
}
func (m *MonitorDaemon) isJobComplete(ctx context.Context, jobID string) bool {
// Получаем общее количество задач
total, _ := m.redis.Get(ctx, "job:"+jobID+":total").Int()
// Получаем количество выполненных
completed, _ := m.redis.HGet(ctx, "job:"+jobID+":completed", "count").Int()
return completed >= total
}
6. Компонент: Combiner
type Combiner struct {
tmpStore Storage
outputStore Storage
redis *redis.Client
}
func (c *Combiner) Combine(ctx context.Context, jobID string) error {
// Получаем список всех обработанных чанков
for _, resolution := range []string{"1080p", "720p", "480p", "360p"} {
// Собираем чанки в правильном порядке
chunks, err := c.getCompletedChunks(ctx, jobID, resolution)
if err != nil {
return fmt.Errorf("get chunks for %s: %w", resolution, err)
}
// Конкатенация чанков через FFmpeg concat demuxer
concatFile := c.createConcatFile(chunks)
outputPath := fmt.Sprintf("final/%s/%s.mp4", jobID, resolution)
if err := c.ffmpeg.Concat(ctx, concatFile, outputPath); err != nil {
return fmt.Errorf("concat %s: %w", resolution, err)
}
// Загружаем в S3
if err := c.outputStore.Upload(ctx, outputPath, outputPath); err != nil {
return fmt.Errorf("upload %s: %w", resolution, err)
}
// Очищаем временные файлы
c.cleanupChunks(ctx, chunks)
}
// Обрабатываем аудио
audioChunks, _ := c.getCompletedChunks(ctx, jobID, "audio")
if len(audioChunks) > 0 {
// Аудио обычно один файл, но может быть и нарезано
// ...
}
// Генерируем HLS манифест
if err := c.generateHLSManifest(ctx, jobID); err != nil {
return fmt.Errorf("generate HLS: %w", err)
}
return nil
}
7. Схема состояний задачи
pending → processing → done
│
└──→ failed → retry → pending
(max 3 retries)
└──→ dead_letter_queue
8. Ключевые принципы параллельной обработки
| Принцип | Реализация |
|---|---|
| Независимость чанков | Каждый GOP транскодируется отдельно |
| Разделение по типам | Отдельные очереди для видео, аудио, превью |
| Горизонтальное масштабирование | Kubernetes HPA по длине очередей |
| Отказоустойчивость | Retry с exponential backoff, dead letter queue |
| Идемпотентность | Каждая задача имеет уникальный ID, повторная обработка безопасна |
Такая архитектура позволяет обрабатывать одно видео десятками параллельных воркеров, при этом каждый компонент масштабируется независимо в зависимости от типа нагрузки.
Вопрос 12. Какие вопросы по отказоустойчивости, безопасности и оптимизации стоимости могут задать на этапе обсуждения архитектуры, и как на них отвечать?
Таймкод: 01:06:53
Ответ собеседника: Правильный. Вопросы про оптимизацию CDN ($350K/день) — раздавать из CDN только популярный контент, непопулярный в дешёвом хранилище, нужен сервис аналитики. Отказоустойчивость транскодера — очереди реплицируются, воркеры масштабируются. Критическая уязвимость — S3-хранилище, нужно проектировать как отказоустойчивый распределённый сторедж. Безопасность — изолировать воркеры транскодера на уровне сети, разрешить доступ только к временному хранилищу. Модерация — отдельная нетривиальная асинхронная задача с автоматическими и ручными проверками.
Правильный ответ:
Ответ собеседника покрывает основные направления. Развернём каждое из них детальнее и добавим дополнительные аспекты:
1. Оптимизация стоимости
А. CDN Cost Optimization
Проблема: раздача всего контента через CDN стоит ~$350K/день.
Решения:
- Tiered caching: популярный контент (80% просмотров) — CDN, непопулярный — origin S3
- Popularity-based routing: сервис аналитики определяет популярность и управляет кэшированием
- Regional CDN: использовать разные CDN-провайдеры для разных регионов по стоимости
- P2P delivery: WebRTC-based для живых трансляций (как делали в Peer5, Streamroot)
type PopularityAnalyzer struct {
redis *redis.Client
db *sql.DB
}
func (a *PopularityAnalyzer) ShouldCacheOnCDN(ctx context.Context, videoID string) (bool, error) {
// Проверяем количество просмотров за последние 24 часа
views, err := a.redis.Get(ctx, "video:"+videoID+":views_24h").Int()
if err != nil {
return false, err
}
// Порог: если больше 1000 просмотров в сутки — кэшируем в CDN
const cdnCacheThreshold = 1000
return views >= cdnCacheThreshold, nil
}
func (a *PopularityAnalyzer) GetStorageTier(ctx context.Context, videoID string) (string, error) {
views, _ := a.redis.Get(ctx, "video:"+videoID+":views_7d").Int()
switch {
case views > 10000:
return "hot", nil // SSD + CDN
case views > 100:
return "warm", nil // HDD + regional CDN
default:
return "cold", nil // Glacier/Archive storage
}
}
Б. Storage Cost Optimization
- Erasure coding вместо 3x репликации: Reed-Solomon (10,4) даёт 1.4x overhead вместо 3x
- Deduplication: одинаковые видео (репосты) хранить в одном экземпляре
- Lifecycle policies: автоматический перевод старого контента в холодное хранилище
Сравнение стоимости хранения 23.5 ПБ/год:
- 3x репликация на S3: ~$15M/год
- Erasure coding (1.4x): ~$7M/год
- + Lifecycle (50% в Glacier): ~$4M/год
В. Compute Cost Optimization
- Spot/preemptible instances для транскодирования (до 70% экономии)
- GPU sharing: MIG (Multi-Instance GPU) на NVIDIA A100 для нескольких воркеров
- Batch processing: группировка коротких видео на одном GPU
2. Отказоустойчивость (Fault Tolerance)
А. Отказ воркера транскодирования
// Идемпотентная обработка с дедупликацией
func (w *TranscodeWorker) ProcessWithRetry(ctx context.Context, task SubTask) error {
// Проверяем, не была ли задача уже выполнена
if status, _ := w.redis.Get(ctx, "task:"+task.TaskID+":status").Result(); status == "done" {
log.Info("task already completed, skipping", "task_id", task.TaskID)
return nil
}
// Устанавливаем lock с TTL для предотвращения дублирования
acquired, err := w.redis.SetNX(ctx, "lock:"+task.TaskID, w.workerID, 10*time.Minute).Result()
if err != nil || !acquired {
return fmt.Errorf("task is being processed by another worker")
}
defer w.redis.Del(ctx, "lock:"+task.TaskID)
// Обработка с retry
var lastErr error
for attempt := 0; attempt < 3; attempt++ {
if err := w.processTask(ctx, task); err != nil {
lastErr = err
backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
time.Sleep(backoff)
continue
}
return nil
}
// После 3 неудачных попыток — dead letter queue
return w.sendToDeadLetterQueue(ctx, task, lastErr)
}
Б. Отказ комбайнера
Комбайнер должен быть идемпотентным и возобновляемым:
- Состояние сборки хранится в Redis
- При перезапуске комбайнер проверяет, какие чанки уже собраны
- Использование временных файлов с atomic rename
В. Отказ S3-хранилища
- Multi-region replication: данные реплицируются между регионами
- Circuit breaker: при недоступности S3 — буферизация в локальном хранилище
- Graceful degradation: при недоступности хранилища новых видео — показывать старый кэшированный контент
type ResilientStorage struct {
primary Storage // S3
secondary Storage // GCS / другой провайдер
local Storage // локальный диск как fallback
breaker *CircuitBreaker
}
func (s *ResilientStorage) Upload(ctx context.Context, path string, data []byte) error {
if s.breaker.Allow() {
err := s.primary.Upload(ctx, path, data)
if err == nil {
s.breaker.RecordSuccess()
return nil
}
s.breaker.RecordFailure()
}
// Fallback на secondary
if err := s.secondary.Upload(ctx, path, data); err == nil {
// Асинхронная синхронизация с primary когда восстановится
go s.syncToPrimary(ctx, path, data)
return nil
}
// Последний fallback — локальное хранилище
return s.local.Upload(ctx, path, data)
}
3. Безопасность (Security)
А. Изоляция транскодеров
Транскодирование — самый опасный компонент, так как обрабатывает пользовательский контент:
# Kubernetes NetworkPolicy для изоляции воркеров
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: transcoder-isolation
spec:
podSelector:
matchLabels:
app: transcoder-worker
policyTypes:
- Ingress
- Egress
ingress: [] # Нет входящих соединений
egress:
- to:
- namespaceSelector:
matchLabels:
name: storage
ports:
- protocol: TCP
port: 443 # Только к S3
- to:
- namespaceSelector:
matchLabels:
name: redis
ports:
- protocol: TCP
port: 6379 # Только к Redis для состояния
Б. Sandboxing воркеров
// Запуск FFmpeg в изолированном контейнере с ограниченными ресурсами
type SandboxedTranscoder struct {
runtime string // "gvisor", "kata-containers"
}
func (s *SandboxedTranscoder) Transcode(ctx context.Context, input FFmpegInput) error {
cmd := exec.CommandContext(ctx, "runsc", "run",
"--network=none", // Без сети внутри sandbox
"--rootless", // Без root
"--memory-limit=2G", // Лимит памяти
"--cpu-quota=100000", // Лимит CPU
"ffmpeg-container",
"-i", input.Source,
"-c:v", "libx264",
"-b:v", fmt.Sprintf("%dk", input.Bitrate),
"-s", input.Resolution,
input.Output,
)
// Таймаут на случай зависания
ctx, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()
return cmd.Run()
}
В. Content Security
- ClamAV сканирование загружаемых файлов
- File type validation: проверка magic bytes, а не только расширения
- Rate limiting: ограничение на количество загрузок с одного аккаунта
- Signed URLs: все ссылки на видео подписываются с TTL
4. Модерация контента
type ModerationPipeline struct {
autoModerator *AutoModerator // ML-based
humanQueue *HumanQueue // для ручной проверки
videoManager *VideoManager
}
func (p *ModerationPipeline) Moderate(ctx context.Context, videoID string) error {
// Этап 1: Автоматическая проверка
result, err := p.autoModerator.Check(ctx, videoID)
if err != nil {
return err
}
switch result.Decision {
case "safe":
return p.videoManager.UpdateStatus(ctx, videoID, "published")
case "unsafe":
return p.videoManager.UpdateStatus(ctx, videoID, "blocked")
case "review":
// Этап 2: Ручная проверка
return p.humanQueue.Enqueue(ctx, videoID, result.Flags)
}
return nil
}
5. Дополнительные вопросы, которые могут задать
А. Мониторинг и алертинг
- Как отслеживать здоровье всей системы?
- Какие метрики ключевые? (queue depth, transcoding latency, error rate, storage usage)
- Как быстро обнаружить деградацию?
Б. Disaster Recovery
- RPO (Recovery Point Objective) и RTO (Recovery Time Objective)
- Бэкап метаданных vs бэкап видео
- Multi-region failover
В. Rate Limiting и DDoS Protection
- Защита от массовой загрузки контента
- Защита API от злоупотребления
- CAPTCHA для подозрительных аккаунтов
Все эти вопросы проверяют, что кандидат думает не только о функциональности, но и о production-ready аспектах системы.
Вопрос 13. Каковы итоговые четыре шага System Design собеседования?
Таймкод: 01:14:07
Ответ собеседника: Правильный. Шаг 1 — выяснение требований: задать уточняющие вопросы по функционалу, ограничениям, параметрам. Шаг 2 — оценка ресурсов: посчитать хранилище, трафик, RPS, выявить узкие места. Шаг 3 — углубление в архитектуру: крупноблочная схема, затем детализация компонентов, выбор БД и паттернов. Шаг 4 — обсуждение и защита: ответы на вопросы про отказоустойчивость, безопасность, оптимизацию стоимости.
Правильный ответ:
Ответ собеседника точно описывает четыре шага. Оформим их в виде полного фреймворка, который можно использовать на любом System Design интервью:
1. Шаг 1: Выяснение требований (Requirements Gathering)
Цель: понять задачу, прежде чем начинать решать.
А. Функциональные требования
- Какие основные фичи нужны?
- Какие сценарии использования?
- Какие пользователи и роли?
Б. Нефункциональные требования
- Масштаб (DAU, MAU, объём данных)
- Доступность (SLA)
- Задержка (latency requirements)
- Консистентность (consistency model)
В. Ограничения
- Бюджет
- Существующий стек
- Команда
- Compliance (GDPR и т.д.)
Время: ~5 минут Чего нельзя делать: начинать рисовать диаграмму без уточнений.
2. Шаг 2: Оценка ресурсов (Back-of-the-Envelope Estimation)
Цель: понять порядок величин и выявить узкие места.
Что считаем:
- QPS (queries per second) — средний и пиковый
- Storage — прирост в день/год
- Bandwidth — входящий и исходящий трафик
- Compute — количество серверов/инстансов
Пример структуры расчёта:
Дано: 10M DAU, 30 мин просмотра/день
Просмотры: 10M * (30/5) = 60M/день → 700 RPS avg → 2100 RPS peak
Трафик: 60M * 300MB = 18 PB/день
Хранилище: 28K видео * 300MB * 2.5 (транскод) * 3 (реплика) = 64.5 ТБ/день
Время: ~5 минут Ключевой вывод: определить, что является главным вызовом (в нашем случае — bandwidth и storage, а не RPS).
3. Шаг 3: Проектирование архитектуры (Architecture Design)
Цель: построить систему, которая решает задачу.
А. High-Level Design (крупные блоки)
- Основные компоненты и связи
- Путь пользовательского запроса через систему
- Разделение на сервисы
Б. Deep Dive (детализация)
- Выбор базы данных для каждого компонента с обоснованием
- Паттерны взаимодействия (sync/async, REST/gRPC/message queue)
- Стратегия кэширования
- Стратегия масштабирования
Время: ~20-25 минут Принцип: начинать с простого, итеративно усложнять.
4. Шаг 4: Обсуждение и защита (Discussion & Defense)
Цель: показать глубину опыта и способность к компромиссам.
Темы для обсуждения:
- Отказоустойчивость: что происходит при падении каждого компонента
- Безопасность: защита от злоупотреблений, изоляция, валидация
- Оптимизация стоимости: CDN, tiered storage, spot instances
- Мониторинг: метрики, алертинг, observability
- Эволюция: как система будет расти, какие решения позволят масштабироваться
Время: ~10-15 минут Принцип: не просто отвечать на вопросы, а предлагать решения с обоснованием trade-offs.
5. Сводная таблица
| Шаг | Время | Цель | Критерий успеха |
|---|---|---|---|
| 1. Требования | ~5 мин | Понять задачу | Все ключевые параметры выяснены |
| 2. Оценка ресурсов | ~5 мин | Определить масштаб | Порядок величин посчитан, узкие места выявлены |
| 3. Архитектура | ~25 мин | Построить решение | Схема работает, выборы обоснованы |
| 4. Обсуждение | ~15 мин | Показать глубину | Кандидат умеет защищать решения и видеть trade-offs |
6. Типичные ошибки на каждом шаге
| Шаг | Ошибка | Почему плохо |
|---|---|---|
| 1 | Не задавать вопросы | Рисует не то, что нужно интервьюеру |
| 2 | Пропустить оценку | Нет обоснования выбора технологий |
| 3 | Слишком абстрактно или слишком детально | Нет баланса между широтой и глубиной |
| 4 | Не видеть проблемы в своём решении | Нет критического мышления |
Этот фреймворк применим к любому System Design интервью, независимо от конкретной задачи. Кандидат, который следует этим шагам, демонстрирует структурированное мышление и инженерную зрелость.
Вопрос 14. Разве требования к системе собирает не системный аналитик? Кто собирает требования в технических компаниях?
Таймкод: 01:27:38
Ответ собеседника: Правильный. Не всегда есть системные аналитики. В глубоко технических инфраструктурных компаниях их может не быть вообще. Сбором требований занимаются архитекторы, тимлиды и другие технические специалисты. Идеально, когда есть компетентный человек для сбора требований, но на практике это редкость, поэтому приходится самому выяснять требования на любом уровне.
Правильный ответ:
Ответ собеседника реалистичен и отражает практику многих компаний. Дополним его более полной картиной:
1. Роли, которые собирают требования в разных компаниях
А. Продуктовые компании (Product Companies)
В компаниях, ориентированных на конечного пользователя:
- Product Manager (PM) — собирает бизнес-требования, определяет приоритеты
- Business Analyst / Systems Analyst — формализует требования, пишет спецификации
- Product Owner — в Agile-командах управляет бэклогом
Б. Технические / инфраструктурные компании
В компаниях, где продукт — это платформа или инфраструктура:
- Solutions Architect — проектирует интеграции и собирает требования от клиентов
- Technical Product Manager — мост между бизнесом и инженерами
- Staff/Principal Engineer — часто сам выясняет требования и проектирует решение
В. Стартапы и небольшие команды
- CTO / Tech Lead — совмещает роли архитектора, аналитика и разработчика
- Founder — часто сам формулирует требования на основе видения продукта
2. Почему на интервью кандидат должен сам уточнять требования
А. Это проверка зрелости
Опытный инженер никогда не начинает работу без понимания задачи. Уточнение требований на интервью — это демонстрация того, как кандидат работает в реальной жизни.
Б. В реальности требования всегда неполные
Даже если есть аналитик, требования приходят в виде:
- Прототипа с минимальным описанием
- Устного описания от менеджера
- Баг-репорта без контекста
- Списка хотелок без приоритетов
Инженер должен уметь вытащить недостающую информацию.
В. Разные интерпретации
Одна и та же формулировка может означать разное:
- «Спроектируй чат» — это 1-1 чат? Групповой? С файлами? С уведомлениями?
- «Сделай систему рекомендаций» — это контент? Товары? Пользователи? В реальном времени?
3. Практический пример: как это выглядит в реальной работе
Сценарий: Менеджер приходит с задачей «Нужна система уведомлений»
Неопытный инженер:
- Начинает проектировать: выбирает технологию, рисует схему
- Через 2 недели выясняется, что нужны push-уведомления, а не email
Опытный инженер:
- Задаёт вопросы:
- Какие типы уведомлений? (email, push, SMS, in-app)
- Какой объём? (сколько пользователей, частота)
- Нужна ли персонализация?
- Нужна ли аналитика доставки?
- Какие требования к задержке? (real-time, near real-time, batch)
- Есть ли шаблоны? Нужен ли WYSIWYG-редактор?
- Только после этого начинает проектировать
4. Кто собирает требования в зависимости from контекста
| Контекст | Кто собирает | Форма требований |
|---|---|---|
| Продуктовая фича | PM + Analyst | PRD, User Stories |
| Техническая платформа | Architect + Tech Lead | RFC, Design Doc |
| Интеграция с клиентом | Solutions Architect | SOW, API Spec |
| Внутренний сервис | Сам инженер | Устное обсуждение, тикет |
| Стартап | Founder / CTO | Видение, прототип |
5. Важный нюанс для интервью
На System Design интервью уточнение требований — это не слабость, а сила. Интервьюер специально даёт расплывчатую задачу, чтобы проверить:
- Способность задавать правильные вопросы
- Умение декомпозировать задачу
- Понимание того, что реальные задачи никогда не приходят в готовом виде
- Коммуникативные навыки
Кандидат, который сразу начинает рисовать диаграмму без вопросов, производит впечатление человека, который не умеет работать с неопределённостью — а это критический навык для старших инженеров.
Вопрос 15. Что насчёт DAG (направленного ациклического графа) для обработки видео? Можно ли использовать DAG для описания пайплайна обработки видео?
Таймкод: 01:29:05
Ответ собеседника: Правильный. DAG — подходящая технология для описания взаимодействия задач, их последовательности и зависимостей. Это отличная идея для пайплайна обработки видео. В System Design нет единственного правильного решения. DAG описывает процесс взаимодействия задач, его вполне можно использовать, нужно лишь пояснить как оно работает.
Правильный ответ:
Ответ собеседника верен — DAG является отличным инструментом для описания пайплайнов обработки видео. Раскроем эту тему подробнее:
1. Почему DAG идеально подходит для пайплайна обработки видео
DAG (Directed Acyclic Graph) — это граф с направленными рёбрами без циклов. Он естественно моделирует зависимости между задачами:
Пример DAG для обработки видео:
┌─────────────┐
│ Validate │
└──────┬──────┘
│
┌──────▼──────┐
│ Split │
│ (chunking) │
└──────┬──────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Transcode│ │ Transcode│ │ Audio │
│ 1080p │ │ 720p │ │ Extract │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
┌────▼─────┐ ┌────▼─────┐ │
│ Thumbnail│ │ Thumbnail│ │
│ 1080p │ │ 720p │ │
└────┬─────┘ └────┬─────┘ │
│ │ │
└────────────┼─────────────┘
▼
┌─────────────┐
│ Combine │
└──────┬──────┘
▼
┌─────────────┐
│ Upload │
│ to S3 │
└─────────────┘
2. Реализация DAG для пайплайна
type TaskID string
type Task struct {
ID TaskID
Name string
Dependencies []TaskID // задачи, от которых зависит текущая
Execute func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error)
Status TaskStatus // pending, running, done, failed
Result interface{}
Error error
}
type DAG struct {
tasks map[TaskID]*Task
edges map[TaskID][]TaskID // adjacency list
}
func NewDAG() *DAG {
return &DAG{
tasks: make(map[TaskID]*Task),
edges: make(map[TaskID][]TaskID),
}
}
func (d *DAG) AddTask(task *Task) error {
d.tasks[task.ID] = task
for _, dep := range task.Dependencies {
d.edges[dep] = append(d.edges[dep], task.ID)
}
return nil
}
// TopologicalSort возвращает порядок выполнения задач
func (d *DAG) TopologicalSort() ([]TaskID, error) {
inDegree := make(map[TaskID]int)
for id := range d.tasks {
if _, ok := inDegree[id]; !ok {
inDegree[id] = 0
}
}
for _, dependents := range d.edges {
for _, dep := range dependents {
inDegree[dep]++
}
}
queue := []TaskID{}
for id, degree := range inDegree {
if degree == 0 {
queue = append(queue, id)
}
}
result := []TaskID{}
for len(queue) > 0 {
current := queue[0]
queue = queue[1:]
result = append(result, current)
for _, dep := range d.edges[current] {
inDegree[dep]--
if inDegree[dep] == 0 {
queue = append(queue, dep)
}
}
}
if len(result) != len(d.tasks) {
return nil, fmt.Errorf("cycle detected in DAG")
}
return result, nil
}
// Execute запускает DAG с параллельным выполнением независимых задач
func (d *DAG) Execute(ctx context.Context) error {
sorted, err := d.TopologicalSort()
if err != nil {
return err
}
// Группируем задачи по уровням (wave execution)
levels := d.groupByLevels(sorted)
for _, level := range levels {
// Задачи одного уровня выполняются параллельно
errChan := make(chan error, len(level))
var wg sync.WaitGroup
for _, taskID := range level {
task := d.tasks[taskID]
// Проверяем что все зависимости выполнены
if !d.dependenciesMet(task) {
continue
}
wg.Add(1)
go func(t *Task) {
defer wg.Done()
// Собираем результаты зависимостей как входные данные
inputs := make(map[TaskID]interface{})
for _, depID := range t.Dependencies {
inputs[depID] = d.tasks[depID].Result
}
t.Status = "running"
result, err := t.Execute(ctx, inputs)
if err != nil {
t.Status = "failed"
t.Error = err
errChan <- fmt.Errorf("task %s failed: %w", t.ID, err)
return
}
t.Result = result
t.Status = "done"
}(task)
}
wg.Wait()
// Проверяем ошибки
select {
case err := <-errChan:
return err
default:
}
}
return nil
}
func (d *DAG) groupByLevels(sorted []TaskID) [][]TaskID {
// Группировка задач по уровням для параллельного выполнения
levels := [][]TaskID{}
processed := make(map[TaskID]bool)
for len(processed) < len(sorted) {
currentLevel := []TaskID{}
for _, id := range sorted {
if processed[id] {
continue
}
// Проверяем что все зависимости уже обработаны
depsMet := true
for _, dep := range d.tasks[id].Dependencies {
if !processed[dep] {
depsMet = false
break
}
}
if depsMet {
currentLevel = append(currentLevel, id)
}
}
for _, id := range currentLevel {
processed[id] = true
}
levels = append(levels, currentLevel)
}
return levels
}
func (d *DAG) dependenciesMet(task *Task) bool {
for _, depID := range task.Dependencies {
if d.tasks[depID].Status != "done" {
return false
}
}
return true
}
3. Пример использования DAG для пайплайна видео
func BuildVideoProcessingPipeline(videoURL string) *DAG {
dag := NewDAG()
// Задача 1: Валидация
dag.AddTask(&Task{
ID: "validate",
Name: "Validate Video",
Dependencies: []TaskID{},
Execute: func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error) {
return validateVideo(videoURL)
},
})
// Задача 2: Извлечение метаданных
dag.AddTask(&Task{
ID: "metadata",
Name: "Extract Metadata",
Dependencies: []TaskID{"validate"},
Execute: func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error) {
validationResult := inputs["validate"].(ValidationResult)
return extractMetadata(videoURL, validationResult)
},
})
// Задача 3: Транскодирование 1080p (зависит от валидации)
dag.AddTask(&Task{
ID: "transcode_1080p",
Name: "Transcode 1080p",
Dependencies: []TaskID{"validate"},
Execute: func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error) {
return transcode(videoURL, "1080p", 8000)
},
})
// Задача 4: Транскодирование 720p (зависит от валидации)
dag.AddTask(&Task{
ID: "transcode_720p",
Name: "Transcode 720p",
Dependencies: []TaskID{"validate"},
Execute: func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error) {
return transcode(videoURL, "720p", 4000)
},
})
// Задача 5: Транскодирование 480p (зависит от валидации)
dag.AddTask(&Task{
ID: "transcode_480p",
Name: "Transcode 480p",
Dependencies: []TaskID{"validate"},
Execute: func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error) {
return transcode(videoURL, "480p", 2000)
},
})
// Задача 6: Извлечение аудио (зависит от валидации)
dag.AddTask(&Task{
ID: "audio_extract",
Name: "Extract Audio",
Dependencies: []TaskID{"validate"},
Execute: func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error) {
return extractAudio(videoURL)
},
})
// Задача 7: Генерация превью (зависит от транскодирования 1080p)
dag.AddTask(&Task{
ID: "thumbnails",
Name: "Generate Thumbnails",
Dependencies: []TaskID{"transcode_1080p"},
Execute: func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error) {
transcodedPath := inputs["transcode_1080p"].(string)
return generateThumbnails(transcodedPath)
},
})
// Задача 8: Сборка финального результата (зависит от всех транскодирований)
dag.AddTask(&Task{
ID: "combine",
Name: "Combine Results",
Dependencies: []TaskID{"transcode_1080p", "transcode_720p", "transcode_480p", "audio_extract", "thumbnails", "metadata"},
Execute: func(ctx context.Context, inputs map[TaskID]interface{}) (interface{}, error) {
return combineResults(CombineInputs{
Video1080p: inputs["transcode_1080p"].(string),
Video720p: inputs["transcode_720p"].(string),
Video480p: inputs["transcode_480p"].(string),
Audio: inputs["audio_extract"].(string),
Thumbnails: inputs["thumbnails"].([]string),
Metadata: inputs["metadata"].(VideoMetadata),
})
},
})
return dag
}
4. Популярные инструменты для DAG-оркестрации
| Инструмент | Тип | Особенности |
|---|---|---|
| Apache Airflow | Open-source | Python-based, богатый UI, множество интеграций |
| Prefect | Open-source / Cloud | Современная альтернатива Airflow |
| Argo Workflows | Kubernetes-native | Контейнерные задачи, идеален для K8s |
| Dagster | Open-source | Data-aware, сильная типизация |
| Temporal | Open-source | Durable execution, fault-tolerant |
| Luigi | Open-source | Простой, от Spotify |
| AWS Step Functions | Managed | Serverless, интеграция с AWS |
5. Преимущества DAG подхода
- Параллелизм: независимые задачи выполняются одновременно
- Отказоустойчивость: можно перезапускать только failed задачи
- Визуализация: граф легко визуализировать и отлаживать
- Декларативность: описываем что нужно сделать, а не как
- Масштабируемость: легко добавлять новые задачи в пайплайн
6. Пример с Argo Workflows (Kubernetes-native)
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: video-processing
spec:
entrypoint: pipeline
templates:
- name: pipeline
dag:
tasks:
- name: validate
template: validate-video
- name: transcode-1080p
dependencies: [validate]
template: transcode
arguments:
parameters: [{name: resolution, value: "1080p"}]
- name: transcode-720p
dependencies: [validate]
template: transcode
arguments:
parameters: [{name: resolution, value: "720p"}]
- name: audio-extract
dependencies: [validate]
template: extract-audio
- name: combine
dependencies: [transcode-1080p, transcode-720p, audio-extract]
template: combine-results
- name: validate-video
container:
image: transcoder:latest
command: ["./validate"]
- name: transcode
container:
image: transcoder:latest
command: ["./transcode", "{{parameters.resolution}}"]
resources:
limits:
nvidia.com/gpu: 1 # GPU для транскодирования
- name: extract-audio
container:
image: transcoder:latest
command: ["./extract-audio"]
- name: combine-results
container:
image: transcoder:latest
command: ["./combine"]
DAG — это не просто допустимый, а рекомендуемый подход для описания сложных пайплайнов обработки видео. Он используется в production практически всеми крупными видеоплатформами.
Вопрос 16. Почему бы не загружать видео напрямую с клиента в хранилище, а не через API? Как тогда ограничить доступ?
Таймкод: 01:30:07
Ответ собеседника: Правильный. Загрузка идёт через API, который абстрагирует доступ к хранилищу. S3 умеет только принимать поток байтов, а API отвечает за маршрутизацию, метаинформацию, общение с клиентами. Существуют signed URLs в S3, которые позволяют выдавать пользователю подписанный URL для загрузки напрямую в бакет, минуя API. Но для этого нужна генерация таких URL в API.
Правильный ответ:
Ответ собеседника корректно описывает оба подхода. Развернём сравнение:
1. Два подхода к загрузке видео
А. Загрузка через API (Proxy Upload)
Client → API Server → S3
Б. Прямая загрузка через Signed URL (Direct Upload)
Client → API Server (get signed URL) → Client → S3
2. Загрузка через API — плюсы и минусы
// Proxy Upload — сервер проксирует данные
func (s *UploadService) UploadDirect(w http.ResponseWriter, r *http.Request) {
// 1. Аутентификация
userID, err := s.auth.Authenticate(r)
if err != nil {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
// 2. Валидация
file, header, err := r.FormFile("video")
if err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
defer file.Close()
// 3. Проверка размера
if header.Size > s.maxFileSize {
http.Error(w, "file too large", http.StatusRequestEntityTooLarge)
return
}
// 4. Загрузка в S3 (данные проходят через наш сервер)
uploadID := uuid.New().String()
if err := s.storage.Upload(r.Context(), uploadID, file, header.Size); err != nil {
http.Error(w, "upload failed", http.StatusInternalServerError)
return
}
// 5. Сохранение метаданных
s.db.CreateVideo(r.Context(), userID, uploadID, header.Filename)
w.WriteHeader(http.StatusOK)
}
Плюсы:
- Полный контроль над процессом загрузки
- Можно выполнить валидацию до загрузки
- Проще реализовать resumable upload с отслеживанием прогресса
- Единая точка для rate limiting и аутентификации
Минусы:
- Двойной трафик: клиент → API → S3
- API сервер становится bottleneck для больших файлов
- Высокое потребление ресурсов на API сервере (CPU, RAM, bandwidth)
- Медленная загрузка для пользователей далеко от API
3. Прямая загрузка через Signed URL — плюсы и минусы
// Direct Upload — выдаём signed URL клиенту
func (s *UploadService) GetUploadURL(w http.ResponseWriter, r *http.Request) {
// 1. Аутентификация
userID, err := s.auth.Authenticate(r)
if err != nil {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
// 2. Валидация метаданных запроса
var req struct {
FileName string `json:"file_name"`
FileSize int64 `json:"file_size"`
MIMEType string `json:"mime_type"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
// 3. Проверки
if req.FileSize > s.maxFileSize {
http.Error(w, "file too large", http.StatusRequestEntityTooLarge)
return
}
if !s.isAllowedMIMEType(req.MIMEType) {
http.Error(w, "unsupported format", http.StatusUnsupportedMediaType)
return
}
// 4. Проверка квоты пользователя
if !s.quota.Check(userID, req.FileSize) {
http.Error(w, "quota exceeded", http.StatusForbidden)
return
}
// 5. Создаём запись в БД
uploadID := uuid.New().String()
if err := s.db.CreateUpload(r.Context(), userID, uploadID, req.FileName, req.FileSize); err != nil {
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
// 6. Генерируем signed URL
signedURL, err := s.storage.GeneratePresignedPutURL(r.Context(), PresignedURLRequest{
Bucket: "raw-videos",
Key: fmt.Sprintf("%s/%s", userID, uploadID),
ExpiresIn: 15 * time.Minute,
ContentType: req.MIMEType,
MaxFileSize: req.FileSize,
})
if err != nil {
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
// 7. Возвращаем URL клиенту
json.NewEncoder(w).Encode(UploadURLResponse{
UploadID: uploadID,
SignedURL: signedURL.URL,
ExpiresAt: signedURL.ExpiresAt,
Headers: signedURL.RequiredHeaders,
})
}
Плюсы:
- Нет двойного трафика — данные идут напрямую в S3
- API сервер не нагружается передачей данных
- Быстрая загрузка — клиент загружает в ближайший S3 endpoint
- Экономия на инфраструктуре API
Минусы:
- Меньше контроля во время загрузки
- Сложнее реализовать resumable upload
- Клиент может отправить больше данных, чем ожидалось (нужна политика S3)
- Сложнее отменить загрузку после выдачи URL
4. Ограничение доступа при прямой загрузке
// Генерация signed URL с ограничениями
func (s *S3Storage) GeneratePresignedPutURL(ctx context.Context, req PresignedURLRequest) (*PresignedURL, error) {
// Условия для signed URL
conditions := map[string]string{
"Content-Type": req.ContentType,
"x-amz-meta-upload-id": req.UploadID,
}
// Политика безопасности
policy := &s3.PostPolicy{
Expiration: time.Now().Add(req.ExpiresIn),
Conditions: []s3.PolicyCondition{
// Ограничение размера файла
s3.NewContentLengthRangeCondition(1, req.MaxFileSize),
// Ограничение по Content-Type
s3.NewConditionEquals("Content-Type", req.ContentType),
// Ограничение по бакету
s3.NewConditionEquals("bucket", req.Bucket),
// Префикс ключа (пользователь может загружать только в свою папку)
s3.NewConditionStartsWith("key", req.UserID+"/"),
},
}
// Генерация presigned POST URL
result, err := s.presigner.PresignPostObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(req.Bucket),
Key: aws.String(req.Key),
Metadata: map[string]string{
"upload-id": req.UploadID,
"user-id": req.UserID,
},
}, s3.WithPresignExpires(req.ExpiresIn))
if err != nil {
return nil, err
}
return &PresignedURL{
URL: result.URL,
Fields: result.Values,
ExpiresAt: time.Now().Add(req.ExpiresIn),
RequiredHeaders: map[string]string{
"Content-Type": req.ContentType,
},
}, nil
}
5. S3 Bucket Policy для дополнительной защиты
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowUploadOnlyToOwnFolder",
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::account-id:role/uploader"},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::raw-videos/${aws:userid}/*",
"Condition": {
"StringEquals": {
"s3:x-amz-acl": "private"
},
"NumericLessThanEquals": {
"s3:content-length-range": 1073741824
}
}
},
{
"Sid": "DenyNonSSL",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:*",
"Resource": "arn:aws:s3:::raw-videos/*",
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
6. Гибридный подход (рекомендуемый)
На практике часто используют комбинацию:
1. Client → API: запрос на загрузку с метаданными
2. API → Client: signed URL + upload_id
3. Client → S3: прямая загрузка через signed URL
4. S3 → API: уведомление через S3 Event Notification (webhook)
5. API: обновление статуса, запуск транскодирования
// Обработка события от S3 после загрузки
func (s *UploadService) HandleS3UploadComplete(ctx context.Context, event S3Event) error {
for _, record := range event.Records {
bucket := record.S3.Bucket.Name
key := record.S3.Object.Key
// Извлекаем upload_id из метаданных
metadata, err := s.storage.HeadObject(ctx, bucket, key)
if err != nil {
return err
}
uploadID := metadata.Metadata["upload-id"]
userID := metadata.Metadata["user-id"]
// Обновляем статус
if err := s.db.UpdateUploadStatus(ctx, uploadID, "uploaded"); err != nil {
return err
}
// Публикуем задачу на транскодирование
if err := s.producer.Publish(ctx, "transcode-jobs", TranscodeJob{
UploadID: uploadID,
UserID: userID,
SourceURL: fmt.Sprintf("s3://%s/%s", bucket, key),
}); err != nil {
return err
}
}
return nil
}
7. Сравнение подходов
| Критерий | Proxy Upload | Direct Upload (Signed URL) | Гибридный |
|---|---|---|---|
| Контроль | Полный | Ограниченный | Высокий |
| Нагрузка на API | Высокая | Минимальная | Низкая |
| Скорость загрузки | Медленнее | Быстрее | Быстрая |
| Resumable upload | Просто | Сложно | Средне |
| Безопасность | Высокая | Средняя | Высокая |
| Сложность | Простая | Средняя | Высокая |
Для высоконагруженной видеоплатформы гибридный подход является оптимальным — он сочетает контроль API с производительностью прямой загрузки в S3.
Вопрос 17. Что делать, если загрузка видео оборвалась и часть данных осталась в хранилище?
Таймкод: 01:30:20
Ответ собеседника: Правильный. По крону или таймауту вычищать незавершённые данные. Отправлять сообщения в Video Manager о незавершённых загрузках. По расписанию (guard/cron) чистить базу и хранилище от остатков.
Правильный ответ:
Ответ собеседника описывает базовый подход. Детализируем его до production-ready решения:
1. Проблема незавершённых загрузок
При загрузке больших видео могут возникать:
- Обрыв соединения на стороне клиента
- Таймаут на стороне сервера
- Падение пода/контейнера во время загрузки
- Пользователь закрыл браузер/приложение
- Исчерпание квоты пользователя во время загрузки
В результате в хранилище остаются «осиротевшие» данные, которые занимают место и не используются.
2. Многоуровневая стратегия очистки
А. Уровень 1: TTL на сессию загрузки
type UploadSession struct {
UploadID string `json:"upload_id" db:"upload_id"`
UserID string `json:"user_id" db:"user_id"`
Status string `json:"status" db:"status"`
TotalSize int64 `json:"total_size" db:"total_size"`
UploadedBytes int64 `json:"uploaded_bytes" db:"uploaded_bytes"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
ExpiresAt time.Time `json:"expires_at" db:"expires_at"` // TTL
}
func (s *UploadService) CreateSession(ctx context.Context, userID string, totalSize int64) (*UploadSession, error) {
session := &UploadSession{
UploadID: uuid.New().String(),
UserID: userID,
Status: "in_progress",
TotalSize: totalSize,
UploadedBytes: 0,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
ExpiresAt: time.Now().Add(24 * time.Hour), // сессия живёт 24 часа
}
if err := s.db.CreateSession(ctx, session); err != nil {
return nil, err
}
// Устанавливаем TTL в Redis для быстрой проверки
s.redis.Set(ctx, "upload_session:"+session.UploadID, "active", 24*time.Hour)
return session, nil
}
Б. Уровень 2: S3 Lifecycle Policy
// S3 Bucket Lifecycle Configuration
{
"Rules": [
{
"ID": "CleanupIncompleteUploads",
"Status": "Enabled",
"Filter": {
"Prefix": "uploads/"
},
"AbortIncompleteMultipartUpload": {
"DaysAfterInitiation": 1
}
},
{
"ID": "CleanupStaleChunks",
"Status": "Enabled",
"Filter": {
"Prefix": "chunks/"
},
"Expiration": {
"Days": 1
}
}
]
}
В. Уровень 3: Garbage Collector (Guard Service)
type GarbageCollector struct {
db *sql.DB
storage Storage
redis *redis.Client
batchSize int
}
func (gc *GarbageCollector) Run(ctx context.Context) error {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := gc.Cleanup(ctx); err != nil {
log.Error("cleanup failed", "error", err)
}
}
}
}
func (gc *GarbageCollector) Cleanup(ctx context.Context) error {
// 1. Находим просроченные сессии
staleSessions, err := gc.db.GetStaleUploadSessions(ctx, time.Now())
if err != nil {
return fmt.Errorf("get stale sessions: %w", err)
}
log.Info("found stale sessions", "count", len(staleSessions))
for _, session := range staleSessions {
if err := gc.cleanupSession(ctx, session); err != nil {
log.Error("cleanup session failed",
"upload_id", session.UploadID,
"error", err)
continue
}
}
// 2. Находим orphan chunks в S3 (без соответствующей сессии в БД)
orphanChunks, err := gc.findOrphanChunks(ctx)
if err != nil {
return fmt.Errorf("find orphan chunks: %w", err)
}
for _, chunk := range orphanChunks {
if err := gc.storage.Delete(ctx, chunk); err != nil {
log.Error("delete orphan chunk failed",
"key", chunk,
"error", err)
}
}
return nil
}
func (gc *GarbageCollector) cleanupSession(ctx context.Context, session UploadSession) error {
log.Info("cleaning up stale session",
"upload_id", session.UploadID,
"user_id", session.UserID,
"uploaded_bytes", session.UploadedBytes)
// 1. Удаляем данные из S3
prefix := fmt.Sprintf("uploads/%s/%s/", session.UserID, session.UploadID)
if err := gc.storage.DeletePrefix(ctx, prefix); err != nil {
log.Error("failed to delete from S3", "prefix", prefix, "error", err)
}
// 2. Удаляем multipart uploads
if err := gc.storage.AbortMultipartUploads(ctx, prefix); err != nil {
log.Error("failed to abort multipart uploads", "error", err)
}
// 3. Обновляем статус в БД
if err := gc.db.UpdateUploadStatus(ctx, session.UploadID, "expired"); err != nil {
return fmt.Errorf("update status: %w", err)
}
// 4. Удаляем из Redis
gc.redis.Del(ctx, "upload_session:"+session.UploadID)
// 5. Возвращаем квоту пользователю
gc.redis.IncrBy(ctx, "quota:"+session.UserID, session.UploadedBytes)
return nil
}
func (gc *GarbageCollector) findOrphanChunks(ctx context.Context) ([]string, error) {
// Сканируем S3 префикс chunks/ и проверяем,
// существует ли соответствующая сессия в БД
var orphans []string
chunks, err := gc.storage.ListObjects(ctx, "chunks/", 1000)
if err != nil {
return nil, err
}
for _, chunk := range chunks {
// Извлекаем session_id из ключа
sessionID := extractSessionID(chunk.Key)
// Проверяем в Redis (быстро)
exists, _ := gc.redis.Exists(ctx, "upload_session:"+sessionID).Result()
if exists == 0 {
// Двойная проверка в БД (медленно, но надёжно)
session, err := gc.db.GetSession(ctx, sessionID)
if err != nil || session.Status == "expired" {
orphans = append(orphans, chunk.Key)
}
}
}
return orphans, nil
}
3. Мониторинг и алертинг
type CleanupMetrics struct {
staleSessionsFound prometheus.Counter
staleSessionsCleaned prometheus.Counter
orphanChunksFound prometheus.Counter
orphanChunksDeleted prometheus.Counter
cleanupErrors prometheus.Counter
storageReclaimed prometheus.Counter // bytes
}
func (gc *GarbageCollector) recordMetrics(session UploadSession, bytesReclaimed int64) {
gc.metrics.staleSessionsCleaned.Inc()
gc.metrics.storageReclaimed.Add(float64(bytesReclaimed))
}
4. SQL для поиска просроченных сессий
-- Найти все сессии, которые не обновлялись более 24 часов
SELECT upload_id, user_id, uploaded_bytes, updated_at
FROM upload_sessions
WHERE status = 'in_progress'
AND updated_at < NOW() - INTERVAL '24 hours'
ORDER BY updated_at ASC
LIMIT 1000;
-- Найти сессии, которые никогда не были завершены
-- и не обновлялись более 7 дней
SELECT upload_id, user_id, uploaded_bytes, created_at
FROM upload_sessions
WHERE status IN ('in_progress', 'uploaded')
AND created_at < NOW() - INTERVAL '7 days'
AND updated_at < NOW() - INTERVAL '7 days';
-- Статистика по «осиротевшим» данным
SELECT
COUNT(*) as orphan_count,
SUM(uploaded_bytes) as total_orphan_bytes,
AVG(EXTRACT(EPOCH FROM (NOW() - updated_at))) as avg_stale_seconds
FROM upload_sessions
WHERE status = 'in_progress'
AND updated_at < NOW() - INTERVAL '24 hours';
5. Схема процесса очистки
┌─────────────────────────────────────────────────────────────┐
│ GARBAGE COLLECTOR │
│ (runs every 1h) │
└─────────────────────────────────────────────────────────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌────────────┐ ┌──────────┐ ┌──────────────┐
│ Find Stale │ │ Find │ │ S3 Lifecycle │
│ Sessions │ │ Orphans │ │ (automatic) │
│ in DB │ │ in S3 │ │ │
└─────┬──────┘ └────┬─────┘ └──────────────┘
│ │
▼ ▼
┌──────────────────────┐
│ Cleanup Session │
│ 1. Delete S3 data │
│ 2. Abort multipart │
│ 3. Update DB status │
│ 4. Return quota │
│ 5. Delete Redis key │
└──────────────────────┘
6. Ключевые принципы
- S3 Lifecycle — первая линия защиты, автоматически удаляет незавершённые multipart uploads через 24 часа
- Guard Service — вторая линия, обрабатывает сессии в БД и возвращает квоты
- Идемпотентность — повторный запуск очистки безопасен
- Rate limiting — не удалять слишком много за раз, чтобы не нагруж
Вопрос 18. Какая база данных хорошо масштабируется? Как выбирать между SQL и NoSQL?
Таймкод: 01:33:48
Ответ собеседника: Правильный. PostgreSQL хорошо масштабируется, хотя шардирование не встроено. Из коробки хорошо масштабируется MongoDB. Выбор зависит от задачи: SQL — для транзакционности, консистентности, отношений между данными. NoSQL — это не единый тип, а что угодно кроме SQL: key-value, документные, blob, графовые. Выбор определяется требованиями и нагрузками.
Правильный ответ:
Ответ собеседника корректен. Развернём тему в виде полного руководства по выбору базы данных:
1. Критерии выбора базы данных
Выбор БД — это не вопрос «какая лучше», а вопрос «какая лучше подходит под конкретные требования». Ключевые критерии:
А. Модель данных
- Структурированные данные с чёткой схемой → SQL
- Полуструктурированные/документы → Document DB
- Пары ключ-значение → Key-Value Store
- Связи между сущностями → Graph DB
- Временные ряды → Time-Series DB
Б. Паттерн доступа
- Много JOIN'ов и сложных запросов → SQL
- Чтение по первичному ключу → Key-Value / Document
- Полнотекстовый поиск → Elasticsearch
- Агрегации по времени → Time-Series DB
В. Требования к консистентности
- ACID-транзакции → SQL
- Eventual consistency приемлема → NoSQL
Г. Масштаб
- До 100K QPS → одна инстанс PostgreSQL с репликами
- 100K-1M QPS → шардирование или специализированная БД
- Свыше 1M QPS → распределённые системы (Cassandra, ScyllaDB)
2. SQL базы данных
А. PostgreSQL
-- Пример схемы для метаданных видео
CREATE TABLE videos (
video_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(user_id),
title VARCHAR(500) NOT NULL,
description TEXT,
status VARCHAR(20) NOT NULL DEFAULT 'uploading',
duration_seconds INT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ
);
-- Составной индекс для типичных запросов
CREATE INDEX idx_videos_user_created ON videos(user_id, created_at DESC);
CREATE INDEX idx_videos_status ON videos(status) WHERE status = 'ready';
-- Партиционирование по времени для событий просмотра
CREATE TABLE view_events (
event_id BIGSERIAL,
video_id UUID NOT NULL,
user_id UUID,
watched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
duration INT -- сколько секунд смотрел
) PARTITION BY RANGE (watched_at);
CREATE TABLE view_events_2024_q1 PARTITION OF view_events
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE view_events_2024_q2 PARTITION OF view_events
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
Масштабирование PostgreSQL:
- Read Replicas — для масштабирования чтения
- Connection Pooling — PgBouncer для управления соединениями
- Partitioning — партиционирование больших таблиц по времени
- Citus — расширение для шардирования (distributed PostgreSQL)
Б. MySQL / MariaDB
Аналогично PostgreSQL, но:
- Проще в настройке
- Меньше функций (нет полнотекстового поиска из коробки, меньше типов данных)
- Хорошо работает для простых CRUD-приложений
3. NoSQL базы данных
А. Key-Value: Redis
// Redis для кэширования и быстрого доступа
type VideoCache struct {
client *redis.Client
ttl time.Duration
}
func (c *VideoCache) GetVideoMeta(ctx context.Context, videoID string) (*VideoMeta, error) {
data, err := c.client.Get(ctx, "video:meta:"+videoID).Bytes()
if err == redis.Nil {
return nil, nil // cache miss
}
if err != nil {
return nil, err
}
var meta VideoMeta
if err := json.Unmarshal(data, &meta); err != nil {
return nil, err
}
return &meta, nil
}
func (c *VideoCache) SetVideoMeta(ctx context.Context, videoID string, meta *VideoMeta) error {
data, _ := json.Marshal(meta)
return c.client.Set(ctx, "video:meta:"+videoID, data, c.ttl).Err()
}
// Redis для rate limiting
func (c *VideoCache) CheckUploadRateLimit(ctx context.Context, userID string) (bool, error) {
key := "ratelimit:upload:" + userID
current, err := c.client.Incr(ctx, key).Result()
if err != nil {
return false, err
}
if current == 1 {
c.client.Expire(ctx, key, time.Hour)
}
return current <= 10, nil // максимум 10 загрузок в час
}
Б. Document: MongoDB
// MongoDB для хранения метаданных с гибкой схемой
type VideoDocument struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
VideoID string `bson:"video_id"`
UserID string `bson:"user_id"`
Title string `bson:"title"`
Description string `bson:"description"`
Status string `bson:"status"`
Artifacts []Artifact `bson:"artifacts"` // вложенный массив
Tags []string `bson:"tags"`
Metadata map[string]interface{} `bson:"metadata"` // гибкие поля
CreatedAt time.Time `bson:"created_at"`
}
type Artifact struct {
Resolution string `bson:"resolution"`
Codec string `bson:"codec"`
Bitrate int `bson:"bitrate_kbps"`
SizeBytes int64 `bson:"size_bytes"`
StorageKey string `bson:"storage_key"`
}
// Создание индексов
collection.Indexes().CreateMany(ctx, []mongo.IndexModel{
{
Keys: bson.D{{Key: "video_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{
{Key: "user_id", Value: 1},
{Key: "created_at", Value: -1},
},
},
{
Keys: bson.D{{Key: "tags", Value: 1}}, // мультиключевой индекс
},
})
В. Wide-Column: Cassandra / ScyllaDB
// Cassandra для событий просмотра (высокая запись, чтение по ключу)
// CQL Schema:
CREATE TABLE view_events (
video_id UUID,
user_id UUID,
watched_at TIMESTAMP,
duration INT,
PRIMARY KEY ((video_id), watched_at, user_id)
) WITH CLUSTERING ORDER BY (watched_at DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
// Запись события просмотра
INSERT INTO view_events (video_id, user_id, watched_at, duration)
VALUES (?, ?, toTimestamp(now()), ?);
4. Сравнительная таблица
| Характеристика | PostgreSQL | MongoDB | Redis | Cassandra |
|---|---|---|---|---|
| Модель | Реляционная | Документная | Key-Value | Wide-Column |
| ACID | Да | Нет (single-doc) | Нет | Нет |
| JOIN'ы | Да | $lookup (ограничен) | Нет | Нет |
| Шардирование | Citus / вручную | Встроено | Cluster mode | Встроено |
| Чтение | ~50K QPS | ~100K QPS | ~500K QPS | ~200K QPS |
| Запись | ~20K QPS | ~50K QPS | ~300K QPS | ~500K QPS |
| Лучше для | Метаданные, транзакции | Гибкая схема, каталог | Кэш, сессии | Временные ряды, события |
5. Правило выбора для System Design интервью
Задайте себе вопросы:
1. Нужны ли транзакции и целостность данных?
ДА → SQL (PostgreSQL)
НЕТ → переход к вопросу 2
2. Какой паттерн доступа?
По ключу → Key-Value (Redis)
Документы с гибкой схемой → Document (MongoDB)
Связи между сущностями → Graph (Neo4j)
Агрегации по времени → Time-Series (InfluxDB, TimescaleDB)
Высокая запись + чтение по partition key → Wide-Column (Cassandra)
3. Нужен ли полнотекстовый поиск?
ДА → Elasticsearch / Meilisearch
НЕТ → переход к вопросу 4
4. Какой масштаб?
< 100K QPS → PostgreSQL + Redis
100K - 1M QPS → шардирование или специализированная БД
> 1M QPS → распределённые системы
6. Polyglot Persistence — норма для сложных систем
В реальных системах используют несколько БД одновременно:
YouTube-подобная система:
PostgreSQL → метаданные видео, пользователи, подписки
Redis → кэш, сессии, rate limiting, счётчики просмотров
Elasticsearch → полнотекстовый поиск по видео
Cassandra → события просмотра, аналитика
S3 → хранение видео-файлов
ClickHouse → агрегированная аналитика
7. Пример выбора для компонентов нашей системы
// Video Manager — метаданные → PostgreSQL
type VideoManager struct {
db *sql.DB // PostgreSQL для ACID
cache *redis.Client // Redis для кэширования
}
func (m *VideoManager) GetVideo(ctx context.Context, videoID string) (*Video, error) {
// 1. Проверяем кэш
if cached, err := m.cache.Get(ctx, "video:"+videoID).Result(); err == nil {
var video Video
json.Unmarshal([]byte(cached), &video)
return &video, nil
}
// 2. Читаем из PostgreSQL
video, err := m.db.GetVideo(ctx, videoID)
if err != nil {
return nil, err
}
// 3. Кэшируем
data, _ := json.Marshal(video)
m.cache.Set(ctx, "video:"+videoID, data, 5*time.Minute)
return video, nil
}
// View Analytics — события просмотра → Cassandra
type ViewAnalytics struct {
session *gocql.Session
}
func (a *ViewAnalytics) RecordView(ctx context.Context, event ViewEvent) error {
return a.session.Query(
`INSERT INTO view_events (video_id, user_id, watched_at, duration)
VALUES (?, ?, ?, ?)`,
event.VideoID, event.UserID, time.Now(), event.Duration,
).WithContext(ctx).Exec()
}
// Search — поиск по видео → Elasticsearch
type VideoSearch struct {
client *elasticsearch.Client
}
func (s *VideoSearch) Search(ctx context.Context, query string) ([]SearchResult, error) {
res, err := s.client.Search(
s.client.Search.WithContext(ctx),
s.client.Search.WithIndex("videos"),
s.client.Search.WithBody(strings.NewReader(fmt.Sprintf(`{
"query": {
"multi_match": {
"query": "%s",
"fields": ["title^3", "description", "tags"]
}
}
}`, query))),
)
// ...
}
Ключевой вывод: не существует «лучшей» базы данных. Каждая БД оптимизирована под определённый тип нагрузки. Зрелый инженер выбирает инструмент под задачу, а не задачу под инструмент.
Вопрос 19. Как развивать техническую насмотренность для понимания, какие инструменты и базы данных использовать?
Таймкод: 01:36:38
Ответ собеседника: Правильный. Нужно читать для чего используются различные технологии и какими компаниями. Соотносить задачи с подходящими инструментами. Книга «System Design Interview» (кабанчик) — лучшая по теме. Книга Алекса Сюя разбирает типовые кейсы. Если проседает тема — углубиться в неё.
Правильный ответ:
Ответ собеседника даёт хороший старт. Развернём это в полный план развития насмотренности:
1. Книги и фундаментальные ресурсы
А. Обязательные книги
| Книга | Автор | Фокус |
|---|---|---|
| System Design Interview (vol. 1 & 2) | Alex Xu | Типовые задачи, подходы к проектированию |
| Designing Data-Intensive Applications | Martin Kleppmann | Фундамент: хранение, обработка, распределённые системы |
| System Design Interview (кабанчик) | Anonymous | Практические кейсы с диаграммами |
| Understanding Distributed Systems | Roberto Vitillo | Распределённые системы с нуля |
| Database Internals | Alex Petrov | Устройство баз данных изнутри |
Б. Онлайн-ресурсы
- System Design Primer (github.com/donnemartin/system-design-primer) — крупнейший сборник материалов
- High Scalability (highscalability.com) — разборы архитектур реальных систем
- AWS Architecture Center — reference architectures от Amazon
- Google Cloud Architecture Framework — best practices от Google
- InfoQ — доклады и статьи от инженеров крупных компаний
2. Инженерные блоги компаний
Чтение блогов — самый быстрый способ понять, как реальные компании решают реальные проблемы:
| Компания | Блог | Что почерпнуть |
|---|---|---|
| Netflix | netflixtechblog.com | Resilience, CDN, microservices |
| Uber | eng.uber.com | Distributed transactions, geoservices |
| Meta | engineering.fb.com | Scale, AI infrastructure |
| Dropbox | dropbox.tech | Storage, migration |
| Airbnb | medium.com/airbnb-engineering | Search, data infrastructure |
| Spotify | engineering.at.spotify.com | Recommendation, event-driven |
| Discord | discord.com/blog | Real-time, Elixir, Rust |
| Cloudflare | blog.cloudflare.com | Edge computing, networking |
| Shopify | shopify.engineering | Rails at scale, Kubernetes |
| ByteDance | medium.com/bytedance | TikTok architecture |
3. Конструктивный подход: ментальная модель
Вместо запоминания «Kafka для всего» — стройте ментальную модель:
Задача: Нужна очередь сообщений
Вопросы для выбора:
├── Нужен порядок сообщений?
│ ├── Да, глобальный → Kafka (partition-level), NATS
│ └── Да, только внутри группы → Kafka (per-key ordering)
├── Нужны гарантии доставки?
│ ├── At-least-once → Kafka, RabbitMQ
│ ├── Exactly-once → Kafka (idempotent producer)
│ └── At-most-once → SQS, NATS
├── Какой throughput?
│ ├── < 10K msg/s → RabbitMQ, SQS
│ ├── 10K-1M msg/s → Kafka
│ └── > 1M msg/s → Kafka (partitioned), Pulsar
├── Нужно ли хранить историю?
│ ├── Да → Kafka (log-based)
│ └── Нет → RabbitMQ, SQS
└── Сложность эксплуатации?
├── Managed → SQS, Google Pub/Sub
└── Self-hosted → Kafka, RabbitMQ
4. Практические способы развития
А. Разбирайте архитектуры известных систем
Для каждого популярного сервиса попробуйте спроектировать его самостоятельно, а затем сравните с реальной архитектурой:
YouTube → CDN, транскодирование, рекомендации
WhatsApp → Real-time messaging, E2E encryption
Twitter → Fan-out, timeline, trending
Uber → Geospatial indexing, matching
Netflix → CDN, transcoding, recommendation
Discord → Real-time voice, sharding
Б. Изучайте исходный код open-source проектов
- Kubernetes — как работает оркестрация
- CockroachDB — как распределённая SQL БД
- Vitess — как шардируется MySQL
- MinIO — как работает S3-совместимое хранилище
- NATS — как работает messaging system
В. Практикуйтесь на реальных задачах
// Пример: при разработке своего проекта осознанно выбирайте технологии
// Вместо "поставлю PostgreSQL потому что знаю её":
// Задача: хранить сессии пользователей
// Варианты:
// - PostgreSQL: надёжно, но избыточно для простых ключ-значение
// - Redis: идеально подходит, TTL из коробки, ~1M ops/sec
// Выбор: Redis
// Задача: хранить логи действий пользователей
// Варианты:
// - PostgreSQL: будет расти бесконечно, нужна ротация
// - Cassandra: отлично для append-only, TTL per row
// - ClickHouse: если нужны аналитические запросы
// Выбор: ClickHouse (аналитика) + S3 (архив)
5. Дорожная карта развития
Уровень 1: Основы (1-2 месяца)
├── Прочитать "System Design Interview" (Alex Xu)
├── Изучить основы: Load Balancer, Cache, CDN, Message Queue
└── Попрактиковаться на 5-10 классических задачах
Уровень 2: Углубление (2-3 месяца)
├── Прочитать "Designing Data-Intensive Applications"
├── Изучить внутреннее устройство 3-5 технологий
├── Читать 2-3 инженерных блога в неделю
└── Разобрать архитектуры 10+ известных систем
Уровень 3: Экспертиза (3-6 месяцев)
├── Прочитать "Database Internals"
├── Изучить распределённые системы (Raft, Paxos, CAP)
├── Писать design docs для своих проектов
└── Участвовать в code review и архитектурных обсуждениях
6. Чек-лист для каждой изучаемой технологии
При изучении новой технологии отвечайте на вопросы:
1. Какую проблему решает?
2. Какие гарантии предоставляет? (consistency, durability, ordering)
3. Каковы ограничения? (max throughput, max data size, latency)
4. Как масштабируется? (vertical, horizontal, sharding)
5. Как обеспечивается отказоустойчивость? (replication, failover)
6. Когда НЕ стоит использовать?
7. Какие альтернативы и в чём отличия?
8. Какие компании используют в production?
7. Пример: чек-лист для Apache Kafka
1. Проблема: Передача событий между сервисами с высоким throughput
2. Гарарантии: At-least-once (default), exactly-once (idempotent), per-partition ordering
3. Ограничения: ~1M msg/s per cluster, message size default 1MB
4. Масштабирование: Добавление партиций и брокеров
5. Отказоустойчивость: Репликация (ISR), min.insync.replicas
6. НЕ использовать: Для простых очередей (overkill), когда нужен request-reply
7. Альтернативы: RabbitMQ (простые очереди), Pulsar (multi-tenancy), NATS (лёгкий)
8. Кто использует: LinkedIn, Uber, Netflix, Airbnb
Техническая насмотренность — это не врождённый талант, а результат систематической работы. Регулярное чтение инженерных блогов, разбор реальных архитектур и практика проектирования формируют интуицию, которая позволяет быстро выбирать правильный инструмент под задачу.
Вопрос 20. Можно ли отдельно купить мок-интервью без курса? Можно ли приобрести практику отдельно?
Таймкод: 01:39:36
Ответ собеседника: Правильный. Мок-интервью можно купить отдельно через заявку в боте или написав менеджеру Алмазу. Практики внутри курса немного, основной упор на теорию. По вопросам продукта можно написать Алмазу.
Правильный ответ:
Ответ собеседника исчерпывающий. Дополним контекстом о том, как мок-интервью вписываются в подготовку:
1. Формат мок-интервью
Мок-интервью — это симуляция реального собеседования с опытным инженером, который выступает в роли интервьюера. Это один из самых эффективных способов подготовки, потому что:
- Даёт реальный опыт общения под давлением
- Выявляет слабые места, которые сложно заметить самостоятельно
- Позволяет отработать структуру ответов и тайминг
- Даёт обратную связь от человека с опытом проведения интервью
2. Как получить максимум от мок-интервью
А. Перед интервью
- Пройти основные теоретические материалы
- Попробовать самостоятельно решить 2-3 задачи
- Подготовить список тем, которые хотите проработать
Б. Во время интервью
- Не бойтесь говорить вслух ход мысли
- Задавайте уточняющие вопросы (как на реальном интервью)
- Просите дать обратную связь по каждому этапу
В. После интервью
- Зафиксировать полученную обратную связь
- Проработать слабые места
- Через 1-2 недели провести повторное интервью
3. Альтернативные способы практики
Если мок-интервью недоступно, можно:
- Практиковаться с коллегой — по очереди выступать интервьюером и кандидатом
- Записывать себя на видео — потом анализировать структуру ответа и подачу
- Использовать Pramp (pramp.com) — бесплатная платформа для peer-to-peer мок-интервью
- Использовать interviewing.io — анонимные мок-интервью с инженерами из FAANG
- Писать design docs — для своих текущих проектов, как если бы вы готовились к интервью
4. Рекомендуемая структура подготовки
Неделя 1-2: Теория
├── Прочитать "System Design Interview" (Alex Xu)
├── Изучить ключевые концепции (CAP, Consistent Hashing, etc.)
└── Посмотреть разборы типовых задач
Неделя 3-4: Самостоятельная практика
├── Решить 5-7 задач с таймером (45 мин на задачу)
├── Записать ответы на видео
└── Сравнить с эталонными решениями
Неделя 5-6: Мок-интервью
├── Провести 2-3 мок-интервью
├── Получить обратную связь
└── Проработать слабые места
Неделя 7-8: Финальная подготовка
├── Повторить сложные темы
├── Провести финальное мок-интервью
└── Подготовить вопросы для интервьюера
Мок-интервью — это инвестиция, которая значительно повышает шансы на успешное прохождение реального собеседования. Даже одно интервью может выявить пробелы, которые вы не замечали при самостоятельной подготовке.
Вопрос 21. Как спроектировать чат (аналог Slack)? Какое хранилище выбрать для сообщений и как организовать доставку?
Таймкод: 01:40:15
Ответ собеседника: Правильный. Для доставки — Kafka, сокеты, шардирование клиентов по подам, доставочные очереди. Задача зависит от ограничений: Telegram/Slack с реакциями и видео — одно, просто текст — другое. Текстовые сообщения — SQL или специализированные решения. Вложения — blob-хранилище со ссылками. Нужно давать оценки и на основе этого выбирать решение.
Правильный ответ:
Ответ собеседника задаёт правильное направление. Развернём полное проектирование чат-системы:
1. Сбор требований
А. Функциональные требования
- 1-1 чат и групповые чаты (до 10K участников в группе)
- Текстовые сообщения, реакции, форварды, ответы на сообщения
- Вложения: файлы, изображения, видео
- Индикаторы: «печатает...», «прочитано», «доставлено»
- История сообщений с поиском
- Push-уведомления
Б. Количественные оценки
Дано: 50M DAU, средний пользователь отправляет 40 сообщений/день
Сообщений в день: 50M * 40 = 2B сообщений/день
Средний размер сообщения: 200 байт (текст) + метаданные
Хранилище сообщений: 2B * 500 байт ≈ 1 ТБ/день
Пиковый RPS на отправку: 2B / 86400 * 3 (peak) ≈ 70K msg/sec
Одновременных соединений (WebSocket): 50M * 20% = 10M
2. Высокоуровневые компоненты
┌────────┐ ┌──────────┐ ┌─────────────┐
│ Client │◀───▶│ API │◀───▶│ Message │
│ (Web/ │ WS │ Gateway │ │ Service │
│Mobile) │ │ │ │ │
└────────┘ └──────────┘ └──────┬──────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Kafka │ │ Message │ │ Presence │
│ (events) │ │ Store │ │ Service │
└───────────┘ │ (ScyllaDB)│ │ (Redis) │
└───────────┘ └───────────┘
3. Хранилище сообщений
А. Выбор базы данных
Для сообщений чата нужна БД, которая:
- Хорошо пишет (миллионы сообщений в секунду)
- Быстро читает по chat_id с пагинацией
- Поддерживает TTL (автоудаление старых сообщений)
- Масштабируется горизонтально
ScyllaDB (Cassandra-совместимая) — оптимальный выбор:
-- Сообщения в чатах
CREATE TABLE chat_messages (
chat_id UUID,
bucket TEXT, -- партиция по времени: "2024-01"
message_id TIMEUUID, -- сортируемый по времени
sender_id UUID,
content TEXT,
msg_type TEXT, -- 'text', 'image', 'file', 'system'
reply_to UUID, -- ссылка на сообщение-родителя
attachments LIST<FROZEN<attachment>>,
created_at TIMESTAMP,
PRIMARY KEY ((chat_id, bucket), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)
AND default_time_to_live = 31536000; -- 1 год TTL
-- Матрица прочтения: кто до какого сообщения прочитал
CREATE TABLE read_receipts (
chat_id UUID,
user_id UUID,
last_read_msg TIMEUUID,
updated_at TIMESTAMP,
PRIMARY KEY ((chat_id), user_id)
);
-- Список чатов пользователя (с последним сообщением)
CREATE TABLE user_chats (
user_id UUID,
chat_id UUID,
chat_name TEXT,
chat_type TEXT, -- 'direct', 'group', 'channel'
last_msg TEXT,
last_msg_at TIMESTAMP,
unread_count INT,
PRIMARY KEY (user_id, last_msg_at)
) WITH CLUSTERING ORDER BY (last_msg_at DESC);
Б. Почему не PostgreSQL?
PostgreSQL плохо подходит для этого сценария:
- 2B записей в день — нужна партиция по времени
- Шардирование в PostgreSQL сложное (Citus помогает, но это доп. сложность)
- Высокий write throughput — Cassandra/ScyllaDB лучше оптимизированы
PostgreSQL можно использовать для:
- Профилей пользователей
- Метаданных чатов (название, участники, настройки)
- Настроек уведомлений
4. Доставка сообщений в реальном времени
А. WebSocket Gateway
type WebSocketGateway struct {
// shardID → map[userID]*Conn
connections sync.Map
kafka *KafkaProducer
presence *PresenceService
}
type Conn struct {
UserID string
Send chan []byte
ShardID int
}
func (g *WebSocketGateway) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
userID := authenticate(r)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
client := &Conn{
UserID: userID,
Send: make(chan []byte, 256),
ShardID: g.getShardID(userID),
}
// Регистрируем соединение
g.register(client)
// Помечаем пользователя как онлайн
g.presence.SetOnline(r.Context(), userID)
// Запускаем горутины для чтения и записи
go g.writePump(client)
go g.readPump(client)
}
func (g *WebSocketGateway) readPump(client *Conn) {
defer g.unregister(client)
for {
_, data, err := client.Conn.ReadMessage()
if err != nil {
break
}
var msg IncomingMessage
if err := json.Unmarshal(data, &msg); err != nil {
continue
}
// Создаём событие и публикуем в Kafka
event := &MessageEvent{
MessageID: uuid.New().String(),
ChatID: msg.ChatID,
SenderID: client.UserID,
Content: msg.Content,
Type: "new_message",
Timestamp: time.Now(),
}
// Публикуем в партицию по chat_id для сохранения порядка
g.kafka.Publish(context.Background(), "chat-messages", msg.ChatID, event)
}
}
Б. Message Router (consumer Kafka → WebSocket)
type MessageRouter struct {
kafkaReader *kafka.Reader
gateway *WebSocketGateway
presence *PresenceService
}
func (r *MessageRouter) Run(ctx context.Context) error {
for {
msg, err := r.kafkaReader.ReadMessage(ctx)
if err != nil {
return err
}
var event MessageEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
continue
}
// Получаем список участников чата
members, err := r.getChatMembers(ctx, event.ChatID)
if err != nil {
continue
}
// Отправляем каждому участнику
for _, memberID := range members {
if memberID == event.SenderID {
continue // не отправляем отправителю
}
// Проверяем онлайн ли пользователь
if r.presence.IsOnline(ctx, memberID) {
// Отправляем через WebSocket
r.gateway.SendToUser(memberID, event)
} else {
// Отправляем push-уведомление
r.sendPushNotification(ctx, memberID, event)
}
}
}
}
5. Шардирование WebSocket-соединений
type ShardManager struct {
numShards int
shards []*Shard
}
type Shard struct {
id int
connections map[string]*Conn // userID → Conn
mu sync.RWMutex
}
func (sm *ShardManager) getShard(userID string) *Shard {
hash := fnv32(userID)
return sm.shards[hash%uint32(sm.numShards)]
}
func (sm *ShardManager) SendToUser(userID string, data []byte) error {
shard := sm.getShard(userID)
shard.mu.RLock()
conn, ok := shard.connections[userID]
shard.mu.RUnlock()
if !ok {
return fmt.Errorf("user %s not connected", userID)
}
select {
case conn.Send <- data:
return nil
default:
// Канал переполнен — закрываем соединение
close(conn.Send)
return fmt.Errorf("send buffer full for user %s", userID)
}
}
6. Пагинация истории сообщений
type MessageService struct {
db *gocql.Session
}
func (s *MessageService) GetMessages(ctx context.Context, chatID string, cursor *time.Time, limit int) ([]Message, error) {
bucket := time.Now().Format("2006-01") // текущий месяц
query := `SELECT message_id, sender_id, content, msg_type, created_at
FROM chat_messages
WHERE chat_id = ? AND bucket = ?`
args := []interface{}{chatID, bucket}
if cursor != nil {
query += ` AND message_id < ?`
args = append(args, cursor)
}
query += ` ORDER BY message_id DESC LIMIT ?`
args = append(args, limit)
iter := s.db.Query(query, args...).WithContext(ctx).Iter()
var messages []Message
var msg Message
for iter.Scan(&msg.MessageID, &msg.SenderID, &msg.Content, &msg.Type, &msg.CreatedAt) {
messages = append(messages, msg)
}
if err := iter.Close(); err != nil {
return nil, err
}
return messages, nil
}
7. Схема доставки сообщения
Sender → API Gateway → Kafka (partitioned by chat_id)
│
▼
Message Router (consumer)
│
┌──────────┼──────────┐
▼ ▼ ▼
ScyllaDB WebSocket Push
(persist) (online) (offline)
│ │ │
▼ ▼ ▼
History Recipient APNs/FCM
8. Ключевые решения и компромиссы
| Решение | Обоснование |
|---|---|
| ScyllaDB для сообщений | Высокий write throughput, TTL, горизонтальное масштабирование |
| Kafka для событий | Порядок сообщений внутри чата, decoupling, replay |
| WebSocket для real-time | Низкая latency, full-duplex |
| Redis для presence | Быстрый доступ, TTL для heartbeat |
| PostgreSQL для метаданных | ACID для профилей, настроек чатов |
| S3 для вложений | Дешёвое хранение бинарных данных |
9. Оценка ресурсов
10M одновременных WebSocket соединений:
- ~10K соединений на под
- 1000 подов для WebSocket Gateway
- Каждый под: 2 CPU, 4GB RAM
Kafka:
- 70K msg/sec peak
- 100 партиций для chat-messages
- 3 брокера
ScyllaDB:
- 1 TB/день новых данных
- 6 нод (3 для write, 3 для read)
- Каждая: 16 CPU, 64GB RAM, 2TB NVMe
Это базовая архитектура, которая покрывает основные сценарии чат-системы уровня Slack/Telegram.
Вопрос 22. Что подарит компания тем, кто купит продукт до конца дня (19 декабря 2021)?
Таймкод: 01:44:03
Ответ собеседника: Правильный. Всем, кто оформит и оплатит один из тарифов до 23:59 МСК 19 декабря 2021 года, компания дарит одну бесплатную сессию с ментором. Эту сессию можно потратить на дополнительное мок-интервью или на ответы на сложные вопросы.
Правильный ответ:
Ответ собеседника точен. Это маркетинговое предложение, которое не требует технического раскрытия. Подарок — одна бесплатная сессия с ментором для тех, кто приобрёл продукт в указанный срок. Сессия может быть использована для дополнительного мок-интервью или разбора сложных вопросов с опытным инженером.
