当前位置: 首页 > 知识库问答 >
问题:

数据税Cassandra散货船

汲灿
2023-03-14

我正在尝试使用JMX BulkLoader将ETL数据从远程节点传输到集群上的Cassandra

https://github.com/PatrickCallaghan/datastax-analytics-example/blob/master/src/main/java/com/datastax/jmxloader/JmxBulkLoader.java

但是,在成功建立JMX连接后,它似乎无法批量加载。

请注意,批量负载是从远程节点发布到cassandra集群的。

它看起来似乎期望在cassandra集群的本地运行(即cassandra集群的本地主机)

我错过了什么吗?有人能给点建议吗

以下例外情况

java.lang.IllegalArgumentException:位于org.apache.casandra.service.StorageService.bulkLoadInternal(StorageService.java:3970)位于org.apache/casandra.server.StorageServices.bulkLoad Async(StorageService.java:3962)位于sun.relect.GeneratedMethodAccessor21.invoke(未知源)位于sun/relect.DelegatingMethodAccessorImpl.invokesun.reflect.Method.invoke(Method.java:606)at sun.reflekt.misc.Trampoline.invouse(MethodUtil.java:75)at sun/reflect.GeneratedMethodAccessor2.invoke(未知源)at sun-reflect.DelegatingMethodAccessorImpl.invokecom.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrosopector.java:112),位于com.sun.jmx.mbeanserver.MBeanSupport.invoke,位于com.sun.jmx.mmbeanserver.PerInterface.invoke(PerInterface.java:138)(MBeanSupport.java:252)位于com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)位于com.sun.jmx.mmbeanserver.JmxMBeanServer.invoke。

class JmxBulkLoader(host: String, port: Int) {

  private var connector: JMXConnector = _

  private var storageBean: StorageServiceMBean = _

  private var timer: Timer = new Timer()

  connect("http://hostip , 7199)

 private def connect(host: String, port: Int) {

    val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host,

      port))

    Logger.info(" Connected to JMX Entity " + jmxUrl)

    val env = new HashMap[String, Any]()

    connector = JMXConnectorFactory.connect(jmxUrl, env)

    val mbeanServerConn = connector.getMBeanServerConnection

    val name = new ObjectName("org.apache.cassandra.db:type=StorageService")

    storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])

  }

  def close() {

    connector.close()

  }

  def bulkLoad(path: String): Boolean = {

    try {

      val timer = new Stopwatch().start

      val result = storageBean.bulkLoadAsync(path)

      timer.stop

      Logger.info("Async Result of Bulk Load " + result)

      Logger.info("Bulk load took " + timer.getElapsedTime + "millsecs.")

      true

    } catch {

      case e: Exception =>

        Logger.error("Error in Bulk Loading " + e.printStackTrace())

        false

    }

  }

}

共有2个答案

晏修诚
2023-03-14
  1. 该表应存在于卡桑德拉中
  2. 该目录对于 cassandra 节点应该是可访问的(本地)。
  3. 目录应以键空间和目标表名结尾:
    /some_path/$KeySpaceName/$TableName
江德润
2023-03-14

它看起来似乎期望在cassandra集群的本地运行(即cassandra集群的本地主机)

差一点。但是想想看:你用一个字符串参数调用Cassandra节点的mbean函数。此调用由您正在调用的 Cassandra 进程执行(即连接到)。该参数指定要连接到的节点一侧的路径。

您必须确保路径存在于目标上并保存您期望的数据(例如,通过共享存储或事先复制文件)。

 类似资料:
  • 我正在为我的应用程序创建一个cassandra会话对象,并为它创建一些准备好的语句。对每个准备好的语句设置不同的一致性级别。 我们之前只有statement1,cassandra读取延迟小于10ms。当我们添加statement2并从代码的一部分开始使用它时,每次cassandra调用的延迟都会增加到250ms。 这是datastax中的错误吗?有没有可能 正在将一致性级别设置为? 我错过了一些愚

  • 我试图理解警告,每次我看到下面的异常,当我运行我的火花作业。我在我的3个节点中的2个节点中看到了这一点cluster.But正如我所说,它只是警告,作业成功了。 卡珊德拉日志 信息〔SharedPool-Worker-1〕2017-07-17 22:25:48716消息。java:605-请求期间出现意外异常;通道=[id:0xf0ee1096,/x.x.x.x:54863=

  • 货币报价最新数据 接口: currency_latest 目标地址: https://currencyscoop.com/ 描述: 获取货币报价最新数据 限量: 单次返回指定货币的最新报价数据-免费账号每月限量访问 5000 次 输入参数 名称 类型 必选 描述 base str Y base="USD" api_key str Y api_key="Please put your api key

  • 期货基础信息 主要提供金融期货和商品期货相关的基本面和行情数据 期货交易所 交易所名称 交易所代码 合约后缀 首页地址 中国金融期货交易所 CFFEX .CFX http://www.cffex.com.cn/ 上海期货交易所 SHFE .SHF http://www.shfe.com.cn/ 上海国际能源交易中心 INE .INE http://www.ine.cn/ 郑州商品交易所 CZCE

  • 接口: get_js_dc_current 目标地址: https://datacenter.jin10.com/reportType/dc_bitcoin_current 描述: 获取数字货币实时行情, 实时更新 限量: 单次返回主流数字货币当前时点行情数据 输入参数 名称 类型 必选 描述 无 无 无 无 输出参数 名称 类型 默认显示 描述 reported_at str Y 日期时间-索引

  • 主要内容:集合类型,用户定义的数据类型:CQL提供了丰富的内置数据类型,包括集合类型。 除了这些数据类型,用户还可以创建自己的自定义数据类型。 下表提供了CQL中可用的内置数据类型的列表。 数据类型 常量 说明 ascii strings 表示ASCII字符串 bigint bigint 表示64位有符号long类型 blob blobs 表示任意字节 Boolean booleans 表示真或假 counter integers 表示