当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect启动,但什么都没有发生

帅博远
2023-03-14

我正在编写一个Kafka source connector,它基于一个我用于音频文件的工作生成器。连接器启动了,但是没有任何反应,没有错误,没有数据,我不确定这是编码问题还是配置问题。

连接器应该读取整个目录,并将文件读取为字节数组。

配置类:

package hothman.example;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import java.util.Map;



public class AudioSourceConnectorConfig extends AbstractConfig {

  public static final String FILENAME_CONFIG="fileName";
  private static final String FILENAME_DOC ="Enter the path of the audio files";

  public static final String TOPIC_CONFIG = "topic";
  private static final String TOPIC_DOC = "Enter the topic to write to..";



  public AudioSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
    super(config, parsedConfig);
  }

  public AudioSourceConnectorConfig(Map<String, String> parsedConfig) {
    this(conf(), parsedConfig);
  }

  public static ConfigDef conf() {
    return new ConfigDef()
            .define(FILENAME_CONFIG, Type.STRING, Importance.HIGH, FILENAME_DOC)
            .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC);

  }

  public String getFilenameConfig(){
    return this.getString("fileName");
  }
  public String getTopicConfig(){
    return this.getString("topic");
  }
}

源连接器类

package hothman.example;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class AudioSourceConnector extends SourceConnector {
  /*
    Your connector should never use System.out for logging. All of your classes should use slf4j
    for logging
 */
  private static Logger log = LoggerFactory.getLogger(AudioSourceConnector.class);

  private AudioSourceConnectorConfig config;
  private String filename;
  private String topic;

  @Override
  public String version() {
    return VersionUtil.getVersion();
  }

  @Override
  public void start(Map<String, String> props) {
    filename = config.getFilenameConfig();
    topic = config.getTopicConfig();
    if (topic == null || topic.isEmpty())
      throw new ConnectException("AudiSourceConnector configuration must include 'topic' setting");
    if (topic.contains(","))
      throw new ConnectException("AudioSourceConnector should only have a single topic when used as a source.");
  }

  @Override
  public Class<? extends Task> taskClass() {
    //TODO: Return your task implementation.
    return AudioSourceTask.class;
  }

  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configsList = new ArrayList<>();
    // Only one input stream makes sense.
    Map<String, String> configs = new HashMap<>();
    if (filename != null)
      configs.put(config.getFilenameConfig(), filename);
    configs.put(config.getTopicConfig(), topic);
    configsList.add(configs);
    return configsList;
  }

  @Override
  public void stop() {

  }
  @Override
  public ConfigDef config() {
    return AudioSourceConnectorConfig.conf();

  }
}

源任务类

package hothman.example;


import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

import java.nio.file.*;
import java.util.*;

import static com.sun.nio.file.ExtendedWatchEventModifier.FILE_TREE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;

public class AudioSourceTask extends SourceTask {
  /*
    Your connector should never use System.out for logging. All of your classes should use slf4j
    for logging
 */
  static final Logger log = LoggerFactory.getLogger(AudioSourceTask.class);

  private AudioSourceConnectorConfig config;
  public static final String POSITION_FIELD = "position";
  private static final Schema VALUE_SCHEMA = Schema.BYTES_SCHEMA;

  private String filename;
  private String topic = null;
  private int offset = 0;


  private FileSystem fs = FileSystems.getDefault();
  private WatchService ws = fs.newWatchService();

  private Path dir;
  private File directoryPath;
  private ArrayList<File> listOfFiles;
  private byte[] temp = null;


  public AudioSourceTask() throws IOException {
  }

  @Override
  public String version() {
    return VersionUtil.getVersion();
  }

  @Override
  public void start(Map<String, String> props) {
    filename = config.getFilenameConfig();
    topic = config.getTopicConfig();
    if (topic == null)
      throw new ConnectException("AudioSourceTask config missing topic setting");

    dir = Paths.get(filename);
    try {
      dir.register(ws, new WatchEvent.Kind[]{ENTRY_CREATE, ENTRY_DELETE}, FILE_TREE);
    } catch (IOException e) {
      e.printStackTrace();
    }

    directoryPath = new File(String.valueOf(dir));
  }


  @Override
  public List<SourceRecord> poll() throws InterruptedException {
    //TODO: Create SourceRecord objects that will be sent the kafka cluster.

    listOfFiles = new ArrayList<File>(Arrays.asList(directoryPath.listFiles()));
    Map<String, Object> offset = context.offsetStorageReader().
            offset(Collections.singletonMap(config.getFilenameConfig(), filename));


    ArrayList<SourceRecord> records = new ArrayList<>(1);

    try {
      for (File file : listOfFiles) {
        // send existing files first
        temp = Files.readAllBytes(Paths.get(file.toString()));

        records.add(new SourceRecord(null,
                null, topic, Schema.BYTES_SCHEMA, temp));
       
      }

      return records;
    } catch (IOException e) {
      e.printStackTrace();
    }
    return null;
  }

  @Override
  public void stop() {
    //TODO: Do whatever is required to stop your task.
  }




}

版本Class

package hothman.example;

/**
 * Created by jeremy on 5/3/16.
 */
class VersionUtil {
  public static String getVersion() {
    try {
      return VersionUtil.class.getPackage().getImplementationVersion();
    } catch(Exception ex){
      return "0.0.0.0";
    }
  }
}

Connector.properties

name=AudioSourceConnector
tasks.max=1
connector.class=hothman.example.AudioSourceConnector


fileName = G:\\Files
topic= my-topic

Connect-standalone.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter


key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter


# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=G:/Kafka/kafka_2.12-2.8.0/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=G:/Kafka/kafka_2.12-2.8.0/plugins

错误:

[2021-05-05 01:24:27,926] INFO WorkerSourceTask{id=AudioSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
[2021-05-05 01:24:27,928] ERROR WorkerSourceTask{id=AudioSourceConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.file.Files.read(Files.java:3099)
        at java.nio.file.Files.readAllBytes(Files.java:3158)
        at hothman.example.AudioSourceTask.poll(AudioSourceTask.java:93)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:273)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2021-05-05 01:24:27,929] INFO [Producer clientId=connector-producer-AudioSourceConnector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1204)
[2021-05-05 01:24:27,933] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:659)
[2021-05-05 01:24:27,934] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:663)
[2021-05-05 01:24:27,934] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:669)
[2021-05-05 01:24:27,935] INFO App info kafka.producer for connector-producer-AudioSourceConnector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2021-05-05 01:24:36,479] INFO WorkerSourceTask{id=AudioSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)

共有1个答案

陈昂熙
2023-03-14

使用基于@OneCricketeer推荐的记录器,我能够查明问题。

config.getFilenameConfig();

返回 null,所以我不得不暂时在连接器中手动编码路径。

连接器工作,但给出了java.lang.OutOfMemoryError:Java堆空间错误。为了解决这个问题,我必须编辑connect-standalone.properties文件并更改producer.max.request.size和producer.buffer.memory的大小,并确保它们的值高于我要发送的任何文件。

我还编辑了AudioSourceTask类,去掉了poll方法中的for循环,并将listOfFiles的初始化从poll方法移到start方法,现在如下所示

     public void start(Map<String, String> props) {
    
            filename = "G:\\AudioFiles";//config.getFilenameConfig();//
            topic = "voice-wav1";//config.getTopicConfig();//
            if (topic == null)
              throw new ConnectException("AudioSourceTask config missing topic setting");
        
            dir = Paths.get(filename);
            try {
              dir.register(ws, new WatchEvent.Kind[]{ENTRY_CREATE, ENTRY_DELETE}, FILE_TREE);
            } catch (IOException e) {
              e.printStackTrace();
            }
        
            directoryPath = new File(String.valueOf(dir));
            listOfFiles = new ArrayList<File>(Arrays.asList(directoryPath.listFiles()));
          }

  @Override
  public List<SourceRecord> poll() throws InterruptedException {
    //TODO: Create SourceRecord objects that will be sent the kafka cluster.


    Map<String, Object> offset = context.offsetStorageReader().
            offset(Collections.singletonMap("G:\\AudioFiles", filename));


    ArrayList<SourceRecord> records = new ArrayList<>(1);

    try{
      
        // send existing files first
      if(listOfFiles.size()!=0) {
        File file = listOfFiles.get(listOfFiles.size() - 1);

        listOfFiles.remove(listOfFiles.size() - 1);
        temp = Files.readAllBytes(Paths.get(file.toString()));
        records.add(new SourceRecord(null,
                null, topic, Schema.BYTES_SCHEMA, temp));
        LOGGER.info("Reading file {}", file);
        return records;
      }

        
    } catch (IOException e) {
      e.printStackTrace();
    }

    return null;
  }
 类似资料:
  • 我刚来卡拉夫。我有一个jar有一个类App和一个方法main。当我将jar放入Karaf日志服务控制台时,bundle已启动,但似乎什么也没有发生。(jar)做的第一件事是编写一个简单的数据库,这样我就可以查看它是否正在运行(没有生成日志文件,但应该生成一个)。 这个罐子依赖于许多其他罐子。我们的sysadmin不会在生产服务器上安装Maven。helper jar(比如mysql-connect

  • 这个问题已经被问了很多次了,但没有什么像我的问题一样。我不明白为什么它没有启动或启动我的apk。 它卡在那里了。什么都没有发生,只是等待目标设备响应即将到来。 直到现在,一切都运行得很好。 我甚至重新安装了也没什么用 电脑规格:i7内核2.2 GHZ 16GB内存英特尔Iris Pro 1500MB Android Studio2.3 还是一样的错误……

  • 我正在尝试用Spring 3、JPA 2和Hibernate 3制作一个应用程序。我有一个问题,当你坚持一个实体:什么都没发生!数据不插入数据库,也不执行查询。但是,当我使用一个请求,如query.getResultList()选择正常工作。 所以我认为我的问题只是在坚持/更新和事务管理器上,但我不太擅长Spring。你能帮帮我吗? 下面是我的配置文件: 谁能帮帮我吗?

  • 我正在使用Hibernate Envers来审核我的日志表,它是使用Spring配置的。但是,在执行更新、修改或删除操作时,不会发生审核。以下是配置。 Spring配置 我已经将@Audited注释添加到实体类中。我使用的是Hibernate core 3.5.0-Final和envers的相同版本。 当我检查hibernate记录的SQL时,我可以看到更新查询已经执行,但没有任何将数据插入审计表

  • 我正在编写一个应用程序,我需要在单击不同按钮时出现相同的自定义弹出窗口。现在弹出窗口只是一个简单的“你确定吗?确定/取消”窗口,但稍后它将扩展以包括更多自定义功能......所以我不能使用内置的快速对话框。 奇怪的是。当按下按钮X时,弹出窗口(在FXML中定义)启动得很好,但我的控制器类似乎没有运行。我不认为你能做到这一点。我无法弄清楚的是控制器没有运行的原因。我本来以为如果控制器不工作,应用程序