我正在尝试使用JMX BulkLoader将ETL数据从远程节点传输到集群上的Cassandra
https://github.com/PatrickCallaghan/datastax-analytics-example/blob/master/src/main/java/com/datastax/jmxloader/JmxBulkLoader.java
但是,在成功建立JMX连接后,它似乎无法批量加载。
请注意,批量负载是从远程节点发布到cassandra集群的。
它看起来似乎期望在cassandra集群的本地运行(即cassandra集群的本地主机)
我错过了什么吗?有人能给点建议吗
以下例外情况
java.lang.IllegalArgumentException:位于org.apache.casandra.service.StorageService.bulkLoadInternal(StorageService.java:3970)位于org.apache/casandra.server.StorageServices.bulkLoad Async(StorageService.java:3962)位于sun.relect.GeneratedMethodAccessor21.invoke(未知源)位于sun/relect.DelegatingMethodAccessorImpl.invokesun.reflect.Method.invoke(Method.java:606)at sun.reflekt.misc.Trampoline.invouse(MethodUtil.java:75)at sun/reflect.GeneratedMethodAccessor2.invoke(未知源)at sun-reflect.DelegatingMethodAccessorImpl.invokecom.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrosopector.java:112),位于com.sun.jmx.mbeanserver.MBeanSupport.invoke,位于com.sun.jmx.mmbeanserver.PerInterface.invoke(PerInterface.java:138)(MBeanSupport.java:252)位于com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)位于com.sun.jmx.mmbeanserver.JmxMBeanServer.invoke。
class JmxBulkLoader(host: String, port: Int) {
private var connector: JMXConnector = _
private var storageBean: StorageServiceMBean = _
private var timer: Timer = new Timer()
connect("http://hostip , 7199)
private def connect(host: String, port: Int) {
val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host,
port))
Logger.info(" Connected to JMX Entity " + jmxUrl)
val env = new HashMap[String, Any]()
connector = JMXConnectorFactory.connect(jmxUrl, env)
val mbeanServerConn = connector.getMBeanServerConnection
val name = new ObjectName("org.apache.cassandra.db:type=StorageService")
storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])
}
def close() {
connector.close()
}
def bulkLoad(path: String): Boolean = {
try {
val timer = new Stopwatch().start
val result = storageBean.bulkLoadAsync(path)
timer.stop
Logger.info("Async Result of Bulk Load " + result)
Logger.info("Bulk load took " + timer.getElapsedTime + "millsecs.")
true
} catch {
case e: Exception =>
Logger.error("Error in Bulk Loading " + e.printStackTrace())
false
}
}
}
/some_path/$KeySpaceName/$TableName
它看起来似乎期望在cassandra集群的本地运行(即cassandra集群的本地主机)
差一点。但是想想看:你用一个字符串参数调用Cassandra节点的mbean函数。此调用由您正在调用的 Cassandra 进程执行(即连接到)。该参数指定要连接到的节点一侧的路径。
您必须确保路径存在于目标上并保存您期望的数据(例如,通过共享存储或事先复制文件)。
我正在为我的应用程序创建一个cassandra会话对象,并为它创建一些准备好的语句。对每个准备好的语句设置不同的一致性级别。 我们之前只有statement1,cassandra读取延迟小于10ms。当我们添加statement2并从代码的一部分开始使用它时,每次cassandra调用的延迟都会增加到250ms。 这是datastax中的错误吗?有没有可能 正在将一致性级别设置为? 我错过了一些愚
我试图理解警告,每次我看到下面的异常,当我运行我的火花作业。我在我的3个节点中的2个节点中看到了这一点cluster.But正如我所说,它只是警告,作业成功了。 卡珊德拉日志 信息〔SharedPool-Worker-1〕2017-07-17 22:25:48716消息。java:605-请求期间出现意外异常;通道=[id:0xf0ee1096,/x.x.x.x:54863=
货币报价最新数据 接口: currency_latest 目标地址: https://currencyscoop.com/ 描述: 获取货币报价最新数据 限量: 单次返回指定货币的最新报价数据-免费账号每月限量访问 5000 次 输入参数 名称 类型 必选 描述 base str Y base="USD" api_key str Y api_key="Please put your api key
期货基础信息 主要提供金融期货和商品期货相关的基本面和行情数据 期货交易所 交易所名称 交易所代码 合约后缀 首页地址 中国金融期货交易所 CFFEX .CFX http://www.cffex.com.cn/ 上海期货交易所 SHFE .SHF http://www.shfe.com.cn/ 上海国际能源交易中心 INE .INE http://www.ine.cn/ 郑州商品交易所 CZCE
接口: get_js_dc_current 目标地址: https://datacenter.jin10.com/reportType/dc_bitcoin_current 描述: 获取数字货币实时行情, 实时更新 限量: 单次返回主流数字货币当前时点行情数据 输入参数 名称 类型 必选 描述 无 无 无 无 输出参数 名称 类型 默认显示 描述 reported_at str Y 日期时间-索引
主要内容:集合类型,用户定义的数据类型:CQL提供了丰富的内置数据类型,包括集合类型。 除了这些数据类型,用户还可以创建自己的自定义数据类型。 下表提供了CQL中可用的内置数据类型的列表。 数据类型 常量 说明 ascii strings 表示ASCII字符串 bigint bigint 表示64位有符号long类型 blob blobs 表示任意字节 Boolean booleans 表示真或假 counter integers 表示