Real-time pipeline — engine → JSONL → Telegram

2026-05-02 · pipeline · ~5 min

Když bot zavře pozici, do 5 vteřin se to objeví v @botlab_cz Telegram channelu. Tady je celá architektura — bez Kafky, bez Redis, bez WebSocketů.

Komponenty

copybot engine ──┐
memebot ─────────┼──► /var/log/botlab/feed.jsonl ──► broadcaster.py ──► Telegram Bot API ──► @botlab_cz
forexbot ────────┘                                       │
                                                         └──► broadcaster.state (byte offset)

Tři producenti, jeden append-only soubor, jeden konzument, jedno persistentní místo pro cursor.

Producent: emit do JSONL

Každý bot má v engine.py malou helper funkci, kterou volá při close trade:

def _emit_feed(side, **kw):
    try:
        rec = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "bot": _bot_letter,
            "side": side,
            **kw
        }
        with open("/var/log/botlab/feed.jsonl", "a", encoding="utf-8") as f:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
    except Exception:
        pass  # never crash bot from logging

Klíčové detaily:

Konzument: tail + cursor

botlab_broadcaster.py je 100-řádkový script s tímhle loopem:

FEED = Path("/var/log/botlab/feed.jsonl")
STATE = Path("/var/log/botlab/broadcaster.state")

def main():
    last_offset = int(STATE.read_text() or 0) if STATE.exists() else 0
    if last_offset == 0:
        # First run — start at end, don't replay history
        last_offset = FEED.stat().st_size
        STATE.write_text(str(last_offset))

    while True:
        sz = FEED.stat().st_size
        if sz < last_offset:
            last_offset = 0  # truncated/rotated
        if sz > last_offset:
            with FEED.open("r") as f:
                f.seek(last_offset)
                new = f.read()
                last_offset = f.tell()
            for line in new.splitlines():
                ev = json.loads(line)
                post_telegram(format_event(ev))
                time.sleep(2)  # TG rate limit
            STATE.write_text(str(last_offset))
        time.sleep(15)

Co tahle smyčka řeší:

Telegram Bot API

def post_telegram(text: str) -> bool:
    url = f"https://api.telegram.org/bot{TG_TOKEN}/sendMessage"
    data = urllib.parse.urlencode({
        "chat_id": "@botlab_cz",
        "text": text,
        "parse_mode": "HTML",
        "disable_web_page_preview": "true",
    }).encode()
    try:
        urllib.request.urlopen(urllib.request.Request(url, data=data), timeout=10)
        return True
    except Exception as e:
        log.error("TG post failed: %s", e)
        return False

Žádné dependencies. urllib.request ze stdlib stačí na 90% Bot API operací.

Systemd unit

[Unit]
Description=BotLab Telegram broadcaster
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=copybot
EnvironmentFile=/etc/default/botlab-broadcaster
ExecStart=/usr/bin/python3 /opt/copybot/botlab_broadcaster.py
Restart=on-failure
RestartSec=10
StandardOutput=append:/var/log/copybot/broadcaster.log
StandardError=append:/var/log/copybot/broadcaster.log

[Install]
WantedBy=multi-user.target

/etc/default/botlab-broadcaster obsahuje secret:

TELEGRAM_BOT_TOKEN=8766326413:AAH...
TELEGRAM_CHAT_ID=@botlab_cz

Bot promotion — Bot API limitation

Bot API nemůže sám sebe promote-nout na admin v channelu. To je hard limit Telegramu. Musíš to udělat ručně z mobilu, NEBO přes user MTProto session.

BotLab používá Telethon s user account session, jednorázově:

from telethon import TelegramClient
from telethon.tl.functions.channels import EditAdminRequest
from telethon.tl.types import ChatAdminRights

client = TelegramClient("session", api_id, api_hash)
await client.start(phone="+420...")
await client(EditAdminRequest(
    channel="botlab_cz",
    user_id="botlab_cz_bot",
    admin_rights=ChatAdminRights(post_messages=True, edit_messages=True),
    rank="Broadcaster",
))

Po prvním přihlášení (SMS code) je session uložená v session.session souboru, dál se může používat bez interakce. Ale je to user account, ne bot — nemůžeš to dát do publik repu.

Alternativy které jsem zvážil

OptionCo delaProč ne
Redis pub/subReal-time broadcast across servicesOverkill pro 1 producent → 1 konzument; další service co rebootovat
RabbitMQ / NATSRobust message brokerJako Redis ale ještě těžší
WebSocket pushBot direct na TG WebSocketTG nemá WS push pro Bot API; jen long-poll getUpdates
SQLite trigger + listenerNotify on INSERTSQLite nemá native LISTEN/NOTIFY; musel bych poll-ovat anyway
JSONL append + tailCo používámProč ano: 0 dependencies, 0 db, debug-able grep, restart-safe, kompletní pipeline pod 200 řádků
"Když máš jednoho producenta a jednoho konzumenta na stejném stroji, JSONL + cursor je single best technika. Žádný overhead, plnou kontrolu, debug je jen tail -f feed.jsonl."

Co se může pokazit

Latence end-to-end

Měření na produkci 2026-05-02:

Pro paper-trading channel to stačí. Pro HFT signál by 7s bylo ošklivé, ale tady jsme v rozmezí "uživatel obnoví channel a vidí novej post".

Engine + broadcaster code je na GitHubu. Pokud máš tip na zlepšení (proper journald structured logging? graceful TG flood handling?), open PR.