test_bulk_flow.py 1.1 KB

123456789101112131415161718192021222324252627282930
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import pytest
  3. from django.core.management import call_command
  4. from django.db import transaction
  5. from tests.dj_master import models as master_models
  6. from tests.dj_replica import models as replica_models
  7. from tests.test_commands.utils import remove_file
  8. @pytest.mark.django_db
  9. def tests_dumps_several_rows(mocker):
  10. mocker.patch('dj_cqrs.controller.producer.produce')
  11. remove_file('bulk_flow.dump')
  12. master_models.Author.objects.create(id=2, name='2')
  13. with transaction.atomic():
  14. publisher = master_models.Publisher.objects.create(id=1, name='publisher')
  15. master_models.Author.objects.create(id=1, name='1', publisher=publisher)
  16. assert replica_models.AuthorRef.objects.count() == 0
  17. assert replica_models.Publisher.objects.count() == 0
  18. call_command('cqrs_bulk_dump', '--cqrs-id=author', '-o=bulk_flow.dump')
  19. call_command('cqrs_bulk_load', '-i=bulk_flow.dump')
  20. assert replica_models.AuthorRef.objects.count() == 2
  21. assert replica_models.Publisher.objects.count() == 1