Перейти к основному содержимому

Собеседование на Backend АРХИТЕКТОРА (Java, Go, Node, Python, C++)

· 28 мин. чтения

Сегодня мы разберём техническое собеседование с кандидатом на позицию архитектора, в ходе которого глубоко обсуждаются распределённые системы, выбор технологического стека и практические кейсы из реального опыта. Кандидемонстрирует сильное понимание микросервисной архитектуры, паттернов обеспечения консистентности (саги, транзакционные очереди событий), а также аргументированно рассуждает о компромиссах между производительностью, надёжностью и скоростью разработки при выборе языков и инструментов. Особое внимание уделяется разбору конкретных инцидентов — например, потере событий в системе на базе Kafka — и поиску решений для упорядочивания операций в условиях мультиинстансной архитектуры.

Вопрос 1. Какова основная задача архитектора?

Таймкод: 00:00:26

Ответ собеседника: Правильный. Основная задача архитектора — найти наилучшее решение для запроса от продукта. Критерии: время, деньги, наличие разработчиков, соответствие текущему стеку, скорость реализации. По сути — максимально быстро, просто и надёжно реализовать хотелки продукта на существующем стеке компании.

Правильный ответ:

Основная задача архитектора — это принятие ключевых технических решений, которые определяют структуру системы и обеспечивают баланс между бизнес-требованиями и техническими ограничениями.

Ключевые аспекты роли архитектора:

1. Анализ требований и ограничений

Архитектор работает с бизнес-запросами и трансформирует их в технические решения. Критерии принятия решений включают:

  • Время реализации (time-to-market)
  • Бюджет и стоимость владения (TCO)
  • Наличие компетенций в команде
  • Совместимость с существующим стеком технологий
  • Простота и надёжность решения

2. Проектирование системы

Архитектор определяет:

  • Выбор технологий и фреймворков
  • Структуру сервисов и их взаимодействие
  • Схемы данных и потоков информации
  • Стратегии масштабирования и отказоустойчивости

3. Коммуникация и согласование

Архитектор выступает связующим звеном между продуктовой и технической сторонами, обеспечивая понимание решений всеми участниками процесса.

4. Оценка компромиссов

Каждое архитектурное решение — это компромисс. Архитектор должен чётко понимать и доносить последствия выбранного подхода, включая технический долг и ограничения будущих изменений.

Ответ собеседника точно отражает суть: архитектор находит оптимальное решение в рамках заданных ограничений, стремясь к быстрой, простой и надёжной реализации бизнес-требований на существующем стеке компании.

Вопрос 2. Какой язык или инструмент наиболее близок к концепции «серебряной пули»?

Таймкод: 00:02:00

Ответ собеседника: Правильный. Считает, что Rust наиболее близок к серебряной пуле. Он быстрый как C++, но без сборщика мусора благодаря модели владения (borrowing). Компилятор проверяет много вещей, снижая вероятность ошибок в рантайме. Rust позволяет писать и системный код, и веб-сервисы через Tokio с асинхронным Event loop. Однако если ориентироваться на порог входа и лёгкость поиска разработчиков, то Go предпочтительнее благодаря нативной конкурентности.

Правильный ответ:

Концепция «серебряной пули» в программировании подразумевает универсальный инструмент, который оптимален для широкого круга задач. Автор этого термина, Фредерик Брукс, в эссе «No Silver Bullet» (1986) утверждал, что такого инструмента не существует и не может существовать.

Rust как претендент на универсальность

Rust действительно приближается к этой концепции благодаря уникальной комбинации свойств:

  • Производительность уровня C++ без сборщика мусора
  • Безопасность памяти через систему владения (ownership) и заимствования (borrowing)
  • Строгая проверка на этапе компиляции, исключающая целый класс ошибок в рантайме
  • Применимость для системного программирования, веб-сервисов (через Tokio), WebAssembly, embedded-систем

Go как прагматичный выбор

Go выигрывает в других аспектах:

  • Нативная поддержка конкурентности через горутины и каналы
  • Низкий порог входа и быстрое обучение
  • Простота и читаемость кода
  • Большой пул разработчиков на рынке
  • Отличная стандартная библиотека для сетевых сервисов

Почему «серебряной пули» не существует

Каждый язык — это набор компромиссов:

  • Rust: высокий порог входа, длительное время компиляции, сложность с borrow checker
  • Go: отсутствие дженериков (до версии 1.18), ограниченный контроль над памятью, GC-паузы
  • Java/C#: зависимость от JVM, высокое потребление памяти
  • Python/JS: низкая производительность, динамическая типизация

Выбор инструмента всегда зависит от контекста: требований к производительности, компетенций команды, сроков разработки и специфики задачи. Именно поэтому опытный архитектор выбирает инструмент под конкретную задачу, а не ищет универсальное решение.

Вопрос 3. gRPC лучше REST? Какой протокол интеграции предпочтительнее с точки зрения продукта?

Таймкод: 00:07:45

Ответ собеседника: Правильный. С точки зрения разработчика gRPC интереснее, но с точки зрения продукта и внешних интеграций REST предпочтительнее. Использование gRPC для внешних интеграций может вызвать сложности у сторонних разработчиков, увеличить время интеграции и даже привести к смене вендора. REST — более универсальный и понятный для внешних партнёров вариант.

Правильный ответ:

Выбор между gRPC и REST — это классический пример компромисса между техническим совершенством и продуктовой целесообразностью. Универсального ответа нет, но есть чёткие критерии выбора.

Сравнение подходов

REST (HTTP/JSON)

  • Универсальность: поддерживается любым языком и платформой
  • Читаемость: JSON человекочитаем, легко дебажить через curl, Postman
  • Экосистема: огромное количество инструментов, документации, специалистов
  • Кеширование: нативная поддержка HTTP-кеширования
  • Минусы: избыточность данных, отсутствие строгой типизации, ручная валидация

gRPC (HTTP/2 + Protobuf)

  • Производительность: бинарный протокол, меньше трафика, быстрее сериализация
  • Типизация: строгие контракты через .proto файлы
  • Генерация кода: автоматическая генерация клиентов для множества языков
  • Стриминг: нативная поддержка двунаправленных потоков
  • Минусы: сложнее дебажить, требует HTTP/2, меньше специалистов на рынке

Рекомендации по выбору

Используйте REST для:

  • Публичных API и внешних интеграций
  • Случаев, когда важна простота интеграции для партнёров
  • Проектов с большим количеством внешних потребителей

Используйте gRPC для:

  • Внутренних сервисов (microservices communication)
  • Высоконагруженных систем с критичной производительностью
  • Систем с двунаправленным стримингом
  • Команд, где уже есть экспертиза в gRPC

Гибридный подход

Многие компании используют оба подхода одновременно:

  • gRPC для внутренней коммуникации между сервисами
  • REST API Gateway для внешних потребителей
  • gRPC-Gateway для автоматической генерации REST-эндпоинтов из .proto файлов

Пример гибридной архитектектуры:

Внешние клиенты → REST API Gateway → gRPC микросервисы

Такой подход позволяет получить преимущества обоих миров: производительность gRPC внутри и простоту REST для внешних интеграций.

Ответ собеседника абсолютно верен: с продуктовой точки зрения REST предпочтителен для внешних интеграций из-за универсальности и низкого порога входа для партнёров.

Вопрос 4. Как использовались транзакционные очереди (Transactional Outbox), почему от них избавлялись и как реализовывался паттерн Saga?

Таймкод: 00:09:17

Ответ собеседника: Правильный. Transactional Outbox был добавлен по запросу бизнеса, когда события терялись — в одном сервисе обрабатывались, в другом не доходили. Реализация: отдельная таблица в БД, куда в той же транзакции, что и обновление основной сущности, записывались события. Далее отдельный процесс считывал и отправлял в Kafka. После внедрения сильно упал перформанс, решение было частично откачено, найден баланс. Паттерн Saga реализовывался через оркестрацию. Уровень контекста — зона ответственности CTO, детальные диаграммы и код прорабатывались самостоятельно.

Правильный ответ:

Transactional Outbox

Паттерн Transactional Outbox решает проблему атомарности при обновлении данных и публикации событий в распределённых системах.

Проблема:

Без этого паттерна возникает классическая ситуация «двойной записи»:

  1. Сервис обновляет данные в БД
  2. Сервис отправляет событие в брокер сообщений

Между этими шагами может произойти сбой: БД обновилась, но событие не отправилось (или наоборот). Это приводит к рассинхронизации данных между сервисами.

Реализация:

-- Таблица основных сущностей
CREATE TABLE orders (
id UUID PRIMARY KEY,
status VARCHAR(50),
created_at TIMESTAMP
);

-- Таблица Outbox
CREATE TABLE outbox_events (
id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
published BOOLEAN DEFAULT FALSE
);
func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

// 1. Сохраняем заказ
_, err = tx.ExecContext(ctx,
"INSERT INTO orders (id, status, created_at) VALUES ($1, $2, $3)",
order.ID, order.Status, time.Now())
if err != nil {
return err
}

// 2. Записываем событие в outbox в той же транзакции
payload, _ := json.Marshal(order)
_, err = tx.ExecContext(ctx,
`INSERT INTO outbox_events (id, aggregate_id, aggregate_type, event_type, payload)
VALUES ($1, $2, $3, $4, $5)`,
uuid.New(), order.ID, "order", "order.created", payload)
if err != nil {
return err
}

return tx.Commit()
}

Процесс публикации (Polling Publisher):

func (p *OutboxPublisher) Start(ctx context.Context) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
p.publishEvents(ctx)
}
}
}

func (p *OutboxPublisher) publishEvents(ctx context.Context) {
rows, err := p.db.QueryContext(ctx,
`SELECT id, aggregate_id, event_type, payload
FROM outbox_events
WHERE published = FALSE
ORDER BY created_at
LIMIT 100`)
if err != nil {
log.Printf("failed to query outbox: %v", err)
return
}
defer rows.Close()

for rows.Next() {
var event OutboxEvent
if err := rows.Scan(&event.ID, &event.AggregateID, &event.EventType, &event.Payload); err != nil {
continue
}

// Отправляем в Kafka
if err := p.producer.Send(event); err != nil {
log.Printf("failed to send event: %v", err)
continue
}

// Помечаем как опубликованное
p.db.ExecContext(ctx,
"UPDATE outbox_events SET published = TRUE WHERE id = $1",
event.ID)
}
}

Проблемы производительности:

  • Дополнительная таблица увеличивает нагрузку на БД
  • Polling создаёт постоянную нагрузку на базу
  • Необходимость очистки старых записей
  • Задержка между записью и публикацией события

Альтернативы:

  • CDC (Change Data Capture): Debezium читает WAL (Write-Ahead Log) базы данных без polling
  • Transactional Messaging: некоторые брокеры поддерживают транзакции (например, Kafka Transactions)

Паттерн Saga

Saga — паттерн управления распределёнными транзакциями через последовательность локальных транзакций с компенсирующими действиями.

Два подхода:

1. Хореограгия (Choreography)

Каждый сервис слушает события и сам решает, что делать дальше.

// Order Service
func (s *OrderService) HandlePaymentCompleted(ctx context.Context, event PaymentCompleted) error {
return s.orderRepo.UpdateStatus(ctx, event.OrderID, "confirmed")
}

// Payment Service
func (s *PaymentService) HandleOrderCancelled(ctx context.Context, event OrderCancelled) error {
return s.refundPayment(ctx, event.OrderID)
}

2. Оркестрация (Orchestration)

Центральный оркестратор управляет последовательностью шагов.

type SagaOrchestrator struct {
orderService OrderService
paymentService PaymentService
inventoryService InventoryService
}

func (o *SagaOrchestrator) CreateOrderSaga(ctx context.Context, order Order) error {
// Шаг 1: Создать заказ
if err := o.orderService.Create(ctx, order); err != nil {
return fmt.Errorf("create order failed: %w", err)
}

// Шаг 2: Забронировать товар
if err := o.inventoryService.Reserve(ctx, order.Items); err != nil {
// Компенсация: отменить заказ
o.orderService.Cancel(ctx, order.ID)
return fmt.Errorf("reserve inventory failed: %w", err)
}

// Шаг 3: Провести оплату
if err := o.paymentService.Charge(ctx, order.PaymentInfo); err != nil {
// Компенсации
o.inventoryService.Release(ctx, order.Items)
o.orderService.Cancel(ctx, order.ID)
return fmt.Errorf("payment failed: %w", err)
}

// Шаг 4: Подтвердить заказ
return o.orderService.Confirm(ctx, order.ID)
}

Ответ собеседника демонстрирует практический опыт: внедрение Transactional Outbox было вызвано реальной проблемой потери событий, но привело к деградации производительности, что потребовало поиска баланса. Это типичная ситуация — идеальное решение на бумаге не всегда оптимально в продакшене.

Вопрос 5. Как реализовывался паттерн Saga и какие инструменты использовались?

Таймкод: 00:10:55

Ответ собеседника: Правильный. Использовал паттерн Saga для управления распределёнными транзакциями. В качестве инструмента использовалась Kafka с транзакционными возможностями (Transactional Outbox). Кастомная реализация на уровне кода и контейнеров, без использования верхнеуровневых коробочных решений. Уровень контекста (бизнес-контекст) в основном был зоной ответственности CTO.

Правильный ответ:

Данный вопрос является уточняющим к предыдущему, где кандидат уже описал реализацию Saga через оркестрацию. Дополним ответ деталями о транзакционных возможностях Kafka.

Транзакционные возможности Kafka

Kafka поддерживает транзакции начиная с версии 0.11, что позволяет реализовать атомарную запись в несколько партиций и exactly-once semantics.

Пример транзакционного producer в Go:

import "github.com/segmentio/kafka-go"

func createTransactionalProducer(brokers []string, txID string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(brokers...),
Balancer: &kafka.LeastBytes{},
Transport: &kafka.Transport{
SASL: mechanism,
},
// Транзакционные настройки
AllowAutoTopicCreation: false,
}
}

func (s *OrderService) publishOrderEvent(ctx context.Context, event OrderEvent) error {
// Используем kafka-go с поддержкой транзакций
// или нативный sarama с транзакционным producer

return s.kafkaWriter.WriteMessages(ctx,
kafka.Message{
Topic: "order-events",
Key: []byte(event.OrderID),
Value: event.Payload,
Headers: []kafka.Header{
{Key: "event-type", Value: []byte(event.Type)},
},
},
)
}

С Sarama (полная транзакционная поддержка):

import "github.com/Shopify/sarama"

func (s *OrderService) processWithTransaction(ctx context.Context, order Order) error {
// Транзакционный producer
txnProducer, err := sarama.NewAsyncProducer(s.brokers, s.config)
if err != nil {
return err
}

// Начинаем транзакцию
if err := txnProducer.BeginTxn(); err != nil {
return err
}

// Отправляем сообщения в рамках транзакции
for _, msg := range s.buildMessages(order) {
txnProducer.Input() <- msg
}

// Фиксируем транзакцию
if err := txnProducer.CommitTxn(); err != nil {
txnProducer.AbortTxn()
return fmt.Errorf("transaction failed: %w", err)
}

return nil
}

Кастомная реализация vs коробочные решения

Кандидат упоминает кастомную реализацию без использования готовых фреймворков. Это имеет свои плюсы и минусы:

Плюсы кастомной реализации:

  • Полный контроль над логикой
  • Отсутствие зависимости от сторонних библиотек
  • Возможность точной настройки под бизнес-требования

Минусы:

  • Необходимость самостоятельно обрабатывать edge cases
  • Сложность отладки и мониторинга
  • Отсутствие community support

Коробочные решения для Saga в Go:

  • Temporal — workflow engine с поддержкой Saga
  • Cadence — аналог Temporal от Uber
  • dtm — distributed transaction manager
  • masstransit — библиотека для message-based приложений

Распределение ответственности

Упоминание о том, что бизнес-контекст — зона ответственности CTO, а техническая реализация — зона ответственности разработчика, показывает понимание разделения зон ответственности в команде. Это важный аспект зрелой инженерной культуры.

Вопрос 6. Какой опыт работы с Legacy-кодом и переписыванием проектов?

Таймкод: 00:13:40

Ответ собеседника: Правильный. В части Go около 70% работы приходилось на новый код и 30% на Legacy. Был проект по переводу сервиса с PHP на Go. Работа с Legacy не пугает, к этому привык.

Правильный ответ:

Работа с Legacy-кодом и миграция между технологиями — неизбежная часть карьеры любого разработчика. Статистика показывает, что в индустрии соотношение примерно 70/30 в пользу работы с существующим кодом — это реалистичная и распространённая ситуация.

Стратегии работы с Legacy-кодом

1. Страничный рефакторинг (Strangler Fig Pattern)

Постепенная замена функциональности старой системы новой, без полного переписывания:

Клиенты → API Gateway → [Новый сервис] ↘
[Старый сервис] → Единая точка входа

2. Тестирование перед изменениями

Прежде чем менять Legacy-код, необходимо покрыть его тестами:

  • Characterization tests — фиксируем текущее поведение
  • Golden master — сохраняем эталонные ответы
  • Approval tests — сравниваем изменения с эталоном

3. Выделение границ (Seams)

Находим точки, где можно отделить Legacy от нового кода:

  • Адаптеры и фасады
  • Антикоррупционный слой (Anti-Corruption Layer)
  • Событийные интеграции

Миграция с PHP на Go

Типичные шаги при миграции:

Фаза 1: Анализ

  • Документирование текущей функциональности
  • Определение зависимостей и интеграций
  • Выделение критичных бизнес-процессов

Фаза 2: Подготовка

  • Настройка инфраструктуры для Go-сервисов
  • Создание CI/CD pipelines
  • Реализация общих библиотек и утилит

Фаза 3: Параллельная работа

  • Запуск нового сервиса рядом со старым
  • Синхронизация данных между системами
  • Постепенное переключение трафика

Фаза 4: Завершение

  • Полное переключение на новый сервис
  • Мониторинг и стабилизация
  • Деактивация старого сервиса

Пример антикоррупционного слоя:

// PHP Legacy Service
type PHPOrderService struct {
client *http.Client
baseURL string
}

func (s *PHPOrderService) GetOrder(ctx context.Context, id string) (*LegacyOrder, error) {
resp, err := s.client.Get(fmt.Sprintf("%s/orders/%s", s.baseURL, id))
if err != nil {
return nil, err
}
defer resp.Body.Body.Close()

var order LegacyOrder
if err := json.NewDecoder(resp.Body).Decode(&order); err != nil {
return nil, err
}
return &order, nil
}

// Anti-Corruption Layer
type OrderAdapter struct {
legacyService *PHPOrderService
}

func (a *OrderAdapter) GetOrder(ctx context.Context, id string) (*domain.Order, error) {
legacyOrder, err := a.legacyService.GetOrder(ctx, id)
if err != nil {
return nil, fmt.Errorf("legacy service error: %w", err)
}

// Преобразование из legacy-формата в новый доменный формат
return &domain.Order{
ID: legacyOrder.OrderID,
Status: mapLegacyStatus(legacyOrder.StatusCode),
Amount: money.New(legacyOrder.Total, "USD"),
}, nil
}

Ключевые принципы успешной миграции:

  • Не переписывать всё сразу — мигрировать поэтапно
  • Сохранять бизнес-логику, менять технологию
  • Иметь возможность отката на каждом этапе
  • Измерять метрики до и после миграции
  • Документировать решения и trade-offs

Ответ собеседника показывает зрелый подход: комфортная работа с Legacy и опыт миграции — это ценные навыки, которые свидетельствуют о практическом опыте и гибкости.

Вопрос 7. Как именно реализовывался Transactional Outbox для гарантии доставки событий?

Таймкод: 00:19:29

Ответ собеседника: Правильный. Была создана отдельная таблица в БД, куда записывались события, связанные с пользователями. При обновлении записи в таблице пользователей в той же транзакции БД заносилась запись во вторую таблицу с событиями. За счёт атомарности транзакции достигалось то, что либо обе записи появляются, либо ни одна. Далее отдельный процесс считывал события из этой таблицы и отправлял в шину данных (Kafka). Это вариант паттерна Transactional Outbox.

Правильный ответ:

Детальная реализация Transactional Outbox с гарантией доставки событий.

Схема работы:

┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Application │ │ Database │ │ Kafka │
│ │ │ │ │ │
│ 1. Begin TX │────▶│ users table │ │ │
│ 2. Update user │ │ outbox table │ │ │
│ 3. Insert event│ │ │ │ │
│ 4. Commit TX │ │ │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ ▲
│ │
▼ │
┌─────────────────┐ │
│ Relay/Poller │────────────┘
│ (separate │
│ process) │
└─────────────────┘

SQL-схема:

-- Основная таблица
CREATE TABLE users (
id UUID PRIMARY KEY,
email VARCHAR(255) NOT NULL,
name VARCHAR(255),
status VARCHAR(50) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);

-- Таблица Outbox
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP DEFAULT NOW(),
processed_at TIMESTAMP,
status VARCHAR(20) DEFAULT 'pending',
retry_count INT DEFAULT 0,
error TEXT
);

-- Индексы для оптимизации polling
CREATE INDEX idx_outbox_status_created ON outbox_events (status, created_at);
CREATE INDEX idx_outbox_aggregate ON outbox_events (aggregate_id, aggregate_type);

Реализация на Go:

type OutboxEvent struct {
ID uuid.UUID `db:"id"`
AggregateID uuid.UUID `db:"aggregate_id"`
AggregateType string `db:"aggregate_type"`
EventType string `db:"event_type"`
Payload json.RawMessage `db:"payload"`
Metadata json.RawMessage `db:"metadata"`
CreatedAt time.Time `db:"created_at"`
ProcessedAt *time.Time `db:"processed_at"`
Status string `db:"status"`
RetryCount int `db:"retry_count"`
Error *string `db:"error"`
}

type UserService struct {
db *sqlx.DB
kafka *kafka.Writer
}

func (s *UserService) UpdateUser(ctx context.Context, user User) error {
tx, err := s.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer tx.Rollback()

// 1. Обновляем пользователя
_, err = tx.ExecContext(ctx,
`UPDATE users SET name = $1, email = $2, updated_at = $3 WHERE id = $4`,
user.Name, user.Email, time.Now(), user.ID)
if err != nil {
return fmt.Errorf("update user: %w", err)
}

// 2. Создаём событие в outbox в той же транзакции
payload, _ := json.Marshal(map[string]interface{}{
"user_id": user.ID,
"name": user.Name,
"email": user.Email,
})

_, err = tx.ExecContext(ctx,
`INSERT INTO outbox_events (id, aggregate_id, aggregate_type, event_type, payload)
VALUES ($1, $2, $3, $4, $5)`,
uuid.New(), user.ID, "user", "user.updated", payload)
if err != nil {
return fmt.Errorf("insert outbox event: %w", err)
}

// 3. Фиксируем транзакцию — либо обе записи, либо ни одна
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
}

return nil
}

Relay/Poller для отправки событий:

type OutboxRelay struct {
db *sqlx.DB
kafka *kafka.Writer
batchSize int
interval time.Duration
}

func (r *OutboxRelay) Start(ctx context.Context) {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
log.Printf("outbox relay error: %v", err)
}
}
}
}

func (r *OutboxRelay) processBatch(ctx context.Context) error {
tx, err := r.db.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

// Выбираем необработанные события с блокировкой
var events []OutboxEvent
err = tx.SelectContext(ctx, &events,
`SELECT * FROM outbox_events
WHERE status = 'pending' AND retry_count < 5
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED`,
r.batchSize)
if err != nil {
return fmt.Errorf("select events: %w", err)
}

for _, event := range events {
// Отправляем в Kafka
msg := kafka.Message{
Topic: fmt.Sprintf("%s.events", event.AggregateType),
Key: []byte(event.AggregateID.String()),
Value: event.Payload,
Headers: []kafka.Header{
{Key: "event-id", Value: []byte(event.ID.String())},
{Key: "event-type", Value: []byte(event.EventType)},
},
}

if err := r.kafka.WriteMessages(ctx, msg); err != nil {
// Увеличиваем счётчик попыток
_, _ = tx.ExecContext(ctx,
`UPDATE outbox_events
SET retry_count = retry_count + 1, error = $1
WHERE id = $2`,
err.Error(), event.ID)
continue
}

// Помечаем как обработанное
now := time.Now()
_, err = tx.ExecContext(ctx,
`UPDATE outbox_events
SET status = 'processed', processed_at = $1
WHERE id = $2`,
now, event.ID)
if err != nil {
log.Printf("failed to mark event as processed: %v", err)
}
}

return tx.Commit()
}

Гарантии доставки:

  • At-least-once: событие будет доставлено минимум один раз (возможны дубликаты)
  • Ordering: события в рамках одного aggregate сохраняют порядок
  • Durability: событие не потеряется даже при сбое приложения

Оптимизации:

  • SKIP LOCKED позволяет запускать несколько relay параллельно
  • Пакетная обработка снижает нагрузку на БД
  • Exponential backoff для retry логики
  • Dead letter queue для событий, которые не удалось доставить

Вопрос 8. Как решить проблему шардирования событий по нескольким инстансам при Transactional Outbox с сохранением порядка событий?

Таймкод: 00:21:48

Ответ собеседника: Неполный. Предложил использовать ID пользователя как ключ партиционирования, чтобы все события по одному пользователю попадали в одну партицию. Определил проблему с порядком событий (ordering) при удалении и повторном создании пользователя с тем же логином. Рассматривал добавление временной метки (timestamp) или версии агрегата в ключ, но не смог чётко сформулировать окончательное решение. Идея использовать персональные данные (возраст и т.д.) была отвергнута.

Правильный ответ:

Это классическая проблема при реализации Transactional Outbox с несколькими инстансами. Кандидат правильно определил ключевые аспекты проблемы, но решение требует более детальной проработки.

Проблема партиционирования и порядка событий

Основной конфликт:

  • Необходимо распределить нагрузку между партициями (шардирование)
  • Необходимо сохранить порядок событий для каждого агрегата
  • Несколько инстансов relay могут обрабатывать события параллельно

Решение 1: Ключ партиционирования по Aggregate ID

Использование ID агрегата (user_id) как ключа сообщения в Kafka:

func (r *OutboxRelay) sendEvent(ctx context.Context, event OutboxEvent) error {
msg := kafka.Message{
Topic: "user-events",
Key: []byte(event.AggregateID.String()), // Ключ партиционирования
Value: event.Payload,
Headers: []kafka.Header{
{Key: "event-id", Value: []byte(event.ID.String())},
{Key: "event-type", Value: []byte(event.EventType)},
{Key: "aggregate-version", Value: []byte(fmt.Sprintf("%d", event.Version))},
},
}
return r.kafka.WriteMessages(ctx, msg)
}

Гарантии:

  • Все события одного пользователя попадают в одну партицию
  • Kafka гарантирует порядок сообщений внутри партиции
  • Разные пользователи распределяются по разным партициям

Проблема переиспользования ID:

Кандикт правильно подметил проблему: если пользователь удаляется и создаётся заново с тем же логином, события могут перемешаться.

Решение 2: Уникальный Aggregate ID + Version

Используем UUID вместо логина как идентификатор агрегата и добавляем версию:

CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), -- Уникальный ID
login VARCHAR(255) UNIQUE NOT NULL, -- Бизнес-ключ
name VARCHAR(255),
version INT DEFAULT 1, -- Версия для optimistic locking
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
version INT NOT NULL, -- Версия события
created_at TIMESTAMP DEFAULT NOW(),
status VARCHAR(20) DEFAULT 'pending'
);

Решение 3: Глобальный порядок через sequence

Добавляем глобальный sequence number в БД:

-- Последовательность для глобального порядка
CREATE SEQUENCE outbox_event_seq;

CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGINT DEFAULT nextval('outbox_event_seq'),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
status VARCHAR(20) DEFAULT 'pending'
);

CREATE INDEX idx_outbox_sequence ON outbox_events (sequence);

Реализация с гарантией порядка:

type OutboxRelay struct {
db *sqlx.DB
kafka *kafka.Writer
batchSize int
// Каждый инстанс обрабатывает свой диапазон партиций
partition int
totalPartitions int
}

func (r *OutboxRelay) processBatch(ctx context.Context) error {


#### **Вопрос 9**. Можно ли использовать встроенные функции БД (например, now()) для получения корректного timestamp событий при высокой нагрузке?

**Таймкод:** <YouTubeSeekTo id="UAwnt7fkT08" time="00:28:03"/>

**Ответ собеседника:** **Неполный**. Кандидат предложил использовать встроенные функции БД для получения timestamp, но столкнулся с проблемой: при высокой нагрузке и множестве инстансов application-серверов порядок записей в БД может не совпадать с реальным порядком поступления запросов. Обсудил, что даже при single master возможен неправильный порядок из-за асинхронности внутренних механизмов БД (WAL, запись в лог и последующее применение данных могут быть разделены по времени). Не смог дать точный ответ о том, в какой момент вычисляется timestamp в БД и гарантирует ли он он правильный порядок.

**Правильный ответ:**

Это глубокий вопрос о временных метках и гарантиях порядка в распределённых системах. Кандидат правильно обозначил проблему, но требуется более точное понимание механизмов.

**Проблема now() в PostgreSQL**

**Когда вычисляется now():**

Функция `now()` в PostgreSQL возвращает время начала транзакции, а не время выполнения конкретного оператора:

```sql
BEGIN;
SELECT now(); -- 2024-01-15 10:00:00.000
-- ... проходит 100 мс ...
SELECT now(); -- 2024-01-15 10:00:00.000 (то же время!)
COMMIT;

Демонстрация проблемы:

-- Транзакция A начинается в 10:00:00.000
BEGIN; -- now() = 10:00:00.000
-- ... задержка 50 мс ...
INSERT INTO outbox_events (..., created_at) VALUES (..., now());
-- created_at = 10:00:00.000, хотя реальное время 10:00:00.050

-- Транзакция B начинается в 10:00:00.010
BEGIN; -- now() = 10:00:00.010
-- ... задержка 10 мс ...
INSERT INTO outbox_events (..., created_at) VALUES (..., now());
-- created_at = 10:00:00.010, хотя реальное время 10:00:00.020

-- Результат: событие B имеет МЕНЬШУЮ метку, чем A,
-- хотя было создано ПОЗЖЕ в реальности

Почему now() не гарантирует порядок:

  • now() фиксируется в момент BEGIN транзакции
  • Транзакции могут выполняться параллельно
  • COMMIT может произойти в другом порядке, чем BEGIN
  • WAL записи применяются асинхронно

Решения для корректного порядка

1. Использование sequence (рекомендуется)

CREATE SEQUENCE outbox_event_seq;

CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGINT NOT NULL DEFAULT nextval('outbox_event_seq'),
aggregate_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now(),
status VARCHAR(20) DEFAULT 'pending'
);

-- sequence гарантирует строгий порядок, независимо от времени

2. Глобальный timestamp из единого источника

// Используем timestamp из Kafka, а не из БД
func (r *OutboxRelay) sendEvent(ctx context.Context, event OutboxEvent) error {
msg := kafka.Message{
Topic: "user-events",
Key: []byte(event.AggregateID.String()),
Value: event.Payload,
Time: time.Now(), // Timestamp из application-сервера
Headers: []kafka.Header{
{Key: "sequence", Value: []byte(fmt.Sprintf("%d", event.Sequence))},
},
}
return r.kafka.WriteMessages(ctx, msg)
}

3. Hybrid Logical Clock (HLC)

Для распределённых систем с несколькими датацентрами:

type HybridLogicalClock struct {
wallTime int64
logical int64
}

func (c *HybridLogicalClock) Now() Timestamp {
wallNow := time.Now().UnixNano()

if wallNow > c.wallTime {
c.wallTime = wallNow
c.logical = 0
} else {
c.logical++
}

return Timestamp{
Physical: c.wallTime,
Logical: c.logical,
}
}

4. TrueTime (подход Google Spanner)

Google использует атомные часы и GPS для синхронизации времени между датацентрами с гарантированной погрешностью (обычно < 10 мс).

Практическая рекомендация:

-- Комбинированный подход: sequence для порядка, now() для информации
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGINT NOT NULL DEFAULT nextval('outbox_event_seq'),
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT now(), -- Информационное поле
status VARCHAR(20) DEFAULT 'pending'
);

-- Индекс для polling по sequence (гарантирует порядок)
CREATE INDEX idx_outbox_sequence_status ON outbox_events (sequence)
WHERE status = 'pending';
func (r *OutboxRelay) processBatch(ctx context.Context) error {
var events []OutboxEvent
err := r.db.SelectContext(ctx, &events,
`SELECT * FROM outbox_events
WHERE status = 'pending'
ORDER BY sequence ASC -- Порядок гарантирован sequence
LIMIT $1
FOR UPDATE SKIP LOCKED`,
r.batchSize)
// ...
}

Вывод:

  • now() не гарантирует корректный порядок при высокой нагрузке
  • sequence — надёжный способ обеспечить строгий порядок
  • created_at можно использовать как информационное поле, но не для сортировки
  • Для распределённых систем рассмотреть HLC или аналогичные подходы

Вопрос 10. Насколько актуально использовать Envoy и с каким стеком он применялся?

Таймкод: 00:36:56

Ответ собеседника: Правильный. Envoy использовался как gRPC Proxy. Кандидат считает, что Envoy актуален в связке с Go и gRPC, но не стал бы его использовать там, где этот стек не применяется (например, на Java-стеке подойдёт что-то вроде Nginx). С Kubernetes и Redis не работал. Упомянул, что при некоторых нагрузках в Nats терялись сообщения, и был создан issue на GitHub, но на момент ухода из компании баг не был исправлен. Считает Kafka более проверенным временем решением по сравнению с Nats.

Правильный ответ:

Envoy Proxy — современный сервис-прокси

Envoy — это L7 прокси и коммуникационный шина, изначально разработанная Lyft, теперь часть CNCF. Стандарт de facto для service mesh.

Ключевые возможности Envoy:

  • gRPC нативная поддержка: HTTP/2, двунаправленные стримы, load balancing
  • Service Discovery: интеграция с Consul, Eureka, Kubernetes
  • Observability: метрики, трейсинг, логирование из коробки
  • Circuit Breaking: автоматическое отключение нездоровых инстансов
  • Rate Limiting: ограничение скорости запросов
  • TLS termination: централизованное управление сертификатами

Envoy как gRPC Proxy:

# envoy.yaml — пример конфигурации для gRPC
static_resources:
listeners:
- name: grpc_listener
address:
socket_address:
address: 0.0.0.0
port_value: 8080
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: grpc_service
codec_type: AUTO
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
route:
cluster: backend_service
timeout: 30s
http_filters:
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router

clusters:
- name: backend_service
connect_timeout: 5s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
http2_protocol_options: {} # Включаем HTTP/2 для gRPC
load_assignment:
cluster_name: backend_service
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: backend-service
port_value: 9090
circuit_breakers:
thresholds:
- priority: DEFAULT
max_connections: 1000
max_pending_requests: 1000
max_requests: 1000
max_retries: 3

Сравнение Envoy vs Nginx:

ХарактеристикаEnvoyNginx
gRPC поддержкаНативнаяОграниченная
HTTP/2ПолнаяЧастичная
Service meshДа (Istio, Linkerd)Нет
Hot reloadБез перезапускаТребует reload
ObservabilityВстроеннаяЧерез модули
Динамическая конфигурацияxDS APIОграниченная

Когда использовать Envoy:

  • Микросервисная архитектура с gRPC
  • Service mesh (Istio, Consul Connect)
  • Необходимость динамической конфигурации
  • Высокие требования к observability

Когда Nginx достаточен:

  • Простой reverse proxy
  • Статические конфигурации
  • HTTP/1.1 нагрузка
  • Меньшая сложность эксплуатации

Kafka vs NATS:

Кандидат правильно отметил, что Kafka — более зрелое решение:

ХарактеристикаKafkaNATS
Гарантия доставкиAt-least-once, Exactly-onceAt-most-once, At-least-once (JetStream)
ПерсистентностьДаJetStream — да, классический — нет
СложностьВысокаяНизкая
ПроизводительностьВысокаяОчень высокая
ЭкосистемаОгромнаяРастущая

Для критичных систем, где потеря сообщений недопустима, Kafka — более надёжный выбор благодаря персистентности и проверенным механизмам гарантии доставки.

Вопрос 11. Какие типы продуктов разрабатывались — SaaS или on-premise? Как разворачивались сервисы?

Таймкод: 00:41:57

Ответ собеседника: Правильный. В основном разрабатывались SaaS-продукты (облачные решения), а не on-premise. Kafka разворачивалась в облаке (Amazon MSK). Кандидат сам не занимался разворачиванием Kafka — это делали другие специалисты. Локально всё было на Vagrant.

Правильный ответ:

SaaS vs On-Premise — ключевые различия:

SaaS (Software as a Service):

  • Развёртывание в облаке (AWS, GCP, Azure)
  • Мультитенантность
  • Автоматическое масштабирование
  • Управляемые сервисы (managed services)
  • Непрерывные обновления

On-Premise:

  • Установка на серверах клиента
  • Каждый клиент — отдельный инстанс
  • Сложности с обновлениями
  • Требования к безопасности и изоляции

Amazon MSK (Managed Streaming for Kafka):

Управляемый сервис Kafka от AWS, который берёт на себя:

  • Управление кластером
  • Мониторинг и алертинг
  • Автоматическое масштабирование
  • Резервное копирование
  • Обновления и патчи

Типичный стек SaaS-продукта на Go:

┌─────────────────────────────────────────────────────────────┐
│ Cloud (AWS) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ API │ │ Auth │ │ Order │ │ Payment │ │
│ │ Gateway │ │ Service │ │ Service │ │ Service │ │
│ │ (Envoy) │ │ (Go) │ │ (Go) │ │ (Go) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ └──────────────┴──────────────┴──────────────┘ │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ Amazon MSK │ │
│ │ (Kafka) │ │
│ └───────────────────┘ │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ RDS │ │ ElastiCache│ │ S3 │ │
│ │(Postgres)│ │ (Redis) │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────┘

Локальная разработка с Vagrant:

Vagrant позволяет создавать воспроизводимые окружения:

# Vagrantfile
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/focal64"

config.vm.provider "virtualbox" do |vb|
vb.memory = "4096"
vb.cpus = 2
end

config.vm.provision "docker" do |d|
d.run "kafka",
image: "confluentinc/cp-kafka:latest",
args: "-p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092"

d.run "postgres",
image: "postgres:15",
args: "-p 5432:5432 -e POSTGRES_PASSWORD=secret"

d.run "redis",
image: "redis:7",
args: "-p 6379:6379"
end
end

Современные альтернативы Vagrant:

  • Docker Compose — для локальной разработки
  • Tilt — для разработки в Kubernetes
  • DevSpace — для отладки в кластере
  • Telepresence — для подключения локального сервиса к кластеру

Ответ собеседника показывает опыт работы с SaaS-продуктами и понимание разделения ответственности: разработчики пишут код, а инфраструктурой занимаются специалисты (SRE, DevOps). Это типичная и зрелая модель для компаний с развитой инженерной культурой.

Вопрос 12. Предложите архитектуру системы сбора продуктовых метрик (ETL/ELT)

Таймкод: 00:45:28

Ответ собеседника: Правильный. Предложил следующую архитектуру: события от внутренних и внешних сервисов поступают через входящие эндпоинты (API) для универсальности и лёгкого подключения новых сервисов. Далее события загружаются в Kafka как промежуточный брокер. Затем с помощью Kafka Streams данные трансформируются и передаются аналитикам. Аналитики работают с OLAP-решениями (ClickHouse, Cassandra). Kafka Streams выполняет роль ETL — преобразование данных перед загрузкой в целевое хранилище. Схема: Сервисы → API → Kafka → Kafka Streams (ETL) → ClickHouse/Cassandra → Аналитики.

Правильный ответ:

Предложенная архитектура корректна и покрывает основные компоненты. Дополним деталями и альтернативами.

Полная архитектура системы сбора метрик:

┌─────────────────────────────────────────────────────────────────────────────┐
│ Источники данных │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Web App │ │Mobile App│ │ Backend │ │ External │ │ IoT │ │
│ │ │ │ │ │ Services │ │ APIs │ │ Devices │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │ │
└───────┼─────────────┼─────────────┼─────────────┼─────────────┼────────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Ingestion Layer │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ API Gateway / Collector │ │
│ │ (REST, gRPC, Webhook endpoints) │ │
│ └─────────────────────────────────┬───────────────────────────────────┘ │
│ │ │
└────────────────────────────────────┼───────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────────────┐
│ Message Broker │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Apache Kafka │ │
│ │ │ │
│ │ Topics: │ │
│ │ - raw-events (все сырые события) │ │
│ │ - user-actions (действия пользователей) │ │
│ │ - system-metrics (метрики систем) │ │
│ │ - business-events (бизнес-события) │ │
│ └─────────────────────────────────┬───────────────────────────────────┘ │
│ │ │
└────────────────────────────────────┼───────────────────────────────────────┘

┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Processing Layer (ETL/ELT) │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Kafka Streams│ │ Apache Flink │ │ Custom Go │ │
│ │ │ │ │ │ Workers │ │
│ │ - Validation │ │ - Complex │ │ - Enrichment │ │
│ │ - Filtering │ │ windowing │ │ - Transform │ │
│ │ - Enrichment │ │ - Aggregation│ │ - Routing │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
└─────────┼───────────────────┼───────────────────┼──────────────────────────┘
│ │ │
└───────────────────┼───────────────────┘


┌─────────────────────────────────────────────────────────────────────────────┐
│ Storage Layer │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ ClickHouse │ │ Apache │ │ Redis │ │
│ │ │ │ Druid │ │ │ │
│ │ - Analytics │ │ - Real-time │ │ - Caching │ │
│ │ - Dashboards │ │ analytics │ │ - Hot data │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────┬───────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────────────┐
│ Consumption Layer │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Grafana │ │ Superset │ │ Jupyter │ │
│ │ │ │ │ │ Notebooks │ │
│ │ - Dashboards │ │ - Ad-hoc │ │ - ML Models │ │
│ │ - Alerts │ │ queries │ │ - Research │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

Реализация ETL на Go вместо Kafka Streams:

// Event представляет сырое событие
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Source string `json:"source"`
Timestamp time.Time `json:"timestamp"`
UserID string `json:"user_id"`
Payload map[string]interface{} `json:"payload"`
}

// ProcessedEvent представляет обработанное событие
type ProcessedEvent struct {
Event
ProcessedAt time.Time `json:"processed_at"`
SessionID string `json:"session_id"`
GeoLocation string `json:"geo_location"`
DeviceType string `json:"device_type"`
}

// Transformer преобразует сырые события
type Transformer struct {
geoResolver *GeoResolver
deviceParser *DeviceParser
sessionTracker *SessionTracker
}

func (t *Transformer) Transform(ctx context.Context, raw *Event) (*ProcessedEvent, error) {
// 1. Валидация
if err := t.validate(raw); err != nil {
return nil, fmt.Errorf("validation failed: %w", err)
}

// 2. Обогащение данных
geoLocation, err := t.geoResolver.Resolve(ctx, raw.Payload["ip"].(string))
if err != nil {
log.Printf("geo resolution failed: %v", err)
}

deviceType := t.deviceParser.Parse(raw.Payload["user_agent"].(string))

sessionID := t.sessionTracker.GetOrCreateSession(raw.UserID, raw.Timestamp)

return &ProcessedEvent{
Event: *raw,
ProcessedAt: time.Now(),
SessionID: sessionID,
GeoLocation: geoLocation,
DeviceType: deviceType,
}, nil
}

func (t *Transformer) validate(event *Event) error {
if event.ID == "" {
return fmt.Errorf("event ID is required")
}
if event.Type == "" {
return fmt.Errorf("event type is required")
}
if event.Timestamp.IsZero() {
return fmt.Errorf("timestamp is required")
}
if event.Timestamp.After(time.Now().Add(time.Hour)) {
return fmt.Errorf("timestamp is in the future")
}
return nil
}

// Consumer обрабатывает события из Kafka
type Consumer struct {
reader *kafka.Reader
transformer *Transformer
clickhouse *sqlx.DB
batchSize int
batch []*ProcessedEvent
}

func (c *Consumer) Start(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return c.flush(ctx)
case <-ticker.C:
if err := c.flush(ctx); err != nil {
log.Printf("flush error: %v", err)
}
default:
msg, err := c.reader.ReadMessage(ctx)
if err != nil {
return fmt.Errorf("read message: %w", err)
}

var raw Event
if err := json.Unmarshal(msg.Value, &raw); err != nil {
log.Printf("unmarshal error: %v", err)
continue
}

processed, err := c.transformer.Transform(ctx, &raw)
if err != nil {
log.Printf("transform error: %v", err)
continue
}

c.batch = append(c.batch, processed)
if len(c.batch) >= c.batchSize {
if err := c.flush(ctx); err != nil {
log.Printf("flush error: %v", err)
}
}
}
}
}

func (c *Consumer) flush(ctx context.Context) error {
if len(c.batch) == 0 {
return nil
}

tx, err := c.clickhouse.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

stmt, err := tx.PrepareContext(ctx, `
INSERT INTO events (id, type, source, timestamp, user_id,
processed_at, session_id, geo_location, device_type, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`)
if err != nil {
return err
}
defer stmt.Close()

for _, event := range c.batch {
payload, _ := json.Marshal(event.Payload)
_, err := stmt.ExecContext(ctx,
event.ID, event.Type, event.Source, event.Timestamp, event.UserID,
event.ProcessedAt, event.SessionID, event.GeoLocation, event.DeviceType, payload)
if err != nil {
log.Printf("insert error: %v", err)
}
}

c.batch = c.batch[:0]
return tx.Commit()
}

ClickHouse схема для аналитики:

-- Основная таблица событий
CREATE TABLE events (
id String,
type LowCardinality(String),
source LowCardinality(String),
timestamp DateTime64(3),
user_id String,
processed_at DateTime64(3),
session_id String,
geo_location LowCardinality(String),
device_type LowCardinality(String),
payload String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (type, timestamp, user_id)
TTL timestamp + INTERVAL 2 YEAR;

-- Агрегированная таблица для быстрых запросов
CREATE MATERIALIZED VIEW events_hourly
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (type, source, hour)
AS SELECT
toStartOfHour(timestamp) as hour,
type,
source,
count() as event_count,
uniq(user_id) as unique_users
FROM events
GROUP BY hour, type, source;

ETL vs ELT:

ПодходОписаниеКогда использовать
ETLTransform → Load → QueryСложные трансформации, legacy системы
ELTLoad → Transform → QueryСовременные хранилища (ClickHouse, BigQuery), гибкость

Для ClickHouse предпочтительнее ELT — загружаем сырые данные, трансформируем через materialized views.

Ответ собеседника демонстрирует понимание классической архитектуры аналитических систем и правильный выбор технологий для каждого слоя.