Skip to content

Commit

Permalink
Merge pull request #3244 from KouShenhai/dev
Browse files Browse the repository at this point in the history
fix: maven运行报错
  • Loading branch information
KouShenhai authored Jan 1, 2025
2 parents 4252407 + 2e5fd96 commit 51913ba
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ public <T, M> void batch(List<T> dataList, Class<M> clazz, String ds, BiConsumer
*/
public <T, M> void batch(List<T> dataList, int batchNum, int timeout, Class<M> clazz, String ds,
BiConsumer<M, T> consumer) {
// 数据分组
List<List<T>> partition = Lists.partition(dataList, batchNum);
AtomicBoolean rollback = new AtomicBoolean(false);
CyclicBarrier cyclicBarrier = new CyclicBarrier(partition.size());
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
try {
// 数据分组
List<List<T>> partition = Lists.partition(dataList, batchNum);
AtomicBoolean rollback = new AtomicBoolean(false);
CyclicBarrier cyclicBarrier = new CyclicBarrier(partition.size());
// 虚拟线程
List<Callable<Boolean>> futures = partition.stream().map(item -> (Callable<Boolean>) () -> {
handleBatch(timeout, item, clazz, consumer, rollback, ds, cyclicBarrier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.laokou.logstash.common.support;

import lombok.RequiredArgsConstructor;
import org.laokou.common.core.utils.ThreadUtil;
import org.laokou.common.elasticsearch.template.ElasticsearchTemplate;
import org.laokou.logstash.gateway.database.dataobject.TraceLogIndex;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

@RequiredArgsConstructor
public class TraceLogElasticsearchStorage extends AbstractTraceLogStorage {
Expand All @@ -31,9 +33,11 @@ public class TraceLogElasticsearchStorage extends AbstractTraceLogStorage {

@Override
public CompletableFuture<Void> batchSave(Map<String, Object> map) {
return elasticsearchTemplate.asyncCreateIndex(getIndexName(), TRACE_INDEX, TraceLogIndex.class, EXECUTOR)
.thenAcceptAsync(res -> elasticsearchTemplate.asyncBulkCreateDocument(getIndexName(), map, EXECUTOR),
EXECUTOR);
try (ExecutorService executor = ThreadUtil.newVirtualTaskExecutor()) {
return elasticsearchTemplate.asyncCreateIndex(getIndexName(), TRACE_INDEX, TraceLogIndex.class, executor)
.thenAcceptAsync(res -> elasticsearchTemplate.asyncBulkCreateDocument(getIndexName(), map, executor),
executor);
}
}

@Override
Expand Down

0 comments on commit 51913ba

Please sign in to comment.