Мониторинг сеансов воспроизведения¶
Сеансы воспроизведения (play sessions) — это записи о просмотре каналов абонентами. Catena автоматически регистрирует каждое открытие потока и сохраняет подробную информацию о сеансе для мониторинга, аналитики и отладки.
Что такое сеанс воспроизведения¶
Сеанс воспроизведения — это период времени, когда абонент смотрит конкретный канал. Каждый сеанс содержит информацию о том, кто, что, когда и откуда смотрел.
Жизненный цикл сеанса:
Открытие потока → Активный просмотр → Закрытие потока
(openedAt) (active: true) (closedAt)
Что записывается:
- Кто смотрит: subscriberId, токен
- Что смотрит: channelId, channelName, programId
- Когда: openedAt, closedAt, updatedAt (timestamps)
- Откуда: IP адрес, userAgent (плеер/устройство)
- Сколько: bytes (объём переданных данных), длительность сеанса
- Статус: active (сеанс открыт или закрыт)
Применение:
- Мониторинг в реальном времени — кто сейчас смотрит каналы
- Отладка проблем — почему абонент не может смотреть канал
- Аналитика просмотров — популярные каналы, время просмотра
- Биллинг — подсчёт потреблённого трафика
- Безопасность — выявление аномалий (один токен из разных IP)
- Статистика — отчёты для владельцев контента
Структура сеанса воспроизведения¶
Основные поля¶
Идентификаторы:
- sessionId — уникальный ID сеанса
- Формат: base64-кодированный Snowflake ID
- Пример:
sessKl9SW3AAAE.
-
Генерируется при открытии потока
-
subscriberId — ID абонента, смотрящего канал
- Связь с учётной записью пользователя
-
Пример:
sKl9SW3AAAE.
-
channelId — ID канала, который смотрят
-
Пример:
chKl9SW3AAAE.
-
channelName — техническое имя канала
- Удобнее для отладки, чем ID
-
Пример:
sport1
,news-hd
-
programId — ID программы (если смотрят из архива)
- Null для live просмотра
-
Пример:
prKl9SW3AAAE.
-
portalId — ID портала
- Изоляция данных между порталами
- Пример:
pKl9SW3AAAE.
Временные метки:
- openedAt — Unix timestamp открытия сеанса
- Когда абонент начал смотреть
-
Пример:
1714233600
(28 апреля 2024, 10:00:00 UTC) -
closedAt — Unix timestamp закрытия сеанса
- Когда поток был остановлен
- Null для активных сеансов
-
Пример:
1714237200
-
updatedAt — Unix timestamp последнего обновления
- Обновляется периодически во время просмотра
- Используется для определения "мёртвых" соединений
Сетевая информация:
- ip — IP адрес абонента
- Пример:
192.168.1.100
,2001:db8::1
-
Используется для геолокации и выявления аномалий
-
userAgent — строка User-Agent плеера
- Определяет устройство и приложение
- Примеры:
VLC/3.0.16
Mozilla/5.0 (Linux; Android 11) AppleWebKit/537.36
Catena/1.0 (Android 11; Samsung SM-G991B)
Статистика:
- active — флаг активности сеанса
true
— сеанс открыт, идёт просмотр-
false
— сеанс закрыт, просмотр завершён -
bytes — объём переданных данных в байтах
- Обновляется во время просмотра
- Пример:
5242880000
(5 ГБ) -
Используется для биллинга по трафику
-
token — токен воспроизведения абонента
- Используется streaming-сервером для авторизации
- Пример:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
Устройство:
- deviceId — идентификатор устройства
- Если приложение передаёт device ID
- Пример:
device_android_samsung_s21
- Помогает отслеживать количество устройств у абонента
Получение списка сеансов¶
Базовый запрос¶
Получить список всех сеансов:
curl -X GET https://your-catena-domain.com/tv-management/api/v1/play-sessions \
-H "X-Auth-Token: your-api-key"
Ответ:
{
"sessions": [
{
"sessionId": "sessKl9SW3AAAE.",
"subscriberId": "sKl9SW3AAAE.",
"channelId": "chKl9SW3AAAE.",
"channelName": "sport1",
"programId": null,
"portalId": "pKl9SW3AAAE.",
"openedAt": 1714233600,
"closedAt": null,
"updatedAt": 1714237200,
"active": true,
"bytes": 1073741824,
"ip": "192.168.1.100",
"userAgent": "Catena/1.0 (Android 11)",
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"deviceId": "device_123"
},
{
"sessionId": "sessKl9SW3AAAB.",
"subscriberId": "sKl9SW3AAAB.",
"channelId": "chKl9SW3AAAB.",
"channelName": "news-hd",
"programId": null,
"portalId": "pKl9SW3AAAE.",
"openedAt": 1714230000,
"closedAt": 1714233600,
"updatedAt": 1714233600,
"active": false,
"bytes": 524288000,
"ip": "10.0.0.50",
"userAgent": "VLC/3.0.16",
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"deviceId": null
}
],
"next": "cursor-for-next-page"
}
Пагинация¶
Для больших объёмов данных используется cursor-based пагинация:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?cursor=cursor-for-next-page" \
-H "X-Auth-Token: your-api-key"
Рекомендации:
- Обрабатывайте данные постранично
- Используйте фильтры для уменьшения объёма
- Для периодического мониторинга запрашивайте только активные сеансы
Фильтрация сеансов¶
По абоненту¶
Получить все сеансы конкретного абонента:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?subscriberId=sKl9SW3AAAE." \
-H "X-Auth-Token: your-api-key"
Применение:
- Просмотр истории конкретного пользователя
- Отладка проблем абонента
- Анализ паттернов просмотра
Несколько абонентов:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?subscriberId=sKl9SW3AAAE.&subscriberId=sKl9SW3AAAB." \
-H "X-Auth-Token: your-api-key"
По каналу¶
Получить все сеансы конкретного канала:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?channelId=chKl9SW3AAAE." \
-H "X-Auth-Token: your-api-key"
Применение:
- Определение популярности канала
- Анализ пиковых нагрузок на канал
- Отладка проблем с конкретным каналом
Несколько каналов:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?channelId=chKl9SW3AAAE.&channelId=chKl9SW3AAAB." \
-H "X-Auth-Token: your-api-key"
По статусу активности¶
Только активные сеансы (кто смотрит сейчас):
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?active=true" \
-H "X-Auth-Token: your-api-key"
Только завершённые сеансы (история):
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?active=false" \
-H "X-Auth-Token: your-api-key"
Применение:
- active=true — мониторинг в реальном времени
- active=false — анализ истории, построение отчётов
По времени¶
Сеансы, открытые после определённого времени:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?opened_at_gte=1714233600" \
-H "X-Auth-Token: your-api-key"
Сеансы, открытые до определённого времени:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?opened_at_lt=1714320000" \
-H "X-Auth-Token: your-api-key"
Сеансы в интервале времени:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?opened_at_gte=1714233600&opened_at_lt=1714320000" \
-H "X-Auth-Token: your-api-key"
Применение:
- Анализ просмотров за конкретный период
- Построение временных графиков
- Выявление пиковых часов
Преобразование дат в Unix timestamp:
# Текущая дата/время
date +%s
# Результат: 1714233600
# Конкретная дата (GNU date)
date -d "2024-04-28 10:00:00" +%s
# macOS
date -j -f "%Y-%m-%d %H:%M:%S" "2024-04-28 10:00:00" +%s
Комбинированные фильтры¶
Активные сеансы конкретного абонента:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?subscriberId=sKl9SW3AAAE.&active=true" \
-H "X-Auth-Token: your-api-key"
Сеансы канала за последние 24 часа:
# Текущее время минус 24 часа
TIMESTAMP_24H_AGO=$(date -d '24 hours ago' +%s)
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?channelId=chKl9SW3AAAE.&opened_at_gte=$TIMESTAMP_24H_AGO" \
-H "X-Auth-Token: your-api-key"
Типичные сценарии использования¶
Сценарий 1: Мониторинг в реальном времени¶
Задача: Отобразить dashboard с текущими зрителями
Решение:
#!/bin/bash
# realtime-dashboard.sh
API_URL="https://catena.example.com/tv-management/api/v1"
API_KEY="your-api-key"
while true; do
# Получить активные сеансы
RESPONSE=$(curl -s -X GET "$API_URL/play-sessions?active=true" \
-H "X-Auth-Token: $API_KEY")
# Общее количество зрителей
TOTAL_VIEWERS=$(echo $RESPONSE | jq '.sessions | length')
# Топ-5 популярных каналов
TOP_CHANNELS=$(echo $RESPONSE | jq -r '.sessions | group_by(.channelName) |
map({channel: .[0].channelName, viewers: length}) |
sort_by(.viewers) | reverse | .[0:5]')
clear
echo "=== Текущие зрители ==="
echo "Всего: $TOTAL_VIEWERS"
echo ""
echo "Топ каналов:"
echo "$TOP_CHANNELS" | jq -r '.[] | "\(.channel): \(.viewers) зрителей"'
sleep 10
done
Python версия с Prometheus метриками:
import requests
import time
from prometheus_client import Gauge, start_http_server
# Метрики
active_sessions = Gauge('catena_active_sessions', 'Number of active sessions')
channel_viewers = Gauge('catena_channel_viewers', 'Viewers per channel', ['channel'])
API_URL = "https://catena.example.com/tv-management/api/v1"
API_KEY = "your-api-key"
def update_metrics():
response = requests.get(
f"{API_URL}/play-sessions?active=true",
headers={"X-Auth-Token": API_KEY}
)
sessions = response.json()['sessions']
# Обновить общее количество
active_sessions.set(len(sessions))
# Подсчитать по каналам
channels = {}
for session in sessions:
channel = session['channelName']
channels[channel] = channels.get(channel, 0) + 1
# Обновить метрики по каналам
for channel, count in channels.items():
channel_viewers.labels(channel=channel).set(count)
if __name__ == '__main__':
# Запустить HTTP сервер для Prometheus
start_http_server(8000)
while True:
update_metrics()
time.sleep(30)
Сценарий 2: Отладка проблем абонента¶
Задача: Абонент жалуется, что не может смотреть канал
Шаги отладки:
- Проверить активные сеансы абонента:
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?subscriberId=sKl9SW3AAAE.&active=true" \
-H "X-Auth-Token: your-api-key"
Анализ:
- Если сеансов нет — проблема с авторизацией или сетью
- Если есть сеанс — проверить updatedAt
(недавно ли обновлялся)
- Проверить IP и userAgent — соответствует ли устройству абонента
- Проверить историю последних сеансов:
# Последние 1 час
TIMESTAMP_1H_AGO=$(date -d '1 hour ago' +%s)
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/play-sessions?subscriberId=sKl9SW3AAAE.&opened_at_gte=$TIMESTAMP_1H_AGO" \
-H "X-Auth-Token: your-api-key"
Что искать: - Частые переподключения (много коротких сеансов) - Низкий объём переданных данных (проблемы со stream) - Разные IP адреса (абонент переключается между сетями)
- Проверить, может ли абонент смотреть конкретный канал:
# Проверить подписки
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/subscribers/sKl9SW3AAAE." \
-H "X-Auth-Token: your-api-key" \
| jq '.packages'
# Проверить, в каких пакетах этот канал
curl -X GET "https://your-catena-domain.com/tv-management/api/v1/channels/chKl9SW3AAAE." \
-H "X-Auth-Token: your-api-key" \
| jq '.packages'
Сценарий 3: Выявление аномалий¶
Задача: Обнаружить подозрительную активность (account sharing)
Индикаторы аномалий:
- Одновременный просмотр с разных IP:
import requests
from collections import defaultdict
def detect_concurrent_ips():
response = requests.get(
f"{API_URL}/play-sessions?active=true",
headers={"X-Auth-Token": API_KEY}
)
sessions = response.json()['sessions']
# Группировка по абонентам
subscriber_ips = defaultdict(set)
for session in sessions:
subscriber_id = session['subscriberId']
ip = session['ip']
subscriber_ips[subscriber_id].add(ip)
# Найти абонентов с multiple IP
anomalies = []
for subscriber, ips in subscriber_ips.items():
if len(ips) > 1:
anomalies.append({
'subscriber': subscriber,
'ips': list(ips),
'count': len(ips)
})
return anomalies
# Вывести аномалии
for anomaly in detect_concurrent_ips():
print(f"⚠️ Абонент {anomaly['subscriber']} смотрит с {anomaly['count']} IP:")
for ip in anomaly['ips']:
print(f" - {ip}")
- Слишком много одновременных сеансов:
def detect_excessive_sessions(max_sessions=3):
response = requests.get(
f"{API_URL}/play-sessions?active=true",
headers={"X-Auth-Token": API_KEY}
)
sessions = response.json()['sessions']
subscriber_sessions = defaultdict(list)
for session in sessions:
subscriber_sessions[session['subscriberId']].append(session)
violations = []
for subscriber, sessions in subscriber_sessions.items():
if len(sessions) > max_sessions:
violations.append({
'subscriber': subscriber,
'session_count': len(sessions),
'channels': [s['channelName'] for s in sessions]
})
return violations
- Необычные User-Agents:
def detect_unusual_user_agents():
# Получить историю за последние 7 дней
timestamp_7d = int(time.time()) - 7 * 24 * 3600
response = requests.get(
f"{API_URL}/play-sessions?opened_at_gte={timestamp_7d}",
headers={"X-Auth-Token": API_KEY}
)
sessions = response.json()['sessions']
subscriber_agents = defaultdict(set)
for session in sessions:
subscriber_agents[session['subscriberId']].add(session['userAgent'])
# Найти тех, кто использует много разных устройств
suspicious = []
for subscriber, agents in subscriber_agents.items():
if len(agents) > 5: # Более 5 разных устройств
suspicious.append({
'subscriber': subscriber,
'devices': list(agents)
})
return suspicious
Сценарий 4: Аналитика и отчёты¶
Задача: Построить отчёт о просмотрах за месяц
Скрипт для сбора статистики:
import requests
import json
from datetime import datetime, timedelta
from collections import defaultdict
API_URL = "https://catena.example.com/tv-management/api/v1"
API_KEY = "your-api-key"
def get_monthly_report(year, month):
# Начало и конец месяца
start_date = datetime(year, month, 1)
if month == 12:
end_date = datetime(year + 1, 1, 1)
else:
end_date = datetime(year, month + 1, 1)
start_ts = int(start_date.timestamp())
end_ts = int(end_date.timestamp())
# Получить все сеансы за месяц
all_sessions = []
cursor = None
while True:
url = f"{API_URL}/play-sessions?opened_at_gte={start_ts}&opened_at_lt={end_ts}"
if cursor:
url += f"&cursor={cursor}"
response = requests.get(url, headers={"X-Auth-Token": API_KEY})
data = response.json()
all_sessions.extend(data['sessions'])
cursor = data.get('next')
if not cursor:
break
# Анализ
stats = {
'total_sessions': len(all_sessions),
'unique_subscribers': len(set(s['subscriberId'] for s in all_sessions)),
'total_bytes': sum(s['bytes'] for s in all_sessions),
'channels': defaultdict(int),
'daily': defaultdict(int)
}
for session in all_sessions:
# По каналам
stats['channels'][session['channelName']] += 1
# По дням
day = datetime.fromtimestamp(session['openedAt']).strftime('%Y-%m-%d')
stats['daily'][day] += 1
# Сортировка каналов по популярности
stats['top_channels'] = sorted(
stats['channels'].items(),
key=lambda x: x[1],
reverse=True
)[:10]
return stats
# Получить отчёт за апрель 2024
report = get_monthly_report(2024, 4)
print(f"Отчёт за апрель 2024")
print(f"Всего сеансов: {report['total_sessions']}")
print(f"Уникальных зрителей: {report['unique_subscribers']}")
print(f"Передано данных: {report['total_bytes'] / 1024**3:.2f} ГБ")
print(f"\nТоп-10 каналов:")
for channel, views in report['top_channels']:
print(f" {channel}: {views} просмотров")
Сценарий 5: Биллинг по трафику¶
Задача: Подсчитать потреблённый трафик каждым абонентом
def calculate_traffic_billing(start_date, end_date, price_per_gb=0.5):
"""
Подсчитать стоимость трафика для каждого абонента
price_per_gb: цена за 1 ГБ в долларах
"""
start_ts = int(start_date.timestamp())
end_ts = int(end_date.timestamp())
# Получить все сеансы за период
sessions = []
cursor = None
while True:
url = f"{API_URL}/play-sessions?opened_at_gte={start_ts}&opened_at_lt={end_ts}"
if cursor:
url += f"&cursor={cursor}"
response = requests.get(url, headers={"X-Auth-Token": API_KEY})
data = response.json()
sessions.extend(data['sessions'])
cursor = data.get('next')
if not cursor:
break
# Группировка по абонентам
subscriber_traffic = defaultdict(int)
for session in sessions:
subscriber_traffic[session['subscriberId']] += session['bytes']
# Расчёт стоимости
billing = []
for subscriber_id, bytes_used in subscriber_traffic.items():
gb_used = bytes_used / 1024**3
cost = gb_used * price_per_gb
billing.append({
'subscriber_id': subscriber_id,
'bytes': bytes_used,
'gb': round(gb_used, 2),
'cost': round(cost, 2)
})
# Сортировка по стоимости
billing.sort(key=lambda x: x['cost'], reverse=True)
return billing
# Рассчитать за последний месяц
end = datetime.now()
start = end - timedelta(days=30)
billing = calculate_traffic_billing(start, end)
print("Топ-10 потребителей трафика:")
for item in billing[:10]:
print(f"Абонент {item['subscriber_id']}: {item['gb']} ГБ = ${item['cost']}")
Лучшие практики¶
Периодический сбор данных¶
Рекомендации:
- Активные сеансы: опрашивать каждые 30-60 секунд для мониторинга
- История: собирать раз в день для анализа
- Архивирование: перемещать старые данные (>30 дней) в холодное хранилище
Пример cron job:
# Каждую минуту — мониторинг активных сеансов
*/1 * * * * /usr/local/bin/monitor-active-sessions.sh
# Каждый час — сбор статистики
0 * * * * /usr/local/bin/collect-hourly-stats.sh
# Каждый день в 01:00 — генерация отчётов
0 1 * * * /usr/local/bin/generate-daily-report.sh
Оптимизация запросов¶
Используйте фильтры для уменьшения объёма данных:
# ПЛОХО — получить все сеансы
curl -X GET "$API_URL/play-sessions"
# ХОРОШО — только активные
curl -X GET "$API_URL/play-sessions?active=true"
# ЛУЧШЕ — активные за последний час
TIMESTAMP_1H=$(date -d '1 hour ago' +%s)
curl -X GET "$API_URL/play-sessions?active=true&opened_at_gte=$TIMESTAMP_1H"
Хранение исторических данных¶
Стратегия хранения:
import sqlite3
from datetime import datetime
def archive_sessions_to_db(db_path='sessions.db'):
"""Сохранить завершённые сеансы в локальную БД"""
# Подключение к SQLite
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Создать таблицу (если не существует)
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
subscriber_id TEXT,
channel_id TEXT,
channel_name TEXT,
opened_at INTEGER,
closed_at INTEGER,
bytes INTEGER,
ip TEXT,
user_agent TEXT
)
''')
# Получить закрытые сеансы за последние 24 часа
timestamp_24h = int(time.time()) - 24 * 3600
response = requests.get(
f"{API_URL}/play-sessions?active=false&opened_at_gte={timestamp_24h}",
headers={"X-Auth-Token": API_KEY}
)
sessions = response.json()['sessions']
# Сохранить в БД
for session in sessions:
cursor.execute('''
INSERT OR IGNORE INTO sessions VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
session['sessionId'],
session['subscriberId'],
session['channelId'],
session['channelName'],
session['openedAt'],
session['closedAt'],
session['bytes'],
session['ip'],
session['userAgent']
))
conn.commit()
conn.close()
return len(sessions)
# Запускать ежедневно
archived = archive_sessions_to_db()
print(f"Archived {archived} sessions")
Мониторинг аномалий¶
Настройка алертов:
def check_anomalies_and_alert():
"""Проверка аномалий и отправка уведомлений"""
issues = []
# 1. Проверить слишком долгие сеансы (>24 часа)
threshold = int(time.time()) - 24 * 3600
response = requests.get(
f"{API_URL}/play-sessions?active=true",
headers={"X-Auth-Token": API_KEY}
)
for session in response.json()['sessions']:
if session['openedAt'] < threshold:
issues.append({
'type': 'long_session',
'session_id': session['sessionId'],
'subscriber': session['subscriberId'],
'duration_hours': (time.time() - session['openedAt']) / 3600
})
# 2. Проверить account sharing
concurrent_ips = detect_concurrent_ips()
for anomaly in concurrent_ips:
if anomaly['count'] > 2:
issues.append({
'type': 'account_sharing',
'subscriber': anomaly['subscriber'],
'ip_count': anomaly['count']
})
# 3. Отправить уведомления
if issues:
send_alert(issues)
return issues
def send_alert(issues):
"""Отправка уведомлений (email, Slack, Telegram)"""
message = "⚠️ Обнаружены аномалии:\n\n"
for issue in issues:
if issue['type'] == 'long_session':
message += f"- Долгий сеанс: {issue['session_id']} ({issue['duration_hours']:.1f}ч)\n"
elif issue['type'] == 'account_sharing':
message += f"- Account sharing: {issue['subscriber']} ({issue['ip_count']} IP)\n"
# Отправить в Slack
# requests.post(SLACK_WEBHOOK_URL, json={'text': message})
print(message)
Устранение проблем¶
Сеансы не создаются¶
Проблема: Абоненты смотрят, но сеансы не появляются в API
Возможные причины:
- Streaming-сервер не интегрирован с Management API
- Неверная конфигурация webhook на streaming-сервере
- Проблемы с сетью между серверами
Решение:
- Проверьте конфигурацию streaming-сервера (Flussonic)
- Убедитесь, что webhook настроен на Management API
- Проверьте логи streaming-сервера на ошибки
Сеансы не закрываются¶
Проблема: Сеансы остаются активными даже после остановки просмотра
Причины:
- Абонент закрыл приложение без корректной остановки потока
- Потеря сетевого соединения
- Streaming-сервер не отправил webhook о закрытии
Решение:
- Сеансы имеют timeout (обычно 5-10 минут неактивности)
- Проверяйте поле
updatedAt
— если давно не обновлялось, сеанс "мёртвый" - Настройте автоматическую очистку "зависших" сеансов
Неточные данные по трафику¶
Проблема: Поле bytes
не соответствует реальному трафику
Причины:
- Streaming-сервер обновляет счётчик периодически, а не в реальном времени
- Сеанс был прерван до финального обновления
- Разные методы подсчёта (payload vs full packets)
Решение:
- Используйте данные как приблизительные оценки
- Для точного биллинга используйте логи streaming-сервера
- Учитывайте задержку обновления (обычно 30-60 секунд)
См. также¶
- Управление абонентами — создание и настройка учётных записей
- Управление каналами — настройка телевизионных каналов
- Журнал операций — аудит действий в системе
- Управление порталами — изоляция данных между порталами