嘿,希望有人能帮我一下。
我正在尝试使用Scala Actors和 Netty.io 库来获取异步http请求。(是的,我知道Scala演员正在被弃用,但这对我来说是一个学习练习)
我编写了一个角色HttpRequest estActor
,它接受案例类Request Page(uri: URI)形式的消息。
当它接收到消息时,它创建了必要的Netty对象,需要发出超文本传输协议请求,我已经基于来自[HttpSnoopClient
](http://static.netty.io/3.5/xref/org/jboss/netty/example/http/snoop/HttpSnoopClient.html)示例的大部分代码。
我创建一个客户端,并将当前执行组件实例传递给我的通道管道工厂的实现,
该函数还将该执行组件传递给我的简单通道上游处理程序
的实现,我已在其中重写了消息接收
函数。
actor实例作为侦听器传递,我使用< code>DefaultHttpRequest类创建一个请求,并写入通道以发出请求。
使用从写入通道返回的通道未来
对象对执行组件对象的阻塞调用。当我的处理程序类的 messageRecieved
函数被调用时,我将 netty http 请求的响应解析为字符串,将消息发送回具有响应内容的消息并关闭通道。
在future完成后,我的代码尝试将收到的http内容响应发送给调用参与者。
代码工作正常,我能够得到回复,将其发送到我的演员实例,打印出内容并向正在使用的演员实例发布资源发送消息。
问题是当我测试它时,对演员的原始调用没有得到回复,线程只是保持打开状态。
我的< code>HttpRequestActor类的代码
import scala.actors.Actor
import java.net.{InetSocketAddress,URI}
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.group.DefaultChannelGroup
import java.util.concurrent.{Executors,CancellationException}
import org.jboss.netty.util.CharsetUtil
import scala.concurrent.{ Promise, Future }
import scala.concurrent.ExecutionContext.Implicits.global
/**
* @author mebinum
*
*/
class HttpRequestActor extends Actor {
//initialize response with default uninitialized value
private var resp:Response = _
private val executor = Executors.newCachedThreadPool
private val executor2 = Executors.newCachedThreadPool
private val factory = new NioClientSocketChannelFactory(
executor,
executor2);
private val allChannels = new DefaultChannelGroup("httpRequester")
def act = loop {
react {
case RequestPage(uri) => requestUri(uri)
case Reply(msg) => setResponse(Reply(msg))
case NoReply => println("didnt get a reply");setResponse(NoReply)
case NotReadable => println("got a reply but its not readable");setResponse(NotReadable)
case ShutDown => shutDown()
}
}
private def requestUri(uri:URI) = {
makeChannel(uri) map {
channel => {
allChannels.add(channel)
val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString)
request.setHeader(HttpHeaders.Names.HOST, uri.getHost())
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE)
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
val writeFuture = channel.write(request).awaitUninterruptibly()
FutureReactor !? writeFuture match {
case future : ChannelFuture => {
future.addListener(new ChannelFutureListener() {
def operationComplete(future:ChannelFuture) {
// Perform post-closure operation
println("current response is " + resp)
sendResponse("look ma I finished")
}
})
future.getChannel().close()
}
}
this ! ShutDown
}
}
//thread ends only if you send a reply from here
//println("this is final sender " + sender)
//reply("I am the true end")
}
private def makeChannel(uri:URI) = {
val scheme = Some(uri.getScheme()).getOrElse("http")
val host = Some(uri.getHost()).getOrElse("localhost")
val port = Utils.getPort(uri.getPort, uri.getScheme)
// Set up the event pipeline factory.
val client = new ClientBootstrap(factory)
client.setPipelineFactory(new PipelineFactory(this))
//get the promised channel
val channel = NettyFutureBridge(client.connect(new InetSocketAddress(host, port)))
channel
}
private def setResponse(aResponse:Response) = resp = aResponse
private def sendResponse(msg:String) = {
println("Sending the response " + msg)
reply(resp)
}
private def shutDown() = {
println("got a shutdown message")
val groupFuture = allChannels.close().awaitUninterruptibly()
factory.releaseExternalResources()
}
override def exceptionHandler = {
case e : CancellationException => println("The request was cancelled"); throw e
case tr: Throwable => println("An unknown exception happened " + tr.getCause()); throw tr
}
}
trait Response
case class RequestPage(url:URI)
case class Reply(content:String) extends Response
case object NoReply extends Response
case object NotReadable extends Response
case object ShutDown
object FutureReactor extends Actor{
def act = //loop {
react {
case future: ChannelFuture => {
if (future.isCancelled) {
throw new CancellationException()
}
if (!future.isSuccess()) {
future.getCause().printStackTrace()
throw future.getCause()
}
if(future.isSuccess() && future.isDone()){
future.getChannel().getCloseFuture().awaitUninterruptibly()
reply(future)
}
}
}
//}
this.start
}
class ClientHandler(listener:Actor) extends SimpleChannelUpstreamHandler {
override def exceptionCaught( ctx:ChannelHandlerContext, e:ExceptionEvent){
e.getCause().printStackTrace()
e.getChannel().close();
throw e.getCause()
}
override def messageReceived(ctx:ChannelHandlerContext, e:MessageEvent) = {
var contentString = ""
var httpResponse:Response = null.asInstanceOf[Response]
e.getMessage match {
case (response: HttpResponse) if !response.isChunked => {
println("STATUS: " + response.getStatus);
println("VERSION: " + response.getProtocolVersion);
println
val content = response.getContent();
if (content.readable()) {
contentString = content.toString(CharsetUtil.UTF_8)
httpResponse = Reply(contentString)
//notify actor
}else{
httpResponse = NotReadable
}
}
case chunk: HttpChunk if !chunk.isLast => {
//get chunked content
contentString = chunk.getContent().toString(CharsetUtil.UTF_8)
httpResponse = Reply(contentString)
}
case _ => httpResponse = NoReply
}
println("sending actor my response")
listener ! httpResponse
println("closing the channel")
e.getChannel().close()
//send the close event
}
}
class PipelineFactory(listener:Actor) extends ChannelPipelineFactory {
def getPipeline(): ChannelPipeline = {
// Create a default pipeline implementation.
val pipeline = org.jboss.netty.channel.Channels.pipeline()
pipeline.addLast("codec", new HttpClientCodec())
// Remove the following line if you don't want automatic content decompression.
pipeline.addLast("inflater", new HttpContentDecompressor())
// Uncomment the following line if you don't want to handle HttpChunks.
//pipeline.addLast("aggregator", new HttpChunkAggregator(1048576))
pipeline.addLast("decoder", new HttpRequestDecoder())
//assign the handler
pipeline.addLast("handler", new ClientHandler(listener))
pipeline;
}
}
object NettyFutureBridge {
import scala.concurrent.{ Promise, Future }
import scala.util.Try
import java.util.concurrent.CancellationException
import org.jboss.netty.channel.{ Channel, ChannelFuture, ChannelFutureListener }
def apply(nettyFuture: ChannelFuture): Future[Channel] = {
val p = Promise[Channel]()
nettyFuture.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = p complete Try(
if (future.isSuccess) {
println("Success")
future.getChannel
}
else if (future.isCancelled) {
println("Was cancelled")
throw new CancellationException
}
else {
future.getCause.printStackTrace()
throw future.getCause
})
})
p.future
}
}
测试代码
val url = "http://hiverides.com"
test("Http Request Actor can recieve and react to message"){
val actor = new HttpRequestActor()
actor.start
val response = actor !? new RequestPage(new URI(url))
match {
case Reply(msg) => {
println("this is the reply response in test")
assert(msg != "")
println(msg)
}
case NoReply => println("Got No Reply")
case NotReadable => println("Got a not Reachable")
case None => println("Got a timeout")
case s:Response => println("response string \n" + s)
case x => {println("Got a value not sure what it is"); println(x);}
}
}
使用的库: - Scala 2.9.2 - Netty.io 3.6.1.Final - Junit 4.7 - 最新 1.8 - 我还使用@viktorklang NettyFutureBridge 对象要点为返回的通道对象创建一个 scala 未来
如何从Netty向actor对象发送包含响应内容的回复并结束线程?
任何帮助都将不胜感激
我不了解Scala,但我也遇到过类似的问题。尝试指定响应的content-length标头。
普通java语言:
HttpRequest r = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri);
ChannelBuffer buffer = ChannelBuffers.copiedBuffer(input);
r.setHeader(HttpHeaders.Names.HOST, "host");
r.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
r.setHeader(HttpHeaders.Names.CONTENT_LENGTH, buffer.readableBytes());
r.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
r.setContent(buffer);
否则,服务器不知道何时从客户端完成内容,除非客户端关闭连接。
您也可以使用块编码,但您必须自己实现块编码(至少我不知道Netty中有哪个库可以实现它)。
在过去的几天里,我一直在探索Netty,因为我正在编写一个快速而紧凑的HTTP服务器,它应该能够接收大量请求,而Netty的HTTP服务器实现非常简单,可以完成这项工作。 我的下一步是作为请求处理的一部分,我需要启动一个到外部web服务器的HTTP请求。我的直觉是实现一个可以同时发送大量请求的异步客户机,但我有点困惑什么是正确的方法。我的理解是,Netty服务器对每个传入消息都使用一个工作线程,因
我有2个微服务(A和B)。 有一个接受POST请求的endpoint。当用户发出发布请求时,会发生以下情况: 服务A从POST请求正文中获取对象并将其存储在数据库中。 服务A将对象转换为不同的对象。新对象通过泽西HTTP客户端发送到服务B。 步骤 2 发生在我创建的 Java 线程池(Executors.new缓存线程池)上。通过在新线程上执行步骤 2,服务 A 的终结点的响应时间不受影响。 但是
我在使用netty作为http提供者的异步http客户端时遇到了一个奇怪的问题,它是一个调用远程web服务的play应用程序,而我们没有访问权限。还需要注意的是,该服务位于SSL加密的域上,并且只转发到该域,而不转发到IP。堆栈跟踪: 最奇怪的是,这是突然发生的(这是一个客户使用的测试环境),这让我相信接收服务或网络出了问题,但我无法发现哪里出了问题,因为使用curl调用同一个url可以正常工作。
问题内容: 我有一个PHP脚本,需要从远程服务器下载多个文件。目前,我只有一个循环使用cURL下载和处理文件,这意味着它直到下一个文件完成后才开始下载一个文件- 这大大增加了脚本的运行时间。 是否可以启动多个cURL实例,例如,同时异步下载这些文件而无需等待上一个实例完成?如果是这样,将如何实现? 问题答案: 是。 有一个multirequest PHP库 (或参见:已归档的Google Code
但是,如下所示的调用序列使用相同的TCP连接。 我还没有对此进行调试,但OkHttp似乎迫使我们首先在主线程上发出阻塞HTTP请求,以获得TCP连接上下文,然后与其他线程共享该上下文?或者,我是不是漏掉了什么?
什么是 Netty Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。Netty 提供高性能和可扩展性,让你可以自由地专注于你真正感兴趣的东西,你的独特的应用! 在这一章我们将解释 Netty 在处理一些高并发的网络问题体现的价值。然后,我们将介绍基本概念和构成 Netty 的工具包,我们将在这本书的其余部分深入研究。 一些历