agn*_*aft 3 java gzipinputstream
我有一个包含gzip压缩日志文件的目录,每行一个事件.为了实时读取和处理这些,我创建了一个与此处列出的代码相同的WatcherService:http://docs.oracle.com/javase/tutorial/essential/io/notification.html
在processEvents()方法中,我添加了此代码以逐行读取已添加或追加的文件:
if (kind == ENTRY_MODIFY) {
try(BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(Files.newInputStream(child, StandardOpenOption.READ))))) {
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
}
catch(EOFException ex) {
//file is empty, so ignore until next signal
}
catch(Exception ex) {
ex.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
现在,正如您可以想象的那样,这对于在几毫秒内创建的已编写和关闭的文件非常有用,但是,当处理随时间附加的大文件时,这将为每个附加行反复读取整个文件(给定生成器现在然后刷新和同步文件).
有没有什么办法可以在每次发送ENTRY_MODIFY信号时只读取此文件中的新行,或者找出文件"完成"的时间?
如何处理未附加但被覆盖的文件?
首先,我想回答您问题的技术方面:
一个WatchEvent只是给你一个改变(或创建或删除)文件,并没有更多的文件名.因此,如果您需要超出此范围的任何逻辑,您必须自己实现它(或者当然使用现有的库).
如果您只想读取新行,则必须记住每个文件的位置,每当更改此文件时,您可以移动到最后的已知位置.要获得当前位置,您可以使用CountingInputStreamCommons IO包中的一个(积分转到[1]).要跳到最后位置,您可以使用该功能skip.
但是你正在使用a GZIPInputStream,这意味着跳过不会给你带来很好的性能提升,因为跳过压缩流是不可能的.相反,GZIPInputStream跳过将解压缩流,就像您在阅读时一样,因此您只会遇到很少的性能改进(试试吧!).
我不明白你为什么要使用压缩日志文件?为什么不用a编写未压缩的日志DailyRollingFileAppender并在一天结束时压缩它,此时应用程序不再访问它?
另一种解决方案可能是保留GZIPInputStream(存储它),这样您就不必再次重新读取文件.这可能取决于您需要观察多少日志文件来确定这是否合理.
现在对您的要求提出一些问题:
您没有提到为什么要实时查看日志文件的原因.为什么不集中日志(请参阅集中式Java日志记录)?例如,看一下logstash和这个演示文稿(参见[2]和[3])或抄写或splunk,这是商业的(参见[4]).
集中日志可让您有机会根据日志数据真正做出实时反应.
[1] /sf/answers/16851831/
[2] 使用elasticsearch,logstash和kibana创建实时仪表板 - 幻灯片
[3] 使用elasticsearch,logstash和kibana创建实时仪表板 - 视频
[4] 日志使用Splunk聚合 - 幻灯片
更新
首先,使用Groovy脚本生成压缩日志文件.每次我想模拟日志文件更改时,我都会从GroovyConsole启动此脚本:
// Run with GroovyConsole each time you want new entries
def file = new File('D:\\Projekte\\watcher_service\\data\\log.gz')
// reading previous content since append is not possible
def content
if (file.exists()) {
def inStream = new java.util.zip.GZIPInputStream(file.newInputStream())
content = inStream.readLines()
}
// writing previous content and append new data
def random = new java.util.Random()
def lineCount = random.nextInt(30) + 1
def outStream = new java.util.zip.GZIPOutputStream(file.newOutputStream())
outStream.withWriter('UTF-8') { writer ->
if (content) {
content.each { writer << "$it\n" }
}
(1 .. lineCount).each {
writer.write "Writing line $it/$lineCount\n"
}
writer.write '---Finished---\n'
writer.flush()
writer.close()
}
println "Wrote ${lineCount + 1} lines."
Run Code Online (Sandbox Code Playgroud)
那么日志文件阅读器:
import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardOpenOption
import java.util.zip.GZIPInputStream
import org.apache.commons.io.input.CountingInputStream
import static java.nio.file.StandardWatchEventKinds.*
class LogReader
{
private final Path dir = Paths.get('D:\\Projekte\\watcher_service\\data\\')
private watcher
private positionMap = [:]
long lineCount = 0
static void main(def args)
{
new LogReader().processEvents()
}
LogReader()
{
watcher = FileSystems.getDefault().newWatchService()
dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
}
void processEvents()
{
def key = watcher.take()
boolean doLeave = false
while ((key != null) && (doLeave == false))
{
key.pollEvents().each { event ->
def kind = event.kind()
Path name = event.context()
println "Event received $kind: $name"
if (kind == ENTRY_MODIFY) {
// use position from the map, if entry is not there use default value 0
processChange(name, positionMap.get(name.toString(), 0))
}
else if (kind == ENTRY_CREATE) {
processChange(name, 0)
}
else {
doLeave = true
return
}
}
key.reset()
key = watcher.take()
}
}
private void processChange(Path name, long position)
{
// open file and go to last position
Path absolutePath = dir.resolve(name)
def countingStream =
new CountingInputStream(
new GZIPInputStream(
Files.newInputStream(absolutePath, StandardOpenOption.READ)))
position = countingStream.skip(position)
println "Moving to position $position"
// processing each new line
// at the first start all lines are read
int newLineCount = 0
countingStream.withReader('UTF-8') { reader ->
reader.eachLine { line ->
println "${++lineCount}: $line"
++newLineCount
}
}
println "${++lineCount}: $newLineCount new lines +++Finished+++"
// store new position in map
positionMap[name.toString()] = countingStream.count
println "Storing new position $countingStream.count"
countingStream.close()
}
}
Run Code Online (Sandbox Code Playgroud)
在该功能中,processChange您可以看到1)输入流的创建.与.withReader创造InputStreamReader和的线BufferedReader.我总是使用Grovvy,它是立体声中的Java,当你开始使用它时,你无法停止.Java开发人员应该能够阅读它,但如果您有问题只需评论.
| 归档时间: |
|
| 查看次数: |
608 次 |
| 最近记录: |