Журнал операций¶
Журнал операций (operations log) — это полный аудит-лог всех действий с абонентами, пакетами и подписками в Catena. Каждое изменение записывается с временной меткой, что позволяет отслеживать историю и производить расчёты для биллинга.
Что такое операция¶
Операция — это запись о конкретном действии, выполненном в системе. Каждая операция содержит информацию о том, что было сделано, когда и с какими объектами.
Зачем нужен журнал операций:
- 💰 Расчёт доходов — подсчёт созданных и отменённых подписок для биллинга
- 📊 Аудит действий — кто, что и когда делал в системе
- 🔍 Отладка проблем — история изменений для выявления причин
- 📈 Аналитика бизнеса — метрики роста, churn rate, популярные пакеты
- 🔐 Безопасность — отслеживание подозрительных действий
- 📝 Compliance — доказательства для регуляторов и аудиторов
Что записывается:
Создание подписки → Операция createPackageSubscriber
↓
Начало биллингового периода
↓
Отмена подписки → Операция deletePackageSubscriber
↓
Расчёт стоимости = (дата отмены - дата создания) × цена
Типы операций¶
Операции с абонентами¶
autoCreateSubscriber — автоматическое создание абонента
- Происходит при первом входе через SMS (если включена автоматическая регистрация)
- Система создаёт учётную запись "на лету"
- Может автоматически подключить бесплатные пакеты
createSubscriber — ручное создание абонента
- Администратор или биллинг создали учётную запись
- Через веб-интерфейс или Management API
- Обычно с последующим подключением пакетов
deleteSubscriber — удаление абонента
- Полное удаление учётной записи
- Автоматически отменяет все подписки
- Необратимое действие
disableSubscriber — блокировка абонента
- Временная блокировка доступа
- Подписки сохраняются, но доступ к просмотру блокируется
- Используется при неоплате, нарушениях правил
enableSubscriber — разблокировка абонента
- Восстановление доступа после блокировки
- Подписки остаются активными
Операции с подписками¶
createPackageSubscriber — создание подписки
- Подключение пакета к абоненту
- Ключевая операция для биллинга — начало оплачиваемого периода
- Записывается дата начала подписки
deletePackageSubscriber — удаление подписки
- Отключение пакета от абонента
- Ключевая операция для биллинга — конец оплачиваемого периода
- Используется для расчёта стоимости
Операции с пакетами¶
createPackage — создание пакета
- Создан новый тарифный план
- Аудит для отслеживания изменений в продуктовой линейке
deletePackage — удаление пакета
- Пакет удалён из системы
- Все подписки на него должны быть предварительно отменены
Структура операции¶
Основные поля¶
operationId — уникальный идентификатор операции
- Формат: base64-кодированный Snowflake ID
- Пример:
oKl9SW3AAAE.
- Генерируется автоматически при создании записи
type — тип операции
- Один из предопределённых типов (см. выше)
- Используется для фильтрации и группировки
- Пример:
createPackageSubscriber
portalId — идентификатор портала
- К какому порталу относится операция
- Используется для изоляции данных между порталами
- Пример:
pKl9SW3AAAE.
subscriberId — идентификатор абонента (опционально)
- Присутствует в операциях, связанных с абонентами
- Null для операций с пакетами
- Пример:
sKl9SW3AAAE.
packageId — идентификатор пакета (опционально)
- Присутствует в операциях, связанных с пакетами
- Null для операций создания/удаления абонентов
- Пример:
pkKl9SW3AAAE.
createdAt — время создания операции
- Формат: ISO 8601 timestamp
- Пример:
2024-10-16T10:00:00Z
- Ключевое поле для биллинга — точная дата события
updatedAt — время последнего обновления
- Обычно совпадает с createdAt
- Может отличаться, если операция была изменена
- Пример:
2024-10-16T10:00:00Z
payload — дополнительные данные операции
- JSON объект с деталями операции
- Содержимое зависит от типа операции
- Примеры:
{"subscriberId": "sKl9SW3AAAE.", "name": "John Doe"}
{"packageId": "pkKl9SW3AAAE.", "packageName": "premium"}
Получение списка операций¶
Базовый запрос¶
Получить все операции:
curl -X GET https://your-catena-domain.com/tv-management/api/v1/operations \
-H "X-Auth-Token: your-api-key"
Ответ:
{
"operations": [
{
"operationId": "opKl9SW3AAAE.",
"type": "createSubscriber",
"portalId": "pKl9SW3AAAE.",
"subscriberId": "sKl9SW3AAAE.",
"packageId": null,
"createdAt": "2024-10-16T10:00:00Z",
"updatedAt": "2024-10-16T10:00:00Z",
"payload": {
"name": "Иван Петров",
"phone": "+79161234567"
}
},
{
"operationId": "opKl9SW3AAAB.",
"type": "createPackageSubscriber",
"portalId": "pKl9SW3AAAE.",
"subscriberId": "sKl9SW3AAAE.",
"packageId": "pkKl9SW3AAAE.",
"createdAt": "2024-10-16T10:05:00Z",
"updatedAt": "2024-10-16T10:05:00Z",
"payload": {
"packageName": "premium"
}
}
],
"next": "cursor-for-next-page"
}
Пагинация¶
Для больших объёмов используется cursor-based пагинация:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?cursor=cursor-for-next-page" \
-H "X-Auth-Token: your-api-key"
Фильтрация операций¶
По абоненту¶
Все операции конкретного абонента:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?subscriberId=sKl9SW3AAAE." \
-H "X-Auth-Token: your-api-key"
Применение:
- История изменений учётной записи
- Аудит действий с абонентом
- Расчёт стоимости подписок абонента
Несколько абонентов:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?subscriberId=sKl9SW3AAAE.&subscriberId=sKl9SW3AAAB." \
-H "X-Auth-Token: your-api-key"
По пакету¶
Все операции с конкретным пакетом:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?packageId=pkKl9SW3AAAE." \
-H "X-Auth-Token: your-api-key"
Применение:
- Подсчёт активаций пакета
- Анализ популярности тарифного плана
- Расчёт доходов от конкретного пакета
По типу операции¶
Только создания подписок:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?type=createPackageSubscriber" \
-H "X-Auth-Token: your-api-key"
Только удаления подписок:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?type=deletePackageSubscriber" \
-H "X-Auth-Token: your-api-key"
Несколько типов:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?type=createPackageSubscriber&type=deletePackageSubscriber" \
-H "X-Auth-Token: your-api-key"
Применение:
- Фокусировка на операциях, влияющих на биллинг
- Подсчёт новых подписок (создания)
- Анализ оттока (удаления)
По времени¶
Операции после определённой даты:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?created_at_gte=2024-10-01" \
-H "X-Auth-Token: your-api-key"
Операции до определённой даты:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?created_at_lt=2024-11-01" \
-H "X-Auth-Token: your-api-key"
Операции за период (месяц):
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/operations?created_at_gte=2024-10-01&created_at_lt=2024-11-01" \
-H "X-Auth-Token: your-api-key"
Применение:
- Ежемесячные отчёты
- Расчёты за биллинговый период
- Анализ трендов по времени
Формат даты:
- ISO 8601 format:
YYYY-MM-DD
илиYYYY-MM-DDTHH:MM:SSZ
- Примеры:
2024-10-01
,2024-10-16T10:00:00Z
- Timezone: UTC
Расчёты для биллинга¶
Подсчёт доходов за период¶
Задача: Рассчитать доход за октябрь 2024
Логика расчёта:
- Получить все операции
createPackageSubscriber
за октябрь - Для каждой подписки определить длительность
- Умножить на стоимость пакета
- Суммировать
Скрипт расчёта:
import requests
from datetime import datetime, timedelta
from collections import defaultdict
API_URL = "https://catena.example.com/tv-management/api/v1"
API_KEY = "your-api-key"
# Цены пакетов (храните в вашей биллинговой БД)
PACKAGE_PRICES = {
"basicKl9SW3AAAE.": 10.0, # $10/месяц
"premiumKl9SW3AAAE.": 20.0, # $20/месяц
"sportKl9SW3AAAE.": 15.0 # $15/месяц
}
def calculate_monthly_revenue(year, month):
"""
Рассчитать доход за месяц на основе операций
"""
# Границы месяца
start_date = f"{year}-{month:02d}-01"
if month == 12:
end_date = f"{year + 1}-01-01"
else:
end_date = f"{year}-{month + 1:02d}-01"
# 1. Получить все создания подписок за месяц
subscriptions = get_operations(
type="createPackageSubscriber",
created_at_gte=start_date,
created_at_lt=end_date
)
# 2. Получить все удаления подписок
cancellations = get_operations(
type="deletePackageSubscriber",
created_at_gte=start_date,
created_at_lt=end_date
)
# 3. Сгруппировать по пакетам
revenue_by_package = defaultdict(lambda: {
'created': 0,
'cancelled': 0,
'revenue': 0.0
})
# Подсчитать созданные подписки
for op in subscriptions:
package_id = op['packageId']
price = PACKAGE_PRICES.get(package_id, 0)
revenue_by_package[package_id]['created'] += 1
revenue_by_package[package_id]['revenue'] += price
# Подсчитать отменённые (возвраты)
for op in cancellations:
package_id = op['packageId']
revenue_by_package[package_id]['cancelled'] += 1
# 4. Итоговый отчёт
total_revenue = 0
report = []
for package_id, stats in revenue_by_package.items():
package_name = get_package_name(package_id)
total_revenue += stats['revenue']
report.append({
'package': package_name,
'created': stats['created'],
'cancelled': stats['cancelled'],
'net_growth': stats['created'] - stats['cancelled'],
'revenue': stats['revenue']
})
return {
'total_revenue': total_revenue,
'packages': sorted(report, key=lambda x: x['revenue'], reverse=True)
}
def get_operations(**filters):
"""Получить операции с фильтрами и пагинацией"""
operations = []
cursor = None
while True:
# Построить URL с параметрами
params = []
for key, value in filters.items():
if isinstance(value, list):
for v in value:
params.append(f"{key}={v}")
else:
params.append(f"{key}={value}")
if cursor:
params.append(f"cursor={cursor}")
url = f"{API_URL}/operations"
if params:
url += "?" + "&".join(params)
response = requests.get(url, headers={"X-Auth-Token": API_KEY})
data = response.json()
operations.extend(data['operations'])
cursor = data.get('next')
if not cursor:
break
return operations
# Пример использования
revenue = calculate_monthly_revenue(2024, 10)
print(f"Доход за октябрь 2024: ${revenue['total_revenue']:.2f}")
print("\nПо пакетам:")
for pkg in revenue['packages']:
print(f" {pkg['package']}: {pkg['created']} активаций, ${pkg['revenue']:.2f}")
print(f" Отмен: {pkg['cancelled']}, Чистый рост: {pkg['net_growth']}")
Расчёт пропорциональной оплаты (pro-rata)¶
Задача: Абонент подключил пакет 15-го числа, а месяц закончился 30-го. Сколько взимать?
Решение:
from datetime import datetime, timedelta
def calculate_prorata_billing(subscriber_id, start_date, end_date):
"""
Рассчитать пропорциональную стоимость подписок
"""
# Получить все операции абонента за период
operations = get_operations(
subscriberId=subscriber_id,
created_at_gte=start_date.isoformat(),
created_at_lt=end_date.isoformat()
)
# Отфильтровать операции с подписками
subscriptions = {} # package_id: (create_date, delete_date or None)
for op in operations:
package_id = op['packageId']
op_date = datetime.fromisoformat(op['createdAt'].replace('Z', '+00:00'))
if op['type'] == 'createPackageSubscriber':
if package_id not in subscriptions:
subscriptions[package_id] = {'created': op_date, 'deleted': None}
elif op['type'] == 'deletePackageSubscriber':
if package_id in subscriptions:
subscriptions[package_id]['deleted'] = op_date
# Рассчитать стоимость для каждой подписки
total_cost = 0.0
billing_details = []
for package_id, dates in subscriptions.items():
created = dates['created']
deleted = dates['deleted'] or end_date
# Количество дней использования
days_used = (deleted - created).days
# Стоимость пакета в день
monthly_price = PACKAGE_PRICES.get(package_id, 0)
daily_price = monthly_price / 30
# Пропорциональная стоимость
cost = daily_price * days_used
total_cost += cost
billing_details.append({
'package_id': package_id,
'package_name': get_package_name(package_id),
'created': created.isoformat(),
'deleted': deleted.isoformat(),
'days': days_used,
'monthly_price': monthly_price,
'cost': round(cost, 2)
})
return {
'subscriber_id': subscriber_id,
'period': f"{start_date.date()} - {end_date.date()}",
'total_cost': round(total_cost, 2),
'items': billing_details
}
# Пример: расчёт за октябрь
bill = calculate_prorata_billing(
subscriber_id="sKl9SW3AAAE.",
start_date=datetime(2024, 10, 1),
end_date=datetime(2024, 11, 1)
)
print(f"Абонент: {bill['subscriber_id']}")
print(f"Период: {bill['period']}")
print(f"К оплате: ${bill['total_cost']}")
print("\nДетализация:")
for item in bill['items']:
print(f" {item['package_name']}: {item['days']} дней × ${item['monthly_price']}/30 = ${item['cost']}")
Подсчёт метрик роста¶
Задача: Рассчитать MRR (Monthly Recurring Revenue) и churn rate
def calculate_growth_metrics(year, month):
"""Метрики роста бизнеса"""
start_date = f"{year}-{month:02d}-01"
if month == 12:
end_date = f"{year + 1}-01-01"
else:
end_date = f"{year}-{month + 1:02d}-01"
# Получить операции за месяц
creates = get_operations(
type="createPackageSubscriber",
created_at_gte=start_date,
created_at_lt=end_date
)
deletes = get_operations(
type="deletePackageSubscriber",
created_at_gte=start_date,
created_at_lt=end_date
)
# Подсчитать метрики
new_subscriptions = len(creates)
cancelled_subscriptions = len(deletes)
# MRR (упрощённо - без учёта разных цен пакетов)
avg_package_price = 15.0 # средняя цена пакета
new_mrr = new_subscriptions * avg_package_price
lost_mrr = cancelled_subscriptions * avg_package_price
net_mrr_change = new_mrr - lost_mrr
# Churn rate
# Для точного расчёта нужно знать количество активных подписок на начало месяца
active_subscriptions_start = get_active_subscriptions_count(start_date)
churn_rate = (cancelled_subscriptions / active_subscriptions_start * 100) if active_subscriptions_start > 0 else 0
return {
'month': f"{year}-{month:02d}",
'new_subscriptions': new_subscriptions,
'cancelled_subscriptions': cancelled_subscriptions,
'net_growth': new_subscriptions - cancelled_subscriptions,
'new_mrr': new_mrr,
'lost_mrr': lost_mrr,
'net_mrr_change': net_mrr_change,
'churn_rate': round(churn_rate, 2)
}
# Отчёт за октябрь 2024
metrics = calculate_growth_metrics(2024, 10)
print(f"Метрики за {metrics['month']}:")
print(f" Новых подписок: {metrics['new_subscriptions']}")
print(f" Отменено: {metrics['cancelled_subscriptions']}")
print(f" Чистый рост: {metrics['net_growth']}")
print(f" Новый MRR: ${metrics['new_mrr']}")
print(f" Потерянный MRR: ${metrics['lost_mrr']}")
print(f" Изменение MRR: ${metrics['net_mrr_change']:+.2f}")
print(f" Churn rate: {metrics['churn_rate']}%")
Типичные сценарии использования¶
Сценарий 1: Ежемесячный биллинговый отчёт¶
Задача: Подготовить счета для всех абонентов за месяц
Решение:
import csv
from datetime import datetime
def generate_monthly_invoices(year, month, output_csv='invoices.csv'):
"""Генерация счетов на основе журнала операций"""
start_date = datetime(year, month, 1)
if month == 12:
end_date = datetime(year + 1, 1, 1)
else:
end_date = datetime(year, month + 1, 1)
# Получить всех абонентов
response = requests.get(
f"{API_URL}/subscribers",
headers={"X-Auth-Token": API_KEY}
)
subscribers = response.json()['subscribers']
# Для каждого абонента рассчитать стоимость
invoices = []
for subscriber in subscribers:
subscriber_id = subscriber['subscriberId']
# Рассчитать pro-rata биллинг
bill = calculate_prorata_billing(
subscriber_id,
start_date,
end_date
)
if bill['total_cost'] > 0:
invoices.append({
'subscriber_id': subscriber_id,
'subscriber_name': subscriber['name'],
'phone': f"+{subscriber['phoneCountryCode']}{subscriber['phone']}",
'amount': bill['total_cost'],
'details': bill['items']
})
# Сохранить в CSV
with open(output_csv, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=['subscriber_id', 'subscriber_name', 'phone', 'amount'])
writer.writeheader()
for invoice in invoices:
writer.writerow({
'subscriber_id': invoice['subscriber_id'],
'subscriber_name': invoice['subscriber_name'],
'phone': invoice['phone'],
'amount': f"${invoice['amount']:.2f}"
})
total_revenue = sum(inv['amount'] for inv in invoices)
return {
'invoices_count': len(invoices),
'total_revenue': total_revenue,
'file': output_csv
}
# Генерация счетов за октябрь
result = generate_monthly_invoices(2024, 10)
print(f"Создано счетов: {result['invoices_count']}")
print(f"Общий доход: ${result['total_revenue']:.2f}")
print(f"Файл: {result['file']}")
Сценарий 2: Выявление изменений подписок¶
Задача: Найти всех абонентов, которые изменили подписки в октябре
Применение:
- Email рассылка о изменениях
- Анализ причин отмены
- Retention кампании
def find_subscription_changes(year, month):
"""Найти изменения подписок"""
start_date = f"{year}-{month:02d}-01"
if month == 12:
end_date = f"{year + 1}-01-01"
else:
end_date = f"{year}-{month + 1:02d}-01"
# Получить операции с подписками
operations = get_operations(
type=["createPackageSubscriber", "deletePackageSubscriber"],
created_at_gte=start_date,
created_at_lt=end_date
)
# Группировка по абонентам
changes = defaultdict(lambda: {'added': [], 'removed': []})
for op in operations:
subscriber_id = op['subscriberId']
package_id = op['packageId']
package_name = get_package_name(package_id)
if op['type'] == 'createPackageSubscriber':
changes[subscriber_id]['added'].append({
'package': package_name,
'date': op['createdAt']
})
else:
changes[subscriber_id]['removed'].append({
'package': package_name,
'date': op['createdAt']
})
# Результат
results = {
'upgrades': [], # Добавили пакеты
'downgrades': [], # Удалили пакеты
'churned': [] # Удалили все подписки
}
for subscriber_id, activity in changes.items():
subscriber = get_subscriber(subscriber_id)
if activity['added'] and not activity['removed']:
# Только добавления - апгрейд
results['upgrades'].append({
'subscriber': subscriber,
'packages': activity['added']
})
elif activity['removed'] and not activity['added']:
# Только удаления - возможно churn
if len(activity['removed']) >= len(subscriber['packages']):
results['churned'].append({
'subscriber': subscriber,
'packages': activity['removed']
})
else:
results['downgrades'].append({
'subscriber': subscriber,
'packages': activity['removed']
})
else:
# И добавления и удаления - изменение плана
pass
return results
# Анализ изменений за октябрь
changes = find_subscription_changes(2024, 10)
print(f"Апгрейды: {len(changes['upgrades'])}")
print(f"Даунгрейды: {len(changes['downgrades'])}")
print(f"Полностью отменили: {len(changes['churned'])}")
# Email рассылка для churn prevention
for churned in changes['churned']:
send_retention_email(churned['subscriber'], churned['packages'])
Сценарий 3: Аудит действий¶
Задача: Проверить, кто и когда изменял подписку абонента
def audit_subscriber_history(subscriber_id):
"""Полная история изменений абонента"""
# Получить все операции
operations = get_operations(subscriberId=subscriber_id)
# Сортировка по времени
operations.sort(key=lambda x: x['createdAt'])
# Форматированный вывод
print(f"История абонента {subscriber_id}:\n")
for op in operations:
timestamp = op['createdAt']
op_type = op['type']
if op_type == 'createSubscriber':
print(f"{timestamp} - Создан абонент")
print(f" Имя: {op['payload'].get('name')}")
print(f" Телефон: {op['payload'].get('phone')}")
elif op_type == 'createPackageSubscriber':
package_name = get_package_name(op['packageId'])
print(f"{timestamp} - Подключен пакет: {package_name}")
elif op_type == 'deletePackageSubscriber':
package_name = get_package_name(op['packageId'])
print(f"{timestamp} - Отключен пакет: {package_name}")
elif op_type == 'disableSubscriber':
print(f"{timestamp} - Абонент заблокирован")
print(f" Причина: {op['payload'].get('reason', 'не указана')}")
elif op_type == 'enableSubscriber':
print(f"{timestamp} - Абонент разблокирован")
elif op_type == 'deleteSubscriber':
print(f"{timestamp} - Абонент удалён")
print()
# Пример
audit_subscriber_history("sKl9SW3AAAE.")
Сценарий 4: Reconciliation с внешним биллингом¶
Задача: Сверить данные Catena с внешней биллинговой системой
def reconcile_with_billing(year, month):
"""Сверка с внешним биллингом"""
start_date = f"{year}-{month:02d}-01"
if month == 12:
end_date = f"{year + 1}-01-01"
else:
end_date = f"{year}-{month + 1:02d}-01"
# 1. Получить операции из Catena
catena_operations = get_operations(
type=["createPackageSubscriber", "deletePackageSubscriber"],
created_at_gte=start_date,
created_at_lt=end_date
)
# 2. Получить транзакции из биллинга
billing_transactions = get_billing_transactions(start_date, end_date)
# 3. Сравнить
discrepancies = []
# Проверить, есть ли в Catena все транзакции из биллинга
for transaction in billing_transactions:
if transaction['type'] == 'subscription_create':
# Найти соответствующую операцию в Catena
found = any(
op['subscriberId'] == transaction['subscriber_id'] and
op['packageId'] == transaction['package_id'] and
abs((datetime.fromisoformat(op['createdAt'].replace('Z', '+00:00')) -
transaction['date']).total_seconds()) < 300 # 5 минут допуск
for op in catena_operations
if op['type'] == 'createPackageSubscriber'
)
if not found:
discrepancies.append({
'type': 'missing_in_catena',
'billing_transaction': transaction,
'severity': 'high'
})
# Проверить обратное — есть ли в биллинге все операции из Catena
for op in catena_operations:
if op['type'] == 'createPackageSubscriber':
found = any(
t['subscriber_id'] == op['subscriberId'] and
t['package_id'] == op['packageId']
for t in billing_transactions
if t['type'] == 'subscription_create'
)
if not found:
discrepancies.append({
'type': 'missing_in_billing',
'catena_operation': op,
'severity': 'medium'
})
return discrepancies
# Reconciliation за октябрь
discrepancies = reconcile_with_billing(2024, 10)
if discrepancies:
print(f"⚠️ Найдено {len(discrepancies)} расхождений:")
for d in discrepancies:
print(f" - {d['type']}: severity={d['severity']}")
else:
print("✅ Данные синхронизированы, расхождений нет")
Лучшие практики¶
Регулярный экспорт данных¶
Рекомендация: Ежедневно экспортируйте операции в свою БД для долгосрочного хранения
import psycopg2
from datetime import datetime, timedelta
def export_operations_to_db():
"""Экспорт операций в PostgreSQL"""
# Подключение к БД
conn = psycopg2.connect("postgresql://billing_db")
cursor = conn.cursor()
# Создать таблицу (если не существует)
cursor.execute('''
CREATE TABLE IF NOT EXISTS catena_operations (
operation_id TEXT PRIMARY KEY,
type TEXT NOT NULL,
portal_id TEXT,
subscriber_id TEXT,
package_id TEXT,
created_at TIMESTAMP NOT NULL,
payload JSONB
)
''')
# Получить операции за последние 24 часа
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
operations = get_operations(created_at_gte=yesterday)
# Вставить в БД
inserted = 0
for op in operations:
try:
cursor.execute('''
INSERT INTO catena_operations
(operation_id, type, portal_id, subscriber_id, package_id, created_at, payload)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (operation_id) DO NOTHING
''', (
op['operationId'],
op['type'],
op['portalId'],
op.get('subscriberId'),
op.get('packageId'),
op['createdAt'],
json.dumps(op.get('payload', {}))
))
inserted += 1
except Exception as e:
print(f"Error inserting {op['operationId']}: {e}")
conn.commit()
conn.close()
return inserted
# Запускать ежедневно по cron
exported = export_operations_to_db()
print(f"Exported {exported} operations to database")
Использование в биллинговой системе¶
Архитектура:
Catena Operations Log ←→ Биллинговая система
↓ ↓
Факт события Финансовый расчёт
(подписка) (счёт)
Workflow:
- Событие в Catena — создание/удаление подписки
- Webhook или polling — биллинг узнаёт о событии
- Запись в биллинг — транзакция в биллинговой БД
- Расчёт стоимости — на основе тарифов и периода
- Выставление счёта — абоненту или партнёру
Пример webhook от Catena:
from flask import Flask, request
import requests
app = Flask(__name__)
@app.route('/catena-webhook', methods=['POST'])
def catena_webhook():
"""Обработка событий из Catena"""
event = request.json
if event['type'] == 'createPackageSubscriber':
# Новая подписка - начать биллинг
billing_start_subscription(
subscriber_id=event['subscriberId'],
package_id=event['packageId'],
start_date=event['createdAt']
)
return {"status": "ok", "action": "billing_started"}
elif event['type'] == 'deletePackageSubscriber':
# Отмена подписки - остановить биллинг
billing_end_subscription(
subscriber_id=event['subscriberId'],
package_id=event['packageId'],
end_date=event['createdAt']
)
return {"status": "ok", "action": "billing_stopped"}
return {"status": "ok", "action": "ignored"}
if __name__ == '__main__':
app.run(port=5001)
Мониторинг финансовых метрик¶
Дашборд для руководства:
def get_financial_dashboard():
"""Финансовый дашборд в реальном времени"""
today = datetime.now().date()
month_start = today.replace(day=1)
# Метрики текущего месяца
current_month = calculate_monthly_revenue(today.year, today.month)
# Метрики прошлого месяца
last_month_date = month_start - timedelta(days=1)
last_month = calculate_monthly_revenue(
last_month_date.year,
last_month_date.month
)
# Сравнение
revenue_growth = (
(current_month['total_revenue'] - last_month['total_revenue']) /
last_month['total_revenue'] * 100
) if last_month['total_revenue'] > 0 else 0
# Операции за сегодня
today_str = today.isoformat()
tomorrow_str = (today + timedelta(days=1)).isoformat()
today_creates = len(get_operations(
type="createPackageSubscriber",
created_at_gte=today_str,
created_at_lt=tomorrow_str
))
today_cancels = len(get_operations(
type="deletePackageSubscriber",
created_at_gte=today_str,
created_at_lt=tomorrow_str
))
return {
'current_month_revenue': current_month['total_revenue'],
'last_month_revenue': last_month['total_revenue'],
'revenue_growth_percent': round(revenue_growth, 1),
'today_new_subscriptions': today_creates,
'today_cancellations': today_cancels,
'today_net_growth': today_creates - today_cancels
}
# Вывести дашборд
dashboard = get_financial_dashboard()
print("📊 Финансовый дашборд")
print(f"Доход текущего месяца: ${dashboard['current_month_revenue']:.2f}")
print(f"Доход прошлого месяца: ${dashboard['last_month_revenue']:.2f}")
print(f"Рост: {dashboard['revenue_growth_percent']:+.1f}%")
print(f"\nСегодня:")
print(f" Новых подписок: {dashboard['today_new_subscriptions']}")
print(f" Отмен: {dashboard['today_cancellations']}")
print(f" Чистый рост: {dashboard['today_net_growth']:+d}")
Отчёты для бизнеса¶
Еженедельный отчёт для руководства¶
Скрипт автоматической генерации:
def generate_weekly_report():
"""Еженедельный отчёт для руководства"""
# Последние 7 дней
end_date = datetime.now()
start_date = end_date - timedelta(days=7)
operations = get_operations(
type=["createPackageSubscriber", "deletePackageSubscriber",
"createSubscriber", "deleteSubscriber"],
created_at_gte=start_date.isoformat(),
created_at_lt=end_date.isoformat()
)
# Анализ
stats = {
'new_subscribers': 0,
'deleted_subscribers': 0,
'new_subscriptions': 0,
'cancelled_subscriptions': 0,
'packages': defaultdict(lambda: {'adds': 0, 'removes': 0})
}
for op in operations:
if op['type'] == 'createSubscriber':
stats['new_subscribers'] += 1
elif op['type'] == 'deleteSubscriber':
stats['deleted_subscribers'] += 1
elif op['type'] == 'createPackageSubscriber':
stats['new_subscriptions'] += 1
package_name = get_package_name(op['packageId'])
stats['packages'][package_name]['adds'] += 1
elif op['type'] == 'deletePackageSubscriber':
stats['cancelled_subscriptions'] += 1
package_name = get_package_name(op['packageId'])
stats['packages'][package_name]['removes'] += 1
# Формирование отчёта
report = f"""
📈 Еженедельный отчёт ({start_date.strftime('%Y-%m-%d')} - {end_date.strftime('%Y-%m-%d')})
👥 Абоненты:
Новых: {stats['new_subscribers']}
Удалено: {stats['deleted_subscribers']}
Чистый рост: {stats['new_subscribers'] - stats['deleted_subscribers']:+d}
📦 Подписки:
Активировано: {stats['new_subscriptions']}
Отменено: {stats['cancelled_subscriptions']}
Чистый рост: {stats['new_subscriptions'] - stats['cancelled_subscriptions']:+d}
📊 По пакетам:
"""
for package, counts in sorted(stats['packages'].items()):
net = counts['adds'] - counts['removes']
report += f" {package}: {counts['adds']} активаций, {counts['removes']} отмен (net: {net:+d})\n"
return report
# Генерация и отправка отчёта
report = generate_weekly_report()
print(report)
# Отправить на email руководству
# send_email(to="management@company.com", subject="Weekly Report", body=report)
Лучшие практики¶
Хранение данных¶
Рекомендации:
- В Catena: Храните операции минимум 90 дней
- В биллинговой БД: Храните навсегда для налоговой отчётности
- Архивирование: Экспортируйте старые операции в cold storage (S3, glacier)
Политика хранения:
Catena Operations API:
- Последние 90 дней: полный доступ
- 90-365 дней: архив (медленный доступ)
- >365 дней: холодное хранилище
Биллинговая БД:
- Все операции навсегда
- Индексы для быстрого поиска
- Регулярные бэкапы
Защита от дубликатов¶
Проблема: Повторная отправка webhook может создать дубликат операции
Решение:
def process_operation_idempotent(operation_data):
"""Идемпотентная обработка операции"""
operation_id = operation_data['operationId']
# Проверить, обрабатывали ли уже
cursor.execute(
"SELECT 1 FROM processed_operations WHERE operation_id = %s",
(operation_id,)
)
if cursor.fetchone():
print(f"Operation {operation_id} already processed, skipping")
return {"status": "already_processed"}
# Обработать операцию
process_billing_event(operation_data)
# Отметить как обработанную
cursor.execute(
"INSERT INTO processed_operations (operation_id, processed_at) VALUES (%s, NOW())",
(operation_id,)
)
conn.commit()
return {"status": "processed"}
Мониторинг критичных операций¶
Настройка алертов:
def monitor_critical_operations():
"""Мониторинг критичных операций для финансов"""
# Проверить последний час
hour_ago = (datetime.now() - timedelta(hours=1)).isoformat()
operations = get_operations(
type=["createPackageSubscriber", "deletePackageSubscriber"],
created_at_gte=hour_ago
)
# Аномалии
alerts = []
# 1. Слишком много отмен подписок
cancellations = [op for op in operations if op['type'] == 'deletePackageSubscriber']
if len(cancellations) > 50: # Порог
alerts.append({
'level': 'warning',
'message': f'High cancellation rate: {len(cancellations)} in last hour'
})
# 2. Нет новых подписок (странно)
creations = [op for op in operations if op['type'] == 'createPackageSubscriber']
if len(creations) == 0 and datetime.now().hour in range(10, 22):
alerts.append({
'level': 'info',
'message': 'No new subscriptions in last hour (working hours)'
})
# 3. Массовые удаления абонентов
deletions = [op for op in operations if op['type'] == 'deleteSubscriber']
if len(deletions) > 10:
alerts.append({
'level': 'critical',
'message': f'Mass subscriber deletion: {len(deletions)} in last hour'
})
# Отправить алерты
for alert in alerts:
send_alert(alert)
return alerts
Получение конкретной операции¶
Запрос по ID:
curl -X GET https://your-catena-domain.com/tv-management/api/v1/operations/opKl9SW3AAAE. \
-H "X-Auth-Token: your-api-key"
Ответ:
{
"operationId": "opKl9SW3AAAE.",
"type": "createPackageSubscriber",
"portalId": "pKl9SW3AAAE.",
"subscriberId": "sKl9SW3AAAE.",
"packageId": "pkKl9SW3AAAE.",
"createdAt": "2024-10-16T10:05:00Z",
"updatedAt": "2024-10-16T10:05:00Z",
"payload": {
"packageName": "premium",
"managerEmail": "admin@company.com"
}
}
Применение:
- Детальная информация об операции
- Восстановление контекста события
- Отладка конкретной транзакции
Устранение проблем¶
Операции не появляются в журнале¶
Проблема: Действия выполняются, но не записываются в журнал
Возможные причины:
- Проблемы с базой данных
- Ошибки в коде при записи операций
- Асинхронная запись с задержкой
Решение:
- Проверьте статус базы данных
- Просмотрите логи сервера на ошибки
- Подождите 1-2 минуты — операции могут записываться асинхронно
Расхождения в расчётах¶
Проблема: Расчёты на основе операций не совпадают с биллингом
Возможные причины:
- Разные методы расчёта (calendar month vs billing cycle)
- Не учитываются refunds или корректировки
- Разные цены в разное время (изменение тарифов)
- Пропущены операции из-за сбоев
Решение:
- Документируйте методологию расчётов
- Регулярно сверяйте данные (reconciliation)
- Храните историю цен пакетов
- Логируйте все корректировки отдельно
Отсутствуют старые операции¶
Проблема: Операции старше определённой даты не доступны
Причина: Политика хранения данных в Catena API
Решение:
- Регулярно экспортируйте данные в свою БД
- Не полагайтесь на API как единственный источник истории
- Используйте собственное долгосрочное хранилище
См. также¶
- Управление абонентами — создание абонентов записывает операции
- Управление подписками — создание/удаление подписок
- Управление пакетами — операции с пакетами
- Сеансы воспроизведения — дополнительные данные для биллинга по трафику