Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,41 @@ The `cf-java-client` project is a Java language binding for interacting with a C
## Versions
The Cloud Foundry Java Client has two active versions. The `5.x` line is compatible with Spring Boot `2.4.x - 2.6.x` just to manage its dependencies, while the `4.x` line uses Spring Boot `2.3.x`.

## Deprecations

### `DopplerClient.recentLogs()` — Recent Logs via Doppler

> [!WARNING]
> **Deprecated since cf-java-client `5.17.x`**
>
> The `DopplerClient.recentLogs()` endpoint (and the related `RecentLogsRequest` / `LogMessage` types from the `org.cloudfoundry.doppler` package) are **deprecated** and will be removed in a future release.
>
> This API relies on the [Loggregator][loggregator] Doppler/Traffic Controller endpoint `/apps/{id}/recentlogs`, which was removed in **Loggregator ≥ 107.0**.
> The affected platform versions are:
>
> | Platform | Last version with Doppler recent-logs support |
> | -------- | --------------------------------------------- |
> | CF Deployment (CFD) | `< 24.3` |
> | Tanzu Application Service (TAS) | `< 4.0` |
>
> **Migration:** Replace any call to `DopplerClient.recentLogs()` with [`LogCacheClient.read()`][log-cache-api] (available via `org.cloudfoundry.logcache.v1.LogCacheClient`).
>
> ```java
> // Before (deprecated)
> dopplerClient.recentLogs(RecentLogsRequest.builder()
> .applicationId(appId)
> .build());
>
> // After
> logCacheClient.read(ReadRequest.builder()
> .sourceId(appId)
> .envelopeTypes(EnvelopeType.LOG)
> .build());
> ```

[loggregator]: https://github.com/cloudfoundry/loggregator
[log-cache-api]: https://github.com/cloudfoundry/log-cache

## Dependencies
Most projects will need two dependencies; the Operations API and an implementation of the Client API. For Maven, the dependencies would be defined like this:

Expand Down Expand Up @@ -76,6 +111,9 @@ Both the `cloudfoundry-operations` and `cloudfoundry-client` projects follow a [

### `CloudFoundryClient`, `DopplerClient`, `UaaClient` Builders

> [!NOTE]
> **`DopplerClient` — partial deprecation:** The `recentLogs()` method on `DopplerClient` is deprecated and only works against Loggregator \< 107.0 (CFD \< 24.3 / TAS \< 4.0). See the [Deprecations](#deprecations) section above for the migration path to `LogCacheClient`.

The lowest-level building blocks of the API are `ConnectionContext` and `TokenProvider`. These types are intended to be shared between instances of the clients, and come with out of the box implementations. To instantiate them, you configure them with builders:

```java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,24 @@

package org.cloudfoundry.reactor.logcache.v1;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.EnvelopeBatch;
import org.cloudfoundry.logcache.v1.EnvelopeType;
import org.cloudfoundry.logcache.v1.InfoRequest;
import org.cloudfoundry.logcache.v1.InfoResponse;
import org.cloudfoundry.logcache.v1.MetaRequest;
import org.cloudfoundry.logcache.v1.MetaResponse;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.ReadResponse;
import org.cloudfoundry.logcache.v1.TailLogsRequest;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ReactorLogCacheEndpoints extends AbstractLogCacheOperations {
Expand All @@ -48,4 +57,91 @@ Mono<MetaResponse> meta(MetaRequest request) {
Mono<ReadResponse> read(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}

Mono<ReadResponse> recentLogs(ReadRequest request) {
return read(request);
}

/**
* Continuously polls Log Cache and emits new {@link Envelope}s as they appear.
*
* <p>Algorithm (mirrors the Go {@code logcache.Walk()} implementation):
* <ol>
* <li>Start from {@code startTime} (defaults to now – 5 s in nanoseconds).</li>
* <li>Issue a {@code /api/v1/read} with {@code start_time = cursor}.</li>
* <li>Emit every returned envelope in ascending timestamp order and advance cursor
* to {@code lastTimestamp + 1}.</li>
* <li>If the response was empty, wait {@code pollInterval} before next poll.</li>
* <li>Repeat indefinitely until cancelled.</li>
* </ol>
*/
Flux<Envelope> logsTail(TailLogsRequest request) {
long defaultStartNanos = (System.currentTimeMillis() - 5_000L) * 1_000_000L;
AtomicLong cursor =
new AtomicLong(
request.getStartTime() != null
? request.getStartTime()
: defaultStartNanos);

List<EnvelopeType> envelopeTypes =
request.getEnvelopeTypes() != null
? request.getEnvelopeTypes()
: Collections.emptyList();
String nameFilter = request.getNameFilter();

// One poll: read from cursor, sort, emit, advance cursor.
// Returns empty Mono when batch is empty (triggers delay+repeat).
Flux<Envelope> poll =
Mono.defer(
() -> {
ReadRequest.Builder builder =
ReadRequest.builder()
.sourceId(request.getSourceId())
.startTime(cursor.get());
if (!envelopeTypes.isEmpty()) {
builder.envelopeTypes(envelopeTypes);
}
if (nameFilter != null && !nameFilter.isEmpty()) {
builder.nameFilter(nameFilter);
}
return read(builder.build())
.map(ReadResponse::getEnvelopes)
.map(EnvelopeBatch::getBatch)
.onErrorReturn(Collections.emptyList());
})
.flatMapMany(
batch -> {
if (batch.isEmpty()) {
return Flux.empty();
}
List<Envelope> sorted = new ArrayList<>(batch);
sorted.sort(
(a, b) -> {
long ta =
a.getTimestamp() != null
? a.getTimestamp()
: 0L;
long tb =
b.getTimestamp() != null
? b.getTimestamp()
: 0L;
return Long.compare(ta, tb);
});
long last =
sorted.get(sorted.size() - 1).getTimestamp() != null
? sorted.get(sorted.size() - 1).getTimestamp()
: cursor.get();
cursor.set(last + 1);
return Flux.fromIterable(sorted);
});

// Repeat poll indefinitely; when empty (no new data) delay before next attempt.
return poll.repeatWhen(
flux ->
flux.flatMap(
count ->
count == 0
? Mono.delay(request.getPollInterval())
: Mono.just(count)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@

package org.cloudfoundry.reactor.logcache.v1;

import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.InfoRequest;
import org.cloudfoundry.logcache.v1.InfoResponse;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.logcache.v1.MetaRequest;
import org.cloudfoundry.logcache.v1.MetaResponse;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.ReadResponse;
import org.cloudfoundry.logcache.v1.TailLogsRequest;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import org.immutables.value.Value;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
Expand Down Expand Up @@ -53,6 +56,16 @@ public Mono<ReadResponse> read(ReadRequest request) {
return getReactorLogCacheEndpoints().read(request);
}

@Override
public Mono<ReadResponse> recentLogs(ReadRequest request) {
return getReactorLogCacheEndpoints().recentLogs(request);
}

@Override
public Flux<Envelope> logsTail(TailLogsRequest request) {
return getReactorLogCacheEndpoints().logsTail(request);
}

/**
* The connection context
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ public interface DopplerClient {
/**
* Makes the <a href="https://github.com/cloudfoundry/loggregator/tree/develop/src/trafficcontroller#endpoints">Recent Logs</a> request
*
* @deprecated Use {@link org.cloudfoundry.logcache.v1.LogCacheClient#recentLogs(org.cloudfoundry.logcache.v1.ReadRequest)} instead.
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3} and {@code TAS < 4.0}.
* @param request the Recent Logs request
* @return the events from the recent logs
* @return a flux of events from the recent logs
* @see <a href="https://github.com/cloudfoundry/loggregator">Loggregator</a>
* @see <a href="https://github.com/cloudfoundry/log-cache">Log Cache</a>
* @see org.cloudfoundry.logcache.v1.LogCacheClient#recentLogs(org.cloudfoundry.logcache.v1.ReadRequest)
*/
@Deprecated
Flux<Envelope> recentLogs(RecentLogsRequest request);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.cloudfoundry.logcache.v1;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -46,4 +47,25 @@ public interface LogCacheClient {
* @return the read response
*/
Mono<ReadResponse> read(ReadRequest request);

/**
* Makes the Log Cache RecentLogs /api/v1/read request
*
* @param request the Recent Logs request
* @return the events from the recent logs
*/
Mono<ReadResponse> recentLogs(ReadRequest request);

/**
* Continuously polls the Log Cache /api/v1/read endpoint and streams new {@link Envelope}s
* as they appear. This is the Java equivalent of the Go {@code logcache.Walk()} API and
* {@code cf tail --follow}.
* <p>
* The returned {@link Flux} will never complete on its own – unsubscribe (or cancel) it to
* stop streaming.
*
* @param request the tail request (source id, optional filters, poll interval)
* @return an infinite stream of envelopes
*/
Flux<Envelope> logsTail(TailLogsRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cloudfoundry.logcache.v1;

import org.cloudfoundry.Nullable;
import org.immutables.value.Value;

import java.time.Duration;
import java.util.List;

/**
* The request options for the Log Cache tail (streaming follow) operation.
* This continuously polls the Log Cache /api/v1/read endpoint, emitting new envelopes
* as they appear – equivalent to {@code cf tail --follow} or the Go {@code logcache.Walk()} API.
*/
@Value.Immutable
abstract class _TailLogsRequest {

/**
* The source id (application guid or service guid) to stream logs for.
*/
abstract String getSourceId();

/**
* Optional start time (UNIX nanoseconds). Defaults to "now – 5 seconds" when not set.
*/
@Nullable
abstract Long getStartTime();

/**
* Optional envelope type filter.
*/
@Nullable
abstract List<EnvelopeType> getEnvelopeTypes();

/**
* Optional regex name filter (requires Log Cache ≥ 2.1.0).
*/
@Nullable
abstract String getNameFilter();

/**
* How long to wait between successive polls when no new envelopes are available.
* Defaults to 250 ms (matching the Go client's {@code AlwaysRetryBackoff}).
*/
@Value.Default
Duration getPollInterval() {
return Duration.ofMillis(250);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.cloudfoundry.client.v3.spaces.ListSpacesRequest;
import org.cloudfoundry.client.v3.spaces.SpaceResource;
import org.cloudfoundry.doppler.DopplerClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.networking.NetworkingClient;
import org.cloudfoundry.operations.advanced.Advanced;
import org.cloudfoundry.operations.advanced.DefaultAdvanced;
Expand Down Expand Up @@ -79,7 +80,7 @@ public Advanced advanced() {
@Override
@Value.Derived
public Applications applications() {
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getSpaceId());
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getLogCacheClientPublisher(), getSpaceId());
}

@Override
Expand Down Expand Up @@ -197,6 +198,19 @@ Mono<DopplerClient> getDopplerClientPublisher() {
.orElse(Mono.error(new IllegalStateException("DopplerClient must be set")));
}

/**
* The {@link LogCacheClient} to use for operations functionality
*/
@Nullable
abstract LogCacheClient getLogCacheClient();

@Value.Derived
Mono<LogCacheClient> getLogCacheClientPublisher() {
return Optional.ofNullable(getLogCacheClient())
.map(Mono::just)
.orElse(Mono.error(new IllegalStateException("LogCacheClient must be set")));
}

/**
* The {@link NetworkingClient} to use for operations functionality
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package org.cloudfoundry.operations.applications;

import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.TailLogsRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -126,6 +130,25 @@ public interface Applications {
@Deprecated
Flux<LogMessage> logs(LogsRequest request);

/**
* List the applications logs from logCacheClient.
* If no messages are available, an empty Flux is returned.
*
* @param request the application logs request
* @return the applications logs
*/
Flux<Log> logsRecent(ReadRequest request);

/**
* Continuously streams application log envelopes from Log Cache by repeatedly polling
* the {@code /api/v1/read} endpoint. The returned {@link Flux} is infinite – cancel it
* to stop streaming. This is the Java equivalent of {@code cf tail --follow}.
*
* @param request the tail request (source id, optional filters, poll interval)
* @return an infinite stream of envelopes
*/
Flux<Envelope> logsTail(TailLogsRequest request);

/**
* List the applications logs.
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3}
Expand Down
Loading