I'm writing a Kafka source connector based on a working producer that I used for audio files. The connector starts, but then nothing happens: no errors, no data, and I'm not sure whether it's a coding problem or a configuration problem.
The connector is supposed to read an entire directory and publish each file as a byte array.
Config class:
package hothman.example;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;

import java.util.Map;

public class AudioSourceConnectorConfig extends AbstractConfig {

    public static final String FILENAME_CONFIG = "fileName";
    private static final String FILENAME_DOC = "Enter the path of the audio files";

    public static final String TOPIC_CONFIG = "topic";
    private static final String TOPIC_DOC = "Enter the topic to write to..";

    public AudioSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
        super(config, parsedConfig);
    }

    public AudioSourceConnectorConfig(Map<String, String> parsedConfig) {
        this(conf(), parsedConfig);
    }

    public static ConfigDef conf() {
        return new ConfigDef()
                .define(FILENAME_CONFIG, Type.STRING, Importance.HIGH, FILENAME_DOC)
                .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC);
    }

    public String getFilenameConfig() {
        return this.getString(FILENAME_CONFIG);
    }

    public String getTopicConfig() {
        return this.getString(TOPIC_CONFIG);
    }
}
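As a quick sanity check, this config class can be exercised on its own, outside Connect (a hypothetical snippet, not part of the connector):

import java.util.HashMap;
import java.util.Map;

public class ConfigSmokeTest {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(AudioSourceConnectorConfig.FILENAME_CONFIG, "G:\\Files");
        props.put(AudioSourceConnectorConfig.TOPIC_CONFIG, "my-topic");
        // Omitting either required key makes this constructor throw a ConfigException.
        AudioSourceConnectorConfig cfg = new AudioSourceConnectorConfig(props);
        System.out.println(cfg.getFilenameConfig()); // prints G:\Files
    }
}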
Source connector class:
package hothman.example;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AudioSourceConnector extends SourceConnector {
    /*
    Your connector should never use System.out for logging. All of your classes should use slf4j
    for logging
    */
    private static Logger log = LoggerFactory.getLogger(AudioSourceConnector.class);

    private AudioSourceConnectorConfig config;
    private String filename;
    private String topic;

    @Override
    public String version() {
        return VersionUtil.getVersion();
    }

    @Override
    public void start(Map<String, String> props) {
        filename = config.getFilenameConfig();
        topic = config.getTopicConfig();
        if (topic == null || topic.isEmpty())
            throw new ConnectException("AudioSourceConnector configuration must include 'topic' setting");
        if (topic.contains(","))
            throw new ConnectException("AudioSourceConnector should only have a single topic when used as a source.");
    }

    @Override
    public Class<? extends Task> taskClass() {
        //TODO: Return your task implementation.
        return AudioSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        ArrayList<Map<String, String>> configsList = new ArrayList<>();
        // Only one input stream makes sense.
        Map<String, String> configs = new HashMap<>();
        if (filename != null)
            configs.put(config.getFilenameConfig(), filename);
        configs.put(config.getTopicConfig(), topic);
        configsList.add(configs);
        return configsList;
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return AudioSourceConnectorConfig.conf();
    }
}
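In hindsight, one thing worth flagging in taskConfigs() above: it uses the config *values* returned by the getters as map keys, so each task would receive a map keyed by the path and topic name rather than by "fileName" and "topic". A sketch of what was presumably intended:

Map<String, String> configs = new HashMap<>();
if (filename != null)
    configs.put(AudioSourceConnectorConfig.FILENAME_CONFIG, filename);
configs.put(AudioSourceConnectorConfig.TOPIC_CONFIG, topic);
configsList.add(configs);

(Also note that start() reads from config before config is ever assigned; see the update at the end of this post.)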
Source task class:
package hothman.example;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

import static com.sun.nio.file.ExtendedWatchEventModifier.FILE_TREE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;

public class AudioSourceTask extends SourceTask {
    /*
    Your connector should never use System.out for logging. All of your classes should use slf4j
    for logging
    */
    static final Logger log = LoggerFactory.getLogger(AudioSourceTask.class);

    private AudioSourceConnectorConfig config;

    public static final String POSITION_FIELD = "position";
    private static final Schema VALUE_SCHEMA = Schema.BYTES_SCHEMA;

    private String filename;
    private String topic = null;
    private int offset = 0;

    private FileSystem fs = FileSystems.getDefault();
    private WatchService ws = fs.newWatchService();
    private Path dir;
    private File directoryPath;
    private ArrayList<File> listOfFiles;
    private byte[] temp = null;

    public AudioSourceTask() throws IOException {
    }

    @Override
    public String version() {
        return VersionUtil.getVersion();
    }

    @Override
    public void start(Map<String, String> props) {
        filename = config.getFilenameConfig();
        topic = config.getTopicConfig();
        if (topic == null)
            throw new ConnectException("AudioSourceTask config missing topic setting");
        dir = Paths.get(filename);
        try {
            dir.register(ws, new WatchEvent.Kind[]{ENTRY_CREATE, ENTRY_DELETE}, FILE_TREE);
        } catch (IOException e) {
            e.printStackTrace();
        }
        directoryPath = new File(String.valueOf(dir));
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        //TODO: Create SourceRecord objects that will be sent to the Kafka cluster.
        listOfFiles = new ArrayList<File>(Arrays.asList(directoryPath.listFiles()));
        Map<String, Object> offset = context.offsetStorageReader().
                offset(Collections.singletonMap(config.getFilenameConfig(), filename));
        ArrayList<SourceRecord> records = new ArrayList<>(1);
        try {
            for (File file : listOfFiles) {
                // send existing files first
                temp = Files.readAllBytes(Paths.get(file.toString()));
                records.add(new SourceRecord(null,
                        null, topic, Schema.BYTES_SCHEMA, temp));
            }
            return records;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void stop() {
        //TODO: Do whatever is required to stop your task.
    }
}
Version class:
package hothman.example;

/**
 * Created by jeremy on 5/3/16.
 */
class VersionUtil {
    public static String getVersion() {
        try {
            return VersionUtil.class.getPackage().getImplementationVersion();
        } catch (Exception ex) {
            return "0.0.0.0";
        }
    }
}
Connector.properties
name=AudioSourceConnector
tasks.max=1
connector.class=hothman.example.AudioSourceConnector
fileName=G:\\Files
topic=my-topic
Connect-standalone.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=G:/Kafka/kafka_2.12-2.8.0/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=G:/Kafka/kafka_2.12-2.8.0/plugins
Error:
[2021-05-05 01:24:27,926] INFO WorkerSourceTask{id=AudioSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-05-05 01:24:27,928] ERROR WorkerSourceTask{id=AudioSourceConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
java.lang.OutOfMemoryError: Java heap space
at java.nio.file.Files.read(Files.java:3099)
at java.nio.file.Files.readAllBytes(Files.java:3158)
at hothman.example.AudioSourceTask.poll(AudioSourceTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:273)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-05-05 01:24:27,929] INFO [Producer clientId=connector-producer-AudioSourceConnector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1204)
[2021-05-05 01:24:27,933] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:659)
[2021-05-05 01:24:27,934] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:663)
[2021-05-05 01:24:27,934] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:669)
[2021-05-05 01:24:27,935] INFO App info kafka.producer for connector-producer-AudioSourceConnector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2021-05-05 01:24:36,479] INFO WorkerSourceTask{id=AudioSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
UPDATE: Using a logger, as @OneCricketeer recommended, I was able to pinpoint the problem:
config.getFilenameConfig();
returns null, so for the time being I had to hardcode the path in the connector.
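In hindsight this makes sense: neither start(...) method ever assigns the config field, so the getters are called on an uninitialized config. A minimal sketch of the proper fix, assuming the config class above:

@Override
public void start(Map<String, String> props) {
    // Build the typed config from the raw properties Connect passes in;
    // without this assignment, `config` is never set.
    config = new AudioSourceConnectorConfig(props);
    filename = config.getFilenameConfig();
    topic = config.getTopicConfig();
    // ... rest of start() unchanged
}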
The connector then works, but throws the java.lang.OutOfMemoryError: Java heap space error. To fix that, I had to edit the connect-standalone.properties file and raise producer.max.request.size and producer.buffer.memory, making sure both values are larger than any file I want to send.
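The overrides look roughly like this (the sizes below are placeholders; set them above your largest file). The standalone worker forwards producer.*-prefixed settings to the producer it creates for the source task; keep in mind that the broker's message.max.bytes (or the topic's max.message.bytes) can still reject large records:

# added to connect-standalone.properties
producer.max.request.size=209715200
producer.buffer.memory=209715200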
I also edited the AudioSourceTask class, removing the for loop from the poll method and moving the initialization of listOfFiles from poll to start, so each call to poll now sends a single file. It now looks like this:
public void start(Map<String, String> props) {
    filename = "G:\\AudioFiles"; // config.getFilenameConfig();
    topic = "voice-wav1"; // config.getTopicConfig();
    if (topic == null)
        throw new ConnectException("AudioSourceTask config missing topic setting");
    dir = Paths.get(filename);
    try {
        dir.register(ws, new WatchEvent.Kind[]{ENTRY_CREATE, ENTRY_DELETE}, FILE_TREE);
    } catch (IOException e) {
        e.printStackTrace();
    }
    directoryPath = new File(String.valueOf(dir));
    listOfFiles = new ArrayList<File>(Arrays.asList(directoryPath.listFiles()));
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
    //TODO: Create SourceRecord objects that will be sent to the Kafka cluster.
    Map<String, Object> offset = context.offsetStorageReader().
            offset(Collections.singletonMap("G:\\AudioFiles", filename));
    ArrayList<SourceRecord> records = new ArrayList<>(1);
    try {
        // send existing files first, one file per poll() call
        if (listOfFiles.size() != 0) {
            File file = listOfFiles.get(listOfFiles.size() - 1);
            listOfFiles.remove(listOfFiles.size() - 1);
            temp = Files.readAllBytes(Paths.get(file.toString()));
            records.add(new SourceRecord(null,
                    null, topic, Schema.BYTES_SCHEMA, temp));
            log.info("Reading file {}", file);
            return records;
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}
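A gap that remains in this version: the records still carry null sourcePartition/sourceOffset, so the offsetStorageReader lookup above can never return anything, and already-sent files would be re-read after a restart. A minimal sketch of attaching offsets, using "filename" as a hypothetical partition key:

Map<String, String> sourcePartition = Collections.singletonMap("filename", file.getName());
Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FIELD, file.length());
// Connect persists this pair (to offset.storage.file.filename in standalone mode)
// and hands it back through offsetStorageReader after a restart.
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.BYTES_SCHEMA, temp));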