Skip to content

Commit 2722fc3

Browse files
committed
Removed named pipe support because of file descriptor leaks.
1 parent 16adcd5 commit 2722fc3

5 files changed

Lines changed: 32 additions & 110 deletions

File tree

src/main/java/info/fetter/logstashforwarder/FileReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private int readFile(FileState state, int spaceLeftInSpool) {
105105

106106
private boolean isCompressedFile(FileState state) {
107107
RandomAccessFile reader = state.getRandomAccessFile();
108-
if (!reader.canSeek()) return false;
108+
109109
try {
110110
for(byte[] magic : MAGICS) {
111111
byte[] fileBytes = new byte[magic.length];

src/main/java/info/fetter/logstashforwarder/FileSigner.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,8 @@
99

1010
public class FileSigner {
1111
private static final Adler32 adler32 = new Adler32();
12-
private static long fakeSignatureForPipes = System.currentTimeMillis();
13-
12+
1413
public static long computeSignature(RandomAccessFile file, int signatureLength) throws IOException {
15-
// If the file is not seekable, a pipe for instance,
16-
// we report an ever-changing fake signature to keep
17-
// FileWatcher trying to read again as it would with
18-
// a normal file that had changed.
19-
if (!file.canSeek()) return ++fakeSignatureForPipes;
20-
2114
adler32.reset();
2215
byte[] input = new byte[signatureLength];
2316
file.seek(0);

src/main/java/info/fetter/logstashforwarder/FileWatcher.java

Lines changed: 19 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
/*
44
* Copyright 2015 Didier Fetter
5-
* Copyright 2017 Alberto González Palomo https://sentido-labs.com
65
*
76
* Licensed under the Apache License, Version 2.0 (the "License");
87
* you may not use this file except in compliance with the License.
@@ -60,7 +59,7 @@ public void initialize() throws IOException {
6059
oldWatchMap.put(state.getFile(), state);
6160
}
6261
}
63-
synchronized (newWatchMap) { processModifications(); }
62+
processModifications();
6463
if(tail) {
6564
for(FileState state : oldWatchMap.values()) {
6665
if(state.getPointer() == 0) {
@@ -91,7 +90,7 @@ public void checkFiles() throws IOException {
9190
for(FileAlterationObserver observer : observerList) {
9291
observer.checkAndNotify();
9392
}
94-
synchronized (newWatchMap) { processModifications(); }
93+
processModifications();
9594
printWatchMap();
9695
}
9796

@@ -224,33 +223,12 @@ private void processModifications() throws IOException {
224223
removeMarkedFilesFromWatchMap();
225224
}
226225

227-
// This filter will accept anything that is not a directory,
228-
// including named pipes (FIFOs), sockets and device files.
229-
// The standard org.apache.commons.io.filefilter.FileFileFilter excludes
230-
// them even if their documentation says
231-
// "This filter accepts Files that are files (not directories)."
232-
protected class FileFileFilter implements IOFileFilter
233-
{
234-
@Override
235-
public boolean accept(File file) {
236-
return !file.isDirectory();
237-
}
238-
239-
@Override
240-
public boolean accept(File dir, String name) {
241-
return accept(new File(dir, name));
242-
}
243-
}
244-
protected IOFileFilter fileFileFilter() {
245-
return new FileFileFilter();
246-
}
247-
248226
private void addSingleFile(String fileToWatch, Event fields, long deadTime, Multiline multiline, Filter filter) throws Exception {
249227
logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
250228
String directory = FilenameUtils.getFullPath(fileToWatch);
251229
String fileName = FilenameUtils.getName(fileToWatch);
252230
IOFileFilter fileFilter = FileFilterUtils.and(
253-
fileFileFilter(),
231+
FileFilterUtils.fileFileFilter(),
254232
FileFilterUtils.nameFileFilter(fileName),
255233
new LastModifiedFileFilter(deadTime));
256234
initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter);
@@ -262,7 +240,7 @@ private void addWildCardFiles(String filesToWatch, Event fields, long deadTime,
262240
String wildcard = FilenameUtils.getName(filesToWatch);
263241
logger.trace("Directory : " + new File(directory).getCanonicalPath() + ", wildcard : " + wildcard);
264242
IOFileFilter fileFilter = FileFilterUtils.and(
265-
fileFileFilter(),
243+
FileFilterUtils.fileFileFilter(),
266244
new WildcardFileFilter(wildcard),
267245
new LastModifiedFileFilter(deadTime));
268246
initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter);
@@ -289,50 +267,22 @@ private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event f
289267
}
290268
}
291269

292-
// This class will wait until the file is open, which happens in
293-
// new FileState(file).
294-
// Normal files open immediately, but named pipes block until data
295-
// is written in them.
296-
protected class FileAdderThread extends Thread
297-
{
298-
Map<File,FileState> map;
299-
File file;
300-
Event fields;
301-
Multiline multiline;
302-
Filter filter;
303-
304-
private FileAdderThread() {}
305-
public FileAdderThread(Map<File,FileState> map, File file, Event fields, Multiline multiline, Filter filter) {
306-
this.map = map;
307-
this.file = file;
308-
this.fields = fields;
309-
this.multiline = multiline;
310-
this.filter = filter;
311-
}
312-
313-
public void run() {
314-
try {
315-
FileState state = new FileState(file);
316-
state.setFields(fields);
317-
int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
318-
state.setSignatureLength(signatureLength);
319-
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
320-
state.setSignature(signature);
321-
logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
322-
state.setMultiline(multiline);
323-
state.setFilter(filter);
324-
synchronized (map /* This is actually newWatchMap. */) {
325-
map.put(file, state);
326-
}
327-
} catch(IOException e) {
328-
logger.error("Caught IOException in addFileToWatchMap : " +
329-
e.getMessage());
330-
}
331-
}
332-
}
333-
334270
private void addFileToWatchMap(Map<File,FileState> map, File file, Event fields, Multiline multiline, Filter filter) {
335-
(new FileAdderThread(map, file, fields, multiline, filter)).start();
271+
try {
272+
FileState state = new FileState(file);
273+
state.setFields(fields);
274+
int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
275+
state.setSignatureLength(signatureLength);
276+
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
277+
state.setSignature(signature);
278+
logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
279+
state.setMultiline(multiline);
280+
state.setFilter(filter);
281+
map.put(file, state);
282+
} catch(IOException e) {
283+
logger.error("Caught IOException in addFileToWatchMap : " +
284+
e.getMessage());
285+
}
336286
}
337287

338288
public void onFileChange(File file, Event fields, Multiline multiline, Filter filter) {

src/main/java/info/fetter/logstashforwarder/config/FilesSection.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
import java.util.List;
2121
import java.util.Map;
22-
import java.util.regex.Pattern;
23-
2422
import org.apache.commons.lang.builder.ToStringBuilder;
2523

2624
import com.fasterxml.jackson.annotation.JsonProperty;

src/main/java/info/fetter/logstashforwarder/util/RandomAccessFile.java

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/*
22
* Copyright 1998-2009 University Corporation for Atmospheric Research/Unidata
3-
* Copyright 2017 Alberto González Palomo https://sentido-labs.com
43
*
54
* Portions of this software were developed by the Unidata Program at the
65
* University Corporation for Atmospheric Research.
@@ -161,14 +160,13 @@ static public long getDebugNbytes() {
161160
*/
162161
protected java.io.RandomAccessFile file;
163162
protected java.nio.channels.FileChannel fileChannel;
164-
protected boolean canSeek;
165163

166164
/**
167165
* The offset in bytes from the file start, of the next read or
168166
* write operation.
169167
*/
170168
protected long filePosition;
171-
169+
172170
/**
173171
* The buffer used for reading the data.
174172
*/
@@ -263,12 +261,6 @@ public RandomAccessFile(String location, String mode, int bufferSize) throws IOE
263261
}
264262

265263
this.file = new java.io.RandomAccessFile(location, mode);
266-
try {
267-
this.file.seek(0);
268-
canSeek = true;
269-
} catch (IOException e) {
270-
canSeek = false;
271-
}
272264
this.readonly = mode.equals("r");
273265
init(bufferSize);
274266

@@ -382,10 +374,6 @@ public void seek(long pos) throws IOException {
382374
readBuffer(pos);
383375
}
384376

385-
public boolean canSeek() {
386-
return canSeek;
387-
}
388-
389377
protected void readBuffer(long pos) throws IOException {
390378
// If the current buffer is modified, write it to disk.
391379
if (bufferModified) {
@@ -436,27 +424,14 @@ public String getLocation() {
436424
* @throws IOException if an I/O error occurrs.
437425
*/
438426
public long length() throws IOException {
439-
long fileLength = canSeek? file.length(): 0;
427+
long fileLength = file.length();
440428
if (fileLength < dataEnd) {
441429
return dataEnd;
442430
} else {
443431
return fileLength;
444432
}
445433
}
446434

447-
/**
448-
* Check whether the file is empty: normal files are empty
449-
* when their size is zero, but other kinds of files like
450-
* named pipes / FIFOs do not report a size.
451-
*/
452-
public boolean isEmpty() throws IOException {
453-
if (canSeek) {
454-
return length() == 0;
455-
} else {
456-
return false;
457-
}
458-
}
459-
460435
/**
461436
* Change the current endian mode. Subsequent reads of short, int, float, double, long, char will
462437
* use this. Does not currently affect writes.
@@ -478,6 +453,10 @@ public void order(int endian) {
478453
public FileDescriptor getFD() throws IOException {
479454
return (file == null) ? null : file.getFD();
480455
}
456+
457+
public boolean isEmpty() throws IOException {
458+
return length() == 0;
459+
}
481460

482461
/**
483462
* Copy the contents of the buffer to the disk.
@@ -486,7 +465,7 @@ public FileDescriptor getFD() throws IOException {
486465
*/
487466
public void flush() throws IOException {
488467
if (bufferModified) {
489-
if (canSeek) file.seek(bufferStart);
468+
file.seek(bufferStart);
490469
file.write(buffer, 0, dataSize);
491470
//System.out.println("--flush at "+bufferStart+" dataSize= "+dataSize+ " filePosition= "+filePosition);
492471
bufferModified = false;
@@ -653,7 +632,7 @@ public long readToByteChannel(WritableByteChannel dest, long offset, long nbytes
653632
* @throws IOException on io error
654633
*/
655634
protected int read_(long pos, byte[] b, int offset, int len) throws IOException {
656-
if (canSeek) file.seek(pos);
635+
file.seek(pos);
657636
int n = file.read(b, offset, len);
658637
if (debugAccess) {
659638
if (showRead) System.out.println(" **read_ " + location + " = " + len + " bytes at " + pos + "; block = " + (pos / buffer.length));
@@ -877,7 +856,7 @@ public void writeBytes(byte b[], int off, int len) throws IOException {
877856
if (bufferModified) {
878857
flush();
879858
}
880-
if (canSeek) file.seek(filePosition); // moved per Steve Cerruti; Jan 14, 2005
859+
file.seek(filePosition); // moved per Steve Cerruti; Jan 14, 2005
881860
file.write(b, off, len);
882861
//System.out.println("--write at "+filePosition+" "+len);
883862

@@ -1759,3 +1738,5 @@ public boolean searchForward(KMPMatch match, int maxBytes) throws IOException {
17591738
}
17601739

17611740
}
1741+
1742+

0 commit comments

Comments
 (0)