Error when calling GetWithoutAckAsync again after a rollback, help needed #116

liuzeqi1997 opened this issue Jul 1, 2024 · 0 comments

Please answer these questions before submitting your issue.

  • Why do you submit this issue?
  • [√] Question or discussion
  • Bug
  • Requirement
  • Feature or performance improvement

Question

  • What do you want to know?
    I am testing rollback. After a rollback, fetching data again fails with: "Received an error returned by the server: something goes wrong with channel:[id: 0x146b0a4c, /127.0.0.1:64186 => /127.0.0.1:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: rollback error, clientId:1 batchId:-1 is not exist , please check"

Bug

  • Which versions of Canal-Server, OS, and MySQL?
    MySQL version: 8.0.35
    CanalSharp: 1.2.1

  • What happened?
    If possible, provide a way to reproduce the error, e.g. a demo application.
    using System;
    using System.Linq;
    using CanalSharp.Connections;
    using CanalSharp.Protocol;
    using Microsoft.Extensions.Logging;

    var loggerFactory = LoggerFactory.Create(builder =>
    {
        builder
            .AddFilter("Microsoft", LogLevel.Debug)
            .AddFilter("System", LogLevel.Information)
            .AddConsole();
    });
    var _logger = loggerFactory.CreateLogger<SimpleCanalConnection>();
    // Client id, read from the console
    string id = Console.ReadLine();
    var conn = new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1", 11111, id), _logger);
    // Connect to the Canal Server
    await conn.ConnectAsync();
    // Subscribe
    await conn.SubscribeAsync("canal_test.test_user");

    while (true)
    {
        //var entries = await conn.GetAsync(1024);
        var entries = await conn.GetWithoutAckAsync(1024);
        foreach (var entry in entries.Entries)
        {
            // Skip transaction begin/end markers
            if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
            {
                continue;
            }

            RowChange rowChange = null;
            try
            {
                rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
                var eventType = rowChange.EventType;
                foreach (var rowData in rowChange.RowDatas)
                {
                    // Deleted rows
                    if (eventType == CanalSharp.Protocol.EventType.Delete)
                    {
                        PrintColumn(rowData.BeforeColumns.ToList());
                    }
                    // Inserted rows
                    else if (eventType == CanalSharp.Protocol.EventType.Insert)
                    {
                        PrintColumn(rowData.AfterColumns.ToList());
                    }
                    // Updated rows
                    else
                    {
                        _logger.LogInformation("-------> before");
                        PrintColumn(rowData.BeforeColumns.ToList());
                        _logger.LogInformation("-------> after");
                        PrintColumn(rowData.AfterColumns.ToList());
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        }
        // Roll back the batch that was just fetched
        await conn.RollbackAsync(entries.Id);
    }

    Console.ReadLine();
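
The error text reports `batchId:-1`, which suggests the rollback was issued for a fetch that returned no data. The upstream Canal Java client example treats `batchId == -1 || size == 0` as an empty fetch and does no work on it. A minimal sketch of the same guard for the loop above, assuming CanalSharp reports an empty fetch the same way; `BatchGuard` and `HasOutstandingBatch` are hypothetical names for illustration, not part of CanalSharp:

```csharp
using System;

public static class BatchGuard
{
    // Hypothetical helper (not part of CanalSharp).
    // Returns true only when the fetch produced a real batch:
    // the id is not the -1 "no data" marker and there is at least one entry.
    public static bool HasOutstandingBatch(long batchId, int entryCount)
        => batchId != -1 && entryCount > 0;
}

// Possible use inside the loop from the report (conn and entries as above):
//
//   var entries = await conn.GetWithoutAckAsync(1024);
//   if (!BatchGuard.HasOutstandingBatch(entries.Id, entries.Entries.Count))
//   {
//       await Task.Delay(1000);   // nothing fetched, wait before polling again
//       continue;
//   }
//   // ... process entries ...
//   await conn.RollbackAsync(entries.Id);   // only for a batch that exists
```

Whether this removes the error depends on the empty-batch assumption holding for this setup. Note also that rolling back every batch means the same data is redelivered on each iteration, which may be intended for a rollback test but would not be for normal consumption.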


Requirement or improvement

  • Please describe your requirements or improvement suggestions.