我使用Elasticsearch Connector作为Sink将数据插入到Elasticsearch中(参见:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html)。
但是,我并没有找到任何从Elasticsearch获取数据作为源的连接器。
在Flink pipline中是否有使用Elasticsearch文档作为源的连接器或示例?
当做
阿里
Hadoop兼容性弹性搜索Hadoop
https://github.com/cclient/flink-connector-elasticsearch-source
我最终定义了一个简单的ReadfromElasticSearch函数
public static class ElasticsearchFunction
extends ProcessFunction<MetricMeasurement, MetricPrediction> {
public ElasticsearchFunction() throws UnknownHostException {
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("YOUR_IP"), PORT_NUMBER));
}
@Override
public void processElement(MetricMeasurement in, Context context, Collector<MetricPrediction> out) throws Exception {
MetricPrediction metricPrediction = new MetricPrediction();
metricPrediction.setMetricId(in.getMetricId());
metricPrediction.setGroupId(in.getGroupId());
metricPrediction.setBucket(in.getBucket());
// Get the metric measurement from Elasticsearch
SearchResponse response = client.prepareSearch("YOUR_INDEX_NAME")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("YOUR_TERM", in.getMetricId())) // Query
.setPostFilter(QueryBuilders.rangeQuery("value").from(0L).to(50L)) // Filter
.setFrom(0).setSize(1).setExplain(true)
.get();
SearchHit[] results = response.getHits().getHits();
for(SearchHit hit : results){
String sourceAsString = hit.getSourceAsString();
if (sourceAsString != null) {
ObjectMapper mapper = new ObjectMapper();
MetricMeasurement obj = mapper.readValue(sourceAsString, MetricMeasurement.class);
obj.getMetricId();
metricPrediction.setPredictionValue(obj.getValue());
}
}
out.collect(metricPrediction);
}
}
我不知道Flink有没有明确的ES来源。我确实看到一个用户在谈论使用elasticsearch hadoop作为Flink的HadoopInputFormat
,但我不知道这是否对他们有效(参见他们的代码)。
问题内容: 通过阅读文档,我了解到使用Apache Flink 1.3,我应该能够使用Elasticsearch5.x。 但是,在我的pom.xml中: 我懂了 : 找不到依赖项“ org.apache.flink:flink-connector-elasticsearch5_2.10:1.3.0” 知道为什么没有这种依赖性吗? 问题答案: 这是1.3.0版本中的错误,并已在1.3.1版中修复(即
我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。
我正在使用Apache Flink,并尝试通过使用Apache Kafka协议从它接收消息来连接到Azure eventhub。我设法连接到Azure eventhub并接收消息,但我不能使用这里(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-star
根据ApacheFlink文档,它有预定义的数据源。它还提到了其他一些连接器,例如RabitMq连接器等。我想知道是否有类似的项目可以使用Webhook作为数据源。
在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。
我有一个包含Docker和Elasticsearch(OS:Centos7)的VM。我想创建一个Kibana docker并与我的ES连接。 ES包含索引,如果输入curl-s http://localhost:9200/_cat/index,我就得到了索引列表。 我使用Dockerfile创建了我的Kibana映像: docker构建-t=“kibana_test”。 docker运行--名称k