当前位置: 首页 > 知识库问答 >
问题:

Elasticsearch连接器作为Flink中的源

澹台承
2023-03-14

我使用Elasticsearch Connector作为Sink将数据插入到Elasticsearch中(参见:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html)。

但是,我并没有找到任何从Elasticsearch获取数据作为源的连接器。

在Flink pipline中是否有使用Elasticsearch文档作为源的连接器或示例?

当做

阿里

共有3个答案

隆飞宇
2023-03-14

Hadoop兼容性弹性搜索Hadoop

https://github.com/cclient/flink-connector-elasticsearch-source

秦联
2023-03-14

我最终定义了一个简单的ReadfromElasticSearch函数

    public static class ElasticsearchFunction
        extends ProcessFunction<MetricMeasurement, MetricPrediction> {

    public ElasticsearchFunction() throws UnknownHostException {
        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("YOUR_IP"), PORT_NUMBER));
    }

    @Override
    public void processElement(MetricMeasurement in, Context context, Collector<MetricPrediction> out) throws Exception {
        MetricPrediction metricPrediction = new MetricPrediction();

        metricPrediction.setMetricId(in.getMetricId());
        metricPrediction.setGroupId(in.getGroupId());
        metricPrediction.setBucket(in.getBucket());

        // Get the metric measurement from Elasticsearch
        SearchResponse response = client.prepareSearch("YOUR_INDEX_NAME")
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.termQuery("YOUR_TERM", in.getMetricId()))   // Query
                .setPostFilter(QueryBuilders.rangeQuery("value").from(0L).to(50L))     // Filter
                .setFrom(0).setSize(1).setExplain(true)
                .get();

        SearchHit[] results = response.getHits().getHits();
        for(SearchHit hit : results){
            String sourceAsString = hit.getSourceAsString();
            if (sourceAsString != null) {
                ObjectMapper mapper = new ObjectMapper();
                MetricMeasurement obj = mapper.readValue(sourceAsString, MetricMeasurement.class);
                obj.getMetricId();
                metricPrediction.setPredictionValue(obj.getValue());
            }
        }
        out.collect(metricPrediction);
    }
}
雍宇定
2023-03-14

我不知道Flink有没有明确的ES来源。我确实看到一个用户在谈论使用elasticsearch hadoop作为Flink的HadoopInputFormat,但我不知道这是否对他们有效(参见他们的代码)。

 类似资料:
  • 问题内容: 通过阅读文档,我了解到使用Apache Flink 1.3,我应该能够使用Elasticsearch5.x。 但是,在我的pom.xml中: 我懂了 : 找不到依赖项“ org.apache.flink:flink-connector-elasticsearch5_2.10:1.3.0” 知道为什么没有这种依赖性吗? 问题答案: 这是1.3.0版本中的错误,并已在1.3.1版中修复(即

  • 我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。

  • 我正在使用Apache Flink,并尝试通过使用Apache Kafka协议从它接收消息来连接到Azure eventhub。我设法连接到Azure eventhub并接收消息,但我不能使用这里(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-star

  • 根据ApacheFlink文档,它有预定义的数据源。它还提到了其他一些连接器,例如RabitMq连接器等。我想知道是否有类似的项目可以使用Webhook作为数据源。

  • 在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。

  • 我有一个包含Docker和Elasticsearch(OS:Centos7)的VM。我想创建一个Kibana docker并与我的ES连接。 ES包含索引,如果输入curl-s http://localhost:9200/_cat/index,我就得到了索引列表。 我使用Dockerfile创建了我的Kibana映像: docker构建-t=“kibana_test”。 docker运行--名称k