/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.opcua.protocol;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.opcua.OpcuaPlcDriverTest;
import org.apache.plc4x.test.DisableOnParallelsVmFlag;
import org.eclipse.milo.examples.server.ExampleServer;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
@DisableOnParallelsVmFlag
public class OpcuaSubscriptionHandleTest {
private static final Logger LOGGER = LoggerFactory.getLogger(OpcuaPlcDriverTest.class);
private static ExampleServer exampleServer;
// Address of local milo server
private static String miloLocalAddress = "127.0.0.1:12686/milo";
//Tcp pattern of OPC UA
private static String opcPattern = "opcua:tcp://";
private String paramSectionDivider = "?";
private String paramDivider = "&";
private static String tcpConnectionAddress = opcPattern + miloLocalAddress;
// Read only variables of milo example server of version 3.6
private static final String BOOL_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/Boolean";
private static final String BYTE_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/Byte";
private static final String DOUBLE_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/Double";
private static final String FLOAT_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/Float";
private static final String INT16_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/Int16";
private static final String INT32_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/Int32";
private static final String INT64_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/Int64";
private static final String INTEGER_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/Integer";
private static final String SBYTE_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/SByte";
private static final String STRING_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/String";
private static final String UINT16_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/UInt16";
private static final String UINT32_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/UInt32";
private static final String UINT64_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/UInt64";
private static final String UINTEGER_IDENTIFIER_READ_WRITE = "ns=2;s=HelloWorld/ScalarTypes/UInteger";
private static final String DOES_NOT_EXIST_IDENTIFIER_READ_WRITE = "ns=2;i=12512623";
private static PlcConnection opcuaConnection;
@BeforeEach
public void before() {
}
@AfterEach
public void after() {
}
@BeforeAll
public static void setup() {
try {
// When switching JDK versions from a newer to an older version,
// this can cause the server to not start correctly.
// Deleting the directory makes sure the key-store is initialized correctly.
Path securityBaseDir = Paths.get(System.getProperty("java.io.tmpdir"), "server", "security");
try {
Files.delete(securityBaseDir);
} catch (Exception e) {
// Ignore this ...
}
exampleServer = new ExampleServer();
exampleServer.startup().get();
//Connect
opcuaConnection = new PlcDriverManager().getConnection(tcpConnectionAddress);
assert opcuaConnection.isConnected();
} catch (Exception e) {
e.printStackTrace();
try {
exampleServer.shutdown().get();
} catch (Exception j) {
j.printStackTrace();
}
}
}
@AfterAll
public static void tearDown() {
try {
// Close Connection
opcuaConnection.close();
assert !opcuaConnection.isConnected();
exampleServer.shutdown().get();
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void subscribeBool() throws Exception {
String field = "Bool";
String identifier = BOOL_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeByte() throws Exception {
String field = "Byte";
String identifier = BYTE_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeDouble() throws Exception {
String field = "Double";
String identifier = DOUBLE_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeFloat() throws Exception {
String field = "Float";
String identifier = FLOAT_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeInt16() throws Exception {
String field = "Int16";
String identifier = INT16_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeInt32() throws Exception {
String field = "Int32";
String identifier = INT32_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeInt64() throws Exception {
String field = "Int64";
String identifier = INT64_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeInteger() throws Exception {
String field = "Integer";
String identifier = INTEGER_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeSByte() throws Exception {
String field = "SByte";
String identifier = SBYTE_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeString() throws Exception {
String field = "String";
String identifier = STRING_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeUInt16() throws Exception {
String field = "Uint16";
String identifier = UINT16_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeUInt32() throws Exception {
String field = "UInt32";
String identifier = UINT32_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeUInt64() throws Exception {
String field = "UInt64";
String identifier = UINT64_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeUInteger() throws Exception {
String field = "UInteger";
String identifier = UINTEGER_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field).equals(PlcResponseCode.OK);
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeDoesNotExists() throws Exception {
String field = "DoesNotExists";
String identifier = DOES_NOT_EXIST_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} test", field);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field, identifier);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
//This should never be called,
assert false;
LOGGER.info("Received a response from {} test {}", field, plcSubscriptionEvent.getPlcValue(field).toString());
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
@Test
public void subscribeMultiple() throws Exception {
String field1 = "UInteger";
String identifier1 = UINTEGER_IDENTIFIER_READ_WRITE;
String field2 = "Integer";
String identifier2 = INTEGER_IDENTIFIER_READ_WRITE;
LOGGER.info("Starting subscription {} and {} test", field1, field2);
// Create Subscription
PlcSubscriptionRequest.Builder builder = opcuaConnection.subscriptionRequestBuilder();
builder.addChangeOfStateField(field1, identifier1);
builder.addChangeOfStateField(field2, identifier2);
PlcSubscriptionRequest request = builder.build();
// Get result of creating subscription
PlcSubscriptionResponse response = request.execute().get();
final OpcuaSubscriptionHandle subscriptionHandle = (OpcuaSubscriptionHandle) response.getSubscriptionHandle(field1);
// Create handler for returned value
subscriptionHandle.register(plcSubscriptionEvent -> {
assert plcSubscriptionEvent.getResponseCode(field1).equals(PlcResponseCode.OK);
assert plcSubscriptionEvent.getResponseCode(field2).equals(PlcResponseCode.OK);
});
//Wait for value to be returned from server
Thread.sleep(1200);
subscriptionHandle.stopSubscriber();
}
}
↑ V6019 Unreachable code detected. It is possible that an error is present.