Skip to content

Commit

Permalink
Merge branch 'master' into dn-feat/link-colors
Browse files Browse the repository at this point in the history
  • Loading branch information
daibhin committed Nov 20, 2023
2 parents b3492ec + ff4bfee commit 6acc6ee
Show file tree
Hide file tree
Showing 37 changed files with 1,870 additions and 850 deletions.
9 changes: 9 additions & 0 deletions .run/Dev.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<!-- JetBrains (PyCharm/IDEA) compound run configuration: launches the full local
     PostHog dev stack with a single "Dev" run target. -->
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Dev" type="CompoundRunConfigurationType">
<!-- Django backend server -->
<toRun name="PostHog" type="Python.DjangoServer" />
<!-- Webpack/ESBuild frontend dev server (npm script) -->
<toRun name="Frontend" type="js.build_tools.npm" />
<!-- Node-based event ingestion/plugin server (npm script) -->
<toRun name="Plugin Server" type="js.build_tools.npm" />
<!-- Celery worker for background tasks -->
<toRun name="Celery" type="PythonConfigurationType" />
<method v="2" />
</configuration>
</component>
129 changes: 0 additions & 129 deletions ee/clickhouse/test/test_client.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'
/* user_id:126 celery:posthog.celery.sync_insight_caching_state */
/* user_id:132 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Expand Down
48 changes: 21 additions & 27 deletions ee/clickhouse/views/test/test_clickhouse_trends.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_includes_only_intervals_within_range(client: Client):
{
"action": ANY,
"breakdown_value": cohort["id"],
"label": "$pageview - test cohort",
"label": "test cohort",
"count": 3.0,
"data": [1.0, 1.0, 1.0],
# Prior to the fix this would also include '29-Aug-2021'
Expand Down Expand Up @@ -827,14 +827,12 @@ def test_insight_trends_cumulative(self):
],
)
data_response = get_trends_time_series_ok(self.client, request, self.team)
person_response = get_people_from_url_ok(
self.client, data_response["$pageview - val"]["2012-01-14"].person_url
)
person_response = get_people_from_url_ok(self.client, data_response["val"]["2012-01-14"].person_url)

assert data_response["$pageview - val"]["2012-01-13"].value == 1
assert data_response["$pageview - val"]["2012-01-13"].breakdown_value == "val"
assert data_response["$pageview - val"]["2012-01-14"].value == 3
assert data_response["$pageview - val"]["2012-01-14"].label == "14-Jan-2012"
assert data_response["val"]["2012-01-13"].value == 1
assert data_response["val"]["2012-01-13"].breakdown_value == "val"
assert data_response["val"]["2012-01-14"].value == 3
assert data_response["val"]["2012-01-14"].label == "14-Jan-2012"

assert sorted([p["id"] for p in person_response]) == sorted(
[str(created_people["p1"].uuid), str(created_people["p3"].uuid)]
Expand Down Expand Up @@ -862,12 +860,12 @@ def test_insight_trends_cumulative(self):
properties=[{"type": "person", "key": "key", "value": "some_val"}],
)
data_response = get_trends_time_series_ok(self.client, request, self.team)
people = get_people_from_url_ok(self.client, data_response["$pageview - val"]["2012-01-14"].person_url)
people = get_people_from_url_ok(self.client, data_response["val"]["2012-01-14"].person_url)

assert data_response["$pageview - val"]["2012-01-13"].value == 1
assert data_response["$pageview - val"]["2012-01-13"].breakdown_value == "val"
assert data_response["$pageview - val"]["2012-01-14"].value == 3
assert data_response["$pageview - val"]["2012-01-14"].label == "14-Jan-2012"
assert data_response["val"]["2012-01-13"].value == 1
assert data_response["val"]["2012-01-13"].breakdown_value == "val"
assert data_response["val"]["2012-01-14"].value == 3
assert data_response["val"]["2012-01-14"].label == "14-Jan-2012"

assert sorted([p["id"] for p in people]) == sorted(
[str(created_people["p1"].uuid), str(created_people["p3"].uuid)]
Expand All @@ -894,12 +892,12 @@ def test_insight_trends_cumulative(self):
],
)
data_response = get_trends_time_series_ok(self.client, request, self.team)
people = get_people_from_url_ok(self.client, data_response["$pageview - val"]["2012-01-14"].person_url)
people = get_people_from_url_ok(self.client, data_response["val"]["2012-01-14"].person_url)

assert data_response["$pageview - val"]["2012-01-13"].value == 1
assert data_response["$pageview - val"]["2012-01-13"].breakdown_value == "val"
assert data_response["$pageview - val"]["2012-01-14"].value == 2
assert data_response["$pageview - val"]["2012-01-14"].label == "14-Jan-2012"
assert data_response["val"]["2012-01-13"].value == 1
assert data_response["val"]["2012-01-13"].breakdown_value == "val"
assert data_response["val"]["2012-01-14"].value == 2
assert data_response["val"]["2012-01-14"].label == "14-Jan-2012"

assert sorted([p["id"] for p in people]) == sorted(
[str(created_people["p1"].uuid), str(created_people["p3"].uuid)]
Expand Down Expand Up @@ -933,12 +931,10 @@ def test_breakdown_with_filter(self):
properties=[{"key": "key", "value": "oh", "operator": "not_icontains"}],
)
data_response = get_trends_time_series_ok(self.client, params, self.team)
person_response = get_people_from_url_ok(
self.client, data_response["sign up - val"]["2012-01-13"].person_url
)
person_response = get_people_from_url_ok(self.client, data_response["val"]["2012-01-13"].person_url)

assert data_response["sign up - val"]["2012-01-13"].value == 1
assert data_response["sign up - val"]["2012-01-13"].breakdown_value == "val"
assert data_response["val"]["2012-01-13"].value == 1
assert data_response["val"]["2012-01-13"].breakdown_value == "val"

assert sorted([p["id"] for p in person_response]) == sorted([str(created_people["person1"].uuid)])

Expand All @@ -950,11 +946,9 @@ def test_breakdown_with_filter(self):
events=[{"id": "sign up", "name": "sign up", "type": "events", "order": 0}],
)
aggregate_response = get_trends_aggregate_ok(self.client, params, self.team)
aggregate_person_response = get_people_from_url_ok(
self.client, aggregate_response["sign up - val"].person_url
)
aggregate_person_response = get_people_from_url_ok(self.client, aggregate_response["val"].person_url)

assert aggregate_response["sign up - val"].value == 1
assert aggregate_response["val"].value == 1
assert sorted([p["id"] for p in aggregate_person_response]) == sorted([str(created_people["person1"].uuid)])

def test_insight_trends_compare(self):
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
17 changes: 14 additions & 3 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import { EVENT_PROPERTY_DEFINITIONS_PER_PAGE } from 'scenes/data-management/prop
import { ActivityLogItem, ActivityScope } from 'lib/components/ActivityLog/humanizeActivity'
import { ActivityLogProps } from 'lib/components/ActivityLog/ActivityLog'
import { SavedSessionRecordingPlaylistsResult } from 'scenes/session-recordings/saved-playlists/savedSessionRecordingPlaylistsLogic'
import { QuerySchema } from '~/queries/schema'
import { QuerySchema, QueryStatus } from '~/queries/schema'
import { decompressSync, strFromU8 } from 'fflate'
import { getCurrentExporterData } from '~/exporter/exporterViewLogic'
import { encodeParams } from 'kea-router'
Expand Down Expand Up @@ -542,6 +542,10 @@ class ApiRequest {
return this.projectsDetail(teamId).addPathComponent('query')
}

/**
 * Builds the endpoint path for polling the status of an async query:
 * appends the query ID to the team-scoped `query` path (i.e. `.../query/<queryId>`).
 * NOTE(review): presumably pairs with the async-query polling flow — confirm against `api.queryStatus.get`.
 */
public queryStatus(queryId: string, teamId?: TeamType['id']): ApiRequest {
return this.query(teamId).addPathComponent(queryId)
}

// Notebooks
public notebooks(teamId?: TeamType['id']): ApiRequest {
return this.projectsDetail(teamId).addPathComponent('notebooks')
Expand Down Expand Up @@ -1722,6 +1726,12 @@ const api = {
},
},

// Polling endpoint for asynchronously-executed queries: fetches the current
// status/result for a given query ID via GET `.../query/<queryId>`.
queryStatus: {
async get(queryId: string): Promise<QueryStatus> {
return await new ApiRequest().queryStatus(queryId).get()
},
},

queryURL: (): string => {
return new ApiRequest().query().assembleFullUrl(true)
},
Expand All @@ -1730,7 +1740,8 @@ const api = {
query: T,
options?: ApiMethodOptions,
queryId?: string,
refresh?: boolean
refresh?: boolean,
async?: boolean
): Promise<
T extends { [response: string]: any }
? T['response'] extends infer P | undefined
Expand All @@ -1740,7 +1751,7 @@ const api = {
> {
return await new ApiRequest()
.query()
.create({ ...options, data: { query, client_query_id: queryId, refresh: refresh } })
.create({ ...options, data: { query, client_query_id: queryId, refresh: refresh, async } })
},

/** Fetch data from specified URL. The result already is JSON-parsed. */
Expand Down
1 change: 1 addition & 0 deletions frontend/src/lib/constants.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export const FEATURE_FLAGS = {
ROLE_BASED_ACCESS: 'role-based-access', // owner: #team-experiments, @liyiy
QUERY_RUNNING_TIME: 'query_running_time', // owner: @mariusandra
QUERY_TIMINGS: 'query-timings', // owner: @mariusandra
QUERY_ASYNC: 'query-async', // owner: @webjunkie
POSTHOG_3000: 'posthog-3000', // owner: @Twixes
POSTHOG_3000_NAV: 'posthog-3000-nav', // owner: @Twixes
ENABLE_PROMPTS: 'enable-prompts', // owner: @lharries
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/queries/nodes/DataNode/dataNodeLogic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ export const dataNodeLogic = kea<dataNodeLogicType>([
abortQuery: async ({ queryId }) => {
try {
const { currentTeamId } = values
await api.create(`api/projects/${currentTeamId}/insights/cancel`, { client_query_id: queryId })
await api.delete(`api/projects/${currentTeamId}/query/${queryId}/`)
} catch (e) {
console.warn('Failed cancelling query', e)
}
Expand Down
44 changes: 42 additions & 2 deletions frontend/src/queries/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import {
isStickinessFilter,
isTrendsFilter,
} from 'scenes/insights/sharedUtils'
import { flattenObject, toParams } from 'lib/utils'
import { flattenObject, delay, toParams } from 'lib/utils'
import { queryNodeToFilter } from './nodes/InsightQuery/utils/queryNodeToFilter'
import { now } from 'lib/dayjs'
import { currentSessionId } from 'lib/internalMetrics'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'

const QUERY_ASYNC_MAX_INTERVAL_SECONDS = 10
const QUERY_ASYNC_TOTAL_POLL_SECONDS = 300

//get export context for a given query
export function queryExportContext<N extends DataNode = DataNode>(
query: N,
Expand Down Expand Up @@ -91,6 +94,43 @@ export function queryExportContext<N extends DataNode = DataNode>(
throw new Error(`Unsupported query: ${query.kind}`)
}

/**
 * Executes a query node, transparently using the async-query flow when the
 * `QUERY_ASYNC` feature flag is enabled (and the query kind is not excluded).
 *
 * Sync path: returns the query response directly.
 * Async path: the initial request returns a handle (`query_async` + `id`); we then
 * poll the status endpoint with exponential backoff (300 ms doubling up to
 * QUERY_ASYNC_MAX_INTERVAL_SECONDS) until completion, error, abort, or the
 * QUERY_ASYNC_TOTAL_POLL_SECONDS budget is exhausted.
 *
 * @param queryNode     the query to execute
 * @param methodOptions per-request options; `signal` is honored for cancellation
 * @param refresh       bypass cached results server-side
 * @param queryId       client-generated ID used to correlate/cancel the query
 * @throws AbortError if `methodOptions.signal` is aborted while polling
 * @throws Error if the async query reports an error or polling times out
 */
async function executeQuery<N extends DataNode = DataNode>(
    queryNode: N,
    methodOptions?: ApiMethodOptions,
    refresh?: boolean,
    queryId?: string
): Promise<NonNullable<N['response']>> {
    const queryAsyncEnabled = Boolean(featureFlagLogic.findMounted()?.values.featureFlags?.[FEATURE_FLAGS.QUERY_ASYNC])
    // Metadata queries are cheap and latency-sensitive; never run them async
    const excludedKinds = ['HogQLMetadata']
    const queryAsync = queryAsyncEnabled && !excludedKinds.includes(queryNode.kind)
    const response = await api.query(queryNode, methodOptions, queryId, refresh, queryAsync)

    // Sync execution, or the server chose not to run this one asynchronously
    if (!queryAsync || !response.query_async) {
        return response
    }

    const pollStart = performance.now()
    let currentDelay = 300 // start low, because all queries will take at minimum this

    while (performance.now() - pollStart < QUERY_ASYNC_TOTAL_POLL_SECONDS * 1000) {
        // Check for cancellation *before* sleeping — previously the check ran only
        // after the backoff delay, so an abort could go unnoticed for up to
        // QUERY_ASYNC_MAX_INTERVAL_SECONDS.
        if (methodOptions?.signal?.aborted) {
            const customAbortError = new Error('Query aborted')
            customAbortError.name = 'AbortError'
            throw customAbortError
        }

        await delay(currentDelay)
        currentDelay = Math.min(currentDelay * 2, QUERY_ASYNC_MAX_INTERVAL_SECONDS * 1000)

        const statusResponse = await api.queryStatus.get(response.id)

        if (statusResponse.error) {
            // Bug fix: an errored query previously fell through to `return
            // statusResponse.results`, handing callers a likely-undefined result as
            // if the query had succeeded. Surface the failure explicitly instead.
            throw new Error('Query failed')
        }
        if (statusResponse.complete) {
            return statusResponse.results
        }
    }
    throw new Error('Query timed out')
}

// Return data for a given query
export async function query<N extends DataNode = DataNode>(
queryNode: N,
Expand Down Expand Up @@ -216,7 +256,7 @@ export async function query<N extends DataNode = DataNode>(
response = await fetchLegacyInsights()
}
} else {
response = await api.query(queryNode, methodOptions, queryId, refresh)
response = await executeQuery(queryNode, methodOptions, refresh, queryId)
if (isHogQLQuery(queryNode) && response && typeof response === 'object') {
logParams.clickhouse_sql = (response as HogQLQueryResponse)?.clickhouse
}
Expand Down
Loading

0 comments on commit 6acc6ee

Please sign in to comment.