当前位置: 首页 > 知识库问答 >
问题:

在logstash中以事务方式发送事件

咸晨
2023-03-14
START TXN 123         --No message sent to Kafka
123 - Event1 Message  --No message sent to Kafka
123 - Event2 Message  --No message sent to Kafka
123 - Event3 Message  --No message sent to Kafka
COMMIT TXN 123           --Event1, Event2, Event3 messages sent to Kafka
input {
  tcp {
    port => 9000
  }
}

output {
  kafka { 
    bootstrap_servers => "localhost:9092"
    topic_id =>  "alpayk"
  }
}

我试着使用use logstash的聚合过滤器来达到这个目的,但是我不能得到一个工作的结果。

非常感谢你

共有1个答案

薛楷
2023-03-14

我最终决定为此使用Apache水槽。我修改了它的netcat源,使未提交的消息驻留在Flume的堆中,并且一旦收到事务的提交消息,所有消息就会发送到kafka接收器。

我将把消息存储位置从flume堆更改为外部缓存,这样,如果事务停止或回滚,我将能够使存储的消息过期。

下面是该事务逻辑的代码:

String eventMessage = new String(body);
int indexOfTrxIdSeparator = eventMessage.indexOf("-");
if (indexOfTrxIdSeparator != -1) {
    String txnId = eventMessage.substring(0, indexOfTrxIdSeparator).trim();
    String message = eventMessage.substring(indexOfTrxIdSeparator + 1).trim();
    ArrayList<Event> events = cachedEvents.get(txnId);

    if (message.equals("COMMIT")) {

        System.out.println("@@@@@ COMMIT RECEIVED");

        if (events != null) {
            for (Event eventItem : events) {
                ChannelException ex = null;
                try {
                    source.getChannelProcessor().processEvent(eventItem);
                } catch (ChannelException chEx) {
                    ex = chEx;
                }

                if (ex == null) {
                    counterGroup.incrementAndGet("events.processed");
                } else {
                    counterGroup.incrementAndGet("events.failed");
                    logger.warn("Error processing event. Exception follows.", ex);
                }
            }

            cachedEvents.remove(txnId);
        }
    } else {
        System.out.println("@@@@@ MESSAGE RECEIVED: " + message);
        if (events == null) {
            events = new ArrayList<Event>();
        }
        events.add(EventBuilder.withBody(message.getBytes()));
        cachedEvents.put(txnId, events);
    }
}
 类似资料:
  • 我有一个后端服务器,它将事件作为服务器发送的事件发送给客户端。我还没有找到一个好的库来在Android上处理这项技术,所以我一直在使用一种回退方法,定期检查服务器(通过访问事件endpoint)中的新事件。 后台服务每10秒执行一次。不用说,这不是最好的方法。如果没有任何开源库可用于此场景,那么在内存使用和电池消耗方面,定期检查服务器后端是否有新事件的最佳方法是什么?与在Android中管理开放式

  • 概述 客户端代码 概述 建立连接 open事件 message事件 error事件 自定义事件 close方法 数据格式 概述 data:数据栏 id:数据标识符 event栏:自定义信息类型 retry:最大间隔时间 服务器代码 参考链接 概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“

  • 概述 客户端代码 概述 建立连接 open事件 message事件 error事件 自定义事件 close方法 数据格式 概述 data:数据栏 id:数据标识符 event栏:自定义信息类型 retry:最大间隔时间 服务器代码 参考链接 概述 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“

  • 当前,我已经在Express API上为服务器发送的事件设置了一个endpoint,当我从cmd调用endpoint时,使用http://localhost:3000/v1/devices,我得到了流,但当我使用新事件源从Vue应用程序调用endpoint时('http://localhost:3000/v1/devices“)但它只会继续加载,而不会流式传输任何数据。是否有我目前没有做的事情要做

  • 我的登录格式如下,它是一个带有嵌套字段的普通json。 如何使用Filebeat和Logstash正确地解析它,以将Kibana中的所有json字段视为单独的(已解析的)字段?我在“message”字段中遇到了一个问题,它嵌套了json字段。我解析一个在“message”中有字符串的事件没有问题,但不是JSON。 我的尝试: [2019-03-08T09:55:47,084][WARN][logs

  • 事件可以由Hyperledger Composer发出并由外部应用程序订阅。事件在业务网络定义的模型文件中定义,并由交易处理器函数文件中的交易JavaScript发出。 在你开始之前 在开始将事件添加到您的业务网络之前,您应该对业务网络的建模语言以及构成完整的业务网络定义的内容有深入的了解。 过程 1.事件在业务网络定义的模型文件(.cto)中定义,与资产和参与者相同。事件使用以下格式: e