/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.kafka;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.utils.connectionpool2.PooledDriverManager;
import org.apache.plc4x.kafka.config.Constants;
import org.apache.plc4x.kafka.util.VersionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* Sink Connector Task writing the fields of incoming Kafka records to a PLC.
* Each call to put() obtains a connection from the driver manager, collects the non-null
* fields of all records that match the configured topic into a single write request and executes it.
* If connecting or writing fails, a RetriableException is thrown until the configured number of
* retries is used up; after that the failure is only logged.
*/
public class Plc4xSinkTask extends SinkTask {

    private static final Logger log = LoggerFactory.getLogger(Plc4xSinkTask.class);

    /*
     * Config of the task.
     */
    static final String CONNECTION_NAME_CONFIG = "connection-name";
    private static final String CONNECTION_NAME_STRING_DOC = "Connection Name";

    static final String PLC4X_CONNECTION_STRING_CONFIG = "connectionString";
    private static final String PLC4X_CONNECTION_STRING_DOC = "PLC4X Connection String";

    static final String PLC4X_TOPIC_CONFIG = "topic";
    private static final String PLC4X_TOPIC_DOC = "Task Topic";

    private static final String PLC4X_RETRIES_CONFIG = "retries";
    private static final String PLC4X_RETRIES_DOC = "Number of times to retry after failed write";

    private static final String PLC4X_TIMEOUT_CONFIG = "timeout";
    private static final String PLC4X_TIMEOUT_DOC = "Time between retries";

    // Syntax for the queries: {field-alias}#{field-address}|{field-alias}#{field-address}|...
    static final String QUERIES_CONFIG = "queries";
    private static final String QUERIES_DOC = "Fields to be sent to the PLC";

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(CONNECTION_NAME_CONFIG,
            ConfigDef.Type.STRING,
            ConfigDef.Importance.HIGH,
            CONNECTION_NAME_STRING_DOC)
        .define(PLC4X_CONNECTION_STRING_CONFIG,
            ConfigDef.Type.STRING,
            ConfigDef.Importance.HIGH,
            PLC4X_CONNECTION_STRING_DOC)
        .define(PLC4X_TOPIC_CONFIG,
            ConfigDef.Type.STRING,
            ConfigDef.Importance.HIGH,
            PLC4X_TOPIC_DOC)
        .define(PLC4X_RETRIES_CONFIG,
            ConfigDef.Type.INT,
            ConfigDef.Importance.HIGH,
            PLC4X_RETRIES_DOC)
        .define(PLC4X_TIMEOUT_CONFIG,
            ConfigDef.Type.INT,
            ConfigDef.Importance.HIGH,
            PLC4X_TIMEOUT_DOC)
        .define(QUERIES_CONFIG,
            ConfigDef.Type.STRING,
            ConfigDef.Importance.HIGH,
            QUERIES_DOC);

    /*
     * Configuration of the output.
     */
    private static final String SINK_NAME_FIELD = "sink-name";
    private static final String SINK_TOPIC_FIELD = "topic";

    private PlcDriverManager driverManager;
    private Transformation<SinkRecord> transformation;
    private String plc4xConnectionString;
    private String plc4xTopic;
    private Integer plc4xRetries;
    private Integer plc4xTimeout;
    /** Retries left for the batch currently being delivered; restored to plc4xRetries once a batch is done. */
    private Integer remainingRetries;
    private AbstractConfig config;
    /** Maps a field alias to the PLC address it is written to. */
    private Map<String, String> fields;

    /**
     * Returns the connector version as reported by {@link VersionUtil}.
     */
    @Override
    public String version() {
        return VersionUtil.getVersion();
    }

    /**
     * Reads the task configuration, parses the alias-to-address mapping and creates the driver manager.
     *
     * @param props task properties validated against the task's config definition
     */
    @Override
    public void start(Map<String, String> props) {
        config = new AbstractConfig(CONFIG_DEF, props);
        plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
        plc4xTopic = config.getString(PLC4X_TOPIC_CONFIG);
        plc4xRetries = config.getInt(PLC4X_RETRIES_CONFIG);
        remainingRetries = plc4xRetries;
        plc4xTimeout = config.getInt(PLC4X_TIMEOUT_CONFIG);
        fields = parseFields(config.getString(QUERIES_CONFIG));

        log.info("Creating Pooled PLC4x driver manager");
        driverManager = new PooledDriverManager();
    }

    /**
     * Stops the task. Connections are obtained and closed inside each {@link #put(Collection)} call and no
     * thread ever waits on this instance, so there is nothing to wake up or release here.
     * NOTE(review): the driver manager itself is not released; confirm whether it needs explicit cleanup.
     */
    @Override
    public void stop() {
        // Intentionally empty.
    }

    /**
     * Writes the non-null fields of the given records to the PLC in a single write request.
     * <p>
     * Records on a topic other than the configured one, fields without a configured address and
     * expired write requests are skipped. The connection is always closed before this method returns
     * or throws.
     *
     * @param records the records delivered by the framework
     * @throws RetriableException if connecting or writing failed and retries are still left
     */
    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }

        PlcConnection connection;
        try {
            connection = driverManager.getConnection(plc4xConnectionString);
        } catch (PlcConnectionException e) {
            log.warn("Failed to Open Connection {}", plc4xConnectionString, e);
            retryOrGiveUp(e);
            return;
        }

        try {
            final PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
            int validCount = 0;
            for (SinkRecord sinkRecord : records) {
                validCount += addRecordItems(builder, sinkRecord);
            }

            if (validCount > 0) {
                try {
                    PlcWriteRequest writeRequest = builder.build();
                    writeRequest.execute().get();
                    log.debug("Wrote records to {}", plc4xConnectionString);
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to the calling worker thread.
                        Thread.currentThread().interrupt();
                    }
                    retryOrGiveUp(e);
                }
            }
            // The batch was either written or finally dropped: start the next one with a full retry budget.
            remainingRetries = plc4xRetries;
        } finally {
            // Runs on success, on RetriableException and on unexpected failures alike, so that the
            // connection is never left open.
            closeQuietly(connection);
        }
    }

    /**
     * Parses the queries configuration of the form
     * {@code {field-alias}#{field-address}|{field-alias}#{field-address}|...}.
     * Segments that do not consist of exactly two '#'-separated parts are logged and skipped.
     *
     * @param queries the raw configuration value
     * @return mapping from field alias to field address
     */
    private static Map<String, String> parseFields(String queries) {
        Map<String, String> parsed = new HashMap<>();
        for (String segment : queries.split("\\|")) {
            String[] parts = segment.split("#");
            if (parts.length != 2) {
                log.warn("Error in field configuration. "
                    + "The field segment expects a format {field-alias}#{field-address}, but got '{}'",
                    segment);
                continue;
            }
            parsed.put(parts[0], parts[1]);
        }
        return parsed;
    }

    /**
     * Consumes one retry. Throws a {@link RetriableException} while retries are left; otherwise logs
     * the final failure and restores the retry budget so that later batches are retried again.
     *
     * @param cause the failure that triggered the retry, kept as the cause of the thrown exception
     */
    private void retryOrGiveUp(Exception cause) {
        remainingRetries--;
        if (remainingRetries > 0) {
            if (context != null) {
                context.timeout(plc4xTimeout);
            }
            throw new RetriableException(
                "Failed to Write to " + plc4xConnectionString + " retrying records that haven't expired", cause);
        }
        log.warn("Failed to write after {} retries", plc4xRetries);
        remainingRetries = plc4xRetries;
    }

    /**
     * Adds all writable fields of one record to the request builder.
     *
     * @param builder    the write request under construction
     * @param sinkRecord the record whose fields should be written
     * @return the number of items added to the builder
     */
    private int addRecordItems(PlcWriteRequest.Builder builder, SinkRecord sinkRecord) {
        Object rawValue = sinkRecord.value();
        if (rawValue == null) {
            // A record without a value carries nothing that could be written.
            log.warn("Ignoring record without a value on topic {}", sinkRecord.topic());
            return 0;
        }
        Struct record = (Struct) rawValue;
        String topic = sinkRecord.topic();
        Struct plcFields = record.getStruct(Constants.FIELDS_CONFIG);
        if (plcFields == null) {
            log.warn("Ignoring record without fields on topic {}", topic);
            return 0;
        }

        int added = 0;
        for (Field plcField : plcFields.schema().fields()) {
            String field = plcField.name();
            Object value = plcFields.get(field);
            if (value == null) {
                continue;
            }
            long expires = expiryOf(record);

            // Discard records we are not or no longer interested in.
            if (!topic.equals(plc4xTopic) || plc4xTopic.isEmpty()) {
                log.debug("Ignoring write request received on wrong topic");
            } else if (!fields.containsKey(field)) {
                log.warn("Unable to find address for field {}", field);
            } else if (expires != 0 && System.currentTimeMillis() > expires) {
                log.warn("Write request has expired {} - {}, discarding {}",
                    expires, System.currentTimeMillis(), field);
            } else if (addItem(builder, fields.get(field), value)) {
                added++;
            }
        }
        return added;
    }

    /**
     * Computes the expiry of a record as "timestamp" + "expires".
     *
     * @param record the record value
     * @return the expiry, or 0 if the record has no "expires" value and therefore never expires
     */
    private static long expiryOf(Struct record) {
        Long timestamp = record.getInt64("timestamp");
        Long expiresOffset = record.getInt64("expires");
        return expiresOffset == null ? 0L : expiresOffset + timestamp;
    }

    /**
     * Adds a single item to the builder. A String of the form {@code [a,b,c]} is split on ',' and
     * passed as a String array; every other value is passed unchanged.
     *
     * @return {@code true} if the item was added, {@code false} if the builder rejected it
     */
    private boolean addItem(PlcWriteRequest.Builder builder, String address, Object value) {
        try {
            if (value instanceof String && isArrayLiteral((String) value)) {
                String text = (String) value;
                String[] values = text.substring(1, text.length() - 1).split(",");
                builder.addItem(address, address, values);
            } else {
                builder.addItem(address, address, value);
            }
            return true;
        } catch (Exception e) {
            // When building a request we want to discard the write if there is an error.
            log.warn("Invalid Address format for protocol {}", address, e);
            return false;
        }
    }

    /**
     * Tells whether the text is enclosed in square brackets. The length check keeps the empty
     * string from failing on {@code charAt(0)}.
     */
    private static boolean isArrayLiteral(String text) {
        return text.length() >= 2
            && text.charAt(0) == '['
            && text.charAt(text.length() - 1) == ']';
    }

    /**
     * Closes the connection, logging instead of propagating any failure.
     */
    private void closeQuietly(PlcConnection connection) {
        try {
            connection.close();
        } catch (Exception e) {
            log.warn("Failed to Close {}", plc4xConnectionString, e);
        }
    }
}
↑ V6021 Variable 'connectionName' is not used.
↑ V6070 Unsafe synchronization on 'this' instance in class 'Plc4xSinkTask'.