当前位置: 首页 > 知识库问答 >
问题:

Rest APIKafka流

张森
2023-03-14
{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS  #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}
[
{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS  #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
,
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}
]
const express = require('express');
const router = express.Router();
    var output1 = '';
var http = require('http');  
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client(),
    consumer = new Consumer(
        client,
        [
        { topic: 'twitterFeeds', partition: 0 }
        ],
        {
            autoCommit: false
        }
    );
    var output2 = consumer.on('message', function (message) {
     obj = JSON.stringify(message);
     output1 = output1 + obj;
    });

router.get('/',(req,res)=>{
        res.send(output1);
    });
module.exports = router;

共有1个答案

邹毅
2023-03-14

我目前正在做一个类似的项目(如果你有兴趣,请帮忙)-超燃冲压发动机-Kafka

我已经在scramjet Datastream.tojsonArray中解决了上述问题--它只是将数据作为JSON数组流式传输。在您的情况下,它非常简单:

new ConsumerStream(
    client,
    [ { topic: 'twitterFeeds', partition: 0 } ],
    { autoCommit: false }
)
    .pipe(new scramjet.DataStream)
    // you may want to do some transforms here
    .toJSONArray()
    .pipe(process.stdout)
    // or pipe it to any other stream, accumulate and so on.

如果你喜欢这个项目,你可以帮助-请通过GitHub联系。

 类似资料:
  • 问题内容: 我有3台机器: 文件所在的服务器 运行REST服务的服务器(泽西岛) 可以访问第二台服务器但不能访问第一台服务器的客户端(浏览器) 我如何直接(不将文件保存在第二台服务器上)将文件从第一台服务器下载到客户端计算机? 从第二台服务器可以获取 ByteArrayOutputStream 来从第一台服务器获取文件,我可以使用REST服务将此流进一步传递给客户端吗? 这样行吗? 因此,基本上我

  • 问题内容: 我正在使用Java开发SIP应用程序,并且想知道目前最常用的SIP库是什么。 MJSIP? 问题答案: 据我所知,它的JAIN- SIP 。顺便说一句,很高兴了解MjSip。您可能对JBoss Mobicent的 关注感兴趣,当前用户指南尚不完整。并且您不会在Mobicent上找到太多帮助。 或者,正如metadaddy说在这里,“你可能想看看SailFin -它是一个SIP Serv

  • 问题内容: 在上一个问题中如何在Java 8中动态进行过滤?StuartMarks给出了一个很好的答案,并提供了一些有用的实用程序来处理从流中选择topN和topPercent。 我将从他的原始答案中将它们包括在这里: 我的问题是: [1]如何从具有一定数量项目的流中获取3到7的顶级项目,因此,如果流中有A1,A2 .... A10中的项目,则调用 将返回{A3,A4,A5,A6,A7} 我能想到

  • 问题内容: 将字符串拆分为流的 最佳 方法是什么? 我看到了这些变化: 我的优先事项是: 坚固性 可读性 性能 一个完整的可编译示例: 问题答案: / 由于返回数组,因此我始终建议您使用流式处理数组的规范用法。 / 是一个 varargs 方法,它恰好接受一个数组,这是由于varargs方法是通过数组实现的,并且在将varargs引入Java以及改型为接受可变参数的现有方法时,存在兼容性方面的问题

  • 问题内容: 我是Java 8的新手,不确定如何使用流及其排序方法。如果我的地图如下,如何使用Java 8按值对地图进行排序以仅获取前10个条目。 我知道在Java 8之前,我们可以按以下链接进行排序:http://codingdict.com/questions/116310 问题答案: 您可以随时开始阅读文档和一些 教程。 参考 http://www.leveluplunch.com/java/

  • 问题内容: 我有以下情况 我已经合并所有列表从从对象地图。 知道如何使用Java 8流API吗? 问题答案: 我想这就是您要寻找的。 例如:

  • 问题内容: 我需要使用bouncycastle提供程序使用pgp加密流。我可以找到的所有示例都是关于获取纯文本文件并对其进行加密的,但是我没有文件,因此务必不要将纯文本写入磁盘。 我见过的大多数方法都在使用 希望传递纯文本的PGPUtil.writeFileToLiteralData。我宁愿传递byte []或inputStream。 有人可以指出一个例子吗 从字符串/字节[] /输入流开始 将所

  • 问题内容: 我一直在寻找新的rx java 2,但我不确定我是否已经明白了这个主意… 我知道我们所拥有的并没有支持。 因此,基于例如,可以说我有有: 在大约128个值之后,这将崩溃,这很明显我消耗的速度比获取项目要慢。 但是,我们有相同的 即使我延迟使用它,它仍然完全不会崩溃。为了工作,可以说我放了一个运算符,崩溃已经消失了,但并不是所有值都被发出。 因此,我目前在脑海中找不到答案的基本问题是,为