Skip to content

Журнал операций

Журнал операций (operations log) — это полный аудит-лог всех действий с абонентами, пакетами и подписками в Catena. Каждое изменение записывается с временной меткой, что позволяет отслеживать историю и производить расчёты для биллинга.

Что такое операция

Операция — это запись о конкретном действии, выполненном в системе. Каждая операция содержит информацию о том, что было сделано, когда и с какими объектами.

Зачем нужен журнал операций:

  • 💰 Расчёт доходов — подсчёт созданных и отменённых подписок для биллинга
  • 📊 Аудит действий — кто, что и когда делал в системе
  • 🔍 Отладка проблем — история изменений для выявления причин
  • 📈 Аналитика бизнеса — метрики роста, churn rate, популярные пакеты
  • 🔐 Безопасность — отслеживание подозрительных действий
  • 📝 Compliance — доказательства для регуляторов и аудиторов

Что записывается:

Создание подписки → Операция createPackageSubscriber
    ↓
Начало биллингового периода
    ↓
Отмена подписки → Операция deletePackageSubscriber
    ↓
Расчёт стоимости = (дата отмены - дата создания) × цена

Типы операций

Операции с абонентами

autoCreateSubscriber — автоматическое создание абонента

  • Происходит при первом входе через SMS (если включена автоматическая регистрация)
  • Система создаёт учётную запись "на лету"
  • Может автоматически подключить бесплатные пакеты

createSubscriber — ручное создание абонента

  • Администратор или биллинг создали учётную запись
  • Через веб-интерфейс или Management API
  • Обычно с последующим подключением пакетов

deleteSubscriber — удаление абонента

  • Полное удаление учётной записи
  • Автоматически отменяет все подписки
  • Необратимое действие

disableSubscriber — блокировка абонента

  • Временная блокировка доступа
  • Подписки сохраняются, но доступ к просмотру блокируется
  • Используется при неоплате, нарушениях правил

enableSubscriber — разблокировка абонента

  • Восстановление доступа после блокировки
  • Подписки остаются активными

Операции с подписками

createPackageSubscriber — создание подписки

  • Подключение пакета к абоненту
  • Ключевая операция для биллинга — начало оплачиваемого периода
  • Записывается дата начала подписки

deletePackageSubscriber — удаление подписки

  • Отключение пакета от абонента
  • Ключевая операция для биллинга — конец оплачиваемого периода
  • Используется для расчёта стоимости

Операции с пакетами

createPackage — создание пакета

  • Создан новый тарифный план
  • Аудит для отслеживания изменений в продуктовой линейке

deletePackage — удаление пакета

  • Пакет удалён из системы
  • Все подписки на него должны быть предварительно отменены

Структура операции

Основные поля

operationId — уникальный идентификатор операции

  • Формат: base64-кодированный Snowflake ID
  • Пример: oKl9SW3AAAE.
  • Генерируется автоматически при создании записи

type — тип операции

  • Один из предопределённых типов (см. выше)
  • Используется для фильтрации и группировки
  • Пример: createPackageSubscriber

portalId — идентификатор портала

  • К какому порталу относится операция
  • Используется для изоляции данных между порталами
  • Пример: pKl9SW3AAAE.

subscriberId — идентификатор абонента (опционально)

  • Присутствует в операциях, связанных с абонентами
  • Null для операций с пакетами
  • Пример: sKl9SW3AAAE.

packageId — идентификатор пакета (опционально)

  • Присутствует в операциях, связанных с пакетами
  • Null для операций создания/удаления абонентов
  • Пример: pkKl9SW3AAAE.

createdAt — время создания операции

  • Формат: ISO 8601 timestamp
  • Пример: 2024-10-16T10:00:00Z
  • Ключевое поле для биллинга — точная дата события

updatedAt — время последнего обновления

  • Обычно совпадает с createdAt
  • Может отличаться, если операция была изменена
  • Пример: 2024-10-16T10:00:00Z

payload — дополнительные данные операции

  • JSON объект с деталями операции
  • Содержимое зависит от типа операции
  • Примеры:
  • {"subscriberId": "sKl9SW3AAAE.", "name": "John Doe"}
  • {"packageId": "pkKl9SW3AAAE.", "packageName": "premium"}

Получение списка операций

Базовый запрос

Получить все операции:

curl -X GET https://your-catena-domain.com/tv-management/api/v1/operations \
  -H "X-Auth-Token: your-api-key"

Ответ:

{
  "operations": [
    {
      "operationId": "opKl9SW3AAAE.",
      "type": "createSubscriber",
      "portalId": "pKl9SW3AAAE.",
      "subscriberId": "sKl9SW3AAAE.",
      "packageId": null,
      "createdAt": "2024-10-16T10:00:00Z",
      "updatedAt": "2024-10-16T10:00:00Z",
      "payload": {
        "name": "Иван Петров",
        "phone": "+79161234567"
      }
    },
    {
      "operationId": "opKl9SW3AAAB.",
      "type": "createPackageSubscriber",
      "portalId": "pKl9SW3AAAE.",
      "subscriberId": "sKl9SW3AAAE.",
      "packageId": "pkKl9SW3AAAE.",
      "createdAt": "2024-10-16T10:05:00Z",
      "updatedAt": "2024-10-16T10:05:00Z",
      "payload": {
        "packageName": "premium"
      }
    }
  ],
  "next": "cursor-for-next-page"
}

Пагинация

Для больших объёмов используется cursor-based пагинация:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?cursor=cursor-for-next-page" \
  -H "X-Auth-Token: your-api-key"

Фильтрация операций

По абоненту

Все операции конкретного абонента:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?subscriberId=sKl9SW3AAAE." \
  -H "X-Auth-Token: your-api-key"

Применение:

  • История изменений учётной записи
  • Аудит действий с абонентом
  • Расчёт стоимости подписок абонента

Несколько абонентов:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?subscriberId=sKl9SW3AAAE.&subscriberId=sKl9SW3AAAB." \
  -H "X-Auth-Token: your-api-key"

По пакету

Все операции с конкретным пакетом:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?packageId=pkKl9SW3AAAE." \
  -H "X-Auth-Token: your-api-key"

Применение:

  • Подсчёт активаций пакета
  • Анализ популярности тарифного плана
  • Расчёт доходов от конкретного пакета

По типу операции

Только создания подписок:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?type=createPackageSubscriber" \
  -H "X-Auth-Token: your-api-key"

Только удаления подписок:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?type=deletePackageSubscriber" \
  -H "X-Auth-Token: your-api-key"

Несколько типов:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?type=createPackageSubscriber&type=deletePackageSubscriber" \
  -H "X-Auth-Token: your-api-key"

Применение:

  • Фокусировка на операциях, влияющих на биллинг
  • Подсчёт новых подписок (создания)
  • Анализ оттока (удаления)

По времени

Операции после определённой даты:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?created_at_gte=2024-10-01" \
  -H "X-Auth-Token: your-api-key"

Операции до определённой даты:

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?created_at_lt=2024-11-01" \
  -H "X-Auth-Token: your-api-key"

Операции за период (месяц):

curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?created_at_gte=2024-10-01&created_at_lt=2024-11-01" \
  -H "X-Auth-Token: your-api-key"

Применение:

  • Ежемесячные отчёты
  • Расчёты за биллинговый период
  • Анализ трендов по времени

Формат даты:

  • ISO 8601 format: YYYY-MM-DD или YYYY-MM-DDTHH:MM:SSZ
  • Примеры: 2024-10-01, 2024-10-16T10:00:00Z
  • Timezone: UTC

Расчёты для биллинга

Подсчёт доходов за период

Задача: Рассчитать доход за октябрь 2024

Логика расчёта:

  1. Получить все операции createPackageSubscriber за октябрь
  2. Для каждой подписки определить длительность
  3. Умножить на стоимость пакета
  4. Суммировать

Скрипт расчёта:

import requests
from datetime import datetime, timedelta
from collections import defaultdict

API_URL = "https://catena.example.com/tv-management/api/v1"
API_KEY = "your-api-key"

# Цены пакетов (храните в вашей биллинговой БД)
PACKAGE_PRICES = {
    "basicKl9SW3AAAE.": 10.0,      # $10/месяц
    "premiumKl9SW3AAAE.": 20.0,    # $20/месяц
    "sportKl9SW3AAAE.": 15.0       # $15/месяц
}

def calculate_monthly_revenue(year, month):
    """
    Рассчитать доход за месяц на основе операций
    """
    # Границы месяца
    start_date = f"{year}-{month:02d}-01"
    if month == 12:
        end_date = f"{year + 1}-01-01"
    else:
        end_date = f"{year}-{month + 1:02d}-01"

    # 1. Получить все создания подписок за месяц
    subscriptions = get_operations(
        type="createPackageSubscriber",
        created_at_gte=start_date,
        created_at_lt=end_date
    )

    # 2. Получить все удаления подписок
    cancellations = get_operations(
        type="deletePackageSubscriber",
        created_at_gte=start_date,
        created_at_lt=end_date
    )

    # 3. Сгруппировать по пакетам
    revenue_by_package = defaultdict(lambda: {
        'created': 0,
        'cancelled': 0,
        'revenue': 0.0
    })

    # Подсчитать созданные подписки
    for op in subscriptions:
        package_id = op['packageId']
        price = PACKAGE_PRICES.get(package_id, 0)

        revenue_by_package[package_id]['created'] += 1
        revenue_by_package[package_id]['revenue'] += price

    # Подсчитать отменённые (возвраты)
    for op in cancellations:
        package_id = op['packageId']
        revenue_by_package[package_id]['cancelled'] += 1

    # 4. Итоговый отчёт
    total_revenue = 0
    report = []

    for package_id, stats in revenue_by_package.items():
        package_name = get_package_name(package_id)
        total_revenue += stats['revenue']

        report.append({
            'package': package_name,
            'created': stats['created'],
            'cancelled': stats['cancelled'],
            'net_growth': stats['created'] - stats['cancelled'],
            'revenue': stats['revenue']
        })

    return {
        'total_revenue': total_revenue,
        'packages': sorted(report, key=lambda x: x['revenue'], reverse=True)
    }

def get_operations(**filters):
    """Получить операции с фильтрами и пагинацией"""
    operations = []
    cursor = None

    while True:
        # Построить URL с параметрами
        params = []
        for key, value in filters.items():
            if isinstance(value, list):
                for v in value:
                    params.append(f"{key}={v}")
            else:
                params.append(f"{key}={value}")

        if cursor:
            params.append(f"cursor={cursor}")

        url = f"{API_URL}/operations"
        if params:
            url += "?" + "&".join(params)

        response = requests.get(url, headers={"X-Auth-Token": API_KEY})
        data = response.json()

        operations.extend(data['operations'])

        cursor = data.get('next')
        if not cursor:
            break

    return operations

# Пример использования
revenue = calculate_monthly_revenue(2024, 10)

print(f"Доход за октябрь 2024: ${revenue['total_revenue']:.2f}")
print("\nПо пакетам:")
for pkg in revenue['packages']:
    print(f"  {pkg['package']}: {pkg['created']} активаций, ${pkg['revenue']:.2f}")
    print(f"    Отмен: {pkg['cancelled']}, Чистый рост: {pkg['net_growth']}")

Расчёт пропорциональной оплаты (pro-rata)

Задача: Абонент подключил пакет 15-го числа, а месяц закончился 30-го. Сколько взимать?

Решение:

from datetime import datetime, timedelta

def calculate_prorata_billing(subscriber_id, start_date, end_date):
    """
    Рассчитать пропорциональную стоимость подписок
    """
    # Получить все операции абонента за период
    operations = get_operations(
        subscriberId=subscriber_id,
        created_at_gte=start_date.isoformat(),
        created_at_lt=end_date.isoformat()
    )

    # Отфильтровать операции с подписками
    subscriptions = {}  # package_id: (create_date, delete_date or None)

    for op in operations:
        package_id = op['packageId']
        op_date = datetime.fromisoformat(op['createdAt'].replace('Z', '+00:00'))

        if op['type'] == 'createPackageSubscriber':
            if package_id not in subscriptions:
                subscriptions[package_id] = {'created': op_date, 'deleted': None}

        elif op['type'] == 'deletePackageSubscriber':
            if package_id in subscriptions:
                subscriptions[package_id]['deleted'] = op_date

    # Рассчитать стоимость для каждой подписки
    total_cost = 0.0
    billing_details = []

    for package_id, dates in subscriptions.items():
        created = dates['created']
        deleted = dates['deleted'] or end_date

        # Количество дней использования
        days_used = (deleted - created).days

        # Стоимость пакета в день
        monthly_price = PACKAGE_PRICES.get(package_id, 0)
        daily_price = monthly_price / 30

        # Пропорциональная стоимость
        cost = daily_price * days_used
        total_cost += cost

        billing_details.append({
            'package_id': package_id,
            'package_name': get_package_name(package_id),
            'created': created.isoformat(),
            'deleted': deleted.isoformat(),
            'days': days_used,
            'monthly_price': monthly_price,
            'cost': round(cost, 2)
        })

    return {
        'subscriber_id': subscriber_id,
        'period': f"{start_date.date()} - {end_date.date()}",
        'total_cost': round(total_cost, 2),
        'items': billing_details
    }

# Пример: расчёт за октябрь
bill = calculate_prorata_billing(
    subscriber_id="sKl9SW3AAAE.",
    start_date=datetime(2024, 10, 1),
    end_date=datetime(2024, 11, 1)
)

print(f"Абонент: {bill['subscriber_id']}")
print(f"Период: {bill['period']}")
print(f"К оплате: ${bill['total_cost']}")
print("\nДетализация:")
for item in bill['items']:
    print(f"  {item['package_name']}: {item['days']} дней × ${item['monthly_price']}/30 = ${item['cost']}")

Подсчёт метрик роста

Задача: Рассчитать MRR (Monthly Recurring Revenue) и churn rate

def calculate_growth_metrics(year, month):
    """Метрики роста бизнеса"""

    start_date = f"{year}-{month:02d}-01"
    if month == 12:
        end_date = f"{year + 1}-01-01"
    else:
        end_date = f"{year}-{month + 1:02d}-01"

    # Получить операции за месяц
    creates = get_operations(
        type="createPackageSubscriber",
        created_at_gte=start_date,
        created_at_lt=end_date
    )

    deletes = get_operations(
        type="deletePackageSubscriber",
        created_at_gte=start_date,
        created_at_lt=end_date
    )

    # Подсчитать метрики
    new_subscriptions = len(creates)
    cancelled_subscriptions = len(deletes)

    # MRR (упрощённо - без учёта разных цен пакетов)
    avg_package_price = 15.0  # средняя цена пакета
    new_mrr = new_subscriptions * avg_package_price
    lost_mrr = cancelled_subscriptions * avg_package_price
    net_mrr_change = new_mrr - lost_mrr

    # Churn rate
    # Для точного расчёта нужно знать количество активных подписок на начало месяца
    active_subscriptions_start = get_active_subscriptions_count(start_date)
    churn_rate = (cancelled_subscriptions / active_subscriptions_start * 100) if active_subscriptions_start > 0 else 0

    return {
        'month': f"{year}-{month:02d}",
        'new_subscriptions': new_subscriptions,
        'cancelled_subscriptions': cancelled_subscriptions,
        'net_growth': new_subscriptions - cancelled_subscriptions,
        'new_mrr': new_mrr,
        'lost_mrr': lost_mrr,
        'net_mrr_change': net_mrr_change,
        'churn_rate': round(churn_rate, 2)
    }

# Отчёт за октябрь 2024
metrics = calculate_growth_metrics(2024, 10)

print(f"Метрики за {metrics['month']}:")
print(f"  Новых подписок: {metrics['new_subscriptions']}")
print(f"  Отменено: {metrics['cancelled_subscriptions']}")
print(f"  Чистый рост: {metrics['net_growth']}")
print(f"  Новый MRR: ${metrics['new_mrr']}")
print(f"  Потерянный MRR: ${metrics['lost_mrr']}")
print(f"  Изменение MRR: ${metrics['net_mrr_change']:+.2f}")
print(f"  Churn rate: {metrics['churn_rate']}%")

Типичные сценарии использования

Сценарий 1: Ежемесячный биллинговый отчёт

Задача: Подготовить счета для всех абонентов за месяц

Решение:

import csv
from datetime import datetime

def generate_monthly_invoices(year, month, output_csv='invoices.csv'):
    """Генерация счетов на основе журнала операций"""

    start_date = datetime(year, month, 1)
    if month == 12:
        end_date = datetime(year + 1, 1, 1)
    else:
        end_date = datetime(year, month + 1, 1)

    # Получить всех абонентов
    response = requests.get(
        f"{API_URL}/subscribers",
        headers={"X-Auth-Token": API_KEY}
    )
    subscribers = response.json()['subscribers']

    # Для каждого абонента рассчитать стоимость
    invoices = []

    for subscriber in subscribers:
        subscriber_id = subscriber['subscriberId']

        # Рассчитать pro-rata биллинг
        bill = calculate_prorata_billing(
            subscriber_id,
            start_date,
            end_date
        )

        if bill['total_cost'] > 0:
            invoices.append({
                'subscriber_id': subscriber_id,
                'subscriber_name': subscriber['name'],
                'phone': f"+{subscriber['phoneCountryCode']}{subscriber['phone']}",
                'amount': bill['total_cost'],
                'details': bill['items']
            })

    # Сохранить в CSV
    with open(output_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=['subscriber_id', 'subscriber_name', 'phone', 'amount'])
        writer.writeheader()

        for invoice in invoices:
            writer.writerow({
                'subscriber_id': invoice['subscriber_id'],
                'subscriber_name': invoice['subscriber_name'],
                'phone': invoice['phone'],
                'amount': f"${invoice['amount']:.2f}"
            })

    total_revenue = sum(inv['amount'] for inv in invoices)

    return {
        'invoices_count': len(invoices),
        'total_revenue': total_revenue,
        'file': output_csv
    }

# Генерация счетов за октябрь
result = generate_monthly_invoices(2024, 10)

print(f"Создано счетов: {result['invoices_count']}")
print(f"Общий доход: ${result['total_revenue']:.2f}")
print(f"Файл: {result['file']}")

Сценарий 2: Выявление изменений подписок

Задача: Найти всех абонентов, которые изменили подписки в октябре

Применение:

  • Email рассылка о изменениях
  • Анализ причин отмены
  • Retention кампании
def find_subscription_changes(year, month):
    """Найти изменения подписок"""

    start_date = f"{year}-{month:02d}-01"
    if month == 12:
        end_date = f"{year + 1}-01-01"
    else:
        end_date = f"{year}-{month + 1:02d}-01"

    # Получить операции с подписками
    operations = get_operations(
        type=["createPackageSubscriber", "deletePackageSubscriber"],
        created_at_gte=start_date,
        created_at_lt=end_date
    )

    # Группировка по абонентам
    changes = defaultdict(lambda: {'added': [], 'removed': []})

    for op in operations:
        subscriber_id = op['subscriberId']
        package_id = op['packageId']
        package_name = get_package_name(package_id)

        if op['type'] == 'createPackageSubscriber':
            changes[subscriber_id]['added'].append({
                'package': package_name,
                'date': op['createdAt']
            })
        else:
            changes[subscriber_id]['removed'].append({
                'package': package_name,
                'date': op['createdAt']
            })

    # Результат
    results = {
        'upgrades': [],      # Добавили пакеты
        'downgrades': [],    # Удалили пакеты
        'churned': []        # Удалили все подписки
    }

    for subscriber_id, activity in changes.items():
        subscriber = get_subscriber(subscriber_id)

        if activity['added'] and not activity['removed']:
            # Только добавления - апгрейд
            results['upgrades'].append({
                'subscriber': subscriber,
                'packages': activity['added']
            })
        elif activity['removed'] and not activity['added']:
            # Только удаления - возможно churn
            if len(activity['removed']) >= len(subscriber['packages']):
                results['churned'].append({
                    'subscriber': subscriber,
                    'packages': activity['removed']
                })
            else:
                results['downgrades'].append({
                    'subscriber': subscriber,
                    'packages': activity['removed']
                })
        else:
            # И добавления и удаления - изменение плана
            pass

    return results

# Анализ изменений за октябрь
changes = find_subscription_changes(2024, 10)

print(f"Апгрейды: {len(changes['upgrades'])}")
print(f"Даунгрейды: {len(changes['downgrades'])}")
print(f"Полностью отменили: {len(changes['churned'])}")

# Email рассылка для churn prevention
for churned in changes['churned']:
    send_retention_email(churned['subscriber'], churned['packages'])

Сценарий 3: Аудит действий

Задача: Проверить, кто и когда изменял подписку абонента

def audit_subscriber_history(subscriber_id):
    """Полная история изменений абонента"""

    # Получить все операции
    operations = get_operations(subscriberId=subscriber_id)

    # Сортировка по времени
    operations.sort(key=lambda x: x['createdAt'])

    # Форматированный вывод
    print(f"История абонента {subscriber_id}:\n")

    for op in operations:
        timestamp = op['createdAt']
        op_type = op['type']

        if op_type == 'createSubscriber':
            print(f"{timestamp} - Создан абонент")
            print(f"  Имя: {op['payload'].get('name')}")
            print(f"  Телефон: {op['payload'].get('phone')}")

        elif op_type == 'createPackageSubscriber':
            package_name = get_package_name(op['packageId'])
            print(f"{timestamp} - Подключен пакет: {package_name}")

        elif op_type == 'deletePackageSubscriber':
            package_name = get_package_name(op['packageId'])
            print(f"{timestamp} - Отключен пакет: {package_name}")

        elif op_type == 'disableSubscriber':
            print(f"{timestamp} - Абонент заблокирован")
            print(f"  Причина: {op['payload'].get('reason', 'не указана')}")

        elif op_type == 'enableSubscriber':
            print(f"{timestamp} - Абонент разблокирован")

        elif op_type == 'deleteSubscriber':
            print(f"{timestamp} - Абонент удалён")

        print()

# Пример
audit_subscriber_history("sKl9SW3AAAE.")

Сценарий 4: Reconciliation с внешним биллингом

Задача: Сверить данные Catena с внешней биллинговой системой

def reconcile_with_billing(year, month):
    """Сверка с внешним биллингом"""

    start_date = f"{year}-{month:02d}-01"
    if month == 12:
        end_date = f"{year + 1}-01-01"
    else:
        end_date = f"{year}-{month + 1:02d}-01"

    # 1. Получить операции из Catena
    catena_operations = get_operations(
        type=["createPackageSubscriber", "deletePackageSubscriber"],
        created_at_gte=start_date,
        created_at_lt=end_date
    )

    # 2. Получить транзакции из биллинга
    billing_transactions = get_billing_transactions(start_date, end_date)

    # 3. Сравнить
    discrepancies = []

    # Проверить, есть ли в Catena все транзакции из биллинга
    for transaction in billing_transactions:
        if transaction['type'] == 'subscription_create':
            # Найти соответствующую операцию в Catena
            found = any(
                op['subscriberId'] == transaction['subscriber_id'] and
                op['packageId'] == transaction['package_id'] and
                abs((datetime.fromisoformat(op['createdAt'].replace('Z', '+00:00')) - 
                     transaction['date']).total_seconds()) < 300  # 5 минут допуск
                for op in catena_operations
                if op['type'] == 'createPackageSubscriber'
            )

            if not found:
                discrepancies.append({
                    'type': 'missing_in_catena',
                    'billing_transaction': transaction,
                    'severity': 'high'
                })

    # Проверить обратное — есть ли в биллинге все операции из Catena
    for op in catena_operations:
        if op['type'] == 'createPackageSubscriber':
            found = any(
                t['subscriber_id'] == op['subscriberId'] and
                t['package_id'] == op['packageId']
                for t in billing_transactions
                if t['type'] == 'subscription_create'
            )

            if not found:
                discrepancies.append({
                    'type': 'missing_in_billing',
                    'catena_operation': op,
                    'severity': 'medium'
                })

    return discrepancies

# Reconciliation за октябрь
discrepancies = reconcile_with_billing(2024, 10)

if discrepancies:
    print(f"⚠️  Найдено {len(discrepancies)} расхождений:")
    for d in discrepancies:
        print(f"  - {d['type']}: severity={d['severity']}")
else:
    print("✅ Данные синхронизированы, расхождений нет")

Лучшие практики

Регулярный экспорт данных

Рекомендация: Ежедневно экспортируйте операции в свою БД для долгосрочного хранения

import psycopg2
from datetime import datetime, timedelta

def export_operations_to_db():
    """Экспорт операций в PostgreSQL"""

    # Подключение к БД
    conn = psycopg2.connect("postgresql://billing_db")
    cursor = conn.cursor()

    # Создать таблицу (если не существует)
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS catena_operations (
            operation_id TEXT PRIMARY KEY,
            type TEXT NOT NULL,
            portal_id TEXT,
            subscriber_id TEXT,
            package_id TEXT,
            created_at TIMESTAMP NOT NULL,
            payload JSONB
        )
    ''')

    # Получить операции за последние 24 часа
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

    operations = get_operations(created_at_gte=yesterday)

    # Вставить в БД
    inserted = 0
    for op in operations:
        try:
            cursor.execute('''
                INSERT INTO catena_operations 
                (operation_id, type, portal_id, subscriber_id, package_id, created_at, payload)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (operation_id) DO NOTHING
            ''', (
                op['operationId'],
                op['type'],
                op['portalId'],
                op.get('subscriberId'),
                op.get('packageId'),
                op['createdAt'],
                json.dumps(op.get('payload', {}))
            ))
            inserted += 1
        except Exception as e:
            print(f"Error inserting {op['operationId']}: {e}")

    conn.commit()
    conn.close()

    return inserted

# Запускать ежедневно по cron
exported = export_operations_to_db()
print(f"Exported {exported} operations to database")

Использование в биллинговой системе

Архитектура:

Catena Operations Log ←→ Биллинговая система
         ↓                      ↓
   Факт события          Финансовый расчёт
   (подписка)                (счёт)

Workflow:

  1. Событие в Catena — создание/удаление подписки
  2. Webhook или polling — биллинг узнаёт о событии
  3. Запись в биллинг — транзакция в биллинговой БД
  4. Расчёт стоимости — на основе тарифов и периода
  5. Выставление счёта — абоненту или партнёру

Пример webhook от Catena:

from flask import Flask, request
import requests

app = Flask(__name__)

@app.route('/catena-webhook', methods=['POST'])
def catena_webhook():
    """Обработка событий из Catena"""

    event = request.json

    if event['type'] == 'createPackageSubscriber':
        # Новая подписка - начать биллинг
        billing_start_subscription(
            subscriber_id=event['subscriberId'],
            package_id=event['packageId'],
            start_date=event['createdAt']
        )

        return {"status": "ok", "action": "billing_started"}

    elif event['type'] == 'deletePackageSubscriber':
        # Отмена подписки - остановить биллинг
        billing_end_subscription(
            subscriber_id=event['subscriberId'],
            package_id=event['packageId'],
            end_date=event['createdAt']
        )

        return {"status": "ok", "action": "billing_stopped"}

    return {"status": "ok", "action": "ignored"}

if __name__ == '__main__':
    app.run(port=5001)

Мониторинг финансовых метрик

Дашборд для руководства:

def get_financial_dashboard():
    """Финансовый дашборд в реальном времени"""

    today = datetime.now().date()
    month_start = today.replace(day=1)

    # Метрики текущего месяца
    current_month = calculate_monthly_revenue(today.year, today.month)

    # Метрики прошлого месяца
    last_month_date = month_start - timedelta(days=1)
    last_month = calculate_monthly_revenue(
        last_month_date.year,
        last_month_date.month
    )

    # Сравнение
    revenue_growth = (
        (current_month['total_revenue'] - last_month['total_revenue']) /
        last_month['total_revenue'] * 100
    ) if last_month['total_revenue'] > 0 else 0

    # Операции за сегодня
    today_str = today.isoformat()
    tomorrow_str = (today + timedelta(days=1)).isoformat()

    today_creates = len(get_operations(
        type="createPackageSubscriber",
        created_at_gte=today_str,
        created_at_lt=tomorrow_str
    ))

    today_cancels = len(get_operations(
        type="deletePackageSubscriber",
        created_at_gte=today_str,
        created_at_lt=tomorrow_str
    ))

    return {
        'current_month_revenue': current_month['total_revenue'],
        'last_month_revenue': last_month['total_revenue'],
        'revenue_growth_percent': round(revenue_growth, 1),
        'today_new_subscriptions': today_creates,
        'today_cancellations': today_cancels,
        'today_net_growth': today_creates - today_cancels
    }

# Вывести дашборд
dashboard = get_financial_dashboard()

print("📊 Финансовый дашборд")
print(f"Доход текущего месяца: ${dashboard['current_month_revenue']:.2f}")
print(f"Доход прошлого месяца: ${dashboard['last_month_revenue']:.2f}")
print(f"Рост: {dashboard['revenue_growth_percent']:+.1f}%")
print(f"\nСегодня:")
print(f"  Новых подписок: {dashboard['today_new_subscriptions']}")
print(f"  Отмен: {dashboard['today_cancellations']}")
print(f"  Чистый рост: {dashboard['today_net_growth']:+d}")

Отчёты для бизнеса

Еженедельный отчёт для руководства

Скрипт автоматической генерации:

def generate_weekly_report():
    """Еженедельный отчёт для руководства"""

    # Последние 7 дней
    end_date = datetime.now()
    start_date = end_date - timedelta(days=7)

    operations = get_operations(
        type=["createPackageSubscriber", "deletePackageSubscriber", 
              "createSubscriber", "deleteSubscriber"],
        created_at_gte=start_date.isoformat(),
        created_at_lt=end_date.isoformat()
    )

    # Анализ
    stats = {
        'new_subscribers': 0,
        'deleted_subscribers': 0,
        'new_subscriptions': 0,
        'cancelled_subscriptions': 0,
        'packages': defaultdict(lambda: {'adds': 0, 'removes': 0})
    }

    for op in operations:
        if op['type'] == 'createSubscriber':
            stats['new_subscribers'] += 1
        elif op['type'] == 'deleteSubscriber':
            stats['deleted_subscribers'] += 1
        elif op['type'] == 'createPackageSubscriber':
            stats['new_subscriptions'] += 1
            package_name = get_package_name(op['packageId'])
            stats['packages'][package_name]['adds'] += 1
        elif op['type'] == 'deletePackageSubscriber':
            stats['cancelled_subscriptions'] += 1
            package_name = get_package_name(op['packageId'])
            stats['packages'][package_name]['removes'] += 1

    # Формирование отчёта
    report = f"""
📈 Еженедельный отчёт ({start_date.strftime('%Y-%m-%d')} - {end_date.strftime('%Y-%m-%d')})

👥 Абоненты:
  Новых: {stats['new_subscribers']}
  Удалено: {stats['deleted_subscribers']}
  Чистый рост: {stats['new_subscribers'] - stats['deleted_subscribers']:+d}

📦 Подписки:
  Активировано: {stats['new_subscriptions']}
  Отменено: {stats['cancelled_subscriptions']}
  Чистый рост: {stats['new_subscriptions'] - stats['cancelled_subscriptions']:+d}

📊 По пакетам:
"""

    for package, counts in sorted(stats['packages'].items()):
        net = counts['adds'] - counts['removes']
        report += f"  {package}: {counts['adds']} активаций, {counts['removes']} отмен (net: {net:+d})\n"

    return report

# Генерация и отправка отчёта
report = generate_weekly_report()
print(report)

# Отправить на email руководству
# send_email(to="management@company.com", subject="Weekly Report", body=report)

Лучшие практики

Хранение данных

Рекомендации:

  • В Catena: Храните операции минимум 90 дней
  • В биллинговой БД: Храните навсегда для налоговой отчётности
  • Архивирование: Экспортируйте старые операции в cold storage (S3, glacier)

Политика хранения:

Catena Operations API:
  - Последние 90 дней: полный доступ
  - 90-365 дней: архив (медленный доступ)
  - >365 дней: холодное хранилище

Биллинговая БД:
  - Все операции навсегда
  - Индексы для быстрого поиска
  - Регулярные бэкапы

Защита от дубликатов

Проблема: Повторная отправка webhook может создать дубликат операции

Решение:

def process_operation_idempotent(operation_data):
    """Идемпотентная обработка операции"""

    operation_id = operation_data['operationId']

    # Проверить, обрабатывали ли уже
    cursor.execute(
        "SELECT 1 FROM processed_operations WHERE operation_id = %s",
        (operation_id,)
    )

    if cursor.fetchone():
        print(f"Operation {operation_id} already processed, skipping")
        return {"status": "already_processed"}

    # Обработать операцию
    process_billing_event(operation_data)

    # Отметить как обработанную
    cursor.execute(
        "INSERT INTO processed_operations (operation_id, processed_at) VALUES (%s, NOW())",
        (operation_id,)
    )
    conn.commit()

    return {"status": "processed"}

Мониторинг критичных операций

Настройка алертов:

def monitor_critical_operations():
    """Мониторинг критичных операций для финансов"""

    # Проверить последний час
    hour_ago = (datetime.now() - timedelta(hours=1)).isoformat()

    operations = get_operations(
        type=["createPackageSubscriber", "deletePackageSubscriber"],
        created_at_gte=hour_ago
    )

    # Аномалии
    alerts = []

    # 1. Слишком много отмен подписок
    cancellations = [op for op in operations if op['type'] == 'deletePackageSubscriber']
    if len(cancellations) > 50:  # Порог
        alerts.append({
            'level': 'warning',
            'message': f'High cancellation rate: {len(cancellations)} in last hour'
        })

    # 2. Нет новых подписок (странно)
    creations = [op for op in operations if op['type'] == 'createPackageSubscriber']
    if len(creations) == 0 and datetime.now().hour in range(10, 22):
        alerts.append({
            'level': 'info',
            'message': 'No new subscriptions in last hour (working hours)'
        })

    # 3. Массовые удаления абонентов
    deletions = [op for op in operations if op['type'] == 'deleteSubscriber']
    if len(deletions) > 10:
        alerts.append({
            'level': 'critical',
            'message': f'Mass subscriber deletion: {len(deletions)} in last hour'
        })

    # Отправить алерты
    for alert in alerts:
        send_alert(alert)

    return alerts

Получение конкретной операции

Запрос по ID:

curl -X GET https://your-catena-domain.com/tv-management/api/v1/operations/opKl9SW3AAAE. \
  -H "X-Auth-Token: your-api-key"

Ответ:

{
  "operationId": "opKl9SW3AAAE.",
  "type": "createPackageSubscriber",
  "portalId": "pKl9SW3AAAE.",
  "subscriberId": "sKl9SW3AAAE.",
  "packageId": "pkKl9SW3AAAE.",
  "createdAt": "2024-10-16T10:05:00Z",
  "updatedAt": "2024-10-16T10:05:00Z",
  "payload": {
    "packageName": "premium",
    "managerEmail": "admin@company.com"
  }
}

Применение:

  • Детальная информация об операции
  • Восстановление контекста события
  • Отладка конкретной транзакции

Устранение проблем

Операции не появляются в журнале

Проблема: Действия выполняются, но не записываются в журнал

Возможные причины:

  1. Проблемы с базой данных
  2. Ошибки в коде при записи операций
  3. Асинхронная запись с задержкой

Решение:

  1. Проверьте статус базы данных
  2. Просмотрите логи сервера на ошибки
  3. Подождите 1-2 минуты — операции могут записываться асинхронно

Расхождения в расчётах

Проблема: Расчёты на основе операций не совпадают с биллингом

Возможные причины:

  • Разные методы расчёта (calendar month vs billing cycle)
  • Не учитываются refunds или корректировки
  • Разные цены в разное время (изменение тарифов)
  • Пропущены операции из-за сбоев

Решение:

  1. Документируйте методологию расчётов
  2. Регулярно сверяйте данные (reconciliation)
  3. Храните историю цен пакетов
  4. Логируйте все корректировки отдельно

Отсутствуют старые операции

Проблема: Операции старше определённой даты не доступны

Причина: Политика хранения данных в Catena API

Решение:

  • Регулярно экспортируйте данные в свою БД
  • Не полагайтесь на API как единственный источник истории
  • Используйте собственное долгосрочное хранилище

См. также