/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.plc4x.java.canopen.protocol;
 
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.can.adapter.Plc4xCANProtocolBase;
import org.apache.plc4x.java.canopen.transport.CANOpenAbortException;
import org.apache.plc4x.java.canopen.readwrite.CANOpenFrame;
import org.apache.plc4x.java.canopen.api.conversation.canopen.CANConversation;
import org.apache.plc4x.java.canopen.api.conversation.canopen.SDODownloadConversation;
import org.apache.plc4x.java.canopen.api.conversation.canopen.SDOUploadConversation;
import org.apache.plc4x.java.canopen.configuration.CANOpenConfiguration;
import org.apache.plc4x.java.canopen.context.CANOpenDriverContext;
import org.apache.plc4x.java.canopen.field.CANOpenField;
import org.apache.plc4x.java.canopen.field.CANOpenHeartbeatField;
import org.apache.plc4x.java.canopen.field.CANOpenNMTField;
import org.apache.plc4x.java.canopen.field.CANOpenPDOField;
import org.apache.plc4x.java.canopen.field.CANOpenSDOField;
import org.apache.plc4x.java.canopen.conversation.CANTransportConversation;
import org.apache.plc4x.java.canopen.readwrite.CANOpenHeartbeatPayload;
import org.apache.plc4x.java.canopen.readwrite.CANOpenNetworkPayload;
import org.apache.plc4x.java.canopen.readwrite.CANOpenPDO;
import org.apache.plc4x.java.canopen.readwrite.CANOpenPDOPayload;
import org.apache.plc4x.java.canopen.readwrite.CANOpenPayload;
import org.apache.plc4x.java.canopen.readwrite.IndexAddress;
import org.apache.plc4x.java.canopen.readwrite.DataItem;
import org.apache.plc4x.java.canopen.readwrite.CANOpenService;
import org.apache.plc4x.java.canopen.readwrite.NMTState;
import org.apache.plc4x.java.canopen.readwrite.NMTStateRequest;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.apache.plc4x.java.spi.context.DriverContext;
import org.apache.plc4x.java.spi.generation.*;
import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionRequest;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionResponse;
import org.apache.plc4x.java.spi.messages.DefaultPlcUnsubscriptionResponse;
import org.apache.plc4x.java.spi.messages.DefaultPlcWriteRequest;
import org.apache.plc4x.java.spi.messages.DefaultPlcWriteResponse;
import org.apache.plc4x.java.spi.messages.PlcSubscriber;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionField;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.apache.plc4x.java.spi.values.PlcLINT;
import org.apache.plc4x.java.spi.values.PlcNull;
import org.apache.plc4x.java.spi.values.PlcStruct;
import org.apache.plc4x.java.spi.values.PlcUSINT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
 
public class CANOpenProtocolLogic extends Plc4xCANProtocolBase<CANOpenFrame>
    implements HasConfiguration<CANOpenConfiguration>, PlcSubscriber {
 
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(10L);
    private Logger logger = LoggerFactory.getLogger(CANOpenProtocolLogic.class);
 
    private CANOpenConfiguration configuration;
    private RequestTransactionManager tm;
    private Timer heartbeat;
    private CANOpenDriverContext canContext;
    private CANConversation conversation;
 
    private Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();
 
    @Override
    public void setConfiguration(CANOpenConfiguration configuration) {
        this.configuration = configuration;
    }
 
    @Override
    public void setDriverContext(DriverContext driverContext) {
        super.setDriverContext(driverContext);
        this.canContext = (CANOpenDriverContext) driverContext;
 
        // Initialize Transaction Manager.
        // Until the number of concurrent requests is successfully negotiated we set it to a
        // maximum of only one request being able to be sent at a time. During the login process
        // No concurrent requests can be sent anyway. It will be updated when receiving the
        // S7ParameterSetupCommunication response.
        this.tm = new RequestTransactionManager(1);
    }
 
    @Override
    public void onConnect(ConversationContext<CANOpenFrame> context) {
        try {
            if (configuration.isHeartbeat()) {
                context.sendToWire(createFrame(new CANOpenHeartbeatPayload(NMTState.BOOTED_UP)));
 
                this.heartbeat = new Timer("can-heartbeat");
                this.heartbeat.scheduleAtFixedRate(new TimerTask() {
                    @Override
                    public void run() {
                        try {
                            context.sendToWire(createFrame(new CANOpenHeartbeatPayload(NMTState.OPERATIONAL)));
                        } catch (ParseException e) {
                            throw new PlcRuntimeException(e);
                        }
                    }
                }, 10000, 10000);
            }
            context.fireConnected();
        } catch (ParseException e) {
            throw new PlcRuntimeException(e);
        }
    }
 
    @Override
    public void setContext(ConversationContext<CANOpenFrame> context) {
        super.setContext(context);
        this.conversation = new CANTransportConversation(configuration.getNodeId(), context, configuration.getRequestTimeout());
    }
 
    private CANOpenFrame createFrame(CANOpenHeartbeatPayload state) throws ParseException {
        return new CANOpenFrame((short) configuration.getNodeId(), CANOpenService.HEARTBEAT, state);
    }
 
    public CompletableFuture<PlcWriteResponse> write(PlcWriteRequest writeRequest) {
        CompletableFuture<PlcWriteResponse> response = new CompletableFuture<>();
        if (writeRequest.getFieldNames().size() != 1) {
            response.completeExceptionally(new IllegalArgumentException("You can write only one field at the time"));
            return response;
        }
 
        PlcField field = writeRequest.getFields().get(0);
        if (!(field instanceof CANOpenField)) {
            response.completeExceptionally(new IllegalArgumentException("Only CANOpenField instances are supported"));
            return response;
        }
 
        if (field instanceof CANOpenSDOField) {
            writeInternally((DefaultPlcWriteRequest) writeRequest, (CANOpenSDOField) field, response);
            return response;
        }
        if (field instanceof CANOpenPDOField) {
            writeInternally((DefaultPlcWriteRequest) writeRequest, (CANOpenPDOField) field, response);
            return response;
        }
 
        response.completeExceptionally(new IllegalArgumentException("Only CANOpenSDOField instances are supported"));
        return response;
    }
 
    private void writeInternally(DefaultPlcWriteRequest writeRequest, CANOpenSDOField field, CompletableFuture<PlcWriteResponse> response) {
        final RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
 
        String fieldName = writeRequest.getFieldNames().iterator().next();
 
        CompletableFuture<PlcResponseCode> callback = new CompletableFuture<>();
        callback.whenComplete((code, error) -> {
            if (error != null) {
                if (error instanceof CANOpenAbortException) {
                    response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.REMOTE_ERROR)));
                } else {
                    response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.INTERNAL_ERROR)));
                }
                transaction.endRequest();
                return;
            }
            response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, code)));
            transaction.endRequest();
        });
 
        PlcValue writeValue = writeRequest.getPlcValues().get(0);
        SDODownloadConversation download = new SDODownloadConversation(conversation, field.getNodeId(), field.getAnswerNodeId(),
            new IndexAddress(field.getIndex(), field.getSubIndex()), writeValue, field.getCanOpenDataType());
        transaction.submit(() -> download.execute(callback));
    }
 
    private void writeInternally(DefaultPlcWriteRequest writeRequest, CANOpenPDOField field, CompletableFuture<PlcWriteResponse> response) {
        PlcValue writeValue = writeRequest.getPlcValues().get(0);
 
        try {
            String fieldName = writeRequest.getFieldNames().iterator().next();
 
            WriteBufferByteBased writeBuffer = new WriteBufferByteBased(DataItem.getLengthInBytes(writeValue, field.getCanOpenDataType(), writeValue.getLength()), ByteOrder.LITTLE_ENDIAN);
            DataItem.staticSerialize(writeBuffer, writeValue, field.getCanOpenDataType(), writeValue.getLength(), ByteOrder.LITTLE_ENDIAN);
            final CANOpenPDOPayload payload = new CANOpenPDOPayload(new CANOpenPDO(writeBuffer.getData()));
            context.sendToWire(new CANOpenFrame((short) field.getNodeId(), field.getService(), payload));
            response.complete(new DefaultPlcWriteResponse(writeRequest, Collections.singletonMap(fieldName, PlcResponseCode.OK)));
        } catch (Exception e) {
            response.completeExceptionally(e);
        }
    }
 
    public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
        CompletableFuture<PlcReadResponse> response = new CompletableFuture<>();
        if (readRequest.getFieldNames().size() != 1) {
            response.completeExceptionally(new IllegalArgumentException("SDO requires single field to be read"));
            return response;
        }
 
        PlcField field = readRequest.getFields().get(0);
        if (!(field instanceof CANOpenField)) {
            response.completeExceptionally(new IllegalArgumentException("Only CANOpenField instances are supported"));
            return response;
        }
 
        if (!(field instanceof CANOpenSDOField)) {
            response.completeExceptionally(new IllegalArgumentException("Only CANOpenSDOField instances are supported"));
            return response;
        }
 
        readInternally(readRequest, (CANOpenSDOField) field, response);
        return response;
    }
 
    @Override
    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest request) {
        DefaultPlcSubscriptionRequest rq = (DefaultPlcSubscriptionRequest) request;
 
        Map<String, ResponseItem<PlcSubscriptionHandle>> answers = new LinkedHashMap<>();
        DefaultPlcSubscriptionResponse response = new DefaultPlcSubscriptionResponse(rq, answers);
 
        for (String key : rq.getFieldNames()) {
            DefaultPlcSubscriptionField subscription = (DefaultPlcSubscriptionField) rq.getField(key);
            if (subscription.getPlcSubscriptionType() != PlcSubscriptionType.EVENT) {
                answers.put(key, new ResponseItem<>(PlcResponseCode.UNSUPPORTED, null));
            } else if ((subscription.getPlcField() instanceof CANOpenPDOField)) {
                answers.put(key, new ResponseItem<>(PlcResponseCode.OK,
                    new CANOpenSubscriptionHandle(this, key, (CANOpenPDOField) subscription.getPlcField())
                ));
            } else if ((subscription.getPlcField() instanceof CANOpenNMTField)) {
                answers.put(key, new ResponseItem<>(PlcResponseCode.OK,
                    new CANOpenSubscriptionHandle(this, key, (CANOpenNMTField) subscription.getPlcField())
                ));
            } else if ((subscription.getPlcField() instanceof CANOpenHeartbeatField)) {
                answers.put(key, new ResponseItem<>(PlcResponseCode.OK,
                    new CANOpenSubscriptionHandle(this, key, (CANOpenHeartbeatField) subscription.getPlcField())
                ));
            } else {
                answers.put(key, new ResponseItem<>(PlcResponseCode.INVALID_ADDRESS, null));
            }
        }
 
        return CompletableFuture.completedFuture(response);
    }
 
    @Override
    public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest request) {
        List<PlcSubscriptionHandle> handles = request.getSubscriptionHandles();
 
        for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
            entry.getKey().getSubscriptionHandles().removeAll(handles);
        }
 
        return CompletableFuture.completedFuture(new DefaultPlcUnsubscriptionResponse(request));
    }
 
    private void readInternally(PlcReadRequest readRequest, CANOpenSDOField field, CompletableFuture<PlcReadResponse> response) {
        String fieldName = readRequest.getFieldNames().iterator().next();
 
        final RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
        CompletableFuture<PlcValue> callback = new CompletableFuture<>();
        callback.whenComplete((value, error) -> {
            if (error != null) {
                Map<String, ResponseItem<PlcValue>> fields = new HashMap<>();
                if (error instanceof CANOpenAbortException) {
                    fields.put(fieldName, new ResponseItem<>(PlcResponseCode.REMOTE_ERROR, new PlcLINT(((CANOpenAbortException) error).getAbortCode())));
                } else {
                    fields.put(fieldName, new ResponseItem<>(PlcResponseCode.REMOTE_ERROR, null));
                }
                response.complete(new DefaultPlcReadResponse(readRequest, fields));
                transaction.endRequest();
 
                return;
            }
 
            Map<String, ResponseItem<PlcValue>> fields = new HashMap<>();
            fields.put(fieldName, new ResponseItem<>(PlcResponseCode.OK, value));
            response.complete(new DefaultPlcReadResponse(readRequest, fields));
            transaction.endRequest();
        });
 
        SDOUploadConversation upload = new SDOUploadConversation(conversation, field.getNodeId(), field.getAnswerNodeId(), new IndexAddress(field.getIndex(), field.getSubIndex()), field.getCanOpenDataType());
        transaction.submit(() -> upload.execute(callback));
    }
 
    @Override
    public void decode(ConversationContext<CANOpenFrame> context, CANOpenFrame msg) throws Exception {
        int nodeId = msg.getNodeId();
        CANOpenService service = msg.getService();
        CANOpenPayload payload = msg.getPayload();
 
        if (service != null && nodeId != this.configuration.getNodeId()) {
            if (service.getPdo() && payload instanceof CANOpenPDOPayload) {
                publishEvent(service, nodeId, payload);
            } else if (service == CANOpenService.HEARTBEAT && payload instanceof CANOpenHeartbeatPayload) {
                publishEvent(service, nodeId, payload);
            } else {
                logger.debug("Decoded CANOpen {} from {}, message {}", service, nodeId, payload);
            }
        }
 
//        int identifier = msg.getIdentifier();
//        CANOpenService service = CANOpenService.valueOf((byte) (identifier >> 7));
//        if (service != null) {
//            ReadBuffer buffer = new ReadBuffer(msg.getData());
//            CANOpenPayload payload = CANOpenPayloadIO.staticParse(buffer, service);
//
//
//        }
    }
 
    private void publishEvent(CANOpenService service, int nodeId, CANOpenPayload payload) {
        DefaultPlcSubscriptionHandle dispatchedHandle = null;
        for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
            DefaultPlcConsumerRegistration registration = entry.getKey();
            Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
 
            for (PlcSubscriptionHandle handler : registration.getSubscriptionHandles()) {
                CANOpenSubscriptionHandle handle = (CANOpenSubscriptionHandle) handler;
                if (payload instanceof CANOpenPDOPayload) {
 
                    if (handle.matches(service, nodeId)) {
                        logger.trace("Dispatching notification {} for node {} to {}", service, nodeId, handle);
                        dispatchedHandle = handle;
 
                        CANOpenPDOField field = (CANOpenPDOField) handle.getField();
                        byte[] data = ((CANOpenPDOPayload) payload).getPdo().getData();
                        try {
                            PlcValue value = DataItem.staticParse(new ReadBufferByteBased(data, ByteOrder.LITTLE_ENDIAN), field.getCanOpenDataType(), data.length);
                            DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(
                                Instant.now(),
                                Collections.singletonMap(
                                    handle.getName(),
                                    new ResponseItem<>(PlcResponseCode.OK, value)
                                )
                            );
                            consumer.accept(event);
                        } catch (ParseException e) {
                            logger.warn("Could not parse data to desired type: {}", field.getCanOpenDataType(), e);
                            DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(
                                Instant.now(),
                                Collections.singletonMap(
                                    handle.getName(),
                                    new ResponseItem<>(PlcResponseCode.INVALID_DATA, new PlcNull())
                                )
                            );
                            consumer.accept(event);
                        }
                    }
                } else if (payload instanceof CANOpenHeartbeatPayload) {
                    if (handle.matches(service, nodeId)) {
                        logger.trace("Dispatching notification {} for node {} to {}", service, nodeId, handle);
                        dispatchedHandle = handle;
 
                        final NMTState state = ((CANOpenHeartbeatPayload) payload).getState();
                        Map<String, PlcValue> fields = new HashMap<>();
                        fields.put("state", new PlcUSINT(state.getValue()));
                        fields.put("node", new PlcUSINT(nodeId));
                        PlcStruct struct = new PlcStruct(fields);
                        DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(
                            Instant.now(),
                            Collections.singletonMap(
                                handle.getName(),
                                new ResponseItem<>(PlcResponseCode.OK, struct)
                            )
                        );
                        consumer.accept(event);
                    }
                } else if (payload instanceof CANOpenNetworkPayload) {
                    if (handle.matches(service, nodeId)) {
                        logger.trace("Dispatching notification {} for node {} to {}", service, nodeId, handle);
                        dispatchedHandle = handle;
 
                        final NMTStateRequest state = ((CANOpenNetworkPayload) payload).getRequest();
                        Map<String, PlcValue> fields = new HashMap<>();
                        fields.put("state", new PlcUSINT(state.getValue()));
                        fields.put("node", new PlcUSINT(nodeId));
                        PlcStruct struct = new PlcStruct(fields);
                        DefaultPlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(
                            Instant.now(),
                            Collections.singletonMap(
                                handle.getName(),
                                new ResponseItem<>(PlcResponseCode.OK, struct)
                            )
                        );
                        consumer.accept(event);
                    }
                }
            }
        }
 
        if (dispatchedHandle == null) {
            logger.trace("Could not find subscription matching {} and node {}", service, nodeId);
        }
    }
 
    @Override
    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
        final DefaultPlcConsumerRegistration consumerRegistration = new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new DefaultPlcSubscriptionHandle[0]));
        consumers.put(consumerRegistration, consumer);
        return consumerRegistration;
    }
 
    @Override
    public void unregister(PlcConsumerRegistration registration) {
        consumers.remove(registration);
    }
 
    @Override
    public void close(ConversationContext<CANOpenFrame> context) {
 
    }
 
    @Override
    public void onDisconnect(ConversationContext<CANOpenFrame> context) {
        if (this.heartbeat != null) {
            this.heartbeat.cancel();
            this.heartbeat = null;
        }
    }
 
}

V6046 Incorrect format. A different number of format items is expected. Arguments not used: 2.