/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.opcua.protocol;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.opcua.context.SecureChannel;
import org.apache.plc4x.java.opcua.field.OpcuaField;
import org.apache.plc4x.java.opcua.readwrite.*;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.generation.*;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionField;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(OpcuaSubscriptionHandle.class);
private Set<Consumer<PlcSubscriptionEvent>> consumers;
private List<String> fieldNames;
private SecureChannel channel;
private PlcSubscriptionRequest subscriptionRequest;
private AtomicBoolean destroy = new AtomicBoolean(false);
private OpcuaProtocolLogic plcSubscriber;
private Long subscriptionId;
private long cycleTime;
private long revisedCycleTime;
private boolean complete = false;
private final AtomicLong clientHandles = new AtomicLong(1L);
private ConversationContext<OpcuaAPU> context;
public OpcuaSubscriptionHandle(ConversationContext<OpcuaAPU> context, OpcuaProtocolLogic plcSubscriber, SecureChannel channel, PlcSubscriptionRequest subscriptionRequest, Long subscriptionId, long cycleTime) {
super(plcSubscriber);
this.consumers = new HashSet<>();
this.subscriptionRequest = subscriptionRequest;
this.fieldNames = new ArrayList<>(subscriptionRequest.getFieldNames());
this.channel = channel;
this.subscriptionId = subscriptionId;
this.plcSubscriber = plcSubscriber;
this.cycleTime = cycleTime;
this.revisedCycleTime = cycleTime;
this.context = context;
try {
onSubscribeCreateMonitoredItemsRequest().get();
} catch (Exception e) {
LOGGER.info("Unable to serialize the Create Monitored Item Subscription Message", e);
plcSubscriber.onDisconnect(context);
}
startSubscriber();
}
private CompletableFuture<CreateMonitoredItemsResponse> onSubscribeCreateMonitoredItemsRequest() {
List<ExtensionObjectDefinition> requestList = new ArrayList<>(this.fieldNames.size());
for (int i = 0; i < this.fieldNames.size(); i++) {
final DefaultPlcSubscriptionField fieldDefaultPlcSubscription = (DefaultPlcSubscriptionField) subscriptionRequest.getField(fieldNames.get(i));
NodeId idNode = generateNodeId((OpcuaField) fieldDefaultPlcSubscription.getPlcField());
ReadValueId readValueId = new ReadValueId(
idNode,
0xD,
OpcuaProtocolLogic.NULL_STRING,
new QualifiedName(0, OpcuaProtocolLogic.NULL_STRING));
MonitoringMode monitoringMode;
switch (fieldDefaultPlcSubscription.getPlcSubscriptionType()) {
case CYCLIC:
monitoringMode = MonitoringMode.monitoringModeSampling;
break;
case CHANGE_OF_STATE:
monitoringMode = MonitoringMode.monitoringModeReporting;
break;
case EVENT:
monitoringMode = MonitoringMode.monitoringModeReporting;
break;
default:
monitoringMode = MonitoringMode.monitoringModeReporting;
}
long clientHandle = clientHandles.getAndIncrement();
MonitoringParameters parameters = new MonitoringParameters(
clientHandle,
(double) cycleTime, // sampling interval
OpcuaProtocolLogic.NULL_EXTENSION_OBJECT, // filter, null means use default
1L, // queue size
true // discard oldest
);
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
readValueId, monitoringMode, parameters);
requestList.add(request);
}
CompletableFuture<CreateMonitoredItemsResponse> future = new CompletableFuture<>();
RequestHeader requestHeader = new RequestHeader(channel.getAuthenticationToken(),
SecureChannel.getCurrentDateTime(),
channel.getRequestHandle(),
0L,
OpcuaProtocolLogic.NULL_STRING,
SecureChannel.REQUEST_TIMEOUT_LONG,
OpcuaProtocolLogic.NULL_EXTENSION_OBJECT);
CreateMonitoredItemsRequest createMonitoredItemsRequest = new CreateMonitoredItemsRequest(
requestHeader,
subscriptionId,
TimestampsToReturn.timestampsToReturnBoth,
requestList.size(),
requestList
);
ExpandedNodeId expandedNodeId = new ExpandedNodeId(false, //Namespace Uri Specified
false, //Server Index Specified
new NodeIdFourByte((short) 0, Integer.parseInt(createMonitoredItemsRequest.getIdentifier())),
null,
null);
ExtensionObject extObject = new ExtensionObject(
expandedNodeId,
null,
createMonitoredItemsRequest,
false);
try {
WriteBufferByteBased buffer = new WriteBufferByteBased(extObject.getLengthInBytes(), ByteOrder.LITTLE_ENDIAN);
extObject.serialize(buffer);
Consumer<byte[]> consumer = opcuaResponse -> {
CreateMonitoredItemsResponse responseMessage = null;
try {
ExtensionObjectDefinition unknownExtensionObject = ExtensionObject.staticParse(new ReadBufferByteBased(opcuaResponse, ByteOrder.LITTLE_ENDIAN), false).getBody();
if (unknownExtensionObject instanceof CreateMonitoredItemsResponse) {
responseMessage = (CreateMonitoredItemsResponse) unknownExtensionObject;
} else {
ServiceFault serviceFault = (ServiceFault) unknownExtensionObject;
ResponseHeader header = (ResponseHeader) serviceFault.getResponseHeader();
LOGGER.error("Subscription ServiceFault returned from server with error code, '{}'", header.getServiceResult().toString());
plcSubscriber.onDisconnect(context);
}
} catch (ParseException e) {
LOGGER.error("Unable to parse the returned Subscription response", e);
plcSubscriber.onDisconnect(context);
}
MonitoredItemCreateResult[] array = responseMessage.getResults().toArray(new MonitoredItemCreateResult[0]);
for (int index = 0, arrayLength = array.length; index < arrayLength; index++) {
MonitoredItemCreateResult result = array[index];
if (OpcuaStatusCode.enumForValue(result.getStatusCode().getStatusCode()) != OpcuaStatusCode.Good) {
LOGGER.error("Invalid Field {}, subscription created without this field", fieldNames.get(index));
} else {
LOGGER.debug("Field {} was added to the subscription", fieldNames.get(index));
}
}
future.complete(responseMessage);
};
Consumer<TimeoutException> timeout = e -> {
LOGGER.info("Timeout while sending the Create Monitored Item Subscription Message", e);
plcSubscriber.onDisconnect(context);
};
BiConsumer<OpcuaAPU, Throwable> error = (message, e) -> {
LOGGER.info("Error while sending the Create Monitored Item Subscription Message", e);
plcSubscriber.onDisconnect(context);
};
channel.submit(context, timeout, error, consumer, buffer);
} catch (SerializationException e) {
LOGGER.info("Unable to serialize the Create Monitored Item Subscription Message", e);
plcSubscriber.onDisconnect(context);
}
return future;
}
private void sleep(long length) {
try {
Thread.sleep(length);
} catch (InterruptedException e) {
LOGGER.trace("Interrupted Exception");
}
}
/**
* Main subscriber loop. For subscription we still need to send a request the server on every cycle.
* Which includes a request for an update of the previsouly agreed upon list of tags.
* The server will respond at most once every cycle.
*
* @return
*/
public void startSubscriber() {
LOGGER.trace("Starting Subscription");
CompletableFuture.supplyAsync(() -> {
try {
LinkedList<ExtensionObjectDefinition> outstandingAcknowledgements = new LinkedList<>();
List<Long> outstandingRequests = new LinkedList<>();
while (!this.destroy.get()) {
long requestHandle = channel.getRequestHandle();
//If we are waiting on a response and haven't received one, just wait until we do. A keep alive will be sent out eventually
if (outstandingRequests.size() <= 1) {
RequestHeader requestHeader = new RequestHeader(channel.getAuthenticationToken(),
SecureChannel.getCurrentDateTime(),
requestHandle,
0L,
OpcuaProtocolLogic.NULL_STRING,
this.revisedCycleTime * 10,
OpcuaProtocolLogic.NULL_EXTENSION_OBJECT);
//Make a copy of the outstanding requests, so it isn't modified while we are putting the ack list together.
List<ExtensionObjectDefinition> acks = (LinkedList<ExtensionObjectDefinition>) outstandingAcknowledgements.clone();
int ackLength = acks.size() == 0 ? -1 : acks.size();
outstandingAcknowledgements.removeAll(acks);
PublishRequest publishRequest = new PublishRequest(
requestHeader,
ackLength,
acks
);
ExpandedNodeId extExpandedNodeId = new ExpandedNodeId(false, //Namespace Uri Specified
false, //Server Index Specified
new NodeIdFourByte((short) 0, Integer.parseInt(publishRequest.getIdentifier())),
null,
null);
ExtensionObject extObject = new ExtensionObject(
extExpandedNodeId,
null,
publishRequest,
false);
try {
WriteBufferByteBased buffer = new WriteBufferByteBased(extObject.getLengthInBytes(), ByteOrder.LITTLE_ENDIAN);
extObject.serialize(buffer);
/* Create Consumer for the response message, error and timeout to be sent to the Secure Channel */
Consumer<byte[]> consumer = opcuaResponse -> {
PublishResponse responseMessage = null;
ServiceFault serviceFault = null;
try {
ExtensionObjectDefinition unknownExtensionObject = ExtensionObject.staticParse(new ReadBufferByteBased(opcuaResponse, ByteOrder.LITTLE_ENDIAN), false).getBody();
if (unknownExtensionObject instanceof PublishResponse) {
responseMessage = (PublishResponse) unknownExtensionObject;
} else {
serviceFault = (ServiceFault) unknownExtensionObject;
ResponseHeader header = (ResponseHeader) serviceFault.getResponseHeader();
LOGGER.debug("Subscription ServiceFault returned from server with error code, '{}', ignoring as it is probably just a result of a Delete Subscription Request", header.getServiceResult().toString());
//plcSubscriber.onDisconnect(context);
}
} catch (ParseException e) {
LOGGER.error("Unable to parse the returned Subscription response", e);
plcSubscriber.onDisconnect(context);
}
if (serviceFault == null) {
outstandingRequests.remove(((ResponseHeader) responseMessage.getResponseHeader()).getRequestHandle());
for (long availableSequenceNumber : responseMessage.getAvailableSequenceNumbers()) {
outstandingAcknowledgements.add(new SubscriptionAcknowledgement(this.subscriptionId, availableSequenceNumber));
}
for (ExtensionObject notificationMessage : ((NotificationMessage) responseMessage.getNotificationMessage()).getNotificationData()) {
ExtensionObjectDefinition notification = notificationMessage.getBody();
if (notification instanceof DataChangeNotification) {
LOGGER.trace("Found a Data Change notification");
List<ExtensionObjectDefinition> items = ((DataChangeNotification) notification).getMonitoredItems();
onSubscriptionValue(items.toArray(new MonitoredItemNotification[0]));
} else {
LOGGER.warn("Unsupported Notification type");
}
}
}
};
Consumer<TimeoutException> timeout = e -> {
LOGGER.error("Timeout while waiting for subscription response", e);
plcSubscriber.onDisconnect(context);
};
BiConsumer<OpcuaAPU, Throwable> error = (message, e) -> {
LOGGER.error("Error while waiting for subscription response", e);
plcSubscriber.onDisconnect(context);
};
outstandingRequests.add(requestHandle);
channel.submit(context, timeout, error, consumer, buffer);
} catch (SerializationException e) {
LOGGER.warn("Unable to serialize subscription request", e);
}
}
/* Put the subscriber loop to sleep for the rest of the cycle. */
sleep(this.revisedCycleTime);
}
//Wait for any outstanding responses to arrive, using the request timeout length
//sleep(this.revisedCycleTime * 10);
complete = true;
} catch (Exception e) {
LOGGER.error("Failed to start subscription", e);
}
return null;
});
}
/**
* Stop the subscriber either on disconnect or on error
*
* @return
*/
public void stopSubscriber() {
this.destroy.set(true);
long requestHandle = channel.getRequestHandle();
RequestHeader requestHeader = new RequestHeader(channel.getAuthenticationToken(),
SecureChannel.getCurrentDateTime(),
requestHandle,
0L,
OpcuaProtocolLogic.NULL_STRING,
this.revisedCycleTime * 10,
OpcuaProtocolLogic.NULL_EXTENSION_OBJECT);
List<Long> subscriptions = new ArrayList<>(1);
subscriptions.add(subscriptionId);
DeleteSubscriptionsRequest deleteSubscriptionrequest = new DeleteSubscriptionsRequest(requestHeader,
1,
subscriptions
);
ExpandedNodeId extExpandedNodeId = new ExpandedNodeId(false, //Namespace Uri Specified
false, //Server Index Specified
new NodeIdFourByte((short) 0, Integer.parseInt(deleteSubscriptionrequest.getIdentifier())),
null,
null);
ExtensionObject extObject = new ExtensionObject(
extExpandedNodeId,
null,
deleteSubscriptionrequest,
false);
try {
WriteBufferByteBased buffer = new WriteBufferByteBased(extObject.getLengthInBytes(), ByteOrder.LITTLE_ENDIAN);
extObject.serialize(buffer);
// Create Consumer for the response message, error and timeout to be sent to the Secure Channel
Consumer<byte[]> consumer = opcuaResponse -> {
DeleteSubscriptionsResponse responseMessage = null;
try {
ExtensionObjectDefinition unknownExtensionObject = ExtensionObject.staticParse(new ReadBufferByteBased(opcuaResponse, ByteOrder.LITTLE_ENDIAN), false).getBody();
if (unknownExtensionObject instanceof DeleteSubscriptionsResponse) {
responseMessage = (DeleteSubscriptionsResponse) unknownExtensionObject;
} else {
ServiceFault serviceFault = (ServiceFault) unknownExtensionObject;
ResponseHeader header = (ResponseHeader) serviceFault.getResponseHeader();
LOGGER.debug("Fault when deleting Subscription ServiceFault return from server with error code, '{}', ignoring as it is probably just a result of a Delete Subscription Request", header.getServiceResult().toString());
}
} catch (ParseException e) {
LOGGER.error("Unable to parse the returned Delete Subscriptions Response", e);
}
};
Consumer<TimeoutException> timeout = e -> {
LOGGER.error("Timeout while waiting for delete subscription response", e);
plcSubscriber.onDisconnect(context);
};
BiConsumer<OpcuaAPU, Throwable> error = (message, e) -> {
LOGGER.error("Error while waiting for delete subscription response", e);
plcSubscriber.onDisconnect(context);
};
channel.submit(context, timeout, error, consumer, buffer);
} catch (SerializationException e) {
LOGGER.warn("Unable to serialize subscription request", e);
}
sleep(500);
plcSubscriber.removeSubscription(subscriptionId);
}
/**
* Receive the returned values from the OPCUA server and format it so that it can be received by the PLC4X client.
*
* @param values - array of data values to be sent to the client.
*/
private void onSubscriptionValue(MonitoredItemNotification[] values) {
LinkedHashSet<String> fieldList = new LinkedHashSet<>();
List<DataValue> dataValues = new ArrayList<>(values.length);
for (MonitoredItemNotification value : values) {
fieldList.add(fieldNames.get((int) value.getClientHandle() - 1));
dataValues.add(value.getValue());
}
Map<String, ResponseItem<PlcValue>> fields = plcSubscriber.readResponse(fieldList, dataValues);
final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(), fields);
consumers.forEach(plcSubscriptionEventConsumer -> plcSubscriptionEventConsumer.accept(event));
}
/**
* Registers a new Consumer, this allows multiple PLC4X consumers to use the same subscription.
*
* @param consumer - Consumer to be used to send any returned values.
* @return PlcConsumerRegistration - return the important information back to the client.
*/
@Override
public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer) {
LOGGER.info("Registering a new OPCUA subscription consumer");
consumers.add(consumer);
return new DefaultPlcConsumerRegistration(plcSubscriber, consumer, this);
}
/**
* Given an PLC4X OpcuaField generate the OPC UA Node Id
*
* @param field - The PLC4X OpcuaField, this is the field generated from the OpcuaField class from the parsed field string.
* @return NodeId - Returns an OPC UA Node Id which can be sent over the wire.
*/
private NodeId generateNodeId(OpcuaField field) {
NodeId nodeId = null;
if (field.getIdentifierType() == OpcuaIdentifierType.BINARY_IDENTIFIER) {
nodeId = new NodeId(new NodeIdTwoByte(Short.parseShort(field.getIdentifier())));
} else if (field.getIdentifierType() == OpcuaIdentifierType.NUMBER_IDENTIFIER) {
nodeId = new NodeId(new NodeIdNumeric((short) field.getNamespace(), Long.parseLong(field.getIdentifier())));
} else if (field.getIdentifierType() == OpcuaIdentifierType.GUID_IDENTIFIER) {
UUID guid = UUID.fromString(field.getIdentifier());
byte[] guidBytes = new byte[16];
System.arraycopy(guid.getMostSignificantBits(), 0, guidBytes, 0, 8);
System.arraycopy(guid.getLeastSignificantBits(), 0, guidBytes, 8, 8);
nodeId = new NodeId(new NodeIdGuid((short) field.getNamespace(), guidBytes));
} else if (field.getIdentifierType() == OpcuaIdentifierType.STRING_IDENTIFIER) {
nodeId = new NodeId(new NodeIdString((short) field.getNamespace(), new PascalString(field.getIdentifier())));
}
return nodeId;
}
}
↑ V6103 Ignored InterruptedException could lead to delayed thread shutdown.
↑ V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1.
↑ V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1.
↑ V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1.
↑ V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1.
↑ V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1.
↑ V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1.
↑ V6067 Two or more case-branches perform the same actions.