Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,52 @@ namespace NServiceBus;

public static class FunctionsHostApplicationBuilderExtensions
{
extension(FunctionsApplicationBuilder builder)
public static void AddNServiceBusFunction(this FunctionsApplicationBuilder builder, FunctionManifest functionManifest)
{
public void AddNServiceBusFunction(FunctionManifest functionManifest)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(functionManifest);

builder.Services.AddAzureClientsCore();

var endpointName = functionManifest.Name;
var endpointConfiguration = new EndpointConfiguration(endpointName);
endpointConfiguration.AssemblyScanner().Disable = true;
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(functionManifest);

functionManifest.Configuration(endpointConfiguration, builder.Configuration, builder.Environment);
builder.Services.AddAzureClientsCore();

var settings = endpointConfiguration.GetSettings();
if (settings.GetOrDefault<bool>(AzureServiceBusServerlessTransport.SendOnlyConfigKey))
{
throw new InvalidOperationException($"Functions can't be send only endpoints, use {nameof(AddSendOnlyNServiceBusEndpoint)}");
}
var endpointName = functionManifest.Name;
var endpointConfiguration = new EndpointConfiguration(endpointName);
endpointConfiguration.AssemblyScanner().Disable = true;

var transport = GetTransport(settings);
functionManifest.Configuration(endpointConfiguration, builder.Configuration, builder.Environment);

if (functionManifest.Name != functionManifest.Queue)
{
endpointConfiguration.OverrideLocalAddress(functionManifest.Queue);
}

transport.ConnectionName = functionManifest.ConnectionName;
builder.Services.AddNServiceBusEndpoint(endpointConfiguration);
builder.Services.AddKeyedSingleton<MessageProcessor>(endpointName, (_, _) => new MessageProcessor(transport, endpointName));
var settings = endpointConfiguration.GetSettings();
if (settings.GetOrDefault<bool>(AzureServiceBusServerlessTransport.SendOnlyConfigKey))
{
throw new InvalidOperationException($"Functions can't be send only endpoints, use {nameof(AddSendOnlyNServiceBusEndpoint)}");
}

public void AddSendOnlyNServiceBusEndpoint(string endpointName,
Action<EndpointConfiguration> configure)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(endpointName);
ArgumentNullException.ThrowIfNull(configure);
var transport = GetTransport(settings);

var endpointConfiguration = new EndpointConfiguration(endpointName);
endpointConfiguration.AssemblyScanner().Disable = true;
configure(endpointConfiguration);
endpointConfiguration.SendOnly();
if (functionManifest.Name != functionManifest.Queue)
{
endpointConfiguration.OverrideLocalAddress(functionManifest.Queue);
}

// Make sure that the correct transport is used
_ = GetTransport(endpointConfiguration.GetSettings());
transport.ConnectionName = functionManifest.ConnectionName;
builder.Services.AddNServiceBusEndpoint(endpointConfiguration);
builder.Services.AddKeyedSingleton<MessageProcessor>(endpointName, (_, _) => new MessageProcessor(transport, endpointName));
}

builder.Services.AddNServiceBusEndpoint(endpointConfiguration);
}
public static void AddSendOnlyNServiceBusEndpoint(this FunctionsApplicationBuilder builder, string endpointName,
Action<EndpointConfiguration> configure)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(endpointName);
ArgumentNullException.ThrowIfNull(configure);

var endpointConfiguration = new EndpointConfiguration(endpointName);
endpointConfiguration.AssemblyScanner().Disable = true;
configure(endpointConfiguration);
endpointConfiguration.SendOnly();

// Make sure that the correct transport is used
_ = GetTransport(endpointConfiguration.GetSettings());
builder.Services.AddNServiceBusEndpoint(endpointConfiguration);
}

static AzureServiceBusServerlessTransport GetTransport(SettingsHolder settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,23 @@ static class MessageExtensions
}

public string GetMessageId() => message.MessageId ?? Guid.NewGuid().ToString("N");
}

public BinaryData GetBody()
{
var body = message.Body ?? BinaryData.FromBytes(ReadOnlyMemory<byte>.Empty);
var memory = body.ToMemory();

if (memory.IsEmpty ||
!message.ApplicationProperties.TryGetValue(TransportMessageHeaders.TransportEncoding, out var value) ||
!value.Equals("wcf/byte-array"))
{
return body;
}
public static BinaryData GetBody(this ServiceBusReceivedMessage message)
{
var body = message.Body ?? BinaryData.FromBytes(ReadOnlyMemory<byte>.Empty);
var memory = body.ToMemory();

using var reader = XmlDictionaryReader.CreateBinaryReader(body.ToStream(), XmlDictionaryReaderQuotas.Max);
var bodyBytes = (byte[])Deserializer.ReadObject(reader)!;
return new BinaryData(bodyBytes);
if (memory.IsEmpty ||
!message.ApplicationProperties.TryGetValue(TransportMessageHeaders.TransportEncoding, out var value) ||
!value.Equals("wcf/byte-array"))
{
return body;
}

using var reader = XmlDictionaryReader.CreateBinaryReader(body.ToStream(), XmlDictionaryReaderQuotas.Max);
var bodyBytes = (byte[])Deserializer.ReadObject(reader)!;
return new BinaryData(bodyBytes);
}

static readonly DataContractSerializer Deserializer = new(typeof(byte[]));
Expand Down
Loading