/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.utils.connectionpool2;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* Driver Manager who Caches ONE Single Connection.
* <p>
* Usage Example:
* <code>
* PlcDriverManager manager = new PlcDriverManager();
* PlcDriverManager cached = new CachedDriverManager(url, () -> manager.getConnection(url));
* </code>
* Now you can use "cached" everywhere you need the corresponding connection.
*/
public class CachedDriverManager extends PlcDriverManager implements CachedDriverManagerMBean {
private static final Logger logger = LoggerFactory.getLogger(CachedDriverManager.class);
// Constants
public static final int LONG_BORROW_WATCHDOG_TIMEOUT_MS = 5_000;
// JMX
private final AtomicInteger numberOfConnects = new AtomicInteger(0);
private final AtomicInteger numberOfBorrows = new AtomicInteger(0);
private final AtomicInteger numberOfRejections = new AtomicInteger(0);
private final AtomicInteger numberOfWatchdogs = new AtomicInteger(0);
// End of JMX
private final String url;
private final PlcConnectionFactory connectionFactory;
// Add Queue for Requests
private final Queue<CompletableFuture<PlcConnection>> queue = new LinkedList<>();
private final int timeoutMillis;
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private AtomicReference<ConnectionState> state = new AtomicReference<>(ConnectionState.DISCONNECTED);
private PlcConnection activeConnection;
private CachedPlcConnection borrowedConnection;
private ScheduledFuture<?> borrowWatchdog;
public CachedDriverManager(String url, PlcConnectionFactory connectionFactory) {
this(url, connectionFactory, 1000);
}
/**
* @param url Url that this connection is for
* @param connectionFactory Factory to create a suitable connection.
* @param timeoutMillis Time out in milliseconds
*/
public CachedDriverManager(String url, PlcConnectionFactory connectionFactory, int timeoutMillis) {
logger.info("Creating new cached Connection for url {} with timeout {} ms", url, timeoutMillis);
this.url = url;
this.connectionFactory = connectionFactory;
this.timeoutMillis = timeoutMillis;
// MBean
try {
ManagementFactory.getPlatformMBeanServer().registerMBean(this, new ObjectName("org.apache.plc4x.plc:name=cached-driver-manager,url=\"" + url + "\""));
} catch (Exception ignore) {
}
}
public synchronized void returnConnection(PlcConnection activeConnection) {
logger.debug("Borrowed Connection is closed and returned.");
// Stop Watchdog
cancelWatchdog();
if (state.get() == ConnectionState.DISCONNECTED) {
// Getting Disconnected Connection, nothing to do.
logger.trace("Connection already disconnected");
return;
}
if (state.get() != ConnectionState.BORROWED) {
logger.warn("Connection was returned, although it is not borrowed, currently.");
}
this.borrowedConnection = null;
setState(ConnectionState.AVAILABLE);
// Check the queue
checkQueue();
logger.trace("Connection successfully returned");
}
private void setState(ConnectionState available) {
logger.trace("Setting State from {} to {}", this.state.get(), available);
this.state.set(available);
}
/**
* used to signal, that the connection does not work and has to be re-established.
*/
public synchronized void handleBrokenConnection() {
logger.debug("Connection was detected as broken and is invalidated in Cached Manager");
// Stop Watchdog
cancelWatchdog();
if (state.get() != ConnectionState.BORROWED) {
// Log at least a WARN???
logger.warn("Broken Connection was returned, although it is not borrowed, currently.");
}
this.borrowedConnection = null;
try {
this.activeConnection.close();
} catch (Exception e) {
logger.debug("Unable to Close 'broken' Connection", e);
}
this.activeConnection = null;
setState(ConnectionState.DISCONNECTED);
}
public boolean isConnectionAvailable() {
return this.getState().equals(ConnectionState.AVAILABLE);
}
/**
* This call now waits (with the timeout given in constructor) until it fails
* or returns a valid connection in this window.
*
* @throws PlcConnectionException if connection cannot be established
*/
@Override
public PlcConnection getConnection(String url) throws PlcConnectionException {
if (!this.url.equals(url)) {
throw new IllegalArgumentException("This Cached Driver Manager only supports the Connection " + url);
}
CompletableFuture<PlcConnection> future;
synchronized (this) {
logger.trace("current queue size before check {}", queue.size());
if (queue.isEmpty() && isConnectionAvailable()) {
logger.trace("queue is empty and a connection is available");
return getConnection_(url);
} else {
logger.trace("Getting a connection and instantly close it");
// At least trigger a connection
try {
getConnection_(url).close();
} catch (Exception ignore) {
}
}
future = new CompletableFuture<>();
logger.trace("current queue size before add {}", queue.size());
queue.add(future);
}
try {
return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
handleBrokenConnection();
throw new PlcConnectionException("No Connection Available, timed out while waiting in queue.", e);
} catch (InterruptedException e) {
handleBrokenConnection();
Thread.currentThread().interrupt();
throw new PlcConnectionException("No Connection Available, interrupted while waiting in queue.", e);
} finally {
future.cancel(true);
}
}
/**
* Private Impl.
*/
private synchronized PlcConnection getConnection_(String url) throws PlcConnectionException {
logger.trace("Current State {}", this.state.get());
switch (state.get()) {
case AVAILABLE:
logger.debug("Connection was requested and is available, thus, returning Chached Connection for usage");
setState(ConnectionState.BORROWED);
this.numberOfBorrows.incrementAndGet();
this.borrowedConnection = new CachedPlcConnection(this, activeConnection);
// Set the Borrwed Counter
startWatchdog(this.borrowedConnection);
return this.borrowedConnection;
case DISCONNECTED:
logger.debug("Connection was requested but no connection is active, trying to establish a Connection");
// Initialize Connection
setState(ConnectionState.CONNECTING);
this.numberOfConnects.incrementAndGet();
// Start Connection in Background
CompletableFuture.runAsync(() -> {
logger.debug("Starting to establish Connection");
try {
PlcConnection connection = this.connectionFactory.create();
logger.debug("Connection successfully established");
synchronized (this) {
this.activeConnection = connection;
setState(ConnectionState.AVAILABLE);
// Now See if there is someone waiting in the line
checkQueue();
logger.trace("Inline queue check succeeded");
}
} catch (Exception e) {
logger.warn("Unable to establish connection to PLC {}", url, e);
setState(ConnectionState.DISCONNECTED);
}
});
this.numberOfRejections.incrementAndGet();
throw new PlcConnectionException("No Connection Available, Starting Connection");
case CONNECTING:
// We cannot give a Connection
logger.debug("Connection was requsted, but currently establishing one, so none available");
this.numberOfRejections.incrementAndGet();
throw new PlcConnectionException("No Connection Available, Currently Connecting");
case BORROWED:
// We cannot give a Connection
logger.debug("Connection was requsted, but Connection currently is borrowed, so none available");
this.numberOfRejections.incrementAndGet();
throw new PlcConnectionException("No Connection Available, its in Use");
}
throw new IllegalStateException();
}
/**
* Checks if someone is waiting in the line to get the connection.
*/
private synchronized void checkQueue() {
logger.debug("Connection is available, checking if someone is waiting in the queue...");
CompletableFuture<PlcConnection> next;
logger.trace("current queue size before check queue {}", queue.size());
while ((next = queue.poll()) != null) {
if (next.isCancelled()) {
logger.trace("Cleaning up already timed out connection...");
continue;
}
// Not timed out, give this connection
try {
next.complete(getConnection_(url));
return;
} catch (PlcConnectionException e) {
logger.debug("Got an Exception on fetching a connection", e);
}
}
logger.trace("check queue ended");
}
private void startWatchdog(CachedPlcConnection connection) {
borrowWatchdog = executorService.schedule(() -> {
// Just close the borrowed connection
logger.warn("Watchdog detected a long borrowed connection, will be forcefully closed!");
this.numberOfWatchdogs.incrementAndGet();
handleBrokenConnection();
try {
connection.close();
} catch (Exception e) {
logger.warn("Unable to close the borrowed Connection from Watchdog", e);
}
}, LONG_BORROW_WATCHDOG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
private void cancelWatchdog() {
if (borrowWatchdog != null ) {
borrowWatchdog.cancel(false);
}
}
@Override
public PlcConnection getConnection(String url, PlcAuthentication authentication) throws PlcConnectionException {
throw new NotImplementedException("");
}
public ConnectionState getState() {
return state.get();
}
enum ConnectionState {
DISCONNECTED,
CONNECTING,
AVAILABLE,
BORROWED;
}
// JMX
@Override
public String getStateString() {
return getState().toString();
}
@Override
public int getNumberOfConnects() {
return numberOfConnects.get();
}
@Override
public int getNumberOfBorrows() {
return numberOfBorrows.get();
}
@Override
public int getNumberOfRejections() {
return this.numberOfRejections.get();
}
@Override
public int getNumberOfWachtdogs() {
return numberOfWatchdogs.get();
}
@Override
public int getQueueSize() {
return this.queue.size();
}
@Override
public synchronized void triggerReconnect() {
logger.info("Disconnecting current connection, was triggered from external via JMX");
handleBrokenConnection();
if (this.state.get() == ConnectionState.BORROWED) {
try {
borrowedConnection.close();
} catch (Exception e) {
logger.warn("Unable to close the borrowed Connection from JMX", e);
}
}
}
}
↑ V6070 Unsafe synchronization on 'this' instance in class 'CachedDriverManager'.
↑ V6102 Inconsistent synchronization of the 'queue' field. Consider synchronizing the field on all usages.
↑ V6070 Unsafe synchronization on 'this' instance in class 'CachedDriverManager'.
↑ V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1.