/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.spi.optimizer;
import io.vavr.control.Either;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.context.DriverContext;
import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse;
import org.apache.plc4x.java.spi.messages.DefaultPlcWriteResponse;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public abstract class BaseOptimizer {
protected List<PlcRequest> processReadRequest(PlcReadRequest readRequest, DriverContext driverContext) {
return Collections.singletonList(readRequest);
}
protected PlcReadResponse processReadResponses(PlcReadRequest readRequest, Map<PlcRequest, Either<PlcResponse, Exception>> readResponses) {
Map<String, ResponseItem<PlcValue>> fields = new HashMap<>();
for (Map.Entry<PlcRequest, Either<PlcResponse, Exception>> requestsEntries : readResponses.entrySet()) {
PlcReadRequest curRequest = (PlcReadRequest) requestsEntries.getKey();
Either<PlcResponse, Exception> readResponse = requestsEntries.getValue();
for (String fieldName : curRequest.getFieldNames()) {
if (readResponse.isLeft()) {
PlcReadResponse subReadResponse = (PlcReadResponse) readResponse.getLeft();
PlcResponseCode responseCode = subReadResponse.getResponseCode(fieldName);
PlcValue value = (responseCode == PlcResponseCode.OK) ?
subReadResponse.getAsPlcValue().getValue(fieldName) : null;
fields.put(fieldName, new ResponseItem<>(responseCode, value));
} else {
fields.put(fieldName, new ResponseItem<>(PlcResponseCode.INTERNAL_ERROR, null));
}
}
}
return new DefaultPlcReadResponse(readRequest, fields);
}
protected List<PlcRequest> processWriteRequest(PlcWriteRequest writeRequest, DriverContext driverContext) {
return Collections.singletonList(writeRequest);
}
protected PlcWriteResponse processWriteResponses(PlcWriteRequest writeRequest,
Map<PlcRequest, Either<PlcResponse, Exception>> writeResponses) {
Map<String, PlcResponseCode> fields = new HashMap<>();
for (Map.Entry<PlcRequest, Either<PlcResponse, Exception>> requestsEntries : writeResponses.entrySet()) {
PlcWriteRequest subWriteRequest = (PlcWriteRequest) requestsEntries.getKey();
Either<PlcResponse, Exception> writeResponse = requestsEntries.getValue();
for (String fieldName : subWriteRequest.getFieldNames()) {
if (writeResponse.isLeft()) {
PlcWriteResponse subWriteResponse = (PlcWriteResponse) writeResponse.getLeft();
fields.put(fieldName, subWriteResponse.getResponseCode(fieldName));
} else {
fields.put(fieldName, PlcResponseCode.INTERNAL_ERROR);
}
}
}
return new DefaultPlcWriteResponse(writeRequest, fields);
}
protected List<PlcRequest> processSubscriptionRequest(PlcSubscriptionRequest subscriptionRequest,
DriverContext driverContext) {
return Collections.singletonList(subscriptionRequest);
}
protected PlcSubscriptionResponse processSubscriptionResponses(PlcSubscriptionRequest subscriptionRequest,
Map<PlcRequest, Either<PlcResponse, Exception>> subscriptionResponses) {
// TODO: Implement
return null;
}
protected List<PlcRequest> processUnsubscriptionRequest(PlcRequest unsubscriptionRequest,
DriverContext driverContext) {
return Collections.singletonList(unsubscriptionRequest);
}
protected PlcUnsubscriptionResponse processUnsubscriptionResponses(PlcRequest unsubscriptionRequest,
Map<PlcRequest, Either<PlcResponse, Exception>> unsubscriptionResponses) {
// TODO: Implement
return null;
}
public CompletableFuture<PlcReadResponse> optimizedRead(PlcReadRequest readRequest, Plc4xProtocolBase reader) {
List<PlcRequest> subRequests = processReadRequest(readRequest, reader.getDriverContext());
return send(readRequest, subRequests, request -> reader.read((PlcReadRequest) request),
response -> processReadResponses(readRequest, response));
}
public CompletableFuture<PlcWriteResponse> optimizedWrite(PlcWriteRequest writeRequest, Plc4xProtocolBase writer) {
List<PlcRequest> subRequests = processWriteRequest(writeRequest, writer.getDriverContext());
return send(writeRequest, subRequests, request -> writer.write((PlcWriteRequest) request),
response -> processWriteResponses(writeRequest, response));
}
public CompletableFuture<PlcSubscriptionResponse> optimizedSubscribe(
PlcSubscriptionRequest subscriptionRequest, Plc4xProtocolBase subscriber) {
List<PlcRequest> subRequests = processSubscriptionRequest(subscriptionRequest, subscriber.getDriverContext());
return send(subscriptionRequest, subRequests, request -> subscriber.subscribe((PlcSubscriptionRequest) request),
response -> processSubscriptionResponses(subscriptionRequest, response));
}
public CompletableFuture<PlcUnsubscriptionResponse> optmizedUnsubscribe(
PlcUnsubscriptionRequest unsubscriptionRequest, Plc4xProtocolBase subscriber) {
List<PlcRequest> subRequests = processUnsubscriptionRequest(unsubscriptionRequest, subscriber.getDriverContext());
return send(unsubscriptionRequest, subRequests, request -> subscriber.unsubscribe((PlcUnsubscriptionRequest) request),
response -> processUnsubscriptionResponses(unsubscriptionRequest, response));
}
private CompletableFuture send(PlcRequest originalRequest,
List<? extends PlcRequest> requests,
Function<PlcRequest, CompletableFuture<PlcResponse>> sender,
Function<Map<PlcRequest, Either<PlcResponse, Exception>>, PlcResponse> responseProcessor) {
// If this send has only one sub-request and this matches the original one, don't do any special handling
// and just forward the request to the normal sending method.
if((requests.size() == 1) && (requests.get(0) == originalRequest)) {
return sender.apply(requests.get(0));
}
// If at least one sub request is requested, split up each field request into a separate sub-request
// And have the reader process each one independently. After the last sub-request is finished,
// Merge the results back together.
else if (!requests.isEmpty()) {
// Create a new future which will be used to return the aggregated response back to the application.
CompletableFuture<PlcResponse> parentFuture = new CompletableFuture<>();
// Create one sub-request for every single field and store the futures in a map.
Map<PlcRequest, CompletableFuture<PlcResponse>> subFutures = new HashMap<>();
for (PlcRequest subRequest : requests) {
subFutures.put(subRequest, sender.apply(subRequest));
}
// As soon as all sub-futures are done, merge the individual responses back to one big response.
CompletableFuture.allOf(subFutures.values().toArray(new CompletableFuture[0])).handle((aVoid, t) -> {
if (t != null) {
parentFuture.completeExceptionally(t);
}
Map<PlcRequest, Either<PlcResponse, Exception>> results = new HashMap<>();
for (Map.Entry<PlcRequest, CompletableFuture<PlcResponse>> subFutureEntry : subFutures.entrySet()) {
PlcRequest subRequest = subFutureEntry.getKey();
CompletableFuture<PlcResponse> subFuture = subFutureEntry.getValue();
try {
final PlcResponse subResponse = subFuture.get();
results.put(subRequest, Either.left(subResponse));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
results.put(subRequest, Either.right(new Exception("Something went wrong")));
} catch (Exception e) {
results.put(subRequest, Either.right(new Exception("Something went wrong")));
}
}
PlcResponse response = responseProcessor.apply(results);
parentFuture.complete(response);
return Void.TYPE;
}).exceptionally(throwable -> {
// TODO: If would be cool if we could still process all of the successful ones ...
parentFuture.completeExceptionally(throwable);
return null;
});
return parentFuture;
} else {
return CompletableFuture.completedFuture(responseProcessor.apply(Collections.emptyMap()));
}
}
}
↑ V061 The exception during analysis. See logs: F:/git-java/plc4x/.PVS-Studio/logs.