Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@
private static final String CURRENT_FILE_DIR =
ConsensusManager.getConfigRegionDir() + File.separator + "current";
private static final String PROGRESS_FILE_PATH =
CURRENT_FILE_DIR + File.separator + "log_inprogress_";

Check failure on line 90 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "log_inprogress_" 3 times.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ38iLDgrmaxjvmVY4qH&open=AZ38iLDgrmaxjvmVY4qH&pullRequest=17609
private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + "log_";
private static final long LOG_FILE_MAX_SIZE =
CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax();
private final TEndPoint currentNodeTEndPoint;
private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+");
private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$");
private static final Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("log_inprogress_(\\d+)$");
private static final Pattern LOG_PATTERN = Pattern.compile("log_(\\d+)_(\\d+)$");

public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) {
this.executor = executor;
Expand Down Expand Up @@ -121,6 +121,13 @@

/** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
protected TSStatus write(ConfigPhysicalPlan plan) {
if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
final TSStatus persistStatus = persistPlanForSimpleConsensus(plan);
if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return persistStatus;
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WAL replay idempotency requirement

With the new write-ahead ordering (persist → execute), if persistPlanForSimpleConsensus succeeds at line 125 but executor.executeNonQueryPlan fails at line 133, the plan is already persisted in WAL and will be replayed on restart.

This is correct WAL semantics, but it introduces a hard requirement: all ConfigPhysicalPlan implementations must be idempotent under replay. For example, a RegisterDataNodePlan replayed against an already-registered node must not fail or double-register.

Is this idempotency property currently guaranteed across all ConfigPhysicalPlan types? If not, this could cause issues on crash recovery in edge cases.


TSStatus result;
try {
result = executor.executeNonQueryPlan(plan);
Expand All @@ -129,10 +136,6 @@
result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}

if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
writeLogForSimpleConsensus(plan);
}

if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false);
}
Expand Down Expand Up @@ -197,22 +200,25 @@
PipeConfigNodeAgent.runtime()
.listener()
.tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots());
return true;
} catch (IOException e) {
if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
LOGGER.warn(
"Config Region Listening Queue Listen to snapshot failed, the historical data may not be transferred.",
e);
}
}
return true;
}
return false;
}

@Override
public void loadSnapshot(final File latestSnapshotRootDir) {
if (!executor.loadSnapshot(latestSnapshotRootDir)) {
return;
}

try {
executor.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
// to recover the newest snapshot in cache
PipeConfigNodeAgent.runtime()
Expand Down Expand Up @@ -342,6 +348,9 @@

@Override
public void stop() {
if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
closeSimpleLogWriter();
}
// Shutdown leader related service for config pipe
PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
}
Expand All @@ -351,56 +360,48 @@
return CommonDescriptor.getInstance().getConfig().isReadOnly();
}

private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
try {
simpleLogWriter.force();
File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
Files.move(
simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
try {
if (simpleLogWriter == null || simpleLogFile == null) {
throw new IOException("SimpleConsensus log writer is not initialized.");
}
for (int retry = 0; retry < 5; retry++) {
try {
simpleLogWriter.close();
} catch (IOException e) {
LOGGER.warn(
"Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
+ "filePath: {}, retry: {}",
simpleLogFile.getAbsolutePath(),
retry);
try {
// Sleep 1s and retry
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
LOGGER.warn("Unexpected interruption during the close method of logWriter");
}
continue;
}
break;

if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
rollSimpleConsensusLogFile();
}
startIndex = endIndex + 1;
createLogFile(startIndex);
}

try {
ByteBuffer buffer = plan.serializeToByteBuffer();
buffer.position(buffer.limit());
simpleLogWriter.write(buffer);
simpleLogWriter.force();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

force() on every write makes scheduled flush redundant

Now that every write is synchronously forced here, the scheduled flushWALForSimpleConsensus thread (lines 447-455) becomes redundant — it calls simpleLogWriter.force() periodically, but there's nothing left to flush.

This is harmless (double-force is idempotent) but worth either:

  1. Removing the scheduled thread to avoid confusion, or
  2. Adding a brief comment explaining the intentional belt-and-suspenders approach.

For ConfigNode metadata operations (low write frequency), the per-write force() is the right durability choice.


endIndex = endIndex + 1;
} catch (Exception e) {
LOGGER.error(
"Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
"Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed", e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(
"Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage()));

Check warning on line 384 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Directly append the argument of String.valueOf().

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ38iLDgrmaxjvmVY4qG&open=AZ38iLDgrmaxjvmVY4qG&pullRequest=17609
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: String.valueOf(e.getMessage()) is redundant — string concatenation with + already handles null by converting to "null". Can simplify to:

"Persist ConfigNode SimpleConsensus log failed: " + e.getMessage()

}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

private void rollSimpleConsensusLogFile() throws IOException {
simpleLogWriter.force();
closeSimpleLogWriter();
Files.move(
simpleLogFile.toPath(),
new File(FILE_PATH + startIndex + "_" + endIndex).toPath(),
StandardCopyOption.ATOMIC_MOVE);
startIndex = endIndex + 1;
createLogFile(startIndex);
}

private void initStandAloneConfigNode() {
File dir = new File(CURRENT_FILE_DIR);
dir.mkdirs();
String[] list = new File(CURRENT_FILE_DIR).list();
endIndex = 0;
if (list != null && list.length != 0) {
Arrays.sort(list, new FileComparator());
for (String logFileName : list) {
Expand All @@ -417,7 +418,7 @@
continue;
}

startIndex = endIndex;
final int recoveredStartIndex = parseStartIndex(logFileName);
while (logReader.hasNext()) {
endIndex++;
// Read and re-serialize the PhysicalPlan
Expand All @@ -435,13 +436,13 @@
}
}
logReader.close();
if (isInProgressLogFile(logFileName)) {
sealRecoveredInProgressLogFile(logFile, recoveredStartIndex, endIndex);
}
}
} else {
startIndex = 0;
endIndex = 0;
}
startIndex = startIndex + 1;
createLogFile(endIndex);
startIndex = endIndex + 1;
createLogFile(startIndex);

ScheduledExecutorService simpleConsensusThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -482,26 +483,72 @@
}
}

private void sealRecoveredInProgressLogFile(
File logFile, int recoveredStartIndex, int recoveredEndIndex) {
try {
if (recoveredStartIndex > recoveredEndIndex) {
Files.deleteIfExists(logFile.toPath());
return;
}
Files.move(
logFile.toPath(),
new File(FILE_PATH + recoveredStartIndex + "_" + recoveredEndIndex).toPath(),
StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
LOGGER.warn("Seal recovered ConfigNode SimpleConsensus log failed: {}", logFile, e);
}
}

private boolean isInProgressLogFile(String filename) {
return filename.startsWith("log_inprogress_");
}

private void closeSimpleLogWriter() {
if (simpleLogWriter == null) {
return;
}
for (int retry = 0; retry < 5; retry++) {
try {
simpleLogWriter.close();
simpleLogWriter = null;
return;
} catch (IOException e) {
LOGGER.warn(
"Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
+ "filePath: {}, retry: {}",
simpleLogFile == null ? null : simpleLogFile.getAbsolutePath(),
retry);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
LOGGER.warn("Unexpected interruption during the close method of logWriter");
break;
}
}
}
}

static class FileComparator implements Comparator<String> {

@Override
public int compare(String filename1, String filename2) {
long id1 = parseEndIndex(filename1);
long id2 = parseEndIndex(filename2);
long id1 = parseStartIndex(filename1);
long id2 = parseStartIndex(filename2);
return Long.compare(id1, id2);
}
}

static long parseEndIndex(String filename) {
static int parseStartIndex(String filename) {
if (filename.startsWith("log_inprogress_")) {
Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename);
if (matcher.find()) {
return Long.parseLong(matcher.group());
return Integer.parseInt(matcher.group(1));
}
} else {
Matcher matcher = LOG_PATTERN.matcher(filename);
if (matcher.find()) {
return Long.parseLong(matcher.group());
return Integer.parseInt(matcher.group(1));
}
}
return 0;
Expand Down
Loading
Loading