当前位置: 首页 > 知识库问答 >
问题:

信任商店和谷歌云数据流

马欣荣
2023-03-14

我需要使用信任存储在谷歌云数据流中建立SSLKafka连接。我可以从存储桶提供它,还是有没有办法将其存储在“本地文件系统”上?

共有2个答案

邹海荣
2023-03-14

感谢@jkff的解决方案,这是一个实现示例:

示例ConsumerFactoryFn实施:

    private static class ConsumerFactoryFn
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
{



    public Consumer<byte[], byte[]> apply(Map<String, Object> config) 
    {
        try 
        {
            Storage storage = StorageOptions.newBuilder()
                    .setProjectId("prj-id-of-your-bucket")
                    .setCredentials(GoogleCredentials.getApplicationDefault())
                    .build()
                    .getService();
            Blob blob = storage.get("your-bucket-name", "pth.to.your.kafka.client.truststore.jks");
            ReadChannel readChannel = blob.reader();
            FileOutputStream fileOuputStream;
            fileOuputStream = new FileOutputStream("/tmp/kafka.client.truststore.jks"); //path where the jks file will be stored
            fileOuputStream.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
            fileOuputStream.close();
            File f = new File("/tmp/kafka.client.truststore.jks"); //assuring the store file exists
            if (f.exists())
            {
                LOG.debug("key exists");

            }
            else
            {
                LOG.error("key does not exist");

            }

        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            LOG.error( e.getMessage());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            LOG.error( e.getMessage());
        }


        config.put("ssl.truststore.location",(Object) "/tmp/kafka.client.truststore.jks" );

        return new KafkaConsumer<byte[], byte[]>(config);
    }
}

别忘了在KafkaIO中使用.withConsumerFactoryFn。read()调用,应该类似于:

Map<String, Object> configMap = new HashMap<String, Object>();
configMap.put("security.protocol", (Object) "SSL");
configMap.put("ssl.truststore.password", (Object) "clientpass");

p.apply("ReadFromKafka", KafkaIO.<String, String>read()
            .withBootstrapServers("ip:9093")
            .withTopic("pageviews")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .updateConsumerProperties(configMap)
            .withConsumerFactoryFn(new ConsumerFactoryFn()) ... etc.
翟誉
2023-03-14

您可以使用“消费者工厂”提供工厂函数,该函数将被调用以创建Kafka使用者。在该函数中,您可以自由地做任何您喜欢的事情,例如,您可以从GCS存储桶下载信任存储文件(我建议使用GCSUtil)并将其保存到本地磁盘上的临时文件中 - AFAIK Kafka本身仅支持将此文件放在本地磁盘上。然后手动创建一个 Kafka 使用者并将其指向该文件。

 类似资料:
  • 我使用谷歌商店定位器(http://storelocator.googlecode.com/git/examples/custom.html)显示体育赛事。 现在我想自定义来自谷歌的消息。我想隐藏信息“这个地区没有商店。然而,离你最近的商店列在下面。"如果缩放区域中没有事件。我想在面板中的孔线上放一个链接,以重定向到特定的网址。 是否有任何选项可以根据我的愿望定制商店定位器? 谢谢

  • 我在Google play store dashboard上查看我的应用程序的统计数据,发现每次发布后,用户流失都会增加。虽然活动安装数量几乎保持不变,但这些数字仍在发出警报(数千)。 这是否意味着用户正在卸载,或者可能是更新应用程序的人数导致卸载然后重新安装?因为用户流失的峰值总是在启动更新统计图像之后

  • 我正在看新的谷歌云数据存储,看起来很棒。但有件事我不明白。。。它应该替代谷歌应用引擎数据存储吗?我如何在GAE内部使用它?它们之间有什么区别? 我在Java有一个GAE应用程序,它使用3个实体,每个实体都有数千行,我需要经常做连接...

  • 介绍如何在谷歌云平台获取在云联壹云平台需要使用的配置参数。 如何获取谷歌云服务帐号密钥信息? 纳管指定项目 打开“GCP Console中的IAM和管理-IAM页面”页面并登录。 单击顶部“选择项目”,选择需要授权的项目。 在左侧导航栏中选择“服务账号”,进入指定项目的服务账号页面。 单击 “创建服务账号” 按钮,进入创建服务账号页面。 配置服务账号名称、服务账号ID、服务账号说明等,单击 “创建

  • 我正在尝试使用谷歌云数据流将谷歌PubSub消息写入谷歌云存储。PubSub消息采用json格式,我要执行的唯一操作是从json到parquet文件的转换。

  • 我目前正在应用中实现费率功能。 因此,我将有一个带有2个按钮的简单对话框,和,如果用户单击,他们将显示另一个带有Rating Bar和编辑文本的对话框以留下评论。单击确定后,它们将通过 然而,我不知道如何捆绑他们的评级和评论,并适用于我的应用程序在google play商店的评级领域。这在Android上可能吗? 谢啦