Introduction
The Solace PubSub+ Platform's software event broker efficiently streams event-driven information between applications, IoT devices and user interfaces running in the cloud, on-premises, and hybrid environments using open APIs and protocols like AMQP, JMS, MQTT, REST and WebSocket. It can be installed into a variety of public and private clouds, PaaS, and on-premises environments, and brokers in multiple locations can be linked together in an event mesh to dynamically share events across the distributed enterprise.
Quarkus Extension for Solace
The Solace Quarkus Extension integrates Quarkus applications with Solace PubSub+ event brokers. Events published through the extension become available on the event mesh, and the extension can likewise subscribe to any event available on the event mesh.
The extension can be used in two ways:
1. com.solace.quarkus:quarkus-solace-client
This extension provides only the Solace Java Messaging API; users must supply their own implementation and configuration to interact with the Solace PubSub+ broker.
To use this extension, first add the com.solace.quarkus:quarkus-solace-client extension to your build file.
For instance, with Maven, add the following dependency to your POM file:
<dependency>
<groupId>com.solace.quarkus</groupId>
<artifactId>quarkus-solace-client</artifactId>
<version>3.0.0</version>
</dependency>
2. com.solace.quarkus:quarkus-solace-messaging-connector
This extension is based on the Reactive Messaging framework and provides pre-defined configurations for incoming and outgoing channels.
To use this extension, first add the com.solace.quarkus:quarkus-solace-messaging-connector extension to your build file.
For instance, with Maven, add the following dependency to your POM file:
<dependency>
<groupId>com.solace.quarkus</groupId>
<artifactId>quarkus-solace-messaging-connector</artifactId>
<version>3.0.0</version>
</dependency>
Solace Quarkus Extension 3.0.0 is based on Quarkus 3.8.6, in which support for Java 11 is deprecated. The extension uses Java 17 as its baseline.
Extension Configuration Reference
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Description | Type | Default |
---|---|---|
Whether metrics are enabled when the Micrometer extension is present. | boolean | |
Whether the liveness health check should be exposed if the smallrye-health extension is present. | boolean | |
If DevServices has been explicitly enabled or disabled. DevServices is generally enabled by default, unless there is an existing configuration present. When DevServices is enabled Quarkus will attempt to automatically configure and start the Solace broker when running in Dev or Test mode and when Docker is running. | boolean | |
The container image name to use, for container based DevServices providers. | string | |
Indicates if the Solace broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for Solace starts a new container. The discovery uses the Container sharing is only used in dev mode. | boolean | |
The value of the This property is used when you need multiple shared Solace brokers. | string | |
The Solace host (hostname:port) | string | required |
The Solace VPN | string | required |
Environment variables that are passed to the container. | | |
Any extra parameters to pass to the Solace client. | | |
Incoming Configuration Reference
Incoming configuration for Quarkus Solace Messaging Connector
Description | Type | Default |
---|---|---|
The back-pressure strategy to be applied for direct message consumer. Supported values are | string | |
It is possible for the client application to consume messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages. | int | |
The queue name of receiver. | string | required |
The queue type of receiver. Supported values | string | |
Create resources on broker if not available. When set to | string | |
Whether to add configured subscriptions to queue. Will fail if permissions to configure subscriptions is not allowed on broker. | boolean | |
The comma separated list of subscriptions, the channel name if empty. If If | string | required |
The receiver selector query. If configured, broker will filter messages using this condition. | string | |
The receiver replay strategy. Supported values | string | |
The receiver replay timebased start time. Set this value if | string | |
The receiver replay replication group message id. | string | |
Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are Following are the failure strategies supported when Following are the failure strategies supported when | string | ignore |
The error topic where message should be published in case of error. | string | |
Whether error message is eligible to move to dead message queue. | boolean | |
TTL for Error message before moving to dead message queue. | long | |
Maximum number of attempts to send a failed message to the error topic in case of failure. Each attempt will have a backoff interval of 1 second. When all delivery attempts have been exhausted, the failed message will be requeued on the queue for redelivery. | int | |
Whether to enable negative acknowledgments on failed messages. Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an exception is thrown. | boolean | |
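The retry behaviour described for error-topic publishing (a bounded number of attempts, a fixed backoff, then requeue) can be sketched in plain Java. This is an illustrative model only, not the connector's implementation: ErrorTopicRetrySketch, publishWithRetry and the injected sleeper are hypothetical names, and pausing only between attempts is an assumption.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Hypothetical helper modelling bounded publish retries with a fixed backoff. */
final class ErrorTopicRetrySketch {

    /**
     * Runs publishAttempt up to maxAttempts times, pausing for backoff between attempts.
     * Returns true if an attempt succeeded, false if the caller should requeue the message.
     */
    static boolean publishWithRetry(BooleanSupplier publishAttempt, int maxAttempts,
                                    Duration backoff, Consumer<Duration> sleeper) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (publishAttempt.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                sleeper.accept(backoff); // wait before the next attempt
            }
        }
        return false; // attempts exhausted: requeue for redelivery
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated publisher that succeeds on its third attempt; the sleeper is a no-op here.
        boolean published = publishWithRetry(() -> ++calls[0] == 3, 3,
                Duration.ofSeconds(1), pause -> { });
        System.out.println("published=" + published + ", attempts=" + calls[0]);
    }
}
```

Passing the sleeper as a parameter keeps the sketch testable without real one-second pauses.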
Outgoing Configuration Reference
Outgoing configuration for Quarkus Solace Messaging Connector
Description | Type | Default |
---|---|---|
In case of direct messaging broker will not send any acknowledgement for published messages. We can configure a timeout in milliseconds to check for any publish failures. If no failed event is received during this timeout published message is assumed to be successful. | long | |
The topic to publish messages, by default the channel name. | string | required |
The maximum number of messages to be written to Solace broker. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to | long | |
Whether the client waits to receive publish receipt from Solace broker before sending acknowledgment. This property is considered only when In case of | boolean | |
Timeout to receive the publish receipt from broker. | integer | |
Publish Window will determine the maximum number of messages the application can send before the Solace API must receive an acknowledgment from the Solace. | integer | |
It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages. Supported strategies | string | |
Outgoing messages backpressure buffer capacity. | integer | |
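The publish window idea (a cap on how many messages may be in flight before an acknowledgement is required) can be illustrated with a counting semaphore. The sketch below is a conceptual model under that assumption, not how the Solace API is implemented; PublishWindowSketch and its methods are hypothetical names.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical model of a publish window: at most windowSize unacknowledged messages. */
final class PublishWindowSketch {

    private final Semaphore window;

    PublishWindowSketch(int windowSize) {
        this.window = new Semaphore(windowSize);
    }

    /** Returns true if a message may be sent now, false if the window is full. */
    boolean trySend() {
        return window.tryAcquire();
    }

    /** Frees one slot when an in-flight message has been acknowledged. */
    void onAcknowledged() {
        window.release();
    }

    int freeSlots() {
        return window.availablePermits();
    }

    public static void main(String[] args) {
        PublishWindowSketch sketch = new PublishWindowSketch(2);
        sketch.trySend();
        sketch.trySend();
        System.out.println("third send allowed: " + sketch.trySend());
        sketch.onAcknowledged();
        System.out.println("send allowed after ack: " + sketch.trySend());
    }
}
```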
Common Configuration Reference
Common configuration for Quarkus Solace Messaging Connector Incoming and Outgoing channels
Description | Type | Default |
---|---|---|
The type of client when establishing connection to Solace. Solace supports two types of client Direct and Persistent. Use Direct client where message loss can be tolerated. The publisher publishes the event and the broker doesn’t send any acknowledgement back to publisher for guaranteed delivery. Use Persistent client where message loss cannot be tolerated. The publisher publishes the event and the broker sends an acknowledgement that message is guaranteed for delivery. | string | |
Whether the receiver or publisher is started at initialization or lazily at subscription time. | boolean | |
Whether to shutdown client gracefully. | boolean | |
Timeout in milliseconds to wait for messages to finish processing before shutdown. | long | |
Whether to enable or disable tracing for consumer or producer. | boolean | |
Configuring Quarkus Solace Client
The Solace broker supports different ways to connect and authenticate users. This section shows how to pass extra properties to the Quarkus Solace client in different scenarios. Refer to Solace Properties for supported properties and definitions.
1. Connecting to a standalone broker with basic authentication
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=test
quarkus.solace.authentication.basic.password=test
2. Connecting to a standalone broker with TLS and basic authentication. This assumes you have a publicly signed CA.
quarkus.solace.host=tcps://localhost:55443
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=test
quarkus.solace.authentication.basic.password=test
3. Connecting to a standalone broker with TLS, basic authentication and a custom trust store that holds your CA.
quarkus.solace.host=tcps://localhost:55443
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=test
quarkus.solace.authentication.basic.password=test
quarkus.solace.tls.trust-store-path=
quarkus.solace.tls.trust-store-type=
quarkus.solace.tls.trust-store-password=
4. Connecting to a standalone broker with TLS, trust store and client certificate authentication. With client certificate authentication, the broker reads the username from the configured username source (e.g. Common Name, Subject Alt Name). Refer to Solace Client Certificate Authentication.
quarkus.solace.host=tcps://localhost:55443
quarkus.solace.vpn=default
quarkus.solace.authentication.scheme=AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE
quarkus.solace.tls.trust-store-path=
quarkus.solace.tls.trust-store-type=
quarkus.solace.tls.trust-store-password=
quarkus.solace.authentication.client-cert.keystore=
quarkus.solace.authentication.client-cert.keystore-password=
quarkus.solace.authentication.client-cert.keystore-format=
5. Connecting to a standalone broker with OAUTH authentication scheme.
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.scheme=AUTHENTICATION_SCHEME_OAUTH2
# Client name provided in the OIDC client configuration below
quarkus.solace.oidc.client-name=solace
# The refresh interval should be less than the access token expiry time; otherwise the extension will fail to update the access token in the Solace session.
quarkus.solace.oidc.refresh.interval=50s
# Token refresh API timeout. Default is 10 seconds.
quarkus.solace.oidc.refresh.timeout=10s
quarkus.oidc-client.solace.auth-server-url=http://localhost:7777/auth/realms/master
quarkus.oidc-client.solace.client-id=<client-id>
quarkus.oidc-client.solace.credentials.secret=<client-secret>
# 'client' is a shortcut for `client_credentials`
quarkus.oidc-client.solace.grant.type=client
6. Connecting to a standalone broker with TLS and OAUTH authentication scheme.
quarkus.solace.host=tcps://localhost:55443
quarkus.solace.vpn=default
quarkus.solace.authentication.scheme=AUTHENTICATION_SCHEME_OAUTH2
quarkus.solace.tls.trust-store-path=
quarkus.solace.tls.trust-store-type=
quarkus.solace.tls.trust-store-password=
# Client name provided in the OIDC client configuration below
quarkus.solace.oidc.client-name=solace
# The refresh interval should be less than the access token expiry time; otherwise the extension will fail to update the access token in the Solace session.
quarkus.solace.oidc.refresh.interval=50s
# Token refresh API timeout. Default is 10 seconds.
quarkus.solace.oidc.refresh.timeout=10s
quarkus.oidc-client.solace.auth-server-url=http://localhost:7777/auth/realms/master
quarkus.oidc-client.solace.client-id=<client-id>
quarkus.oidc-client.solace.credentials.secret=<client-secret>
# 'client' is a shortcut for `client_credentials`
quarkus.oidc-client.solace.grant.type=client
quarkus.oidc-client.solace.tls.trust-store-file=<path-to-truststore-file>
quarkus.oidc-client.solace.tls.key-store-password=
quarkus.oidc-client.solace.tls.verification=<enable-or-disable-tls-verification>
For more details on the configuration supported by the Quarkus OIDC client, refer to OPENID CONNECT (OIDC) AND OAUTH2 CLIENT AND FILTERS and the OIDC configuration reference.
The current version is tested with the client_credentials grant type, where the Solace broker is configured as the Resource Server.
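The constraint that the refresh interval must be shorter than the access token lifetime can be checked with a few lines of Java. This is a sketch for illustration only: TokenRefreshCheckSketch and its methods are hypothetical, and parseSimple handles only simple values such as 50s or 2m rather than the full duration syntax accepted by Quarkus.

```java
import java.time.Duration;

/** Hypothetical check that a token refresh interval is shorter than the token lifetime. */
final class TokenRefreshCheckSketch {

    /** Parses simple values such as "50s" or "2m" (assumption: ISO-8601 time units only). */
    static Duration parseSimple(String value) {
        return Duration.parse("PT" + value.trim());
    }

    /** Returns true when the token would be refreshed before it expires. */
    static boolean refreshesBeforeExpiry(String refreshInterval, Duration tokenLifetime) {
        return parseSimple(refreshInterval).compareTo(tokenLifetime) < 0;
    }

    public static void main(String[] args) {
        // A 50 second refresh interval against a token that lives for 60 seconds.
        System.out.println(refreshesBeforeExpiry("50s", Duration.ofSeconds(60)));
    }
}
```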
7. Connecting to an HA broker. Here you can configure both the active and standby URLs, and the client will switch connectivity based on availability. The remaining configuration and authentication mechanisms can be used as-is.
quarkus.solace.host=tcp://active-host-name:55555,tcp://standby-host-name:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=test
quarkus.solace.authentication.basic.password=test
Other authentication mechanisms can be enabled in a similar way.
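To make the structure of the HA host list concrete, the following sketch splits a comma-separated quarkus.solace.host value into URIs and picks the first entry that a caller-supplied check reports as reachable. It only illustrates the idea of an ordered active/standby list; the actual failover is handled by the Solace client, and HostListSketch, parseHosts and firstAvailable are hypothetical names.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical helper showing how an active/standby host list is structured. */
final class HostListSketch {

    /** Splits a comma-separated host property into URIs, preserving order. */
    static List<URI> parseHosts(String hostProperty) {
        return Arrays.stream(hostProperty.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .map(URI::create)
                .collect(Collectors.toList());
    }

    /** Returns the first host accepted by the (caller-supplied) reachability check. */
    static Optional<URI> firstAvailable(List<URI> hosts, Predicate<URI> isReachable) {
        return hosts.stream().filter(isReachable).findFirst();
    }

    public static void main(String[] args) {
        List<URI> hosts = parseHosts("tcp://active-host-name:55555,tcp://standby-host-name:55555");
        // Pretend the active broker is down, so only the standby passes the check.
        Optional<URI> chosen = firstAvailable(hosts, uri -> uri.getHost().startsWith("standby"));
        System.out.println(chosen.map(URI::toString).orElse("no broker available"));
    }
}
```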
Configuring Quarkus Solace Messaging Connector
The Reactive Messaging framework supports different messaging backends and employs a generic vocabulary:
- Applications send and receive messages. A message wraps a payload and can be extended with some metadata. With the Solace connector, a message corresponds to Inbound or Outbound Message.
- Messages transit on channels. Application components connect to channels to publish and consume messages. The Solace connector maps channels to Solace queues and topics.
- Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Solace is named quarkus-solace.
The extension supports two types of messaging client: direct and persistent. By default, the persistent client is enabled.
A minimal configuration for the Solace connector with an incoming channel and persistent client looks like the following. It assumes that an exclusive queue is already provisioned on the broker:
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic
mp.messaging.incoming.temperatures.connector=quarkus-solace
mp.messaging.incoming.temperatures.consumer.queue.name=temperatures
The extension can also provision queues and subscriptions on the broker, provided the user has permission to create queues with subscriptions. The configuration is as follows:
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic
mp.messaging.incoming.temperatures.connector=quarkus-solace
mp.messaging.incoming.temperatures.consumer.queue.missing-resource-creation-strategy=create-on-start
mp.messaging.incoming.temperatures.consumer.queue.add-additional-subscriptions=true
mp.messaging.incoming.temperatures.consumer.subscriptions=hello/foobar
A minimal configuration for the Solace connector with an incoming channel and direct client looks like the following:
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic
mp.messaging.incoming.temperatures.client.type=direct
mp.messaging.incoming.temperatures.connector=quarkus-solace
mp.messaging.incoming.temperatures.consumer.subscriptions=sensor/temperatures
- When running in dev mode or tests, Dev Services automatically starts a Solace PubSub+ broker. If broker configuration details are not provided, the extension automatically picks up the details of the broker started by Dev Services.
- If the consumer.queue.name property is not specified, the channel name is used as the queue name.
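The fallback from consumer.queue.name to the channel name can be pictured as a simple lookup with a default. The sketch below models it over a plain map of properties; QueueNameSketch and resolveQueueName are hypothetical names, and the connector's own resolution logic may differ in detail.

```java
import java.util.Map;

/** Hypothetical model of the queue-name fallback for an incoming channel. */
final class QueueNameSketch {

    /** Returns the configured queue name, or the channel name when none is configured. */
    static String resolveQueueName(Map<String, String> config, String channel) {
        String key = "mp.messaging.incoming." + channel + ".consumer.queue.name";
        String configured = config.get(key);
        return (configured == null || configured.isBlank()) ? channel : configured;
    }

    public static void main(String[] args) {
        // No queue name configured, so the channel name is used.
        System.out.println(resolveQueueName(Map.of(), "temperatures"));
    }
}
```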
Receiving messages from Solace
Using the previous configuration, a Quarkus application can receive messages in several ways.
Direct Payload
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
public void consume(byte[] temperature) {
// process.
}
}
Message
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
public CompletionStage<Void> consume(Message<byte[]> msg) {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
double temperature = Double.parseDouble(new String(msg.getPayload()));
// Acknowledge the incoming message
return msg.ack();
}
}
SolaceInboundMessage
This is a wrapper around the incoming Inbound Message from the Solace Messaging API.
import com.solace.messaging.receiver.InboundMessage;
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
public void consume(InboundMessage inboundMessage) {
// process the message payload.
String temperature = inboundMessage.getPayloadAsString();
}
}
Acknowledgment Handling
By default, the acknowledgement strategy for the persistent client is set to client acknowledgement. This gives greater control over acknowledgement and ensures that messages are acknowledged only after successful processing.
For the direct client, no acknowledgement is sent to the broker.
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
public CompletionStage<Void> consume(Message<byte[]> msg) {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
double temperature = Double.parseDouble(new String(msg.getPayload()));
// Acknowledge the incoming message
return msg.ack();
}
}
Failure Strategies
If a message is nacked, a failure strategy is applied. Refer to consumer.failure-strategy in the Incoming Configuration Reference. The default strategy is ignore, which moves on to the next message. The Quarkus Solace Messaging Connector extension supports the following strategies.
ignore - Mark the message as IGNORED and continue processing with the next message. If TTL and DMQ are configured on the queue, the message is moved to the DMQ once the TTL is reached. If no DMQ is configured but TTL is set, the message is lost.
fail - Mark the message as FAILED; the broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on the broker version.
discard - Mark the message as REJECTED; the broker will discard the message. The message is moved to the DMQ if a DMQ is configured for the queue and DMQ Eligible is set on the message; otherwise the message is lost. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on the broker version.
error_topic - Publish the message to the configured error topic; on success the message is acknowledged in the queue.
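The four strategies and the outcome described for each can be summarised as a small enum. This is a reading aid built from the descriptions above, not the connector's own type: FailureStrategySketch, its methods and the PUBLISHED_TO_ERROR_TOPIC label are hypothetical.

```java
import java.util.Locale;

/** Hypothetical summary of the failure strategies and the outcome described for each. */
enum FailureStrategySketch {
    IGNORE("IGNORED", false),
    FAIL("FAILED", true),
    DISCARD("REJECTED", true),
    ERROR_TOPIC("PUBLISHED_TO_ERROR_TOPIC", false); // label invented for this sketch

    private final String outcome;
    private final boolean requiresNackSupport;

    FailureStrategySketch(String outcome, boolean requiresNackSupport) {
        this.outcome = outcome;
        this.requiresNackSupport = requiresNackSupport;
    }

    /** Maps a configuration value such as "error_topic" to the matching strategy. */
    static FailureStrategySketch fromConfig(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /** How the nacked message is marked or handled. */
    String outcome() {
        return outcome;
    }

    /** True for strategies the text ties to broker nack support (10.2.1 and later). */
    boolean requiresNackSupport() {
        return requiresNackSupport;
    }
}
```

For example, FailureStrategySketch.fromConfig("discard").outcome() yields REJECTED, and requiresNackSupport() flags that strategy as dependent on the broker version.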
Sending messages to Solace
Outgoing channel configuration to publish persistent messages to Solace:
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic
mp.messaging.outgoing.temperatures-out.connector=quarkus-solace
mp.messaging.outgoing.temperatures-out.producer.topic=temperatures
Outgoing channel configuration to publish direct messages to Solace:
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic
mp.messaging.outgoing.temperatures-out.client.type=direct
mp.messaging.outgoing.temperatures-out.connector=quarkus-solace
mp.messaging.outgoing.temperatures-out.producer.topic=temperatures
- When running in dev mode or tests, Dev Services automatically starts a Solace PubSub+ broker. If broker configuration details are not provided, the extension automatically picks up the details of the broker started by Dev Services.
- If the producer.topic property is not specified, the channel name is used as the topic name.
Using the previous configuration, a Quarkus application can publish messages as follows:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class TemperaturesProducer {
private final Random random = new Random();
@Outgoing("temperatures-out")
public Multi<Double> generate() {
// Emit 1000 records
return Multi.createFrom().range(0, 1000)
.map(x -> random.nextDouble());
}
}
You can also generate an org.eclipse.microprofile.reactive.messaging.Message with the required metadata and publish it to Solace.
@ApplicationScoped
public class TemperaturesProducer {
private final Random random = new Random();
@Outgoing("temperatures-out")
Multi<Message<Double>> publishTemperatures() {
return Multi.createFrom().range(0, 1000)
.map(i -> {
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId(Integer.toString(i)).createPubSubOutboundMetadata();
return Message.of(random.nextDouble(), Metadata.of(outboundMetadata));
});
}
}
SolaceOutboundMetadata lets you configure metadata for the message. It supports all the headers supported by Solace as well as custom user properties. It also supports configuring a dynamic topic, which overrides the default topic set in the application configuration file.
Generating an org.eclipse.microprofile.reactive.messaging.Message with a dynamic topic and publishing it to Solace:
@ApplicationScoped
public class TemperaturesProducer {
private final Random random = new Random();
@Outgoing("temperatures-out")
Multi<Message<Double>> publishTemperatures() {
return Multi.createFrom().range(0, 1000)
.map(i -> {
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId(Integer.toString(i))
.setDynamicDestination("device/" + Integer.toString(i) + "/temperature").createPubSubOutboundMetadata();
return Message.of(random.nextDouble(), Metadata.of(outboundMetadata));
});
}
}
Generating an org.eclipse.microprofile.reactive.messaging.Message with a partition key and publishing it to Solace:
@ApplicationScoped
public class TemperaturesProducer {
private final Random random = new Random();
@Outgoing("temperatures-out")
Multi<Message<Double>> publishTemperatures() {
return Multi.createFrom().range(0, 1000)
.map(i -> {
String partitionKey = "Group-1";
if(i % 2 == 0) {
partitionKey = "Group-2";
}
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId(Integer.toString(i))
.setPartitionKey(partitionKey).createPubSubOutboundMetadata();
return Message.of(random.nextDouble(), Metadata.of(outboundMetadata));
});
}
}
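To see which messages end up sharing a partition key in the example above, the same even/odd rule can be applied to a small range of ids in plain Java. PartitionKeySketch is a hypothetical helper that only mirrors the key assignment; how the broker maps keys to partitions is not modelled here.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical helper mirroring the even/odd partition key assignment. */
final class PartitionKeySketch {

    /** Even ids get "Group-2", odd ids get "Group-1", as in the producer example. */
    static String partitionKeyFor(int id) {
        return (id % 2 == 0) ? "Group-2" : "Group-1";
    }

    /** Groups the ids in [fromInclusive, toExclusive) by their partition key. */
    static Map<String, List<Integer>> groupByKey(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive)
                .boxed()
                .collect(Collectors.groupingBy(PartitionKeySketch::partitionKeyFor));
    }

    public static void main(String[] args) {
        System.out.println(groupByKey(0, 6));
    }
}
```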
Sending messages with @Emitter
@Path("/temperatures")
public class PublisherResource {
@Channel("temperatures-out")
MutinyEmitter<Temperature> temperatureEmitter;
@POST
@Path("/publish")
public Uni<Void> publish(Temperature temperature) {
return temperatureEmitter.send(temperature);
}
}
Producer Acknowledgement
The producer can return a successful acknowledgement when producer.waitForPublishReceipt is enabled.
This property is considered only when client.type is set to persistent. The connector waits for the response from the broker and returns a success or failed acknowledgement.
When client.type is set to direct, this property is ignored because the broker does not send any response. By default, a success acknowledgement is returned and any failures during publish are logged as exceptions.
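The difference between waiting for a publish receipt and acknowledging immediately can be modelled with CompletableFuture. This is a conceptual sketch, not the connector's code: PublishReceiptSketch and acknowledgement are hypothetical names, and brokerReceipt stands in for whatever signal the broker response produces.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of producer acknowledgement with and without a publish receipt. */
final class PublishReceiptSketch {

    /**
     * Returns the future that the application-level acknowledgement follows.
     * brokerReceipt completes, normally or exceptionally, when the broker responds.
     */
    static CompletableFuture<Void> acknowledgement(boolean waitForPublishReceipt,
                                                   CompletableFuture<Void> brokerReceipt) {
        if (waitForPublishReceipt) {
            // Success or failure of the acknowledgement mirrors the broker response.
            return brokerReceipt;
        }
        // Acknowledge right away; a later publish failure is only logged.
        brokerReceipt.exceptionally(error -> {
            System.err.println("publish failed: " + error.getMessage());
            return null;
        });
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> receipt = new CompletableFuture<>();
        CompletableFuture<Void> ack = acknowledgement(true, receipt);
        System.out.println("acknowledged before receipt: " + ack.isDone());
        receipt.complete(null);
        System.out.println("acknowledged after receipt: " + ack.isDone());
    }
}
```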
Producer Back-Pressure strategies
The Quarkus Solace Messaging Connector provides three strategies to handle back-pressure when publishing messages:
1. Reject - The publisher starts rejecting messages once the specified limit is reached.
2. Wait - The publisher is throttled when the specified limit is reached.
3. Elastic - Use an unlimited internal buffer (default).
In the current version, we do not recommend the Reject back-pressure strategy, as it is still evolving.
Refer to producer.back-pressure.strategy and producer.back-pressure.buffer-capacity in the Outgoing Configuration Reference for how to configure back-pressure for the producer.
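The behaviour of the three strategies can be approximated with standard Java queues: a bounded queue that refuses new entries (Reject), a bounded queue whose producer blocks (Wait), and an effectively unbounded queue (Elastic). This is a simplified model for intuition only; BackPressureSketch is a hypothetical class and does not reflect how the Solace API buffers messages internally.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical model of the three producer back-pressure strategies. */
final class BackPressureSketch {

    enum Strategy { REJECT, WAIT, ELASTIC }

    private final Strategy strategy;
    private final BlockingQueue<String> buffer;

    BackPressureSketch(Strategy strategy, int capacity) {
        this.strategy = strategy;
        if (strategy == Strategy.ELASTIC) {
            this.buffer = new LinkedBlockingQueue<>(); // effectively unbounded, capacity ignored
        } else {
            this.buffer = new ArrayBlockingQueue<>(capacity);
        }
    }

    /** Returns false when the message could not be buffered. */
    boolean publish(String message) {
        if (strategy == Strategy.WAIT) {
            try {
                buffer.put(message); // blocks the caller until space is available
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // REJECT fails fast when the buffer is full; ELASTIC keeps accepting.
        return buffer.offer(message);
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BackPressureSketch reject = new BackPressureSketch(Strategy.REJECT, 2);
        reject.publish("m1");
        reject.publish("m2");
        System.out.println("third message accepted: " + reject.publish("m3"));
    }
}
```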
Processing Messages
Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic. A processor method can be simply implemented using both the @Incoming and @Outgoing annotations:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class TemperaturesProcessor {
@Incoming("temperatures-in")
@Outgoing("temperatures-out")
public double process(byte[] temperature) {
return (Double.parseDouble(new String(temperature)) - 32) * 5 / 9;
}
}
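The arithmetic inside the processor is a Fahrenheit to Celsius conversion applied to a textual payload. It can be tried outside Quarkus with the following standalone sketch, which assumes the payload is a UTF-8 encoded decimal number; TemperatureConversionSketch is a hypothetical class name.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical standalone version of the processor's conversion logic. */
final class TemperatureConversionSketch {

    /** Parses a UTF-8 payload as degrees Fahrenheit and converts it to degrees Celsius. */
    static double fahrenheitToCelsius(byte[] payload) {
        double fahrenheit = Double.parseDouble(new String(payload, StandardCharsets.UTF_8).trim());
        return (fahrenheit - 32) * 5 / 9;
    }

    public static void main(String[] args) {
        byte[] payload = "212".getBytes(StandardCharsets.UTF_8);
        System.out.println(fahrenheitToCelsius(payload)); // boiling point of water in Celsius
    }
}
```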
Open Telemetry Tracing
The extension supports generating traces for the messages it consumes and publishes. To enable tracing for consumers and producers, use the configuration below.
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=test
quarkus.solace.authentication.basic.password=test
mp.messaging.incoming.temperatures.connector=quarkus-solace
mp.messaging.incoming.temperatures.consumer.queue.name=temperatures
mp.messaging.incoming.temperatures.client.tracing-enabled=true
mp.messaging.outgoing.temperatures-out.connector=quarkus-solace
mp.messaging.outgoing.temperatures-out.producer.topic=temperatures
mp.messaging.outgoing.temperatures-out.client.tracing-enabled=true
Context Propagation is not fully supported in the current version.
Health Checks
Quarkus provides several health checks for Solace. These checks are used in combination with the quarkus-smallrye-health extension.
Reactive Messaging Health Checks
When using Reactive Messaging and the Quarkus Solace Messaging Connector, each configured channel (incoming or outgoing) provides startup, liveness and readiness checks.
The startup check verifies that the communication with Solace Broker is established.
The liveness check captures any unrecoverable failure happening during the communication with Solace.
The readiness check verifies that the Quarkus Solace Messaging Connector is ready to consume/produce messages to the configured Solace queues/topics.
Dev Services
Solace Dev Services for Quarkus will spin up the latest version of Solace PubSub+ Standard with the label solace when running tests or in dev mode. Solace Dev Services are enabled by default and check for any existing container with the same label to reuse. If none is present, a new container is started.
Metrics
The Quarkus Solace Messaging Connector exposes metrics provided by the Solace Java Messaging API. Metrics are enabled by default and can be accessed at http://localhost:8080/q/dev-ui/io.quarkus.quarkus-micrometer/prometheus