我正在尝试将Flink与Elasticsearch 2.1.1集成,我正在使用Maven依赖项
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch2_2.10</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
这是我从Kafka队列中读取事件的Java代码(工作正常),但是无论如何,如果我更改了任何相关设置,则事件不会在Elasticsearch中发布,也没有错误,在以下代码中到ElasticSearch的端口,主机名,集群名称或索引名称,然后立即看到错误,但当前它不显示任何错误,也没有在ElasticSearch中创建任何新文档
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.print();
Map<String, String> config = new HashMap<>();
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");
config.put("cluster.name", "FlinkDemo");
List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
messageStream.addSink(new ElasticsearchSink<String>(config, transports, new TestElasticsearchSinkFunction()));
env.execute();
}
private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<String> {
private static final long serialVersionUID = 1L;
public IndexRequest createIndexRequest(String element) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("flink").id("hash"+element).source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
我确实确实在本地计算机上运行它并进行调试,但是,我缺少的唯一一件事就是正确配置日志记录,因为大多数弹性问题在“
log.warn”语句中进行了描述。问题是elasticsearch-2.2.1客户端API中“
BulkRequestHandler.java”内部的异常,该异常引发错误-“
org.elasticsearch.action.ActionRequestValidationException:验证失败:1:类型丢失;”。因为我创建了索引但没有创建一个类型,但我发现它很奇怪,因为它应该主要与索引有关,并默认创建该类型。
我正在从事一个使用Apache Camel和Elasticsearch的项目,我想知道Camel支持哪个版本的Elasticsearch? 我的pom.xml是这样的: 但是当我想将文件路由到elasticsearch时,我遇到了以下错误: Java语言lang.IllegalStateException:收到来自不受支持版本的消息:[2.0.0]最小兼容版本为:[5.0.0] 我发现这个异常是由
问题内容: 我已经安装了Elasticsearch以及Neo4j。我想使用“用于ElasticSearch的Neo4j River插件”插件将Elasticsearch与Neo4j集成。谁能告诉我如何整合这两者。我也在寻找一些用例示例,其中我将清楚地了解noe4j如何与elasticsearch一起工作。 问题答案: 我们应该已经安装了Elasticsearch&Neo4j。要与Neo4j Riv
问题内容: 在我的一个项目中,我计划将ElasticSearch与mysql一起使用。我已经成功安装了ElasticSearch。我能够单独管理ES中的索引。但我不知道如何用mysql实现相同的功能。 我已经阅读了几份文件,但我有点困惑,不清楚。谁能帮帮我吗? 提前致谢。 问题答案: 终于我找到了答案。分享我的发现。 要将ElasticSearch与Mysql一起使用,您将需要Java数据库连接(
我在运行一个将spring boot与弹性搜索集成在一起的简单应用程序时遇到了这个错误。你能帮我解决这个问题吗?我是初学者。 我正在尝试将一些书籍映射到ES中,并能够使用ES Java API中的客户端配置打印它们。我想知道这些版本是否使用不当,所以请查看我的pom。xml 它表示通过字段“es”表示的未满足的依赖关系,我不知道这是什么意思。关于NoClassDefFoundError- 这是st
问题内容: 有任何方法可以将用于Ruby的新Elasticsearch宝石集成到轨道中,轮胎很棒,但两个月以来就已淘汰,并由新宝石替代,但是还没有与轨道的集成功能。 所有教程都使用累了,但是现在,我们如何在Elasticsearch中使用rails? 问题答案: 还有另一个名为“ searchkick”的宝石,它将elasticsearch与Rails集成在一起: https://github.c
EdIndexBolt,EsPercolateBolt和Estate允许用户将storm中的数据直接传输到Elasticsearch。 详细说明请参考以下内容。 EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt) EsIndexBolt将tuples直接流入Elasticsearch索。 Tuples以指定的索引和类型组合进行