Собеседование на Backend АРХИТЕКТОРА (Java, Go, Node, Python, C++)
Сегодня мы разберём техническое собеседование с кандидатом на позицию архитектора, в ходе которого глубоко обсуждаются распределённые системы, выбор технологического стека и практические кейсы из реального опыта. Кандидемонстрирует сильное понимание микросервисной архитектуры, паттернов обеспечения консистентности (саги, транзакционные очереди событий), а также аргументированно рассуждает о компромиссах между производительностью, надёжностью и скоростью разработки при выборе языков и инструментов. Особое внимание уделяется разбору конкретных инцидентов — например, потере событий в системе на базе Kafka — и поиску решений для упорядочивания операций в условиях мультиинстансной архитектуры.
Вопрос 1. Какова основная задача архитектора?
Таймкод: 00:00:26
Ответ собеседника: Правильный. Основная задача архитектора — найти наилучшее решение для запроса от продукта. Критерии: время, деньги, наличие разработчиков, соответствие текущему стеку, скорость реализации. По сути — максимально быстро, просто и надёжно реализовать хотелки продукта на существующем стеке компании.
Правильный ответ:
Основная задача архитектора — это принятие ключевых технических решений, которые определяют структуру системы и обеспечивают баланс между бизнес-требованиями и техническими ограничениями.
Ключевые аспекты роли архитектора:
1. Анализ требований и ограничений
Архитектор работает с бизнес-запросами и трансформирует их в технические решения. Критерии принятия решений включают:
- Время реализации (time-to-market)
- Бюджет и стоимость владения (TCO)
- Наличие компетенций в команде
- Совместимость с существующим стеком технологий
- Простота и надёжность решения
2. Проектирование системы
Архитектор определяет:
- Выбор технологий и фреймворков
- Структуру сервисов и их взаимодействие
- Схемы данных и потоков информации
- Стратегии масштабирования и отказоустойчивости
3. Коммуникация и согласование
Архитектор выступает связующим звеном между продуктовой и технической сторонами, обеспечивая понимание решений всеми участниками процесса.
4. Оценка компромиссов
Каждое архитектурное решение — это компромисс. Архитектор должен чётко понимать и доносить последствия выбранного подхода, включая технический долг и ограничения будущих изменений.
Ответ собеседника точно отражает суть: архитектор находит оптимальное решение в рамках заданных ограничений, стремясь к быстрой, простой и надёжной реализации бизнес-требований на существующем стеке компании.
Вопрос 2. Какой язык или инструмент наиболее близок к концепции «серебряной пули»?
Таймкод: 00:02:00
Ответ собеседника: Правильный. Считает, что Rust наиболее близок к серебряной пуле. Он быстрый как C++, но без сборщика мусора благодаря модели владения (borrowing). Компилятор проверяет много вещей, снижая вероятность ошибок в рантайме. Rust позволяет писать и системный код, и веб-сервисы через Tokio с асинхронным Event loop. Однако если ориентироваться на порог входа и лёгкость поиска разработчиков, то Go предпочтительнее благодаря нативной конкурентности.
Правильный ответ:
Концепция «серебряной пули» в программировании подразумевает универсальный инструмент, который оптимален для широкого круга задач. Автор этого термина, Фредерик Брукс, в эссе «No Silver Bullet» (1986) утверждал, что такого инструмента не существует и не может существовать.
Rust как претендент на универсальность
Rust действительно приближается к этой концепции благодаря уникальной комбинации свойств:
- Производительность уровня C++ без сборщика мусора
- Безопасность памяти через систему владения (ownership) и заимствования (borrowing)
- Строгая проверка на этапе компиляции, исключающая целый класс ошибок в рантайме
- Применимость для системного программирования, веб-сервисов (через Tokio), WebAssembly, embedded-систем
Go как прагматичный выбор
Go выигрывает в других аспектах:
- Нативная поддержка конкурентности через горутины и каналы
- Низкий порог входа и быстрое обучение
- Простота и читаемость кода
- Большой пул разработчиков на рынке
- Отличная стандартная библиотека для сетевых сервисов
Почему «серебряной пули» не существует
Каждый язык — это набор компромиссов:
- Rust: высокий порог входа, длительное время компиляции, сложность с borrow checker
- Go: отсутствие дженериков (до версии 1.18), ограниченный контроль над памятью, GC-паузы
- Java/C#: зависимость от JVM, высокое потребление памяти
- Python/JS: низкая производительность, динамическая типизация
Выбор инструмента всегда зависит от контекста: требований к производительности, компетенций команды, сроков разработки и специфики задачи. Именно поэтому опытный архитектор выбирает инструмент под конкретную задачу, а не ищет универсальное решение.
Вопрос 3. gRPC лучше REST? Какой протокол интеграции предпочтительнее с точки зрения продукта?
Таймкод: 00:07:45
Ответ собеседника: Правильный. С точки зрения разработчика gRPC интереснее, но с точки зрения продукта и внешних интеграций REST предпочтительнее. Использование gRPC для внешних интеграций может вызвать сложности у сторонних разработчиков, увеличить время интеграции и даже привести к смене вендора. REST — более универсальный и понятный для внешних партнёров вариант.
Правильный ответ:
Выбор между gRPC и REST — это классический пример компромисса между техническим совершенством и продуктовой целесообразностью. Универсального ответа нет, но есть чёткие критерии выбора.
Сравнение подходов
REST (HTTP/JSON)
- Универсальность: поддерживается любым языком и платформой
- Читаемость: JSON человекочитаем, легко дебажить через curl, Postman
- Экосистема: огромное количество инструментов, документации, специалистов
- Кеширование: нативная поддержка HTTP-кеширования
- Минусы: избыточность данных, отсутствие строгой типизации, ручная валидация
gRPC (HTTP/2 + Protobuf)
- Производительность: бинарный протокол, меньше трафика, быстрее сериализация
- Типизация: строгие контракты через .proto файлы
- Генерация кода: автоматическая генерация клиентов для множества языков
- Стриминг: нативная поддержка двунаправленных потоков
- Минусы: сложнее дебажить, требует HTTP/2, меньше специалистов на рынке
Рекомендации по выбору
Используйте REST для:
- Публичных API и внешних интеграций
- Случаев, когда важна простота интеграции для партнёров
- Проектов с большим количеством внешних потребителей
Используйте gRPC для:
- Внутренних сервисов (microservices communication)
- Высоконагруженных систем с критичной производительностью
- Систем с двунаправленным стримингом
- Команд, где уже есть экспертиза в gRPC
Гибридный подход
Многие компании используют оба подхода одновременно:
- gRPC для внутренней коммуникации между сервисами
- REST API Gateway для внешних потребителей
- gRPC-Gateway для автоматической генерации REST-эндпоинтов из .proto файлов
Пример гибридной архитектектуры:
Внешние клиенты → REST API Gateway → gRPC микросервисы
Такой подход позволяет получить преимущества обоих миров: производительность gRPC внутри и простоту REST для внешних интеграций.
Ответ собеседника абсолютно верен: с продуктовой точки зрения REST предпочтителен для внешних интеграций из-за универсальности и низкого порога входа для партнёров.
Вопрос 4. Как использовались транзакционные очереди (Transactional Outbox), почему от них избавлялись и как реализовывался паттерн Saga?
Таймкод: 00:09:17
Ответ собеседника: Правильный. Transactional Outbox был добавлен по запросу бизнеса, когда события терялись — в одном сервисе обрабатывались, в другом не доходили. Реализация: отдельная таблица в БД, куда в той же транзакции, что и обновление основной сущности, записывались события. Далее отдельный процесс считывал и отправлял в Kafka. После внедрения сильно упал перформанс, решение было частично откачено, найден баланс. Паттерн Saga реализовывался через оркестрацию. Уровень контекста — зона ответственности CTO, детальные диаграммы и код прорабатывались самостоятельно.
Правильный ответ:
Transactional Outbox
Паттерн Transactional Outbox решает проблему атомарности при обновлении данных и публикации событий в распределённых системах.
Проблема:
Без этого паттерна возникает классическая ситуация «двойной записи»:
- Сервис обновляет данные в БД
- Сервис отправляет событие в брокер сообщений
Между этими шагами может произойти сбой: БД обновилась, но событие не отправилось (или наоборот). Это приводит к рассинхронизации данных между сервисами.
Реализация:
-- Таблица основных сущностей
CREATE TABLE orders (
id UUID PRIMARY KEY,
status VARCHAR(50),
created_at TIMESTAMP
);
-- Таблица Outbox
CREATE TABLE outbox_events (
id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
published BOOLEAN DEFAULT FALSE
);
func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 1. Сохраняем заказ
_, err = tx.ExecContext(ctx,
"INSERT INTO orders (id, status, created_at) VALUES ($1, $2, $3)",
order.ID, order.Status, time.Now())
if err != nil {
return err
}
// 2. Записываем событие в outbox в той же транзакции
payload, _ := json.Marshal(order)
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox_events (id, aggregate_id, aggregate_type, event_type, payload)
VALUES ($1, $2, $3, $4, $5)`,
uuid.New(), order.ID, "order", "order.created", payload)
if err != nil {
return err
}
return tx.Commit()
}
Процесс публикации (Polling Publisher):
func (p *OutboxPublisher) Start(ctx context.Context) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
p.publishEvents(ctx)
}
}
}
func (p *OutboxPublisher) publishEvents(ctx context.Context) {
rows, err := p.db.QueryContext(ctx,
`SELECT id, aggregate_id, event_type, payload
FROM outbox_events
WHERE published = FALSE
ORDER BY created_at
LIMIT 100`)
if err != nil {
log.Printf("failed to query outbox: %v", err)
return
}
defer rows.Close()
for rows.Next() {
var event OutboxEvent
if err := rows.Scan(&event.ID, &event.AggregateID, &event.EventType, &event.Payload); err != nil {
continue
}
// Отправляем в Kafka
if err := p.producer.Send(event); err != nil {
log.Printf("failed to send event: %v", err)
continue
}
// Помечаем как опубликованное
p.db.ExecContext(ctx,
"UPDATE outbox_events SET published = TRUE WHERE id = $1",
event.ID)
}
}
Проблемы производительности:
- Дополнительная таблица увеличивает нагрузку на БД
- Polling создаёт постоянную нагрузку на базу
- Необходимость очистки старых записей
- Задержка между записью и публикацией события
Альтернативы:
- CDC (Change Data Capture): Debezium читает WAL (Write-Ahead Log) базы данных без polling
- Transactional Messaging: некоторые брокеры поддерживают транзакции (например, Kafka Transactions)
Паттерн Saga
Saga — паттерн управления распределёнными транзакциями через последовательность локальных транзакций с компенсирующими действиями.
Два подхода:
1. Хореограгия (Choreography)
Каждый сервис слушает события и сам решает, что делать дальше.
// Order Service
func (s *OrderService) HandlePaymentCompleted(ctx context.Context, event PaymentCompleted) error {
return s.orderRepo.UpdateStatus(ctx, event.OrderID, "confirmed")
}
// Payment Service
func (s *PaymentService) HandleOrderCancelled(ctx context.Context, event OrderCancelled) error {
return s.refundPayment(ctx, event.OrderID)
}
2. Оркестрация (Orchestration)
Центральный оркестратор управляет последовательностью шагов.
type SagaOrchestrator struct {
orderService OrderService
paymentService PaymentService
inventoryService InventoryService
}
func (o *SagaOrchestrator) CreateOrderSaga(ctx context.Context, order Order) error {
// Шаг 1: Создать заказ
if err := o.orderService.Create(ctx, order); err != nil {
return fmt.Errorf("create order failed: %w", err)
}
// Шаг 2: Забронировать товар
if err := o.inventoryService.Reserve(ctx, order.Items); err != nil {
// Компенсация: отменить заказ
o.orderService.Cancel(ctx, order.ID)
return fmt.Errorf("reserve inventory failed: %w", err)
}
// Шаг 3: Провести оплату
if err := o.paymentService.Charge(ctx, order.PaymentInfo); err != nil {
// Компенсации
o.inventoryService.Release(ctx, order.Items)
o.orderService.Cancel(ctx, order.ID)
return fmt.Errorf("payment failed: %w", err)
}
// Шаг 4: Подтвердить заказ
return o.orderService.Confirm(ctx, order.ID)
}
Ответ собеседника демонстрирует практический опыт: внедрение Transactional Outbox было вызвано реальной проблемой потери событий, но привело к деградации производительности, что потребовало поиска баланса. Это типичная ситуация — идеальное решение на бумаге не всегда оптимально в продакшене.
Вопрос 5. Как реализовывался паттерн Saga и какие инструменты использовались?
Таймкод: 00:10:55
Ответ собеседника: Правильный. Использовал паттерн Saga для управления распределёнными транзакциями. В качестве инструмента использовалась Kafka с транзакционными возможностями (Transactional Outbox). Кастомная реализация на уровне кода и контейнеров, без использования верхнеуровневых коробочных решений. Уровень контекста (бизнес-контекст) в основном был зоной ответственности CTO.
Правильный ответ:
Данный вопрос является уточняющим к предыдущему, где кандидат уже описал реализацию Saga через оркестрацию. Дополним ответ деталями о транзакционных возможностях Kafka.
Транзакционные возможности Kafka
Kafka поддерживает транзакции начиная с версии 0.11, что позволяет реализовать атомарную запись в несколько партиций и exactly-once semantics.
Пример транзакционного producer в Go:
import "github.com/segmentio/kafka-go"
func createTransactionalProducer(brokers []string, txID string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(brokers...),
Balancer: &kafka.LeastBytes{},
Transport: &kafka.Transport{
SASL: mechanism,
},
// Транзакционные настройки
AllowAutoTopicCreation: false,
}
}
func (s *OrderService) publishOrderEvent(ctx context.Context, event OrderEvent) error {
// Используем kafka-go с поддержкой транзакций
// или нативный sarama с транзакционным producer
return s.kafkaWriter.WriteMessages(ctx,
kafka.Message{
Topic: "order-events",
Key: []byte(event.OrderID),
Value: event.Payload,
Headers: []kafka.Header{
{Key: "event-type", Value: []byte(event.Type)},
},
},
)
}
С Sarama (полная транзакционная поддержка):
import "github.com/Shopify/sarama"
func (s *OrderService) processWithTransaction(ctx context.Context, order Order) error {
// Транзакционный producer
txnProducer, err := sarama.NewAsyncProducer(s.brokers, s.config)
if err != nil {
return err
}
// Начинаем транзакцию
if err := txnProducer.BeginTxn(); err != nil {
return err
}
// Отправляем сообщения в рамках транзакции
for _, msg := range s.buildMessages(order) {
txnProducer.Input() <- msg
}
// Фиксируем транзакцию
if err := txnProducer.CommitTxn(); err != nil {
txnProducer.AbortTxn()
return fmt.Errorf("transaction failed: %w", err)
}
return nil
}
Кастомная реализация vs коробочные решения
Кандидат упоминает кастомную реализацию без использования готовых фреймворков. Это имеет свои плюсы и минусы:
Плюсы кастомной реализации:
- Полный контроль над логикой
- Отсутствие зависимости от сторонних библиотек
- Возможность точной настройки под бизнес-требования
Минусы:
- Необходимость самостоятельно обрабатывать edge cases
- Сложность отладки и мониторинга
- Отсутствие community support
Коробочные решения для Saga в Go:
- Temporal — workflow engine с поддержкой Saga
- Cadence — аналог Temporal от Uber
- dtm — distributed transaction manager
- masstransit — библиотека для message-based приложений
Распределение ответственности
Упоминание о том, что бизнес-контекст — зона ответственности CTO, а техническая реализация — зона ответственности разработчика, показывает понимание разделения зон ответственности в команде. Это важный аспект зрелой инженерной культуры.
Вопрос 6. Какой опыт работы с Legacy-кодом и переписыванием проектов?
Таймкод: 00:13:40
Ответ собеседника: Правильный. В части Go около 70% работы приходилось на новый код и 30% на Legacy. Был проект по переводу сервиса с PHP на Go. Работа с Legacy не пугает, к этому привык.
Правильный ответ:
Работа с Legacy-кодом и миграция между технологиями — неизбежная часть карьеры любого разработчика. Статистика показывает, что в индустрии соотношение примерно 70/30 в пользу работы с существующим кодом — это реалистичная и распространённая ситуация.
Стратегии работы с Legacy-кодом
1. Страничный рефакторинг (Strangler Fig Pattern)
Постепенная замена функциональности старой системы новой, без полного переписывания:
Клиенты → API Gateway → [Новый сервис] ↘
[Старый сервис] → Единая точка входа
2. Тестирование перед изменениями
Прежде чем менять Legacy-код, необходимо покрыть его тестами:
- Characterization tests — фиксируем текущее поведение
- Golden master — сохраняем эталонные ответы
- Approval tests — сравниваем изменения с эталоном
3. Выделение границ (Seams)
Находим точки, где можно отделить Legacy от нового кода:
- Адаптеры и фасады
- Антикоррупционный слой (Anti-Corruption Layer)
- Событийные интеграции
Миграция с PHP на Go
Типичные шаги при миграции:
Фаза 1: Анализ
- Документирование текущей функциональности
- Определение зависимостей и интеграций
- Выделение критичных бизнес-процессов
Фаза 2: Подготовка
- Настройка инфраструктуры для Go-сервисов
- Создание CI/CD pipelines
- Реализация общих библиотек и утилит
Фаза 3: Параллельная работа
- Запуск нового сервиса рядом со старым
- Синхронизация данных между системами
- Постепенное переключение трафика
Фаза 4: Завершение
- Полное переключение на новый сервис
- Мониторинг и стабилизация
- Деактивация старого сервиса
Пример антикоррупционного слоя:
// PHP Legacy Service
type PHPOrderService struct {
client *http.Client
baseURL string
}
func (s *PHPOrderService) GetOrder(ctx context.Context, id string) (*LegacyOrder, error) {
resp, err := s.client.Get(fmt.Sprintf("%s/orders/%s", s.baseURL, id))
if err != nil {
return nil, err
}
defer resp.Body.Body.Close()
var order LegacyOrder
if err := json.NewDecoder(resp.Body).Decode(&order); err != nil {
return nil, err
}
return &order, nil
}
// Anti-Corruption Layer
type OrderAdapter struct {
legacyService *PHPOrderService
}
func (a *OrderAdapter) GetOrder(ctx context.Context, id string) (*domain.Order, error) {
legacyOrder, err := a.legacyService.GetOrder(ctx, id)
if err != nil {
return nil, fmt.Errorf("legacy service error: %w", err)
}
// Преобразование из legacy-формата в новый доменный формат
return &domain.Order{
ID: legacyOrder.OrderID,
Status: mapLegacyStatus(legacyOrder.StatusCode),
Amount: money.New(legacyOrder.Total, "USD"),
}, nil
}
Ключевые принципы успешной миграции:
- Не переписывать всё сразу — мигрировать поэтапно
- Сохранять бизнес-логику, менять технологию
- Иметь возможность отката на каждом этапе
- Измерять метрики до и после миграции
- Документировать решения и trade-offs
Ответ собеседника показывает зрелый подход: комфортная работа с Legacy и опыт миграции — это ценные навыки, которые свидетельствуют о практическом опыте и гибкости.
Вопрос 7. Как именно реализовывался Transactional Outbox для гарантии доставки событий?
Таймкод: 00:19:29
Ответ собеседника: Правильный. Была создана отдельная таблица в БД, куда записывались события, связанные с пользователями. При обновлении записи в таблице пользователей в той же транзакции БД заносилась запись во вторую таблицу с событиями. За счёт атомарности транзакции достигалось то, что либо обе записи появляются, либо ни одна. Далее отдельный процесс считывал события из этой таблицы и отправлял в шину данных (Kafka). Это вариант паттерна Transactional Outbox.
Правильный ответ:
Детальная реализация Transactional Outbox с гарантией доставки событий.
Схема работы:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Application │ │ Database │ │ Kafka │
│ │ │ │ │ │
│ 1. Begin TX │────▶│ users table │ │ │
│ 2. Update user │ │ outbox table │ │ │
│ 3. Insert event│ │ │ │ │
│ 4. Commit TX │ │ │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ ▲
│ │
▼ │
┌─────────────────┐ │
│ Relay/Poller │────────────┘
│ (separate │
│ process) │
└─────────────────┘
SQL-схема:
-- Основная таблица
CREATE TABLE users (
id UUID PRIMARY KEY,
email VARCHAR(255) NOT NULL,
name VARCHAR(255),
status VARCHAR(50) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Таблица Outbox
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP DEFAULT NOW(),
processed_at TIMESTAMP,
status VARCHAR(20) DEFAULT 'pending',
retry_count INT DEFAULT 0,
error TEXT
);
-- Индексы для оптимизации polling
CREATE INDEX idx_outbox_status_created ON outbox_events (status, created_at);
CREATE INDEX idx_outbox_aggregate ON outbox_events (aggregate_id, aggregate_type);
Реализация на Go:
type OutboxEvent struct {
ID uuid.UUID `db:"id"`
AggregateID uuid.UUID `db:"aggregate_id"`
AggregateType string `db:"aggregate_type"`
EventType string `db:"event_type"`
Payload json.RawMessage `db:"payload"`
Metadata json.RawMessage `db:"metadata"`
CreatedAt time.Time `db:"created_at"`
ProcessedAt *time.Time `db:"processed_at"`
Status string `db:"status"`
RetryCount int `db:"retry_count"`
Error *string `db:"error"`
}
type UserService struct {
db *sqlx.DB
kafka *kafka.Writer
}
func (s *UserService) UpdateUser(ctx context.Context, user User) error {
tx, err := s.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer tx.Rollback()
// 1. Обновляем пользователя
_, err = tx.ExecContext(ctx,
`UPDATE users SET name = $1, email = $2, updated_at = $3 WHERE id = $4`,
user.Name, user.Email, time.Now(), user.ID)
if err != nil {
return fmt.Errorf("update user: %w", err)
}
// 2. Создаём событие в outbox в той же транзакции
payload, _ := json.Marshal(map[string]interface{}{
"user_id": user.ID,
"name": user.Name,
"email": user.Email,
})
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox_events (id, aggregate_id, aggregate_type, event_type, payload)
VALUES ($1, $2, $3, $4, $5)`,
uuid.New(), user.ID, "user", "user.updated", payload)
if err != nil {
return fmt.Errorf("insert outbox event: %w", err)
}
// 3. Фиксируем транзакцию — либо обе записи, либо ни одна
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
}
return nil
}
Relay/Poller для отправки событий:
type OutboxRelay struct {
db *sqlx.DB
kafka *kafka.Writer
batchSize int
interval time.Duration
}
func (r *OutboxRelay) Start(ctx context.Context) {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
log.Printf("outbox relay error: %v", err)
}
}
}
}
func (r *OutboxRelay) processBatch(ctx context.Context) error {
tx, err := r.db.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Выбираем необработанные события с блокировкой
var events []OutboxEvent
err = tx.SelectContext(ctx, &events,
`SELECT * FROM outbox_events
WHERE status = 'pending' AND retry_count < 5
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED`,
r.batchSize)
if err != nil {
return fmt.Errorf("select events: %w", err)
}
for _, event := range events {
// Отправляем в Kafka
msg := kafka.Message{
Topic: fmt.Sprintf("%s.events", event.AggregateType),
Key: []byte(event.AggregateID.String()),
Value: event.Payload,
Headers: []kafka.Header{
{Key: "event-id", Value: []byte(event.ID.String())},
{Key: "event-type", Value: []byte(event.EventType)},
},
}
if err := r.kafka.WriteMessages(ctx, msg); err != nil {
// Увеличиваем счётчик попыток
_, _ = tx.ExecContext(ctx,
`UPDATE outbox_events
SET retry_count = retry_count + 1, error = $1
WHERE id = $2`,
err.Error(), event.ID)
continue
}
// Помечаем как обработанное
now := time.Now()
_, err = tx.ExecContext(ctx,
`UPDATE outbox_events
SET status = 'processed', processed_at = $1
WHERE id = $2`,
now, event.ID)
if err != nil {
log.Printf("failed to mark event as processed: %v", err)
}
}
return tx.Commit()
}
Гарантии доставки:
- At-least-once: событие будет доставлено минимум один раз (возможны дубликаты)
- Ordering: события в рамках одного aggregate сохраняют порядок
- Durability: событие не потеряется даже при сбое приложения
Оптимизации:
SKIP LOCKEDпозволяет запускать несколько relay параллельно- Пакетная обработка снижает нагрузку на БД
- Exponential backoff для retry логики
- Dead letter queue для событий, которые не удалось доставить
Вопрос 8. Как решить проблему шардирования событий по нескольким инстансам при Transactional Outbox с сохранением порядка событий?
Таймкод: 00:21:48
Ответ собеседника: Неполный. Предложил использовать ID пользователя как ключ партиционирования, чтобы все события по одному пользователю попадали в одну партицию. Определил проблему с порядком событий (ordering) при удалении и повторном создании пользователя с тем же логином. Рассматривал добавление временной метки (timestamp) или версии агрегата в ключ, но не смог чётко сформулировать окончательное решение. Идея использовать персональные данные (возраст и т.д.) была отвергнута.
Правильный ответ:
Это классическая проблема при реализации Transactional Outbox с несколькими инстансами. Кандидат правильно определил ключевые аспекты проблемы, но решение требует более детальной проработки.
Проблема партиционирования и порядка событий
Основной конфликт:
- Необходимо распределить нагрузку между партициями (шардирование)
- Необходимо сохранить порядок событий для каждого агрегата
- Несколько инстансов relay могут обрабатывать события параллельно
Решение 1: Ключ партиционирования по Aggregate ID
Использование ID агрегата (user_id) как ключа сообщения в Kafka:
func (r *OutboxRelay) sendEvent(ctx context.Context, event OutboxEvent) error {
msg := kafka.Message{
Topic: "user-events",
Key: []byte(event.AggregateID.String()), // Ключ партиционирования
Value: event.Payload,
Headers: []kafka.Header{
{Key: "event-id", Value: []byte(event.ID.String())},
{Key: "event-type", Value: []byte(event.EventType)},
{Key: "aggregate-version", Value: []byte(fmt.Sprintf("%d", event.Version))},
},
}
return r.kafka.WriteMessages(ctx, msg)
}
Гарантии:
- Все события одного пользователя попадают в одну партицию
- Kafka гарантирует порядок сообщений внутри партиции
- Разные пользователи распределяются по разным партициям
Проблема переиспользования ID:
Кандикт правильно подметил проблему: если пользователь удаляется и создаётся заново с тем же логином, события могут перемешаться.
Решение 2: Уникальный Aggregate ID + Version
Используем UUID вместо логина как идентификатор агрегата и добавляем версию:
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), -- Уникальный ID
login VARCHAR(255) UNIQUE NOT NULL, -- Бизнес-ключ
name VARCHAR(255),
version INT DEFAULT 1, -- Версия для optimistic locking
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
version INT NOT NULL, -- Версия события
created_at TIMESTAMP DEFAULT NOW(),
status VARCHAR(20) DEFAULT 'pending'
);
Решение 3: Глобальный порядок через sequence
Добавляем глобальный sequence number в БД:
-- Последовательность для глобального порядка
CREATE SEQUENCE outbox_event_seq;
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGINT DEFAULT nextval('outbox_event_seq'),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
status VARCHAR(20) DEFAULT 'pending'
);
CREATE INDEX idx_outbox_sequence ON outbox_events (sequence);
Реализация с гарантией порядка:
type OutboxRelay struct {
db *sqlx.DB
kafka *kafka.Writer
batchSize int
// Каждый инстанс обрабатывает свой диапазон партиций
partition int
totalPartitions int
}
func (r *OutboxRelay) processBatch(ctx context.Context) error {
#### **Вопрос 9**. Можно ли использовать встроенные функции БД (например, now()) для получения корректного timestamp событий при высокой нагрузке?
**Таймкод:** <YouTubeSeekTo id="UAwnt7fkT08" time="00:28:03"/>
**Ответ собеседника:** **Неполный**. Кандидат предложил использовать встроенные функции БД для получения timestamp, но столкнулся с проблемой: при высокой нагрузке и множестве инстансов application-серверов порядок записей в БД может не совпадать с реальным порядком поступления запросов. Обсудил, что даже при single master возможен неправильный порядок из-за асинхронности внутренних механизмов БД (WAL, запись в лог и последующее применение данных могут быть разделены по времени). Не смог дать точный ответ о том, в какой момент вычисляется timestamp в БД и гарантирует ли он он правильный порядок.
**Правильный ответ:**
Это глубокий вопрос о временных метках и гарантиях порядка в распределённых системах. Кандидат правильно обозначил проблему, но требуется более точное понимание механизмов.
**Проблема now() в PostgreSQL**
**Когда вычисляется now():**
Функция `now()` в PostgreSQL возвращает время начала транзакции, а не время выполнения конкретного оператора:
```sql
BEGIN;
SELECT now(); -- 2024-01-15 10:00:00.000
-- ... проходит 100 мс ...
SELECT now(); -- 2024-01-15 10:00:00.000 (то же время!)
COMMIT;
Демонстрация проблемы:
-- Транзакция A начинается в 10:00:00.000
BEGIN; -- now() = 10:00:00.000
-- ... задержка 50 мс ...
INSERT INTO outbox_events (..., created_at) VALUES (..., now());
-- created_at = 10:00:00.000, хотя реальное время 10:00:00.050
-- Транзакция B начинается в 10:00:00.010
BEGIN; -- now() = 10:00:00.010
-- ... задержка 10 мс ...
INSERT INTO outbox_events (..., created_at) VALUES (..., now());
-- created_at = 10:00:00.010, хотя реальное время 10:00:00.020
-- Результат: событие B имеет МЕНЬШУЮ метку, чем A,
-- хотя было создано ПОЗЖЕ в реальности
Почему now() не гарантирует порядок:
now()фиксируется в момент BEGIN транзакции- Транзакции могут выполняться параллельно
- COMMIT может произойти в другом порядке, чем BEGIN
- WAL записи применяются асинхронно
Решения для корректного порядка
1. Использование sequence (рекомендуется)
CREATE SEQUENCE outbox_event_seq;
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGINT NOT NULL DEFAULT nextval('outbox_event_seq'),
aggregate_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now(),
status VARCHAR(20) DEFAULT 'pending'
);
-- sequence гарантирует строгий порядок, независимо от времени
2. Глобальный timestamp из единого источника
// Используем timestamp из Kafka, а не из БД
func (r *OutboxRelay) sendEvent(ctx context.Context, event OutboxEvent) error {
msg := kafka.Message{
Topic: "user-events",
Key: []byte(event.AggregateID.String()),
Value: event.Payload,
Time: time.Now(), // Timestamp из application-сервера
Headers: []kafka.Header{
{Key: "sequence", Value: []byte(fmt.Sprintf("%d", event.Sequence))},
},
}
return r.kafka.WriteMessages(ctx, msg)
}
3. Hybrid Logical Clock (HLC)
Для распределённых систем с несколькими датацентрами:
type HybridLogicalClock struct {
wallTime int64
logical int64
}
func (c *HybridLogicalClock) Now() Timestamp {
wallNow := time.Now().UnixNano()
if wallNow > c.wallTime {
c.wallTime = wallNow
c.logical = 0
} else {
c.logical++
}
return Timestamp{
Physical: c.wallTime,
Logical: c.logical,
}
}
4. TrueTime (подход Google Spanner)
Google использует атомные часы и GPS для синхронизации времени между датацентрами с гарантированной погрешностью (обычно < 10 мс).
Практическая рекомендация:
-- Комбинированный подход: sequence для порядка, now() для информации
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGINT NOT NULL DEFAULT nextval('outbox_event_seq'),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now(), -- Информационное поле
status VARCHAR(20) DEFAULT 'pending'
);
-- Индекс для polling по sequence (гарантирует порядок)
CREATE INDEX idx_outbox_sequence_status ON outbox_events (sequence)
WHERE status = 'pending';
func (r *OutboxRelay) processBatch(ctx context.Context) error {
var events []OutboxEvent
err := r.db.SelectContext(ctx, &events,
`SELECT * FROM outbox_events
WHERE status = 'pending'
ORDER BY sequence ASC -- Порядок гарантирован sequence
LIMIT $1
FOR UPDATE SKIP LOCKED`,
r.batchSize)
// ...
}
Вывод:
now()не гарантирует корректный порядок при высокой нагрузкеsequence— надёжный способ обеспечить строгий порядокcreated_atможно использовать как информационное поле, но не для сортировки- Для распределённых систем рассмотреть HLC или аналогичные подходы
Вопрос 10. Насколько актуально использовать Envoy и с каким стеком он применялся?
Таймкод: 00:36:56
Ответ собеседника: Правильный. Envoy использовался как gRPC Proxy. Кандидат считает, что Envoy актуален в связке с Go и gRPC, но не стал бы его использовать там, где этот стек не применяется (например, на Java-стеке подойдёт что-то вроде Nginx). С Kubernetes и Redis не работал. Упомянул, что при некоторых нагрузках в Nats терялись сообщения, и был создан issue на GitHub, но на момент ухода из компании баг не был исправлен. Считает Kafka более проверенным временем решением по сравнению с Nats.
Правильный ответ:
Envoy Proxy — современный сервис-прокси
Envoy — это L7 прокси и коммуникационный шина, изначально разработанная Lyft, теперь часть CNCF. Стандарт de facto для service mesh.
Ключевые возможности Envoy:
- gRPC нативная поддержка: HTTP/2, двунаправленные стримы, load balancing
- Service Discovery: интеграция с Consul, Eureka, Kubernetes
- Observability: метрики, трейсинг, логирование из коробки
- Circuit Breaking: автоматическое отключение нездоровых инстансов
- Rate Limiting: ограничение скорости запросов
- TLS termination: централизованное управление сертификатами
Envoy как gRPC Proxy:
# envoy.yaml — пример конфигурации для gRPC
static_resources:
listeners:
- name: grpc_listener
address:
socket_address:
address: 0.0.0.0
port_value: 8080
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: grpc_service
codec_type: AUTO
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: backend_service
timeout: 30s
http_filters:
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: backend_service
connect_timeout: 5s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
http2_protocol_options: {} # Включаем HTTP/2 для gRPC
load_assignment:
cluster_name: backend_service
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: backend-service
port_value: 9090
circuit_breakers:
thresholds:
- priority: DEFAULT
max_connections: 1000
max_pending_requests: 1000
max_requests: 1000
max_retries: 3
Сравнение Envoy vs Nginx:
| Характеристика | Envoy | Nginx |
|---|---|---|
| gRPC поддержка | Нативная | Ограниченная |
| HTTP/2 | Полная | Частичная |
| Service mesh | Да (Istio, Linkerd) | Нет |
| Hot reload | Без перезапуска | Требует reload |
| Observability | Встроенная | Через модули |
| Динамическая конфигурация | xDS API | Ограниченная |
Когда использовать Envoy:
- Микросервисная архитектура с gRPC
- Service mesh (Istio, Consul Connect)
- Необходимость динамической конфигурации
- Высокие требования к observability
Когда Nginx достаточен:
- Простой reverse proxy
- Статические конфигурации
- HTTP/1.1 нагрузка
- Меньшая сложность эксплуатации
Kafka vs NATS:
Кандидат правильно отметил, что Kafka — более зрелое решение:
| Характеристика | Kafka | NATS |
|---|---|---|
| Гарантия доставки | At-least-once, Exactly-once | At-most-once, At-least-once (JetStream) |
| Персистентность | Да | JetStream — да, классический — нет |
| Сложность | Высокая | Низкая |
| Производительность | Высокая | Очень высокая |
| Экосистема | Огромная | Растущая |
Для критичных систем, где потеря сообщений недопустима, Kafka — более надёжный выбор благодаря персистентности и проверенным механизмам гарантии доставки.
Вопрос 11. Какие типы продуктов разрабатывались — SaaS или on-premise? Как разворачивались сервисы?
Таймкод: 00:41:57
Ответ собеседника: Правильный. В основном разрабатывались SaaS-продукты (облачные решения), а не on-premise. Kafka разворачивалась в облаке (Amazon MSK). Кандидат сам не занимался разворачиванием Kafka — это делали другие специалисты. Локально всё было на Vagrant.
Правильный ответ:
SaaS vs On-Premise — ключевые различия:
SaaS (Software as a Service):
- Развёртывание в облаке (AWS, GCP, Azure)
- Мультитенантность
- Автоматическое масштабирование
- Управляемые сервисы (managed services)
- Непрерывные обновления
On-Premise:
- Установка на серверах клиента
- Каждый клиент — отдельный инстанс
- Сложности с обновлениями
- Требования к безопасности и изоляции
Amazon MSK (Managed Streaming for Kafka):
Управляемый сервис Kafka от AWS, который берёт на себя:
- Управление кластером
- Мониторинг и алертинг
- Автоматическое масштабирование
- Резервное копирование
- Обновления и патчи
Типичный стек SaaS-продукта на Go:
┌─────────────────────────────────────────────────────────────┐
│ Cloud (AWS) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ API │ │ Auth │ │ Order │ │ Payment │ │
│ │ Gateway │ │ Service │ │ Service │ │ Service │ │
│ │ (Envoy) │ │ (Go) │ │ (Go) │ │ (Go) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ └──────────────┴──────────────┴──────────────┘ │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ Amazon MSK │ │
│ │ (Kafka) │ │
│ └───────────────────┘ │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ RDS │ │ ElastiCache│ │ S3 │ │
│ │(Postgres)│ │ (Redis) │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────┘
Локальная разработка с Vagrant:
Vagrant позволяет создавать воспроизводимые окружения:
# Vagrantfile
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/focal64"
config.vm.provider "virtualbox" do |vb|
vb.memory = "4096"
vb.cpus = 2
end
config.vm.provision "docker" do |d|
d.run "kafka",
image: "confluentinc/cp-kafka:latest",
args: "-p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092"
d.run "postgres",
image: "postgres:15",
args: "-p 5432:5432 -e POSTGRES_PASSWORD=secret"
d.run "redis",
image: "redis:7",
args: "-p 6379:6379"
end
end
Современные альтернативы Vagrant:
- Docker Compose — для локальной разработки
- Tilt — для разработки в Kubernetes
- DevSpace — для отладки в кластере
- Telepresence — для подключения локального сервиса к кластеру
Ответ собеседника показывает опыт работы с SaaS-продуктами и понимание разделения ответственности: разработчики пишут код, а инфраструктурой занимаются специалисты (SRE, DevOps). Это типичная и зрелая модель для компаний с развитой инженерной культурой.
Вопрос 12. Предложите архитектуру системы сбора продуктовых метрик (ETL/ELT)
Таймкод: 00:45:28
Ответ собеседника: Правильный. Предложил следующую архитектуру: события от внутренних и внешних сервисов поступают через входящие эндпоинты (API) для универсальности и лёгкого подключения новых сервисов. Далее события загружаются в Kafka как промежуточный брокер. Затем с помощью Kafka Streams данные трансформируются и передаются аналитикам. Аналитики работают с OLAP-решениями (ClickHouse, Cassandra). Kafka Streams выполняет роль ETL — преобразование данных перед загрузкой в целевое хранилище. Схема: Сервисы → API → Kafka → Kafka Streams (ETL) → ClickHouse/Cassandra → Аналитики.
Правильный ответ:
Предложенная архитектура корректна и покрывает основные компоненты. Дополним деталями и альтернативами.
Полная архитектура системы сбора метрик:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Источники данных │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Web App │ │Mobile App│ │ Backend │ │ External │ │ IoT │ │
│ │ │ │ │ │ Services │ │ APIs │ │ Devices │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │ │
└───────┼─────────────┼─────────────┼─────────────┼─────────────┼────────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Ingestion Layer │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ API Gateway / Collector │ │
│ │ (REST, gRPC, Webhook endpoints) │ │
│ └─────────────────────────────────┬───────────────────────────────────┘ │
│ │ │
└────────────────────────────────────┼───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Message Broker │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Apache Kafka │ │
│ │ │ │
│ │ Topics: │ │
│ │ - raw-events (все сырые события) │ │
│ │ - user-actions (действия пользователей) │ │
│ │ - system-metrics (метрики систем) │ │
│ │ - business-events (бизнес-события) │ │
│ └─────────────────────────────────┬───────────────────────────────────┘ │
│ │ │
└────────────────────────────────────┼───────────────────────────────────────┘
│
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Processing Layer (ETL/ELT) │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Kafka Streams│ │ Apache Flink │ │ Custom Go │ │
│ │ │ │ │ │ Workers │ │
│ │ - Validation │ │ - Complex │ │ - Enrichment │ │
│ │ - Filtering │ │ windowing │ │ - Transform │ │
│ │ - Enrichment │ │ - Aggregation│ │ - Routing │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
└─────────┼───────────────────┼───────────────────┼──────────────────────────┘
│ │ │
└───────────────────┼───────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Storage Layer │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ ClickHouse │ │ Apache │ │ Redis │ │
│ │ │ │ Druid │ │ │ │
│ │ - Analytics │ │ - Real-time │ │ - Caching │ │
│ │ - Dashboards │ │ analytics │ │ - Hot data │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Consumption Layer │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Grafana │ │ Superset │ │ Jupyter │ │
│ │ │ │ │ │ Notebooks │ │
│ │ - Dashboards │ │ - Ad-hoc │ │ - ML Models │ │
│ │ - Alerts │ │ queries │ │ - Research │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Реализация ETL на Go вместо Kafka Streams:
// Event представляет сырое событие
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Source string `json:"source"`
Timestamp time.Time `json:"timestamp"`
UserID string `json:"user_id"`
Payload map[string]interface{} `json:"payload"`
}
// ProcessedEvent представляет обработанное событие
type ProcessedEvent struct {
Event
ProcessedAt time.Time `json:"processed_at"`
SessionID string `json:"session_id"`
GeoLocation string `json:"geo_location"`
DeviceType string `json:"device_type"`
}
// Transformer преобразует сырые события
type Transformer struct {
geoResolver *GeoResolver
deviceParser *DeviceParser
sessionTracker *SessionTracker
}
func (t *Transformer) Transform(ctx context.Context, raw *Event) (*ProcessedEvent, error) {
// 1. Валидация
if err := t.validate(raw); err != nil {
return nil, fmt.Errorf("validation failed: %w", err)
}
// 2. Обогащение данных
geoLocation, err := t.geoResolver.Resolve(ctx, raw.Payload["ip"].(string))
if err != nil {
log.Printf("geo resolution failed: %v", err)
}
deviceType := t.deviceParser.Parse(raw.Payload["user_agent"].(string))
sessionID := t.sessionTracker.GetOrCreateSession(raw.UserID, raw.Timestamp)
return &ProcessedEvent{
Event: *raw,
ProcessedAt: time.Now(),
SessionID: sessionID,
GeoLocation: geoLocation,
DeviceType: deviceType,
}, nil
}
func (t *Transformer) validate(event *Event) error {
if event.ID == "" {
return fmt.Errorf("event ID is required")
}
if event.Type == "" {
return fmt.Errorf("event type is required")
}
if event.Timestamp.IsZero() {
return fmt.Errorf("timestamp is required")
}
if event.Timestamp.After(time.Now().Add(time.Hour)) {
return fmt.Errorf("timestamp is in the future")
}
return nil
}
// Consumer обрабатывает события из Kafka
type Consumer struct {
reader *kafka.Reader
transformer *Transformer
clickhouse *sqlx.DB
batchSize int
batch []*ProcessedEvent
}
func (c *Consumer) Start(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return c.flush(ctx)
case <-ticker.C:
if err := c.flush(ctx); err != nil {
log.Printf("flush error: %v", err)
}
default:
msg, err := c.reader.ReadMessage(ctx)
if err != nil {
return fmt.Errorf("read message: %w", err)
}
var raw Event
if err := json.Unmarshal(msg.Value, &raw); err != nil {
log.Printf("unmarshal error: %v", err)
continue
}
processed, err := c.transformer.Transform(ctx, &raw)
if err != nil {
log.Printf("transform error: %v", err)
continue
}
c.batch = append(c.batch, processed)
if len(c.batch) >= c.batchSize {
if err := c.flush(ctx); err != nil {
log.Printf("flush error: %v", err)
}
}
}
}
}
func (c *Consumer) flush(ctx context.Context) error {
if len(c.batch) == 0 {
return nil
}
tx, err := c.clickhouse.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO events (id, type, source, timestamp, user_id,
processed_at, session_id, geo_location, device_type, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`)
if err != nil {
return err
}
defer stmt.Close()
for _, event := range c.batch {
payload, _ := json.Marshal(event.Payload)
_, err := stmt.ExecContext(ctx,
event.ID, event.Type, event.Source, event.Timestamp, event.UserID,
event.ProcessedAt, event.SessionID, event.GeoLocation, event.DeviceType, payload)
if err != nil {
log.Printf("insert error: %v", err)
}
}
c.batch = c.batch[:0]
return tx.Commit()
}
ClickHouse схема для аналитики:
-- Основная таблица событий
CREATE TABLE events (
id String,
type LowCardinality(String),
source LowCardinality(String),
timestamp DateTime64(3),
user_id String,
processed_at DateTime64(3),
session_id String,
geo_location LowCardinality(String),
device_type LowCardinality(String),
payload String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (type, timestamp, user_id)
TTL timestamp + INTERVAL 2 YEAR;
-- Агрегированная таблица для быстрых запросов
CREATE MATERIALIZED VIEW events_hourly
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (type, source, hour)
AS SELECT
toStartOfHour(timestamp) as hour,
type,
source,
count() as event_count,
uniq(user_id) as unique_users
FROM events
GROUP BY hour, type, source;
ETL vs ELT:
| Подход | Описание | Когда использовать |
|---|---|---|
| ETL | Transform → Load → Query | Сложные трансформации, legacy системы |
| ELT | Load → Transform → Query | Современные хранилища (ClickHouse, BigQuery), гибкость |
Для ClickHouse предпочтительнее ELT — загружаем сырые данные, трансформируем через materialized views.
Ответ собеседника демонстрирует понимание классической архитектуры аналитических систем и правильный выбор технологий для каждого слоя.
