/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.nifi.record;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.nifi.util.Plc4xCommon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
private static final Logger logger = LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
private final PlcReadResponse readResponse;
private final Set<String> rsColumnNames;
private boolean moreRows;
// TODO: review this AtomicReference?
// TODO: this could be enhanced checking if record schema should be updated (via a cache boolean, checking property values is a nifi expression language, etc)
private AtomicReference<RecordSchema> recordSchema;
public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse) throws IOException {
this.readResponse = readResponse;
moreRows = true;
logger.debug("Creating record schema from PlcReadResponse");
Map<String, ? extends PlcValue> responseDataStructure = readResponse.getAsPlcValue().getStruct();
rsColumnNames = responseDataStructure.keySet();
if (recordSchema == null) {
Schema avroSchema = Plc4xCommon.createSchema(responseDataStructure); //TODO: review this method as it is the 'mapping' from PlcValues to avro datatypes
recordSchema = new AtomicReference<RecordSchema>();
recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
}
logger.debug("Record schema from PlcReadResponse successfuly created.");
}
@Override
public RecordSchema getSchema() {
return this.recordSchema.get();
}
// Protected methods for subclasses to access private member variables
protected PlcReadResponse getReadResponse() {
return readResponse;
}
protected boolean hasMoreRows() {
return moreRows;
}
protected void setMoreRows(boolean moreRows) {
this.moreRows = moreRows;
}
@Override
public Record next() throws IOException {
if (moreRows) {
final Record record = createRecord(readResponse);
setMoreRows(false);
return record;
} else {
return null;
}
}
@Override
public void close() {
//do nothing
}
protected Record createRecord(final PlcReadResponse readResponse) throws IOException{
final Map<String, Object> values = new HashMap<>(getSchema().getFieldCount());
logger.debug("creating record.");
for (final RecordField field : getSchema().getFields()) {
final String fieldName = field.getFieldName();
final Object value;
if (rsColumnNames.contains(fieldName)) {
value = normalizeValue(readResponse.getAsPlcValue().getValue(fieldName));
} else {
value = null;
}
logger.trace(String.format("Adding %s field value to record.", fieldName));
values.put(fieldName, value);
}
//add timestamp field to schema
values.put(Plc4xCommon.PLC4X_RECORD_TIMESTAMP_FIELD_NAME, System.currentTimeMillis());
logger.debug("added timestamp field to record.");
return new MapRecord(getSchema(), values);
}
@SuppressWarnings("rawtypes")
private Object normalizeValue(final PlcValue value) {
Object r = Plc4xCommon.normalizeValue(value);
if (r != null) {
logger.trace("Value data type: " + r.getClass());
}
return r;
}
}
↑ V6090 Field 'recordSchema' is being used before it was initialized.