/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.utils.pcapreplay.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.oio.OioByteStreamChannel;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.plc4x.java.utils.pcap.netty.exception.PcapException;
import org.apache.plc4x.java.utils.pcapreplay.netty.address.PcapReplayAddress;
import org.apache.plc4x.java.utils.pcapreplay.netty.config.PcapReplayChannelConfig;
import org.pcap4j.core.*;
import org.pcap4j.packet.Packet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;
public class PcapReplayChannel extends OioByteStreamChannel {
private static final Logger logger = LoggerFactory.getLogger(PcapReplayChannel.class);
private final PcapReplayChannelConfig config;
private PcapReplayAddress remoteRawSocketAddress;
private SocketAddress localAddress;
private PcapHandle handle;
private Thread loopThread;
public PcapReplayChannel() {
super(null);
config = new PcapReplayChannelConfig(this);
}
@Override
protected boolean isInputShutdown() {
return false;
}
@Override
protected ChannelFuture shutdownInput() {
throw new NotImplementedException("");
}
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
this.localAddress = localAddress;
if (!(remoteAddress instanceof PcapReplayAddress)) {
logger.error("Expecting remote address of type PcapSocketAddress");
pipeline().fireExceptionCaught(new PcapException("Expecting remote address of type PcapSocketAddress"));
return;
}
remoteRawSocketAddress = (PcapReplayAddress) remoteAddress;
// Get a handle to the network-device and open it.
File pcapFile = remoteRawSocketAddress.getPcapFile();
if (!pcapFile.exists()) {
logger.error("Couldn't find PCAP capture file at: {}", pcapFile.getAbsolutePath());
PcapException exception = new PcapException(String.format("Couldn't find PCAP capture file at: %s", pcapFile.getAbsolutePath()));
pipeline().fireExceptionCaught(exception);
return;
}
logger.debug("Opening PCAP capture file at: {}", pcapFile.getAbsolutePath());
handle = Pcaps.openOffline(remoteRawSocketAddress.getPcapFile().getAbsolutePath(),
PcapHandle.TimestampPrecision.NANO);
// If the address allows fine tuning which packets to process, set a filter to reduce the load.
String filter = config.getFilter();
if (filter.length() > 0) {
handle.setFilter(filter, BpfProgram.BpfCompileMode.OPTIMIZE);
}
// Create a buffer where the raw socket worker can send data to.
ByteBuf buffer = Unpooled.buffer();
// Start a thread that processes the callbacks from the raw socket and simply
// forwards the bytes read to the buffer.
loopThread = new Thread(() -> {
try {
handle.loop(-1, new PacketListener() {
private Timestamp lastPacketTime = null;
@Override
public void gotPacket(Packet packet) {
Timestamp curPacketTime = handle.getTimestamp();
// Only enable the throttling if it is not disabled.
if (config.getSpeedFactor() != PcapReplayChannelConfig.SPEED_FAST_FULL) {
// If last-time is not null, wait for the given number of nano-seconds.
if (lastPacketTime != null) {
int numMicrosecondsSleep = (int)
((curPacketTime.getNanos() - lastPacketTime.getNanos()) / config.getSpeedFactor());
nanoSecondSleep(numMicrosecondsSleep);
}
}
// Send the bytes to the netty pipeline.
byte[] data = config.getPacketHandler().getData(packet);
if (data != null) {
buffer.writeBytes(data);
}
// Remember the timestamp of the current packet.
lastPacketTime = curPacketTime;
}
});
} catch (PcapNativeException | NotOpenException e) {
logger.error("PCAP loop thread died!", e);
pipeline().fireExceptionCaught(e);
} catch (InterruptedException e) {
logger.warn("PCAP loop thread was interrupted (hopefully intentionally)", e);
Thread.currentThread().interrupt();
}
});
loopThread.start();
// Right now we're using an output stream that simply discards everything.
// This is ok while implementing passive drivers for protocols, however as
// soon as we start implementing ethernet layer protocols, we'll have to also
// be able to actually send data. The PcapInputStream simply acts as a
// breaking point if no packets are coming in and the read operation would
// simply block indefinitely.
activate(new PcapInputStream(buffer), new DiscardingOutputStream());
}
@Override
protected SocketAddress localAddress0() {
return localAddress;
}
@Override
protected SocketAddress remoteAddress0() {
return remoteRawSocketAddress;
}
@Override
protected void doBind(SocketAddress localAddress) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
protected void doDisconnect() {
this.loopThread.interrupt();
if (this.handle != null) {
this.handle.close();
}
}
@Override
protected int doReadBytes(ByteBuf buf) throws Exception {
if (handle == null || !handle.isOpen()) {
return -1;
}
try {
return super.doReadBytes(buf);
} catch (SocketTimeoutException ignored) {
return 0;
}
}
@Override
public ChannelConfig config() {
return this.config;
}
@Override
public boolean isOpen() {
return true;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new RawSocketUnsafe();
}
private void nanoSecondSleep(long numNanos) {
try {
TimeUnit.NANOSECONDS.sleep(numNanos);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* This output stream simply discards anything it should send.
*/
private static class DiscardingOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
// discard
logger.debug("Discarding {}", b);
}
@Override
public void write(byte[] b, int off, int len) {
logger.debug("Discarding {}", b);
}
}
/**
* InputStream that fulfills the contract of Netty for read operations to timeout.
* Without this the InputStream would simply block indefinitely which would block
* the entire IO module.
*/
private static class PcapInputStream extends InputStream {
final ByteBuf buf;
private PcapInputStream(ByteBuf buf) {
this.buf = buf;
}
@Override
public int available() {
return buf.readableBytes();
}
@Override
public int read() throws IOException {
// Timeout 10 ms
final long timeout = System.nanoTime() + 10_000;
// Yes, this could make the thread go nuts in case of no data,
// but the Netty guys are doing it the same way and there probably
// is a reason for it ;-)
while (System.nanoTime() < timeout) {
if (buf.readableBytes() > 0) {
return buf.readByte() & 0xFF;
}
}
throw new SocketTimeoutException();
}
}
/**
* Internal helper to wrap access to unsafe operations (Only used internally by netty)
*/
private class RawSocketUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
try {
doConnect(remoteAddress, localAddress);
pipeline().fireChannelActive();
promise.setSuccess();
} catch (Exception e) {
promise.setFailure(e);
}
}
}
}
↑ V6096 An odd precise comparison. Consider using a comparison with defined precision: Math.abs(A - B) > Epsilon.