Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions .github/workflows/test-java-sdk.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
name: Java SDK Test Suite

on:
pull_request:
branches:
- main
paths:
- apps/java-sdk/**
- .github/workflows/test-java-sdk.yml
push:
branches:
- main
paths:
- apps/java-sdk/**
workflow_dispatch:

jobs:
build-and-test:
name: Build and Test
runs-on: blacksmith-4vcpu-ubuntu-2404
if: >-
github.event_name == 'workflow_dispatch' ||
github.event_name == 'pull_request' ||
(github.event_name == 'push' && github.ref == 'refs/heads/main')
steps:
- uses: actions/checkout@v4

- name: Set up JDK 11
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: "11"

- name: Cache Gradle packages
uses: actions/cache@v4
with:
path: |
~/.gradle/caches
~/.gradle/wrapper
key: ${{ runner.os }}-gradle-${{ hashFiles('apps/java-sdk/**/*.gradle.kts', 'apps/java-sdk/gradle/wrapper/gradle-wrapper.properties') }}
restore-keys: |
${{ runner.os }}-gradle-

- name: Grant execute permission for gradlew
working-directory: ./apps/java-sdk
run: chmod +x gradlew

- name: Build
working-directory: ./apps/java-sdk
run: ./gradlew build -x test

- name: Run unit tests
working-directory: ./apps/java-sdk
run: ./gradlew test

- name: Run E2E tests
if: env.FIRECRAWL_API_KEY != ''
working-directory: ./apps/java-sdk
env:
FIRECRAWL_API_KEY: ${{ secrets.FIRECRAWL_API_KEY }}
run: ./gradlew test

- name: Publish test report
if: always() && (github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false)
uses: dorny/test-reporter@v1
with:
name: Java SDK Test Report
path: apps/java-sdk/build/test-results/test/*.xml
reporter: java-junit
fail-on-error: true
2 changes: 1 addition & 1 deletion apps/api/src/controllers/v2/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { billTeam } from "../../services/billing/credit_billing";
import { enqueueBrowserSessionActivity } from "../../lib/browser-session-activity";
import { logRequest } from "../../services/logging/log_job";

const BROWSER_CREDITS_PER_HOUR = 100;
const BROWSER_CREDITS_PER_HOUR = 120;

/**
* Calculate credits to bill for a browser session based on its duration.
Expand Down
154 changes: 81 additions & 73 deletions apps/api/src/lib/concurrency-limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export async function pushCrawlConcurrencyLimitActiveJob(
);
}

async function removeCrawlConcurrencyLimitActiveJob(
export async function removeCrawlConcurrencyLimitActiveJob(
crawl_id: string,
id: string,
) {
Expand All @@ -188,7 +188,7 @@ async function removeCrawlConcurrencyLimitActiveJob(
* @param teamId
* @returns A job that can be run, or null if there are no more jobs to run.
*/
async function getNextConcurrentJob(teamId: string): Promise<{
export async function getNextConcurrentJob(teamId: string): Promise<{
job: ConcurrencyLimitedJob;
timeout: number;
} | null> {
Expand Down Expand Up @@ -283,6 +283,11 @@ async function getNextConcurrentJob(teamId: string): Promise<{
export async function concurrentJobDone(job: NuQJob<any>) {
if (job.id && job.data && job.data.team_id) {
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
await getRedisConnection().zrem(
constructQueueKey(job.data.team_id),
job.id,
);
await getRedisConnection().del(constructJobKey(job.id));
await cleanOldConcurrencyLimitEntries(job.data.team_id);
await cleanOldConcurrencyLimitedJobs(job.data.team_id);

Expand All @@ -291,89 +296,92 @@ export async function concurrentJobDone(job: NuQJob<any>) {
await cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
}

let i = 0;
for (; i < 10; i++) {
const maxTeamConcurrency =
(
await getACUCTeam(
job.data.team_id,
false,
true,
job.data.is_extract
? RateLimiterMode.Extract
: RateLimiterMode.Crawl,
)
)?.concurrency ?? 2;
const maxTeamConcurrency =
(
await getACUCTeam(
job.data.team_id,
false,
true,
job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl,
)
)?.concurrency ?? 2;

let staleSkipped = 0;
while (staleSkipped < 100) {
const currentActiveConcurrency = (
await getConcurrencyLimitActiveJobs(job.data.team_id)
).length;

if (currentActiveConcurrency < maxTeamConcurrency) {
const nextJob = await getNextConcurrentJob(job.data.team_id);
if (nextJob !== null) {
await pushConcurrencyLimitActiveJob(
job.data.team_id,
nextJob.job.id,
60 * 1000,
);
if (currentActiveConcurrency >= maxTeamConcurrency) break;

if (nextJob.job.data.crawl_id) {
await pushCrawlConcurrencyLimitActiveJob(
nextJob.job.data.crawl_id,
nextJob.job.id,
60 * 1000,
);

const sc = await getCrawl(nextJob.job.data.crawl_id);
if (sc !== null && typeof sc.crawlerOptions?.delay === "number") {
await new Promise(resolve =>
setTimeout(resolve, sc.crawlerOptions.delay * 1000),
);
}
}
const nextJob = await getNextConcurrentJob(job.data.team_id);
if (nextJob === null) break;

abTestJob(nextJob.job.data);

const promotedSuccessfully =
(await scrapeQueue.promoteJobFromBacklogOrAdd(
nextJob.job.id,
nextJob.job.data,
{
priority: nextJob.job.priority,
listenable: nextJob.job.listenable,
ownerId: nextJob.job.data.team_id ?? undefined,
groupId: nextJob.job.data.crawl_id ?? undefined,
},
)) !== null;

if (promotedSuccessfully) {
logger.debug("Successfully promoted concurrent queued job", {
teamId: job.data.team_id,
jobId: nextJob.job.id,
zeroDataRetention: nextJob.job.data?.zeroDataRetention,
});
break;
} else {
logger.warn(
"Was unable to promote concurrent queued job as it already exists in the database",
{
teamId: job.data.team_id,
jobId: nextJob.job.id,
zeroDataRetention: nextJob.job.data?.zeroDataRetention,
},
);
}
} else {
break;
await pushConcurrencyLimitActiveJob(
job.data.team_id,
nextJob.job.id,
60 * 1000,
);

if (nextJob.job.data.crawl_id) {
await pushCrawlConcurrencyLimitActiveJob(
nextJob.job.data.crawl_id,
nextJob.job.id,
60 * 1000,
);

const sc = await getCrawl(nextJob.job.data.crawl_id);
if (sc !== null && typeof sc.crawlerOptions?.delay === "number") {
await new Promise(resolve =>
setTimeout(resolve, sc.crawlerOptions.delay * 1000),
);
}
} else {
}

abTestJob(nextJob.job.data);

const promotedSuccessfully =
(await scrapeQueue.promoteJobFromBacklogOrAdd(
nextJob.job.id,
nextJob.job.data,
{
priority: nextJob.job.priority,
listenable: nextJob.job.listenable,
ownerId: nextJob.job.data.team_id ?? undefined,
groupId: nextJob.job.data.crawl_id ?? undefined,
},
)) !== null;

if (promotedSuccessfully) {
logger.debug("Successfully promoted concurrent queued job", {
teamId: job.data.team_id,
jobId: nextJob.job.id,
zeroDataRetention: nextJob.job.data?.zeroDataRetention,
});
break;
} else {
logger.warn(
"Was unable to promote concurrent queued job as it already exists in the database",
{
teamId: job.data.team_id,
jobId: nextJob.job.id,
zeroDataRetention: nextJob.job.data?.zeroDataRetention,
},
);
await removeConcurrencyLimitActiveJob(job.data.team_id, nextJob.job.id);
if (nextJob.job.data.crawl_id) {
await removeCrawlConcurrencyLimitActiveJob(
nextJob.job.data.crawl_id,
nextJob.job.id,
);
}
staleSkipped++;
}
}

if (i === 10) {
if (staleSkipped >= 100) {
logger.warn(
"Failed to promote a concurrent job after 10 iterations, bailing!",
"Skipped 100 stale entries in concurrency queue without a successful promotion",
{
teamId: job.data.team_id,
},
Expand Down
Loading
Loading