feat: initial implementation of the indexer #7

Merged · 31 commits, merged on Sep 19, 2024 · Changes shown from 22 commits

Commits
c53055f
feat: update our state from IPNI state
bajtos Sep 4, 2024
2ec7716
Merge branch 'main' into indexer-impl
bajtos Sep 4, 2024
fe0e9cc
fixup! finish "update our state from IPNI state"
bajtos Sep 4, 2024
9bbe3bc
start working in the advertisement walker
bajtos Sep 4, 2024
ef48905
Update indexer/lib/typings.d.ts
bajtos Sep 4, 2024
df599e1
handle advertisement with no entries
bajtos Sep 5, 2024
7482b0b
small tweaks, fix tests
bajtos Sep 10, 2024
8c612a1
feat: finish processNextAdvertisement
bajtos Sep 10, 2024
3e0d552
refactor: cleanup
bajtos Sep 10, 2024
459a14c
docs: update the design doc
bajtos Sep 10, 2024
36174d8
Merge branch 'main' into indexer-impl
bajtos Sep 10, 2024
17a05e8
walk one step
bajtos Sep 11, 2024
9a93803
refactor: move loop runners to lib
bajtos Sep 11, 2024
72da9eb
wip: walk providers independently from each other
bajtos Sep 11, 2024
39e257a
wip: don't retry failed op too often
bajtos Sep 12, 2024
a22bc7a
don't retry failed op too often, keep the IPNI state in memory only
bajtos Sep 12, 2024
c86f645
feat: detect HTTP request errors
bajtos Sep 16, 2024
a6cb5e2
Merge branch 'main' into indexer-impl
bajtos Sep 16, 2024
778f3e9
cleanup
bajtos Sep 16, 2024
d14a7f0
feat: repository support for REST API
bajtos Sep 16, 2024
89ec9b2
Merge branch 'main' into indexer-impl
bajtos Sep 16, 2024
a0a153a
ci: setup Redis for tests
bajtos Sep 16, 2024
1e31450
test: remove race condition in the timeout test
bajtos Sep 16, 2024
cafe142
feat: deploy to Fly.io
bajtos Sep 16, 2024
455b1ae
perf: remember walker state between steps
bajtos Sep 18, 2024
d89d33a
Update indexer/bin/piece-indexer.js
bajtos Sep 18, 2024
0755f2a
Update indexer/bin/piece-indexer.js
bajtos Sep 18, 2024
fa18f10
fixup! providerIdsBeingWalked
bajtos Sep 18, 2024
052ed7a
fixup! last_head -> lastHead
bajtos Sep 18, 2024
cc70be0
fixup! fix walker state reuse
bajtos Sep 18, 2024
faaa285
Update indexer/package.json
bajtos Sep 19, 2024
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -14,6 +14,7 @@ jobs:
- uses: actions/setup-node@v4
with:
node-version: 20
- uses: supercharge/redis-github-action@1.8.0
- run: npm ci
- run: npm test

7 changes: 7 additions & 0 deletions README.md
@@ -3,3 +3,10 @@
A lightweight IPNI node mapping Filecoin PieceCID → payload block CID.

- [Design doc](./docs/design.md)

## Development

```bash
docker run --name redis -p 6379:6379 -d redis
npm start -w indexer
```
205 changes: 88 additions & 117 deletions docs/design.md
@@ -36,8 +36,9 @@ Label-based CID discovery and full Piece Index sampling.

Let's implement a lightweight IPNI ingester that will process the advertisements
from Filecoin SPs and extract the list of `(ProviderID, PieceCID, PayloadCID)`
entries. Store these entries in a Postgres database. Provide a REST API endpoint
accepting `(ProviderID, PieceCID)` and returning a single `PayloadCID`.
entries. Store these entries in a persisted datastore (Redis). Provide a REST
API endpoint accepting `(ProviderID, PieceCID)` and returning a single
`PayloadCID`.

### Terminology

@@ -188,57 +189,62 @@ head is published before we finish processing the chain.

#### Proposed algorithm

**Persisted state (per provider)**
**Per-provider state**

Use the following per-provider state persisted in the database:
Use the following per-provider state:

- `provider_id` - Primary key.
- `providerId` - Primary key.

- `provider_address` - Provider's address where we can fetch advertisements
from.
- Provider info obtained from IPNI - stored in memory only:

- `last_head` - The CID of the head where we started the previous walk (the last
walk that has already finished). All advertisements from `last_head` to the
end of the chain have already been processed.
- `providerAddress` - Provider's address where we can fetch advertisements
from.

- `next_head` - The CID of the most recent head seen by cid.contact. This is
where we need to start the next walk from.
- `lastAdvertisementCID` - The CID of the most recent head seen by
cid.contact. This is where we need to start the next walk from.

> **Note:** The initial walk will take a long time to complete. While we are
> walking the "old" chain, new advertisements (new heads) will be announced to
> IPNI.
>
> - `next_head` is the latest head announced to IPNI
> - `head` is the advertisement where the current walk-in-progress started
>
> I suppose we don't need to keep track of `next_head`. When the current walk
> finishes, we will wait up to one minute until we make another request to
> cid.contact to find what are the latest heads for each SPs.\_
>
> In the current proposal, when the current walk finishes, we can immediately
> continue with walking from the `next_head`.
- Provider walker state - persisted in our datastore (Redis):

- `head` - The CID of the head advertisement we started the current walk from.
We update this value whenever we start a new walk.
- `head` - The CID of the head advertisement we started the current walk from.
We update this value whenever we start a new walk.

- `tail` - The CID of the next advertisement in the chain that we need to
process in the current walk.
- `tail` - The CID of the next advertisement in the chain that we need to
process in the current walk.

- `lastHead` - The CID of the head where we started the previous walk (the
last walk that has already finished). All advertisements from `lastHead` to
the end of the chain have already been processed.

> **Note:** The initial walk will take a long time to complete. While we are
> walking the "old" chain, new advertisements (new heads) will be announced
> to IPNI.
>
> - `lastAdvertisementCID` is the latest head announced to IPNI
> - `head` is the advertisement where the current walk-in-progress started
>
> I suppose we don't need to keep track of `lastAdvertisementCID`. When the
> current walk finishes, we could wait up to one minute before making another
> request to cid.contact to find the latest head announced by each SP.
>
> In the current proposal, when the current walk finishes, we can
> immediately continue with walking from the `lastAdvertisementCID`.
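
As a rough illustration, the walker state above could be persisted as a single
JSON value per provider. The key layout and helper names below are assumptions
made for the sake of the sketch, not the actual `RedisRepository` API from this
pull request.

```js
// Sketch only - assumes a `walker-state:<providerId>` key layout.
import { Redis } from 'ioredis'

const redis = new Redis() // defaults to redis://localhost:6379

/** @param {string} providerId */
const loadWalkerState = async (providerId) => {
  const json = await redis.get(`walker-state:${providerId}`)
  // `head`, `tail` and `lastHead` are CID strings or null
  return json ? JSON.parse(json) : { head: null, tail: null, lastHead: null }
}

/**
 * @param {string} providerId
 * @param {{ head: string | null, tail: string | null, lastHead: string | null }} state
 */
const saveWalkerState = async (providerId, state) => {
  await redis.set(`walker-state:${providerId}`, JSON.stringify(state))
}
```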

We must always walk the chain all the way to the genesis or to the entry we have
already seen & processed.

The current walk starts from `head` and walks up to `last_head`. When the
current walk reaches `last_head`, we need to set `last_head ← head` so that the
next walk knows where to stop.
The current walk starts from `head` and walks up to `lastHead`. When the current
walk reaches `lastHead`, we need to set `lastHead ← head` so that the next walk
knows where to stop.

`next_head` is updated every minute when we query cid.contact for the latest
heads. If the walk takes longer than a minute to finish, then `next_head` will
change and we cannot use it for `last_head`.
`lastAdvertisementCID` is updated every minute when we query cid.contact for the
latest heads. If the walk takes longer than a minute to finish, then
`lastAdvertisementCID` will change and we cannot use it for `lastHead`.

Here is what the state looks like in the middle of a walk:

```
next_head --> [ ] -\
lastAdCID --> [ ] -\
↓ |
... | entries announced after we started the current walk
↓ |
@@ -256,7 +262,7 @@ next_head --> [ ] -\
↓ |
[ ] -/
last_head --> [ ] -\
lastHead --> [ ] -\
↓ |
... | entries visited in the previous walks
↓ |
@@ -272,89 +278,63 @@ Every minute, run the following high-level loop:
1. Fetch the list of providers and their latest advertisements (heads) from
https://cid.contact/providers. (This is **one** HTTP request.)

2. Fetch the state of all providers from our database. (This is **one** SQL
query.)

3. Update the state of each provider as described below, using the name
`LastAdvertisement` for the CID of the latest advertisement provided in the
response from cid.contact. (This is a small bit of computation plus **one**
SQL query.)
2. Update the in-memory info we keep for each provider (address, CID of the last
advertisement).

> **Note:** Instead of running the loop every minute, we could introduce a
> one-minute delay between iterations. It should not matter too much
> in practice, though. I expect each iteration to finish within one minute:
>
> - The three steps outlined above require only three interactions over
> HTTP/SQL.
> - The long chain walks are executed in the background and don't block this
> loop.
> in practice, though. I expect each iteration to finish within one minute, as
> it's just a single HTTP call to cid.contact.
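
For illustration, the minute-by-minute sync could look roughly like the sketch
below. The shape of the cid.contact response (`AddrInfo`, `LastAdvertisement`)
is my assumption about the public API; the real loop is implemented by
`runIpniSync()` in `lib/ipni-watcher.js`.

```js
// Rough sketch of the sync loop - not the actual lib/ipni-watcher.js code.
const recentProvidersInfo = new Map()

const syncProvidersFromIPNI = async () => {
  const res = await fetch('https://cid.contact/providers')
  if (!res.ok) throw new Error(`cid.contact responded with HTTP ${res.status}`)
  const providers = await res.json()
  for (const p of providers) {
    // Keep only the info we need: address + latest advertisement CID.
    recentProvidersInfo.set(p.AddrInfo.ID, {
      providerAddress: p.AddrInfo.Addrs[0],
      lastAdvertisementCID: p.LastAdvertisement['/']
    })
  }
}

// One HTTP request to cid.contact per iteration, repeated every minute.
setInterval(() => syncProvidersFromIPNI().catch(console.error), 60_000)
```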

For each provider listed in the response:
**Walk advertisement chains (in background)**

1. If `last_head` is not set, then we need to start the ingestion from scratch.
Update the state as follows and start the chain walker:
The chain-walking algorithm runs in the background and loops over the following
steps.

```
last_head := new_head
next_head := new_head
head := new_head
tail := new_head
```
1. Preparation

2. If `LastAdvertisement` is the same as `next_head`, then there was no change
since we checked the head last time and we are done.
- If `tail` is not null, then there is an ongoing walk of the chain we need
to continue.

3. If `tail` is not null, then there is an ongoing walk of the chain we need to
finish before we can ingest new advertisements. Update the state as follows
and abort.
- Otherwise, if `nextHead` is the same as `lastHead`, then there are no new
advertisements to process and the walk immediately returns.

```
next_head := LastAdvertisement
```
- Otherwise, we are starting a new walk. Update the walker state as follows:

4. `tail` is null, which means we have finished ingesting all advertisements
from `head` to the end of the chain. Update the state as follows and start
the chain walker.
```
head := newHead
tail := newHead
```

```
next_head := new_head
head := new_head
tail := new_head
```
(`lastHead` does not change until we finish the walk.)

**Walk advertisement chains (in background)**
2. Take one step

The chain-walking algorithm runs in the background and loops over the following
steps:
1. Fetch the advertisement identified by `tail` from the index provider.

1. If ` tail == last_head || tail == null`, then we finished the walk. Update
the state as follows:
2. Process the metadata and entries to extract up to one
`(PieceCID, PayloadCID)` entry to be added to the index and `PreviousID`
linking to the next advertisement in the chain to process.

```
last_head := head
head := null
tail := null
```
3. Update the walker state

If `next_head != last_head` then start a new walk by updating the state as
follows:
- If `PreviousID == lastHead || PreviousID == null`, then we finished the
walk. Update the state as follows:

```
head := next_head
tail := next_head
```
```
lastHead := head
head := null
tail := null
```

2. Otherwise take a step to the next item in the chain:
- Otherwise, update the `tail` field using the `PreviousID` field from the
advertisement.

1. Fetch the advertisement identified by `tail` from the index provider.
2. Process the metadata and entries to extract one `(PieceCID, PayloadCID)`
entry.
3. Update the `tail` field using the `PreviousID` field from the
advertisement.
```
tail := PreviousID
```

```
tail := PreviousID
```
4. Persist the new state in the database.
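
To make the steps above concrete, here is a hedged sketch of a single
iteration. `fetchAdvertisement()`, `extractPieceEntry()` and `addIndexEntry()`
are hypothetical placeholders for the fetching, parsing and repository calls in
`lib/advertisement-walker.js`; the state helpers are the ones sketched earlier.

```js
// Hedged sketch of one walk step - placeholder helpers, not the real
// advertisement-walker implementation.
const walkOneStep = async ({ providerId, providerAddress }) => {
  const state = await loadWalkerState(providerId)
  if (!state.tail) return // no walk in progress

  // 1. Fetch the advertisement identified by `tail` from the index provider.
  const advertisement = await fetchAdvertisement(providerAddress, state.tail)

  // 2. Extract up to one (PieceCID, PayloadCID) entry plus the PreviousID link.
  const entry = extractPieceEntry(advertisement)
  if (entry) await addIndexEntry(providerId, entry)

  // 3. + 4. Update and persist the walker state.
  const previousId = advertisement.PreviousID ?? null
  if (previousId === null || previousId === state.lastHead) {
    // The walk is finished - remember where the next walk should stop.
    await saveWalkerState(providerId, { head: null, tail: null, lastHead: state.head })
  } else {
    await saveWalkerState(providerId, { ...state, tail: previousId })
  }
}
```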

#### Handling the Scale

@@ -374,20 +354,11 @@ small steps that can be scheduled and executed independently. This allows us to
avoid the complexity of managing long-running per-provider tasks and instead
repeatedly execute one step of the process.

Loop 1: Every minute, fetch the latest provider information from cid.contact and
update the persisted state as outlined above.

Loop 2: Discover walks in progress and make one step in each walk.

1. Find all provider state records where `tail != null`.

2. For each provider, execute one step as described above. We can execute these
steps in parallel. Since each parallel job will query a different provider,
we are not going to overload any single provider.

3. Optionally, we can introduce a small delay before the next iteration. I think
we won't need it because the time to execute SQL queries should create enough
delay.
However, requests for advertisements from providers can take different amounts
of time. Some providers are not configured properly and the request fails only
after a long timeout. Such slow providers must not block the ingestion of
advertisements from faster providers; therefore, we still need some sort of
per-provider task runner.
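
One possible shape for such a runner is sketched below: every provider gets its
own background loop, so a slow or misconfigured provider only delays its own
chain. This is an illustration of the idea built on the `walkOneStep()` sketch
above, not the actual `walkChain()` implementation started from
`bin/piece-indexer.js`; `getProviderInfo()` stands for a lookup of the
in-memory provider info.

```js
// Illustrative per-provider runner; each provider walks its chain independently.
const runWalkerLoop = async (providerId) => {
  while (true) {
    try {
      const { providerAddress } = await getProviderInfo(providerId)
      await walkOneStep({ providerId, providerAddress })
    } catch (err) {
      console.error(`Cannot walk the chain of provider ${providerId}:`, err)
    }
    // Small pause between steps so we don't hammer a single provider.
    await new Promise((resolve) => setTimeout(resolve, 100))
  }
}
```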

### REST API

@@ -446,10 +417,10 @@ Response in JSON format:

```json
{
"providerId": "state.provider_id",
"providerAddress": "state.provider_address",
"providerId": "state.providerId",
"providerAddress": "state.providerAddress",
"ingestionStatus": "state.ingestion_status",
"lastHeadWalkedFrom": "state.last_head",
"lastHeadWalkedFrom": "state.lastHead",
"piecesIndexed": 123
// ^^ number of (PieceCID, PayloadCID) records found for this provider
}
58 changes: 58 additions & 0 deletions indexer/bin/piece-indexer.js
@@ -0,0 +1,58 @@
import assert from 'assert'
import { Redis } from 'ioredis'
import { walkChain } from '../lib/advertisement-walker.js'
import { runIpniSync } from '../lib/ipni-watcher.js'
import { RedisRepository } from '../lib/redis-repository.js'

/** @import { ProviderToInfoMap } from '../lib/typings.d.ts' */

const {
REDIS_URL: redisUrl = 'redis://localhost:6379'
} = process.env

// TODO: setup Sentry

const redisUrlParsed = new URL(redisUrl)
const redis = new Redis({
host: redisUrlParsed.hostname,
port: Number(redisUrlParsed.port),
username: redisUrlParsed.username,
password: redisUrlParsed.password,
lazyConnect: true, // call connect() explicitly so that we can exit on connection error
family: 6 // required for upstash
})

await redis.connect()
const repository = new RedisRepository(redis)

/** @type {Set<string>} */
const providerIdsActivelyWalked = new Set()

/** @type {ProviderToInfoMap} */
const recentProvidersInfo = new Map()

/**
* @param {string} providerId
*/
const getProviderInfo = async (providerId) => {
const info = recentProvidersInfo.get(providerId)
assert(!!info, `Unknown providerId ${providerId}`)
return info
}

for await (const providerInfos of runIpniSync({ minSyncIntervalInMs: 60_000 })) {
for (const [providerId, providerInfo] of providerInfos.entries()) {
recentProvidersInfo.set(providerId, providerInfo)
if (providerIdsActivelyWalked.has(providerId)) continue

providerIdsActivelyWalked.add(providerId)
walkChain({
repository,
providerId,
getProviderInfo,
minStepIntervalInMs: 100
}).finally(
Member:
Are you worried about the maximum concurrency this can run in?

Member Author (bajtos):
Great question! My current thinking is that there are several thousand storage providers now, plus maybe a few hundred non-Filecoin index providers. We should not be running more than 10k concurrent walkers. Walkers spend most of their time waiting for I/O (Redis, HTTP requests to index providers) or sleeping between steps.

I think that Node.js can easily handle such load. WDYT?

But! Maybe this is a sign that we need more visibility into this aspect. What I can do - but preferably in a follow-up pull request, is add an InfluxDB client and periodically write a data point with the number of concurrent walkers.

WDYT?

Member:
I'm happy to let it fail, debug and then see that maximum concurrency is the issue. It would be something else if we knew yes 100% it is, but like this I think it's fine.

What we need monitoring for is whether the indexer is delayed, I believe as long as it's not, no other metric is important.

() => providerIdsActivelyWalked.delete(providerId)
)
}
}