/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.plc4x.nifi.record;
 
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
 
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.nifi.util.Plc4xCommon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
    private final PlcReadResponse readResponse;
    private final Set<String> rsColumnNames;
    private boolean moreRows;
 
    // TODO: review this AtomicReference?
	// TODO: this could be enhanced checking if record schema should be updated (via a cache boolean, checking property values is a nifi expression language, etc)
  	private AtomicReference<RecordSchema> recordSchema;
 
    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse) throws IOException {
        this.readResponse = readResponse;
        moreRows = true;
        
        logger.debug("Creating record schema from PlcReadResponse");
        Map<String, ? extends PlcValue> responseDataStructure = readResponse.getAsPlcValue().getStruct();
        rsColumnNames = responseDataStructure.keySet();
               
        if (recordSchema == null) {
        	Schema avroSchema = Plc4xCommon.createSchema(responseDataStructure); //TODO: review this method as it is the 'mapping' from PlcValues to avro datatypes        	
        	recordSchema = new AtomicReference<RecordSchema>();
        	recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
        }
        logger.debug("Record schema from PlcReadResponse successfuly created.");
 
    }
 
    
    @Override
    public RecordSchema getSchema() {
        return this.recordSchema.get();
    }
 
    // Protected methods for subclasses to access private member variables
    protected PlcReadResponse getReadResponse() {
        return readResponse;
    }
 
    protected boolean hasMoreRows() {
        return moreRows;
    }
 
    protected void setMoreRows(boolean moreRows) {
        this.moreRows = moreRows;
    }
 
    @Override
    public Record next() throws IOException {
        if (moreRows) {
             final Record record = createRecord(readResponse);
             setMoreRows(false);
             return record;
        } else {
             return null;
        }
    }
 
    @Override
    public void close() {
        //do nothing
    }
 
    protected Record createRecord(final PlcReadResponse readResponse) throws IOException{
        final Map<String, Object> values = new HashMap<>(getSchema().getFieldCount());
 
        logger.debug("creating record.");
 
        for (final RecordField field : getSchema().getFields()) {
            final String fieldName = field.getFieldName();
 
            final Object value;
            
            if (rsColumnNames.contains(fieldName)) {
            	value = normalizeValue(readResponse.getAsPlcValue().getValue(fieldName));
            } else {
                value = null;
            }
            
            logger.trace(String.format("Adding %s field value to record.", fieldName));
            values.put(fieldName, value);
        }
 
        //add timestamp field to schema
        values.put(Plc4xCommon.PLC4X_RECORD_TIMESTAMP_FIELD_NAME, System.currentTimeMillis());
        logger.debug("added timestamp field to record.");
 
        	
        return new MapRecord(getSchema(), values);
    }
 
    @SuppressWarnings("rawtypes")
    private Object normalizeValue(final PlcValue value) {
        Object r = Plc4xCommon.normalizeValue(value);
        if (r != null) {
            logger.trace("Value data type: " + r.getClass());
        }
        return r;
        
    }
 
 
}

V6090 Field 'recordSchema' is being used before it was initialized.