感谢@jkff的解决方案,这是一个实现示例:
示例ConsumerFactoryFn实施:
private static class ConsumerFactoryFn
implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
{
public Consumer<byte[], byte[]> apply(Map<String, Object> config)
{
try
{
Storage storage = StorageOptions.newBuilder()
.setProjectId("prj-id-of-your-bucket")
.setCredentials(GoogleCredentials.getApplicationDefault())
.build()
.getService();
Blob blob = storage.get("your-bucket-name", "pth.to.your.kafka.client.truststore.jks");
ReadChannel readChannel = blob.reader();
FileOutputStream fileOuputStream;
fileOuputStream = new FileOutputStream("/tmp/kafka.client.truststore.jks"); //path where the jks file will be stored
fileOuputStream.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
fileOuputStream.close();
File f = new File("/tmp/kafka.client.truststore.jks"); //assuring the store file exists
if (f.exists())
{
LOG.debug("key exists");
}
else
{
LOG.error("key does not exist");
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
LOG.error( e.getMessage());
} catch (IOException e) {
// TODO Auto-generated catch block
LOG.error( e.getMessage());
}
config.put("ssl.truststore.location",(Object) "/tmp/kafka.client.truststore.jks" );
return new KafkaConsumer<byte[], byte[]>(config);
}
}
别忘了在KafkaIO中使用.withConsumerFactoryFn。read()调用,应该类似于:
Map<String, Object> configMap = new HashMap<String, Object>();
configMap.put("security.protocol", (Object) "SSL");
configMap.put("ssl.truststore.password", (Object) "clientpass");
p.apply("ReadFromKafka", KafkaIO.<String, String>read()
.withBootstrapServers("ip:9093")
.withTopic("pageviews")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(configMap)
.withConsumerFactoryFn(new ConsumerFactoryFn()) ... etc.
您可以使用“消费者工厂”提供工厂函数,该函数将被调用以创建Kafka使用者。在该函数中,您可以自由地做任何您喜欢的事情,例如,您可以从GCS存储桶下载信任存储文件(我建议使用GCSUtil)并将其保存到本地磁盘上的临时文件中 - AFAIK Kafka本身仅支持将此文件放在本地磁盘上。然后手动创建一个 Kafka 使用者
并将其指向该文件。
我使用谷歌商店定位器(http://storelocator.googlecode.com/git/examples/custom.html)显示体育赛事。 现在我想自定义来自谷歌的消息。我想隐藏信息“这个地区没有商店。然而,离你最近的商店列在下面。"如果缩放区域中没有事件。我想在面板中的孔线上放一个链接,以重定向到特定的网址。 是否有任何选项可以根据我的愿望定制商店定位器? 谢谢
我在Google play store dashboard上查看我的应用程序的统计数据,发现每次发布后,用户流失都会增加。虽然活动安装数量几乎保持不变,但这些数字仍在发出警报(数千)。 这是否意味着用户正在卸载,或者可能是更新应用程序的人数导致卸载然后重新安装?因为用户流失的峰值总是在启动更新统计图像之后
我正在看新的谷歌云数据存储,看起来很棒。但有件事我不明白。。。它应该替代谷歌应用引擎数据存储吗?我如何在GAE内部使用它?它们之间有什么区别? 我在Java有一个GAE应用程序,它使用3个实体,每个实体都有数千行,我需要经常做连接...
介绍如何在谷歌云平台获取在云联壹云平台需要使用的配置参数。 如何获取谷歌云服务帐号密钥信息? 纳管指定项目 打开“GCP Console中的IAM和管理-IAM页面”页面并登录。 单击顶部“选择项目”,选择需要授权的项目。 在左侧导航栏中选择“服务账号”,进入指定项目的服务账号页面。 单击 “创建服务账号” 按钮,进入创建服务账号页面。 配置服务账号名称、服务账号ID、服务账号说明等,单击 “创建
我正在尝试使用谷歌云数据流将谷歌PubSub消息写入谷歌云存储。PubSub消息采用json格式,我要执行的唯一操作是从json到parquet文件的转换。
我目前正在应用中实现费率功能。 因此,我将有一个带有2个按钮的简单对话框,和,如果用户单击,他们将显示另一个带有Rating Bar和编辑文本的对话框以留下评论。单击确定后,它们将通过 然而,我不知道如何捆绑他们的评级和评论,并适用于我的应用程序在google play商店的评级领域。这在Android上可能吗? 谢啦