package main import ( "bytes" "context" "fmt" "log/slog" "net/http" "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/sequential" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" "github.com/bluesky-social/indigo/repomgr" "github.com/gorilla/websocket" ) func main() { uri := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" con, _, err := websocket.DefaultDialer.Dial(uri, http.Header{}) if err != nil { return } rsc := &events.RepoStreamCallbacks{ RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { r, err := repo.ReadRepoFromCar(context.Background(), bytes.NewReader(evt.Blocks)) if err != nil { return fmt.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) } for _, op := range evt.Ops { ek := repomgr.EventKind(op.Action) switch ek { case repomgr.EvtKindCreateRecord: rc, rec, err := r.GetRecord(context.Background(), op.Path) if err != nil { e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) slog.Info("error", e) continue } if lexutil.LexLink(rc) != *op.Cid { return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) } logger := slog.Default() switch record := rec.(type) { case *bsky.FeedPost: logger.WithGroup("post").Info( "create", slog.String("createDate", record.CreatedAt), slog.String("text", record.Text), ) case *bsky.ActorProfile: logger.WithGroup("actor").Info( "create", slog.String("repo", evt.Repo), slog.String("name", *record.DisplayName), ) } } } return nil }, } sched := sequential.NewScheduler("myfirehose", rsc.EventHandler) err = events.HandleRepoStream(context.Background(), con, sched, slog.Default()) if err != nil { return } }