Integrating BigQuery with Spring Cloud 3.x: A Code Implementation
Author: HBLOG
1. How It Works
Google BigQuery is a high-performance, fully managed cloud data warehouse service for large-scale data analytics. Spring Cloud provides mature tooling and core features for building distributed applications. By integrating Spring Cloud GCP, an application can conveniently consume Google Cloud services such as BigQuery. It works as follows:
- Spring Cloud GCP wraps the Google Cloud SDK and REST APIs, and its auto-configuration simplifies the communication setup.
- Through BigQueryTemplate, an application can load, query, and analyze data (see the sketch after this list).
- Using standard Spring building blocks, such as message channels and task scheduling, an application can process data at scale.
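BigQueryTemplate itself is geared toward loading data; for reading data back, the starter also auto-configures a com.google.cloud.bigquery.BigQuery client bean that runs SQL directly. A minimal query sketch, assuming that auto-configuration is in place (the service name, method, and table are illustrative):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import org.springframework.stereotype.Service;

@Service
public class QueryService {

  private final BigQuery bigQuery; // auto-configured by spring-cloud-gcp-starter-bigquery

  public QueryService(BigQuery bigQuery) {
    this.bigQuery = bigQuery;
  }

  // Counts the rows of an existing table; dataset and table names are supplied by the caller.
  public long countRows(String dataset, String table) throws InterruptedException {
    QueryJobConfiguration query =
        QueryJobConfiguration.newBuilder(
                String.format("SELECT COUNT(*) AS c FROM `%s.%s`", dataset, table))
            .build();
    TableResult result = bigQuery.query(query);
    return result.iterateAll().iterator().next().get("c").getLongValue();
  }
}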
2. Use Cases
- Big data analytics: run high-performance analysis over large volumes of data in BigQuery to support business planning and reporting.
- ETL pipelines: connect multiple data sources and consolidate them, via transformation rules, into a unified dataset for cloud analytics.
- BI dashboards: BigQuery's fast query engine lets BI platforms render dynamic charts and run interactive analysis.
- Report visualization: present aggregated results to external consumers after summarizing the data.
3. Environment Setup
Go to the BigQuery console and create a BigQuery dataset for yourself. In the resource panel, click your project ID, then click Create dataset under the project.
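If you prefer the command line, the same dataset can be created with the bq tool that ships with the Google Cloud CLI (the project and dataset names below are illustrative):

bq mk --dataset your-project-id:test_dataset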
4. Code Implementation
1. Configure the Environment
1.1 Create the Project
Create a Spring Boot project with Spring Initializr and select the following dependencies:
- Spring Web
- Spring Boot Actuator
- Spring Cloud GCP
1.2 Add Dependencies
Add the following to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.et</groupId>
        <artifactId>spring-cloud-gcp</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>spring-cloud-gcp-bigquery-sample</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-starter-bigquery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <!-- Test-related dependencies. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
2. Configuration Files
2.1 application.properties
spring.cloud.gcp.bigquery.dataset-name=test_dataset
spring.cloud.gcp.bigquery.project-id=feisty-truth-447013-m7
spring.cloud.gcp.bigquery.credentials.location=file:/path-to-key/keyfile.json
Note: replace path-to-key/keyfile.json with the path to your own service account key file.
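Alternatively, you can omit the credentials.location property and rely on Application Default Credentials by exporting the standard environment variable before starting the application:

export GOOGLE_APPLICATION_CREDENTIALS=/path-to-key/keyfile.json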
2.2 IAM Permissions
Make sure the service account has the following roles:
- BigQuery Data Viewer
- BigQuery Job User
Grant them with the following gcloud command:
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:YOUR_SERVICE_ACCOUNT_EMAIL" \
  --role="roles/bigquery.jobUser"
Replace PROJECT_ID and YOUR_SERVICE_ACCOUNT_EMAIL with your actual project ID and service account email address.
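Note that the command above grants only the Job User role; the BigQuery Data Viewer role listed earlier is granted the same way:

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:YOUR_SERVICE_ACCOUNT_EMAIL" \
  --role="roles/bigquery.dataViewer"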
3. Implementation Logic
3.1 Create the Endpoints
The controller below uploads files to BigQuery:
/*
 * Copyright 2017-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.et;

import com.et.BigQuerySampleConfiguration.BigQueryFileGateway;
import com.google.cloud.bigquery.*;
import com.google.cloud.spring.bigquery.core.BigQueryTemplate;
import com.google.cloud.spring.bigquery.core.WriteApiResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.ModelAndView;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** Provides REST endpoints that load data files into BigQuery using Spring Integration. */
@Controller
public class WebController {

  private final BigQueryFileGateway bigQueryFileGateway;

  private final BigQueryTemplate bigQueryTemplate;

  private static final String DATASET_NAME = "datasetName";

  // Kebab-case matches the property key in application.properties;
  // @Value placeholders do not get Spring Boot's relaxed binding.
  @Value("${spring.cloud.gcp.bigquery.dataset-name}")
  private String datasetName;

  public WebController(
      BigQueryFileGateway bigQueryFileGateway, BigQueryTemplate bigQueryTemplate) {
    this.bigQueryFileGateway = bigQueryFileGateway;
    this.bigQueryTemplate = bigQueryTemplate;
  }

  @GetMapping("/")
  public ModelAndView renderIndex(ModelMap map) {
    map.put(DATASET_NAME, this.datasetName);
    return new ModelAndView("index.html", map);
  }

  @GetMapping("/write-api-json-upload")
  public ModelAndView renderUploadJson(ModelMap map) {
    map.put(DATASET_NAME, this.datasetName);
    return new ModelAndView("upload-json.html", map);
  }

  /**
   * Handles a JSON file upload using {@link BigQueryTemplate}.
   *
   * @param file the JSON file to upload to BigQuery
   * @param tableName name of the table to load data into
   * @return ModelAndView of the response to send back to users
   * @throws IOException if the file is unable to be loaded
   */
  @PostMapping("/uploadJsonFile")
  public ModelAndView handleJsonFileUpload(
      @RequestParam("file") MultipartFile file,
      @RequestParam("tableName") String tableName,
      @RequestParam(name = "createTable", required = false) String createDefaultTable)
      throws IOException {
    CompletableFuture<WriteApiResponse> writeApiRes;
    if (createDefaultTable != null && createDefaultTable.equals("createTable")) {
      // Create the default table before streaming in the data.
      writeApiRes =
          this.bigQueryTemplate.writeJsonStream(
              tableName, file.getInputStream(), getDefaultSchema());
    } else {
      // The table is expected to exist already.
      writeApiRes = this.bigQueryTemplate.writeJsonStream(tableName, file.getInputStream());
    }
    return getWriteApiResponse(writeApiRes, tableName);
  }

  private Schema getDefaultSchema() {
    return Schema.of(
        Field.of("CompanyName", StandardSQLTypeName.STRING),
        Field.of("Description", StandardSQLTypeName.STRING),
        Field.of("SerialNumber", StandardSQLTypeName.NUMERIC),
        Field.of("Leave", StandardSQLTypeName.NUMERIC),
        Field.of("EmpName", StandardSQLTypeName.STRING));
  }

  /**
   * Handles JSON data upload using {@link BigQueryTemplate}.
   *
   * @param jsonRows the String JSON data to upload to BigQuery
   * @param tableName name of the table to load data into
   * @return ModelAndView of the response to send back to users
   */
  @PostMapping("/uploadJsonText")
  public ModelAndView handleJsonTextUpload(
      @RequestParam("jsonRows") String jsonRows,
      @RequestParam("tableName") String tableName,
      @RequestParam(name = "createTable", required = false) String createDefaultTable) {
    CompletableFuture<WriteApiResponse> writeApiRes;
    if (createDefaultTable != null && createDefaultTable.equals("createTable")) {
      // Create the default table before streaming in the data.
      writeApiRes =
          this.bigQueryTemplate.writeJsonStream(
              tableName, new ByteArrayInputStream(jsonRows.getBytes()), getDefaultSchema());
    } else {
      // The table is expected to exist already.
      writeApiRes =
          this.bigQueryTemplate.writeJsonStream(
              tableName, new ByteArrayInputStream(jsonRows.getBytes()));
    }
    return getWriteApiResponse(writeApiRes, tableName);
  }

  private ModelAndView getWriteApiResponse(
      CompletableFuture<WriteApiResponse> writeApiFuture, String tableName) {
    String message = null;
    try {
      WriteApiResponse apiResponse = writeApiFuture.get();
      if (apiResponse.isSuccessful()) {
        message = "Successfully loaded data to " + tableName;
      } else if (apiResponse.getErrors() != null && !apiResponse.getErrors().isEmpty()) {
        message =
            String.format(
                "Error occurred while loading the file, printing first error %s. Use"
                    + " WriteApiResponse.getErrors() to get the complete list of errors",
                apiResponse.getErrors().get(0).getErrorMessage());
      }
    } catch (Exception e) {
      e.printStackTrace();
      message = "Error: " + e.getMessage();
    }
    return new ModelAndView("upload-json.html")
        .addObject(DATASET_NAME, this.datasetName)
        .addObject("message", message);
  }

  /**
   * Handles a CSV file upload using {@link BigQueryTemplate}.
   *
   * @param file the CSV file to upload to BigQuery
   * @param tableName name of the table to load data into
   * @return ModelAndView of the response to send back to users
   * @throws IOException if the file is unable to be loaded
   */
  @PostMapping("/uploadFile")
  public ModelAndView handleFileUpload(
      @RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
      throws IOException {
    CompletableFuture<Job> loadJob =
        this.bigQueryTemplate.writeDataToTable(
            tableName, file.getInputStream(), FormatOptions.csv());
    return getResponse(loadJob, tableName);
  }

  /**
   * Handles CSV data upload using Spring Integration {@link BigQueryFileGateway}.
   *
   * @param csvData the String CSV data to upload to BigQuery
   * @param tableName name of the table to load data into
   * @return ModelAndView of the response to send back to users
   */
  @PostMapping("/uploadCsvText")
  public ModelAndView handleCsvTextUpload(
      @RequestParam("csvText") String csvData, @RequestParam("tableName") String tableName) {
    CompletableFuture<Job> loadJob =
        this.bigQueryFileGateway.writeToBigQueryTable(csvData.getBytes(), tableName);
    return getResponse(loadJob, tableName);
  }

  private ModelAndView getResponse(CompletableFuture<Job> loadJob, String tableName) {
    String message;
    try {
      Job job = loadJob.get();
      message = "Successfully loaded data file to " + tableName;
    } catch (Exception e) {
      e.printStackTrace();
      message = "Error: " + e.getMessage();
    }
    return new ModelAndView("index")
        .addObject(DATASET_NAME, this.datasetName)
        .addObject("message", message);
  }
}
The listing above shows only the key code.
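In particular, the BigQueryFileGateway injected into the controller is not shown. A sketch of that Spring Integration configuration, modeled on the official spring-cloud-gcp BigQuery sample (it assumes spring-boot-starter-integration is on the classpath), might look like this:

package com.et;

import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.spring.bigquery.core.BigQueryTemplate;
import com.google.cloud.spring.bigquery.integration.BigQuerySpringMessageHeaders;
import com.google.cloud.spring.bigquery.integration.outbound.BigQueryFileMessageHandler;
import java.util.concurrent.CompletableFuture;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;

@Configuration
public class BigQuerySampleConfiguration {

  // Channel carrying raw CSV payloads toward BigQuery.
  @Bean
  public DirectChannel bigQueryWriteDataChannel() {
    return new DirectChannel();
  }

  // Outbound adapter that hands each message to BigQueryTemplate as a CSV load job.
  @Bean
  @ServiceActivator(inputChannel = "bigQueryWriteDataChannel")
  public MessageHandler messageSender(BigQueryTemplate bigQueryTemplate) {
    BigQueryFileMessageHandler messageHandler = new BigQueryFileMessageHandler(bigQueryTemplate);
    messageHandler.setFormatOptions(FormatOptions.csv());
    return messageHandler;
  }

  // Gateway the WebController calls; the table name travels as a message header.
  @MessagingGateway(defaultRequestChannel = "bigQueryWriteDataChannel")
  public interface BigQueryFileGateway {
    CompletableFuture<Job> writeToBigQueryTable(
        byte[] csvData, @Header(BigQuerySpringMessageHeaders.TABLE_NAME) String tableName);
  }
}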
5. Testing
Run the application in Google Cloud Shell:

$ mvn spring-boot:run
Click the Web Preview button in Cloud Shell to preview the application on port 8080, and try loading some data into a BigQuery table under your dataset. The application accepts a CSV file upload or CSV data typed into the text area. If the table does not yet exist under the BigQuery dataset, it will be created for you.
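As an alternative to the web form, you could exercise the CSV upload endpoint directly with curl (the file and table names here are illustrative):

curl -F "file=@data.csv" -F "tableName=test_table" http://localhost:8080/uploadFile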
Finally, view the import results in the BigQuery console.