Интервью на позицию Go-разработчика в VK
Сегодня мы разберём систем-дизайн интервью, на котором кандидат Дима проектирует распределённое файловое хранилище типа S3 с поддержкой операций Put, Get и Delete. В ходе собеседования обсуждаются архитектурные решения: разделение на сервисы (User Service, Bucket Manager, Blob Storage), борьба с дублированием файлов через хэширование, шардирование, кэширование горячих файлов, а также обработка сбоев при загрузке и удалении с использованием паттерна Outbox и брокера сообщений. Интервьюер последовательно усложняет задачу, поднимая вопросы масштабируемости, отказоустойчивости и мониторинга, что позволяет оценить глубину понимания кандидатом принципов проектирования высоконагруженных распределённых систем.
Вопрос 1. Что такое S3?
Таймкод: 00:00:22
Ответ собеседника: Правильный. S3 — это распределённое хранилище файлов, поддерживающее запись и чтение потоками по небольшим кусочкам (чанкам), предназначенное для хранения больших файлов в распределённых системах.
Правильный ответ:
Amazon S3 (Simple Storage Service) — это объектное хранилище от AWS, обеспечивающее масштабируемость, доступность и надёжность хранения данных.
Ключевые характеристики:
- Объектная модель хранения: данные хранятся как объекты внутри бакетов (buckets), каждый объект имеет уникальный ключ (key), данные (body), метаданные и версию.
- Доступность и долговечность: S3 обеспечивает 99.999999999% (11 девяток) долговечности данных и 99.99% доступности.
- Классы хранения: S3 Standard, S3 Intelligent-Tiering, S3 Standard-IA, S3 One Zone-IA, S3 Glacier, S3 Glacier Deep Archive — различаются по стоимости, доступности и времени извлечения.
- Multipart Upload: поддерживает загрузку больших файлов по частям (чанками), что позволяет параллельно загружать части и возобновлять загрузку после сбоев.
- Версионирование: можно включить версионирование бакета для сохранения всех версий объекта.
- Политики доступа: IAM-политики, bucket policies, ACL, presigned URLs для гранулярного управления доступом.
Пример работы с S3 на Go (AWS SDK v2):
package main
import (
"context"
"fmt"
"log"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
func main() {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
log.Fatal(err)
}
client := s3.NewFromConfig(cfg)
// Загрузка объекта
_, err = client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String("my-bucket"),
Key: aws.String("my-key"),
Body: strings.NewReader("hello world"),
})
if err != nil {
log.Fatal(err)
}
// Получение объекта
resp, err := client.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String("my-bucket"),
Key: aws.String("my-key"),
})
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
}
Важные нюансы для интервью:
- S3 — это eventually consistent для перезаписи и удаления (до перехода к strong consistency, которую AWS добавила в 2020 году — теперь S3 предоставляет read-after-write consistency).
- Максимальный размер одного объекта — 5 ТБ, одна загрузка через PutObject — до 5 ГБ, для больших файлов используется Multipart Upload.
- S3 Select позволяет выполнять SQL-подобные запросы прямо по объектам (CSV, JSON, Parquet) без полной загрузки.
Вопрос 2. Как устроен storage файлов в рамках проектирования S3? Какие операции он поддерживает?
Таймкод: 00:01:00
Ответ собеседника: Правильный. Storage файлов — это отдельный компонент, не знающий о метаданных. Он предоставляет базовый HTTP-интерфейс: POST для загрузки файла (возвращает строковый идентификатор), GET по идентификатору для получения файла и DELETE по идентификатору для удаления набора байт.
Правильный ответ:
Storage файлов (File Storage / Blob Storage) — это отдельный сервис в архитектуре S3, отвечающий исключительно за хранение «сырых» байтов. Он ничего не знает о метаданных, именах файлов, правах доступа — только принимает и отдаёт данные по идентификатору.
Принцип разделения ответственности:
- Metadata Service хранит маппинг: имя файла → storage_id, владелец, размер, чексумма, права доступа.
- File Storage хранит только байты по storage_id. Это позволяет масштабировать метаданные и файловое хранилище независимо.
Базовый HTTP API File Storage:
| Метод | Путь | Описание |
|---|---|---|
POST | /files | Загрузка файла, возвращает storage_id (строковый идентификатор) |
GET | /files/{storage_id} | Получение файла по идентификатору |
DELETE | /files/{storage_id} | Удаление файла по идентификатору |
Пример реализации на Go:
package filestorage
import (
"crypto/sha256"
"encoding/hex"
"io"
"net/http"
"sync"
)
type Storage struct {
mu sync.RWMutex
files map[string][]byte // storage_id -> data
}
func New() *Storage {
return &Storage{
files: make(map[string][]byte),
}
}
func (s *Storage) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodPost:
s.upload(w, r)
case http.MethodGet:
s.download(w, r)
case http.MethodDelete:
s.delete(w, r)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
func (s *Storage) upload(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "read error", http.StatusInternalServerError)
return
}
defer r.Body.Close()
// Генерация идентификатора на основе содержимого (content-addressable)
hash := sha256.Sum256(data)
id := hex.EncodeToString(hash[:])
s.mu.Lock()
s.files[id] = data
s.mu.Unlock()
w.WriteHeader(http.StatusCreated)
w.Write([]byte(id))
}
func (s *Storage) download(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("storage_id")
s.mu.RLock()
data, ok := s.files[id]
s.mu.RUnlock()
if !ok {
http.Error(w, "not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(data)
}
func (s *Storage) delete(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("storage_id")
s.mu.Lock()
_, ok := s.files[id]
if ok {
delete(s.files, id)
}
s.mu.Unlock()
if !ok {
http.Error(w, "not found", http.StatusNotFound)
return
}
w.WriteHeader(http.StatusNoContent)
}
Важные архитектурные нюансы:
- Content-addressable storage: идентификатор генерируется из хэша содержимого (SHA-256), что обеспечивает дедупликацию — одинаковые файлы хранятся один раз.
- Stateless: File Storage не знает о пользователях и именах файлов, что упрощает горизонтальное масштабирование.
- Реальное масштабирование: в продакшене вместо in-memory map используется распределённое хранилище (HDFS, MinIO, Ceph) или шардирование по storage_id.
- Garbage Collection: при удалении файла из метаданных нужно проверить, не ссылается ли другой объект на тот же storage_id (reference counting).
- Chunked upload: для больших файлов поддерживается загрузка чанками с последующей сборкой, аналогично Multipart Upload в AWS S3.
Вопрос 3. Нужно ли разбивать большой файл на чанки при загрузке в storage?
Таймкод: 00:02:58
Ответ собеседателя: Правильный. Для упрощения на данном этапе файл не нужно разбивать на чанки — сколько пришло, столько и кладём в storage целиком.
Правильный ответ:
На начальном этапе проектирования — нет, можно загружать файл целиком. Это упрощает архитектуру и позволяет быстрее получить работающий прототип.
Когда чанки становятся необходимы:
- Файлы больше 100 МБ — загрузка целиком ненадёжна: обрыв соединения означает повтор загрузки всего файла.
- Параллельная загрузка — чанки можно загружать одновременно, увеличивая пропускную способность.
- Возобновление после сбоев (resume) — при обрыве загружаются только недостающие чанки.
- Ограничения инфраструктуры — лимиты на размер HTTP-запросов, таймауты прокси и балансировщиков.
Как работает chunked upload (Multipart Upload):
- Initiate — клиент инициирует загрузку, сервер возвращает
upload_id. - Upload Part — клиент разбивает файл на чанки (например, по 5–10 МБ) и загружает каждый отдельно, получая
ETagдля каждого. - Complete — клиент отправляет список
ETag'ов, сервер собирает файл. - Abort — можно отменить загрузку, удалив все загруженные чанки.
Пример реализации на Go:
package filestorage
import (
"crypto/sha256"
"encoding/hex"
"io"
"net/http"
"sync"
)
type MultipartStorage struct {
mu sync.RWMutex
parts map[string]map[int][]byte // upload_id -> part_number -> data
files map[string][]byte // storage_id -> assembled data
}
func NewMultipartStorage() *MultipartStorage {
return &MultipartStorage{
parts: make(map[string]map[int][]byte),
files: make(map[string][]byte),
}
}
// InitiateUpload создаёт новую multipart-сессию
func (s *MultipartStorage) InitiateUpload(w http.ResponseWriter, r *http.Request) {
uploadID := generateID()
s.mu.Lock()
s.parts[uploadID] = make(map[int][]byte)
s.mu.Unlock()
w.Write([]byte(uploadID))
}
// UploadPart загружает одну часть
func (s *MultipartStorage) UploadPart(w http.ResponseWriter, r *http.Request) {
uploadID := r.PathValue("upload_id")
partNum := atoi(r.PathValue("part_number"))
data, _ := io.ReadAll(r.Body)
r.Body.Close()
s.mu.Lock()
s.parts[uploadID][partNum] = data
s.mu.Unlock()
hash := sha256.Sum256(data)
w.Write([]byte(hex.EncodeToString(hash[:])))
}
// CompleteUpload собирает файл из частей
func (s *MultipartStorage) CompleteUpload(w http.ResponseWriter, r *http.Request) {
uploadID := r.PathValue("upload_id")
partNumbers := parsePartNumbers(r) // [1, 2, 3, ...]
s.mu.Lock()
defer s.mu.Unlock()
var fullFile []byte
for _, num := range partNumbers {
part, ok := s.parts[uploadID][num]
if !ok {
http.Error(w, "missing part", http.StatusBadRequest)
return
}
fullFile = append(fullFile, part...)
}
hash := sha256.Sum256(fullFile)
storageID := hex.EncodeToString(hash[:])
s.files[storageID] = fullFile
delete(s.parts, uploadID) // очистка временных данных
w.Write([]byte(storageID))
}
Практические рекомендации:
- Размер чанка: 5–10 МБ (AWS S3 требует минимум 5 МБ, кроме последней части).
- Хранить чанки во временном хранилище с TTL для автоматической очистки незавершённых загрузок.
- Идемпотентность: повторная загрузка того же чанка не должна ломать состояние.
- Для продакшена стоит рассмотреть готовые решения: MinIO, AWS S3 SDK с автоматическим multipart.
Вопрос 4. Каковы ограничения по размеру файлов и нагрузке на систему (rps на чтение и запись)?
Таймкод: 00:03:35
Ответ собеседника: Правильный. Максимальный размер файла — до 1 терабайта. Нагрузка: несколько сотен тысяч rps на чтение и несколько десятков тысяч rps на запись.
Правильный ответ:
Ограничения по размеру файлов:
- Минимальный размер: 0 байт (пустой файл допустим).
- Максимальный размер: до 1 ТБ (1024 ГБ) на один файл. Для файлов больше 5 ГБ обязана использоваться chunked upload (multipart).
- Размер чанка: от 5 МБ до 5 ГБ (для multipart upload), минимум 100 чанков, максимум 10 000 чанков.
Нагрузка (RPS):
- Чтение: несколько сотен тысяч rps (200 000–500 000+ GET-запросов в секунду на один бакет).
- Запись: несколько десятков тысяч rps (30 000–100 000+ PUT-запросов в секунду на один бакет).
AWS S3 масштабируется автоматически, но есть важные нюансы:
- Лимиты на префикс: 3 500 PUT/COPY/POST/DELETE и 5 500 GET/HEAD на один префикс в секунду. Для равномерного распределения нагрузки ключи должны быть равномерно распределены (избегать последовательных префиксов вроде
2024/01/01/, лучше использовать хэш-префикс). - NoSQL-паттерн: для метаданных с высокой нагрузкой на запись используется DynamoDB или партиционированные таблицы с равномерным распределением ключей.
Пример расчёта для проектирования:
Дано:
- 10 млн пользователей
- Каждый загружает 2 файла в день
- Средний размер файла: 5 МБ
- Пиковая нагрузка: 3x от средней
Расчёт записи:
- Средняя: 10M * 2 / 86400 ≈ 231 rps
- Пиковая: 231 * 3 ≈ 693 rps
Расчёт чтения (предположим 10 просмотров на файл в день):
- Средняя: 10M * 2 * 10 / 86400 ≈ 2 315 rps
- Пиковая: 2 315 * 3 ≈ 6 945 rps
Трафик записи: 693 * 5 МБ ≈ 3.5 ГБ/с
Трафик чтения: 6 945 * 5 МБ ≈ 35 ГБ/с
Архитектурные решения для масштабирования:
- CDN (CloudFront): для снижения rps на origin при высокой нагрузке на чтение.
- Sharding метаданных: разделение по user_id или storage_id.
- Асинхронная обработка: загрузка → ответ клиенту → фоновая обработка (генерация превью, антивирусная проверка).
- Rate limiting: защита от злоупотреблений на уровне API Gateway.
Вопрос 5. Нужен ли функционал очистки файлов по времени, дедупликация по контенту и физическое удаление файлов из storage при удалении из S3?
Таймкод: 00:05:04
Ответ собеседния: Правильный. Очистка файлов по времени не требуется. Дедупликация по контенту нужна — если файл с таким же контентом уже загружен, не нужно хранить его повторно. При удалении файла из S3 необходимо физически удалять его из storage, чтобы не допускать утечек места.
Правильный ответ:
Очистка файлов по времени (TTL/Lifecycle):
В рамках базового проектирования не требуется, но в реальной системе жизненный цикл файлов управляется через:
- Lifecycle Policies: автоматический переход между классами хранения (Standard → IA → Glacier) или удаление через N дней.
- Временные файлы: multipart-загрузки, которые не были завершены за 24–72 часа, должны автоматически очищаться.
- Корзина (soft delete): файлы помечаются как удалённые и физически удаляются через 30 дней.
Дедупликация по контенту:
Критически важна для экономии места. Реализуется через content-addressable storage:
// Генерация storage_id из хэша содержимого
func generateStorageID(data []byte) string {
hash := sha256.Sum256(data)
return hex.EncodeToString(hash[:])
}
// При загрузке: если файл с таким хэшем уже есть — просто добавляем ссылку в метаданные
func (s *Service) Upload(ctx context.Context, filename string, data []byte) error {
storageID := generateStorageID(data)
// Проверяем, существует ли файл с таким содержимым
exists, err := s.meta.Exists(ctx, storageID)
if err != nil {
return err
}
if !exists {
// Загружаем в File Storage
if err := s.storage.Put(ctx, storageID, data); err != nil {
return err
}
}
// Увеличиваем счётчик ссылок
return s.meta.CreateFile(ctx, filename, storageID, int64(len(data)))
}
Физическое удаление файлов из storage:
Обязательно, но с оговорками — нужно реализовать reference counting:
// Удаление файла с проверкой счётчика ссылок
func (s *Service) Delete(ctx context.Context, filename string) error {
// Получаем storage_id из метаданных
file, err := s.meta.GetFile(ctx, filename)
if err != nil {
return err
}
// Уменьшаем счётчик ссылок
refs, err := s.meta.DecrementRefs(ctx, file.StorageID)
if err != nil {
return err
}
// Если больше никто не ссылается — удаляем физически
if refs <= 0 {
if err := s.storage.Delete(ctx, file.StorageID); err != nil {
return err
}
}
// Удаляем метаданные
return s.meta.DeleteFile(ctx, filename)
}
Важные нюансы:
- Race condition: при одновременном удалении двух файлов с одинаковым storage_id нужна атомарная операция decrement-and-check. Решается через транзакции или атомарные операции в БД (например,
UPDATE refs SET count = count - 1 WHERE storage_id = ? AND count > 0). - Отложенное удаление: физическое удаление можно откладывать и выполнять фоновым процессом (garbage collector), чтобы не блокировать пользовательский запрос.
- Soft delete: файл помечается как удалённый и физически удаляется через N дней — это защищает от случайного удаления и позволяет восстановить данные.
- Аудит удалений: логирование всех операций удаления для отслеживания и возможности восстановления.
Вопрос 6. Нужна ли репликация самих файлов в storage?
Таймкод: 00:06:11
Ответ собеседника: Правильный. Да, репликация файлов обеспечивается самим storage — после загрузки можно быть уверенным, что файлы будут доступны и не потеряются.
Правильный ответ:
Да, репликация критически важна. Без неё любой сбой сервера или диска приведёт к потере данных. В проектировании S3-подобной системы репликация обеспечивается на уровне файлового хранилища.
Уровни репликации:
А. Внутри одного дата-центра (rack-level replication)
- Файл хранится на 3 разных серверах (replication factor = 3).
- Записи идут параллельно на все реплики или на одну (primary), а остальные синхронизируются асинхронно.
- При чтении запрос идёт на любую доступную реплику.
Б. Между дата-центрами (cross-region replication)
- Файлы реплицируются в другой регион для отказоустойчивости.
- Используется асинхронная репликация (eventual consistency между регионами).
- Время репликации: секунды — минуты.
Стратегии записи:
| Стратегия | Описание | Плюсы | Минусы |
|---|---|---|---|
| Quorum (W + R > N) | Запись на W реплик, чтение с R из N | Баланс consistency/availability | Сложнее в реализации |
| All replicas | Запись на все N реплик | Strong consistency | Медленно, отказ одной = отказ записи |
| Primary + async | Запись на primary, асинхронная репликация | Быстрая запись | Риск потери данных при сбое primary |
Пример реализации на Go (quorum write):
package storage
import (
"context"
"sync"
"errors"
)
type ReplicaStorage struct {
replicas []*StorageNode
factor int // replication factor N
writeW int // минимальное число успешных записей
readR int // минимальное число успешных чтений
}
func NewReplicaStorage(replicas []*StorageNode, factor int) *ReplicaStorage {
return &ReplicaStorage{
replicas: replicas,
factor: factor,
writeW: factor/2 + 1, // большинство
readR: factor/2 + 1,
}
}
func (rs *ReplicaStorage) Put(ctx context.Context, id string, data []byte) error {
var wg sync.WaitGroup
errCh := make(chan error, len(rs.replicas))
successCh := make(chan struct{}, len(rs.replicas))
for _, replica := range rs.replicas {
wg.Add(1)
go func(r *StorageNode) {
defer wg.Done()
if err := r.Put(ctx, id, data); err != nil {
errCh <- err
} else {
successCh <- struct{}{}
}
}(replica)
}
wg.Wait()
close(errCh)
close(successCh)
successes := len(successCh)
if successes < rs.writeW {
return errors.New("quorum write failed")
}
return nil
}
func (rs *ReplicaStorage) Get(ctx context.Context, id string) ([]byte, error) {
// Читаем с R реплик, возвращаем самую свежую версию
// (с учётом векторных часов или версий)
// ...
}
Рекомендации по проектированию:
- Replication factor = 3 — стандарт для большинства распределённых хранилищ.
- Erasure coding вместо полной репликации для экономии места: файл разбивается на K фрагментов, добавляются M контрольных фрагментов. Для восстановления достаточно любых K из K+M. Экономия: ~50% vs 3x репликация.
- Anti-entropy протокол: периодическая синхронизация реплик через Merkle trees для обнаружения расхождений.
- Hinted handoff: если реплика недоступна, другая нода принимает данные и передаёт, когда реплика вернётся.
Вопрос 7. Какова архитектура предлагаемой системы S3 и какие основные компоненты выделяются?
Таймкод: 00:06:49
Ответ собеседния: Неполный. Система разделяется на несколько слоёв: клиент, CDN для публичного контента, балансировщик, API Gateway, сервис пользователей (User Service) с JWT-авторизацией, кэшированием (Redis) и базой данных (таблицы users, sessions, divisions для управления правами); сервис управления бакетами (Bucket Manager) с базой данных (таблицы bucket, file, block_file), кэшем горячих файлов и отдельным сервисом List Objects для агрегированных списков файлов; blob storage для хранения самих файлов; шина данных (Kafka) для асинхронного удаления файлов из storage через Outbox-паттерн. Дедупликация реализуется через хранение хэша файла в таблице block_file. Шардирование по ключу (bucket_name + key) для равномерной нагрузки.
Правильный ответ:
Полная архитектура S3-подобной системы:
1. Клиентский слой:
- SDK/CLI — клиентские библиотеки для загрузки/скачивания файлов, поддержка multipart upload, retry logic, presigned URLs.
- CDN (CloudFront) — кеширование горячего контента на edge-серверах для снижения latency и нагрузки на origin. Используется для публичных бакетов с контентом, который часто читается (изображения, видео, статические файлы).
2. Балансировка и API Gateway:
- Load Balancer — распределение трафика между инстансами сервисов (round-robin, least connections, consistent hashing).
- API Gateway — маршрутизация запросов, rate limiting, аутентификация JWT, валидация входных данных, SSL termination.
3. User Service:
- Авторизация — JWT-токены, управление сессиями.
- Управление правами — разграничение доступа к бакетам и файлам.
- База данных:
users— данные пользователейsessions— активные сессииdivisions— организационные единицы для управления правами
- Кэш — Redis для хранения сессий и горячих данных пользователей.
4. Bucket Manager:
- Основной сервис для операций с файлами (upload, download, delete, metadata).
- База данных:
bucket— информация о бакетах (имя, владелец, настройки, ACL)file— метаданные файлов (filename, storage_id, size, content_type, created_at)block_file— информация о блоках для дедупликации (hash, storage_id, ref_count)
- Кэш — Redis для горячих метаданных и горячих файлов (small file cache).
- Шардирование — по
(bucket_name + key)для равномерного распределения нагрузки.
5. List Objects Service:
- Отдельный сервис для агрегированных запросов списков файлов.
- Оптимизирован для тяжёлых запросов с фильтрацией, пагинацией, сортировкой.
- Может использовать отдельную read-replica или Elasticsearch для полнотекстового поиска.
6. Blob Storage (File Storage):
- Распределённое файловое хранилище для самих данных.
- Репликация с фактором 3, erasure coding для экономии места.
- Content-addressable: файл хранится по хэшу содержимого.
7. Шина данных (Kafka):
- Outbox-паттерн — при удалении файла из метаданных событие публикуется в Kafka.
- Consumer — асинхронно обрабатывает события: физическое удаление из storage, обновление кэшей, аудит.
- Обеспечивает eventual consistency между метаданными и storage.
Схема взаимодействия:
Client → CDN (для чтения публичного контента)
↓
Load Balancer → API Gateway → User Service (auth)
→ Bucket Manager (CRUD файлов)
→ List Objects Service (списки)
↓
Blob Storage (File Storage) ← Kafka (async cleanup)
Ключевые паттерны:
- CQRS — разделение на сервис записи (Bucket Manager) и сервис чтения списков (List Objects Service).
- Outbox — гарантированная доставка событий об удалении.
- Content-addressable storage — дедупликация через хэш содержимого.
- Sharding — шардирование метаданных по
(bucket_name + key)для равномерной нагрузки.
Вопрос 8. Что произойдёт, если Bucket Manager упадёт после того, как blob storage подтвердил сохранение файла, но до записи метаданных в базу? Как решить эту проблему?
Таймкод: 00:46:21
Ответ собеседника: Неполный. Признаётся, что в storage появится «мусор» — файл без ссылки в метаданных, при этом клиент также не получит подтверждения загрузки. Предлагается модифицировать интерфейс storage, добавить ручку POST для начала загрузки, GET для проверки статуса, DELETE для отмены. Также предлагается хранить в базе информацию о прогрессе записи и использовать таймаут или ping-механизм в Redis. Однако предложенное решение с жёстким таймаутом вызывает обоснованные сомнения в корректности. Интервьюер далее подсказал Outbox-паттерн как правильный подход.
Правильный ответ:
Проблема рассинхрона между storage и метаданными:
При падении Bucket Manager после сохранения в storage, но до записи метаданных:
- В storage остаётся «осиротевший» файл (orphan data), на который нет ссылки из метаданных.
- Клиент не получил подтверждения загрузки и может повторить загрузку.
- Со временем storage засоряется мусором.
Решение через Outbox-паттерн (рекомендуемый подход):
Идея: запись метаданных и публикация события в outbox выполняются в одной транзакции с БД. Даже если сервис упадёт сразу после коммита, outbox-процессор доставит событие.
package service
import (
"context"
"database/sql"
"encoding/json"
"fmt"
)
type BucketManager struct {
db *sql.DB
storage BlobStorage
kafka KafkaProducer
}
type FileMetadata struct {
Filename string
BucketName string
StorageID string
Size int64
ContentType string
}
type OutboxEvent struct {
ID string
Type string
Payload []byte
}
func (bm *BucketManager) Upload(ctx context.Context, meta FileMetadata, data []byte) error {
// Шаг 1: Загружаем файл в storage
if err := bm.storage.Put(ctx, meta.StorageID, data); err != nil {
return fmt.Errorf("storage put: %w", err)
}
// Шаг 2: Записываем метаданные И outbox-событие в одной транзакции
tx, err := bm.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
// Запись метаданных
_, err = tx.ExecContext(ctx,
`INSERT INTO file (bucket_name, filename, storage_id, size, content_type)
VALUES ($1, $2, $3, $4, $5)`,
meta.BucketName, meta.Filename, meta.StorageID, meta.Size, meta.ContentType)
if err != nil {
return fmt.Errorf("insert metadata: %w", err)
}
// Запись в outbox (для последующей обработки, например, индексации)
eventPayload, _ := json.Marshal(map[string]string{
"storage_id": meta.StorageID,
"action": "file_uploaded",
})
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox (id, type, payload, created_at)
VALUES (gen_random_uuid(), 'file_uploaded', $1, NOW())`,
eventPayload)
if err != nil {
return fmt.Errorf("insert outbox: %w", err)
}
// Коммит транзакции — атомарно сохраняются и метаданные, и outbox
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit: %w", err)
}
return nil
}
// OutboxProcessor — фоновая горутина, читает из outbox и публикует в Kafka
func (bm *BucketManager) RunOutboxProcessor(ctx context.Context) {
for {
events, err := bm.pollOutbox(ctx)
if err != nil {
continue
}
for _, event := range events {
if err := bm.kafka.Publish(ctx, event.Type, event.Payload); err != nil {
continue // retry later
}
bm.markOutboxProcessed(ctx, event.ID)
}
}
}
Дополнительные механизмы:
А. Two-phase commit через storage (альтернатива):
Сначала зарезервировать место в storage с временным статусом, затем записать метаданные, затем подтвердить storage.
// Шаг 1: Загружаем с временным статусом "pending"
storageID := generateStorageID(data)
bm.storage.PutWithStatus(ctx, storageID, data, "pending")
// Шаг 2: Записываем метаданные в БД
bm.db.InsertMetadata(ctx, meta)
// Шаг 3: Подтверждаем в storage
bm.storage.Confirm(ctx, storageID)
// Фоновый garbage collector удаляет "pending" файлы старше N минут
Б. Idempotency key:
Клиент отправляет уникальный Idempotency-Key при загрузке. Сервер проверяет, была ли уже обработка с таким ключом.
func (bm *BucketManager) Upload(ctx context.Context, meta FileMetadata, data []byte, idempotencyKey string) error {
// Проверяем, не обрабатывали ли уже этот запрос
if processed, _ := bm.isProcessed(ctx, idempotencyKey); processed {
return nil // уже обработан, просто возвращаем успех
}
// ... основная логика загрузки ...
// Помечаем ключ как обработанный
bm.markProcessed(ctx, idempotencyKey)
}
В. Garbage Collector:
Фоновый процесс, который периодически сканирует storage и удаляет файлы без ссылок из метаданных:
func (bm *BucketManager) RunGC(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
orphans, _ := bm.findOrphanedFiles(ctx)
for _, orphan := range orphans {
bm.storage.Delete(ctx, orphan)
}
}
}
Итог: Outbox-паттерн — основной механизм обеспечения consistency. Garbage collector — страховка от orphan data. Idempotency key — защита от двойной обработки при retry.
Вопрос 9. Как обрабатывается операция Delete Object? Что произойдёт, если Bucket Manager упадёт после пометки файла как удалённого, но до фактического удаления из storage?
Таймкод: 00:54:54
Ответ собеседника: Неполный. При Delete Object запрос проходит через балансировку и Gateway в Bucket Manager, который проверяет таблицу file: если на block_file ссылается только одна запись — удаляет запись из file и отправляет запрос на удаление в blob storage; если ссылок больше — удаляет только запись из file. Для устойчивости предлагается сначала помечать файл статусом deleted в транзакции, отвечать пользователю «ок», а затем выполнять фактическое удаление. Если удаление из storage не удалось — предлагается фоновый процесс, периодически сканирующий базу. Интервьюер подсказал Outbox-паттерн: в рамках транзакции записывается событие в таблицу-аутбокс, воркер читает его и кладёт в Kafka, откуда любой Bucket Manager может забрать задачу на удаление из storage.
Правильный ответ:
Полный флоу Delete Object:
Шаг 1: Проверка прав и получение метаданных
func (bm *BucketManager) DeleteObject(ctx context.Context, bucketName, filename string) error {
// Проверка прав доступа
if err := bm.auth.CheckPermission(ctx, bucketName, "delete"); err != nil {
return err
}
// Получаем storage_id из метаданных
file, err := bm.getMetadata(ctx, bucketName, filename)
if err != nil {
return fmt.Errorf("get metadata: %w", err)
}
if file == nil {
return ErrNotFound
}
Шаг 2: Атомарная транзакция — удаление метаданных + outbox-событие
tx, err := bm.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
// Удаляем запись из file
_, err = tx.ExecContext(ctx,
`DELETE FROM file WHERE bucket_name = $1 AND filename = $2`,
bucketName, filename)
if err != nil {
return fmt.Errorf("delete file metadata: %w", err)
}
// Уменьшаем ref_count в block_file
var refs int
err = tx.QueryRowContext(ctx,
`UPDATE block_file SET ref_count = ref_count - 1
WHERE storage_id = $1 RETURNING ref_count`,
file.StorageID).Scan(&refs)
if err != nil {
return fmt.Errorf("decrement ref_count: %w", err)
}
// Если больше нет ссылок — создаём outbox-событие на физическое удаление
if refs <= 0 {
// Удаляем block_file
_, err = tx.ExecContext(ctx,
`DELETE FROM block_file WHERE storage_id = $1`, file.StorageID)
if err != nil {
return fmt.Errorf("delete block_file: %w", err)
}
// Пишем в outbox — это гарантирует, что удаление из storage произойдёт
eventPayload, _ := json.Marshal(map[string]string{
"storage_id": file.StorageID,
"action": "delete_from_storage",
})
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox (id, type, payload, status, created_at)
VALUES (gen_random_uuid(), 'storage_delete', $1, 'pending', NOW())`,
eventPayload)
if err != nil {
return fmt.Errorf("insert outbox: %w", err)
}
}
// Коммит — атомарно: метаданные удалены + outbox-событие записано
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit: %w", err)
}
// Инвалидируем кэш
bm.cache.Delete(ctx, cacheKey(bucketName, filename))
return nil
}
Шаг 3: Outbox Processor → Kafka → Storage Worker
// OutboxProcessor читает pending-события и публикует в Kafka
func (bm *BucketManager) RunOutboxProcessor(ctx context.Context) {
for {
rows, _ := bm.db.QueryContext(ctx,
`SELECT id, payload FROM outbox
WHERE status = 'pending' AND type = 'storage_delete'
LIMIT 100`)
for rows.Next() {
var id string
var payload []byte
rows.Scan(&id, &payload)
// Публикуем в Kafka
if err := bm.kafka.Publish(ctx, "storage-deletes", payload); err == nil {
// Помечаем как processed
bm.db.ExecContext(ctx,
`UPDATE outbox SET status = 'processed', processed_at = NOW()
WHERE id = $1`, id)
}
}
time.Sleep(100 * time.Millisecond)
}
}
// Storage Worker — конsumer из Kafka, удаляет из blob storage
func RunStorageWorker(ctx context.Context, kafka KafkaConsumer, storage BlobStorage) {
for msg := range kafka.Consume(ctx, "storage-deletes") {
var event struct {
StorageID string `json:"storage_id"`
Action string `json:"action"`
}
json.Unmarshal(msg.Value, &event)
// Идемпотентное удаление — повторное удаление того же файла безопасно
if err := storage.Delete(ctx, event.StorageID); err != nil {
// Retry через exponential backoff
continue
}
msg.Ack()
}
}
Что происходит при сбоях:
| Сценарий | Состояние |
|---|---|
| Падение до коммита транзакции | Метаданные НЕ удалены, outbox НЕ записан → файл на месте, клиент retry |
| Падение после коммита, до outbox processor | Метаданные удалены, outbox записан → клиент получил успех, outbox processor обработает позже |
| Падение после Kafka publish, до storage delete | Kafka гарантирует delivery → storage worker обработает при следующем старте |
| Storage недоступен при delete | Retry через Kafka → eventual deletion |
Ключевые принципы:
- Атомарность: удаление метаданных и запись в outbox — одна транзакция с БД.
- Идемпотентность: повторное удаление из storage безопасно (storage.Delete — идемпотентная операция).
- Eventual consistency: физическое удаление из storage может произойти с задержкой, но гарантированно произойдёт.
- Reference counting: файл удаляется из storage только когда ref_count достигает 0 — другие файлы с тем же содержимым продолжают работать.
Вопрос 10. Какие метрики нужно добавить в систему для мониторинга её работоспособности?
Таймкод: 01:01:48
Ответ собеседника: Неполный. Стандартные метрики: rps по каждому типу запроса, время загрузки файла, количество ошибок, успешных сессий. Нестандартные: размер загружаемых файлов, количество фактически созданных block_file, количество бакетов, соотношение фактически загруженных файлов к логически хранящимся. Основной фокус на мониторинг взаимодействия Bucket Manager и blob storage. Интервьюер дополнил: нужны метрики success rate, количество созданных/удалённых файлов, размер очереди на воркере, а также метрики базы данных.
Правильный ответ:
Полный набор метрик для мониторинга S3-подобной системы:
1. Application-level метрики (RED-метод):
| Метрика | Описание | Тип |
|---|---|---|
requests_total | RPS по каждому типу запроса (PUT, GET, DELETE, LIST) | Counter |
request_duration_seconds | Latency запросов (p50, p95, p99) | Histogram |
errors_total | Количество ошибок по типам (4xx, 5xx, timeout) | Counter |
success_rate | Доля успешных запросов (2xx / total) | Gauge |
active_uploads | Текущие активные multipart-загрузки | Gauge |
2. Storage-специфичные метрики:
| Метрика | Описание | Тип |
|---|---|---|
files_created_total | Количество созданных файлов | Counter |
files_deleted_total | Количество удалённых файлов | Counter |
files_size_bytes | Размер загружаемых файлов (distribution) | Histogram |
block_files_created_total | Количество фактически созданных block_file (дедупликация) | Counter |
dedup_ratio | Соотношение логических файлов к физическим (files / block_files) | Gauge |
orphan_files_total | Количество осиротевших файлов в storage | Gauge |
storage_used_bytes | Использованное место в blob storage | Gauge |
storage_capacity_bytes | Общая ёмкость storage | Gauge |
3. Outbox и Kafka метрики:
| Метрика | Описание | Тип |
|---|---|---|
outbox_queue_size | Размер очереди необработанных outbox-событий | Gauge |
outbox_processing_duration | Время обработки outbox-событий | Histogram |
kafka_consumer_lag | Отставание consumer'а от producer'а в Kafka | Gauge |
worker_queue_size | Размер очереди задач на storage worker | Gauge |
storage_delete_retries_total | Количество retry при удалении из storage | Counter |
4. Метрики базы данных:
| Метрика | Описание | Тип |
|---|---|---|
db_query_duration_seconds | Время выполнения запросов (p50, p95, p99) | Histogram |
db_connections_active | Активные соединения с БД | Gauge |
db_connections_idle | Idle-соединения в пуле | Gauge |
db_disk_usage_percent | Заполнение диска БД | Gauge |
db_replication_lag_seconds | Отставание реплики от master | Gauge |
db_deadlocks_total | Количество дедлоков | Counter |
5. Метрики кэша (Redis):
| Метрика | Описание | Тип |
|---|---|---|
cache_hit_ratio | Доля попаданий в кэш | Gauge |
cache_evictions_total | Количество вытеснённых записей | Counter |
cache_memory_usage_bytes | Использование памяти Redis | Gauge |
Пример реализации метрик на Go (Prometheus):
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
RequestsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "s3_requests_total",
Help: "Total number of requests",
},
[]string{"method", "operation", "status"},
)
RequestDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "s3_request_duration_seconds",
Help: "Request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method", "operation"},
)
FilesCreatedTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "s3_files_created_total",
Help: "Total number of created files",
},
)
DedupRatio = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "s3_dedup_ratio",
Help: "Ratio of logical files to physical blocks",
},
)
OutboxQueueSize = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "s3_outbox_queue_size",
Help: "Number of pending outbox events",
},
)
StorageDeleteRetries = promauto.NewCounter(
prometheus.CounterOpts{
Name: "s3_storage_delete_retries_total",
Help: "Total storage delete retries",
},
)
)
// Использование в коде:
func (bm *BucketManager) Upload(ctx context.Context, meta FileMetadata, data []byte) error {
start := time.Now()
defer func() {
RequestDuration.WithLabelValues("PUT", "upload").Observe(time.Since(start).Seconds())
}()
// ... логика загрузки ...
RequestsTotal.WithLabelValues("PUT", "upload", "200").Inc()
FilesCreatedTotal.Inc()
}
Ключевые алерты:
success_rate < 99.9%— деградация сервисаp99 latency > 5s— проблемы с производительностьюoutbox_queue_size > 10000— outbox processor не справляетсяkafka_consumer_lag > 1000— storage worker отстаётdb_disk_usage_percent > 80%— нужна очистка или масштабированиеorphan_files_total > 0— проблемы с консистентностьюcache_hit_ratio < 80%— кэш неэффективен
Вопрос 11. Можно ли идентифицировать объект по UUID вместо пары bucket+key? Можно ли шардировать Bucket Manager по User ID?
Таймкод: 01:23:09
Ответ собеседника: Неполный. По вопросу UUID: формально можно, но возникает проблема получения списка файлов по бакету, по префиксу, по времени жизни — это будут сложные операции при UUID-идентификации. Поэтому для полноценного S3-интерфейса это не подходит. По вопросу шардирования по User ID: формально возможно, но проблема в том, что запросы могут приходить извне системы без знания User ID, а также разные пользователи могут иметь доступ к одним и тем же бакетам. Интервьюер подтвердил, что шардирование по ключу (bucket+key) предпочтительнее — оно даёт более равномерную нагрузку и лучшую отказоустойчивость. Для операции List Objects предлагается отдельный сервис, подписанный на шину событий.
Правильный ответ:
Идентификация по UUID vs bucket+key:
UUID — технически возможно, но не подходит для S3-интерфейса:
S3 API требует операций, которые невозможны с чистым UUID:
ListObjects(bucket, prefix)— получить все файлы в бакете с префиксомListBuckets— список бакетов пользователяGetBucketLifecycle— настройки жизненного цикла бакетаCopyObject— копирование между бакетами
С UUID для этих операций потребуются дополнительные индексы, фактически дублирующие структуру bucket+key.
Bucket+key — нативная идентификация для S3:
-- Естественная структура для S3-операций
CREATE TABLE file (
bucket_name VARCHAR(255),
key VARCHAR(1024),
storage_id VARCHAR(64),
size BIGINT,
created_at TIMESTAMP,
PRIMARY KEY (bucket_name, key)
);
-- ListObjects с префиксом — простой и быстрый запрос
SELECT * FROM file
WHERE bucket_name = 'my-bucket' AND key LIKE 'photos/2024/%'
ORDER BY key
LIMIT 1000;
Шардирование: User ID vs bucket+key:
Шардирование по User ID — проблемы:
| Проблема | Описание |
|---|---|
| Внешний доступ | S3 API принимает запросы по bucket+key, User ID неизвестен до аутентификации |
| Shared buckets | Несколько пользователей могут иметь доступ к одному бакету |
| Hot spots | Пользователи с большими бакетами создают неравномерную нагрузку |
| Cross-shard queries | ListObjects для shared bucket'а требует scatter-gather по шардам |
Шардирование по (bucket_name + key) — предпочтительный подход:
func getShard(bucketName, key string, totalShards int) int {
// Хэшируем пару bucket+key для определения шарда
h := fnv.New32a()
h.Write([]byte(bucketName + "/" + key))
return int(h.Sum32()) % totalShards
}
Преимущества шардирования по ключу:
- Равномерная нагрузка: ключи распределены случайно, нет hot spots.
- Локальность данных: все операции с конкретным файлом идут на один шард.
- Масштабируемость: добавление шардов через consistent hashing.
- Отказоустойчивость: падение шарда затрагивает только подмножество ключей, не весь бакет.
Для List Objects — отдельный сервис:
// ListObjectsService — отдельный сервис с read-optimized хранилищем
type ListObjectsService struct {
// Подписан на Kafka-события: file_created, file_deleted
// Поддерживает материализованное представление для быстрых запросов
index SearchIndex // Elasticsearch / ScyllaDB / denormalized table
}
func (s *ListObjectsService) ListObjects(ctx context.Context, bucket, prefix string, maxKeys int) ([]Object, error) {
// Быстрый запрос к индексу без нагрузки на основную БД метаданных
return s.index.Search(ctx, bucket, prefix, maxKeys)
}
// Kafka consumer для обновления индекса
func (s *ListObjectsService) HandleEvent(ctx context.Context, event FileEvent) {
switch event.Type {
case "file_created":
s.index.Index(ctx, event.Bucket, event.Key, event.Metadata)
case "file_deleted":
s.index.Delete(ctx, event.Bucket, event.Key)
}
}
Итоговая схема шардирования:
Client → API Gateway → Bucket Manager (sharded by bucket+key)
→ List Objects Service (separate, read-optimized)
→ User Service (by user_id, for auth only)
- Bucket Manager: шардирование по
(bucket_name + key)— равномерная нагрузка, локальность данных. - User Service: шардирование по
user_id— для аутентификации и управления правами. - List Objects Service: отдельный сервис с собственным индексом, подписанный на Kafka-события для поддержания актуальности данных.
