Conversation
The only way to get a ClickHouse client now is through the factory. Refactored all existing code to use that and pass in an org. The runReplication and otlpExporter are the hot paths here which need special attention in reviews.
|
WalkthroughThis pull request implements an organization-scoped data store system that enables organizations to override which ClickHouse instances they connect to. The changes replace global ClickHouse client singletons with a factory-based approach that resolves per-organization ClickHouse connections at runtime. A new registry system reads organization data store configurations from the database and secrets storage, caching clients and event repositories by organization and type. The refactoring spans routes, presenters, services, and event repositories—consistently switching from static imports to factory calls—while adding new database tables ( Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 16
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs/route.tsx (1)
137-159:⚠️ Potential issue | 🟠 MajorDon't let ClickHouse resolution bypass the deferred error path.
getClickhouseForOrganization()is now awaited beforetypeddefer(). If org datastore lookup or client initialization fails, the loader throws beforeTypedAwaitcan render itserrorElement, and the page shell won't stream until that lookup finishes. Fold the factory lookup intolistPromiseso it shares the existing deferred failure handling.♻️ Suggested shape
- const logsClickhouse = await clickhouseFactory.getClickhouseForOrganization(project.organizationId, "logs"); - const presenter = new LogsListPresenter($replica, logsClickhouse); - - const listPromise = presenter - .call(project.organizationId, environment.id, { - userId, - projectId: project.id, - tasks: tasks.length > 0 ? tasks : undefined, - runId, - search, - levels, - period, - from, - to, - defaultPeriod: "1h", - retentionLimitDays - }) + const listPromise = clickhouseFactory + .getClickhouseForOrganization(project.organizationId, "logs") + .then((logsClickhouse) => + new LogsListPresenter($replica, logsClickhouse).call(project.organizationId, environment.id, { + userId, + projectId: project.id, + tasks: tasks.length > 0 ? tasks : undefined, + runId, + search, + levels, + period, + from, + to, + defaultPeriod: "1h", + retentionLimitDays, + }) + ) .catch((error) => { if (error instanceof ServiceValidationError) { return { error: error.message }; } throw error; });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.logs/route.tsx around lines 137 - 159, The ClickHouse client lookup is awaited up front which can throw before the deferred error handling; instead, defer the factory call so failures are handled by the same promise chain: build listPromise by calling clickhouseFactory.getClickhouseForOrganization(...) inside the async chain used to create the LogsListPresenter and call presenter.call (keep references to clickhouseFactory.getClickhouseForOrganization, LogsListPresenter, presenter.call, and listPromise) so any errors from the org datastore/client init are caught by the existing .catch that handles ServiceValidationError and rethrows other errors, rather than throwing before TypedAwait renders its errorElement.apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts (1)
13-17:⚠️ Potential issue | 🟠 MajorDon't silently drop the connection-tuning fields.
Lines 13-17 still require
keepAliveEnabled,keepAliveIdleSocketTtl, andmaxOpenConnections, but Lines 82-119 no longer pass them anywhere. This endpoint will accept those values and return success even though they no longer affect replication behavior. Either wire them into the new factory path or remove them from the request schema before merge.One fix if these settings are no longer supported
const CreateRunReplicationServiceParams = z.object({ name: z.string(), - keepAliveEnabled: z.boolean(), - keepAliveIdleSocketTtl: z.number(), - maxOpenConnections: z.number(), maxFlushConcurrency: z.number(), flushIntervalMs: z.number(), flushBatchSize: z.number(),Also applies to: 82-93, 95-119
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts` around lines 13 - 17, The request schema CreateRunReplicationServiceParams still requires keepAliveEnabled, keepAliveIdleSocketTtl, and maxOpenConnections but the handler that constructs the replication service (the new factory path used in the run replication creation flow) no longer consumes these fields, so they are effectively ignored; either (A) thread these three fields into the replication service factory/constructor/creation call (the function that builds the replication client/service used in the handler) so the values are applied, making sure to map the names exactly, or (B) remove these fields from CreateRunReplicationServiceParams (and any validation/usage sites) so the API no longer accepts unused settings; locate the schema CreateRunReplicationServiceParams and the replication service factory/constructor function referenced in the create run replication handler to implement one of these fixes.
🧹 Nitpick comments (4)
apps/webapp/test/runsReplicationService.part2.test.ts (1)
26-27: Cover org-to-ClickHouse routing with a multi-store case.All of these setups hand the service a factory backed by a single
ClickHouseinstance. That means the suite still passes ifRunsReplicationServicesends the wrong organization ID into the factory, which is the new behavior this change depends on. Add one case that maps two orgs to different clients and asserts each org's rows land in the correct backend.Also applies to: 46-47, 155-156, 270-271, 391-392, 525-526, 632-633, 801-802, 926-927, 1139-1140
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/test/runsReplicationService.part2.test.ts` around lines 26 - 27, The tests currently instantiate RunsReplicationService with a TestReplicationClickhouseFactory backed by a single ClickHouse instance, so they don't catch org-to-ClickHouse routing bugs; update the affected test cases (instances where new RunsReplicationService is constructed with new TestReplicationClickhouseFactory(clickhouse)) to create a multi-store scenario: build two distinct Test ClickHouse clients (e.g., clickhouseA and clickhouseB), construct a TestReplicationClickhouseFactory that maps orgA -> clickhouseA and orgB -> clickhouseB, pass that factory into the RunsReplicationService, then insert rows for both orgs and assert each org's rows appear in the corresponding backend client (use the same helper assertions already used elsewhere in the tests). Ensure you change all occurrences referenced (the other construction sites of TestReplicationClickhouseFactory) so the suite validates org routing across multiple stores.apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts (1)
94-130: Validate and typeconfigat this service boundary.These methods accept
config: anyand persist it directly. Any caller that skips route-level validation can store a payload that later makesloadFromDatabase()skip the datastore entirely. Parseconfighere with the schema forkind, and replaceanywith the inferred type.As per coding guidelines, "Use zod for validation in packages/core and apps/webapp."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts` around lines 94 - 130, addDataStore and updateDataStore accept config: any and persist it directly; validate and type the config at this service boundary using the zod schema for the DataStoreKind before storing secrets or writing to Prisma. Locate the methods addDataStore and updateDataStore and replace the loose any type with the inferred TypeScript type from the zod parser; run the appropriate zod.parse/partial-parse for the provided kind (e.g., schemaForKind(kind)) and throw or return a clear error on parse failure, then use the validated config when calling getSecretStore().setSecret(secretKey, ...) and when writing config to this._prisma.organizationDataStore.create/update; ensure loadFromDatabase behavior is preserved by storing only the validated shape (or a secured secretKey) and include error handling to surface validation failures.apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts (2)
336-389: Consider reducing duplication betweenclickhouseandclickhouse_v2cases.The two cases share ~95% identical configuration. Extracting common options would improve maintainability.
♻️ Proposed refactor
function buildEventRepository(store: string, clickhouse: ClickHouse): ClickhouseEventRepository { + const commonOptions = { + clickhouse, + batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, + flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS, + maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT, + maximumTraceDetailedSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT, + maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING, + insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY, + waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1", + asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE, + asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS, + llmMetricsBatchSize: env.LLM_METRICS_BATCH_SIZE, + llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS, + llmMetricsMaxBatchSize: env.LLM_METRICS_MAX_BATCH_SIZE, + llmMetricsMaxConcurrency: env.LLM_METRICS_MAX_CONCURRENCY, + otlpMetricsBatchSize: env.METRICS_CLICKHOUSE_BATCH_SIZE, + otlpMetricsFlushInterval: env.METRICS_CLICKHOUSE_FLUSH_INTERVAL_MS, + otlpMetricsMaxConcurrency: env.METRICS_CLICKHOUSE_MAX_CONCURRENCY, + }; + switch (store) { case "clickhouse": { return new ClickhouseEventRepository({ - clickhouse, - batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, - // ... all the options + ...commonOptions, + startTimeMaxAgeMs: env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS, version: "v1", }); } case "clickhouse_v2": { return new ClickhouseEventRepository({ - clickhouse: clickhouse, - batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE, - // ... all the options + ...commonOptions, version: "v2", }); } default: { throw new Error(`Unknown ClickHouse event repository store: ${store}`); } } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts` around lines 336 - 389, The switch in buildEventRepository duplicates nearly identical ClickhouseEventRepository construction for "clickhouse" and "clickhouse_v2"; refactor by extracting the shared options into a single commonOptions object (populate fields like clickhouse, batchSize, flushInterval, maximumTraceSummaryViewCount, maximumTraceDetailedSummaryViewCount, maximumLiveReloadingSetting, insertStrategy, waitForAsyncInsert, asyncInsertMaxDataSize, asyncInsertBusyTimeoutMs, startTimeMaxAgeMs (only present for v1), llmMetrics*, otlpMetrics*, etc.), then call new ClickhouseEventRepository({...commonOptions, version: "v1"}) for "clickhouse" and new ClickhouseEventRepository({...commonOptions, version: "v2"}) for "clickhouse_v2"); ensure you preserve startTimeMaxAgeMs presence only where required and keep explicit clickhouse property name to match the constructor.
164-179: Org-scoped clients use uniform configuration regardless ofclientType.Unlike the default clients where logs and replication have specialized settings (e.g.,
clickhouseSettingsfor logs,RUN_REPLICATION_*env vars for replication), all org-scoped clients use the same genericCLICKHOUSE_*configuration. If org-specific logs or replication clients require different settings (likemax_memory_usagefor logs queries), this could cause performance or resource issues.Consider whether org-scoped clients need type-specific configurations, or document that org data stores intentionally use simplified settings.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts` around lines 164 - 179, buildOrgClickhouseClient currently creates org-scoped ClickHouse clients with the same generic CLICKHOUSE_* config regardless of ClientType; update it to apply type-specific settings (e.g., when clientType === "logs" include clickhouseSettings like max_memory_usage or when clientType === "replication" honor RUN_REPLICATION_* env vars) or add a clear comment/docstring stating that org clients intentionally use simplified settings. Locate buildOrgClickhouseClient and branch on the ClientType argument to merge the same specialized options used for default clients (name already contains clientType) or document the design decision so callers know org clients lack type-specific tuning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/webapp/app/env.server.ts`:
- Around line 1304-1305: The env schema allows
ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS to be zero or negative, enabling
tight-loop reloads; update the zod schema for
ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS to enforce sensible bounds (e.g.,
.int().min(60000) to require at least 1 minute and optionally .max(86400000) to
cap at 24 hours) while keeping the existing default (5 * 60 * 1000); use
z.coerce.number().int().min(...).max(...).default(...) or a .refine validator if
you prefer custom logic so invalid env values fail validation.
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.errors.$fingerprint/route.tsx:
- Around line 248-253: The presenter is being given the same ClickHouse "query"
client twice; call clickhouseFactory.getClickhouseForOrganization a second time
with the logs-scoped role (e.g., "logs") to obtain a logs client and pass that
as the presenter’s third argument so ErrorGroupPresenter($replica, queryClient,
logsClient) receives a query-scoped client and a logs-scoped client respectively
(identify the factory call clickhouseFactory.getClickhouseForOrganization and
the constructor ErrorGroupPresenter to locate where to change).
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.models.$modelId/route.tsx:
- Around line 71-72: The ModelRegistryPresenter is being instantiated with only
ClickHouse but it needs the Prisma replica client for methods like
getModelDetail(); update the instantiation where you call new
ModelRegistryPresenter(clickhouse) to pass the Prisma replica (named $replica)
as the second argument, ensure you import $replica from ~/db.server if not
already imported, and verify presenter methods (e.g., getModelDetail,
getUserMetrics) now use the provided clients.
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx:
- Around line 105-106: Move the call to
clickhouseFactory.getClickhouseForOrganization(...) so it executes inside the
loader's existing try block before constructing the TestTaskPresenter;
specifically ensure the clickhouse lookup and assignment to clickhouse happen
inside the try that wraps presenter.call(), then instantiate presenter = new
TestTaskPresenter($replica, clickhouse) after the successful lookup so any
errors from getClickhouseForOrganization are caught, logged, and handled by the
existing redirect/path in the catch.
In `@apps/webapp/app/routes/admin.data-stores.tsx`:
- Line 31: Update the import of tryCatch in the admin.data-stores route to use
the package subpath export: replace the current root import of tryCatch (import
{ tryCatch } from "@trigger.dev/core") with the subpath import from
"@trigger.dev/core/utils" so the symbol tryCatch is imported via the utils
subpath.
- Around line 88-137: ClickhouseConnectionSchema.parse() is used directly in the
"add" and "update" action handlers and will throw on invalid ClickHouse URLs
before the surrounding tryCatch runs; replace both usages with
ClickhouseConnectionSchema.safeParse(connectionObject) and check the returned
.success flag, returning typedjson({ error: validation.error.message }, {
status: 400 }) (or similar) when validation fails, then pass the validated value
to organizationDataStoresRegistry.addDataStore / updateDataStore; locate the
parse call sites in the "add" branch (where ClickhouseConnectionSchema.parse is
called before calling organizationDataStoresRegistry.addDataStore) and the
"update" branch (conditional parse before
organizationDataStoresRegistry.updateDataStore) and apply the safeParse pattern
and explicit error handling there.
- Line 1: The component currently calls onOpenChange(false) during render when
an operation reports success (the direct call to onOpenChange in the
render/JSX); move this side-effect into a useEffect so it runs after render
instead: remove the onOpenChange(false) call from the render/JSX, add a
useEffect that watches the isSuccess (or equivalent success flag) and calls
onOpenChange(false) when it becomes true, and include onOpenChange and the
success flag in the effect dependency array to avoid stale closures. Ensure you
reference the same success state variable and the onOpenChange prop used in the
component (replace the inline call with the new effect).
In `@apps/webapp/app/routes/otel.v1.metrics.ts`:
- Line 10: The code eagerly awaits otlpExporter (const exporter = await
otlpExporter) before validating the request content-type, causing unnecessary
exporter startup and possible init failures for requests that should return 400;
move the await of otlpExporter so it only runs after the content-type check (the
branch that returns the unsupported-content-type response) so malformed/probe
traffic never triggers exporter initialization — locate the otlpExporter await
in the handler in otel.v1.metrics.ts and only resolve otlpExporter after the
content-type validation code path.
In `@apps/webapp/app/runEngine/services/triggerFailedTask.server.ts`:
- Around line 70-74: The current write selects repository/store via
getEventRepository but only records taskEventStore in the run model, which will
break reads if an org is re-pointed; update the write path in
triggerFailedTask.server.ts so that when calling getEventRepository you also
persist the concrete OrganizationDataStore identifier (e.g., the returned
repository/store metadata or an OrganizationDataStore enum/value) into the run
model alongside taskEventStore (or alternatively mark the org's datastore
assignment immutable until a migration flag is set); specifically, modify the
code that assigns repository/store and the run creation/update logic to save the
concrete datastore binding (referencing getEventRepository, repository, store,
taskEventStore, and OrganizationDataStore) so future reads resolve the original
backend used for those writes.
In `@apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts`:
- Around line 313-334: getEventsClickhouseClient currently instantiates a new
ClickHouse on each call; change it to use the existing singleton pattern: create
a singleton holder (e.g., const defaultEventsClickhouseClient = singleton(() =>
new ClickHouse({...same options...}))) and have getEventsClickhouseClient return
defaultEventsClickhouseClient() (or inline return singleton(() => new
ClickHouse(...))()). Keep the same URL parsing and options
(url.searchParams.delete("secure"), keepAlive, compression, etc.) but ensure
only one ClickHouse instance is created and reused across calls.
- Around line 362-384: The clickhouse_v2 case building the
ClickhouseEventRepository is missing the startTimeMaxAgeMs config that the
clickhouse case passes; update the clickhouse_v2 branch where
ClickhouseEventRepository is constructed to include startTimeMaxAgeMs:
env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS so the same start-time
clamping/validation used by the v1 config is applied to v2 as well.
In
`@apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts`:
- Around line 72-75: The loop that builds the lookup map silently overwrites
keys produced from `${orgId}:${row.kind}`, causing nondeterministic behavior
when an org is assigned to multiple stores; before calling lookup.set(key,
parsed) check lookup.has(key) and if present throw a clear error (or assert)
that includes the conflicting orgId, kind, and identifiers for both the existing
entry and the current row (e.g., existingParsed/id and parsed/id) so overlapping
assignments discovered during findMany() fail fast and surface which rows/IDs
conflict.
- Around line 94-157: The addDataStore, updateDataStore and deleteDataStore
methods only write to the DB but don't update the in-memory registry map
(_lookup), leaving routing stale; after each successful mutation (after the
secret write and after the prisma create/update/delete) call the registry's
method that repopulates the in-memory map used by get() (the same function the
background loader uses — e.g. this.reload() / this._reloadLookup() /
this._refreshLookup(), whichever exists) so _lookup is refreshed immediately and
subsequent get() calls see the new data.
In `@apps/webapp/app/services/queryService.server.ts`:
- Around line 278-279: The concurrency limiter is currently keyed by projectId
but the ClickHouse client and execution are organization-scoped (see
clickhouseFactory.getClickhouseForOrganization and executeTSQL), so update the
limiter acquire/release calls (where limiter.acquire and limiter.release are
used around query execution) to use organizationId instead of projectId so
concurrent queries are limited per organization rather than per project; ensure
any helper that constructs limiter keys or caches limiter instances uses
organizationId consistently in the same places referenced around the query
execution.
In `@apps/webapp/app/services/runsReplicationService.server.ts`:
- Around line 655-685: The current loop serializes inserts per ClickHouse group
because each await blocks the next; change it to run per-group flushes
concurrently with a bounded concurrency limiter (e.g., p-limit or an internal
semaphore) so slow/unhealthy ClickHouse targets don't stall unrelated orgs. For
each group (the items handled around sortTaskRunInserts, sortPayloadInserts,
combinedTaskRunInserts/combinedPayloadInserts and using this.#insertWithRetry
which calls this.#insertTaskRunInserts and this.#insertPayloadInserts with
flushId), create an async task that performs the two insertWithRetry calls,
records the first taskRunError/payloadError seen, and updates
_taskRunsInsertedCounter/_payloadsInsertedCounter when each insert succeeds;
schedule those tasks through the limiter and await Promise.all on the scheduled
tasks; ensure you keep pushing group.taskRunInserts and group.payloadInserts
into combined* before scheduling so semantics remain the same.
In `@apps/webapp/app/v3/otlpExporter.server.ts`:
- Around line 1207-1215: The initializer initializeOTLPExporter is reading
settings directly from process.env; change it to use the typed env export
instead: replace process.env.OTLP_EXPORTER_VERBOSE with
env.OTLP_EXPORTER_VERBOSE (coerce to "1" check as before) and replace
process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT with
env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT and parseInt that value,
falling back to 8192; ensure the existing env import is used (or add it if
missing) and keep clickhouseFactory and OTLPExporter usage unchanged.
---
Outside diff comments:
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.logs/route.tsx:
- Around line 137-159: The ClickHouse client lookup is awaited up front which
can throw before the deferred error handling; instead, defer the factory call so
failures are handled by the same promise chain: build listPromise by calling
clickhouseFactory.getClickhouseForOrganization(...) inside the async chain used
to create the LogsListPresenter and call presenter.call (keep references to
clickhouseFactory.getClickhouseForOrganization, LogsListPresenter,
presenter.call, and listPromise) so any errors from the org datastore/client
init are caught by the existing .catch that handles ServiceValidationError and
rethrows other errors, rather than throwing before TypedAwait renders its
errorElement.
In `@apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts`:
- Around line 13-17: The request schema CreateRunReplicationServiceParams still
requires keepAliveEnabled, keepAliveIdleSocketTtl, and maxOpenConnections but
the handler that constructs the replication service (the new factory path used
in the run replication creation flow) no longer consumes these fields, so they
are effectively ignored; either (A) thread these three fields into the
replication service factory/constructor/creation call (the function that builds
the replication client/service used in the handler) so the values are applied,
making sure to map the names exactly, or (B) remove these fields from
CreateRunReplicationServiceParams (and any validation/usage sites) so the API no
longer accepts unused settings; locate the schema
CreateRunReplicationServiceParams and the replication service
factory/constructor function referenced in the create run replication handler to
implement one of these fixes.
---
Nitpick comments:
In `@apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts`:
- Around line 336-389: The switch in buildEventRepository duplicates nearly
identical ClickhouseEventRepository construction for "clickhouse" and
"clickhouse_v2"; refactor by extracting the shared options into a single
commonOptions object (populate fields like clickhouse, batchSize, flushInterval,
maximumTraceSummaryViewCount, maximumTraceDetailedSummaryViewCount,
maximumLiveReloadingSetting, insertStrategy, waitForAsyncInsert,
asyncInsertMaxDataSize, asyncInsertBusyTimeoutMs, startTimeMaxAgeMs (only
present for v1), llmMetrics*, otlpMetrics*, etc.), then call new
ClickhouseEventRepository({...commonOptions, version: "v1"}) for "clickhouse"
and new ClickhouseEventRepository({...commonOptions, version: "v2"}) for
"clickhouse_v2"); ensure you preserve startTimeMaxAgeMs presence only where
required and keep explicit clickhouse property name to match the constructor.
- Around line 164-179: buildOrgClickhouseClient currently creates org-scoped
ClickHouse clients with the same generic CLICKHOUSE_* config regardless of
ClientType; update it to apply type-specific settings (e.g., when clientType ===
"logs" include clickhouseSettings like max_memory_usage or when clientType ===
"replication" honor RUN_REPLICATION_* env vars) or add a clear comment/docstring
stating that org clients intentionally use simplified settings. Locate
buildOrgClickhouseClient and branch on the ClientType argument to merge the same
specialized options used for default clients (name already contains clientType)
or document the design decision so callers know org clients lack type-specific
tuning.
In
`@apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts`:
- Around line 94-130: addDataStore and updateDataStore accept config: any and
persist it directly; validate and type the config at this service boundary using
the zod schema for the DataStoreKind before storing secrets or writing to
Prisma. Locate the methods addDataStore and updateDataStore and replace the
loose any type with the inferred TypeScript type from the zod parser; run the
appropriate zod.parse/partial-parse for the provided kind (e.g.,
schemaForKind(kind)) and throw or return a clear error on parse failure, then
use the validated config when calling getSecretStore().setSecret(secretKey, ...)
and when writing config to this._prisma.organizationDataStore.create/update;
ensure loadFromDatabase behavior is preserved by storing only the validated
shape (or a secured secretKey) and include error handling to surface validation
failures.
In `@apps/webapp/test/runsReplicationService.part2.test.ts`:
- Around line 26-27: The tests currently instantiate RunsReplicationService with
a TestReplicationClickhouseFactory backed by a single ClickHouse instance, so
they don't catch org-to-ClickHouse routing bugs; update the affected test cases
(instances where new RunsReplicationService is constructed with new
TestReplicationClickhouseFactory(clickhouse)) to create a multi-store scenario:
build two distinct Test ClickHouse clients (e.g., clickhouseA and clickhouseB),
construct a TestReplicationClickhouseFactory that maps orgA -> clickhouseA and
orgB -> clickhouseB, pass that factory into the RunsReplicationService, then
insert rows for both orgs and assert each org's rows appear in the corresponding
backend client (use the same helper assertions already used elsewhere in the
tests). Ensure you change all occurrences referenced (the other construction
sites of TestReplicationClickhouseFactory) so the suite validates org routing
across multiple stores.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 3a31161c-3ac9-4af9-bcd2-f229a19f7a61
📒 Files selected for processing (69)
.cursor/mcp.json.server-changes/organization-scoped-clickhouse.mdCLAUDE.mdapps/webapp/app/env.server.tsapps/webapp/app/presenters/v3/ApiRunListPresenter.server.tsapps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.tsapps/webapp/app/presenters/v3/RunPresenter.server.tsapps/webapp/app/presenters/v3/RunTagListPresenter.server.tsapps/webapp/app/presenters/v3/SpanPresenter.server.tsapps/webapp/app/presenters/v3/TaskListPresenter.server.tsapps/webapp/app/presenters/v3/UsagePresenter.server.tsapps/webapp/app/presenters/v3/ViewSchedulePresenter.server.tsapps/webapp/app/presenters/v3/WaitpointPresenter.server.tsapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors.$fingerprint/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.$modelId/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models._index/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.models.compare/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts._index/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsxapps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsxapps/webapp/app/routes/admin.api.v1.runs-replication.create.tsapps/webapp/app/routes/admin.api.v1.runs-replication.start.tsapps/webapp/app/routes/admin.data-stores.tsxapps/webapp/app/routes/admin.tsxapps/webapp/app/routes/api.v1.prompts.$slug.tsapps/webapp/app/routes/api.v1.prompts.$slug.versions.tsapps/webapp/app/routes/api.v1.prompts._index.tsapps/webapp/app/routes/otel.v1.logs.tsapps/webapp/app/routes/otel.v1.metrics.tsapps/webapp/app/routes/otel.v1.traces.tsapps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.$logId.tsxapps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.logs.tsapps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.prompts.$promptSlug.generations.tsapps/webapp/app/runEngine/concerns/traceEvents.server.tsapps/webapp/app/runEngine/services/triggerFailedTask.server.tsapps/webapp/app/services/admin/missingLlmModels.server.tsapps/webapp/app/services/clickhouse/clickhouseFactory.server.tsapps/webapp/app/services/clickhouse/clickhouseSecretSchemas.server.tsapps/webapp/app/services/clickhouseInstance.server.tsapps/webapp/app/services/dataStores/organizationDataStoreConfigSchemas.server.tsapps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.tsapps/webapp/app/services/dataStores/organizationDataStoresRegistryInstance.server.tsapps/webapp/app/services/queryService.server.tsapps/webapp/app/services/runsReplicationInstance.server.tsapps/webapp/app/services/runsReplicationService.server.tsapps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.tsapps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.tsapps/webapp/app/v3/eventRepository/eventRepository.server.tsapps/webapp/app/v3/eventRepository/eventRepository.types.tsapps/webapp/app/v3/eventRepository/index.server.tsapps/webapp/app/v3/otlpExporter.server.tsapps/webapp/app/v3/services/alerts/errorAlertEvaluator.server.tsapps/webapp/app/v3/services/bulk/BulkActionV2.server.tsapps/webapp/test/clickhouseFactory.test.tsapps/webapp/test/organizationDataStoresRegistry.test.tsapps/webapp/test/runsBackfiller.test.tsapps/webapp/test/runsReplicationBenchmark.test.tsapps/webapp/test/runsReplicationService.part1.test.tsapps/webapp/test/runsReplicationService.part2.test.tsapps/webapp/test/utils/replicationUtils.tsapps/webapp/test/utils/testReplicationClickhouseFactory.tsapps/webapp/test/utils/tracing.tsinternal-packages/database/prisma/migrations/20260331212308_add_organization_data_stores/migration.sqlinternal-packages/database/prisma/schema.prisma
💤 Files with no reviewable changes (2)
- apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts
- apps/webapp/app/services/clickhouseInstance.server.ts
| // Organization data stores registry | ||
| ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes |
There was a problem hiding this comment.
Validate reload interval bounds to prevent tight-loop reloads.
Line 1305 currently allows 0 or negative values. A bad env value can make the registry reload loop run too frequently and degrade service stability.
Suggested fix
- ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes
+ ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce
+ .number()
+ .int()
+ .min(1_000)
+ .default(5 * 60 * 1000), // 5 minutes📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Organization data stores registry | |
| ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes | |
| // Organization data stores registry | |
| ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce | |
| .number() | |
| .int() | |
| .min(1_000) | |
| .default(5 * 60 * 1000), // 5 minutes |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/env.server.ts` around lines 1304 - 1305, The env schema
allows ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS to be zero or negative,
enabling tight-loop reloads; update the zod schema for
ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS to enforce sensible bounds (e.g.,
.int().min(60000) to require at least 1 minute and optionally .max(86400000) to
cap at 24 hours) while keeping the existing default (5 * 60 * 1000); use
z.coerce.number().int().min(...).max(...).default(...) or a .refine validator if
you prefer custom logic so invalid env values fail validation.
| const clickhouseClient = await clickhouseFactory.getClickhouseForOrganization( | ||
| environment.organizationId, | ||
| "query" | ||
| ); | ||
|
|
||
| const presenter = new ErrorGroupPresenter($replica, clickhouseClient, clickhouseClient); |
There was a problem hiding this comment.
Pass a logs-scoped client into the presenter's second ClickHouse slot.
Line 253 currently injects the "query" client twice. That keeps the error-side queries working, but any org that routes logs to a separate datastore will still read logs from the wrong backend on this page.
Suggested fix
- const clickhouseClient = await clickhouseFactory.getClickhouseForOrganization(
- environment.organizationId,
- "query"
- );
-
- const presenter = new ErrorGroupPresenter($replica, clickhouseClient, clickhouseClient);
+ const [queryClickhouseClient, logsClickhouseClient] = await Promise.all([
+ clickhouseFactory.getClickhouseForOrganization(environment.organizationId, "query"),
+ clickhouseFactory.getClickhouseForOrganization(environment.organizationId, "logs"),
+ ]);
+
+ const presenter = new ErrorGroupPresenter(
+ $replica,
+ queryClickhouseClient,
+ logsClickhouseClient
+ );📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const clickhouseClient = await clickhouseFactory.getClickhouseForOrganization( | |
| environment.organizationId, | |
| "query" | |
| ); | |
| const presenter = new ErrorGroupPresenter($replica, clickhouseClient, clickhouseClient); | |
| const [queryClickhouseClient, logsClickhouseClient] = await Promise.all([ | |
| clickhouseFactory.getClickhouseForOrganization(environment.organizationId, "query"), | |
| clickhouseFactory.getClickhouseForOrganization(environment.organizationId, "logs"), | |
| ]); | |
| const presenter = new ErrorGroupPresenter( | |
| $replica, | |
| queryClickhouseClient, | |
| logsClickhouseClient | |
| ); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.errors.$fingerprint/route.tsx
around lines 248 - 253, The presenter is being given the same ClickHouse "query"
client twice; call clickhouseFactory.getClickhouseForOrganization a second time
with the logs-scoped role (e.g., "logs") to obtain a logs client and pass that
as the presenter’s third argument so ErrorGroupPresenter($replica, queryClient,
logsClient) receives a query-scoped client and a logs-scoped client respectively
(identify the factory call clickhouseFactory.getClickhouseForOrganization and
the constructor ErrorGroupPresenter to locate where to change).
| const clickhouse = await clickhouseFactory.getClickhouseForOrganization(project.organizationId, "standard"); | ||
| const presenter = new ModelRegistryPresenter(clickhouse); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
presenter=$(fd 'ModelRegistryPresenter.server.ts' | head -n1)
factory=$(fd 'clickhouseFactory.server.ts' | head -n1)
echo "== ModelRegistryPresenter queries =="
rg -n -C4 'getModelDetail|getUserMetrics|getModelComparison|llm_models|llm_metrics' "$presenter"
echo
echo "== ClickHouse factory resolution =="
rg -n -C4 'getClickhouseForOrganization|OrganizationDataStore|standard' "$factory"Repository: triggerdotdev/trigger.dev
Length of output: 4487
🏁 Script executed:
fd 'ModelRegistryPresenter.server.ts' -x wc -l {}Repository: triggerdotdev/trigger.dev
Length of output: 137
🏁 Script executed:
fd 'ModelRegistryPresenter.server.ts' -x head -n 50 {}Repository: triggerdotdev/trigger.dev
Length of output: 1982
🏁 Script executed:
fd 'route.tsx' -path '*projects*$modelId*' -x cat -n {}Repository: triggerdotdev/trigger.dev
Length of output: 237
🏁 Script executed:
rg -n -A15 'constructor\(' apps/webapp/app/presenters/v3/ModelRegistryPresenter.server.ts | head -n 30Repository: triggerdotdev/trigger.dev
Length of output: 661
🏁 Script executed:
fd 'ModelRegistryPresenter' --type f -path '*$modelId*'Repository: triggerdotdev/trigger.dev
Length of output: 237
🏁 Script executed:
rg -l '\$modelId' apps/webapp/app/routes | head -n 5Repository: triggerdotdev/trigger.dev
Length of output: 51
🏁 Script executed:
rg -l 'ModelRegistryPresenter' apps/webapp/app/routes --type tsRepository: triggerdotdev/trigger.dev
Length of output: 407
🏁 Script executed:
rg -B5 -A10 'new ModelRegistryPresenter' apps/webapp/app/routesRepository: triggerdotdev/trigger.dev
Length of output: 7330
🏁 Script executed:
rg -B5 'this\._replica' apps/webapp/app/presenters/v3/ModelRegistryPresenter.server.ts | head -n 20Repository: triggerdotdev/trigger.dev
Length of output: 506
🏁 Script executed:
rg -B5 'class BasePresenter' apps/webapp/app/presenters/v3/basePresenter.server.ts | head -n 15Repository: triggerdotdev/trigger.dev
Length of output: 386
🏁 Script executed:
rg -A20 'class BasePresenter' apps/webapp/app/presenters/v3/basePresenter.server.tsRepository: triggerdotdev/trigger.dev
Length of output: 754
Pass the Prisma replica client when instantiating ModelRegistryPresenter.
The presenter requires both Prisma (for getModelDetail()) and ClickHouse (for getUserMetrics()). Currently instantiated with only the ClickHouse client, any call to getModelDetail() will crash when accessing this._replica.llmModel.findFirst(...).
Pass $replica as the second argument:
Diff
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(project.organizationId, "standard");
- const presenter = new ModelRegistryPresenter(clickhouse);
+ const presenter = new ModelRegistryPresenter(clickhouse, $replica);
const model = await presenter.getModelDetail(modelId);Import $replica from ~/db.server if not already present.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.models.$modelId/route.tsx
around lines 71 - 72, The ModelRegistryPresenter is being instantiated with only
ClickHouse but it needs the Prisma replica client for methods like
getModelDetail(); update the instantiation where you call new
ModelRegistryPresenter(clickhouse) to pass the Prisma replica (named $replica)
as the second argument, ensure you import $replica from ~/db.server if not
already imported, and verify presenter methods (e.g., getModelDetail,
getUserMetrics) now use the provided clients.
| const clickhouse = await clickhouseFactory.getClickhouseForOrganization(project.organizationId, "standard"); | ||
| const presenter = new TestTaskPresenter($replica, clickhouse); |
There was a problem hiding this comment.
Move the factory lookup back under the loader's try.
getClickhouseForOrganization() can now fail before presenter.call() runs. With it outside the try, those failures bypass the current logging + redirect path and surface as a 500 instead.
Proposed fix
- const clickhouse = await clickhouseFactory.getClickhouseForOrganization(project.organizationId, "standard");
- const presenter = new TestTaskPresenter($replica, clickhouse);
try {
+ const clickhouse = await clickhouseFactory.getClickhouseForOrganization(
+ project.organizationId,
+ "standard"
+ );
+ const presenter = new TestTaskPresenter($replica, clickhouse);
const [result, regionsResult] = await Promise.all([
presenter.call({
userId: user.id,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/webapp/app/routes/_app.orgs`.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx
around lines 105 - 106, Move the call to
clickhouseFactory.getClickhouseForOrganization(...) so it executes inside the
loader's existing try block before constructing the TestTaskPresenter;
specifically ensure the clickhouse lookup and assignment to clickhouse happen
inside the try that wraps presenter.call(), then instantiate presenter = new
TestTaskPresenter($replica, clickhouse) after the successful lookup so any
errors from getClickhouseForOrganization are caught, logged, and handled by the
existing redirect/path in the catch.
| @@ -0,0 +1,400 @@ | |||
| import { useState } from "react"; | |||
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cd apps/webapp && find . -name "admin.data-stores.tsx" -type fRepository: triggerdotdev/trigger.dev
Length of output: 103
🏁 Script executed:
cat -n apps/webapp/app/routes/admin.data-stores.tsxRepository: triggerdotdev/trigger.dev
Length of output: 16350
Move the close-on-success update out of render.
Calling onOpenChange(false) during render (lines 307-310) triggers a render-phase state update on the parent. React will warn about this, and it gets noisy in Strict Mode.
🔧 Suggested fix
-import { useState } from "react";
+import { useEffect, useState } from "react";
...
- // Close dialog on success
- if (fetcher.data?.success && open) {
- onOpenChange(false);
- }
+ // Close dialog on success
+ useEffect(() => {
+ if (fetcher.data?.success && open) {
+ onOpenChange(false);
+ }
+ }, [fetcher.data?.success, open, onOpenChange]);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| import { useState } from "react"; | |
| import { useEffect, useState } from "react"; |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/routes/admin.data-stores.tsx` at line 1, The component
currently calls onOpenChange(false) during render when an operation reports
success (the direct call to onOpenChange in the render/JSX); move this
side-effect into a useEffect so it runs after render instead: remove the
onOpenChange(false) call from the render/JSX, add a useEffect that watches the
isSuccess (or equivalent success flag) and calls onOpenChange(false) when it
becomes true, and include onOpenChange and the success flag in the effect
dependency array to avoid stale closures. Ensure you reference the same success
state variable and the onOpenChange prop used in the component (replace the
inline call with the new effect).
| for (const orgId of row.organizationIds) { | ||
| const key = `${orgId}:${row.kind}`; | ||
| lookup.set(key, parsed); | ||
| } |
There was a problem hiding this comment.
Fail fast on overlapping org assignments.
lookup.set() silently overwrites any earlier ${orgId}:${row.kind} entry. Because findMany() is unordered here, an org that appears in two stores can end up with a nondeterministic effective route depending on row order.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts`
around lines 72 - 75, The loop that builds the lookup map silently overwrites
keys produced from `${orgId}:${row.kind}`, causing nondeterministic behavior
when an org is assigned to multiple stores; before calling lookup.set(key,
parsed) check lookup.has(key) and if present throw a clear error (or assert)
that includes the conflicting orgId, kind, and identifiers for both the existing
entry and the current row (e.g., existingParsed/id and parsed/id) so overlapping
assignments discovered during findMany() fail fast and surface which rows/IDs
conflict.
| async addDataStore({ | ||
| key, | ||
| kind, | ||
| organizationIds, | ||
| config, | ||
| }: { | ||
| key: string; | ||
| kind: DataStoreKind; | ||
| organizationIds: string[]; | ||
| config: any; | ||
| }) { | ||
| const secretKey = this.#secretKey(key, kind); | ||
|
|
||
| const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); | ||
| await secretStore.setSecret(secretKey, config); | ||
|
|
||
| return this._prisma.organizationDataStore.create({ | ||
| data: { | ||
| key, | ||
| organizationIds, | ||
| kind: "CLICKHOUSE", | ||
| config: { version: 1, data: { secretKey } }, | ||
| }, | ||
| }); | ||
| } | ||
|
|
||
| async updateDataStore({ | ||
| key, | ||
| kind, | ||
| organizationIds, | ||
| config, | ||
| }: { | ||
| key: string; | ||
| kind: DataStoreKind; | ||
| organizationIds: string[]; | ||
| config?: any; | ||
| }) { | ||
| const secretKey = this.#secretKey(key, kind); | ||
|
|
||
| if (config) { | ||
| const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); | ||
| await secretStore.setSecret(secretKey, config); | ||
| } | ||
|
|
||
| return this._prisma.organizationDataStore.update({ | ||
| where: { | ||
| key, | ||
| }, | ||
| data: { | ||
| organizationIds, | ||
| kind: "CLICKHOUSE", | ||
| }, | ||
| }); | ||
| } | ||
|
|
||
| async deleteDataStore({ key, kind }: { key: string; kind: DataStoreKind }) { | ||
| const secretKey = this.#secretKey(key, kind); | ||
| const secretStore = getSecretStore("DATABASE", { prismaClient: this._prisma }); | ||
| await secretStore.deleteSecret(secretKey).catch(() => { | ||
| // Secret may not exist — proceed with deletion | ||
| }); | ||
|
|
||
| await this._prisma.organizationDataStore.delete({ where: { key } }); | ||
| } |
There was a problem hiding this comment.
Refresh _lookup after mutations.
get() only reads the in-memory map, but these mutation methods only write to the database. On the singleton registry that means routing stays stale until the next background reload, so OTLP/replication can keep targeting the old ClickHouse instance after an admin change.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/services/dataStores/organizationDataStoresRegistry.server.ts`
around lines 94 - 157, The addDataStore, updateDataStore and deleteDataStore
methods only write to the DB but don't update the in-memory registry map
(_lookup), leaving routing stale; after each successful mutation (after the
secret write and after the prisma create/update/delete) call the registry's
method that repopulates the in-memory map used by get() (the same function the
background loader uses — e.g. this.reload() / this._reloadLookup() /
this._refreshLookup(), whichever exists) so _lookup is refreshed immediately and
subsequent get() calls see the new data.
| const queryClickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "query"); | ||
| const result = await executeTSQL(queryClickhouse.reader, { |
There was a problem hiding this comment.
Key the concurrency limiter by organizationId now that the backend is org-scoped.
Line 278 routes execution through an organization-scoped ClickHouse client, but the limiter in Lines 155-160 and 376-379 still acquires/releases by projectId. One organization with multiple projects can therefore exceed the advertised org limit and push more concurrent queries onto the same backend than intended.
⚠️ Proposed fix
const acquireResult = await queryConcurrencyLimiter.acquire({
- key: projectId,
+ key: organizationId,
requestId,
keyLimit: orgLimit,
globalLimit: GLOBAL_CONCURRENCY_LIMIT,
});
if (!acquireResult.success) {
const errorMessage =
acquireResult.reason === "key_limit"
- ? `You've exceeded your query concurrency of ${orgLimit} for this project. Please try again later.`
+ ? `You've exceeded your query concurrency of ${orgLimit} for this organization. Please try again later.`
: "We're experiencing a lot of queries at the moment. Please try again later.";
return { success: false, error: new QueryError(errorMessage, { query: options.query }) };
}
@@
} finally {
// Always release the concurrency slot
await queryConcurrencyLimiter.release({
- key: projectId,
+ key: organizationId,
requestId,
});
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/services/queryService.server.ts` around lines 278 - 279, The
concurrency limiter is currently keyed by projectId but the ClickHouse client
and execution are organization-scoped (see
clickhouseFactory.getClickhouseForOrganization and executeTSQL), so update the
limiter acquire/release calls (where limiter.acquire and limiter.release are
used around query execution) to use organizationId instead of projectId so
concurrent queries are limited per organization rather than per project; ensure
any helper that constructs limiter keys or caches limiter instances uses
organizationId consistently in the same places referenced around the query
execution.
| for (const [clickhouse, group] of groups) { | ||
| sortTaskRunInserts(group.taskRunInserts); | ||
| sortPayloadInserts(group.payloadInserts); | ||
| combinedTaskRunInserts.push(...group.taskRunInserts); | ||
| combinedPayloadInserts.push(...group.payloadInserts); | ||
|
|
||
| const [trErr] = await this.#insertWithRetry( | ||
| (attempt) => this.#insertTaskRunInserts(clickhouse, group.taskRunInserts, attempt), | ||
| "task run inserts", | ||
| flushId | ||
| ); | ||
| if (trErr && !taskRunError) { | ||
| taskRunError = trErr; | ||
| } | ||
|
|
||
| const [plErr] = await this.#insertWithRetry( | ||
| (attempt) => this.#insertPayloadInserts(clickhouse, group.payloadInserts, attempt), | ||
| "payload inserts", | ||
| flushId | ||
| ); | ||
| if (plErr && !payloadError) { | ||
| payloadError = plErr; | ||
| } | ||
|
|
||
| if (!trErr) { | ||
| this._taskRunsInsertedCounter.add(group.taskRunInserts.length); | ||
| } | ||
| if (!plErr) { | ||
| this._payloadsInsertedCounter.add(group.payloadInserts.length); | ||
| } | ||
| } |
There was a problem hiding this comment.
Avoid serializing the per-ClickHouse-group flushes.
Each await here makes every org group wait behind the slowest ClickHouse target. If one org-specific service is unhealthy and spends a few seconds in retry/backoff, unrelated orgs in the same replication batch inherit that lag too. On this hot path, the groups should be flushed behind a bounded limiter instead of one-by-one.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/services/runsReplicationService.server.ts` around lines 655 -
685, The current loop serializes inserts per ClickHouse group because each await
blocks the next; change it to run per-group flushes concurrently with a bounded
concurrency limiter (e.g., p-limit or an internal semaphore) so slow/unhealthy
ClickHouse targets don't stall unrelated orgs. For each group (the items handled
around sortTaskRunInserts, sortPayloadInserts,
combinedTaskRunInserts/combinedPayloadInserts and using this.#insertWithRetry
which calls this.#insertTaskRunInserts and this.#insertPayloadInserts with
flushId), create an async task that performs the two insertWithRetry calls,
records the first taskRunError/payloadError seen, and updates
_taskRunsInsertedCounter/_payloadsInsertedCounter when each insert succeeds;
schedule those tasks through the limiter and await Promise.all on the scheduled
tasks; ensure you keep pushing group.taskRunInserts and group.payloadInserts
into combined* before scheduling so semantics remain the same.
| async function initializeOTLPExporter() { | ||
| await clickhouseFactory.isReady(); | ||
| return new OTLPExporter({ | ||
| clickhouseFactory, | ||
| verbose: process.env.OTLP_EXPORTER_VERBOSE === "1", | ||
| spanAttributeValueLengthLimit: process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT | ||
| ? parseInt(process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT, 10) | ||
| : 8192 | ||
| ); | ||
| } No newline at end of file | ||
| : 8192, | ||
| }); |
There was a problem hiding this comment.
Use env for these new OTLP exporter settings.
This initializer now bypasses the webapp's typed env surface even though env is already imported in the file. Please route both settings through env instead of process.env.
As per coding guidelines, "Environment variables must be accessed via the env export from app/env.server.ts and never use process.env directly."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/v3/otlpExporter.server.ts` around lines 1207 - 1215, The
initializer initializeOTLPExporter is reading settings directly from
process.env; change it to use the typed env export instead: replace
process.env.OTLP_EXPORTER_VERBOSE with env.OTLP_EXPORTER_VERBOSE (coerce to "1"
check as before) and replace
process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT with
env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT and parseInt that value,
falling back to 8192; ensure the existing env import is used (or add it if
missing) and keep clickhouseFactory and OTLPExporter usage unchanged.
Added
OrganizationDataStorewhich allows orgs to have data stored in specific separate services.For now this is just used for ClickHouse. When using ClickHouse we get a client for the factory and pass in the org id.
Particular care has to be made with two hot-insert paths: