|
@@ -4,9 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
|
|
import cn.hutool.core.date.DateUtil;
|
|
import cn.hutool.core.date.DateUtil;
|
|
import com.abi.task.common.api.exception.BusinessException;
|
|
import com.abi.task.common.api.exception.BusinessException;
|
|
import com.abi.task.common.api.exception.ErrorCodeEnum;
|
|
import com.abi.task.common.api.exception.ErrorCodeEnum;
|
|
-import com.abi.task.common.tablestore.common.TableStoreEntity;
|
|
|
|
import com.abi.task.common.tablestore.common.TableStoreReq;
|
|
import com.abi.task.common.tablestore.common.TableStoreReq;
|
|
-import com.abi.task.common.tablestore.common.TableStoreRes;
|
|
|
|
import com.alicloud.openservices.tablestore.SyncClient;
|
|
import com.alicloud.openservices.tablestore.SyncClient;
|
|
import com.alicloud.openservices.tablestore.model.*;
|
|
import com.alicloud.openservices.tablestore.model.*;
|
|
import com.alicloud.openservices.tablestore.model.search.SearchQuery;
|
|
import com.alicloud.openservices.tablestore.model.search.SearchQuery;
|
|
@@ -20,13 +18,16 @@ import com.alicloud.openservices.tablestore.model.search.sort.SortOrder;
|
|
import com.google.common.collect.Maps;
|
|
import com.google.common.collect.Maps;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
import javax.annotation.PostConstruct;
|
|
import java.time.LocalDateTime;
|
|
import java.time.LocalDateTime;
|
|
-import java.util.*;
|
|
|
|
|
|
+import java.util.Arrays;
|
|
|
|
+import java.util.Date;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -38,18 +39,9 @@ import java.util.stream.Collectors;
|
|
@Component
|
|
@Component
|
|
public class TableStoreUtils {
|
|
public class TableStoreUtils {
|
|
|
|
|
|
- @Value("${tableStore.endPoint}")
|
|
|
|
- private String endPoint;
|
|
|
|
-
|
|
|
|
- @Value("${tableStore.accessKeyId}")
|
|
|
|
- private String accessKeyId;
|
|
|
|
-
|
|
|
|
- @Value("${tableStore.accessKeySecret}")
|
|
|
|
- private String accessKeySecret;
|
|
|
|
-
|
|
|
|
- @Value("${tableStore.instanceName}")
|
|
|
|
- private String instanceName;
|
|
|
|
|
|
|
|
|
|
+ @Autowired
|
|
|
|
+ private SyncClient client;
|
|
|
|
|
|
/**
|
|
/**
|
|
* 单词交互条数 200条
|
|
* 单词交互条数 200条
|
|
@@ -71,14 +63,7 @@ public class TableStoreUtils {
|
|
taskExecutor.initialize();
|
|
taskExecutor.initialize();
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * 创建SyncClient
|
|
|
|
- *
|
|
|
|
- * @return
|
|
|
|
- */
|
|
|
|
- public SyncClient client() {
|
|
|
|
- return new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
|
|
/**
|
|
/**
|
|
* 创建数据表
|
|
* 创建数据表
|
|
@@ -99,16 +84,14 @@ public class TableStoreUtils {
|
|
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
|
|
CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
|
|
//设置预留读写吞吐量,容量型实例中的数据表只能设置为0,高性能实例中的数据表可以设置为非零值。
|
|
//设置预留读写吞吐量,容量型实例中的数据表只能设置为0,高性能实例中的数据表可以设置为非零值。
|
|
request.setReservedThroughput(new ReservedThroughput(new CapacityUnit(0, 0)));
|
|
request.setReservedThroughput(new ReservedThroughput(new CapacityUnit(0, 0)));
|
|
- SyncClient client = client();
|
|
|
|
client.createTable(request);
|
|
client.createTable(request);
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 查询所有表的表名
|
|
* 查询所有表的表名
|
|
*/
|
|
*/
|
|
public List<String> listTable() {
|
|
public List<String> listTable() {
|
|
- ListTableResponse listTableResponse = client().listTable();
|
|
|
|
|
|
+ ListTableResponse listTableResponse = client.listTable();
|
|
return listTableResponse.getTableNames();
|
|
return listTableResponse.getTableNames();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -121,7 +104,7 @@ public class TableStoreUtils {
|
|
*/
|
|
*/
|
|
public void putRow(String primaryKeyName, String pkValue, String tableName, List<Column> columns) {
|
|
public void putRow(String primaryKeyName, String pkValue, String tableName, List<Column> columns) {
|
|
RowPutChange rowPutChange = buildRowPutChange(primaryKeyName, pkValue, tableName, columns);
|
|
RowPutChange rowPutChange = buildRowPutChange(primaryKeyName, pkValue, tableName, columns);
|
|
- client().putRow(new PutRowRequest(rowPutChange));
|
|
|
|
|
|
+ client.putRow(new PutRowRequest(rowPutChange));
|
|
}
|
|
}
|
|
|
|
|
|
public void batchPutRow(String primaryKeyName, List<String> pkValueList, String tableName, List<List<Column>> columnsList) {
|
|
public void batchPutRow(String primaryKeyName, List<String> pkValueList, String tableName, List<List<Column>> columnsList) {
|
|
@@ -182,15 +165,15 @@ public class TableStoreUtils {
|
|
//添加到batch操作中。
|
|
//添加到batch操作中。
|
|
batchWriteRowRequest.addRowChange(rowPutChange);
|
|
batchWriteRowRequest.addRowChange(rowPutChange);
|
|
}
|
|
}
|
|
- BatchWriteRowResponse response = client().batchWriteRow(batchWriteRowRequest);
|
|
|
|
|
|
+ BatchWriteRowResponse response = client.batchWriteRow(batchWriteRowRequest);
|
|
|
|
|
|
long end = System.currentTimeMillis();
|
|
long end = System.currentTimeMillis();
|
|
|
|
|
|
log.info("是否全部成功:{} ,耗时{}毫秒", response.isAllSucceed(),end-begin);
|
|
log.info("是否全部成功:{} ,耗时{}毫秒", response.isAllSucceed(),end-begin);
|
|
if (!response.isAllSucceed()) {
|
|
if (!response.isAllSucceed()) {
|
|
for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
|
|
for (BatchWriteRowResponse.RowResult rowResult : response.getFailedRows()) {
|
|
- log.info("失败的行:{}", batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
|
|
|
|
- log.info("失败原因:{}", rowResult.getError());
|
|
|
|
|
|
+ log.warn("失败的行:{}", batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
|
|
|
|
+ log.warn("失败原因:{}", rowResult.getError());
|
|
}
|
|
}
|
|
/**
|
|
/**
|
|
* 可以通过createRequestForRetry方法再构造一个请求对失败的行进行重试。此处只给出构造重试请求的部分。
|
|
* 可以通过createRequestForRetry方法再构造一个请求对失败的行进行重试。此处只给出构造重试请求的部分。
|
|
@@ -212,7 +195,7 @@ public class TableStoreUtils {
|
|
|
|
|
|
SingleRowQueryCriteria criteria = queryCriteria(primaryKeyName, pkValue, tableName);
|
|
SingleRowQueryCriteria criteria = queryCriteria(primaryKeyName, pkValue, tableName);
|
|
|
|
|
|
- GetRowResponse getRowResponse = client().getRow(new GetRowRequest(criteria));
|
|
|
|
|
|
+ GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
|
|
return getRowResponse.getRow();
|
|
return getRowResponse.getRow();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -230,7 +213,7 @@ public class TableStoreUtils {
|
|
|
|
|
|
//设置读取某些列。
|
|
//设置读取某些列。
|
|
criteria.addColumnsToGet(columnName);
|
|
criteria.addColumnsToGet(columnName);
|
|
- GetRowResponse getRowResponse = client().getRow(new GetRowRequest(criteria));
|
|
|
|
|
|
+ GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
|
|
return getRowResponse.getRow();
|
|
return getRowResponse.getRow();
|
|
|
|
|
|
}
|
|
}
|
|
@@ -250,7 +233,7 @@ public class TableStoreUtils {
|
|
//更新列数据
|
|
//更新列数据
|
|
rowUpdateChange.put(new Column(columnName, ColumnValue.fromString(columnValue)));
|
|
rowUpdateChange.put(new Column(columnName, ColumnValue.fromString(columnValue)));
|
|
|
|
|
|
- client().updateRow(new UpdateRowRequest(rowUpdateChange));
|
|
|
|
|
|
+ client.updateRow(new UpdateRowRequest(rowUpdateChange));
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -267,7 +250,7 @@ public class TableStoreUtils {
|
|
//删除某一列。
|
|
//删除某一列。
|
|
rowUpdateChange.deleteColumns(columnName);
|
|
rowUpdateChange.deleteColumns(columnName);
|
|
|
|
|
|
- client().updateRow(new UpdateRowRequest(rowUpdateChange));
|
|
|
|
|
|
+ client.updateRow(new UpdateRowRequest(rowUpdateChange));
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -285,7 +268,7 @@ public class TableStoreUtils {
|
|
//设置数据表名称。
|
|
//设置数据表名称。
|
|
RowDeleteChange rowDeleteChange = new RowDeleteChange(tableName, primaryKey);
|
|
RowDeleteChange rowDeleteChange = new RowDeleteChange(tableName, primaryKey);
|
|
|
|
|
|
- client().deleteRow(new DeleteRowRequest(rowDeleteChange));
|
|
|
|
|
|
+ client.deleteRow(new DeleteRowRequest(rowDeleteChange));
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -305,7 +288,7 @@ public class TableStoreUtils {
|
|
throw new BusinessException("FieldName为空");
|
|
throw new BusinessException("FieldName为空");
|
|
}
|
|
}
|
|
if(req.getFieldValue()==null
|
|
if(req.getFieldValue()==null
|
|
- &&(req.getLessThan()==null || req.getGreaterThan()==null)){
|
|
|
|
|
|
+ &&(req.getLessThan()==null || req.getGreaterThan()==null)){
|
|
throw new BusinessException("查询条件为空");
|
|
throw new BusinessException("查询条件为空");
|
|
}
|
|
}
|
|
|
|
|
|
@@ -355,7 +338,7 @@ public class TableStoreUtils {
|
|
columnsToGet.setReturnAll(true);
|
|
columnsToGet.setReturnAll(true);
|
|
searchRequest.setColumnsToGet(columnsToGet);
|
|
searchRequest.setColumnsToGet(columnsToGet);
|
|
//table store的出参
|
|
//table store的出参
|
|
- SearchResponse resp = client().search(searchRequest);
|
|
|
|
|
|
+ SearchResponse resp = client.search(searchRequest);
|
|
|
|
|
|
return resp;
|
|
return resp;
|
|
}
|
|
}
|
|
@@ -448,7 +431,7 @@ public class TableStoreUtils {
|
|
int maxVersions = 1;
|
|
int maxVersions = 1;
|
|
TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
|
|
TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
|
|
updateTableRequest.setTableOptionsForUpdate(tableOptions);
|
|
updateTableRequest.setTableOptionsForUpdate(tableOptions);
|
|
- return client().updateTable(updateTableRequest);
|
|
|
|
|
|
+ return client.updateTable(updateTableRequest);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|