SemiLayerDocs

Realtime Sync

SemiLayer gives you four distinct paths for keeping the vector index fresh. They differ in how much infrastructure you need to set up, how quickly changes land, and whether deletes propagate automatically.

The four paths at a glance

| Path | Bridge scan | Wipes first | Hash dedup | Detects deletes | Typical use |
|---|---|---|---|---|---|
| Smart sync | All rows | No | Yes | Yes — tombstone | Default refresh. Catches edits and deletes, only re-embeds rows whose content changed. Console button or semilayer sync only — no webhook path. |
| Incremental | WHERE updated_at > $cursor | No | Yes | No | Cheap polling. Fires automatically from syncInterval in config. Misses deletes and rows where the tracking column didn't move. |
| Records | Specific IDs from the change buffer | No | Yes | Yes — explicit action: 'delete' | CDC push. For sources with native change-data-capture. Sub-second latency, zero polling overhead. |
| Rebuild | All rows | Yes — destructive | n/a | n/a | Clean slate. Use after a mapping config change, partition corruption, or when certainty matters more than speed. |

The mental model: Smart sync is the default. Use Rebuild only when you need a clean slate. Use Incremental + Records to keep things fresh automatically between manual refreshes — Incremental via syncInterval for the cheap path, Records via the webhook for the precise path.


The easy path: config and Console

You do not need to wire up any infrastructure to keep most lenses reasonably fresh. Two zero-code options handle the common case.

Option A — syncInterval (Incremental, automatic)

Add one line to your lens config and SemiLayer handles the rest. Every N minutes the worker fires an incremental scan: it reads rows where changeTrackingColumn > last_cursor, re-embeds anything new or changed, and advances the cursor.

lenses: {
  products: {
    source: 'main-db',
    table: 'public.products',
    primaryKey: 'id',
    syncInterval: '15m',               // '1m' | '5m' | '15m' | '30m' | '1h' | '6h' | '24h'
    changeTrackingColumn: 'updated_at', // defaults to 'updated_at' if present
    // ...
  },
}
semilayer push   # push the config change — sync starts ticking immediately

Under the hood, each lens's syncInterval is evaluated every minute by a global tick handler. When a lens is due, the worker reads all rows changed since the last cursor and re-embeds only those rows. No separate queue setup, no infrastructure to manage.
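As an illustrative model of that scheduling logic (not SemiLayer internals; parseInterval and isDue are hypothetical helper names):

```typescript
// Illustrative model of the per-minute tick check. A lens is due when at
// least one full interval has elapsed since its last incremental run.
const UNIT_MS: Record<string, number> = { m: 60_000, h: 3_600_000 }

export function parseInterval(interval: string): number {
  const match = /^(\d+)([mh])$/.exec(interval)
  if (!match) throw new Error(`unsupported interval: ${interval}`)
  return Number(match[1]) * UNIT_MS[match[2]]
}

export function isDue(interval: string, lastRunMs: number, nowMs: number): boolean {
  return nowMs - lastRunMs >= parseInterval(interval)
}
```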

Tradeoffs:

  • Setup: One config line
  • Latency: Up to the interval (e.g. 15 minutes for '15m')
  • Deletes: Not detected. Deleted rows don't appear in the scan — they linger in the index until a Smart sync or Rebuild.
  • Cost: Low — only reads changed rows

When changeTrackingColumn doesn't exist: Incremental reads all rows on every tick with no filtering — effectively a full scan at every interval. If your table has no reliable timestamp column, use Smart sync instead.


Option B — Smart sync on demand (Console or CLI)

Smart sync scans every row in the source, computes a content hash, and re-embeds only rows whose hash changed since the last sync. Rows that were deleted from the source are detected by their absence (tombstone) and removed from the index.

From the Console: Open a Lens → click Smart sync. No config change required.

From the CLI:

semilayer sync                  # smart sync all lenses
semilayer sync --lens products  # smart sync one lens
⚠️

Smart sync has no webhook path — { "mode": "smart" } is not a valid ingest request body and will return 400. It is only available via the Console button and semilayer sync. Use { "mode": "full" } if you need to trigger a clean-slate rebuild from external infrastructure.

Tradeoffs:

  • Setup: None
  • Latency: Completes after the full scan (seconds to minutes depending on table size)
  • Deletes: Yes — tombstone detection. Rows absent from the source are purged.
  • Cost: Higher than incremental — reads every row to compute hashes

Smart sync is the right choice when you've made changes that incremental can't detect: hard deletes, bulk updates that bypassed updated_at, or a manual data correction. Running it once after a migration is a common pattern.


Choosing between the two easy paths

| Question | Smart sync | syncInterval (Incremental) |
|---|---|---|
| Your table has an updated_at column | Works | Works best |
| Your table hard-deletes rows | Works — tombstone detects them | Does not work — deletes linger |
| You want fully automatic, no manual intervention | Run periodically via cron or on a schedule | Set syncInterval and forget |
| Your table is large (millions of rows) | Slower — hashes all rows | Fast — only reads recent changes |
| You need changes in the index within seconds | Not ideal | Not ideal — use Records mode |

A common production setup is syncInterval: '15m' for continuous incremental refresh, combined with a nightly Smart sync (scheduled via semilayer sync in cron) to catch deletes and any drift that slipped through.


When to go further: the Records webhook

If neither easy path is precise enough — you need sub-second freshness, you want deletes to propagate immediately without a nightly Smart sync, or you have a high-write table where a 15-minute lag is too long — the Records webhook is the answer.

You call POST /v1/ingest/:lens from your infrastructure (a DB trigger, a Lambda, your application code) with the exact IDs that changed and whether each was an upsert or delete. SemiLayer processes only those rows.

{
  "mode": "records",
  "changes": [
    { "id": "42",  "action": "upsert" },
    { "id": "101", "action": "delete" }
  ]
}

This is what the rest of this guide is about.


Ingest Keys (ik_)

Every Environment has one ingest key auto-created at provisioning time. Ingest keys are purpose-built for this one job: calling POST /v1/ingest/:lens.

| Prefix | What it can do |
|---|---|
| ik_dev_ | Trigger ingest on development lenses |
| ik_live_ | Trigger ingest on production lenses |

ik_ keys cannot call search, similar, query, or any other API. If one leaks, an attacker can trigger an unnecessary re-index — annoying, not catastrophic. You can revoke and rotate them freely.

sk_ (secret) keys also work for the ingest endpoint, but use ik_ in any infrastructure that doesn't need full API access.

Console: Environment → API Keys → filter by type "Ingest"

CLI:

semilayer keys list --env development
# KEY ID       TYPE    LABEL                  CREATED
# key_abc123   ik      auto (development)     2 days ago

semilayer keys create --type ingest --label "postgres-trigger"

The endpoint

POST /v1/ingest/:lens
Authorization: Bearer ik_live_...
Content-Type: application/json

:lens is the lens name from your sl.config.ts.
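Put together, a records-mode call looks like this (the lens name products is an example; the key placeholder is not filled in):

```shell
curl -X POST https://api.semilayer.com/v1/ingest/products \
  -H "Authorization: Bearer ik_live_..." \
  -H "Content-Type: application/json" \
  -d '{"mode":"records","changes":[{"id":"42","action":"upsert"}]}'
```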

Response

{ "jobId": "job_8f3a...", "status": "queued" }

Or if absorbed by a debounce window:

{ "jobId": "job_8f3a...", "status": "deduplicated" }

202 Accepted in both cases. Either way the change will be processed.

Error responses

| Status | Meaning |
|---|---|
| 400 | Invalid request body — check mode and changes shape |
| 404 | Lens not found — check the lens name and environment |
| 429 | Rate limit exceeded (SaaS only) — retry after Retry-After seconds |
💡

Enterprise deployments have no rate limits on the ingest endpoint. The 429 path only applies to SaaS plans.
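On SaaS plans, callers can honor Retry-After with a small wrapper. A minimal sketch, assuming a fetch-compatible client is injected for testability; postIngestWithRetry is a hypothetical helper, not part of any SemiLayer SDK:

```typescript
type FetchLike = (url: string, init: unknown) =>
  Promise<{ status: number; headers: { get(h: string): string | null } }>

// Retries on 429, waiting the number of seconds the Retry-After header asks for.
export async function postIngestWithRetry(
  fetchImpl: FetchLike,
  url: string,
  body: unknown,
  key: string,
  maxRetries = 3
): Promise<number> {
  for (let attempt = 0; ; attempt++) {
    const resp = await fetchImpl(url, {
      method: 'POST',
      headers: { 'Authorization': `Bearer ${key}`, 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
    })
    if (resp.status !== 429 || attempt >= maxRetries) return resp.status
    const waitSec = Number(resp.headers.get('Retry-After') ?? '1')
    await new Promise((r) => setTimeout(r, waitSec * 1000))
  }
}
```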


Mode reference

Smart sync (Console / CLI only)

Full-table scan with hash dedup and tombstone delete detection. Only available via semilayer sync or the Console Smart sync button — there is no { "mode": "smart" } webhook body. See The easy path above.


records mode

You tell SemiLayer exactly which rows changed and what to do with them. This is the most efficient mode: no full scan, no timestamp window — just targeted operations on the rows you name.

{
  "mode": "records",
  "changes": [
    { "id": "42",  "action": "upsert" },
    { "id": "99",  "action": "upsert" },
    { "id": "101", "action": "delete" }
  ]
}

id is the value of the primaryKey field declared in your Lens, as a string.

Upsert: SemiLayer fetches the current row via the Bridge, re-embeds it, and updates the index. If the row doesn't exist in the source, its embedding is removed automatically.

Delete: The embedding is removed immediately. SemiLayer does not re-query your database — it trusts the action field. No changeTrackingColumn needed.

Debounce: 2-second window per lens. Calls within the window are merged. If the same ID appears multiple times, the last action wins — an upsert then delete for the same ID correctly results in a delete.

Batch limit: Maximum 10,000 changes per request. Split larger batches.
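To stay under the 10,000-change cap, split large change sets before posting. A minimal sketch; chunk is a generic helper, not SemiLayer API:

```typescript
// Splits an array into consecutive slices of at most `size` elements.
export function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i < items.length; i += size) out.push(items.slice(i, i + size))
  return out
}

// Usage sketch: one POST per slice of up to 10,000 changes.
// for (const slice of chunk(changes, 10_000)) { await postIngest(slice) }
```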


incremental mode

SemiLayer queries rows where changeTrackingColumn > last_cursor, re-embeds whatever changed, and advances the cursor. Normally fired automatically via syncInterval — you rarely call this directly.

{ "mode": "incremental" }

Requires changeTrackingColumn on your Lens (defaults to updated_at).

Debounce: 5-second window. Useful when bursts of change events arrive and you want them coalesced into one scan.

Limitation: incremental cannot detect hard deletes. It reads rows that changed — rows that were deleted simply aren't there to read. Use Smart sync, Records mode with action: 'delete', or a nightly Rebuild to purge them.


full (Rebuild) mode

Clears all existing embeddings for the lens and re-reads, re-embeds, and re-indexes every row from scratch.

{ "mode": "full" }

Destructive. All vectors are wiped before the scan begins. If the rebuild fails mid-way, the index is empty until it completes.

Singleton dedup: Only one Rebuild runs per lens at a time.

Use Rebuild for:

  • First-time indexing after initial push with semilayer push --rebuild
  • After changing a field's searchable weight or transform chain (embeddings are stale)
  • Recovery after corruption or a botched migration
  • When you want absolute certainty the index matches the source with no drift

How dedup and debounce work

SemiLayer's ingest queue uses debounce windows and singleton keys to absorb bursts.

| Mode | Dedup type | Window |
|---|---|---|
| records | Debounce per lens | 2 seconds |
| incremental | Debounce per lens | 5 seconds |
| smart | Singleton per lens | Until job completes |
| full | Singleton per lens | Until job completes |

In practice: If a Postgres trigger fires POST /v1/ingest/products (incremental) on every row insert and you bulk-insert 5,000 rows in a transaction, you get one ingest job queued — not 5,000. The first call starts a 5-second timer; every subsequent call within the window returns status: 'deduplicated' and the timer resets. After 5 seconds of quiet the job runs.
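The window semantics can be written down as pure logic. An illustrative model of the behavior described above, not SemiLayer's queue implementation:

```typescript
// Given call timestamps (ms) and a debounce window, returns each call's
// response status. The first call queues a job; every call inside the open
// window is absorbed as 'deduplicated' and resets the timer.
export function simulateDebounce(callTimesMs: number[], windowMs: number): string[] {
  const statuses: string[] = []
  let windowEnd = -Infinity
  for (const t of callTimesMs) {
    statuses.push(t >= windowEnd ? 'queued' : 'deduplicated')
    windowEnd = t + windowMs // each call resets the quiet-period timer
  }
  return statuses
}
```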


Handling deletes

This is the most important thing to get right.

Which modes propagate deletes:

| Mode | How deletes are handled |
|---|---|
| smart | Tombstone detection — rows absent from the full scan are purged |
| records | Explicit action: 'delete' — you send the delete, SemiLayer trusts it |
| incremental | Not supported — deleted rows are invisible to the timestamp scan |
| full (Rebuild) | All vectors wiped before rebuild — index always reflects current source |

For records mode: send the delete before or after the row is gone — SemiLayer doesn't re-query the DB for deletes, so timing doesn't matter. But you must send it.

{
  "mode": "records",
  "changes": [{ "id": "101", "action": "delete" }]
}

Patterns for capturing deletes per source:

| Source | Approach |
|---|---|
| PostgreSQL | AFTER DELETE trigger → pg_net.http_post |
| MySQL | AFTER DELETE trigger → changes table → poll |
| Prisma | $extends middleware — read IDs before deleteMany, then push deletes |
| Application code | Call the webhook in your delete service method |
| DynamoDB | DynamoDB Streams — REMOVE event type |
| MongoDB | Change Streams — delete operation type |

If you can't capture deletes: Use syncInterval for freshness and schedule a nightly semilayer sync (Smart sync) to tombstone-detect any deletions. This is the lowest-infrastructure delete strategy.


When a lens is paused

If a lens is paused, webhook calls are accepted and queued — not dropped. When you resume, queued jobs run in order. The 202 response you receive still means "will be processed."


10 Examples

Each example shows the complete wiring from database event to SemiLayer webhook.


1. PostgreSQL — pg_net trigger (INSERT / UPDATE / DELETE)

The cleanest Postgres approach. pg_net is available on Supabase, Neon, and most managed Postgres providers. It makes HTTP requests directly from SQL triggers — no application server involved.

-- Enable the extension (run once)
create extension if not exists pg_net;

-- Helper function that fires the webhook
create or replace function notify_semilayer()
returns trigger
language plpgsql
as $$
declare
  _action text;
  _body   text;
begin
  if (TG_OP = 'DELETE') then
    _action := 'delete';
    _body := json_build_object(
      'mode',    'records',
      'changes', json_build_array(
        json_build_object('id', OLD.id::text, 'action', _action)
      )
    )::text;
  else
    _action := 'upsert';
    _body := json_build_object(
      'mode',    'records',
      'changes', json_build_array(
        json_build_object('id', NEW.id::text, 'action', _action)
      )
    )::text;
  end if;

  perform net.http_post(
    url     := 'https://api.semilayer.com/v1/ingest/products',
    headers := '{"Authorization":"Bearer ik_live_...","Content-Type":"application/json"}'::jsonb,
    body    := _body::jsonb
  );

  return coalesce(NEW, OLD);
end;
$$;

-- Attach to the table
create trigger semilayer_products_sync
after insert or update or delete on products
for each row execute function notify_semilayer();
💡

Store the ik_ key in a Postgres configuration parameter so it isn't hardcoded: alter system set app.semilayer_ingest_key = 'ik_live_...' then read it with current_setting('app.semilayer_ingest_key').
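With the setting in place, the http_post call above can build its headers from it rather than a hardcoded literal. A sketch of that variant (same assumptions as the trigger above):

```sql
-- Build the Authorization header from the stored setting.
perform net.http_post(
  url     := 'https://api.semilayer.com/v1/ingest/products',
  headers := jsonb_build_object(
    'Authorization', 'Bearer ' || current_setting('app.semilayer_ingest_key'),
    'Content-Type',  'application/json'
  ),
  body    := _body::jsonb
);
```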

Delete support: Yes — TG_OP = 'DELETE' maps to action: 'delete' in real time.


2. PostgreSQL — LISTEN / NOTIFY + Node listener

No pg_net extension required. A lightweight Node.js process listens on a Postgres channel and calls the webhook. Good for environments where extensions aren't available.

create or replace function notify_semilayer_channel()
returns trigger
language plpgsql
as $$
declare
  _payload text;
begin
  _payload := json_build_object(
    'lens',   'products',
    'action', case when TG_OP = 'DELETE' then 'delete' else 'upsert' end,
    'id',     (case when TG_OP = 'DELETE' then OLD.id else NEW.id end)::text
  )::text;

  perform pg_notify('semilayer_changes', _payload);
  return coalesce(NEW, OLD);
end;
$$;

create trigger semilayer_products_notify
after insert or update or delete on products
for each row execute function notify_semilayer_channel();
// listener.ts — runs alongside your app or as a standalone process
import pg from 'pg'

const client     = new pg.Client(process.env.DATABASE_URL)
const INGEST_KEY = process.env.SEMILAYER_INGEST_KEY!
const BASE_URL   = 'https://api.semilayer.com'

await client.connect()
await client.query('LISTEN semilayer_changes')

client.on('notification', async (msg) => {
  const { lens, action, id } = JSON.parse(msg.payload!)

  await fetch(`${BASE_URL}/v1/ingest/${lens}`, {
    method:  'POST',
    headers: {
      'Authorization': `Bearer ${INGEST_KEY}`,
      'Content-Type':  'application/json',
    },
    body: JSON.stringify({ mode: 'records', changes: [{ id, action }] }),
  })
})

Batching enhancement: Buffer notifications for 100ms and send them as a single changes array to reduce HTTP calls during bulk operations.
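One way to sketch that buffering as a drop-in for the listener above (createBatcher is an illustrative name, not part of any SemiLayer SDK):

```typescript
type Change = { id: string; action: 'upsert' | 'delete' }

// Collects notifications for windowMs, then hands the whole burst to `send`
// as a single changes array (one HTTP call instead of one per notification).
export function createBatcher(windowMs: number, send: (changes: Change[]) => void) {
  let buffer: Change[] = []
  let timer: ReturnType<typeof setTimeout> | null = null
  return (change: Change) => {
    buffer.push(change)
    if (!timer) {
      timer = setTimeout(() => {
        timer = null
        send(buffer.splice(0))
      }, windowMs)
    }
  }
}
```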


3. Supabase — Database Webhook + Edge Function

Supabase has built-in database webhooks in the Dashboard. No trigger SQL needed.

Edge Function:

// supabase/functions/semilayer-sync/index.ts
import { serve } from 'https://deno.land/std@0.168.0/http/server.ts'

const INGEST_KEY = Deno.env.get('SEMILAYER_INGEST_KEY')!
const BASE_URL   = 'https://api.semilayer.com'

serve(async (req) => {
  const { type, table, record, old_record } = await req.json()

  const action = type === 'DELETE' ? 'delete' : 'upsert'
  const id     = (record ?? old_record).id?.toString()
  const lens   = table  // "products" → lens "products"

  await fetch(`${BASE_URL}/v1/ingest/${lens}`, {
    method:  'POST',
    headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({ mode: 'records', changes: [{ id, action }] }),
  })

  return new Response('ok')
})

Dashboard wiring:

  1. Database → Webhooks → Create a new hook
  2. Table: products, Events: INSERT, UPDATE, DELETE
  3. Webhook URL: your Edge Function URL
  4. Add SEMILAYER_INGEST_KEY in Edge Function secrets

Delete support: Yes — type === 'DELETE' maps directly.


4. MySQL / PlanetScale — Prisma $extends middleware

Prisma's query extensions intercept every write at the ORM layer. Works with any database Prisma supports — no trigger SQL needed.

// prisma-extensions.ts
import { PrismaClient } from '@prisma/client'

const INGEST_KEY = process.env.SEMILAYER_INGEST_KEY!
const BASE_URL   = 'https://api.semilayer.com'

async function notifySemiLayer(
  lens: string,
  action: 'upsert' | 'delete',
  ids: string[]
) {
  if (ids.length === 0) return
  await fetch(`${BASE_URL}/v1/ingest/${lens}`, {
    method:  'POST',
    headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({ mode: 'records', changes: ids.map(id => ({ id, action })) }),
  })
}

export function withSemiLayer(prisma: PrismaClient) {
  return prisma.$extends({
    query: {
      product: {
        async create({ args, query }) {
          const result = await query(args)
          await notifySemiLayer('products', 'upsert', [String(result.id)])
          return result
        },
        async update({ args, query }) {
          const result = await query(args)
          await notifySemiLayer('products', 'upsert', [String(result.id)])
          return result
        },
        async updateMany({ args, query }) {
          // updateMany doesn't return IDs — fall back to incremental
          const result = await query(args)
          await fetch(`${BASE_URL}/v1/ingest/products`, {
            method:  'POST',
            headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
            body: JSON.stringify({ mode: 'incremental' }),
          })
          return result
        },
        async delete({ args, query }) {
          // Read ID before deleting — it won't exist after
          const existing = await prisma.product.findUnique({ where: args.where, select: { id: true } })
          const result   = await query(args)
          if (existing) await notifySemiLayer('products', 'delete', [String(existing.id)])
          return result
        },
        async deleteMany({ args, query }) {
          const rows   = await prisma.product.findMany({ where: args.where, select: { id: true } })
          const result = await query(args)
          await notifySemiLayer('products', 'delete', rows.map(r => String(r.id)))
          return result
        },
      },
    },
  })
}
// db.ts
import { PrismaClient } from '@prisma/client'
import { withSemiLayer } from './prisma-extensions'

export const db = withSemiLayer(new PrismaClient())
💡

Collect IDs before deleteMany — after deletion those rows are gone. The example above does this correctly.


5. DynamoDB Streams → Lambda

DynamoDB Streams fires a Lambda with every insert, modify, and remove.

// lambda/semilayer-sync/index.ts
import type { DynamoDBStreamEvent } from 'aws-lambda'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import type { AttributeValue } from '@aws-sdk/client-dynamodb'

const INGEST_KEY = process.env.SEMILAYER_INGEST_KEY!
const BASE_URL   = process.env.SEMILAYER_BASE_URL ?? 'https://api.semilayer.com'
const LENS       = process.env.SEMILAYER_LENS!

export async function handler(event: DynamoDBStreamEvent) {
  const changes: Array<{ id: string; action: 'upsert' | 'delete' }> = []

  for (const record of event.Records) {
    const image = record.dynamodb?.NewImage ?? record.dynamodb?.OldImage
    if (!image) continue

    const item = unmarshall(image as Record<string, AttributeValue>)
    const id   = String(item.id ?? item.PK ?? item.pk)

    changes.push({
      id,
      action: record.eventName === 'REMOVE' ? 'delete' : 'upsert',
    })
  }

  if (changes.length === 0) return

  const resp = await fetch(`${BASE_URL}/v1/ingest/${LENS}`, {
    method:  'POST',
    headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({ mode: 'records', changes }),
  })

  if (!resp.ok && resp.status !== 202) {
    throw new Error(`SemiLayer ingest failed: ${resp.status}`)
  }
}

IAM permissions for the Lambda execution role:

{
  "Effect": "Allow",
  "Action": [
    "dynamodb:GetRecords",
    "dynamodb:GetShardIterator",
    "dynamodb:DescribeStream",
    "dynamodb:ListStreams"
  ],
  "Resource": "arn:aws:dynamodb:us-east-1:123456789:table/Products/stream/*"
}

Enable DynamoDB Streams on the table with StreamViewType: NEW_AND_OLD_IMAGES. Delete support: Yes — REMOVE events carry OldImage with the deleted key.
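Enabling the stream on an existing table can be done from the AWS CLI (the table name Products is an example):

```shell
aws dynamodb update-table \
  --table-name Products \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
```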


6. MongoDB Change Streams

Change Streams give a real-time cursor over all writes. Run inside your app or as a dedicated microservice.

// semilayer-watcher.ts
import { MongoClient } from 'mongodb'

const mongo      = new MongoClient(process.env.MONGODB_URI!)
const INGEST_KEY = process.env.SEMILAYER_INGEST_KEY!
const BASE_URL   = 'https://api.semilayer.com'

await mongo.connect()
const collection = mongo.db('myapp').collection('products')

const changeStream = collection.watch(
  [{ $match: { operationType: { $in: ['insert', 'update', 'replace', 'delete'] } } }],
  { fullDocument: 'updateLookup' }
)

// Batching: buffer 150ms to reduce HTTP calls during bulk inserts
let buffer: Array<{ id: string; action: 'upsert' | 'delete' }> = []
let timer:  ReturnType<typeof setTimeout> | null = null

async function flush() {
  const batch = buffer.splice(0)
  if (batch.length === 0) return
  await fetch(`${BASE_URL}/v1/ingest/products`, {
    method:  'POST',
    headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({ mode: 'records', changes: batch }),
  })
}

for await (const change of changeStream) {
  buffer.push({
    id:     String(change.documentKey._id),
    action: change.operationType === 'delete' ? 'delete' : 'upsert',
  })
  if (!timer) {
    timer = setTimeout(async () => { timer = null; await flush() }, 150)
  }
}

Resumability: Persist changeStream.resumeToken to durable storage so the watcher can resume after a restart without missing events.
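A minimal persistence sketch, file-based for brevity (a production watcher would likely use a durable store; saveResumeToken and loadResumeToken are illustrative names):

```typescript
import { readFileSync, writeFileSync, existsSync } from 'node:fs'

// Persist the latest resume token after each processed event ...
export function saveResumeToken(path: string, token: unknown): void {
  writeFileSync(path, JSON.stringify(token))
}

// ... and reload it on startup so the watcher resumes where it left off.
export function loadResumeToken(path: string): unknown | undefined {
  return existsSync(path) ? JSON.parse(readFileSync(path, 'utf8')) : undefined
}

// Usage sketch with the watcher above:
//   const token = loadResumeToken('./resume-token.json')
//   const stream = collection.watch(pipeline, token ? { resumeAfter: token } : {})
//   for await (const change of stream) {
//     /* push to SemiLayer */
//     saveResumeToken('./resume-token.json', stream.resumeToken)
//   }
```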


7. Express / Fastify — application service layer

If you own the write path, call the webhook directly from your service methods. No triggers, no separate process.

// lib/sync.ts
const INGEST_KEY = process.env.SEMILAYER_INGEST_KEY!
const BASE_URL   = process.env.SEMILAYER_BASE_URL ?? 'https://api.semilayer.com'

export function syncRecords(
  lens: string,
  changes: Array<{ id: string; action: 'upsert' | 'delete' }>
): void {
  // Fire-and-forget — don't block the HTTP response
  fetch(`${BASE_URL}/v1/ingest/${lens}`, {
    method:  'POST',
    headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({ mode: 'records', changes }),
  }).catch((err) => console.error('[semilayer] sync failed', err))
}
// routes/products.ts
import { syncRecords } from '../lib/sync'

router.post('/products', async (req, res) => {
  const product = await db.products.create(req.body)
  res.json(product)
  syncRecords('products', [{ id: String(product.id), action: 'upsert' }])
})

router.patch('/products/:id', async (req, res) => {
  const product = await db.products.update(req.params.id, req.body)
  res.json(product)
  syncRecords('products', [{ id: req.params.id, action: 'upsert' }])
})

router.delete('/products/:id', async (req, res) => {
  await db.products.delete(req.params.id)
  res.status(204).end()
  syncRecords('products', [{ id: req.params.id, action: 'delete' }])
})
💡

Call syncRecords after res.json() / res.end(). The write is committed before the sync fires, and your API latency is unaffected even if SemiLayer is momentarily slow.


8. Kafka consumer

Wire a Kafka consumer for event-driven architectures where writes publish as domain events. Works with AWS MSK, Confluent Cloud, and self-hosted Kafka.

// kafka-semilayer-consumer.ts
import { Kafka } from 'kafkajs'

const kafka      = new Kafka({ brokers: [process.env.KAFKA_BROKER!] })
const consumer   = kafka.consumer({ groupId: 'semilayer-sync' })
const INGEST_KEY = process.env.SEMILAYER_INGEST_KEY!
const BASE_URL   = 'https://api.semilayer.com'

await consumer.connect()
await consumer.subscribe({ topics: ['products.events'], fromBeginning: false })

await consumer.run({
  eachBatchAutoResolve: true,
  eachBatch: async ({ batch }) => {
    const map = new Map<string, 'upsert' | 'delete'>()

    for (const message of batch.messages) {
      const event = JSON.parse(message.value!.toString())
      // { type: 'created'|'updated'|'deleted', id: string }
      map.set(String(event.id), event.type === 'deleted' ? 'delete' : 'upsert')
    }

    // Deduplicated: same ID in one batch → last action wins
    const changes = Array.from(map.entries()).map(([id, action]) => ({ id, action }))

    // Chunk to 10k per request
    for (let i = 0; i < changes.length; i += 10_000) {
      await fetch(`${BASE_URL}/v1/ingest/products`, {
        method:  'POST',
        headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
        body: JSON.stringify({ mode: 'records', changes: changes.slice(i, i + 10_000) }),
      })
    }
  },
})

9. AWS SQS — queue-driven sync

SQS gives guaranteed delivery and natural backpressure. Any service that writes to your DB also enqueues a message; a consumer drains the queue and calls SemiLayer.

// sqs-consumer/index.ts
import { SQSClient, ReceiveMessageCommand, DeleteMessageBatchCommand } from '@aws-sdk/client-sqs'

const sqs        = new SQSClient({ region: 'us-east-1' })
const QUEUE_URL  = process.env.SQS_QUEUE_URL!
const INGEST_KEY = process.env.SEMILAYER_INGEST_KEY!
const BASE_URL   = 'https://api.semilayer.com'

async function poll() {
  const { Messages } = await sqs.send(new ReceiveMessageCommand({
    QueueUrl: QUEUE_URL, MaxNumberOfMessages: 10, WaitTimeSeconds: 20,
  }))
  if (!Messages?.length) return

  const changes = Messages.map((msg) => {
    const body = JSON.parse(msg.Body!)
    // { entityId: string, operation: 'put'|'delete' }
    return {
      id:     String(body.entityId),
      action: body.operation === 'delete' ? 'delete' : 'upsert' as const,
    }
  })

  await fetch(`${BASE_URL}/v1/ingest/products`, {
    method:  'POST',
    headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({ mode: 'records', changes }),
  })

  await sqs.send(new DeleteMessageBatchCommand({
    QueueUrl: QUEUE_URL,
    Entries: Messages.map((m, i) => ({ Id: String(i), ReceiptHandle: m.ReceiptHandle! })),
  }))
}

while (true) await poll()

For Lambda-based SQS consumers, SQS can invoke Lambda directly. Return normally to auto-delete messages; throw to retry. Add a Dead Letter Queue for poison messages.
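For the Lambda variant, the record-to-change mapping is the same as in the polling consumer above. A sketch assuming the same { entityId, operation } message shape; sqsRecordsToChanges is an illustrative name:

```typescript
type SQSRecordLike = { body: string }
type Change = { id: string; action: 'upsert' | 'delete' }

// Maps a batch of SQS records to SemiLayer change objects.
export function sqsRecordsToChanges(records: SQSRecordLike[]): Change[] {
  return records.map((record) => {
    const body = JSON.parse(record.body)
    return {
      id: String(body.entityId),
      action: body.operation === 'delete' ? 'delete' : 'upsert',
    }
  })
}

// Handler sketch: throw on failure so SQS retries (and eventually DLQs).
// export async function handler(event: { Records: SQSRecordLike[] }) {
//   const changes = sqsRecordsToChanges(event.Records)
//   const resp = await fetch(`${BASE_URL}/v1/ingest/products`, { /* as above */ })
//   if (!resp.ok && resp.status !== 202) throw new Error(`ingest failed: ${resp.status}`)
// }
```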


10. Periodic cron + records mode (poll-and-push)

For legacy databases, third-party APIs, or flat files that don't emit change events, a cron job polls for recent changes and pushes them with records mode. This is more precise than incremental because you control exactly which IDs get pushed — including soft-deleted rows.

// cron/sync-changed-products.ts — runs every 5 minutes
import { subMinutes } from 'date-fns'

const INGEST_KEY = process.env.SEMILAYER_INGEST_KEY!
const BASE_URL   = 'https://api.semilayer.com'
const WINDOW_MIN = 6  // slightly wider than interval to avoid boundary gaps

async function run() {
  const since = subMinutes(new Date(), WINDOW_MIN)

  // `db` here stands in for your query client (e.g. a node-postgres Pool)
  const [updated, deleted] = await Promise.all([
    db.query<{ id: string }>('SELECT id FROM products WHERE updated_at >= $1', [since]),
    db.query<{ id: string }>('SELECT id FROM products WHERE deleted_at >= $1', [since]),
  ])

  const changes = [
    ...updated.rows.map(r => ({ id: r.id, action: 'upsert' as const })),
    ...deleted.rows.map(r => ({ id: r.id, action: 'delete' as const })),
  ]

  if (changes.length === 0) return

  await fetch(`${BASE_URL}/v1/ingest/products`, {
    method:  'POST',
    headers: { 'Authorization': `Bearer ${INGEST_KEY}`, 'Content-Type': 'application/json' },
    body: JSON.stringify({ mode: 'records', changes }),
  })

  console.log(`[semilayer] pushed ${changes.length} changes`)
}

run().catch(console.error)
💡

Use a window slightly wider than your cron interval (6 minutes for a 5-minute cron) to avoid missing records at the boundary. Duplicate upsert calls for unchanged rows are harmless — the hash dedup step skips them.

Hard deletes with no deleted_at column? Schedule a nightly Smart sync instead of a nightly Rebuild — it tombstone-detects missing rows without wiping the index first:

# crontab: 0 3 * * *
semilayer sync --lens products

Choosing the right approach

| Scenario | Recommended |
|---|---|
| Just getting started, Postgres source | syncInterval: '15m' in config |
| Needs delete propagation, minimal setup | syncInterval + nightly semilayer sync |
| Postgres with pg_net | Example 1 — SQL trigger → pg_net |
| Postgres without extensions | Example 2 — LISTEN/NOTIFY + listener |
| Supabase | Example 3 — built-in database webhook |
| MySQL / PlanetScale via Prisma | Example 4 — Prisma $extends |
| DynamoDB | Example 5 — DynamoDB Streams → Lambda |
| MongoDB / DocumentDB | Example 6 — Change Streams |
| You own the write path | Example 7 — service-layer sync call |
| Event-driven / Kafka | Example 8 — Kafka consumer |
| SQS / SNS / EventBridge | Example 9 — SQS consumer |
| Legacy DB or no change events | Example 10 — cron poll + records |

Troubleshooting

404 Not Found — Lens name in the URL doesn't match any lens in this environment. Check you're using the correct environment's ik_ key and that semilayer push has run.

429 Too Many Requests — Rate limit exceeded (SaaS only). Read Retry-After and wait. Use records mode and batch changes to reduce call volume.

Vectors not updating — Run semilayer status. If the lens is paused, calls queue but don't process until semilayer resume. If error, check Jobs in the Console.

Deletes not removing from search results — You're likely using incremental mode or syncInterval without a Smart sync. Neither can detect hard deletes. Switch to records mode with action: 'delete', or add a nightly semilayer sync to tombstone-detect them.

deduplicated responses — Normal during high-write periods. Your change is merged into an already-queued job and will still be processed.

changeTrackingColumn not found — Incremental reads all rows on every tick (no filtering). Add the column or switch to Smart sync.