/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.plc4x.java.transport.serial;
 
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultPromise;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
 
class SerialPollingSelector extends AbstractSelector {
 
    private static final Logger logger = LoggerFactory.getLogger(SerialPollingSelector.class);
 
    private final List<SelectionKey> registeredChannels;
    private final Set<SelectorEvent> events = ConcurrentHashMap.newKeySet();
 
    // Use a Netty Promise
    private final DefaultEventExecutor executor = new DefaultEventExecutor();
    private DefaultPromise<Void> selectPromise;
 
    public static class SelectorEvent {
 
        private final SelectionKey key;
 
        private final int event;
        public SelectorEvent(SelectionKey key, int event) {
            this.key = key;
            this.event = event;
        }
 
        public SelectionKey getKey() {
            return this.key;
        }
 
        public int getEvent() {
            return event;
        }
 
    }
    public SerialPollingSelector(SelectorProvider selectorProvider) {
        super(selectorProvider);
        registeredChannels = new ArrayList<>();
    }
 
    @Override
    public Set<SelectionKey> keys() {
        return new HashSet<>(registeredChannels);
    }
 
    /**
     * Returns all keys that are in the events queue
     * @return
     */
    @Override
    public Set<SelectionKey> selectedKeys() {
        return events.stream().map(SelectorEvent::getKey).collect(Collectors.toSet());
    }
 
    @Override
    public int selectNow() {
        // throw new NotImplementedException("");
        logger.debug("selectNow()");
        // check if one channel is active
        return events.size();
    }
 
    @Override
    public int select(long timeout) {
        if (!events.isEmpty()) {
            return events.size();
        }
        this.selectPromise = new DefaultPromise<>(executor);
        try {
            if(timeout == 0) {
                selectPromise.await();
            } else {
                selectPromise.await(timeout);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Was interrupted", e);
        }
        final int eventCount = events.size();
        logger.debug("returning from select with {} events", eventCount);
        return eventCount;
    }
 
    @Override
    public int select() {
        return select(0);
    }
 
    @Override
    public Selector wakeup() {
        logger.debug("being asked to wake up from select");
        // throw new NotImplementedException("Not implemented for this selector, should not be needed.");
        if (!selectPromise.isDone()) {
            selectPromise.setSuccess(null);
        }
        return this;
    }
 
    public void addEvent(SelectorEvent event) {
        logger.debug("Adding Event to Selector, canceling Promise...");
        this.events.add(event);
        // Add the OP to the SelectionKey
        ((SerialSelectionKey) event.key).addReadyOp(event.event);
        // Close the future so that the select is fired immediately
        if (!selectPromise.isDone()) {
            selectPromise.setSuccess(null);
        } else {
            logger.debug("Promise is already cancelled, skipping that.");
        }
    }
 
    public void removeEvent(SerialSelectionKey serialSelectionKey) {
        events.removeIf(event -> event.key.equals(serialSelectionKey));
    }
 
    @Override
    protected void implCloseSelector() {
        // TODO should we do something here?
    }
 
    @Override
    protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object att) {
        logger.debug("Registering Channel for selector {} with operations {}", ch, ops);
        if (!(ch instanceof SerialSocketChannel)) {
            throw new IllegalArgumentException("Given channel has to be of type " + SerialSocketChannel.class);
        }
        final SerialSelectionKey key = new SerialSelectionKey(ch, this, ops);
        // Attach attr
        key.attach(att);
        synchronized (this) {
            // TODO is this always the case??
            final int index = registeredChannels.size();
            registeredChannels.add(key);
            key.setIndex(index);
        }
        return key;
    }
 
}

V6070 Unsafe synchronization on 'this' instance in class 'SerialPollingSelector'.