/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.spi.transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* This is a limited Queue of Requests, a Protocol can use.
* <p>
* The Following Steps are necessary
* <ul>
* <li>Register Slot</li>
* <li>Pass Runnable</li>
* <li>On Request or Response unregister Slot</li>
* </ul>
*/
public class RequestTransactionManager {
private static final Logger logger = LoggerFactory.getLogger(RequestTransactionManager.class);
/** Executor that performs all operations */
static final ExecutorService executor = Executors.newFixedThreadPool(4);
private final Set<RequestTransaction> runningRequests;
/** How many Transactions are allowed to run at the same time? */
private int numberOfConcurrentRequests;
/** Assigns each request a Unique Transaction Id, especially important for failure handling */
private AtomicInteger transactionId = new AtomicInteger(0);
/** Important, this is a FIFO Queue for Fairness! */
private Queue<RequestTransaction> workLog = new ConcurrentLinkedQueue<>();
public RequestTransactionManager(int numberOfConcurrentRequests) {
this.numberOfConcurrentRequests = numberOfConcurrentRequests;
// Immutable Map
runningRequests = ConcurrentHashMap.newKeySet();
}
public RequestTransactionManager() {
this(1);
}
public int getNumberOfConcurrentRequests() {
return numberOfConcurrentRequests;
}
public void setNumberOfConcurrentRequests(int numberOfConcurrentRequests) {
// If we reduced the number of concurrent requests and more requests are in-flight
// than should be, at least log a warning.
if(numberOfConcurrentRequests < runningRequests.size()) {
logger.warn("The number of concurrent requests was reduced and currently more requests are in flight.");
}
this.numberOfConcurrentRequests = numberOfConcurrentRequests;
// As we might have increased the number, try to send some more requests.
processWorklog();
}
public void submit(Consumer<RequestTransaction> context) {
RequestTransaction transaction = startRequest();
context.accept(transaction);
// this.submit(transaction);
}
void submit(RequestTransaction handle) {
assert handle.operation != null;
// Add this Request with this handle i the Worklog
// Put Transaction into Worklog
this.workLog.add(handle);
// Try to Process the Worklog
processWorklog();
}
private void processWorklog() {
while (runningRequests.size() < getNumberOfConcurrentRequests() && !workLog.isEmpty()) {
RequestTransaction next = workLog.remove();
this.runningRequests.add(next);
Future<?> completionFuture = executor.submit(next.operation);
next.setCompletionFuture(completionFuture);
}
}
public RequestTransaction startRequest() {
return new RequestTransaction(this, this.transactionId.getAndIncrement());
}
public int getNumberOfActiveRequests() {
return this.runningRequests.size();
}
private void failRequest(RequestTransaction transaction) {
// Try to fail it!
transaction.getCompletionFuture().cancel(true);
// End it
endRequest(transaction);
}
private void endRequest(RequestTransaction transaction) {
if (!this.runningRequests.contains(transaction)) {
throw new IllegalArgumentException("Unknown Transaction or Transaction already finished!");
}
this.runningRequests.remove(transaction);
// Process the worklog, a slot should be free now
processWorklog();
}
public static class RequestTransaction {
private final RequestTransactionManager parent;
private final int transactionId;
/** The iniital operation to perform to kick off the request */
private Runnable operation;
private Future<?> completionFuture;
public RequestTransaction(RequestTransactionManager parent, int transactionId) {
this.parent = parent;
this.transactionId = transactionId;
}
public void start() {
// TODO: what is this start method used for?
}
public void failRequest(Throwable t) {
this.parent.failRequest(this);
}
public void endRequest() {
// Remove it from Running Requests
this.parent.endRequest(this);
}
public void setOperation(Runnable operation) {
this.operation = operation;
}
public Future<?> getCompletionFuture() {
return completionFuture;
}
public void setCompletionFuture(Future<?> completionFuture) {
this.completionFuture = completionFuture;
}
public void submit(Runnable operation) {
logger.trace("Submission of transaction {}", transactionId);
this.setOperation(new TransactionOperation(transactionId, operation));
this.parent.submit(this);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RequestTransaction that = (RequestTransaction) o;
return transactionId == that.transactionId;
}
@Override
public int hashCode() {
return Objects.hash(transactionId);
}
}
static class TransactionOperation implements Runnable {
private final int transactionId;
private final Runnable delegate;
public TransactionOperation(int transactionId, Runnable delegate) {
this.transactionId = transactionId;
this.delegate = delegate;
}
@Override
public void run() {
try (final MDC.MDCCloseable closeable = MDC.putCloseable("plc4x.transactionId", Integer.toString(transactionId))) {
logger.trace("Start execution of transaction {}", transactionId);
delegate.run();
logger.trace("Completed execution of transaction {}", transactionId);
}
}
}
}
↑ V6021 Variable 'closeable' is not used.