atproto-firehose/main.go
2025-10-15 11:56:08 +00:00

74 lines
2 KiB
Go

package main
import (
"bytes"
"context"
"fmt"
"log/slog"
"net/http"
"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/sequential"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/repomgr"
"github.com/gorilla/websocket"
)
func main() {
uri := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
con, _, err := websocket.DefaultDialer.Dial(uri, http.Header{})
if err != nil {
return
}
rsc := &events.RepoStreamCallbacks{
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
r, err := repo.ReadRepoFromCar(context.Background(), bytes.NewReader(evt.Blocks))
if err != nil {
return fmt.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err)
}
for _, op := range evt.Ops {
ek := repomgr.EventKind(op.Action)
switch ek {
case repomgr.EvtKindCreateRecord:
rc, rec, err := r.GetRecord(context.Background(), op.Path)
if err != nil {
e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
slog.Info("error", e)
continue
}
if lexutil.LexLink(rc) != *op.Cid {
return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
}
logger := slog.Default()
switch record := rec.(type) {
case *bsky.FeedPost:
logger.WithGroup("post").Info(
"create",
slog.String("createDate", record.CreatedAt),
slog.String("text", record.Text),
)
case *bsky.ActorProfile:
logger.WithGroup("actor").Info(
"create",
slog.String("repo", evt.Repo),
slog.String("name", *record.DisplayName),
)
}
}
}
return nil
},
}
sched := sequential.NewScheduler("myfirehose", rsc.EventHandler)
err = events.HandleRepoStream(context.Background(), con, sched, slog.Default())
if err != nil {
return
}
}