Streaming¶
itdpy поддерживает sync SSE streaming для уведомлений.
Итерация через for¶
for event in client.notifications.stream():
print(event.event, event.data)
Callback стиль¶
stream = client.notifications.stream()
@stream.on("connected")
def on_connected(event):
print("Connected!", event.data)
@stream.on("notification")
def on_notification(event):
print("Got notification:", event.data)
@stream.on("reconnecting")
def on_reconnecting(event):
delay = event.data.get("delay")
print(f"Reconnecting in {delay}s (token refreshed automatically)")
@stream.on("error")
def on_error(event):
print(f"Error: {event.data.get('message')}")
stream.run()
Фильтрация по типу уведомления¶
from itdpy.models import NotificationType
@stream.on("notification", type=NotificationType.LIKE)
def on_like(event):
print(event.data)
Доступные значения NotificationType: LIKE, COMMENT, REPLY, REPOST, FOLLOW, WALL_POST.
Остановка¶
stream.stop()
keep_online¶
client.keep_online(
on_event=lambda event_type, data: print(event_type, data),
background=True,
)
События Stream¶
| Событие | Описание | Данные |
|---|---|---|
connected |
Успешное подключение | ConnectedEventData (user_id, timestamp) |
notification |
Новое уведомление | Notification (type, actor, preview) |
reconnecting |
Переподключение | {"delay": int} (секунды до переподключения) |
error |
Ошибка | {"message": str} |