-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Fixed multiple configNode bugs #17609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,13 +87,13 @@ | |
| private static final String CURRENT_FILE_DIR = | ||
| ConsensusManager.getConfigRegionDir() + File.separator + "current"; | ||
| private static final String PROGRESS_FILE_PATH = | ||
| CURRENT_FILE_DIR + File.separator + "log_inprogress_"; | ||
|
Check failure on line 90 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
|
||
| private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + "log_"; | ||
| private static final long LOG_FILE_MAX_SIZE = | ||
| CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax(); | ||
| private final TEndPoint currentNodeTEndPoint; | ||
| private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+"); | ||
| private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$"); | ||
| private static final Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("log_inprogress_(\\d+)$"); | ||
| private static final Pattern LOG_PATTERN = Pattern.compile("log_(\\d+)_(\\d+)$"); | ||
|
|
||
| public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) { | ||
| this.executor = executor; | ||
|
|
@@ -121,6 +121,13 @@ | |
|
|
||
| /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */ | ||
| protected TSStatus write(ConfigPhysicalPlan plan) { | ||
| if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { | ||
| final TSStatus persistStatus = persistPlanForSimpleConsensus(plan); | ||
| if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| return persistStatus; | ||
| } | ||
| } | ||
|
|
||
| TSStatus result; | ||
| try { | ||
| result = executor.executeNonQueryPlan(plan); | ||
|
|
@@ -129,10 +136,6 @@ | |
| result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); | ||
| } | ||
|
|
||
| if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { | ||
| writeLogForSimpleConsensus(plan); | ||
| } | ||
|
|
||
| if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { | ||
| PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false); | ||
| } | ||
|
|
@@ -197,22 +200,25 @@ | |
| PipeConfigNodeAgent.runtime() | ||
| .listener() | ||
| .tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots()); | ||
| return true; | ||
| } catch (IOException e) { | ||
| if (PipeConfigNodeAgent.runtime().listener().isOpened()) { | ||
| LOGGER.warn( | ||
| "Config Region Listening Queue Listen to snapshot failed, the historical data may not be transferred.", | ||
| e); | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public void loadSnapshot(final File latestSnapshotRootDir) { | ||
| if (!executor.loadSnapshot(latestSnapshotRootDir)) { | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| executor.loadSnapshot(latestSnapshotRootDir); | ||
| // We recompute the snapshot for pipe listener when loading snapshot | ||
| // to recover the newest snapshot in cache | ||
| PipeConfigNodeAgent.runtime() | ||
|
|
@@ -342,6 +348,9 @@ | |
|
|
||
| @Override | ||
| public void stop() { | ||
| if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { | ||
| closeSimpleLogWriter(); | ||
| } | ||
| // Shutdown leader related service for config pipe | ||
| PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); | ||
| } | ||
|
|
@@ -351,56 +360,48 @@ | |
| return CommonDescriptor.getInstance().getConfig().isReadOnly(); | ||
| } | ||
|
|
||
| private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) { | ||
| if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) { | ||
| try { | ||
| simpleLogWriter.force(); | ||
| File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex); | ||
| Files.move( | ||
| simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE); | ||
| } catch (IOException e) { | ||
| LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e); | ||
| private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) { | ||
| try { | ||
| if (simpleLogWriter == null || simpleLogFile == null) { | ||
| throw new IOException("SimpleConsensus log writer is not initialized."); | ||
| } | ||
| for (int retry = 0; retry < 5; retry++) { | ||
| try { | ||
| simpleLogWriter.close(); | ||
| } catch (IOException e) { | ||
| LOGGER.warn( | ||
| "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, " | ||
| + "filePath: {}, retry: {}", | ||
| simpleLogFile.getAbsolutePath(), | ||
| retry); | ||
| try { | ||
| // Sleep 1s and retry | ||
| TimeUnit.SECONDS.sleep(1); | ||
| } catch (InterruptedException e2) { | ||
| Thread.currentThread().interrupt(); | ||
| LOGGER.warn("Unexpected interruption during the close method of logWriter"); | ||
| } | ||
| continue; | ||
| } | ||
| break; | ||
|
|
||
| if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) { | ||
| rollSimpleConsensusLogFile(); | ||
| } | ||
| startIndex = endIndex + 1; | ||
| createLogFile(startIndex); | ||
| } | ||
|
|
||
| try { | ||
| ByteBuffer buffer = plan.serializeToByteBuffer(); | ||
| buffer.position(buffer.limit()); | ||
| simpleLogWriter.write(buffer); | ||
| simpleLogWriter.force(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Now that every write is synchronously forced here, the scheduled This is harmless (double-force is idempotent) but worth either:
For ConfigNode metadata operations (low write frequency), the per-write |
||
|
|
||
| endIndex = endIndex + 1; | ||
| } catch (Exception e) { | ||
| LOGGER.error( | ||
| "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e); | ||
| "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed", e); | ||
| return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) | ||
| .setMessage( | ||
| "Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage())); | ||
|
Check warning on line 384 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: "Persist ConfigNode SimpleConsensus log failed: " + e.getMessage() |
||
| } | ||
| return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); | ||
| } | ||
|
|
||
| private void rollSimpleConsensusLogFile() throws IOException { | ||
| simpleLogWriter.force(); | ||
| closeSimpleLogWriter(); | ||
| Files.move( | ||
| simpleLogFile.toPath(), | ||
| new File(FILE_PATH + startIndex + "_" + endIndex).toPath(), | ||
| StandardCopyOption.ATOMIC_MOVE); | ||
| startIndex = endIndex + 1; | ||
| createLogFile(startIndex); | ||
| } | ||
|
|
||
| private void initStandAloneConfigNode() { | ||
| File dir = new File(CURRENT_FILE_DIR); | ||
| dir.mkdirs(); | ||
| String[] list = new File(CURRENT_FILE_DIR).list(); | ||
| endIndex = 0; | ||
| if (list != null && list.length != 0) { | ||
| Arrays.sort(list, new FileComparator()); | ||
| for (String logFileName : list) { | ||
|
|
@@ -417,7 +418,7 @@ | |
| continue; | ||
| } | ||
|
|
||
| startIndex = endIndex; | ||
| final int recoveredStartIndex = parseStartIndex(logFileName); | ||
| while (logReader.hasNext()) { | ||
| endIndex++; | ||
| // Read and re-serialize the PhysicalPlan | ||
|
|
@@ -435,13 +436,13 @@ | |
| } | ||
| } | ||
| logReader.close(); | ||
| if (isInProgressLogFile(logFileName)) { | ||
| sealRecoveredInProgressLogFile(logFile, recoveredStartIndex, endIndex); | ||
| } | ||
| } | ||
| } else { | ||
| startIndex = 0; | ||
| endIndex = 0; | ||
| } | ||
| startIndex = startIndex + 1; | ||
| createLogFile(endIndex); | ||
| startIndex = endIndex + 1; | ||
| createLogFile(startIndex); | ||
|
|
||
| ScheduledExecutorService simpleConsensusThread = | ||
| IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( | ||
|
|
@@ -482,26 +483,72 @@ | |
| } | ||
| } | ||
|
|
||
| private void sealRecoveredInProgressLogFile( | ||
| File logFile, int recoveredStartIndex, int recoveredEndIndex) { | ||
| try { | ||
| if (recoveredStartIndex > recoveredEndIndex) { | ||
| Files.deleteIfExists(logFile.toPath()); | ||
| return; | ||
| } | ||
| Files.move( | ||
| logFile.toPath(), | ||
| new File(FILE_PATH + recoveredStartIndex + "_" + recoveredEndIndex).toPath(), | ||
| StandardCopyOption.ATOMIC_MOVE); | ||
| } catch (IOException e) { | ||
| LOGGER.warn("Seal recovered ConfigNode SimpleConsensus log failed: {}", logFile, e); | ||
| } | ||
| } | ||
|
|
||
| private boolean isInProgressLogFile(String filename) { | ||
| return filename.startsWith("log_inprogress_"); | ||
| } | ||
|
|
||
| private void closeSimpleLogWriter() { | ||
| if (simpleLogWriter == null) { | ||
| return; | ||
| } | ||
| for (int retry = 0; retry < 5; retry++) { | ||
| try { | ||
| simpleLogWriter.close(); | ||
| simpleLogWriter = null; | ||
| return; | ||
| } catch (IOException e) { | ||
| LOGGER.warn( | ||
| "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, " | ||
| + "filePath: {}, retry: {}", | ||
| simpleLogFile == null ? null : simpleLogFile.getAbsolutePath(), | ||
| retry); | ||
| try { | ||
| TimeUnit.SECONDS.sleep(1); | ||
| } catch (InterruptedException e2) { | ||
| Thread.currentThread().interrupt(); | ||
| LOGGER.warn("Unexpected interruption during the close method of logWriter"); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| static class FileComparator implements Comparator<String> { | ||
|
|
||
| @Override | ||
| public int compare(String filename1, String filename2) { | ||
| long id1 = parseEndIndex(filename1); | ||
| long id2 = parseEndIndex(filename2); | ||
| long id1 = parseStartIndex(filename1); | ||
| long id2 = parseStartIndex(filename2); | ||
| return Long.compare(id1, id2); | ||
| } | ||
| } | ||
|
|
||
| static long parseEndIndex(String filename) { | ||
| static int parseStartIndex(String filename) { | ||
| if (filename.startsWith("log_inprogress_")) { | ||
| Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename); | ||
| if (matcher.find()) { | ||
| return Long.parseLong(matcher.group()); | ||
| return Integer.parseInt(matcher.group(1)); | ||
| } | ||
| } else { | ||
| Matcher matcher = LOG_PATTERN.matcher(filename); | ||
| if (matcher.find()) { | ||
| return Long.parseLong(matcher.group()); | ||
| return Integer.parseInt(matcher.group(1)); | ||
| } | ||
| } | ||
| return 0; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WAL replay idempotency requirement
With the new write-ahead ordering (persist → execute), if
persistPlanForSimpleConsensussucceeds at line 125 butexecutor.executeNonQueryPlanfails at line 133, the plan is already persisted in WAL and will be replayed on restart.This is correct WAL semantics, but it introduces a hard requirement: all
ConfigPhysicalPlanimplementations must be idempotent under replay. For example, aRegisterDataNodePlanreplayed against an already-registered node must not fail or double-register.Is this idempotency property currently guaranteed across all ConfigPhysicalPlan types? If not, this could cause issues on crash recovery in edge cases.