続・ティール組織 研究会のブログ

続・ティール組織 研究会のブログ

ティール組織が話題になっているが、具現化するにはどうしたらよいか?
その研究を続けるにあたり、さらに次の形態である、続・ティール組織なるものまで視野に入れ、具体的な施策・行動内容を研究・支援する会。

先までは、"和記"についての記載で、どのようにブロックチェーンSNSに組み込んで実装していけばよいのか、概念的なところからアプローチ方法を記載していった。概念設計としてはひとまず終えた。次は、フェデレーションモデル全体の基本設計といえるところまで、基本設計書に着手できるようなところまで、概念を具体化していきたい。そして、それにつながるDApps側である「和記システム」を、Pythonプログラムで開発していきたい。

 

では、DApps側での設計の続きを記載していこう。和記システムを設計するための基本手順を以下に示す。このシステムは、Pythonを用いて市町村のブロックチェーンと連携し、個人および市町村単位での和の行動のデータを収集、記録し、決算書(PL、BS)として公表するものである。

基本設計のステップ

  1. 要件定義
  2. アーキテクチャ設計
  3. データベース設計
  4. API設計
  5. ブロックチェーンインターフェース
  6. 決算書の生成
  7. フロントエンド開発
  8. テストとデプロイ

基本設計の各ステップを順番に進めることで、ブロックチェーンとDAppsとして繋がる「和記システム」の詳細な設計が可能になる。各ステップでは、関係者との協議やレビューを通じて設計内容を確定していくことが重要である。

2.アーキテクチャ設計

ブロックチェーンにおいて、なぜブロックにする必要あるのか?なお、suiはブロックにせず、トランザクションのままをオブジェクトにしてDAGにて処理している。私のフェデレーションモデルにおいて、suiを真似したgossipアルゴリズムを独自設計して実装したい。一つずつやってみよう。

 

私のフェデレーションモデルにおける MongoDB のレプリカセット構成 は、市町村ごとに配置されたサーバーをクラウド上で構築・管理し、それらをレプリカセットとして機能させる形になる。そして クラウド環境の設計 も、全体のスケーラビリティや耐障害性を考慮して決めていく必要がある。

MongoDB レプリカセット + クラウド設計のポイント

1. レプリカセットの基本構成

MongoDBのレプリカセットは、プライマリ(Primary)1台 + セカンダリ(Secondary)複数台 + オプションでArbiter(投票のみのノード) という構成で分散管理される。

  • Primary(1台):
    • 読み書きを担当する主要ノード
    • 書き込みはすべてここに集約され、他のノードにレプリケートされる
  • Secondary(2台以上):
    • Primary のデータを複製し、読み取り処理を分散
    • Primary が落ちた場合、フェイルオーバー(自動切り替え)が行われる
  • Arbiter(任意):
    • データを保持せず、レプリカセット内の選挙で投票のみを行う
    • 低コストでフェイルオーバーをサポート

2. クラウド環境の選択

私のケースでは、MongoDB Atlas(マネージドサービス) をメインで使用しつつ、市町村ごとに AWS でレプリカセットを構築することを想定している。

  • MongoDB Atlas:
    • 市町村のデータ分析・バックアップ用に利用
    • グローバルクラウドサービスなので、フェデレーション全体の監視・管理が容易
  • AWS(各市町村ごとのレプリカセット):
    • 各市町村ごとに専用の MongoDB ノードを設置し、レプリカセットを構成
    • AWS VPC(仮想プライベートネットワーク)内で通信することで安全性を確保
    • AWS の EC2(仮想サーバー)+ EBS(ストレージ)を活用し、クラウドネイティブなレプリカセットを設計

3. VPC + クラウド設計

VPC(仮想プライベートクラウド)を活用して、AWS の EC2 インスタンスと MongoDB Atlas を統合。

VPC 設計例

VPC (AWS Region: Tokyo)

├── Subnet (市町村A)
│    ├── EC2-1 (MongoDB Primary)
│    ├── EC2-2 (MongoDB Secondary)
│    ├── EC2-3 (MongoDB Arbiter)

├── Subnet (市町村B)
│    ├── EC2-4 (MongoDB Primary)
│    ├── EC2-5 (MongoDB Secondary)
│    ├── EC2-6 (MongoDB Arbiter)

├── MongoDB Atlas(グローバル管理・分析サーバー)

  • 各市町村が 独立したレプリカセット を持ち、AWS上のVPCで隔離
  • MongoDB Atlas が中央データ管理・分析サーバーとして機能

MongoDB レプリカセット構築(AWS の EC2 上)

1. MongoDB のインストール(各ノード)

sudo apt update 

sudo apt install -y mongodb-org

2. MongoDB の設定(Primary & Secondary)

各サーバーの /etc/mongod.conf を編集し、レプリカセット名 を設定:

replication: 

    replSetName: "city_replica"

 

適用:

sudo systemctl restart mongod

3. レプリカセットの初期化(Primary で実行)

mongo --host <Primary-EC2-IP> --eval '
rs.initiate({
  _id: "city_replica",
  members: [
    { _id: 0, host: "<Primary-EC2-IP>:27017" },
    { _id: 1, host: "<Secondary-EC2-IP>:27017" },
    { _id: 2, host: "<Arbiter-EC2-IP>:27017", arbiterOnly: true }
  ]
})'


AWS + Atlas 統合(VPC Peering + Private Link)

MongoDB Atlas と AWS VPC の接続を確立するには、VPC Peering または Private Link を活用。

1. VPC Peering を設定

MongoDB Atlas 側で AWS VPC Peering を設定し、AWS の MongoDB インスタンスと接続。

2. AWS のセキュリティグループ設定

MongoDB のポート 27017 を開放し、VPC 内の通信を許可。

aws ec2 authorize-security-group-ingress --group-id sg-xxxxxx \
    --protocol tcp --port 27017 --cidr <VPC-CIDR>


Python / Rust 非同期処理の統合

Python 側(非同期のデータ保存)

import motor.motor_asyncio

# MongoDB クライアントの初期化
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://primary-city-node:27017,secondary-node:27017/?replicaSet=city_replica")

async def insert_data():
    db = client.city_database
    collection = db.transactions
    result = await collection.insert_one({"user": "Alice", "amount": 100})
    print(f"Inserted ID: {result.inserted_id}")
 

Rust 側(非同期でトランザクション保存)

use mongodb::{bson::doc, options::ClientOptions, Client};
use tokio;

#[tokio::main]
async fn main() -> mongodb::error::Result<()> {
    let client_options = ClientOptions::parse("mongodb://primary-city-node:27017/?replicaSet=city_replica").await?;
    let client = Client::with_options(client_options)?;

    let db = client.database("city_database");
    let collection = db.collection("transactions");

    let doc = doc! { "user": "Alice", "amount": 100 };
    collection.insert_one(doc, None).await?;

    println!("Transaction saved!");
    Ok(())
}


まとめ

MongoDB レプリカセットを AWS 上で構築VPC を活用し、各市町村のデータを独立して管理MongoDB Atlas を統合し、全体の分析とバックアップを担当Python & Rust の非同期処理で超高速なデータ保存VPC Peering & Security Group でクラウド統合を実現

この設計なら、DAGの完全分散型保存MongoDBの市町村ごとのレプリカセット保存 を両立しつつ、クラウドでスケーラブルなシステム を構築できる.

 

 

では、マネージクラウドサービスにて、いままでの設計を組み込んでやり直してみたい。

マネージドクラウドサービスを活用した MongoDB レプリカセット & DAG の統合設計


📌 設計のポイント

  • MongoDB Atlas を活用し、クラウド上での マネージドレプリカセット を運用
  • AWS ECS(Fargate) + VPC による マイクロサービス化 で市町村ごとのデータ管理
  • DAG(分散型トランザクション保存)各ノードのストレージ(IPFS / Arweave) に保存
  • AWS PrivateLink / VPC Peering を活用し、AWS と Atlas 間を セキュアに接続
  • 非同期処理(Rust & Python) による 超高速データ保存 & 分散分析

1. 全体構成

AWS Cloud (Tokyo Region)

├── MongoDB Atlas(マネージドレプリカセット)
│    ├── Primary(分析・バックアップ)
│    ├── Secondary(読み取り分散)
│    ├── Arbiter(フェイルオーバー)

├── AWS ECS(各市町村のデータ保存 & 分析用)
│    ├── EC2 Fargate(MongoDBノード + APIサーバー)
│    ├── Lambda(トランザクション処理)
│    ├── Step Functions(ワークフロー管理)

├── AWS VPC(プライベートネットワーク)
│    ├── PrivateLink(AWS ⇄ MongoDB Atlas 直結)
│    ├── VPC Peering(AWS 内部ネットワーク)

└── DAG(ノード内分散ストレージ)
     ├── IPFS / Arweave(トランザクション保存)
     ├── Rust + Python(DAGアクセス & 署名検証)


2. AWS マネージドサービスの役割

コンポーネント 役割
MongoDB Atlas 市町村のデータ管理(レプリカセット)
AWS ECS Fargate 各市町村のAPIとMongoDBをコンテナ化
AWS Lambda 分散分析 & トランザクション処理
AWS Step Functions ワークフロー制御(DAG & DB処理)
AWS VPC + PrivateLink セキュアなデータ接続
DAG(IPFS / Arweave) 完全分散型ストレージ
Rust & Python(非同期) 高速トランザクション処理

3. MongoDB Atlas のレプリカセット設定

MongoDB Atlas のデプロイ

  1. MongoDB Atlas でクラスタ作成

    • レプリカセット(Primary + Secondary 2台 + Arbiter
    • AWS PrivateLink を有効化(Atlas ⇄ AWS 間を直結)
  2. レプリカセットの確認

    mongo "mongodb+srv://<your-cluster>.mongodb.net/admin" --username admin --password <your-password>
    rs.status()


4. AWS 側の VPC & PrivateLink 設定

1️⃣ AWS VPC(仮想プライベートネットワーク)の作成

aws ec2 create-vpc --cidr-block 10.0.0.0/16
aws ec2 create-subnet --vpc-id <VPC_ID> --cidr-block 10.0.1.0/24

2️⃣ AWS PrivateLink で Atlas へセキュア接続

aws ec2 create-vpc-endpoint --vpc-id <VPC_ID> \
  --service-name "com.amazonaws.ap-northeast-1.vpce-svc-XXXXXXX" \
  --vpc-endpoint-type "Interface" \
  --subnet-ids <SUBNET_ID> --security-group-ids <SECURITY_GROUP_ID>


5. AWS ECS(Fargate)で API & DB をマイクロサービス化

AWS ECS Fargate の設定

  1. Fargate クラスター作成

    aws ecs create-cluster --cluster-name CityChainCluster
     

  2. コンテナ定義(MongoDB + APIサーバー)

    {
      "containerDefinitions": [
        {
          "name": "citychain-api",
          "image": "your-api-image",
          "cpu": 256,
          "memory": 512,
          "networkMode": "awsvpc",
          "portMappings": [
            { "containerPort": 8000, "hostPort": 8000 }
          ]
        },
        {
          "name": "mongo",
          "image": "mongo:latest",
          "cpu": 256,
          "memory": 512,
          "portMappings": [
            { "containerPort": 27017, "hostPort": 27017 }
          ]
        }
      ]
    }
     

  3. ECS サービス起動

    aws ecs create-service --cluster CityChainCluster \
        --service-name CityChainService \
        --task-definition citychain-task \
        --desired-count 3


6. 非同期データ保存

Python (FastAPI) + MongoDB Atlas

import motor.motor_asyncio
from fastapi import FastAPI

app = FastAPI()
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb+srv://<your-cluster>.mongodb.net")
db = client.city_database

@app.post("/save")
async def save_transaction(data: dict):
    result = await db.transactions.insert_one(data)
    return {"status": "ok", "id": str(result.inserted_id)}

Rust(tokio + MongoDB)

use mongodb::{bson::doc, options::ClientOptions, Client};
use tokio;

#[tokio::main]
async fn main() -> mongodb::error::Result<()> {
    let client_options = ClientOptions::parse("mongodb+srv://<your-cluster>.mongodb.net").await?;
    let client = Client::with_options(client_options)?;

    let db = client.database("city_database");
    let collection = db.collection("transactions");

    let doc = doc! { "user": "Alice", "amount": 100 };
    collection.insert_one(doc, None).await?;

    println!("Transaction saved!");
    Ok(())
}


7. DAG のトランザクション保存(IPFS / Arweave)

Python(DAG にトランザクション登録)

import ipfshttpclient

client = ipfshttpclient.connect()
data = {"sender": "Alice", "receiver": "Bob", "amount": 100}
hash = client.add_json(data)
print(f"Transaction saved in IPFS: {hash}")

Rust(Arweave に保存)

use arweave_rs::Arweave;
use tokio;

#[tokio::main]
async fn main() {
    let arweave = Arweave::default();
    let data = br#"{"sender": "Alice", "receiver": "Bob", "amount": 100}"#;
    let tx_id = arweave.send_transaction(data).await.unwrap();
    println!("Transaction stored in Arweave: {}", tx_id);
}


8. AWS Step Functions でデータフロー制御

{
  "Comment": "CityChain Transaction Processing",
  "StartAt": "SaveTransaction",
  "States": {
    "SaveTransaction": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:function:SaveTransaction",
      "Next": "DAGSave"
    },
    "DAGSave": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:function:DAGSave",
      "End": true
    }
  }
}

 


✅ まとめ

🔹 MongoDB Atlas + AWS ECS + DAG を統合し、超スケーラブルな分散保存を実現!
🔹 VPC Peering + PrivateLink により AWS ⇄ MongoDB Atlas の高速接続
🔹 非同期 Rust & Python によるトランザクション処理で超高速化
🔹 DAG(IPFS / Arweave)で完全分散保存を実現!

これで マネージドクラウドを活用したフェデレーションモデルの最適設計 が完成!

 

 

いかがであろうか、これでマネージドクラウドを活用したイメージができた。実践に移したい。