/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.plc4x.java.examples.helloinflux;
 
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.value.*;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import org.apache.plc4x.java.spi.values.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.File;
import java.util.Map;
import java.util.concurrent.TimeUnit;
 
public class HelloInflux {
 
    private static final Logger logger = LoggerFactory.getLogger(HelloInflux.class);
 
    private Configuration configuration;
 
    public HelloInflux(File configFile) {
        Configurations configs = new Configurations();
        try {
            configuration = configs.properties(configFile);
        } catch (ConfigurationException cex) {
            throw new RuntimeException("Error reading configuration");
        }
    }
 
    public void run() {
        InfluxDBClient dbConnection = connectToDb();
        WriteApi writeApi = dbConnection.getWriteApi();
        try {
            PlcConnection plcConnection = connectToPlc();
 
            final PlcSubscriptionRequest subscriptionRequest =
                plcConnection.subscriptionRequestBuilder().addChangeOfStateField("query",
                    configuration.getString("plc.query")).build();
            final PlcSubscriptionResponse subscriptionResponse =
                subscriptionRequest.execute().get(10, TimeUnit.SECONDS);
            subscriptionResponse.getSubscriptionHandle("query").register(plcSubscriptionEvent -> {
                DefaultPlcSubscriptionEvent internalEvent = (DefaultPlcSubscriptionEvent) plcSubscriptionEvent;
                final Point point = Point.measurement(configuration.getString("influx.measurement"))
                    .time(plcSubscriptionEvent.getTimestamp().toEpochMilli(), WritePrecision.MS);
                final Map<String, ResponseItem<PlcValue>> values = internalEvent.getValues();
                values.forEach((fieldName, fieldResponsePair) -> {
                    final PlcResponseCode responseCode = fieldResponsePair.getCode();
                    final PlcValue plcValue = fieldResponsePair.getValue();
                    if(responseCode == PlcResponseCode.OK) {
                        PlcStruct structValue = (PlcStruct) plcValue;
                        for (String key : structValue.getKeys()) {
                            PlcValue subValue = structValue.getValue(key);
                            registerFields(point, key, subValue);
                        }
                    }
                });
                writeApi.writePoint(
                    configuration.getString("influx.bucket"), configuration.getString("influx.org"), point);
            });
        } catch (PlcException e) {
            logger.error("PLC Error", e);
        } catch (Exception e) {
            logger.error("General Error", e);
        }
    }
 
    private void registerFields(Point point, String contextName, PlcValue plcValue) {
        if (contextName.equals("address")) {
            point.addTag(contextName, plcValue.getString());
        } else {
            if (plcValue instanceof PlcBOOL) {
                point.addField(contextName, plcValue.getBoolean());
            } else if (plcValue instanceof PlcSINT) {
                point.addField(contextName, plcValue.getByte());
            } else if (plcValue instanceof PlcUSINT) {
                point.addField(contextName, plcValue.getShort());
            } else if (plcValue instanceof PlcINT) {
                point.addField(contextName, plcValue.getShort());
            } else if (plcValue instanceof PlcUINT) {
                point.addField(contextName, plcValue.getInteger());
            } else if (plcValue instanceof PlcDINT) {
                point.addField(contextName, plcValue.getInteger());
            } else if (plcValue instanceof PlcUDINT) {
                point.addField(contextName, plcValue.getLong());
            } else if (plcValue instanceof PlcLINT) {
                point.addField(contextName, plcValue.getLong());
            } else if (plcValue instanceof PlcULINT) {
                point.addField(contextName, plcValue.getBigInteger());
            } else if (plcValue instanceof PlcREAL) {
                point.addField(contextName, plcValue.getFloat());
            } else if (plcValue instanceof PlcLREAL) {
                point.addField(contextName, plcValue.getDouble());
            } else if (plcValue instanceof PlcSTRING) {
                point.addField(contextName, plcValue.getString());
            } else if (plcValue instanceof PlcStruct) {
                PlcStruct structValue = (PlcStruct) plcValue;
                for (String key : structValue.getKeys()) {
                    PlcValue subValue = structValue.getValue(key);
                    registerFields(point, contextName + "-" + key, subValue);
                }
            }
        }
    }
 
    private InfluxDBClient connectToDb() {
        char[] token = configuration.getString("influx.accessToken").toCharArray();
        return InfluxDBClientFactory.create(configuration.getString("influx.connectionString"), token);
    }
 
    private PlcConnection connectToPlc() throws PlcException {
        final PlcConnection connection =
            new PlcDriverManager().getConnection(configuration.getString("plc.connectionString"));
        connection.connect();
        return connection;
    }
 
    public static void main(String[] args) {
        if(args.length != 1) {
            System.out.println("Usage: HelloInflux {path-to-config-file}");
        }
        final File configFile = new File(args[0]);
        if(!configFile.exists() || !configFile.isFile()) {
            throw new PlcRuntimeException("Could not read config file");
        }
        new HelloInflux(configFile).run();
    }
 
}

V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1.