1. What is Turbine, and what does it do?
Turbine is a tool for aggregating streams of Server-Sent Event (SSE) JSON data into a single stream. The targeted use case is aggregating metrics streams from instances in a service-oriented architecture (SOA) for display on dashboards.
For example, Netflix uses Hystrix, whose realtime dashboard relies on Turbine to aggregate data from hundreds or thousands of machines.
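To make "aggregating into a single stream" concrete, here is a minimal plain-Java sketch of the summing idea. It is not Turbine's own aggregator: it takes one snapshot per instance and adds up the numeric fields of events that share the same type and name. The class name, the helper methods and the requestCount field are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: not Turbine's StreamAggregator, just the idea of summing
// numeric fields across instances that report the same type and name.
final class MetricSumSketch {

    // Groups events by "type/name" and sums every numeric field within a group.
    static Map<String, Map<String, Long>> sumByTypeAndName(List<Map<String, Object>> events) {
        Map<String, Map<String, Long>> totals = new LinkedHashMap<>();
        for (Map<String, Object> event : events) {
            String key = event.get("type") + "/" + event.get("name");
            Map<String, Long> sums = totals.computeIfAbsent(key, k -> new LinkedHashMap<>());
            for (Map.Entry<String, Object> field : event.entrySet()) {
                if (field.getValue() instanceof Number) {
                    sums.merge(field.getKey(), ((Number) field.getValue()).longValue(), Long::sum);
                }
            }
        }
        return totals;
    }

    // Hypothetical helper that builds one metrics event; requestCount is an assumed field.
    static Map<String, Object> event(String instanceId, String name, long requestCount) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("instanceId", instanceId);
        m.put("type", "HystrixCommand");
        m.put("name", name);
        m.put("requestCount", requestCount);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> latest = List.of(
                event("host1", "GetUser", 10),
                event("host2", "GetUser", 32),
                event("host1", "GetOrder", 5));
        System.out.println(sumByTypeAndName(latest));
    }
}
```

Running main prints {HystrixCommand/GetUser={requestCount=42}, HystrixCommand/GetOrder={requestCount=5}}: the two GetUser events from different hosts collapse into one summed entry.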
2. Starting Turbine with Eureka: the StartEurekaTurbine class
public static void main(String[] args) {
    OptionParser optionParser = new OptionParser();
    optionParser.accepts("port").withRequiredArg();
    optionParser.accepts("app").withRequiredArg();
    optionParser.accepts("urlTemplate").withRequiredArg();

    OptionSet options = optionParser.parse(args);

    int port = -1;
    if (!options.has("port")) {
        System.err.println("Argument -port required for SSE HTTP server to start on. Eg. -port 8888");
        System.exit(-1);
    } else {
        try {
            port = Integer.parseInt(String.valueOf(options.valueOf("port")));
        } catch (NumberFormatException e) {
            // note: there is no exit here, so execution continues with port still -1
            System.err.println("Value of port must be an integer but was: " + options.valueOf("port"));
        }
    }

    String app = null;
    if (!options.has("app")) {
        System.err.println("Argument -app required for Eureka instance discovery. Eg. -app api");
        System.exit(-1);
    } else {
        app = String.valueOf(options.valueOf("app"));
    }

    String template = null;
    if (!options.has("urlTemplate")) {
        System.err.println("Argument -urlTemplate required. Eg. http://" + EurekaStreamDiscovery.HOSTNAME + "/metrics.stream");
        System.exit(-1);
    } else {
        template = String.valueOf(options.valueOf("urlTemplate"));
        if (!template.contains(EurekaStreamDiscovery.HOSTNAME)) {
            System.err.println("Argument -urlTemplate must contain " + EurekaStreamDiscovery.HOSTNAME + " marker. Eg. http://" + EurekaStreamDiscovery.HOSTNAME + "/metrics.stream");
            System.exit(-1);
        }
    }

    logger.info("Turbine => Eureka App: " + app);
    logger.info("Turbine => Eureka URL Template: " + template);

    try {
        Turbine.startServerSentEventServer(port, EurekaStreamDiscovery.create(app, template));
    } catch (Throwable e) {
        e.printStackTrace();
    }
}
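The -urlTemplate argument has to contain a hostname marker, presumably so that each instance discovered through Eureka can be turned into its own stream URI. The sketch below shows what such a substitution could look like. The marker value "{HOSTNAME}", the class name and the streamUriFor helper are assumptions for illustration; the actual value of EurekaStreamDiscovery.HOSTNAME is not shown in this article.

```java
import java.net.URI;

// Illustrative only: shows the marker-substitution idea, not EurekaStreamDiscovery itself.
final class UrlTemplateSketch {

    // Assumption: stands in for EurekaStreamDiscovery.HOSTNAME, whose real value is not shown here.
    static final String HOSTNAME_MARKER = "{HOSTNAME}";

    // Hypothetical helper: replaces the marker with a concrete host and parses the result.
    static URI streamUriFor(String template, String hostname) {
        if (!template.contains(HOSTNAME_MARKER)) {
            throw new IllegalArgumentException("template must contain " + HOSTNAME_MARKER);
        }
        return URI.create(template.replace(HOSTNAME_MARKER, hostname));
    }

    public static void main(String[] args) {
        String template = "http://" + HOSTNAME_MARKER + ":8080/metrics.stream";
        // Prints http://api-1.example.internal:8080/metrics.stream
        System.out.println(streamUriFor(template, "api-1.example.internal"));
    }
}
```

The same containment check appears in main above, which rejects a template without the marker before any discovery starts.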
The Turbine.startServerSentEventServer call at the end of main executes the following:
startServerSentEventServer(port, aggregateHttpSSE(discovery));
First, the HTTP streams are aggregated:
/**
 * Aggregate multiple HTTP Server-Sent Event streams into one stream with the values summed.
 * <p>
 * The returned data must be JSON data that contains the following keys:
 * <p>
 * instanceId => Unique instance representing each stream to be merged, such as the instanceId of the server the stream is from.
 * type => The type of data such as HystrixCommand or HystrixThreadPool if aggregating Hystrix metrics.
 * name => Name of a group of metrics to be aggregated, such as a HystrixCommand name if aggregating Hystrix metrics.
 *
 * @param discovery source of the ADD/REMOVE stream actions
 * @return the aggregated streams, grouped by type and name
 */
public static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> aggregateHttpSSE(StreamDiscovery discovery) {
    Observable<StreamAction> streamActions = discovery.getInstanceList().publish().refCount();
    Observable<StreamAction> streamAdds = streamActions.filter(a -> a.getType() == ActionType.ADD);
    Observable<StreamAction> streamRemoves = streamActions.filter(a -> a.getType() == ActionType.REMOVE);

    Observable<GroupedObservable<InstanceKey, Map<String, Object>>> streamPerInstance = streamAdds.map(streamAction -> {
        URI uri = streamAction.getUri();

        Observable<Map<String, Object>> io = Observable.defer(() -> {
            Observable<Map<String, Object>> flatMap = RxNetty.createHttpClient(uri.getHost(), uri.getPort(), PipelineConfigurators.<ByteBuf>sseClientConfigurator())
                    .submit(createRequest(uri))
                    .flatMap(response -> {
                        if (response.getStatus().code() != 200) {
                            return Observable.error(new RuntimeException("Failed to connect: " + response.getStatus()));
                        }
                        return response.getContent()
                                .doOnSubscribe(() -> logger.info("Turbine => Aggregate Stream from URI: " + uri.toASCIIString()))
                                .doOnUnsubscribe(() -> logger.info("Turbine => Unsubscribing Stream: " + uri))
                                .takeUntil(streamRemoves.filter(a -> a.getUri().equals(streamAction.getUri()))) // unsubscribe when we receive a remove event
                                .map(sse -> JsonUtility.jsonToMap(sse.getEventData()));
                    });
            // eclipse is having issues with type inference so breaking up
            return flatMap.retryWhen(attempts -> {
                return attempts.flatMap(e -> {
                    return Observable.timer(1, TimeUnit.SECONDS)
                            .doOnEach(n -> logger.info("Turbine => Retrying connection to: " + uri));
                });
            });
        });

        return GroupedObservable.from(InstanceKey.create(uri.toASCIIString()), io);
    });

    return StreamAggregator.aggregateGroupedStreams(streamPerInstance);
}
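The ADD and REMOVE handling above can be hard to follow in reactive form. As a rough mental model, the plain-Java sketch below tracks which instance streams are open: an ADD opens a stream for a URI, and a REMOVE with an equal URI closes it, which is the role takeUntil plays in the code above. This is only bookkeeping for illustration; the class name and methods are assumptions, and Turbine itself uses no such mutable set.

```java
import java.net.URI;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative bookkeeping only; Turbine does this reactively with
// filter(...) and takeUntil(...), not with a mutable set.
final class StreamActionSketch {

    // Mirrors the two action types used in the code above.
    enum ActionType { ADD, REMOVE }

    private final Set<URI> active = new LinkedHashSet<>();

    // ADD opens a stream for the URI; REMOVE closes the stream with an equal URI.
    void onAction(ActionType type, URI uri) {
        if (type == ActionType.ADD) {
            active.add(uri);
        } else {
            active.remove(uri);
        }
    }

    // Read-only view of the streams that are currently open.
    Set<URI> activeStreams() {
        return Collections.unmodifiableSet(active);
    }

    public static void main(String[] args) {
        StreamActionSketch sketch = new StreamActionSketch();
        URI host1 = URI.create("http://host1:8080/metrics.stream");
        URI host2 = URI.create("http://host2:8080/metrics.stream");
        sketch.onAction(ActionType.ADD, host1);
        sketch.onAction(ActionType.ADD, host2);
        sketch.onAction(ActionType.REMOVE, host1);
        System.out.println(sketch.activeStreams()); // only host2 remains
    }
}
```

Matching on URI equality is the same criterion the original code uses in its streamRemoves filter.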
Then the server that publishes the aggregated stream is started:
public static void startServerSentEventServer(int port, Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> streams) {
    logger.info("Turbine => Starting server on " + port);

    // multicast so multiple concurrent subscribers get the same stream
    Observable<Map<String, Object>> publishedStreams = streams
            .doOnUnsubscribe(() -> logger.info("Turbine => Unsubscribing aggregation."))
            .doOnSubscribe(() -> logger.info("Turbine => Starting aggregation"))
            .flatMap(o -> o).publish().refCount();

    RxNetty.createHttpServer(port, (request, response) -> {
        logger.info("Turbine => SSE Request Received");
        response.getHeaders().setHeader("Content-Type", "text/event-stream");
        return publishedStreams
                .doOnUnsubscribe(() -> logger.info("Turbine => Unsubscribing RxNetty server connection"))
                .flatMap(data -> {
                    return response.writeAndFlush(new ServerSentEvent(null, null, JsonUtility.mapToJson(data)));
                });
    }, PipelineConfigurators.<ByteBuf>sseServerConfigurator()).startAndWait();
}
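Each aggregated map is written to the client as one Server-Sent Event with only a JSON payload. Assuming the encoder follows the standard text/event-stream format, that should produce a data-only frame terminated by a blank line. The sketch below formats such a frame by hand; the class, the method and its parameter order are hypothetical and are not the RxNetty API.

```java
// Illustrative only: hand-rolled text/event-stream framing, not RxNetty's encoder.
final class SseFrameSketch {

    // Builds one SSE frame; eventId and eventType may be null, in which case
    // the frame carries only "data:" lines. A blank line ends the event.
    static String toSseFrame(String eventId, String eventType, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventType != null) {
            sb.append("event: ").append(eventType).append('\n');
        }
        if (eventId != null) {
            sb.append("id: ").append(eventId).append('\n');
        }
        // Multi-line payloads need one "data:" line per line of payload.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        String json = "{\"type\":\"HystrixCommand\",\"name\":\"GetUser\",\"requestCount\":42}";
        System.out.print(toSseFrame(null, null, json));
    }
}
```

A dashboard reading the stream would see one such frame per aggregated update, separated by blank lines.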
3. Starting Turbine standalone (the StartTurbine class) works much like the above
public static void main(String[] args) {
    OptionParser optionParser = new OptionParser();
    optionParser.accepts("port").withRequiredArg();
    optionParser.accepts("streams").withRequiredArg();

    OptionSet options = optionParser.parse(args);

    int port = -1;
    if (!options.has("port")) {
        System.err.println("Argument -port required for SSE HTTP server to start on.");
        System.exit(-1);
    } else {
        try {
            port = Integer.parseInt(String.valueOf(options.valueOf("port")));
        } catch (NumberFormatException e) {
            // note: there is no exit here, so execution continues with port still -1
            System.err.println("Value of port must be an integer but was: " + options.valueOf("port"));
        }
    }

    URI[] streams = null;
    if (!options.hasArgument("streams")) {
        System.err.println("Argument -streams required with URIs to connect to. Eg. -streams \"http://host1/metrics.stream http://host2/metrics.stream\"");
        System.exit(-1);
    } else {
        String streamsArg = String.valueOf(options.valueOf("streams"));
        String[] ss = streamsArg.split(" ");
        streams = new URI[ss.length];
        for (int i = 0; i < ss.length; i++) {
            try {
                streams[i] = new URI(ss[i]);
            } catch (URISyntaxException e) {
                System.err.println("ERROR: Could not parse stream into URI: " + ss[i]);
                System.exit(-1);
            }
        }
    }

    if (streams == null || streams.length == 0) {
        System.err.println("There must be at least 1 valid stream URI.");
        System.exit(-1);
    }

    try {
        Turbine.startServerSentEventServer(port, Turbine.aggregateHttpSSE(streams));
    } catch (Throwable e) {
        e.printStackTrace();
    }
}
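The -streams value is a single space-separated string that is split into URIs. Two details of the Java standard library are worth keeping in mind here: String.split(" ") yields empty tokens when spaces are doubled, and java.net.URI.getPort() returns -1 when a URI has no explicit port. The sketch below is one possible, more tolerant way to parse such an argument. The class and both helpers are hypothetical, and defaulting to port 80 or 443 is an assumption, not something StartTurbine does.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a tolerant alternative to splitting on a single space.
final class StreamsArgSketch {

    // Splits on any run of whitespace and skips empty tokens.
    // URI.create throws IllegalArgumentException for a malformed token.
    static List<URI> parseStreams(String streamsArg) {
        List<URI> uris = new ArrayList<>();
        for (String token : streamsArg.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                uris.add(URI.create(token));
            }
        }
        return uris;
    }

    // Hypothetical helper: falls back to the scheme's usual port when none is given.
    static int effectivePort(URI uri) {
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        return "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
    }

    public static void main(String[] args) {
        List<URI> uris = parseStreams("http://host1/metrics.stream   http://host2:8080/metrics.stream");
        for (URI uri : uris) {
            System.out.println(uri + " -> port " + effectivePort(uri));
        }
    }
}
```

Whether a missing port matters in practice depends on how the HTTP client treats -1, so the fallback is best read as a defensive option rather than a required fix.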