diff --git a/logback-core/src/main/java/ch/qos/logback/core/net/AbstractSocketAppender.java b/logback-core/src/main/java/ch/qos/logback/core/net/AbstractSocketAppender.java index 319fce0854fe270305671208235fc8b0fe24ca10..2940f473f16c7e37342068869fb99f45cebc0fec 100755 --- a/logback-core/src/main/java/ch/qos/logback/core/net/AbstractSocketAppender.java +++ b/logback-core/src/main/java/ch/qos/logback/core/net/AbstractSocketAppender.java @@ -15,25 +15,20 @@ package ch.qos.logback.core.net; import java.io.IOException; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.net.ConnectException; import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; - import javax.net.SocketFactory; import ch.qos.logback.core.AppenderBase; -import ch.qos.logback.core.CoreConstants; import ch.qos.logback.core.spi.PreSerializationTransformer; import ch.qos.logback.core.util.CloseUtil; import ch.qos.logback.core.util.Duration; @@ -45,10 +40,11 @@ import ch.qos.logback.core.util.Duration; * @author Ceki Gülcü * @author Sébastien Pennec * @author Carl Harris + * @author Sebastian Gröbler */ public abstract class AbstractSocketAppender extends AppenderBase - implements Runnable, SocketConnector.ExceptionHandler { + implements SocketConnector.ExceptionHandler { /** * The default port number of remote logging server (4560). @@ -61,7 +57,7 @@ public abstract class AbstractSocketAppender extends AppenderBase public static final int DEFAULT_RECONNECTION_DELAY = 30000; /** - * Default size of the queue used to hold logging events that are destined + * Default size of the deque used to hold logging events that are destined * for the remote peer. */ public static final int DEFAULT_QUEUE_SIZE = 128; @@ -78,6 +74,9 @@ public abstract class AbstractSocketAppender extends AppenderBase */ private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100; + private final ObjectWriterFactory objectWriterFactory; + private final QueueFactory queueFactory; + private String remoteHost; private int port = DEFAULT_PORT; private InetAddress address; @@ -86,7 +85,7 @@ public abstract class AbstractSocketAppender extends AppenderBase private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY; private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT); - private BlockingQueue queue; + private BlockingDeque deque; private String peerId; private Future task; private Future connectorTask; @@ -97,8 +96,17 @@ public abstract class AbstractSocketAppender extends AppenderBase * Constructs a new appender. */ protected AbstractSocketAppender() { + this(new QueueFactory(), new ObjectWriterFactory()); } + /** + * Constructs a new appender using the given {@link QueueFactory} and {@link ObjectWriterFactory}. + */ + AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) { + this.objectWriterFactory = objectWriterFactory; + this.queueFactory = queueFactory; + } + /** * {@inheritDoc} */ @@ -119,9 +127,13 @@ public abstract class AbstractSocketAppender extends AppenderBase + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host"); } + if (queueSize == 0) { + addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing"); + } + if (queueSize < 0) { errorCount++; - addError("Queue size must be non-negative"); + addError("Queue size must be greater than zero"); } if (errorCount == 0) { @@ -134,9 +146,14 @@ public abstract class AbstractSocketAppender extends AppenderBase } if (errorCount == 0) { - queue = newBlockingQueue(queueSize); + deque = queueFactory.newLinkedBlockingDeque(queueSize); peerId = "remote peer " + remoteHost + ":" + port + ": "; - task = getContext().getExecutorService().submit(this); + task = getContext().getExecutorService().submit(new Runnable() { + @Override + public void run() { + connectSocketAndDispatchEvents(); + } + }); super.start(); } } @@ -162,21 +179,16 @@ public abstract class AbstractSocketAppender extends AppenderBase if (event == null || !isStarted()) return; try { - final boolean inserted = queue.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS); + final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS); if (!inserted) { - addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + - "] milliseconds being exceeded"); + addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded"); } } catch (InterruptedException e) { addError("Interrupted while appending event to SocketAppender", e); } } - /** - * {@inheritDoc} - */ - public final void run() { - signalEntryInRunMethod(); + private void connectSocketAndDispatchEvents() { try { while (!Thread.currentThread().isInterrupted()) { SocketConnector connector = createConnector(address, port, 0, @@ -186,22 +198,37 @@ public abstract class AbstractSocketAppender extends AppenderBase if(connectorTask == null) break; - socket = waitForConnectorToReturnASocket(); + socket = waitForConnectorToReturnSocket(); if(socket == null) break; - dispatchEvents(); + + try { + ObjectWriter objectWriter = createObjectWriterForSocket(); + addInfo(peerId + "connection established"); + dispatchEvents(objectWriter); + } catch (IOException ex) { + addInfo(peerId + "connection failed: " + ex); + } finally { + CloseUtil.closeQuietly(socket); + socket = null; + addInfo(peerId + "connection closed"); + } } } catch (InterruptedException ex) { assert true; // ok... we'll exit now } + // TODO I guess the appender should also be stopped at this point addInfo("shutting down"); } - protected void signalEntryInRunMethod() { - // do nothing by default - } + private ObjectWriter createObjectWriterForSocket() throws IOException { + socket.setSoTimeout(acceptConnectionTimeout); + ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream()); + socket.setSoTimeout(0); + return objectWriter; + } - private SocketConnector createConnector(InetAddress address, int port, + private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) { SocketConnector connector = newConnector(address, port, initialDelay, retryDelay); @@ -218,9 +245,9 @@ public abstract class AbstractSocketAppender extends AppenderBase } } - private Socket waitForConnectorToReturnASocket() throws InterruptedException { + private Socket waitForConnectorToReturnSocket() throws InterruptedException { try { - Socket s = connectorTask.get(); + Socket s = connectorTask.get(); connectorTask = null; return s; } catch (ExecutionException e) { @@ -228,35 +255,27 @@ public abstract class AbstractSocketAppender extends AppenderBase } } - private void dispatchEvents() throws InterruptedException { - try { - socket.setSoTimeout(acceptConnectionTimeout); - ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); - socket.setSoTimeout(0); - addInfo(peerId + "connection established"); - int counter = 0; - while (true) { - E event = queue.take(); - postProcessEvent(event); - Serializable serEvent = getPST().transform(event); - oos.writeObject(serEvent); - oos.flush(); - if (++counter >= CoreConstants.OOS_RESET_FREQUENCY) { - // Failing to reset the object output stream every now and - // then creates a serious memory leak. - oos.reset(); - counter = 0; - } + private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException { + while (true) { + E event = deque.takeFirst(); + postProcessEvent(event); + Serializable serializableEvent = getPST().transform(event); + try { + objectWriter.write(serializableEvent); + } catch (IOException e) { + tryReAddingEventToFrontOfQueue(event); + throw e; } - } catch (IOException ex) { - addInfo(peerId + "connection failed: " + ex); - } finally { - CloseUtil.closeQuietly(socket); - socket = null; - addInfo(peerId + "connection closed"); } - } - + } + + private void tryReAddingEventToFrontOfQueue(E event) { + final boolean wasInserted = deque.offerFirst(event); + if (!wasInserted) { + addInfo("Dropping event due to socket connection error and maxed out deque capacity"); + } + } + /** * {@inheritDoc} */ @@ -270,8 +289,6 @@ public abstract class AbstractSocketAppender extends AppenderBase } } - - /** * Creates a new {@link SocketConnector}. *

@@ -299,24 +316,6 @@ public abstract class AbstractSocketAppender extends AppenderBase return SocketFactory.getDefault(); } - /** - * Creates a blocking queue that will be used to hold logging events until - * they can be delivered to the remote receiver. - *

- * The default implementation creates a (bounded) {@link ArrayBlockingQueue} - * for positive queue sizes. Otherwise it creates a {@link SynchronousQueue}. - *

- * This method is exposed primarily to support instrumentation for unit - * testing. - * - * @param queueSize size of the queue - * @return - */ - BlockingQueue newBlockingQueue(int queueSize) { - return queueSize <= 0 ? - new SynchronousQueue() : new ArrayBlockingQueue(queueSize); - } - /** * Post-processes an event before it is serialized for delivery to the * remote receiver. @@ -332,20 +331,6 @@ public abstract class AbstractSocketAppender extends AppenderBase */ protected abstract PreSerializationTransformer getPST(); - /* - * This method is used by logback modules only in the now deprecated - * convenience constructors for SocketAppender - */ - @Deprecated - protected static InetAddress getAddressByName(String host) { - try { - return InetAddress.getByName(host); - } catch (Exception e) { - // addError("Could not find address of [" + host + "].", e); - return null; - } - } - /** * The RemoteHost property takes the name of of the host where a corresponding server is running. */ @@ -397,14 +382,14 @@ public abstract class AbstractSocketAppender extends AppenderBase /** * The queueSize property takes a non-negative integer representing * the number of logging events to retain for delivery to the remote receiver. - * When the queue size is zero, event delivery to the remote receiver is - * synchronous. When the queue size is greater than zero, the + * When the deque size is zero, event delivery to the remote receiver is + * synchronous. When the deque size is greater than zero, the * {@link #append(Object)} method returns immediately after enqueing the - * event, assuming that there is space available in the queue. Using a - * non-zero queue length can improve performance by eliminating delays + * event, assuming that there is space available in the deque. Using a + * non-zero deque length can improve performance by eliminating delays * caused by transient network delays. * - * @param queueSize the queue size to set. + * @param queueSize the deque size to set. */ public void setQueueSize(int queueSize) { this.queueSize = queueSize; diff --git a/logback-core/src/main/java/ch/qos/logback/core/net/AutoFlushingObjectWriter.java b/logback-core/src/main/java/ch/qos/logback/core/net/AutoFlushingObjectWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..6a5c07ca37659fbb00ae583bdf4a04cc8e0c8b90 --- /dev/null +++ b/logback-core/src/main/java/ch/qos/logback/core/net/AutoFlushingObjectWriter.java @@ -0,0 +1,61 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2013, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ + +package ch.qos.logback.core.net; + +import java.io.IOException; +import java.io.ObjectOutputStream; + +/** + * Automatically flushes the underlying {@link java.io.ObjectOutputStream} immediately after calling + * it's {@link java.io.ObjectOutputStream#writeObject(Object)} method. + * + * @author Sebastian Gröbler + */ +public class AutoFlushingObjectWriter implements ObjectWriter { + + private final ObjectOutputStream objectOutputStream; + private final int resetFrequency; + private int writeCounter = 0; + + /** + * Creates a new instance for the given {@link java.io.ObjectOutputStream}. + * + * @param objectOutputStream the stream to write to + * @param resetFrequency the frequency with which the given stream will be + * automatically reset to prevent a memory leak + */ + public AutoFlushingObjectWriter(ObjectOutputStream objectOutputStream, int resetFrequency) { + this.objectOutputStream = objectOutputStream; + this.resetFrequency = resetFrequency; + } + + @Override + public void write(Object object) throws IOException { + objectOutputStream.writeObject(object); + objectOutputStream.flush(); + preventMemoryLeak(); + } + + /** + * Failing to reset the object output stream every now and then creates a serious memory leak which + * is why the underlying stream will be reset according to the {@code resetFrequency}. + */ + private void preventMemoryLeak() throws IOException { + if (++writeCounter >= resetFrequency) { + objectOutputStream.reset(); + writeCounter = 0; + } + } +} diff --git a/logback-core/src/main/java/ch/qos/logback/core/net/ObjectWriter.java b/logback-core/src/main/java/ch/qos/logback/core/net/ObjectWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..dbd528b15866990848b466668745fd6fe867f424 --- /dev/null +++ b/logback-core/src/main/java/ch/qos/logback/core/net/ObjectWriter.java @@ -0,0 +1,33 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2013, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ +package ch.qos.logback.core.net; + +import java.io.IOException; + +/** + * Writes objects to an output. + * + * @author Sebastian Gröbler + */ +public interface ObjectWriter { + + /** + * Writes an object to an output. + * + * @param object the {@link Object} to write + * @throws IOException in case input/output fails, details are defined by the implementation + */ + void write(Object object) throws IOException; + +} diff --git a/logback-core/src/main/java/ch/qos/logback/core/net/ObjectWriterFactory.java b/logback-core/src/main/java/ch/qos/logback/core/net/ObjectWriterFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..eca1405131cbc6496b35185cc7a06e1a4ac3de9b --- /dev/null +++ b/logback-core/src/main/java/ch/qos/logback/core/net/ObjectWriterFactory.java @@ -0,0 +1,39 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2013, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ +package ch.qos.logback.core.net; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.OutputStream; + +import ch.qos.logback.core.CoreConstants; + +/** + * Factory for {@link ch.qos.logback.core.net.ObjectWriter} instances. + * + * @author Sebastian Gröbler + */ +public class ObjectWriterFactory { + + /** + * Creates a new {@link ch.qos.logback.core.net.AutoFlushingObjectWriter} instance. + * + * @param outputStream the underlying {@link java.io.OutputStream} to write to + * @return a new {@link ch.qos.logback.core.net.AutoFlushingObjectWriter} instance + * @throws IOException if an I/O error occurs while writing stream header + */ + public AutoFlushingObjectWriter newAutoFlushingObjectWriter(OutputStream outputStream) throws IOException { + return new AutoFlushingObjectWriter(new ObjectOutputStream(outputStream), CoreConstants.OOS_RESET_FREQUENCY); + } +} diff --git a/logback-core/src/main/java/ch/qos/logback/core/net/QueueFactory.java b/logback-core/src/main/java/ch/qos/logback/core/net/QueueFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..d9f17ff3a3dce97d90ecba46f096e2634ddbeb35 --- /dev/null +++ b/logback-core/src/main/java/ch/qos/logback/core/net/QueueFactory.java @@ -0,0 +1,39 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2013, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ +package ch.qos.logback.core.net; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * Factory for {@link java.util.Queue} instances. + * + * @author Sebastian Gröbler + */ +public class QueueFactory { + + /** + * Creates a new {@link LinkedBlockingDeque} with the given {@code capacity}. + * In case the given capacity is smaller than one it will automatically be + * converted to one. + * + * @param capacity the capacity to use for the queue + * @param the type of elements held in the queue + * @return a new instance of {@link ArrayBlockingQueue} + */ + public LinkedBlockingDeque newLinkedBlockingDeque(int capacity) { + final int actualCapacity = capacity < 1 ? 1 : capacity; + return new LinkedBlockingDeque(actualCapacity); + } +} diff --git a/logback-core/src/test/java/ch/qos/logback/core/net/AbstractSocketAppenderIntegrationTest.java b/logback-core/src/test/java/ch/qos/logback/core/net/AbstractSocketAppenderIntegrationTest.java new file mode 100755 index 0000000000000000000000000000000000000000..7d16ed5a2e3f87652007bd98945876b0c4b1c2a3 --- /dev/null +++ b/logback-core/src/test/java/ch/qos/logback/core/net/AbstractSocketAppenderIntegrationTest.java @@ -0,0 +1,129 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2011, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ + +package ch.qos.logback.core.net; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import ch.qos.logback.core.net.mock.MockContext; +import ch.qos.logback.core.net.server.ServerSocketUtil; +import ch.qos.logback.core.spi.PreSerializationTransformer; +import org.junit.After; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Integration tests for {@link ch.qos.logback.core.net.AbstractSocketAppender}. + * + * @author Carl Harris + * @author Sebastian Gröbler + */ +public class AbstractSocketAppenderIntegrationTest { + + private static final int TIMEOUT = 2000; + + private ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool(); + private MockContext mockContext = new MockContext(executorService); + private AutoFlushingObjectWriter objectWriter; + private ObjectWriterFactory objectWriterFactory = new SpyProducingObjectWriterFactory(); + private LinkedBlockingDeque deque = spy(new LinkedBlockingDeque(1)); + private QueueFactory queueFactory = mock(QueueFactory.class); + private InstrumentedSocketAppender instrumentedAppender = new InstrumentedSocketAppender(queueFactory, objectWriterFactory); + + @Before + public void setUp() throws Exception { + when(queueFactory.newLinkedBlockingDeque(anyInt())).thenReturn(deque); + instrumentedAppender.setContext(mockContext); + } + + @After + public void tearDown() throws Exception { + instrumentedAppender.stop(); + assertFalse(instrumentedAppender.isStarted()); + executorService.shutdownNow(); + assertTrue(executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)); + } + + @Test + public void dispatchesEvents() throws Exception { + + // given + ServerSocket serverSocket = ServerSocketUtil.createServerSocket(); + instrumentedAppender.setRemoteHost(serverSocket.getInetAddress().getHostAddress()); + instrumentedAppender.setPort(serverSocket.getLocalPort()); + instrumentedAppender.start(); + + Socket appenderSocket = serverSocket.accept(); + serverSocket.close(); + + // when + instrumentedAppender.append("some event"); + + // wait for event to be taken from deque and being written into the stream + verify(deque, timeout(TIMEOUT)).takeFirst(); + verify(objectWriter, timeout(TIMEOUT)).write("some event"); + + // then + ObjectInputStream ois = new ObjectInputStream(appenderSocket.getInputStream()); + assertEquals("some event", ois.readObject()); + appenderSocket.close(); + } + + private static class InstrumentedSocketAppender extends AbstractSocketAppender { + + public InstrumentedSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) { + super(queueFactory, objectWriterFactory); + } + + @Override + protected void postProcessEvent(String event) { + } + + @Override + protected PreSerializationTransformer getPST() { + return new PreSerializationTransformer() { + public Serializable transform(String event) { + return event; + } + }; + } + } + + private class SpyProducingObjectWriterFactory extends ObjectWriterFactory { + + @Override + public AutoFlushingObjectWriter newAutoFlushingObjectWriter(OutputStream outputStream) throws IOException { + objectWriter = spy(super.newAutoFlushingObjectWriter(outputStream)); + return objectWriter; + } + } +} diff --git a/logback-core/src/test/java/ch/qos/logback/core/net/AbstractSocketAppenderTest.java b/logback-core/src/test/java/ch/qos/logback/core/net/AbstractSocketAppenderTest.java index 8efd80c5c291dd634885d39ab88d3b9ac317161a..371981afa04bb2f01d7f9d1ecfd5bada2f645245 100755 --- a/logback-core/src/test/java/ch/qos/logback/core/net/AbstractSocketAppenderTest.java +++ b/logback-core/src/test/java/ch/qos/logback/core/net/AbstractSocketAppenderTest.java @@ -14,242 +14,467 @@ package ch.qos.logback.core.net; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.ObjectInputStream; +import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; -import java.net.ServerSocket; +import java.net.InetAddress; import java.net.Socket; -import java.util.List; -import java.util.concurrent.*; - -import ch.qos.logback.core.BasicStatusManager; -import ch.qos.logback.core.status.Status; -import ch.qos.logback.core.status.StatusListener; -import ch.qos.logback.core.status.StatusManager; -import ch.qos.logback.core.util.StatusPrinter; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import ch.qos.logback.core.net.mock.MockContext; -import ch.qos.logback.core.net.server.ServerSocketUtil; import ch.qos.logback.core.spi.PreSerializationTransformer; +import ch.qos.logback.core.util.Duration; +import org.junit.After; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.contains; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; /** * Unit tests for {@link AbstractSocketAppender}. * * @author Carl Harris + * @author Sebastian Gröbler */ public class AbstractSocketAppenderTest { - private static final int DELAY = 10000; + /** + * Timeout used for all blocking operations in multi-threading contexts. + */ + private static final int TIMEOUT = 1000; - private ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool(); + private ThreadPoolExecutor executorService = spy((ThreadPoolExecutor) Executors.newCachedThreadPool()); private MockContext mockContext = new MockContext(executorService); - private InstrumentedSocketAppender instrumentedAppender = new InstrumentedSocketAppender(); + private PreSerializationTransformer preSerializationTransformer = spy(new StringPreSerializationTransformer()); + private Socket socket = mock(Socket.class); + private SocketConnector socketConnector = mock(SocketConnector.class); + private AutoFlushingObjectWriter objectWriter = mock(AutoFlushingObjectWriter.class); + private ObjectWriterFactory objectWriterFactory = mock(ObjectWriterFactory.class); + private LinkedBlockingDeque deque = spy(new LinkedBlockingDeque(1)); + private QueueFactory queueFactory = mock(QueueFactory.class); + private InstrumentedSocketAppender appender = spy(new InstrumentedSocketAppender(preSerializationTransformer, queueFactory, objectWriterFactory, socketConnector)); @Before public void setUp() throws Exception { - instrumentedAppender.setContext(mockContext); + // setup valid appender with mock dependencies + when(socketConnector.call()).thenReturn(socket); + when(objectWriterFactory.newAutoFlushingObjectWriter(any(OutputStream.class))).thenReturn(objectWriter); + when(queueFactory.newLinkedBlockingDeque(anyInt())).thenReturn(deque); + + appender.setContext(mockContext); + appender.setRemoteHost("localhost"); } @After public void tearDown() throws Exception { - instrumentedAppender.stop(); - assertFalse(instrumentedAppender.isStarted()); + appender.stop(); + assertFalse(appender.isStarted()); executorService.shutdownNow(); - assertTrue(executorService.awaitTermination(DELAY, TimeUnit.MILLISECONDS)); + assertTrue(executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)); } @Test - public void appenderShouldFailToStartWithoutValidPort() throws Exception { - instrumentedAppender.setPort(-1); - instrumentedAppender.setRemoteHost("localhost"); - instrumentedAppender.setQueueSize(0); - instrumentedAppender.start(); - assertFalse(instrumentedAppender.isStarted()); - assertTrue(mockContext.getLastStatus().getMessage().contains("port")); + public void failsToStartWithoutValidPort() throws Exception { + + // given + appender.setPort(-1); + + // when + appender.start(); + + // then + assertFalse(appender.isStarted()); + verify(appender).addError(contains("port")); } @Test - public void appenderShouldFailToStartWithoutValidRemoteHost() throws Exception { - instrumentedAppender.setPort(1); - instrumentedAppender.setRemoteHost(null); - instrumentedAppender.setQueueSize(0); - instrumentedAppender.start(); - assertFalse(instrumentedAppender.isStarted()); - assertTrue(mockContext.getLastStatus().getMessage().contains("remote host")); + public void failsToStartWithoutValidRemoteHost() throws Exception { + + // given + appender.setRemoteHost(null); + + // when + appender.start(); + + // then + assertFalse(appender.isStarted()); + verify(appender).addError(contains("remote host")); } @Test - public void appenderShouldFailToStartWithNegativeQueueSize() throws Exception { - instrumentedAppender.setPort(1); - instrumentedAppender.setRemoteHost("localhost"); - instrumentedAppender.setQueueSize(-1); - instrumentedAppender.start(); - assertFalse(instrumentedAppender.isStarted()); - assertTrue(mockContext.getLastStatus().getMessage().contains("Queue")); + public void failsToStartWithNegativeQueueSize() throws Exception { + + // given + appender.setQueueSize(-1); + + // when + appender.start(); + + // then + assertFalse(appender.isStarted()); + verify(appender).addError(contains("Queue size must be greater than zero")); } @Test - public void appenderShouldFailToStartWithUnresolvableRemoteHost() throws Exception { - instrumentedAppender.setPort(1); - instrumentedAppender.setRemoteHost("NOT.A.VALID.REMOTE.HOST.NAME"); - instrumentedAppender.setQueueSize(0); - instrumentedAppender.start(); - assertFalse(instrumentedAppender.isStarted()); - assertTrue(mockContext.getLastStatus().getMessage().contains("unknown host")); + public void failsToStartWithUnresolvableRemoteHost() throws Exception { + + // given + appender.setRemoteHost("NOT.A.VALID.REMOTE.HOST.NAME"); + + // when + appender.start(); + + // then + assertFalse(appender.isStarted()); + verify(appender).addError(contains("unknown host")); } @Test - public void appenderShouldFailToStartWithZeroQueueLength() throws Exception { - instrumentedAppender.setPort(1); - instrumentedAppender.setRemoteHost("localhost"); - instrumentedAppender.setQueueSize(0); - instrumentedAppender.start(); - assertTrue(instrumentedAppender.isStarted()); - assertTrue(instrumentedAppender.lastQueue instanceof SynchronousQueue); + public void startsButOutputsWarningWhenQueueSizeIsZero() throws Exception { + + // given + appender.setQueueSize(0); + + // when + appender.start(); + + // then + assertTrue(appender.isStarted()); + verify(appender).addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing"); } @Test - public void appenderShouldStartWithValidParameters() throws Exception { - instrumentedAppender.setPort(1); - instrumentedAppender.setRemoteHost("localhost"); - instrumentedAppender.setQueueSize(1); - instrumentedAppender.start(); - assertTrue(instrumentedAppender.isStarted()); - assertTrue(instrumentedAppender.lastQueue instanceof ArrayBlockingQueue); - assertEquals(1, instrumentedAppender.lastQueue.remainingCapacity()); + public void startsWithValidParameters() throws Exception { + + // when + appender.start(); + + // then + assertTrue(appender.isStarted()); } - // this test takes 1 second and is deemed too long - @Ignore - @Test(timeout = 2000) - public void appenderShouldCleanupTasksWhenStopped() throws Exception { - mockContext.setStatusManager(new BasicStatusManager()); - instrumentedAppender.setPort(1); - instrumentedAppender.setRemoteHost("localhost"); - instrumentedAppender.setQueueSize(1); - instrumentedAppender.start(); - assertTrue(instrumentedAppender.isStarted()); + @Test + public void createsSocketConnectorWithConfiguredParameters() throws Exception { + + // given + appender.setReconnectionDelay(new Duration(42)); + appender.setRemoteHost("localhost"); + appender.setPort(21); - waitForActiveCountToEqual(executorService, 2); - instrumentedAppender.stop(); - waitForActiveCountToEqual(executorService, 0); - StatusPrinter.print(mockContext); - assertEquals(0, executorService.getActiveCount()); + // when + appender.start(); + // then + verify(appender, timeout(TIMEOUT)).newConnector(InetAddress.getByName("localhost"), 21, 0, 42); } - private void waitForActiveCountToEqual(ThreadPoolExecutor executorService, int i) { - while (executorService.getActiveCount() != i) { - try { - Thread.yield(); - Thread.sleep(1); - System.out.print("."); - } catch (InterruptedException e) { - } - } + @Test + public void addsInfoMessageWhenSocketConnectionWasEstablished() { + + // when + appender.start(); + + // then + verify(appender, timeout(TIMEOUT)).addInfo(contains("connection established")); } + @Test + public void addsInfoMessageWhenSocketConnectionFailed() throws Exception { + + // given + doThrow(new IOException()).when(objectWriterFactory).newAutoFlushingObjectWriter(any(OutputStream.class)); + appender.start(); + + // when + appender.append("some event"); + + // then + verify(appender, timeout(TIMEOUT).atLeastOnce()).addInfo(contains("connection failed")); + } @Test - public void testAppendWhenNotStarted() throws Exception { - instrumentedAppender.setRemoteHost("localhost"); - instrumentedAppender.start(); - instrumentedAppender.stop(); + public void closesSocketOnException() throws Exception { - // make sure the appender task has stopped - executorService.shutdownNow(); - assertTrue(executorService.awaitTermination(DELAY, TimeUnit.MILLISECONDS)); + // given + doThrow(new IOException()).when(objectWriterFactory).newAutoFlushingObjectWriter(any(OutputStream.class)); + appender.start(); - instrumentedAppender.append("some event"); - assertTrue(instrumentedAppender.lastQueue.isEmpty()); + // when + appender.append("some event"); + + // then + verify(socket, timeout(TIMEOUT).atLeastOnce()).close(); } - @Test(timeout = 1000) - public void testAppendSingleEvent() throws Exception { - instrumentedAppender.setRemoteHost("localhost"); - instrumentedAppender.start(); + @Test + public void addsInfoMessageWhenSocketConnectionClosed() throws Exception { - instrumentedAppender.latch.await(); - instrumentedAppender.append("some event"); - assertTrue(instrumentedAppender.lastQueue.size() == 1); + // given + doThrow(new IOException()).when(objectWriterFactory).newAutoFlushingObjectWriter(any(OutputStream.class)); + appender.start(); + + // when + appender.append("some event"); + + // then + verify(appender, timeout(TIMEOUT).atLeastOnce()).addInfo(contains("connection closed")); } @Test - public void testAppendEvent() throws Exception { - instrumentedAppender.setRemoteHost("localhost"); - instrumentedAppender.setQueueSize(1); - instrumentedAppender.start(); + public void shutsDownWhenConnectorTaskCouldNotBeActivated() { - // stop the appender task, but don't stop the appender - executorService.shutdownNow(); - assertTrue(executorService.awaitTermination(DELAY, TimeUnit.MILLISECONDS)); + // given + doThrow(new RejectedExecutionException()).when(executorService).submit(socketConnector); - instrumentedAppender.append("some event"); - assertEquals("some event", instrumentedAppender.lastQueue.poll()); + // when + appender.start(); + + // then + verify(appender, timeout(TIMEOUT)).addInfo("shutting down"); } @Test - public void testDispatchEvent() throws Exception { - ServerSocket serverSocket = ServerSocketUtil.createServerSocket(); - instrumentedAppender.setRemoteHost(serverSocket.getInetAddress().getHostAddress()); - instrumentedAppender.setPort(serverSocket.getLocalPort()); - instrumentedAppender.setQueueSize(1); - instrumentedAppender.start(); + public void shutsDownWhenConnectorTaskThrewAnException() throws Exception { - Socket appenderSocket = serverSocket.accept(); - serverSocket.close(); + // given + doThrow(new IllegalStateException()).when(socketConnector).call(); - instrumentedAppender.append("some event"); + // when + appender.start(); - final int shortDelay = 100; - for (int i = 0, retries = DELAY / shortDelay; - !instrumentedAppender.lastQueue.isEmpty() && i < retries; - i++) { - Thread.sleep(shortDelay); - } - assertTrue(instrumentedAppender.lastQueue.isEmpty()); + // then + verify(appender, timeout(TIMEOUT)).addInfo("shutting down"); + } + + @Test + public void offersEventsToTheEndOfTheDeque() throws Exception { + + // given + appender.start(); + + // when + appender.append("some event"); + + // then + verify(deque).offer(eq("some event"), anyLong(), any(TimeUnit.class)); + } + + @Test + public void doesNotQueueAnyEventsWhenStopped() throws Exception { + + // given + appender.start(); + appender.stop(); + + // when + appender.append("some event"); + + // then + verifyZeroInteractions(deque); + } - ObjectInputStream ois = new ObjectInputStream(appenderSocket.getInputStream()); - assertEquals("some event", ois.readObject()); - appenderSocket.close(); + @Test + public void addsInfoMessageWhenEventCouldNotBeQueuedInConfiguredTimeoutDueToQueueSizeLimitation() throws Exception { + + // given + long eventDelayLimit = 42; + doReturn(false).when(deque).offer("some event", eventDelayLimit, TimeUnit.MILLISECONDS); + appender.setEventDelayLimit(new Duration(eventDelayLimit)); + appender.start(); + + // when + appender.append("some event"); + + // then + verify(appender).addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + " milliseconds] being exceeded"); + } + + @Test + public void takesEventsFromTheFrontOfTheDeque() throws Exception { + + // given + appender.start(); + awaitStartOfEventDispatching(); + // when + appender.append("some event"); + + // then + verify(deque, timeout(TIMEOUT).atLeastOnce()).takeFirst(); + } + + @Test + public void reAddsEventAtTheFrontOfTheDequeWhenTransmissionFails() throws Exception { + + // given + doThrow(new IOException()).when(objectWriter).write(anyObject()); + appender.start(); + awaitStartOfEventDispatching(); + + // when + appender.append("some event"); + + // then + verify(deque, timeout(TIMEOUT).atLeastOnce()).offerFirst("some event"); + } + + @Test + public void addsErrorMessageWhenAppendingIsInterruptedWhileWaitingForTheQueueToAcceptTheEvent() throws Exception { + + // given + final InterruptedException interruptedException = new InterruptedException(); + doThrow(interruptedException).when(deque).offer(eq("some event"), anyLong(), any(TimeUnit.class)); + appender.start(); + + // when + appender.append("some event"); + + // then + verify(appender).addError("Interrupted while appending event to SocketAppender", interruptedException); + } + + @Test + public void postProcessesEventsBeforeTransformingItToASerializable() throws Exception { + + // given + appender.start(); + awaitStartOfEventDispatching(); + + // when + appender.append("some event"); + awaitAtLeastOneEventToBeDispatched(); + + // then + InOrder inOrder = inOrder(appender, preSerializationTransformer); + inOrder.verify(appender).postProcessEvent("some event"); + inOrder.verify(preSerializationTransformer).transform("some event"); + } + + @Test + public void writesSerializedEventToStream() throws Exception { + + // given + when(preSerializationTransformer.transform("some event")).thenReturn("some serialized event"); + appender.start(); + awaitStartOfEventDispatching(); + + // when + appender.append("some event"); + + // then + verify(objectWriter, timeout(TIMEOUT)).write("some serialized event"); + } + + @Test + public void addsInfoMessageWhenEventIsBeingDroppedBecauseOfConnectionProblemAndDequeCapacityLimitReached() throws Exception { + + // given + doThrow(new IOException()).when(objectWriter).write(anyObject()); + doThrow(new IllegalStateException()).when(deque).offerFirst("some event"); + appender.start(); + awaitStartOfEventDispatching(); + reset(appender); + + // when + appender.append("some event"); + + // then + verify(appender, timeout(TIMEOUT)).addInfo("Dropping event due to socket connection error and maxed out deque capacity"); + } + + @Test + public void reEstablishesSocketConnectionOnConnectionDrop() throws Exception { + + // given + doThrow(new IOException()).when(objectWriter).write(anyObject()); + appender.start(); + awaitStartOfEventDispatching(); + + // when + appender.append("some event"); + + // then + verify(objectWriterFactory, timeout(TIMEOUT).atLeast(2)).newAutoFlushingObjectWriter(any(OutputStream.class)); + } + + @Test + public void usesConfiguredAcceptConnectionTimeoutAndResetsSocketTimeoutAfterSuccessfulConnection() throws Exception { + + // when + appender.setAcceptConnectionTimeout(42); + appender.start(); + awaitStartOfEventDispatching(); + + // then + InOrder inOrder = inOrder(socket); + inOrder.verify(socket).setSoTimeout(42); + inOrder.verify(socket).setSoTimeout(0); + } + + private void awaitAtLeastOneEventToBeDispatched() throws IOException { + verify(objectWriter, timeout(TIMEOUT)).write(anyString()); + } + + private void awaitStartOfEventDispatching() throws InterruptedException { + verify(deque, timeout(TIMEOUT)).takeFirst(); } private static class InstrumentedSocketAppender extends AbstractSocketAppender { - private BlockingQueue lastQueue; - CountDownLatch latch = new CountDownLatch(1); + private PreSerializationTransformer preSerializationTransformer; + private SocketConnector socketConnector; + + public InstrumentedSocketAppender(PreSerializationTransformer preSerializationTransformer, + QueueFactory queueFactory, + ObjectWriterFactory objectWriterFactory, + SocketConnector socketConnector) { + super(queueFactory, objectWriterFactory); + this.preSerializationTransformer = preSerializationTransformer; + this.socketConnector = socketConnector; + } + @Override protected void postProcessEvent(String event) { } @Override protected PreSerializationTransformer getPST() { - return new PreSerializationTransformer() { - public Serializable transform(String event) { - return event; - } - }; + return preSerializationTransformer; } @Override - protected void signalEntryInRunMethod() { - latch.countDown(); + protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) { + return socketConnector; } + } + + private static class StringPreSerializationTransformer implements PreSerializationTransformer { @Override - BlockingQueue newBlockingQueue(int queueSize) { - lastQueue = super.newBlockingQueue(queueSize); - return lastQueue; + public Serializable transform(String event) { + return event; } - } - } diff --git a/logback-core/src/test/java/ch/qos/logback/core/net/AutoFlushingObjectWriterTest.java b/logback-core/src/test/java/ch/qos/logback/core/net/AutoFlushingObjectWriterTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1a4672703a9f6bad82e627818184d1c49fab332a --- /dev/null +++ b/logback-core/src/test/java/ch/qos/logback/core/net/AutoFlushingObjectWriterTest.java @@ -0,0 +1,114 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2011, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ + +package ch.qos.logback.core.net; + +import java.io.IOException; +import java.io.ObjectOutputStream; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link ch.qos.logback.core.net.AutoFlushingObjectWriter}. + * + * @author Sebastian Gröbler + */ +public class AutoFlushingObjectWriterTest { + + private InstrumentedObjectOutputStream objectOutputStream; + + @Before + public void beforeEachTest() throws IOException { + objectOutputStream = spy(new InstrumentedObjectOutputStream()); + } + + @Test + public void writesToUnderlyingObjectOutputStream() throws IOException { + + // given + ObjectWriter objectWriter = new AutoFlushingObjectWriter(objectOutputStream, 2); + String object = "foo"; + + // when + objectWriter.write(object); + + // then + verify(objectOutputStream).writeObjectOverride(object); + } + + @Test + public void flushesAfterWrite() throws IOException { + + // given + ObjectWriter objectWriter = new AutoFlushingObjectWriter(objectOutputStream, 2); + String object = "foo"; + + // when + objectWriter.write(object); + + // then + InOrder inOrder = inOrder(objectOutputStream); + inOrder.verify(objectOutputStream).writeObjectOverride(object); + inOrder.verify(objectOutputStream).flush(); + } + + @Test + public void resetsObjectOutputStreamAccordingToGivenResetFrequency() throws IOException { + + // given + ObjectWriter objectWriter = new AutoFlushingObjectWriter(objectOutputStream, 2); + String object = "foo"; + + // when + objectWriter.write(object); + objectWriter.write(object); + objectWriter.write(object); + objectWriter.write(object); + + // then + InOrder inOrder = inOrder(objectOutputStream); + inOrder.verify(objectOutputStream).writeObjectOverride(object); + inOrder.verify(objectOutputStream).writeObjectOverride(object); + inOrder.verify(objectOutputStream).reset(); + inOrder.verify(objectOutputStream).writeObjectOverride(object); + inOrder.verify(objectOutputStream).writeObjectOverride(object); + inOrder.verify(objectOutputStream).reset(); + } + + private static class InstrumentedObjectOutputStream extends ObjectOutputStream { + + protected InstrumentedObjectOutputStream() throws IOException, SecurityException { + } + + @Override + protected void writeObjectOverride(final Object obj) throws IOException { + // nop + } + + @Override + public void flush() throws IOException { + // nop + } + + @Override + public void reset() throws IOException { + // nop + } + } +} diff --git a/logback-site/src/site/pages/manual/appenders.html b/logback-site/src/site/pages/manual/appenders.html index b55380ffda7f39ed3c26c808beace4568340e7bd..b01d0aeceaaf32605d0039821c4223879662360c 100755 --- a/logback-site/src/site/pages/manual/appenders.html +++ b/logback-site/src/site/pages/manual/appenders.html @@ -1649,15 +1649,15 @@ public interface TriggeringPolicy<E> extends LifeCycle { queueSize int -

The queueSize property takes a - non-negative integer representing the number of logging +

The queueSize property takes an + integer (greater than zero) representing the number of logging events to retain for delivery to the remote receiver. When - the queue size is zero, event delivery to the remote + the queue size is one, event delivery to the remote receiver is synchronous. When the queue size is greater - than zero, new events are enqueued, assuming that there is - space available in the queue. Using a non-zero queue length - can improve performance by eliminating delays caused by - transient network delays. + than one, new events are enqueued, assuming that there is + space available in the queue. Using a queue length greater + than one can improve performance by eliminating delays caused + by transient network delays.

See also the eventDelayLimit