mysql-replication | BinLogStreamReader is not resuming from last position (log_file, log_pos not working)

Thread starter: zigi (Guest)
I'm facing an issue where BinLogStreamReader is not resuming from the last saved position.

I have hardcoded position 1146, but the process still picks up the event at position 875 as well. It should resume the log stream from position >= 1146.

Code:
log_file="mysqld-bin.000003",
log_pos=1146,
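
For reference, here is a small diagnostic sketch (not part of my pipeline) that lists the binlog events with plain pymysql, so it is easy to check whether 1146 actually lands on an event boundary. It assumes the same root credentials and port 3307 as in the docker-compose below.

Code:
# Diagnostic sketch: list binlog events so the positions 875 and 1146
# can be matched against real event boundaries.
import pymysql

conn = pymysql.connect(host="localhost", port=3307,
                       user="root", password="rootpassword")
try:
    with conn.cursor() as cur:
        # Standard MariaDB/MySQL syntax; FROM 4 starts at the first event.
        cur.execute("SHOW BINLOG EVENTS IN 'mysqld-bin.000003' FROM 4")
        for log_name, pos, event_type, server_id, end_log_pos, info in cur.fetchall():
            print(f"{pos:>6} -> {end_log_pos:>6}  {event_type:<20} {info}")
finally:
    conn.close()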

Versions:

MariaDB version: 10.3.39

Package version: mysql-replication=="0.45.1" (works with MariaDB 10.3.39)

my.cnf:

Code:
[mysqld]
log-bin
binlog-format=ROW
server-id=1
binlog_row_image=FULL

Process logs:

Code:
INFO:root:Resuming from mysqld-bin.000003:1146
INFO:root:Starting BinLogStreamReader with log_file=mysqld-bin.000003 and log_pos=1146
DEBUG:root:BinLogStreamReader initialized with log_file=mysqld-bin.000003 and log_pos=1146
INFO:root: -> Insert into mydatabase.example_table: {'id': 1, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 34, 17), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:875
INFO:root: -> Insert into mydatabase.example_table: {'id': 2, 'name': 'Alice', 'created_at': datetime.datetime(2024, 6, 28, 0, 56, 39), '_deleted': False}
INFO:root:Saved logs: mysqld-bin.000003:1146

Python code:

Code:
import time
# mysql-replication=="0.45.1"
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
from pymysqlreplication.event import RotateEvent
import logging

# Database connection parameters
config = {
    'host': 'localhost',
    'port': 3307,
    'user': 'root',
    'password': 'rootpassword',
    'database': 'mydatabase'
}

# Setup logging
logging.basicConfig(level=logging.DEBUG)


def save_binlog_position(log_file, log_pos):
    with open("binlog_position.txt", "w") as f:
        f.write(f"{log_file},{log_pos}")
        logging.info(f"Saved logs: {log_file}:{log_pos}")


def load_binlog_position():
    try:
        with open("binlog_position.txt", "r") as f:
            log_file, log_pos = f.read().strip().split(',')
            logging.info(f"Resuming from {log_file}:{log_pos}")
            return log_file, int(log_pos)
    except FileNotFoundError:
        logging.info("Starting from beginning")
        return None, None


def get_values_from_logs(stream):
    for event in stream:
        table_name = event.table
        if isinstance(event, WriteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": False})
                logging.info(f" -> Insert into {event.schema}.{event.table}: {row['values']}")
        elif isinstance(event, UpdateRowsEvent):
            for row in event.rows:
                row['after_values'].update({"_deleted": False})
                logging.info(
                    f" -> Update {event.schema}.{event.table}: {row['before_values']} -> {row['after_values']}")
        elif isinstance(event, DeleteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": True})
                logging.info(f" -> Delete from {event.schema}.{event.table}: {row['values']}")
        log_file, log_pos = stream.log_file, stream.log_pos
        save_binlog_position(log_file, log_pos)


# Function to parse binary log events
def parse_binlog_events():
    log_file, log_pos = load_binlog_position()

    logging.info(f"Starting BinLogStreamReader with log_file={log_file} and log_pos={log_pos}")

    stream = BinLogStreamReader(
        connection_settings=config,
        server_id=1,
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
        is_mariadb=True,
        blocking=True,
        resume_stream=True,
        # log_file=log_file,
        # log_pos=log_pos,
        log_file="mysqld-bin.000003",
        log_pos=1146,
    )

    logging.debug(f"BinLogStreamReader initialized with log_file={stream.log_file} and log_pos={stream.log_pos}")
    get_values_from_logs(stream)
    stream.close()


if __name__ == "__main__":
    parse_binlog_events()
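
One consumer-side workaround that might help in the meantime is to treat the saved position as a watermark and skip anything the server replays below it. This is only a sketch of the idea (it assumes positions within a single binlog file increase monotonically), not a fix for the reader itself; parse_binlog_events() would pass the values returned by load_binlog_position() into it.

Code:
# Workaround sketch (assumption: positions within a single binlog file
# increase monotonically): skip events the server replays below the
# previously saved position instead of re-processing and re-saving them.
def get_values_from_logs_skipping(stream, resume_file, resume_pos):
    for event in stream:
        if (resume_file is not None
                and stream.log_file == resume_file
                and stream.log_pos <= resume_pos):
            logging.info(f"Skipping replayed event at {stream.log_file}:{stream.log_pos}")
            continue
        if isinstance(event, WriteRowsEvent):
            for row in event.rows:
                row['values'].update({"_deleted": False})
                logging.info(f" -> Insert into {event.schema}.{event.table}: {row['values']}")
        # ... UpdateRowsEvent / DeleteRowsEvent handled as in get_values_from_logs ...
        save_binlog_position(stream.log_file, stream.log_pos)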

Docker-compose:

Code:
services:
  mariadb:
    image: mariadb:10.3.39
    ports:
      - "3307:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: mydatabase
      MYSQL_USER: myuser
      MYSQL_PASSWORD: mypassword
    volumes:
      - ./mariadb/my.cnf:/etc/mysql/conf.d/my.cnf

Can anyone help? Thank you in advance!