先までは、"和記"についての記載で、どのようにブロックチェーンSNSに組み込んで実装していけばよいのか、概念的なところからアプローチ方法を記載していった。概念設計としてはひとまず終えた。次は、フェデレーションモデル全体の基本設計といえるところまで、基本設計書に着手できるようなところまで、概念を具体化していきたい。そして、それにつながるDApps側である「和記システム」を、Pythonプログラムで開発していきたい。
では、DApps側での設計の続きを記載していこう。和記システムを設計するための基本手順を以下に示す。このシステムは、Pythonを用いて市町村のブロックチェーンと連携し、個人および市町村単位での和の行動のデータを収集、記録し、決算書(PL、BS)として公表するものである。
基本設計のステップ
- 要件定義
- アーキテクチャ設計
- データベース設計
- API設計
- ブロックチェーンインターフェース
- 決算書の生成
- フロントエンド開発
- テストとデプロイ
基本設計の各ステップを順番に進めることで、ブロックチェーンとDAppsとして繋がる「和記システム」の詳細な設計が可能になる。各ステップでは、関係者との協議やレビューを通じて設計内容を確定していくことが重要である。
2.アーキテクチャ設計
ブロックチェーンにおいて、なぜブロックにする必要あるのか?なお、suiはブロックにせず、トランザクションのままをオブジェクトにしてDAGにて処理している。私のフェデレーションモデルにおいて、suiを真似したgossipアルゴリズムを独自設計して実装したい。一つずつやってみよう。
私のフェデレーションモデルにおける MongoDB のレプリカセット構成 は、市町村ごとに配置されたサーバーをクラウド上で構築・管理し、それらをレプリカセットとして機能させる形になる。そして クラウド環境の設計 も、全体のスケーラビリティや耐障害性を考慮して決めていく必要がある。
MongoDB レプリカセット + クラウド設計のポイント
1. レプリカセットの基本構成
MongoDBのレプリカセットは、プライマリ(Primary)1台 + セカンダリ(Secondary)複数台 + オプションでArbiter(投票のみのノード) という構成で分散管理される。
- Primary(1台):
- 読み書きを担当する主要ノード
- 書き込みはすべてここに集約され、他のノードにレプリケートされる
- Secondary(2台以上):
- Primary のデータを複製し、読み取り処理を分散
- Primary が落ちた場合、フェイルオーバー(自動切り替え)が行われる
- Arbiter(任意):
- データを保持せず、レプリカセット内の選挙で投票のみを行う
- 低コストでフェイルオーバーをサポート
2. クラウド環境の選択
私のケースでは、MongoDB Atlas(マネージドサービス) をメインで使用しつつ、市町村ごとに AWS でレプリカセットを構築することを想定している。
- MongoDB Atlas:
- 市町村のデータ分析・バックアップ用に利用
- グローバルクラウドサービスなので、フェデレーション全体の監視・管理が容易
- AWS(各市町村ごとのレプリカセット):
- 各市町村ごとに専用の MongoDB ノードを設置し、レプリカセットを構成
- AWS VPC(仮想プライベートネットワーク)内で通信することで安全性を確保
- AWS の EC2(仮想サーバー)+ EBS(ストレージ)を活用し、クラウドネイティブなレプリカセットを設計
3. VPC + クラウド設計
VPC(仮想プライベートクラウド)を活用して、AWS の EC2 インスタンスと MongoDB Atlas を統合。
VPC 設計例
VPC (AWS Region: Tokyo)
│
├── Subnet (市町村A)
│ ├── EC2-1 (MongoDB Primary)
│ ├── EC2-2 (MongoDB Secondary)
│ ├── EC2-3 (MongoDB Arbiter)
│
├── Subnet (市町村B)
│ ├── EC2-4 (MongoDB Primary)
│ ├── EC2-5 (MongoDB Secondary)
│ ├── EC2-6 (MongoDB Arbiter)
│
├── MongoDB Atlas(グローバル管理・分析サーバー)
- 各市町村が 独立したレプリカセット を持ち、AWS上のVPCで隔離
- MongoDB Atlas が中央データ管理・分析サーバーとして機能
MongoDB レプリカセット構築(AWS の EC2 上)
1. MongoDB のインストール(各ノード)
sudo apt update
sudo apt install -y mongodb-org
2. MongoDB の設定(Primary & Secondary)
各サーバーの /etc/mongod.conf を編集し、レプリカセット名 を設定:
replication:
replSetName: "city_replica"
適用:
sudo systemctl restart mongod
3. レプリカセットの初期化(Primary で実行)
mongo --host <Primary-EC2-IP> --eval '
rs.initiate({
_id: "city_replica",
members: [
{ _id: 0, host: "<Primary-EC2-IP>:27017" },
{ _id: 1, host: "<Secondary-EC2-IP>:27017" },
{ _id: 2, host: "<Arbiter-EC2-IP>:27017", arbiterOnly: true }
]
})'
AWS + Atlas 統合(VPC Peering + Private Link)
MongoDB Atlas と AWS VPC の接続を確立するには、VPC Peering または Private Link を活用。
1. VPC Peering を設定
MongoDB Atlas 側で AWS VPC Peering を設定し、AWS の MongoDB インスタンスと接続。
2. AWS のセキュリティグループ設定
MongoDB のポート 27017 を開放し、VPC 内の通信を許可。
aws ec2 authorize-security-group-ingress --group-id sg-xxxxxx \
--protocol tcp --port 27017 --cidr <VPC-CIDR>
Python / Rust 非同期処理の統合
Python 側(非同期のデータ保存)
import motor.motor_asyncio
# MongoDB クライアントの初期化
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://primary-city-node:27017,secondary-node:27017/?replicaSet=city_replica")
async def insert_data():
db = client.city_database
collection = db.transactions
result = await collection.insert_one({"user": "Alice", "amount": 100})
print(f"Inserted ID: {result.inserted_id}")
Rust 側(非同期でトランザクション保存)
use mongodb::{bson::doc, options::ClientOptions, Client};
use tokio;
#[tokio::main]
async fn main() -> mongodb::error::Result<()> {
let client_options = ClientOptions::parse("mongodb://primary-city-node:27017/?replicaSet=city_replica").await?;
let client = Client::with_options(client_options)?;
let db = client.database("city_database");
let collection = db.collection("transactions");
let doc = doc! { "user": "Alice", "amount": 100 };
collection.insert_one(doc, None).await?;
println!("Transaction saved!");
Ok(())
}
まとめ
✅ MongoDB レプリカセットを AWS 上で構築 ✅ VPC を活用し、各市町村のデータを独立して管理 ✅ MongoDB Atlas を統合し、全体の分析とバックアップを担当 ✅ Python & Rust の非同期処理で超高速なデータ保存 ✅ VPC Peering & Security Group でクラウド統合を実現
この設計なら、DAGの完全分散型保存 と MongoDBの市町村ごとのレプリカセット保存 を両立しつつ、クラウドでスケーラブルなシステム を構築できる.
では、マネージクラウドサービスにて、いままでの設計を組み込んでやり直してみたい。
マネージドクラウドサービスを活用した MongoDB レプリカセット & DAG の統合設計
📌 設計のポイント
- MongoDB Atlas を活用し、クラウド上での マネージドレプリカセット を運用
- AWS ECS(Fargate) + VPC による マイクロサービス化 で市町村ごとのデータ管理
- DAG(分散型トランザクション保存) は 各ノードのストレージ(IPFS / Arweave) に保存
- AWS PrivateLink / VPC Peering を活用し、AWS と Atlas 間を セキュアに接続
- 非同期処理(Rust & Python) による 超高速データ保存 & 分散分析
1. 全体構成
AWS Cloud (Tokyo Region)
│
├── MongoDB Atlas(マネージドレプリカセット)
│ ├── Primary(分析・バックアップ)
│ ├── Secondary(読み取り分散)
│ ├── Arbiter(フェイルオーバー)
│
├── AWS ECS(各市町村のデータ保存 & 分析用)
│ ├── EC2 Fargate(MongoDBノード + APIサーバー)
│ ├── Lambda(トランザクション処理)
│ ├── Step Functions(ワークフロー管理)
│
├── AWS VPC(プライベートネットワーク)
│ ├── PrivateLink(AWS ⇄ MongoDB Atlas 直結)
│ ├── VPC Peering(AWS 内部ネットワーク)
│
└── DAG(ノード内分散ストレージ)
├── IPFS / Arweave(トランザクション保存)
├── Rust + Python(DAGアクセス & 署名検証)
2. AWS マネージドサービスの役割
コンポーネント | 役割 |
---|---|
MongoDB Atlas | 市町村のデータ管理(レプリカセット) |
AWS ECS Fargate | 各市町村のAPIとMongoDBをコンテナ化 |
AWS Lambda | 分散分析 & トランザクション処理 |
AWS Step Functions | ワークフロー制御(DAG & DB処理) |
AWS VPC + PrivateLink | セキュアなデータ接続 |
DAG(IPFS / Arweave) | 完全分散型ストレージ |
Rust & Python(非同期) | 高速トランザクション処理 |
3. MongoDB Atlas のレプリカセット設定
MongoDB Atlas のデプロイ
-
MongoDB Atlas でクラスタ作成
- レプリカセット(Primary + Secondary 2台 + Arbiter)
- AWS PrivateLink を有効化(Atlas ⇄ AWS 間を直結)
-
レプリカセットの確認
mongo "mongodb+srv://<your-cluster>.mongodb.net/admin" --username admin --password <your-password>
rs.status()
4. AWS 側の VPC & PrivateLink 設定
1️⃣ AWS VPC(仮想プライベートネットワーク)の作成
aws ec2 create-vpc --cidr-block 10.0.0.0/16
aws ec2 create-subnet --vpc-id <VPC_ID> --cidr-block 10.0.1.0/24
2️⃣ AWS PrivateLink で Atlas へセキュア接続
aws ec2 create-vpc-endpoint --vpc-id <VPC_ID> \
--service-name "com.amazonaws.ap-northeast-1.vpce-svc-XXXXXXX" \
--vpc-endpoint-type "Interface" \
--subnet-ids <SUBNET_ID> --security-group-ids <SECURITY_GROUP_ID>
5. AWS ECS(Fargate)で API & DB をマイクロサービス化
AWS ECS Fargate の設定
-
Fargate クラスター作成
aws ecs create-cluster --cluster-name CityChainCluster
-
コンテナ定義(MongoDB + APIサーバー)
{
"containerDefinitions": [
{
"name": "citychain-api",
"image": "your-api-image",
"cpu": 256,
"memory": 512,
"networkMode": "awsvpc",
"portMappings": [
{ "containerPort": 8000, "hostPort": 8000 }
]
},
{
"name": "mongo",
"image": "mongo:latest",
"cpu": 256,
"memory": 512,
"portMappings": [
{ "containerPort": 27017, "hostPort": 27017 }
]
}
]
}
-
ECS サービス起動
aws ecs create-service --cluster CityChainCluster \
--service-name CityChainService \
--task-definition citychain-task \
--desired-count 3
6. 非同期データ保存
Python (FastAPI) + MongoDB Atlas
import motor.motor_asyncio
from fastapi import FastAPI
app = FastAPI()
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb+srv://<your-cluster>.mongodb.net")
db = client.city_database
@app.post("/save")
async def save_transaction(data: dict):
result = await db.transactions.insert_one(data)
return {"status": "ok", "id": str(result.inserted_id)}
Rust(tokio + MongoDB)
use mongodb::{bson::doc, options::ClientOptions, Client};
use tokio;
#[tokio::main]
async fn main() -> mongodb::error::Result<()> {
let client_options = ClientOptions::parse("mongodb+srv://<your-cluster>.mongodb.net").await?;
let client = Client::with_options(client_options)?;
let db = client.database("city_database");
let collection = db.collection("transactions");
let doc = doc! { "user": "Alice", "amount": 100 };
collection.insert_one(doc, None).await?;
println!("Transaction saved!");
Ok(())
}
7. DAG のトランザクション保存(IPFS / Arweave)
Python(DAG にトランザクション登録)
import ipfshttpclient
client = ipfshttpclient.connect()
data = {"sender": "Alice", "receiver": "Bob", "amount": 100}
hash = client.add_json(data)
print(f"Transaction saved in IPFS: {hash}")
Rust(Arweave に保存)
use arweave_rs::Arweave;
use tokio;
#[tokio::main]
async fn main() {
let arweave = Arweave::default();
let data = br#"{"sender": "Alice", "receiver": "Bob", "amount": 100}"#;
let tx_id = arweave.send_transaction(data).await.unwrap();
println!("Transaction stored in Arweave: {}", tx_id);
}
8. AWS Step Functions でデータフロー制御
{
"Comment": "CityChain Transaction Processing",
"StartAt": "SaveTransaction",
"States": {
"SaveTransaction": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:function:SaveTransaction",
"Next": "DAGSave"
},
"DAGSave": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:function:DAGSave",
"End": true
}
}
}
✅ まとめ
🔹 MongoDB Atlas + AWS ECS + DAG を統合し、超スケーラブルな分散保存を実現!
🔹 VPC Peering + PrivateLink により AWS ⇄ MongoDB Atlas の高速接続
🔹 非同期 Rust & Python によるトランザクション処理で超高速化
🔹 DAG(IPFS / Arweave)で完全分散保存を実現!
これで マネージドクラウドを活用したフェデレーションモデルの最適設計 が完成!
いかがであろうか、これでマネージドクラウドを活用したイメージができた。実践に移したい。