我相信我做的一切正确。我创建一个管道,将接收器传递到编写器线程,使用OP_READ在选择器上注册源,启动选择器。一切正常,但是一旦我向接收器写入内容,就会出现管道异常的情况。为什么!!!
???
这里没有破损的管道。我烦了。我如何调试/了解这里发生了什么?有没有人有一个简单的管道示例,我可以运行它来测试是否正常。写在接收器上的线程,选择器读取它。
编辑:
我几乎遵循了这里的建议。很难在Internet上找到NIO管道的具体示例。
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class SystemOutPipe extends Thread {
public static void main(String[] args)
{
try {
SystemOutPipe sop = new SystemOutPipe();
sop.start();
System.out.println("This message should be redirected to System.err\nNow waiting 5 seconds ...");
Thread.sleep(5000L);
sop.setStopped(true);
sop.join();
} catch (Exception e) {
e.printStackTrace();
}
}
private Selector selector;
private Pipe pipe;
private boolean stopped = false;
public SystemOutPipe() throws IOException {
super("SystemOutPipe");
pipe = Pipe.open();
System.setOut(new PrintStream(new PipeOutputStream(pipe)));
selector = Selector.open();
pipe.source().configureBlocking(false);
pipe.source().register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
@Override
public void run() {
try {
while (!isStopped()) {
int n = selector.select(1L);
if (n > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isReadable()) {
new ReadHandler(key).run();
}
}
}
}
} catch (Exception e) {
e.printStackTrace(); // writes to System.err !
}
}
public synchronized boolean isStopped() {
return stopped;
}
public synchronized void setStopped(final boolean stopped) {
this.stopped = stopped;
}
public class ReadHandler implements Runnable {
private final SelectionKey key;
public ReadHandler(final SelectionKey key) {
this.key = key;
}
@Override
public void run() {
ByteBuffer bbuf = (ByteBuffer) key.attachment();
ReadableByteChannel channel = (ReadableByteChannel) key.channel();
try
{
int count = 0;
do {
bbuf.clear();
count = channel.read(bbuf);
if (count > 0) System.err.write(bbuf.array(), 0, count);
} while(count > 0);
} catch (IOException e) {
e.printStackTrace();
key.cancel();
}
}
}
public class PipeOutputStream extends OutputStream {
private final Pipe pipe;
public PipeOutputStream(final Pipe pipe) {
this.pipe = pipe;
}
@Override
public void write(final int b) throws IOException {
write(new byte[] { (byte) b });
}
@Override
public void write(final byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
ByteBuffer bbuf = ByteBuffer.wrap(b, off, len);
bbuf.position(len);
bbuf.flip();
int count = 0;
while (count < len) {
int n = pipe.sink().write(bbuf);
if (n == 0) {
// let's wait a bit and not consume cpu
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
else count += n;
}
}
}
}
例外:
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcher.write0(Native Method)
at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:72)
at sun.nio.ch.IOUtil.write(IOUtil.java:43)
at sun.nio.ch.SinkChannelImpl.write(SinkChannelImpl.java:149)
at com.niostuff.util.GCLogInterceptor.fileModified(GCLogInterceptor.java:180)
at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$WatchData.notifyFileModified(Unknown Source)
at net.contentobjects.jnotify.linux.JNotifyAdapterLinux.notifyChangeEvent(Unknown Source)
at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$1.notify(Unknown Source)
at net.contentobjects.jnotify.linux.JNotify_linux.callbackProcessEvent(Unknown Source)
at net.contentobjects.jnotify.linux.JNotify_linux.nativeNotifyLoop(Native Method)
at net.contentobjects.jnotify.linux.JNotify_linux.access$000(Unknown Source)
at net.contentobjects.jnotify.linux.JNotify_linux$1.run(Unknown Source)
好的,所以我发现了问题。首先,感谢所有尝试提供帮助的人。希望你能从我的错误中学到东西。事件链是:
1-我没有耗尽接收缓冲区(源通道读入的缓冲区),最终缓冲区已满。
2-现在已满,pipeSourceChannel.read(readBuffer)返回0字节。有要读取的数据,但无法在完整的缓冲区上读取。
3-这导致通道被关闭(我自己在bytesRead == 0上这样做)和BrokenPipe。
我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法
我是使用Flume和Hadoop的新手,所以我试图尽可能设置一个最简单的(但有些帮助/现实的)例子。我在虚拟机客户端中使用HortonWorks沙盒。在完成了教程12(包括设置和使用Flume)之后,一切看起来都正常了。 所以我建立了自己的flume.conf 从apache访问日志中读取 使用内存通道 写入HDFS 够简单吧?这是我的会议文件 我见过几个人在给HDFS写信时遇到问题,大多数情况下
错误消息失败: bank_spec.rb gem'spork'gem'autotest','4.4.6'gem'zentest',:require=>false,:group=>:test gem'autotest-rails-pure“,'4.1.2'gem'autotest-growl','0.2.9'gem'factory_girl_rails”,'1.0'gem'capybara“,:gi
我在其中一个POJO中使用javax.validation.constraints.NotNull,如下所示: 然后,即使我为两个字段都使用 null 值制作 Abc 的对象,它也不会引发任何异常。知道为什么吗? 例如。 不会抛出任何异常。
这是一个Spring项目。我使用下面的代码来处理传入的http GET请求,它的格式是http://xxx.comm/vasttracking?rtbprovider=AAA&adgroupkey=bbb&transactionid=ccc&event=ddd。当服务器运行时,大多数请求都得到了正确的处理,但有些会引发异常--大多数是 在很少的情况下,它们是“String无法转换为类型VastEv
当hdfs不可用时,是否有方法确保数据安全?场景是:kafka源,flume内存通道,hdfs接收器。如果水槽服务关闭了,它是否可以存储主题分区的偏移量,并在恢复后从正确的位置消费?