74 lines
2 KiB
Go
74 lines
2 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
|
|
"github.com/bluesky-social/indigo/api/atproto"
|
|
"github.com/bluesky-social/indigo/api/bsky"
|
|
"github.com/bluesky-social/indigo/events"
|
|
"github.com/bluesky-social/indigo/events/schedulers/sequential"
|
|
lexutil "github.com/bluesky-social/indigo/lex/util"
|
|
"github.com/bluesky-social/indigo/repo"
|
|
"github.com/bluesky-social/indigo/repomgr"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
func main() {
|
|
uri := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
|
|
con, _, err := websocket.DefaultDialer.Dial(uri, http.Header{})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
rsc := &events.RepoStreamCallbacks{
|
|
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
|
|
r, err := repo.ReadRepoFromCar(context.Background(), bytes.NewReader(evt.Blocks))
|
|
if err != nil {
|
|
return fmt.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err)
|
|
}
|
|
|
|
for _, op := range evt.Ops {
|
|
ek := repomgr.EventKind(op.Action)
|
|
switch ek {
|
|
case repomgr.EvtKindCreateRecord:
|
|
rc, rec, err := r.GetRecord(context.Background(), op.Path)
|
|
if err != nil {
|
|
e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
|
|
slog.Info("error", e)
|
|
continue
|
|
}
|
|
|
|
if lexutil.LexLink(rc) != *op.Cid {
|
|
return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
|
|
}
|
|
|
|
logger := slog.Default()
|
|
switch record := rec.(type) {
|
|
case *bsky.FeedPost:
|
|
logger.WithGroup("post").Info(
|
|
"create",
|
|
slog.String("createDate", record.CreatedAt),
|
|
slog.String("text", record.Text),
|
|
)
|
|
case *bsky.ActorProfile:
|
|
logger.WithGroup("actor").Info(
|
|
"create",
|
|
slog.String("repo", evt.Repo),
|
|
slog.String("name", *record.DisplayName),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
sched := sequential.NewScheduler("myfirehose", rsc.EventHandler)
|
|
err = events.HandleRepoStream(context.Background(), con, sched, slog.Default())
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|