愛記システムの基本設計:DApps側である愛記システム データの一元管理 データ保存 | 続・ティール組織 研究会のブログ

続・ティール組織 研究会のブログ

ティール組織が話題になっているが、具現化するにはどうしたらよいか?
その研究を続けるにあたり、さらに次の形態である、続・ティール組織なるものまで視野に入れ、具体的な施策・行動内容を研究・支援する会。

先までは、"愛記"についての記載で、どのようにブロックチェーンSNSに組み込んで実装していけばよいのか、概念的なところからアプローチ方法を記載していった。概念設計としてはひとまず終えた。次は、フェデレーションモデル全体の基本設計といえるところまで、基本設計書に着手できるようなところまで、概念を具体化していきたい。そして、それにつながるDApps側である「愛記システム」を、Pythonプログラムで開発していきたい。

 

愛の行動のPL,BSを決算書として、個人単位、市町村単位、で公表するような愛記システムというものを考えている。愛の行動のデータベースはブロックチェーンのプログラムであり、日々の愛の行動による愛貨の移動を決算書にまとめていきたい。なお、市町村のブロックチェーンのプログラムは以前にも記載している。その市町村のブロックチェーンのプログラムにつながる愛記システムを、DApps側であるPythonプログラムとして設計したい。その場合、基本設計をどのような手順で進めていけばよいか、詳しく見ていこう。

 

愛記システムを設計するための基本手順を以下に示す。このシステムは、Pythonを用いて市町村のブロックチェーンと連携し、個人および市町村単位での愛の行動のデータを収集、記録し、決算書(PL、BS)として公表するものである。

基本設計のステップ

  1. 要件定義
  2. アーキテクチャ設計
  3. データベース設計
  4. API設計
  5. ブロックチェーンインターフェース
  6. 決算書の生成
  7. フロントエンド開発
  8. テストとデプロイ

基本設計の各ステップを順番に進めることで、ブロックチェーンとDAppsとして繋がる「愛記システム」の詳細な設計が可能になる。各ステップでは、関係者との協議やレビューを通じて設計内容を確定していくことが重要である。

1.要件定義

まず、基本設計の最初のステップである要件定義をしていきたい。どのような機能が必要か、どのような問題を解決するのかを洗い出したい。要件定義はシステム設計の最初の重要なステップであり、システムが解決するべき問題と、必要な機能を明確に定義するプロセスである。以下に、愛記システムのプログラムに必要な機能と解決すべき問題を列挙してみよう。

機能要件

  1. 愛の行動の記録

  2. 愛貨の移動の記録

  3. 決算書の生成

  4. 個人および市町村単位でのデータの集約

  5. データのブロックチェーンへの記録と取得

  6. 愛貨の管理

  7. ユーザー管理

  8. 通知機能

  9. レポート機能

  10. ダッシュボード

非機能要件

  1. セキュリティ

  2. 可用性

  3. パフォーマンス

  4. スケーラビリティ

  5. ユーザビリティ

  6. コンプライアンス

解決すべき問題

  1. 透明性と信頼性の確保

  2. データの一元管理

  3. 愛の行動の促進

  4. 評価制度の確立

  5. データのセキュリティとプライバシーの保護

これらの要件を基に、愛記システムの基本設計を進めていくことが重要である。次のステップでは、これらの要件を具体的なアーキテクチャ設計に反映していくことになる。まずは、要件定義の解決すべき問題を一つずつクリアにしていきたい。

データの一元管理

愛記システムにおけるデータの一元管理は、データの収集、統合、保存、アクセス制御、およびバックアップを適切に行うことが重要である。以下に、これを実現するための設計を示す。

1. データ収集

・タイミング:

  • ユーザーが愛の行動を記録したとき

  • 愛貨の移動が発生したとき

  • 定期的な集計およびレポート生成時

・データの種類:

  • 愛の行動データ(行動の内容、日時、場所、関与者)

  • 愛貨の移動データ(送信者、受信者、金額、移動日時)

  • 決算データ(収支、資産、負債)

  • ユーザーデータ(ユーザーID、プロフィール、認証情報)

  • 市町村データ(市町村ID、名前、地理情報)

・方法:

  • 各種データを専用のAPIエンドポイントを通じて収集

  • フォーム入力、スキャン、GPSデータなどのユーザーインターフェースからの入力

2. データ統合

・タイミング:

  • リアルタイムでのデータ入力時

  • 定期的なバッチ処理(例: 毎日夜間)

・データの種類:

  • 各データソースから取得した生データを統合データベースに変換

・方法:

  • ETL(Extract, Transform, Load)プロセスを使用して、異なるフォーマットのデータを統合

  • リアルタイムデータのストリーム処理(例: Kafkaなどのデータストリーミングプラットフォーム)

3. データ保存

・タイミング:

  • データ収集時

  • データ統合後

・データの種類:

  • 統合データベース(例: MongoDB、PostgreSQL)

  • ブロックチェーンデータ

・方法:

  • 各データベースに適切にインデックスを設定し、効率的なデータ格納を実現

  • ブロックチェーンに重要データをハッシュ化して記録、ハッシュのみをオンチェーンに保存し、詳細データはオフチェーンに保存

4. アクセス制御

・タイミング:

  • データアクセスリクエスト時

・データの種類:

  • ユーザー認証情報

  • アクセス制御リスト(ACL)

・方法:

  • JWT(JSON Web Tokens)を使用してユーザー認証を行い、各APIリクエストの認証と認可を実施

  • ユーザーごとにアクセス権限を設定し、アクセス制御リストで管理

  • RBAC(Role-Based Access Control)を導入し、役割ごとにアクセス権限を定義

5. データのバックアップとリカバリ

・タイミング:

  • 定期的なバックアップ(例: 毎日、毎週)

  • システム障害発生時

・データの種類:

  • 全データベースのスナップショット

  • ブロックチェーンデータ

・方法:

  • 定期的なバックアップスケジュールを設定し、バックアップをクラウドストレージやオフサイトストレージに保存

  • 自動バックアップと手動バックアップの両方をサポート

  • データリカバリ計画を策定し、定期的にリカバリテストを実施
     

データ保存の設計

1. データ収集時の保存

タイミング:

  • ユーザーが愛の行動を記録したとき、または愛貨の移動が発生したときにデータを即座に保存する。

データの種類:

  • 愛の行動データ: 行動の内容、日時、場所、関与者
  • 愛貨の移動データ: 送信者、受信者、金額、移動日時
  • ユーザーデータ: ユーザーID、プロフィール、認証情報
  • 市町村データ: 市町村ID、名前、地理情報

方法:

  • MongoDBを使用して、各種データを保存。データ収集時にKafkaを使用してリアルタイムデータのストリーミング処理を行うことで、データが即座に処理されることが可能になる。以下に、Kafkaを使用したリアルタイムデータのストリーミング処理を追加した設計を示す。
    ・Kafkaプロデューサー
    from kafka import KafkaProducer
    import json
    from datetime import datetime

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    def send_to_kafka(topic, data):
        producer.send(topic, data)
        producer.flush()

    def save_love_action_kafka(action_content, location, participants):
        action_data = {
            "action_content": action_content,
            "timestamp": datetime.utcnow().isoformat(),
            "location": location,
            "participants": participants
        }
        send_to_kafka('love_actions', action_data)

    def save_currency_movement_kafka(sender, receiver, amount):
        movement_data = {
            "sender": sender,
            "receiver": receiver,
            "amount": amount,
            "timestamp": datetime.utcnow().isoformat()
        }
        send_to_kafka('currency_movements', movement_data)

    def save_user_kafka(user_id, profile, auth_info):
        user_data = {
            "user_id": user_id,
            "profile": profile,
            "auth_info": auth_info
        }
        send_to_kafka('users', user_data)

    def save_municipality_kafka(municipality_id, name, geo_info):
        municipality_data = {
            "municipality_id": municipality_id,
            "name": name,
            "geo_info": geo_info
        }
        send_to_kafka('municipalities', municipality_data)

    # 使用例
    save_love_action_kafka("Helped neighbor", "35.6895, 139.6917", ["user1", "user2"])
    save_currency_movement_kafka("user1", "user2", 50)
    save_user_kafka("user1", {"name": "Alice"}, {"password_hash": "hashed_password"})
    save_municipality_kafka("city123", "Sample City", {"latitude": 35.6895, "longitude": 139.6917})


    ・Kafkaコンシューマー
    from kafka import KafkaConsumer
    from pymongo import MongoClient
    import json

    # MongoDB接続
    client = MongoClient('mongodb://localhost:27017/')
    db = client.love_currency_db

    # コレクションの定義
    actions_collection = db.love_actions
    movements_collection = db.currency_movements
    users_collection = db.users
    municipalities_collection = db.municipalities

    # Kafkaコンシューマーの設定
    consumer = KafkaConsumer(
        'love_actions',
        'currency_movements',
        'users',
        'municipalities',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    def save_to_mongo(collection, data):
        collection.insert_one(data)

    # コンシューマーの処理
    for message in consumer:
        topic = message.topic
        data = message.value

        if topic == 'love_actions':
            save_to_mongo(actions_collection, data)
        elif topic == 'currency_movements':
            save_to_mongo(movements_collection, data)
        elif topic == 'users':
            save_to_mongo(users_collection, data)
        elif topic == 'municipalities':
            save_to_mongo(municipalities_collection, data)

    Kafkaプロデューサーの役割
    Kafkaプロデューサーは、ユーザーが愛の行動を記録したり、愛貨の移動が発生したりしたときにデータをKafkaトピックに送信する。各データタイプ(愛の行動データ、愛貨の移動データ、ユーザーデータ、市町村データ)に対応するトピックにデータを送信する。

    Kafkaコンシューマーの役割
    Kafkaコンシューマーは、各トピックからデータを受信し、それをMongoDBに保存する。これにより、リアルタイムでデータが収集・保存される。
     

2. データ統合後の保存

○タイミング:

  1. リアルタイム処理:
    • データ収集時にリアルタイムでMongoDBに保存し、即座にブロックチェーンにハッシュを保存する。
  2. 定期的なバッチ処理:
    • 毎日夜間にバッチ処理を実行し、MongoDBからデータを抽出・変換・ロード(ETL)してPostgreSQLに保存し、データのハッシュをブロックチェーンに保存する。

○データの種類:

  1. 統合データベース(例: PostgreSQL):
    • 愛の行動データ(行動の内容、日時、場所、関与者)
    • 愛貨の移動データ(送信者、受信者、金額、移動日時)
    • 決算データ(収支、資産、負債)
    • ユーザーデータ(ユーザーID、プロフィール、認証情報)
    • 市町村データ(市町村ID、名前、地理情報)
  2. ブロックチェーンデータ:
    • 重要なデータをハッシュ化してブロックチェーンに記録し、ハッシュのみをオンチェーンに保存し、詳細データはオフチェーンに保存。

○方法:

  1. リアルタイム処理の流れ:

    • ユーザーが愛の行動を記録したとき、愛貨の移動が発生したときにデータを収集。
    • MongoDBにデータを保存。
    • データをハッシュ化してブロックチェーンに記録。
  2. 定期的なバッチ処理の流れ:

    • 毎日夜間にMongoDBからデータを抽出。
    • データを必要なフォーマットに変換。
    • PostgreSQLにデータを保存。
    • データのハッシュを計算してブロックチェーンに保存。

○プログラム例:

以下の例では、リアルタイム処理でMongoDBにデータを保存し、そのハッシュをブロックチェーンに記録する部分と、バッチ処理でMongoDBからデータを抽出し、PostgreSQLに保存し、そのハッシュをブロックチェーンに記録する部分を示す。
 

・MongoDBとブロックチェーンのリアルタイム処理
from pymongo import MongoClient
from datetime import datetime
import hashlib
from web3 import Web3

# MongoDB接続
client = MongoClient('mongodb://localhost:27017/')
db = client.love_currency_db
actions_collection = db.love_actions
movements_collection = db.currency_movements
users_collection = db.users
municipalities_collection = db.municipalities

# Web3接続
w3 = Web3(Web3.HTTPProvider("http://127.0.0.1:8545"))

# ブロックチェーンのコントラクトインターフェース
contract_address = 'your_contract_address'
contract_abi = 'your_contract_abi'
contract = w3.eth.contract(address=contract_address, abi=contract_abi)

def hash_data(data):
    data_string = str(data).encode()
    return hashlib.sha256(data_string).hexdigest()

def record_hash_on_chain(data_hash):
    tx_hash = contract.functions.recordHash(data_hash).transact({'from': w3.eth.accounts[0]})
    receipt = w3.eth.waitForTransactionReceipt(tx_hash)
    return receipt

def save_data_and_hash(collection, data):
    collection.insert_one(data)
    data_hash = hash_data(data)
    receipt = record_hash_on_chain(data_hash)
    print(f"Data hash {data_hash} recorded on blockchain. Transaction receipt: {receipt}")

def save_love_action(action_content, location, participants):
    action_data = {
        "action_content": action_content,
        "timestamp": datetime.utcnow(),
        "location": location,
        "participants": participants
    }
    save_data_and_hash(actions_collection, action_data)

def save_currency_movement(sender, receiver, amount):
    movement_data = {
        "sender": sender,
        "receiver": receiver,
        "amount": amount,
        "timestamp": datetime.utcnow()
    }
    save_data_and_hash(movements_collection, movement_data)

# 使用例
save_love_action("Helped neighbor", "35.6895, 139.6917", ["user1", "user2"])
save_currency_movement("user1", "user2", 50)

 

・PostgreSQLへのバッチ処理
import psycopg2
from hashlib import sha256
from datetime import datetime

# PostgreSQL接続
conn = psycopg2.connect(
    dbname='love_currency_db',
    user='your_user',
    password='your_password',
    host='localhost'
)
cur = conn.cursor()

# テーブルの作成
cur.execute('''
    CREATE TABLE IF NOT EXISTS love_actions (
        action_id SERIAL PRIMARY KEY,
        action_content TEXT,
        timestamp TIMESTAMP,
        location TEXT,
        participants TEXT[]
    );
''')

cur.execute('''
    CREATE TABLE IF NOT EXISTS currency_movements (
        movement_id SERIAL PRIMARY KEY,
        sender TEXT,
        receiver TEXT,
        amount NUMERIC,
        timestamp TIMESTAMP
    );
''')

cur.execute('''
    CREATE TABLE IF NOT EXISTS users (
        user_id TEXT PRIMARY KEY,
        profile JSON,
        auth_info JSON
    );
''')

cur.execute('''
    CREATE TABLE IF NOT EXISTS municipalities (
        municipality_id TEXT PRIMARY KEY,
        name TEXT,
        geo_info JSON
    );
''')

def save_to_postgresql(table, data):
    if table == 'love_actions':
        cur.execute('''
            INSERT INTO love_actions (action_content, timestamp, location, participants)
            VALUES (%s, %s, %s, %s)
        ''', (data['action_content'], data['timestamp'], data['location'], data['participants']))
    elif table == 'currency_movements':
        cur.execute('''
            INSERT INTO currency_movements (sender, receiver, amount, timestamp)
            VALUES (%s, %s, %s, %s)
        ''', (data['sender'], data['receiver'], data['amount'], data['timestamp']))
    elif table == 'users':
        cur.execute('''
            INSERT INTO users (user_id, profile, auth_info)
            VALUES (%s, %s, %s)
        ''', (data['user_id'], data['profile'], data['auth_info']))
    elif table == 'municipalities':
        cur.execute('''
            INSERT INTO municipalities (municipality_id, name, geo_info)
            VALUES (%s, %s, %s)
        ''', (data['municipality_id'], data['name'], data['geo_info']))
    conn.commit()

def hash_and_save_to_blockchain(data):
    data_hash = sha256(str(data).encode()).hexdigest()
    print(f"Data hash {data_hash} saved to blockchain.")
    # ここでブロックチェーンにデータハッシュを保存する処理を行う(例:スマートコントラクトを呼び出す)

def batch_process():
    # MongoDBからデータを取得
    love_actions = actions_collection.find()
    currency_movements = movements_collection.find()
    users = users_collection.find()
    municipalities = municipalities_collection.find()

    # PostgreSQLにデータを保存し、ブロックチェーンにハッシュを記録
    for action in love_actions:
        save_to_postgresql('love_actions', action)
        hash_and_save_to_blockchain(action)
    
    for movement in currency_movements:
        save_to_postgresql('currency_movements', movement)
        hash_and_save_to_blockchain(movement)

    for user in users:
        save_to_postgresql('users', user)
        hash_and_save_to_blockchain(user)

    for municipality in municipalities:
        save_to_postgresql('municipalities', municipality)
        hash_and_save_to_blockchain(municipality)

# バッチ処理の実行例(定期的に実行するスケジュールを設定)
batch_process()
 

・解説

データ保存のタイミング:

  1. リアルタイム処理:

    • ユーザーがアクションを記録した時点や、愛貨の移動が発生した時点で即座にデータをMongoDBに保存する。
    • これにより、リアルタイムでのデータ収集が可能になる。
  2. 定期バッチ処理:

    • 毎日夜間などのタイミングで、データベースからデータを抽出し、統合データベースに保存する。
    • このプロセスでは、必要に応じてデータの変換・集計を行い、最適なフォーマットでデータを保存する。

データの種類:

  • 統合データベース:

    • MongoDB: 各種データ(愛の行動、愛貨の移動、ユーザー、市町村)の保存に使用される。
    • PostgreSQL: 定期的なバッチ処理で集計・分析されたデータの保存に使用される。
  • ブロックチェーンデータ:

    • 重要なデータはハッシュ化されてブロックチェーンに記録される。詳細データはオフチェーンに保存される。

方法:

  • リアルタイム処理:

    • Kafkaを使用して、リアルタイムデータのストリーミング処理を行う。
    • Kafkaプロデューサーがデータを送信し、Kafkaコンシューマーがデータを受信してMongoDBに保存する。
  • バッチ処理:

    • 定期的にMongoDBからデータを抽出し、必要な変換を行った後、PostgreSQLに保存する。
    • ハッシュ化された重要データはブロックチェーンに記録される。

 

いかがであろうか、今回はデータ一元管理におけるデータ保存について記載した。データ収集と保存を即座に行う処理はとても利便性が高い。このように設計していくと効率的だろう。