To implement CQRS, we keep two representations of each entity in the database layer: one serving the CommandStack and another serving the QueryStack. We'll create both in the models.py file.
As usual in Python files, we start by importing the dependencies. First, we import the standard library modules and, from MongoEngine, the field types and the connect function, which is responsible for connecting the model to a MongoDB instance:
    import os
    from datetime import datetime

    from mongoengine import (
        connect,
        Document,
        DateTimeField,
        ListField,
        IntField,
        StringField,
    )
Then, we add the imports from SQLAlchemy. These imports cover the column definitions and the Postgres dialect, which indicates the type of database SQLAlchemy will use:
    from sqlalchemy import (
        Column,
        String,
        BigInteger,
        DateTime,
        Index,
    )
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.ext.declarative import declarative_base
Let's write the CommandStack entity definition using SQLAlchemy with Postgres. Here, we start preparing our application for the use of event sourcing. Note that, in addition to a unique ID, the entity also has a version field. Together, the ID and the version form the composite primary key of the table. With this, we never update an article; instead, we insert a new version of the same news item:
    Base = declarative_base()


    class CommandNewsModel(Base):
        __tablename__ = 'news'

        # id and version together form the composite primary key
        id = Column(BigInteger, primary_key=True)
        version = Column(BigInteger, primary_key=True)
        title = Column(String(length=200))
        content = Column(String)
        author = Column(String(length=50))
        created_at = Column(DateTime, default=datetime.utcnow)
        published_at = Column(DateTime)
        news_type = Column(String, default='famous')
        tags = Column(postgresql.ARRAY(String))

        # __table_args__ must be a tuple (or dict), hence the trailing comma
        __table_args__ = (Index('index', 'id', 'version'),)
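The append-only behaviour that the composite key enables can be sketched as follows. This is a minimal illustration, not the chapter's implementation: it swaps in the standard library sqlite3 module with an in-memory database in place of Postgres and SQLAlchemy so that it runs without a server, it uses only a subset of the columns, and the append_version helper is a hypothetical name introduced here.

```python
import sqlite3

# Assumption: in-memory SQLite stands in for Postgres so the sketch is portable.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE news (
        id INTEGER NOT NULL,
        version INTEGER NOT NULL,
        title TEXT,
        content TEXT,
        PRIMARY KEY (id, version)
    )
    """
)


def append_version(conn, news_id, title, content):
    """Insert a new row for news_id instead of updating the existing one.

    Hypothetical helper: the next version is the current maximum plus one.
    """
    (latest,) = conn.execute(
        "SELECT COALESCE(MAX(version), 0) FROM news WHERE id = ?", (news_id,)
    ).fetchone()
    next_version = latest + 1
    conn.execute(
        "INSERT INTO news (id, version, title, content) VALUES (?, ?, ?, ?)",
        (news_id, next_version, title, content),
    )
    conn.commit()
    return next_version


first = append_version(conn, 1, "Draft title", "First text")
second = append_version(conn, 1, "Final title", "Edited text")

# Both versions remain in the table; nothing was overwritten.
history = conn.execute(
    "SELECT version, title FROM news WHERE id = ? ORDER BY version", (1,)
).fetchall()
```

Because the primary key is the pair (id, version), an attempt to insert the same version of the same news item twice is rejected by the database, which keeps the history consistent.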
The next step is to define the QueryStack entity. Note that the connect function establishes database access using a host read from an environment variable. As with the CommandStack entity, we also keep version information on the QueryStack. However, in this case, we update the data in place, so we always keep only the latest version:
    # The MongoDB host is read from the QUERYBD_HOST environment variable
    connect('famous', host=os.environ.get('QUERYBD_HOST'))


    class QueryNewsModel(Document):
        # A single document per news item; version records which one is stored
        id = IntField(primary_key=True)
        version = IntField(required=True)
        title = StringField(required=True, max_length=200)
        content = StringField(required=True)
        author = StringField(required=True, max_length=50)
        created_at = DateTimeField(default=datetime.utcnow)
        published_at = DateTimeField()
        news_type = StringField(default="famous")
        tags = ListField(StringField(max_length=50))
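The "keep only the latest version" rule on the query side can be sketched in a few lines. This is an illustration under stated assumptions rather than the chapter's code: a plain dict stands in for the MongoDB collection so that the sketch runs without a server, project_latest is a hypothetical helper name, and ignoring a late-arriving older version is a design choice assumed here, not something the text above prescribes.

```python
# Assumption: a plain dict stands in for the MongoDB collection.
# Keys are news ids, values are the stored documents.
query_store = {}


def project_latest(store, document):
    """Keep only the newest version of each news item in the read model.

    Hypothetical helper: stale or duplicate versions are ignored.
    Returns True when the stored document was created or replaced.
    """
    current = store.get(document["id"])
    if current is not None and current["version"] >= document["version"]:
        return False
    store[document["id"]] = dict(document)
    return True


project_latest(query_store, {"id": 1, "version": 1, "title": "Draft title"})
project_latest(query_store, {"id": 1, "version": 2, "title": "Final title"})

# A late-arriving older version does not overwrite the newer one.
stale_applied = project_latest(
    query_store, {"id": 1, "version": 1, "title": "Draft title"}
)
```

Unlike the command side, the store holds exactly one entry per news id, so a read never has to pick the right version among several rows.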
At the end, the complete file is as follows:
    import os
    from datetime import datetime

    from mongoengine import (
        connect,
        Document,
        DateTimeField,
        ListField,
        IntField,
        StringField,
    )
    from sqlalchemy import (
        Column,
        String,
        BigInteger,
        DateTime,
        Index,
    )
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()


    class CommandNewsModel(Base):
        __tablename__ = 'news'

        # id and version together form the composite primary key
        id = Column(BigInteger, primary_key=True)
        version = Column(BigInteger, primary_key=True)
        title = Column(String(length=200))
        content = Column(String)
        author = Column(String(length=50))
        created_at = Column(DateTime, default=datetime.utcnow)
        published_at = Column(DateTime)
        news_type = Column(String, default='famous')
        tags = Column(postgresql.ARRAY(String))

        # __table_args__ must be a tuple (or dict), hence the trailing comma
        __table_args__ = (Index('index', 'id', 'version'),)


    # The MongoDB host is read from the QUERYBD_HOST environment variable
    connect('famous', host=os.environ.get('QUERYBD_HOST'))


    class QueryNewsModel(Document):
        # A single document per news item; version records which one is stored
        id = IntField(primary_key=True)
        version = IntField(required=True)
        title = StringField(required=True, max_length=200)
        content = StringField(required=True)
        author = StringField(required=True, max_length=50)
        created_at = DateTimeField(default=datetime.utcnow)
        published_at = DateTimeField()
        news_type = StringField(default="famous")
        tags = ListField(StringField(max_length=50))